paint-brush
如何使用 Golang 构建与 Redis 兼容的发布/订阅系统by@kelvinm
2,545
2,545

如何使用 Golang 构建与 Redis 兼容的发布/订阅系统

Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

在过去的一年里,我一直在构建 EchoVault,这是 Golang 生态系统的可嵌入 Redis 替代品。EchoVault 旨在复制大部分 Redis 功能,同时提供嵌入式接口和与使用 RESP 协议的现有 Redis 客户端兼容的客户端-服务器接口。 EchoVault 中实现的功能之一是 Pub/Sub 功能。本文简要介绍了在撰写本文时 Pub/Sub 模块的实现方式。
featured image - 如何使用 Golang 构建与 Redis 兼容的发布/订阅系统
Kelvin Clement M. HackerNoon profile picture
0-item

在过去的一年里,我一直在构建EchoVault ,这是 Golang 生态系统的可嵌入 Redis 替代品。EchoVault 旨在复制大部分 Redis 功能,同时提供嵌入式接口和与使用 RESP 协议的现有 Redis 客户端兼容的客户端-服务器接口。


EchoVault 中实现的功能之一是 Pub/Sub 功能。本文简要介绍了在撰写本文时 Pub/Sub 模块的实现方式。

什么是 Pub/Sub?

Pub/Sub 代表发布/订阅。此模式允许消费者订阅某些频道。生产者将消息发布到频道,订阅该频道的所有消费者都会收到该消息。


在我们的上下文中,当嵌入 EchoVault 时,我们必须启用 Go 进程来订阅指定频道。在客户端-服务器模式下运行时,我们必须允许客户端 TCP 连接订阅频道。


在发布方面,Go 进程应该能够以嵌入模式发布到频道,并且 TCP 客户端连接应该能够以客户端-服务器模式发布消息到频道。

要求

在开始之前,我们需要确定此实现的要求和分数。在撰写本文时,EchoVault 尚未实现 Redis Pub/Sub 中可用的所有功能。但是,最重要的核心功能已经实现。以下是我们应该能够使用 EchoVault PubSub 做的事情:


  1. 允许 TCP 客户端使用命令SUBSCRIBE channel [channel …]订阅频道列表。服务器应向客户端连接发送订阅确认。


  2. 嵌入式 EchoVault 实例应该能够订阅频道列表。


  3. 允许 TCP 客户端使用PSUBSCRIBE pattern [pattern …]订阅模式,其中模式是一个 glob 字符串,允许客户端接收发布到满足该模式的所有频道的消息。


  4. 嵌入式 EchoVault 实例应该能够订阅模式列表。


  5. 允许 TCP 客户端使用命令PUBLISH channel message向频道发布消息。


  6. 从嵌入式 EchoVault 实例发布到频道。


  7. 允许 TCP 客户端分别使用UNSUBSCRIBE channel [channel …]PUNSUBSCRIBE pattern [pattern …]命令取消订阅频道和模式。


  8. 允许 TCP 客户端连接使用PUBSUB CHANNELS [pattern]命令查看包含与给定模式匹配的频道的数组。如果未提供模式,则返回所有活动频道。活动频道是具有一个或多个订阅者的频道。


  9. 允许 TCP 客户端使用PUBSUB NUMPAT命令查看客户端当前订阅的模式数量。


  10. 提供嵌入式 API 来查看活动模式的数量。


  11. 允许 TCP 客户端使用PUBSUB NUMSUB [channel [channel ...]]命令查看包含提供的频道名称以及当前有多少个客户端订阅该频道的数组。


  12. 提供嵌入式 API 来查看给定频道中的订阅者数量。

执行

现在我们已经确定了需求,让我们开始实施。从高层次来看,我们将有一个提供所有 PubSub 功能的 PubSub 结构。我们还将有一个提供频道和模式频道功能的 Channel 结构。


在本节中,我们将使用 tidwall/resp 包。EchoVault 用户使用此包向频道订阅者发送 RESP 响应。gobwas/glob 包处理 glob 模式逻辑。

渠道

首先,我们将创建Channel结构及其所有方法。我们将有两种类型的通道:常规通道和模式通道。

常规通道是有名称但没有关联模式的通道。模式通道将使用模式作为名称,并且编译后的 glob 模式将与通道关联。


模式频道用于订阅模式。否则,使用常规频道。


Channel结构具有以下形状:


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


我们将使用选项模式来创建一个新的通道实例。以下是两个可用的选项:


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


Channel 结构的第一个方法是Start方法。此方法启动一个 goroutine,该 goroutine 在messageChan上侦听要向其所有订阅者广播的消息。以下是实现:


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


goroutine 从messageChan中获取下一条消息,获取订阅者的读锁,将消息广播给每个订阅者,然后释放读锁。如您所见,tidwall/resp 包允许我们轻松地将 RESP 值发送到客户端连接。


大多数 Redis 客户端都希望发布/订阅消息格式为以下形状的数组:[“消息”, <频道名称>, <消息字符串>]。这就是 EchoVault 广播的内容。


你可能会注意到,我们将消息发送到resp.Conn而不是net.Conn 。我们使用resp.Conn是因为它提供了用于将 RESP 值写入其中的辅助方法,例如WriteArray 。它本质上是net.Conn的包装器,正如我们在Subscribe方法中看到的那样:


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


除了订阅方法之外,我们还需要一个取消订阅方法:


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


如果找到连接并将其从订阅者映射中删除,则 Unsubscribe 方法返回true 。否则,返回false 。除了上述方法之外, Channel结构还有一些辅助方法:


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

发布订阅

我们将使用 PubSub 模块与频道进行交互。PubSub 结构的形状如下:


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


第一个方法是Subscribe方法。此方法负责将客户端订阅到指定的频道。


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


为了使客户端订阅频道,EchoVault 会监听SUBSCRIBEPSUBSCRIBE命令。当从客户端收到命令时,客户端的 TCP 连接以及频道列表将传递给Subscribe方法。当客户端使用PSUBSCRIBE命令订阅时, withPatterns参数将为true ,并且action变量的值将设置为“psubscribe”。


Unsubscribe方法允许客户端根据其使用的是SUBSCRIBE还是PSUBSCRIBE来取消订阅频道或模式。其实现如下:


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


当客户端取消订阅频道/模式时,他们将收到一个包含数组的响应。每个内部数组包含操作(取消订阅/取消订阅)、频道/模式名称和索引。


下一个方法是 Publish 方法。此方法接受消息和频道名称,然后在内部处理消息的发布。


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


下一个方法是Channels方法,它处理PUBSUB CHANNELS [pattern]命令:


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


以下两个方法是NumPatNumSub方法,用于处理PUBSUB NUMPAT

分别是PUBSUB NUMSUB [channel [channel …]]命令。


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


嵌入式 API

PubSub 模块需要传递连接才能让客户端订阅频道。但是,当 EchoVault 嵌入时,没有可用于订阅的客户端 TCP 连接。为了解决这个问题,我们使用net.Pipe来获取连接的两端。


连接的一端传递给命令处理程序,命令处理程序使用它来订阅指定的频道。连接的另一端用于返回的ReadPubSubMessage函数。


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


以下是如何使用 SUBSCRIBE 和 PUBLISH API 的示例:


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

结论

本文简要介绍了 EchoVault 中的 Pub/Sub 实现。当然,这里分享的代码有很多背景信息,一篇文章无法全部介绍完。如果您对背景信息感兴趣,请查看 EchoVault 的 GitHub 存储库并查看我们正在构建的内容。如果您喜欢所看到的内容,我们将非常感激您为 Github 点赞!