No último ano, venho construindo o EchoVault , uma alternativa Redis incorporável para o ecossistema Golang. O EchoVault visa replicar a maioria dos recursos do Redis, ao mesmo tempo que fornece uma interface incorporada e uma interface cliente-servidor compatível com clientes Redis existentes que usam o protocolo RESP.
Um dos recursos implementados no EchoVault é o recurso Pub/Sub. Este artigo é um breve passo a passo de como o módulo Pub/Sub foi implementado no momento da redação.
Pub/Sub significa publicar/assinar. Esse padrão permite que os consumidores se inscrevam em determinados canais. Os produtores publicam mensagens nos canais e todos os consumidores inscritos nesse canal recebem a mensagem.
Em nosso contexto, devemos habilitar um processo Go para assinar canais específicos quando o EchoVault estiver incorporado. Ao executar no modo cliente-servidor, devemos permitir que uma conexão TCP do cliente se inscreva em canais.
No lado da publicação, um processo Go deve ser capaz de publicar em um canal no modo incorporado, e uma conexão cliente TCP deve ser capaz de publicar uma mensagem em um canal no modo cliente-servidor.
Antes de começarmos, precisamos determinar os requisitos e a pontuação desta implementação. O EchoVault não implementa todos os recursos disponíveis no Redis Pub/Sub no momento em que este artigo foi escrito. No entanto, os recursos principais mais importantes são implementados. Aqui está o que devemos ser capazes de fazer com o EchoVault PubSub:
Permita que um cliente TCP se inscreva em uma lista de canais usando o comando SUBSCRIBE channel [channel …]
. O servidor deve enviar à conexão do cliente uma confirmação das assinaturas.
Uma instância EchoVault incorporada deve ser capaz de se inscrever em uma lista de canais.
Permitir que um cliente TCP assine um padrão usando PSUBSCRIBE pattern [pattern …]
onde o padrão é uma string glob que permite ao cliente receber mensagens publicadas em todos os canais que satisfazem o padrão.
Uma instância EchoVault incorporada deve ser capaz de assinar uma lista de padrões.
Permita que um cliente TCP publique uma mensagem em um canal usando o comando PUBLISH channel message
.
Publique em um canal a partir de uma instância incorporada do EchoVault.
Permitir que clientes TCP cancelem a assinatura de canais e padrões usando os comandos UNSUBSCRIBE channel [channel …]
e PUNSUBSCRIBE pattern [pattern …]
, respectivamente.
Permita que a conexão do cliente TCP use o comando PUBSUB CHANNELS [pattern]
para visualizar uma matriz contendo os canais que correspondem ao padrão fornecido. Se nenhum padrão for fornecido, todos os canais ativos serão retornados. Canais ativos são canais com um ou mais assinantes.
Permitir que clientes TCP usem o comando PUBSUB NUMPAT
para visualizar o número de padrões atualmente assinados pelos clientes.
Forneça uma API incorporada para visualizar o número de padrões ativos.
Permita que clientes TCP usem o comando PUBSUB NUMSUB [channel [channel ...]]
para visualizar uma matriz de matrizes contendo o nome do canal fornecido e quantos clientes estão atualmente inscritos no canal.
Forneça uma API incorporada para visualizar o número de assinantes em determinados canais.
Agora que temos os requisitos definidos, vamos entrar na implementação. Em um nível superior, teremos uma estrutura PubSub que fornece todas as funcionalidades do PubSub. Teremos também uma estrutura Channel que fornece a funcionalidade para canais e canais padrão.
Para esta seção, usaremos o pacote tidwall/resp. Os usuários do EchoVault usam este pacote para enviar respostas RESP aos assinantes do canal. O pacote gobwas/glob lida com a lógica do padrão glob.
Primeiro, criaremos a estrutura Channel
e todos os seus métodos. Teremos dois tipos de canais: canais regulares e canais padrões.
Canais regulares são canais com nomes e sem padrões associados a eles. Os canais padrão usarão o padrão como nome e um padrão glob compilado será associado ao canal.
Canais de padrão são usados para assinaturas de padrões. Caso contrário, serão usados canais regulares.
A estrutura Channel
tem o seguinte formato:
type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }
Usaremos o padrão de opção para criar uma nova instância de canal. Aqui estão as duas opções disponíveis:
// WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }
O primeiro método para a estrutura Channel é o método Start
. Este método inicia uma goroutine que escuta no messageChan
as mensagens a serem transmitidas para todos os seus assinantes. Aqui está a implementação:
func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }
A goroutine pega a próxima mensagem de messageChan
, adquire um bloqueio de leitura para os assinantes, transmite a mensagem para cada assinante e então libera o bloqueio de leitura. Como você pode ver, o pacote tidwall/resp nos permite enviar valores RESP para a conexão do cliente facilmente.
A maioria dos clientes Redis espera que o formato da mensagem pub/sub seja uma matriz no seguinte formato: [“mensagem”, <nome do canal>, <string da mensagem>]. É isso que é transmitido pela EchoVault.
Você pode perceber que estamos enviando a mensagem para resp.Conn
e não net.Conn
. Estamos usando resp.Conn
porque ele fornece métodos auxiliares para gravar valores RESP nele, como WriteArray
. É essencialmente um wrapper em torno net.Conn
, como veremos no método Subscribe
:
func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }
Junto com o método Subscribe, também precisamos de um método Unsubscribe:
func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }
O método Unsubscribe retorna true
se a conexão foi encontrada e excluída do mapa de assinantes. Caso contrário, ele retorna false
. Além dos métodos acima, existem mais alguns métodos auxiliares para a estrutura Channel
:
func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }
Usaremos o módulo PubSub para interagir com os canais. A forma da estrutura PubSub é a seguinte:
type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }
O primeiro método é o método Subscribe
. Este método trata da inscrição do cliente no(s) canal(is) especificado(s).
func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }
Para inscrever um cliente em um canal, o EchoVault escuta um comando SUBSCRIBE
ou PSUBSCRIBE
. Quando o comando é recebido de um cliente, a conexão TCP do cliente, juntamente com a lista de canais, é passada para o método Subscribe
. Quando um cliente assina usando o comando PSUBSCRIBE
, o parâmetro withPatterns
será true
e o valor da variável action
será definido como “psubscribe”.
O método Unsubscribe
permite que um cliente cancele a assinatura de canais ou padrões com base no uso SUBSCRIBE
ou PSUBSCRIBE
. Sua implementação é a seguinte:
func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }
Quando o cliente cancela a assinatura de canais/padrões, ele receberá uma resposta contendo uma matriz de matrizes. Cada array interno contém a ação (cancelar assinatura/cancelar assinatura), o nome do canal/padrão e o índice.
O próximo método é o método Publish. Este método aceita a mensagem e o nome do canal e então trata da publicação da mensagem internamente.
func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }
O próximo método é o método Channels
, que lida com o comando PUBSUB CHANNELS [pattern]
:
func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }
Os 2 métodos a seguir são os métodos NumPat
e NumSub
para lidar com PUBSUB NUMPAT
e
Comandos PUBSUB NUMSUB [channel [channel …]]
respectivamente.
func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }
O módulo PubSub espera que uma conexão seja passada para inscrever um cliente em um canal. No entanto, quando o EchoVault está incorporado, não há conexão TCP com um cliente que possa ser usado para uma assinatura. Para contornar isso, usamos net.Pipe
para obter as duas extremidades de uma conexão.
Uma extremidade da conexão é passada para o manipulador de comandos, que a utiliza para assinar o canal especificado. A outra extremidade da conexão é usada na função ReadPubSubMessage
retornada.
type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }
Aqui está um exemplo de como usar a API SUBSCRIBE e PUBLISH:
// Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()
Este artigo foi um rápido resumo da implementação do Pub/Sub no EchoVault. É claro que há muito contexto em torno do código compartilhado aqui que não cabe em um artigo. Se você estiver interessado no contexto, confira o repositório GitHub do EchoVault e dê uma olhada no que estamos construindo. Se você gosta do que vê, uma estrela do Github seria muito apreciada!