How to Use Consul as a Host Resolver in gRPC by@vgukasov


Vladislav Gukasov

Senior SWE at Akma Trading


Recently, I ran into a lack of documentation when I wanted to use Consul as a host resolver for gRPC connections. I’m writing this to fill that gap for engineers who struggle with the same problem. Let’s start with a little bit of theory.

Why gRPC



gRPC is a widely used RPC framework developed by Google. It became popular for several reasons:


  • it’s extremely fast thanks to Protocol Buffers (protobuf), a powerful binary serialization toolset and language
  • it generates the client and server code from a single protobuf contract
  • it works over HTTP/2 and supports bidirectional data streams

Why Consul



Consul is a robust service mesh developed by HashiCorp. It has a lot of use cases:


  • service discovery
  • config storage
  • key-value storage


In our project, Consul keeps information about all microservice hosts. When a microservice instance goes down or a new one comes up, Consul picks up the change through its service registrations and health checks. We want to use this information to resolve hosts for every gRPC interaction, so that microservices connect only to live, healthy hosts.
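To make the idea concrete, here is a minimal sketch of how the list of healthy instances could be read from Consul’s HTTP health endpoint (/v1/health/service/<name> with the passing and tag query parameters). The helper names healthURL and healthyAddrs, the service name orders, and the sample JSON are made up for illustration; a real response carries many more fields, and a real client would more likely use the official Consul API package than raw JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// serviceEntry mirrors only the fields of a Consul health response that
// this sketch needs; the real payload contains many more.
type serviceEntry struct {
	Node struct {
		Address string
	}
	Service struct {
		Address string
		Port    int
	}
}

// healthURL (hypothetical helper) builds the health query for a service,
// asking Consul for passing instances only, optionally filtered by tag.
func healthURL(consulAddr, service, tag string) string {
	q := url.Values{}
	q.Set("passing", "true")
	if tag != "" {
		q.Set("tag", tag)
	}
	u := url.URL{
		Scheme:   "http",
		Host:     consulAddr,
		Path:     "/v1/health/service/" + service,
		RawQuery: q.Encode(),
	}
	return u.String()
}

// healthyAddrs (hypothetical helper) converts a health response body into
// host:port strings. When a service has no address of its own, the node
// address is used instead.
func healthyAddrs(body []byte) ([]string, error) {
	var entries []serviceEntry
	if err := json.Unmarshal(body, &entries); err != nil {
		return nil, fmt.Errorf("decode health response: %w", err)
	}
	addrs := make([]string, 0, len(entries))
	for _, e := range entries {
		host := e.Service.Address
		if host == "" {
			host = e.Node.Address
		}
		addrs = append(addrs, net.JoinHostPort(host, strconv.Itoa(e.Service.Port)))
	}
	return addrs, nil
}

func main() {
	// Trimmed, made-up sample of what the endpoint could return.
	sample := []byte(`[
		{"Node": {"Address": "10.0.0.1"}, "Service": {"Address": "", "Port": 9000}},
		{"Node": {"Address": "10.0.0.2"}, "Service": {"Address": "10.0.1.2", "Port": 9001}}
	]`)

	fmt.Println(healthURL("127.0.0.1:8500", "orders", "master"))

	addrs, err := healthyAddrs(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs)
}
```

These addresses are exactly what a gRPC resolver has to hand over to the client connection, which is what the rest of the article is about.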


First non-optimal solution

The main problem is that there is no official documentation on resolving hosts through Consul in gRPC, so we Googled for a solution. The suggestion we found was to implement a host resolver ourselves.


OK, let’s do it:

Go is the primary language in our tech stack, so all the code below is written in Go.

package grpcclient

import (
	"errors"
	"fmt"
	"net/url"
	"strings"

	"utils/pkg/connection"
	"utils/pkg/consul"

	"github.com/hashicorp/consul/api"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/resolver"
)

const (
	resolverSchemeConsul = "consul"
)

var (
	errUnknownScheme = errors.New("unknown scheme. Only 'consul' is applicable")
)

// consulBuilder builds the address resolver for gRPC dialer.
type consulBuilder struct {
	consul         *consul.Cluster
	serviceWatcher connection.Watcher
}

// newConsulBuilder is a constructor.
func newConsulBuilder(consul *consul.Cluster, serviceWatcher connection.Watcher) *consulBuilder {
	return &consulBuilder{
		consul:         consul,
		serviceWatcher: serviceWatcher,
	}
}

// Build creates the Consul address resolver for the gRPC dialer.
func (c consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.Scheme != resolverSchemeConsul {
		return nil, errUnknownScheme
	}

	serviceName, tag, err := parseTarget(target)
	if err != nil {
		return nil, fmt.Errorf("parse target: %w", err)
	}

	cr := newConsulResolver(c.serviceWatcher, cc)
	err = cr.watch(serviceName, []string{tag})
	if err != nil {
		return nil, fmt.Errorf("watch: %w", err)
	}

	return cr, nil
}

// parseTarget parses the target and returns the service name and tag.
// Example target endpoint: 127.0.0.1:8500/service?tag=master
func parseTarget(target resolver.Target) (string, string, error) {
	u, err := url.Parse(fmt.Sprintf("%s://%s", resolverSchemeConsul, target.Endpoint))
	if err != nil {
		return "", "", fmt.Errorf("parse: %w", err)
	}

	return strings.Trim(u.Path, "/"), u.Query().Get("tag"), nil
}

// Scheme returns the consul resolver scheme.
func (c consulBuilder) Scheme() string {
	return resolverSchemeConsul
}

type consulResolver struct {
	cc             resolver.ClientConn
	serviceWatcher connection.Watcher
	unwatchFunc    func() error
}

func newConsulResolver(serviceWatcher connection.Watcher, cc resolver.ClientConn) *consulResolver {
	return &consulResolver{
		serviceWatcher: serviceWatcher,
		cc:             cc,
	}
}

func (c *consulResolver) onServiceChanged(entries []*api.ServiceEntry) {
	addresses := make([]resolver.Address, len(entries))
	for i, e := range entries {
		addresses[i] = resolver.Address{
			Addr: connection.BuildAddr(e),
		}
	}

	err := c.cc.UpdateState(resolver.State{
		Addresses: addresses,
	})
	if err != nil {
		logrus.WithError(err).Error("update gRPC consul resolver addresses")
	}
}

func (c *consulResolver) watch(serviceName string, tags []string) error {
	var err error
	c.unwatchFunc, err = c.serviceWatcher.WatchService(serviceName, tags, true, c.onServiceChanged)
	if err != nil {
		return fmt.Errorf("watch service: %w", err)
	}

	return nil
}

// ResolveNow is a no-op: addresses are pushed to gRPC whenever they change in Consul.
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {}

// Close is an interface method. Gracefully closes the consulResolver.
func (c *consulResolver) Close() {
	err := c.unwatchFunc()
	if err != nil {
		logrus.WithError(err).Error("unwatch service in gRPC consul resolver")
	}
}
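
To see what parseTarget extracts from an endpoint, here is a small standard-library-only sketch. parseEndpoint is a hypothetical stand-in that takes the endpoint string directly instead of a resolver.Target, so it can run without the gRPC dependency; the parsing logic is the same.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseEndpoint is a hypothetical, dependency-free variant of parseTarget:
// it receives the endpoint string rather than a resolver.Target.
func parseEndpoint(endpoint string) (service, tag string, err error) {
	// Prefixing a scheme lets net/url split host, path, and query for us.
	u, err := url.Parse("consul://" + endpoint)
	if err != nil {
		return "", "", fmt.Errorf("parse: %w", err)
	}
	return strings.Trim(u.Path, "/"), u.Query().Get("tag"), nil
}

func main() {
	service, tag, err := parseEndpoint("127.0.0.1:8500/service?tag=master")
	if err != nil {
		panic(err)
	}
	fmt.Println(service, tag) // prints "service master"
}
```

If the tag query parameter is missing, the returned tag is simply an empty string.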


In that case, we need to pass the custom host resolver when dialing:


import (
  ...
  "google.golang.org/grpc"
)
...

// Dial dials to a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
	cfg := consul.Config()
	target := fmt.Sprintf("%s:///%s/%s?tag=%s", resolverSchemeConsul, cfg.Address, serviceName, tag)

	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithResolvers(newConsulBuilder(consul, consul.ServiceWatcher())),
		grpc.WithInsecure(),
	)
}
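
The round_robin policy in the service config above tells gRPC to spread calls across all addresses the resolver reports. As a rough mental model only (this is not gRPC’s actual balancer, and the roundRobin type is hypothetical), the interplay between resolver updates and picking an address could be sketched like this:

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin is a hypothetical picker that cycles through the most
// recently reported address list.
type roundRobin struct {
	mu    sync.Mutex
	addrs []string
	next  int
}

// update replaces the address list, similar in spirit to a resolver
// pushing a new state after Consul reports a change.
func (r *roundRobin) update(addrs []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs = append([]string(nil), addrs...) // copy to avoid aliasing the caller's slice
	r.next = 0
}

// pick returns the next address in turn, or false when none are known.
func (r *roundRobin) pick() (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.addrs) == 0 {
		return "", false
	}
	addr := r.addrs[r.next%len(r.addrs)]
	r.next++
	return addr, true
}

func main() {
	rr := &roundRobin{}
	rr.update([]string{"10.0.0.1:9000", "10.0.0.2:9000"})
	for i := 0; i < 3; i++ {
		addr, _ := rr.pick()
		fmt.Println(addr)
	}

	// One instance went down: the next update removes it from rotation.
	rr.update([]string{"10.0.0.2:9000"})
	addr, _ := rr.pick()
	fmt.Println(addr)
}
```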


The implementation works, but we wrote some tricky code that we now have to maintain. It looks like there should be a better way to do this.

The undocumented, optimal solution

The official gRPC name resolution docs don’t mention Consul, so I found a better solution by accident while working on another task.


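As it turned out, we don’t need to write a custom host resolver for Consul ourselves: gRPC can use a ready-made one. Keep in mind that the consul scheme only works when a resolver for it is registered in the binary, typically by a resolver package that is imported for its side effects among the elided imports.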


To use it, we need to specify a dial target correctly:


import (
  ...
  "google.golang.org/grpc"
)
...

// Dial dials a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
	cfg := consul.Config()
	target := fmt.Sprintf("consul://%s:%s@%s/%s?tag=%s", cfg.User, cfg.Password, cfg.Address, serviceName, tag)

	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithInsecure(),
	)
}


The dialing target will be something like:


consul://user:password@127.0.0.1:8500/service_name?tag=service_tag


And that’s it. No need to write any code.
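
One caveat with building the target via fmt.Sprintf: a password or tag containing characters such as @ or / would break the URL. A possible alternative, sketched here with the standard net/url package, lets the library do the escaping. The helper name consulTarget and the sample values are hypothetical and not part of the original code.

```go
package main

import (
	"fmt"
	"net/url"
)

// consulTarget is a hypothetical helper that assembles the dial target with
// net/url so that special characters in credentials or tags are escaped.
func consulTarget(user, password, addr, service, tag string) string {
	u := url.URL{
		Scheme: "consul",
		Host:   addr,
		Path:   "/" + service,
	}
	if user != "" {
		u.User = url.UserPassword(user, password)
	}
	if tag != "" {
		u.RawQuery = url.Values{"tag": {tag}}.Encode()
	}
	return u.String()
}

func main() {
	// The "@" and "/" in the password are percent-encoded in the result.
	fmt.Println(consulTarget("user", "p@ss/word", "127.0.0.1:8500", "service_name", "service_tag"))
}
```

The resulting string can be passed to grpc.Dial as the target in the same way as the one built with fmt.Sprintf.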

Conclusion

To sum up, here are my conclusions from this story:


  • the first obvious solution that Google turns up for a problem isn’t always the best
  • even a very popular library can lack documentation, so explore the library internals carefully before implementing your own improvements
  • it’s still OK to start with a non-optimal solution, because all great products are built iteratively

