paint-brush
Golang を使って Redis 互換の Pub/Sub システムを構築する方法by@kelvinm
2,545
2,545

Golang を使って Redis 互換の Pub/Sub システムを構築する方法

Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

昨年、私は Golang エコシステム向けの埋め込み可能な Redis 代替品である EchoVault を構築してきました。EchoVault は、Redis 機能のほとんどを再現しながら、埋め込みインターフェースと、RESP プロトコルを使用する既存の Redis クライアントと互換性のあるクライアント サーバー インターフェースの両方を提供することを目指しています。 EchoVault に実装されている機能の 1 つに、Pub/Sub 機能があります。この記事は、執筆時点で Pub/Sub モジュールがどのように実装されているかについての簡単なウォークスルーです。
featured image - Golang を使って Redis 互換の Pub/Sub システムを構築する方法
Kelvin Clement M. HackerNoon profile picture
0-item

昨年、私は Golang エコシステム用の埋め込み可能な Redis 代替品であるEchoVaultを構築してきました。EchoVault は、Redis 機能のほとんどを再現しながら、RESP プロトコルを使用する既存の Redis クライアントと互換性のある埋め込みインターフェイスとクライアント サーバー インターフェイスの両方を提供することを目指しています。


EchoVault に実装されている機能の 1 つに、Pub/Sub 機能があります。この記事では、執筆時点での Pub/Sub モジュールの実装方法を簡単に説明します。

Pub/Sub とは何ですか?

Pub/Sub は、publish/subscribe の略です。このパターンにより、コンシューマーは特定のチャネルをサブスクライブできます。プロデューサーはチャネルにメッセージをパブリッシュし、そのチャネルにサブスクライブしているすべてのコンシューマーがメッセージを受信します。


私たちのコンテキストでは、EchoVault が埋め込まれている場合、Go プロセスが指定されたチャネルをサブスクライブできるようにする必要があります。クライアント サーバー モードで実行している場合は、クライアント TCP 接続がチャネルをサブスクライブできるようにする必要があります。


公開側では、Go プロセスは埋め込みモードでチャネルに公開でき、TCP クライアント接続はクライアント サーバー モードでチャネルにメッセージを公開できる必要があります。

要件

始める前に、この実装の要件とスコアを決定する必要があります。執筆時点では、EchoVault は Redis Pub/Sub で利用可能なすべての機能を実装していません。ただし、最も重要なコア機能は実装されています。EchoVault PubSub で実行できることは次のとおりです。


  1. TCP クライアントがSUBSCRIBE channel [channel …]コマンドを使用してチャネルのリストにサブスクライブできるようにします。サーバーはクライアント接続にサブスクリプションの確認を送信する必要があります。


  2. 埋め込まれた EchoVault インスタンスは、チャネルのリストにサブスクライブできる必要があります。


  3. TCP クライアントがPSUBSCRIBE pattern [pattern …]を使用してパターンをサブスクライブできるようにします。パターンは、パターンを満たすすべてのチャネルに公開されたメッセージをクライアントが受信できるようにする glob 文字列です。


  4. 埋め込まれた EchoVault インスタンスは、パターンのリストをサブスクライブできる必要があります。


  5. PUBLISH channel messageコマンドを使用して、TCP クライアントがチャネルにメッセージを公開できるようにします。


  6. 埋め込まれた EchoVault インスタンスからチャネルに公開します。


  7. TCP クライアントがUNSUBSCRIBE channel [channel …]PUNSUBSCRIBE pattern [pattern …]コマンドを使用して、チャネルとパターンから登録解除できるようにします。


  8. TCP クライアント接続でPUBSUB CHANNELS [pattern]コマンドを使用して、指定されたパターンに一致するチャネルを含む配列を表示できるようにします。パターンが指定されていない場合は、すべてのアクティブなチャネルが返されます。アクティブなチャネルとは、1 つ以上のサブスクライバーを持つチャネルです。


  9. TCP クライアントがPUBSUB NUMPATコマンドを使用して、現在クライアントがサブスクライブしているパターンの数を表示できるようにします。


  10. アクティブなパターンの数を表示するための埋め込み API を提供します。


  11. TCP クライアントがPUBSUB NUMSUB [channel [channel ...]]コマンドを使用して、指定されたチャネル名と現在チャネルにサブスクライブしているクライアントの数を含む配列の配列を表示できるようにします。


  12. 指定されたチャンネルの登録者数を表示するための埋め込み API を提供します。

実装

要件が設定されたので、実装に取り掛かりましょう。大まかに言うと、すべての PubSub 機能を提供する PubSub 構造体があります。また、チャネルとパターン チャネルの機能を提供する Channel 構造体もあります。


このセクションでは、tidwall/resp パッケージを使用します。EchoVault ユーザーは、このパッケージを使用して、チャネル サブスクライバーに RESP 応答を送信します。gobwas/glob パッケージは、glob パターン ロジックを処理します。

チャネル

まず、 Channel構造体とそのすべてのメソッドを作成します。通常のチャネルとパターン チャネルの 2 種類のチャネルがあります。

通常のチャネルは、名前はあるもののパターンが関連付けられていないチャネルです。パターン チャネルはパターンを名前として使用し、コンパイルされた glob パターンがチャネルに関連付けられます。


パターン チャネルは、パターンのサブスクリプションに使用されます。それ以外の場合は、通常のチャネルが使用されます。


Channel構造体の形状は次のとおりです。


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


オプション パターンを使用して新しいチャネル インスタンスを作成します。使用可能なオプションは次の 2 つです。


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


Channel 構造体の最初のメソッドはStartメソッドです。このメソッドは、すべてのサブスクライバーにブロードキャストするメッセージをmessageChanでリッスンする goroutine を開始します。実装は次のとおりです。


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


goroutine はmessageChanから次のメッセージを取得し、サブスクライバーの読み取りロックを取得し、各サブスクライバーにメッセージをブロードキャストしてから、読み取りロックを解除します。ご覧のとおり、tidwall/resp パッケージを使用すると、RESP 値をクライアント接続に簡単に送信できます。


ほとんどの Redis クライアントは、pub/sub メッセージ形式が次の形式の配列であることを期待しています: [“message”, <channel name>, <message string>]。これが EchoVault によってブロードキャストされるものです。


メッセージをnet.Connではなくresp.Connに送信していることに気付いたかもしれません。 resp.Connを使用するのは、 WriteArrayなど、RESP 値を書き込むためのヘルパー メソッドを提供するためです。Subscribe メソッドでわかるようにSubscribeこれは基本的にnet.Connのラッパーです。


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


Subscribe メソッドに加えて、Unsubscribe メソッドも必要です。


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


Unsubscribe メソッドは、接続が見つかり、サブスクライバー マップから削除された場合はtrueを返します。それ以外の場合はfalseを返します。上記のメソッドに加えて、 Channel構造体にはさらにいくつかのヘルパー メソッドがあります。


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

パブリッシュサブスクライブ

チャネルとやり取りするには、PubSub モジュールを使用します。PubSub 構造体の形状は次のとおりです。


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


最初のメソッドはSubscribeメソッドです。このメソッドは、指定されたチャネルへのクライアントのサブスクライブを処理します。


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


クライアントをチャネルにサブスクライブするために、EchoVault はSUBSCRIBEまたはPSUBSCRIBEコマンドをリッスンします。クライアントからコマンドを受信すると、クライアントの TCP 接続とチャネルのリストがSubscribeメソッドに渡されます。クライアントがPSUBSCRIBEコマンドを使用してサブスクライブすると、 withPatternsパラメーターはtrueになり、 action変数の値は "psubscribe" に設定されます。


Unsubscribeメソッドを使用すると、クライアントはSUBSCRIBEまたはPSUBSCRIBEどちらを使用したかに基づいて、チャネルまたはパターンから登録解除できます。実装は次のとおりです。


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


クライアントがチャネル/パターンから登録解除すると、配列の配列を含む応答が受信されます。各内部配列には、アクション (unsubscribe/punsubscribe)、チャネル/パターン名、およびインデックスが含まれます。


次のメソッドは Publish メソッドです。このメソッドは、メッセージとチャネル名を受け取り、メッセージの公開を内部的に処理します。


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


次のメソッドは、 PUBSUB CHANNELS [pattern]コマンドを処理するChannelsメソッドです。


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


次の2つのメソッドは、 PUBSUB NUMPATを処理するNumPatメソッドとNumSubメソッドです。

PUBSUB NUMSUB [channel [channel …]]コマンドをそれぞれ実行します。


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


組み込みAPI

PubSub モジュールは、クライアントをチャネルにサブスクライブするために接続が渡されることを想定しています。ただし、EchoVault が埋め込まれている場合、サブスクリプションに使用できるクライアントへの TCP 接続はありません。これを回避するには、 net.Pipeを使用して接続の両端を取得します。


接続の一方の端はコマンド ハンドラーに渡され、指定されたチャネルをサブスクライブするために使用されます。接続のもう一方の端は、返されたReadPubSubMessage関数で使用されます。


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


SUBSCRIBE および PUBLISH API の使用方法の例を次に示します。


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

結論

この記事は、EchoVault の Pub/Sub 実装について簡単に説明したものです。もちろん、ここで共有したコードには多くのコンテキストがあり、1 つの記事に収まりきりません。コンテキストに興味がある場合は、EchoVault の GitHub リポジトリをチェックして、私たちが構築しているものをご覧ください。気に入った場合は、Github スターを付けていただければ大変助かります。