Recently, I faced with lack of documentation when I wanted to use Consul as a host resolver in gRPC connections. That’s why I want to fill the hole for engineers who struggle with the same problem. So, let’s start with a little bit of theory.
gRPC is a trendy RPC framework developed by Google. It reached its popularity cause of several reasons:
Consul is a robust service mesh developed by HashiCorp. There are a lot of use-cases for Consul:
In our project, Consul keeps info about all microservice hosts. So when a microservice instance goes down, or there is a new one — Consul knows it immediately. So we want to use the information to resolve a host for any gRPC interaction to ensure that microservices are connected to actual healthy hosts.
The main problem is that there is no official documentation on resolving hosts by Consul in gRPC, so we tried to Google a solution. There was a suggestion to implement a host resolver by ourselves.
OK, let’s do it:
Golang — is the primary language in our tech stack, so all written code will be in Go.
package grpcclient
import (
"errors"
"fmt"
"net/url"
"strings"
"utils/pkg/connection"
"utils/pkg/consul"
"github.com/hashicorp/consul/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/resolver"
)
const (
resolverSchemeConsul = "consul"
)
var (
errUnknownScheme = errors.New("unknown scheme. Only 'consul' is applicable")
)
// consulBuilder builds the address resolver for gRPC dialer.
type consulBuilder struct {
consul *consul.Cluster
serviceWatcher connection.Watcher
}
// newConsulBuilder is a constructor.
func newConsulBuilder(consul *consul.Cluster, serviceWatcher connection.Watcher) *consulBuilder {
return &consulBuilder{
consul: consul,
serviceWatcher: serviceWatcher,
}
}
// Builds the consul address resolver for gRPC.
func (c consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.Scheme != resolverSchemeConsul {
return nil, errUnknownScheme
}
serviceName, tag, err := parseTarget(target)
if err != nil {
return nil, fmt.Errorf("parse target: %w", err)
}
cr := newConsulResolver(c.serviceWatcher, cc)
err = cr.watch(serviceName, []string{tag})
if err != nil {
return nil, fmt.Errorf("watch: %w", err)
}
return cr, nil
}
// parses the target and returns service name and tag.
// example of target endpoint: 127.0.0.1:8500/service?tag=master
func parseTarget(target resolver.Target) (string, string, error) {
u, err := url.Parse(fmt.Sprintf("%s://%s", resolverSchemeConsul, target.Endpoint))
if err != nil {
return "", "", fmt.Errorf("parse: %w", err)
}
return strings.Trim(u.Path, "/"), u.Query().Get("tag"), nil
}
// Scheme returns the consul resolver scheme.
func (c consulBuilder) Scheme() string {
return resolverSchemeConsul
}
type consulResolver struct {
cc resolver.ClientConn
serviceWatcher connection.Watcher
unwatchFunc func() error
}
func newConsulResolver(serviceWatcher connection.Watcher, cc resolver.ClientConn) *consulResolver {
return &consulResolver{
serviceWatcher: serviceWatcher,
cc: cc,
}
}
func (c *consulResolver) onServiceChanged(entries []*api.ServiceEntry) {
addresses := make([]resolver.Address, len(entries))
for i, e := range entries {
addresses[i] = resolver.Address{
Addr: connection.BuildAddr(e),
}
}
err := c.cc.UpdateState(resolver.State{
Addresses: addresses,
})
if err != nil {
logrus.WithError(err).Error("update gRPC consul resolver addresses")
}
}
func (c *consulResolver) watch(serviceName string, tags []string) error {
var err error
c.unwatchFunc, err = c.serviceWatcher.WatchService(serviceName, tags, true, c.onServiceChanged)
if err != nil {
return fmt.Errorf("watch service: %w", err)
}
return nil
}
// ResolveNow we don't need to anything here because all addresses are updated on change in consul.
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {}
// Close is an interface method. Gracefully closes the consulResolver.
func (c *consulResolver) Close() {
err := c.unwatchFunc()
if err != nil {
logrus.WithError(err).Error("unwatch service in gRPC consul resolver")
}
}
In that case, we need to specify the custom host resolver on dialing:
import (
...
"google.golang.org/grpc"
)
...
// Dial dials to a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
cfg := consul.Config()
target := fmt.Sprintf("%s:///%s/%s?tag=%s", resolverSchemeConsul, cfg.Address, serviceName, tag)
return grpc.Dial(
target,
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
grpc.WithResolvers(newConsulBuilder(consul, consul.ServiceWatcher())),
grpc.WithInsecure(),
)
}
The implementation works, but we wrote some tricky code and must maintain it. It looks like there is a better way to do that.
The official gRPC name resolution docs don’t contain Consul info, so I found a good solution accidentally while doing some job.
As it turned out, we don’t need a custom host resolver for Consul, and gRPC has the built integration.
To use it, we need to specify a dial target correctly:
import (
...
"google.golang.org/grpc"
_ "github.com/mbobakov/grpc-consul-resolver"
)
...
// Dial dials a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
cfg := consul.Config()
target := fmt.Sprintf("consul://%s:%s@%s/%s?tag=%s", cfg.User, cfg.Password, cfg.Address, serviceName, tag)
return grpc.Dial(
target,
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
grpc.WithInsecure(),
)
}
The dialing target will be something like:
consul://user:[email protected]:8500/service_name?tag=service_tag
.
And that’s it. No need to write any code.
I want to summarize some conclusions based on the story: