paint-brush
Golang을 사용하여 Redis 호환 Pub/Sub 시스템을 구축하는 방법~에 의해@kelvinm
2,869 판독값
2,869 판독값

Golang을 사용하여 Redis 호환 Pub/Sub 시스템을 구축하는 방법

~에 의해 Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

너무 오래; 읽다

작년에 저는 Golang 생태계를 위한 내장형 Redis 대안인 EchoVault를 구축해 왔습니다. EchoVault는 RESP 프로토콜을 사용하는 기존 Redis 클라이언트와 호환되는 임베디드 인터페이스와 클라이언트-서버 인터페이스를 모두 제공하면서 대부분의 Redis 기능을 복제하는 것을 목표로 합니다. EchoVault에 구현된 기능 중 하나는 Pub/Sub 기능입니다. 이 문서는 작성 당시 Pub/Sub 모듈이 어떻게 구현되었는지 간략하게 설명합니다.
featured image - Golang을 사용하여 Redis 호환 Pub/Sub 시스템을 구축하는 방법
Kelvin Clement M. HackerNoon profile picture
0-item

작년에 저는 Golang 생태계를 위한 내장형 Redis 대안인 EchoVault를 구축해 왔습니다. EchoVault는 RESP 프로토콜을 사용하는 기존 Redis 클라이언트와 호환되는 임베디드 인터페이스와 클라이언트-서버 인터페이스를 모두 제공하면서 대부분의 Redis 기능을 복제하는 것을 목표로 합니다.


EchoVault에 구현된 기능 중 하나는 Pub/Sub 기능입니다. 이 문서는 작성 당시 Pub/Sub 모듈이 어떻게 구현되었는지 간략하게 설명합니다.

게시/구독이란 무엇입니까?

Pub/Sub는 게시/구독을 의미합니다. 이 패턴을 통해 소비자는 특정 채널을 구독할 수 있습니다. 생산자는 채널에 메시지를 게시하고 해당 채널을 구독하는 모든 소비자는 메시지를 받습니다.


우리의 맥락에서는 EchoVault가 내장된 경우 지정된 채널을 구독하려면 Go 프로세스를 활성화해야 합니다. 클라이언트-서버 모드에서 실행할 때 클라이언트 TCP 연결이 채널을 구독하도록 허용해야 합니다.


게시 측면에서 Go 프로세스는 포함된 모드에서 채널에 게시할 수 있어야 하며, TCP 클라이언트 연결은 클라이언트-서버 모드에서 채널에 메시지를 게시할 수 있어야 합니다.

요구사항

시작하기 전에 이 구현의 요구 사항과 점수를 결정해야 합니다. EchoVault는 작성 당시 Redis Pub/Sub에서 사용 가능한 모든 기능을 구현하지 않습니다. 그러나 가장 중요한 핵심 기능은 구현되었습니다. EchoVault PubSub로 수행할 수 있는 작업은 다음과 같습니다.


  1. SUBSCRIBE channel [channel …] 명령을 사용하여 TCP 클라이언트가 채널 목록을 구독하도록 허용합니다. 서버는 클라이언트 연결에 구독 확인을 보내야 합니다.


  2. 내장된 EchoVault 인스턴스는 채널 목록을 구독할 수 있어야 합니다.


  3. TCP 클라이언트가 PSUBSCRIBE pattern [pattern …] 사용하여 패턴을 구독하도록 허용합니다. 여기서 패턴은 클라이언트가 패턴을 충족하는 모든 채널에 게시된 메시지를 수신할 수 있도록 하는 glob 문자열입니다.


  4. 내장된 EchoVault 인스턴스는 패턴 목록을 구독할 수 있어야 합니다.


  5. PUBLISH channel message 명령을 사용하여 TCP 클라이언트가 채널에 메시지를 게시하도록 허용합니다.


  6. 내장된 EchoVault 인스턴스에서 채널에 게시합니다.


  7. UNSUBSCRIBE channel [channel …]PUNSUBSCRIBE pattern [pattern …] 명령을 각각 사용하여 TCP 클라이언트가 채널 및 패턴에서 구독을 취소할 수 있도록 허용합니다.


  8. TCP 클라이언트 연결에서 PUBSUB CHANNELS [pattern] 명령을 사용하여 지정된 패턴과 일치하는 채널이 포함된 배열을 볼 수 있도록 허용합니다. 패턴이 제공되지 않으면 모든 활성 채널이 반환됩니다. 활성 채널은 한 명 이상의 구독자가 있는 채널입니다.


  9. TCP 클라이언트가 PUBSUB NUMPAT 명령을 사용하여 클라이언트가 현재 구독하고 있는 패턴 수를 볼 수 있도록 허용합니다.


  10. 활성 패턴 수를 볼 수 있는 내장된 API를 제공합니다.


  11. TCP 클라이언트가 PUBSUB NUMSUB [channel [channel ...]] 명령을 사용하여 제공된 채널 이름과 현재 채널에 구독 중인 클라이언트 수를 포함하는 배열 배열을 볼 수 있도록 허용합니다.


  12. 특정 채널의 구독자 수를 볼 수 있는 내장된 API를 제공합니다.

구현

이제 요구 사항이 설정되었으므로 구현을 시작해 보겠습니다. 높은 수준에서는 모든 PubSub 기능을 제공하는 PubSub 구조체를 갖게 됩니다. 또한 채널 및 패턴 채널에 대한 기능을 제공하는 채널 구조체도 있습니다.


이 섹션에서는 tidwall/resp 패키지를 사용합니다. EchoVault 사용자는 이 패키지를 사용하여 채널 구독자에게 RESP 응답을 보냅니다. gobwas/glob 패키지는 glob 패턴 논리를 처리합니다.

채널

먼저 Channel 구조체와 해당 메서드를 모두 만듭니다. 일반 채널과 패턴 채널이라는 두 가지 유형의 채널이 있습니다.

일반 채널은 이름은 있지만 연관된 패턴은 없는 채널입니다. 패턴 채널은 패턴을 이름으로 사용하고 컴파일된 글로브 패턴이 채널과 연결됩니다.


패턴 채널은 패턴 구독에 사용됩니다. 그렇지 않으면 일반 채널이 사용됩니다.


Channel 구조체의 모양은 다음과 같습니다.


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


옵션 패턴을 사용하여 새 채널 인스턴스를 생성하겠습니다. 사용 가능한 두 가지 옵션은 다음과 같습니다.


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


Channel 구조체의 첫 번째 메서드는 Start 메서드입니다. 이 메서드는 messageChan 에서 모든 구독자에게 브로드캐스트할 메시지를 수신하는 goroutine을 시작합니다. 구현은 다음과 같습니다.


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


고루틴은 messageChan 에서 다음 메시지를 선택하고 구독자에 대한 읽기 잠금을 획득하고 각 구독자에게 메시지를 브로드캐스트한 다음 읽기 잠금을 해제합니다. 보시다시피 tidwall/resp 패키지를 사용하면 RESP 값을 클라이언트 연결에 쉽게 보낼 수 있습니다.


대부분의 Redis 클라이언트는 게시/구독 메시지 형식이 [“메시지”, <채널 이름>, <메시지 문자열>] 형태의 배열일 것으로 예상합니다. 이것이 EchoVault에서 방송되는 내용입니다.


net.Conn 이 아닌 resp.Conn 으로 메시지를 보내고 있음을 알 수 있습니다. resp.Conn 사용하는 이유는 WriteArray 와 같이 RESP 값을 쓰기 위한 도우미 메서드를 제공하기 때문입니다. Subscribe 메소드에서 볼 수 있듯이 본질적으로 net.Conn 을 둘러싼 래퍼입니다.


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


Subscribe 메서드와 함께 Unsubscribe 메서드도 필요합니다.


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


구독자 맵에서 연결이 발견되어 삭제된 경우 Unsubscribe 메소드는 true 반환합니다. 그렇지 않으면 false 반환합니다. 위의 메서드 외에도 Channel 구조체에 대한 몇 가지 도우미 메서드가 더 있습니다.


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

PubSub

PubSub 모듈을 사용하여 채널과 상호작용하겠습니다. PubSub 구조체의 모양은 다음과 같습니다.


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


첫 번째 방법은 Subscribe 방법입니다. 이 메소드는 클라이언트의 지정된 채널 구독을 처리합니다.


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


클라이언트를 채널에 구독하기 위해 EchoVault는 SUBSCRIBE 또는 PSUBSCRIBE 명령을 수신합니다. 클라이언트로부터 명령이 수신되면 클라이언트의 TCP 연결이 채널 목록과 함께 Subscribe 메서드에 전달됩니다. 클라이언트가 PSUBSCRIBE 명령을 사용하여 구독하면 withPatterns 매개변수는 true 가 되고 action 변수의 값은 "psubscribe"로 설정됩니다.


Unsubscribe 메소드를 사용하면 클라이언트는 SUBSCRIBE 또는 PSUBSCRIBE 어느 것을 사용했는지에 따라 채널이나 패턴의 구독을 취소할 수 있습니다. 구현은 다음과 같습니다.


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


클라이언트가 채널/패턴 구독을 취소하면 배열 배열이 포함된 응답을 받게 됩니다. 각 내부 배열에는 작업(구독 취소/punsubscribe), 채널/패턴 이름 및 인덱스가 포함됩니다.


다음 방법은 게시 방법입니다. 이 메서드는 메시지와 채널 이름을 수락한 다음 내부적으로 메시지 게시를 처리합니다.


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


다음 방법은 PUBSUB CHANNELS [pattern] 명령을 처리하는 Channels 방법입니다.


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


다음 2가지 방법은 PUBSUB NUMPAT 및 NumPat을 처리하는 NumPatNumSub 방법입니다.

PUBSUB NUMSUB [channel [channel …]] 명령이 각각 실행됩니다.


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


임베디드 API

PubSub 모듈은 클라이언트를 채널에 구독하기 위해 연결이 전달될 것으로 예상합니다. 그러나 EchoVault가 포함된 경우 구독에 사용할 수 있는 클라이언트에 대한 TCP 연결이 없습니다. 이 문제를 해결하기 위해 net.Pipe 사용하여 연결의 양쪽 끝을 가져옵니다.


연결의 한쪽 끝은 이를 사용하여 지정된 채널을 구독하는 명령 처리기로 전달됩니다. 연결의 다른 쪽 끝은 반환된 ReadPubSubMessage 함수에서 사용됩니다.


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


다음은 SUBSCRIBE 및 PUBLISH API를 사용하는 방법에 대한 예입니다.


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

결론

이 문서는 EchoVault의 Pub/Sub 구현에 대한 간략한 요약이었습니다. 물론 여기에 공유된 코드를 둘러싸고 있는 많은 맥락은 한 기사에 다 담을 수 없습니다. 컨텍스트에 관심이 있다면 EchoVault의 GitHub 저장소를 확인하고 우리가 구축 중인 내용을 살펴보세요. 본 내용이 마음에 든다면 Github 스타를 추천해 주시면 매우 감사하겠습니다!