How to Use Consul as a Host Resolver in gRPC by@vgukasov

How to Use Consul as a Host Resolver in gRPC

image
Vladislav Gukasov HackerNoon profile picture

Vladislav Gukasov

Senior SWE at Akma Trading

Recently, I faced with lack of documentation when I wanted to use Consul as a host resolver in gRPC connections. That’s why I want to fill the hole for engineers who struggle with the same problem. So, let’s start with a little bit of theory.

Why gRPC

image

gRPC is a trendy RPC framework developed by Google. It reached its popularity cause of several reasons:

  • it’s extremely fast because the Protocol Buffers (protobuf), a powerful binary serialization toolset and language
  • it generates the client/server code by a single protobuf contract.
  • it works with HTTP/2 and supports BI-directional data streams

Why Consul

image

Consul is a robust service mesh developed by HashiCorp. There are a lot of use-cases for Consul:

  • service discovery
  • config storage
  • key-value storage

In our project, Consul keeps info about all microservice hosts. So when a microservice instance goes down, or there is a new one — Consul knows it immediately. So we want to use the information to resolve a host for any gRPC interaction to ensure that microservices are connected to actual healthy hosts.

First non-optimal solution

The main problem is that there is no official documentation on resolving hosts by Consul in gRPC, so we tried to Google a solution. There was a suggestion to implement a host resolver by ourselves.

OK, let’s do it:

Golang — is the primary language in our tech stack, so all written code will be in Go.

package grpcclient

import (
	"errors"
	"fmt"
	"net/url"
	"strings"

	"utils/pkg/connection"
	"utils/pkg/consul"

	"github.com/hashicorp/consul/api"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/resolver"
)

const (
	resolverSchemeConsul = "consul"
)

var (
	errUnknownScheme = errors.New("unknown scheme. Only 'consul' is applicable")
)

// consulBuilder builds the address resolver for gRPC dialer.
type consulBuilder struct {
	consul         *consul.Cluster
	serviceWatcher connection.Watcher
}

// newConsulBuilder is a constructor.
func newConsulBuilder(consul *consul.Cluster, serviceWatcher connection.Watcher) *consulBuilder {
	return &consulBuilder{
		consul:         consul,
		serviceWatcher: serviceWatcher,
	}
}

// Builds the consul address resolver for gRPC.
func (c consulBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.Scheme != resolverSchemeConsul {
		return nil, errUnknownScheme
	}

	serviceName, tag, err := parseTarget(target)
	if err != nil {
		return nil, fmt.Errorf("parse target: %w", err)
	}

	cr := newConsulResolver(c.serviceWatcher, cc)
	err = cr.watch(serviceName, []string{tag})
	if err != nil {
		return nil, fmt.Errorf("watch: %w", err)
	}

	return cr, nil
}

// parses the target and returns service name and tag.
// example of target endpoint: 127.0.0.1:8500/service?tag=master
func parseTarget(target resolver.Target) (string, string, error) {
	u, err := url.Parse(fmt.Sprintf("%s://%s", resolverSchemeConsul, target.Endpoint))
	if err != nil {
		return "", "", fmt.Errorf("parse: %w", err)
	}

	return strings.Trim(u.Path, "/"), u.Query().Get("tag"), nil
}

// Scheme returns the consul resolver scheme.
func (c consulBuilder) Scheme() string {
	return resolverSchemeConsul
}

type consulResolver struct {
	cc             resolver.ClientConn
	serviceWatcher connection.Watcher
	unwatchFunc    func() error
}

func newConsulResolver(serviceWatcher connection.Watcher, cc resolver.ClientConn) *consulResolver {
	return &consulResolver{
		serviceWatcher: serviceWatcher,
		cc:             cc,
	}
}

func (c *consulResolver) onServiceChanged(entries []*api.ServiceEntry) {
	addresses := make([]resolver.Address, len(entries))
	for i, e := range entries {
		addresses[i] = resolver.Address{
			Addr: connection.BuildAddr(e),
		}
	}

	err := c.cc.UpdateState(resolver.State{
		Addresses: addresses,
	})
	if err != nil {
		logrus.WithError(err).Error("update gRPC consul resolver addresses")
	}
}

func (c *consulResolver) watch(serviceName string, tags []string) error {
	var err error
	c.unwatchFunc, err = c.serviceWatcher.WatchService(serviceName, tags, true, c.onServiceChanged)
	if err != nil {
		return fmt.Errorf("watch service: %w", err)
	}

	return nil
}

// ResolveNow we don't need to anything here because all addresses are updated on change in consul.
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {}

// Close is an interface method. Gracefully closes the consulResolver.
func (c *consulResolver) Close() {
	err := c.unwatchFunc()
	if err != nil {
		logrus.WithError(err).Error("unwatch service in gRPC consul resolver")
	}
}

In that case, we need to specify the custom host resolver on dialing:

import (
  ...
  "google.golang.org/grpc"
)
...

// Dial dials to a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
    cfg := consul.Config()
    target := fmt.Sprintf("%s:///%s/%s?tag=%s", resolverSchemeConsul, cfg.Address, serviceName, tag)

	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithResolvers(newConsulBuilder(consul, consul.ServiceWatcher())),
		grpc.WithInsecure(),
	)
}

The implementation works, but we wrote some tricky code and must maintain it. It looks like there is a better way to do that.

The undocumented, optimal solution

The official gRPC name resolution docs don’t contain Consul info, so I found a good solution accidentally while doing some job.

As it turned out, we don’t need a custom host resolver for Consul, and gRPC has the built integration.

To use it, we need to specify a dial target correctly:

import (
  ...
  "google.golang.org/grpc"
)
...

// Dial dials a service through gRPC and returns a new connection.
func Dial(serviceName, tag string, timeout time.Duration) (grpc.ClientConnInterface, error) {
    cfg := consul.Config()
    target := fmt.Sprintf("consul://%s:%[email protected]%s/%s?tag=%s", cfg.User, cfg.Password, cfg.Address, serviceName, tag)

	return grpc.Dial(
		target,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithInsecure(),
	)
}

The dialing target will be something like:

consul://user:[email protected]:8500/service_name?tag=service_tag .

And that’s it. No need to write any code.

Conclusion

I want to summarize some conclusions based on the story:

  • the first obvious solution from Google to a problem sometimes isn’t the best
  • if a lib is very popular, it can still have a lack of docs, so carefully explore the library internals before implementing your improvements
  • but it’s still OK to come up with a non-optimal solution at the first because all great products are built iteratively from scratch

Comments

Signup or Login to Join the Discussion

Tags

Related Stories