How I Built a Redis-Compatible Pub/Sub System Using Golang

Written by kelvinm | Published 2024/04/28
Tech Story Tags: redis | golang | pubsub | tcp | in-memory-cache | hackernoon-top-story | what-is-the-pubsup-system | redis-compatible-system

TLDROver the last year, I’ve been building EchoVault, an embeddable Redis alternative for the Golang ecosystem. EchoVault aims to replicate most of the Redis features while providing both an embedded interface and a client-server interface that is compatible with existing Redis clients using the RESP protocol. One of the features implemented in EchoVault is the Pub/Sub feature. This article is a short walkthrough of how the Pub/Sub module has been implemented at the time of writing.via the TL;DR App

Over the last year, I’ve been building EchoVault, an embeddable Redis alternative for the Golang ecosystem. EchoVault aims to replicate most of the Redis features while providing both an embedded interface and a client-server interface that is compatible with existing Redis clients using the RESP protocol.

One of the features implemented in EchoVault is the Pub/Sub feature. This article is a short walkthrough of how the Pub/Sub module has been implemented at the time of writing.

What Is Pub/Sub?

Pub/Sub stands for publish/subscribe. This pattern allows consumers to subscribe to certain channels. Producers publish messages to channels, and all the consumers subscribed to that channel receive the message.

In our context, we must enable a Go process to subscribe to specified channels when EchoVault is embedded. When running in client-server mode, we must allow a client TCP connection to subscribe to channels.

On the publishing side, a Go process should be able to publish to a channel in embedded mode, and a TCP client connection should be able to publish a message to a channel in client-server mode.

Requirements

Before we get started, we need to determine the requirements and score of this implementation. EchoVault does not implement all the features available in the Redis Pub/Sub at the time of writing. However, the most important core features are implemented. Here’s what we should be able to do with EchoVault PubSub:

  1. Allow a TCP client to subscribe to a list of channels using the command SUBSCRIBE channel [channel …]. The server should send the client connection a confirmation of the subscriptions.

  2. An embedded EchoVault instance should be able to subscribe to a list of channels.

  3. Allow a TCP client to subscribe to a pattern using PSUBSCRIBE pattern [pattern …] where the pattern is a glob string that allows the client to receive messages published to all channels that satisfy the pattern.

  4. An embedded EchoVault instance should be able to subscribe to a list of patterns.

  5. Allow a TCP client to publish a message to a channel using the command PUBLISH channel message .

  6. Publish to a channel from an embedded EchoVault instance.

  7. Allow TCP clients to unsubscribe from channels and patterns using UNSUBSCRIBE channel [channel …] and PUNSUBSCRIBE pattern [pattern …] commands, respectively.

  8. Allow TCP client connection to use the PUBSUB CHANNELS [pattern] command to view an array containing the channels that match the given pattern. If no pattern is provided, all active channels are returned. Active channels are channels with one or more subscribers.

  9. Allow TCP clients to use the PUBSUB NUMPAT command to view the number of patterns currently subscribed to by clients.

  10. Provide an embedded API to view the number of active patterns.

  11. Allow TCP clients to use the PUBSUB NUMSUB [channel [channel ...]] command to view an array of arrays containing the provided channel name and how many clients are currently subscribed to the channel.

  12. Provide an embedded API to view the number of subscribers in the given channels.

Implementation

Now that we have the requirements set, let’s get into the implementation. At a high level, we’ll have a PubSub struct that provides all of the PubSub functionality. We will also have a Channel struct that provides the functionality for channels and pattern channels.

For this section, we’ll use the tidwall/resp package. EchoVault users use this package to send RESP responses to channel subscribers. The gobwas/glob package handles glob pattern logic.

Channel

First, we’ll create the Channel struct and all its methods. We will have two types of channels: regular channels and patterns channels.

Regular channels are channels with names and no patterns associated with them. Pattern channels will use the pattern as the name, and a compiled glob pattern will be associated with the channel.

Pattern channels are used for subscriptions to patterns. Otherwise, regular channels are used.

The Channel struct has the following shape:

type Channel struct {
	name             string                   // Channel name. This can be a glob pattern string.
	pattern          glob.Glob                // Compiled glob pattern. This is nil if the channel is not a pattern channel.
	subscribersRWMut sync.RWMutex             // RWMutex to concurrency control when accessing channel subscribers.
	subscribers      map[*net.Conn]*resp.Conn // Map containing the channel subscribers.
	messageChan      *chan string             // Messages published to this channel will be sent to this channel.
}

We will use the option pattern to create a new channel instance. Here are the two options available:

// WithName option sets the channels name.
func WithName(name string) func(channel *Channel) {
	return func(channel *Channel) {
		channel.name = name
	}
}

// WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel.
func WithPattern(pattern string) func(channel *Channel) {
	return func(channel *Channel) {
		channel.name = pattern
		channel.pattern = glob.MustCompile(pattern)
	}
}

func NewChannel(options ...func(channel *Channel)) *Channel {
	messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value.

	channel := &Channel{
		name:             "",
		pattern:          nil,
		subscribersRWMut: sync.RWMutex{},
		subscribers:      make(map[*net.Conn]*resp.Conn),
		messageChan:      &messageChan,
	}

	for _, option := range options {
		option(channel)
	}

	return channel
}

The first method for the Channel struct is the Start method. This method starts a goroutine that listens on messageChan for messages to broadcast to all of its subscribers. Here’s the implementation:

func (ch *Channel) Start() {
	go func() {
		for {
			message := <-*ch.messageChan

			ch.subscribersRWMut.RLock()

			for _, conn := range ch.subscribers {
				go func(conn *resp.Conn) {
					if err := conn.WriteArray([]resp.Value{
						resp.StringValue("message"),
						resp.StringValue(ch.name),
						resp.StringValue(message),
					}); err != nil {
						log.Println(err)
					}
				}(conn)
			}

			ch.subscribersRWMut.RUnlock()
		}
	}()
}

The goroutine picks up the next message from messageChan, acquires a read-lock for the subscribers, broadcasts the message to each subscriber, and then releases the read-lock. As you can see, the tidwall/resp package allows us to send RESP values to the client connection easily.

Most Redis clients expect the pub/sub message format to be an array in the following shape: [“message”, <channel name>, <message string>]. That’s what is broadcast by EchoVault.

You might notice that we’re sending the message to resp.Conn and not net.Conn. We’re using resp.Conn because it provides helper methods for writing RESP values to it, such as WriteArray. It is essentially a wrapper around net.Conn as we’ll see in the Subscribe method:

func (ch *Channel) Subscribe(conn *net.Conn) bool {
	ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map.
	defer ch.subscribersRWMut.Unlock()

    // If the connection does not exist in the subscriber map, add it.
	if _, ok := ch.subscribers[conn]; !ok {
		ch.subscribers[conn] = resp.NewConn(*conn)
	}
	
    _, ok := ch.subscribers[conn]
	return ok
}

Along with the Subscribe method, we also need an Unsubscribe method:

func (ch *Channel) Unsubscribe(conn *net.Conn) bool {
	ch.subscribersRWMut.Lock()
	defer ch.subscribersRWMut.Unlock()
	if _, ok := ch.subscribers[conn]; !ok {
		return false
	}
	delete(ch.subscribers, conn)
	return true
}

The Unsubscribe method returns true if the connection was found and deleted from the subscribers map. Otherwise, it returns false. In addition to the methods above, there are some more helper methods for the Channel struct:


func (ch *Channel) Name() string {
	return ch.name
}

func (ch *Channel) Pattern() glob.Glob {
	return ch.pattern
}

func (ch *Channel) Publish(message string) {
	*ch.messageChan <- message
}

// IsActive returns true when the channel has 1 or more subscribers.
func (ch *Channel) IsActive() bool {
	ch.subscribersRWMut.RLock()
	defer ch.subscribersRWMut.RUnlock()

	active := len(ch.subscribers) > 0

	return active
}

// NumSubs returns the number of subscribers for this channel.
func (ch *Channel) NumSubs() int {
	ch.subscribersRWMut.RLock()
	defer ch.subscribersRWMut.RUnlock()

	n := len(ch.subscribers)

	return n
}

// Subscribers returns a copy of the subscriber map.
func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn {
	ch.subscribersRWMut.RLock()
	defer ch.subscribersRWMut.RUnlock()

	subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers))
	for k, v := range ch.subscribers {
		subscribers[k] = v
	}

	return subscribers
}

PubSub

We will use The PubSub module to interact with the channels. The shape of the PubSub struct is as follows:

type PubSub struct {
	channels      []*Channel   // Slice of references to channels
	channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels
}

func NewPubSub() *PubSub {
	return &PubSub{
		channels:      []*Channel{},
		channelsRWMut: sync.RWMutex{},
	}
}

The first method is the Subscribe method. This method handles subscribing the client to the specified channel(s).

func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) {
	ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels.
	defer ps.channelsRWMut.Unlock()

	r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn.

	action := "subscribe"
	if withPattern {
		action = "psubscribe"
	}

    // Loop through all the channels that the client has requested to subscribe to.
	for i := 0; i < len(channels); i++ {
		// Check if channel with given name exists
		// If it does, subscribe the connection to the channel
		// If it does not, create the channel and subscribe to it
		channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool {
			return channel.name == channels[i]
		})

		if channelIdx == -1 {
			// If the channel does not exist, create new channel, start it, and subscribe to it.
			var newChan *Channel
			if withPattern {
				newChan = NewChannel(WithPattern(channels[i]))
			} else {
				newChan = NewChannel(WithName(channels[i]))
			}
			newChan.Start()
			if newChan.Subscribe(conn) {
                // Write string array to the client connection confirming the subscription.
				if err := r.WriteArray([]resp.Value{
					resp.StringValue(action),
					resp.StringValue(newChan.name),
					resp.IntegerValue(i + 1),
				}); err != nil {
					log.Println(err)
				}
			}
			ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels.
		} else {
			// Subscribe to existing channel
			if ps.channels[channelIdx].Subscribe(conn) {
                // Write string array to the client connection confirming the subscription.
				if err := r.WriteArray([]resp.Value{
					resp.StringValue(action),
					resp.StringValue(ps.channels[channelIdx].name),
					resp.IntegerValue(i + 1),
				}); err != nil {
					log.Println(err)
				}
			}
		}
	}
}

To subscribe a client to a channel, EchoVault listens for a SUBSCRIBE or PSUBSCRIBE command. When the command is received from a client, the client’s TCP connection, along with the list of channels, is passed to the Subscribe method. When a client subscribes using the PSUBSCRIBE command, the withPatterns parameter will be true, and the action variable’s value will be set to “psubscribe.”

The Unsubscribe method allows a client to unsubscribe from channels or patterns based on whether they used SUBSCRIBE or PSUBSCRIBE. Its implementation is as follows:

func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte {
	ps.channelsRWMut.RLock()
	defer ps.channelsRWMut.RUnlock()

	action := "unsubscribe"
	if withPattern {
		action = "punsubscribe"
	}

	unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from.
	idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from.

	if len(channels) <= 0 {
		if !withPattern {
			// If the channels slice is empty, and no pattern is provided
			// unsubscribe from all channels.
			for _, channel := range ps.channels {
				if channel.pattern != nil { // Skip pattern channels
					continue
				}
				if channel.Unsubscribe(conn) {
					unsubscribed[idx] = channel.name
					idx += 1
				}
			}
		} else {
			// If the channels slice is empty, and pattern is provided
			// unsubscribe from all patterns.
			for _, channel := range ps.channels {
				if channel.pattern == nil { // Skip non-pattern channels
					continue
				}
				if channel.Unsubscribe(conn) {
					unsubscribed[idx] = channel.name
					idx += 1
				}
			}
		}
	}

	// Unsubscribe from channels where the name exactly matches channel name.
	// If unsubscribing from a pattern, also unsubscribe from all channel whose
	// names exactly matches the pattern name.
	for _, channel := range ps.channels { // For each channel in PubSub
		for _, c := range channels { // For each channel name provided
			if channel.name == c && channel.Unsubscribe(conn) {
				unsubscribed[idx] = channel.name
				idx += 1
			}
		}
	}

	// If withPattern is true, unsubscribe from channels where pattern matches pattern provided,
	// also unsubscribe from channels where the name matches the given pattern.
	if withPattern {
		for _, pattern := range channels {
			g := glob.MustCompile(pattern)
			for _, channel := range ps.channels {
				// If it's a pattern channel, directly compare the patterns
				if channel.pattern != nil && channel.name == pattern {
					if channel.Unsubscribe(conn) {
						unsubscribed[idx] = channel.name
						idx += 1
					}
					continue
				}
				// If this is a regular channel, check if the channel name matches the pattern given
				if g.Match(channel.name) {
					if channel.Unsubscribe(conn) {
						unsubscribed[idx] = channel.name
						idx += 1
					}
				}
			}
		}
	}

    // Construct a RESP response confirming the channels/patterns unsubscribed from.
	res := fmt.Sprintf("*%d\r\n", len(unsubscribed))
	for key, value := range unsubscribed {
		res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key)
	}

	return []byte(res)
}

When the client unsubscribes from channels/patterns, they will receive a response containing an array of arrays. Each inner array contains the action (unsubscribe/punsubscribe), the channel/pattern name, and the index.

The next method is the Publish method. This method accepts the message and channel name and then handles the publishing of the message internally.

func (ps *PubSub) Publish(_ context.Context, message string, channelName string) {
	ps.channelsRWMut.RLock()
	defer ps.channelsRWMut.RUnlock()

    // Loop through all of the existing channels.
	for _, channel := range ps.channels {
		// If it's a regular channel, check if the channel name matches the name given.
		if channel.pattern == nil {
			if channel.name == channelName {
				channel.Publish(message) // Publish the message to the channel.
			}
			continue
		}
		// If it's a glob pattern channel, check if the provided channel name matches the pattern.
		if channel.pattern.Match(channelName) {
			channel.Publish(message) // Publish the message to the channel
		}
	}
}

The next method is the Channels method, which handles the PUBSUB CHANNELS [pattern] command:

func (ps *PubSub) Channels(pattern string) []byte {
	ps.channelsRWMut.RLock()
	defer ps.channelsRWMut.RUnlock()

	var count int
	var res string

    // If pattern is an empty string, return all the active channels.
	if pattern == "" {
		for _, channel := range ps.channels {
			if channel.IsActive() {
				res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name)
				count += 1
			}
		}
		res = fmt.Sprintf("*%d\r\n%s", count, res)
		return []byte(res)
	}

	g := glob.MustCompile(pattern)

	for _, channel := range ps.channels {
		// If channel is a pattern channel, then directly compare the channel name to pattern.
		if channel.pattern != nil && channel.name == pattern && channel.IsActive() {
			res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name)
			count += 1
			continue
		}
		// Channel is not a pattern channel. Check if the channel name matches the provided glob pattern.
		if g.Match(channel.name) && channel.IsActive() {
			res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name)
			count += 1
		}
	}

    // Return a RESP array containing all the active channel names.
	return []byte(fmt.Sprintf("*%d\r\n%s", count, res))
}

The following 2 methods are the NumPat and NumSub methods to handle the PUBSUB NUMPAT and

PUBSUB NUMSUB [channel [channel …]] commands respectively.

func (ps *PubSub) NumPat() int {
	ps.channelsRWMut.RLock()
	defer ps.channelsRWMut.RUnlock()

	var count int
	for _, channel := range ps.channels {
		if channel.pattern != nil && channel.IsActive() {
			count += 1
		}
	}

	return count
}

func (ps *PubSub) NumSub(channels []string) []byte {
	ps.channelsRWMut.RLock()
	defer ps.channelsRWMut.RUnlock()

	res := fmt.Sprintf("*%d\r\n", len(channels))

	for _, channel := range channels {
		// If it's a pattern channel, skip it
		chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool {
			return c.name == channel
		})
		if chanIdx == -1 {
			res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel)
			continue
		}
		res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs())
	}
	return []byte(res)
}

Embedded API

The PubSub module expects a connection to be passed to subscribe a client to a channel. However, when EchoVault is embedded, there’s no TCP connection to a client that can be used for a subscription. To get around this, we use net.Pipe to get both ends of a connection.

One end of the connection is passed to the command handler, which uses it to subscribe to the specified channel. The other end of the connection is used in the returned ReadPubSubMessage function.

type conn struct {
	readConn  *net.Conn
	writeConn *net.Conn
}

var connections map[string]conn

// ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions.
//
// This function is lazy, therefore it needs to be invoked in order to read the next message.
// When the message is read, the function returns a string slice with 3 elements.
// Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name.
// Index 2 holds the actual message.
type ReadPubSubMessage func() []string

// SUBSCRIBE subscribes the caller to the list of provided channels.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `channels` - ...string - The list of channels to subscribe to.
//
// Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance.
// This function is blocking.
func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage {
	// Initialize connection tracker if calling subscribe for the first time
	if connections == nil {
		connections = make(map[string]conn)
	}

	// If connection with this name does not exist, create new connection it
	var readConn net.Conn
	var writeConn net.Conn
	if _, ok := connections[tag]; !ok {
		readConn, writeConn = net.Pipe()
		connections[tag] = conn{
			readConn:  &readConn,
			writeConn: &writeConn,
		}
	}

	// Subscribe connection to the provided channels
	cmd := append([]string{"SUBSCRIBE"}, channels...)
	go func() {
		_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false)
	}()

	return func() []string {
		r := resp.NewConn(readConn)
		v, _, _ := r.ReadValue()

		res := make([]string, len(v.Array()))
		for i := 0; i < len(res); i++ {
			res[i] = v.Array()[i].String()
		}

		return res
	}
}

// UNSUBSCRIBE unsubscribes the caller from the given channels.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `channels` - ...string - The list of channels to unsubscribe from.
func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) {
	if connections == nil {
		return
	}

	if _, ok := connections[tag]; !ok {
		return
	}

	cmd := append([]string{"UNSUBSCRIBE"}, channels...)
	_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false)
}

// PSUBSCRIBE subscribes the caller to the list of provided glob patterns.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `patterns` - ...string - The list of glob patterns to subscribe to.
//
// Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance.
// This function is blocking.
func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage {
	// Initialize connection tracker if calling subscribe for the first time
	if connections == nil {
		connections = make(map[string]conn)
	}

	// If connection with this name does not exist, create new connection it
	var readConn net.Conn
	var writeConn net.Conn
	if _, ok := connections[tag]; !ok {
		readConn, writeConn = net.Pipe()
		connections[tag] = conn{
			readConn:  &readConn,
			writeConn: &writeConn,
		}
	}

	// Subscribe connection to the provided channels
	cmd := append([]string{"PSUBSCRIBE"}, patterns...)
	go func() {
		_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false)
	}()

	return func() []string {
		r := resp.NewConn(readConn)
		v, _, _ := r.ReadValue()

		res := make([]string, len(v.Array()))
		for i := 0; i < len(res); i++ {
			res[i] = v.Array()[i].String()
		}

		return res
	}
}

// PUNSUBSCRIBE unsubscribes the caller from the given glob patterns.
//
// Parameters:
//
// `tag` - string - The tag used to identify this subscription instance.
//
// `patterns` - ...string - The list of glob patterns to unsubscribe from.
func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) {
	if connections == nil {
		return
	}

	if _, ok := connections[tag]; !ok {
		return
	}

	cmd := append([]string{"PUNSUBSCRIBE"}, patterns...)
	_, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false)
}

Here’s an example of how to use the SUBSCRIBE and PUBLISH API:

    // Subscribe to multiple EchoVault channels.
	readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3")
	wg.Add(1)
	go func() {
		wg.Done()
		for {
			message := readMessage()
			fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2])
		}
	}()
	wg.Wait()

	wg.Add(1)
	go func() {
		for i := 1; i <= 3; i++ {
			// Simulating delay.
			<-time.After(1 * time.Second)
			// Publish message to each EchoVault channel.
			_, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!")
		}
		wg.Done()
	}()
	wg.Wait()

Conclusion

This article was a quick rundown on the Pub/Sub implementation in EchoVault. Of course, a lot of context surrounds the code shared here that can not fit into one article. If you’re interested in the context, check out EchoVault’s GitHub repository and take a look at what we’re building. If you like what you see, a Github star would be massively appreciated!


Written by kelvinm | Full-Stack web developer with a deep interest in developer tools and cloud software.
Published by HackerNoon on 2024/04/28