paint-brush
Golang'ı Kullanarak Redis Uyumlu Bir Pub/Sub Sistemini Nasıl Oluşturdum?ile@kelvinm
2,869 okumalar
2,869 okumalar

Golang'ı Kullanarak Redis Uyumlu Bir Pub/Sub Sistemini Nasıl Oluşturdum?

ile Kelvin Clement M.18m2024/04/28
Read on Terminal Reader

Çok uzun; Okumak

Geçtiğimiz yıl, Golang ekosistemi için yerleştirilebilir bir Redis alternatifi olan EchoVault'u inşa ediyordum. EchoVault, hem yerleşik bir arayüz hem de RESP protokolünü kullanan mevcut Redis istemcileriyle uyumlu bir istemci-sunucu arayüzü sağlarken Redis özelliklerinin çoğunu kopyalamayı amaçlamaktadır. EchoVault'ta uygulanan özelliklerden biri de Pub/Sub özelliğidir. Bu makale, yazıldığı sırada Pub/Sub modülünün nasıl uygulandığına dair kısa bir açıklamadır.
featured image - Golang'ı Kullanarak Redis Uyumlu Bir Pub/Sub Sistemini Nasıl Oluşturdum?
Kelvin Clement M. HackerNoon profile picture
0-item

Geçtiğimiz yıl, Golang ekosistemi için yerleştirilebilir bir Redis alternatifi olan EchoVault'u inşa ediyordum. EchoVault, hem yerleşik bir arayüz hem de RESP protokolünü kullanan mevcut Redis istemcileriyle uyumlu bir istemci-sunucu arayüzü sağlarken Redis özelliklerinin çoğunu kopyalamayı amaçlamaktadır.


EchoVault'ta uygulanan özelliklerden biri de Pub/Sub özelliğidir. Bu makale, yazıldığı sırada Pub/Sub modülünün nasıl uygulandığına dair kısa bir açıklamadır.

Pub/Sub Nedir?

Pub/Sub, yayınlama/abone olma anlamına gelir. Bu model tüketicilerin belirli kanallara abone olmalarına olanak tanır. Yapımcılar kanallara mesaj yayınlar ve o kanala abone olan tüm tüketiciler mesajı alır.


Bizim bağlamımızda, EchoVault yerleştirildiğinde belirli kanallara abone olmak için bir Go sürecini etkinleştirmeliyiz. İstemci-sunucu modunda çalışırken, istemci TCP bağlantısının kanallara abone olmasına izin vermeliyiz.


Yayımlama tarafında, bir Go işlemi, gömülü modda bir kanala yayınlayabilmeli ve bir TCP istemci bağlantısı, istemci-sunucu modunda bir kanala mesaj yayınlayabilmelidir.

Gereksinimler

Başlamadan önce bu uygulamanın gerekliliklerini ve puanını belirlememiz gerekiyor. EchoVault, bu yazının yazıldığı sırada Redis Pub/Sub'da mevcut olan tüm özellikleri uygulamamaktadır. Ancak en önemli temel özellikler uygulanır. EchoVault PubSub ile şunları yapabilmemiz gerekiyor:


  1. Bir TCP istemcisinin SUBSCRIBE channel [channel …] komutunu kullanarak bir kanal listesine abone olmasına izin verin. Sunucu, istemci bağlantısına aboneliklerin onayını göndermelidir.


  2. Gömülü bir EchoVault örneği, bir kanal listesine abone olabilmelidir.


  3. Bir TCP istemcisinin PSUBSCRIBE pattern [pattern …] kullanarak bir kalıba abone olmasına izin verin; burada kalıp, istemcinin kalıbı karşılayan tüm kanallara yayınlanan mesajları almasına olanak tanıyan bir glob dizesidir.


  4. Gömülü bir EchoVault örneği, bir kalıp listesine abone olabilmelidir.


  5. Bir TCP istemcisinin, PUBLISH channel message komutunu kullanarak bir kanala mesaj yayınlamasına izin verin.


  6. Gömülü bir EchoVault örneğinden bir kanala yayınlayın.


  7. Sırasıyla UNSUBSCRIBE channel [channel …] ve PUNSUBSCRIBE pattern [pattern …] komutlarını kullanarak TCP istemcilerinin kanal ve kalıp aboneliğinden çıkmalarına izin verin.


  8. Verilen modelle eşleşen kanalları içeren bir diziyi görüntülemek için TCP istemci bağlantısının PUBSUB CHANNELS [pattern] komutunu kullanmasına izin verin. Herhangi bir model sağlanmazsa tüm aktif kanallar döndürülür. Aktif kanallar bir veya daha fazla abonesi olan kanallardır.


  9. TCP istemcilerinin, istemciler tarafından halihazırda abone olunan modellerin sayısını görüntülemek için PUBSUB NUMPAT komutunu kullanmasına izin verin.


  10. Etkin kalıpların sayısını görüntülemek için yerleşik bir API sağlayın.


  11. TCP istemcilerinin, sağlanan kanal adını ve o anda kanala kaç istemcinin abone olduğunu içeren bir dizi diziyi görüntülemek için PUBSUB NUMSUB [channel [channel ...]] komutunu kullanmasına izin verin.


  12. Belirli kanallardaki abone sayısını görüntülemek için yerleşik bir API sağlayın.

Uygulama

Gereksinimleri belirlediğimize göre artık uygulamaya geçebiliriz. Yüksek düzeyde, tüm PubSub işlevlerini sağlayan bir PubSub yapısına sahip olacağız. Ayrıca kanallar ve kalıp kanalları için işlevsellik sağlayan bir Kanal yapısına da sahip olacağız.


Bu bölüm için tidwall/resp paketini kullanacağız. EchoVault kullanıcıları bu paketi kanal abonelerine RESP yanıtları göndermek için kullanır. gobwas/glob paketi glob desen mantığını yönetir.

Kanal

Öncelikle Channel yapısını ve onun tüm metodlarını oluşturacağız. İki tür kanalımız olacak: normal kanallar ve kalıp kanalları.

Normal kanallar, adları olan ve bunlarla ilişkilendirilmiş kalıpları olmayan kanallardır. Desen kanalları, modeli ad olarak kullanacak ve derlenmiş bir glob modeli, kanalla ilişkilendirilecektir.


Desen kanalları, kalıplara abonelikler için kullanılır. Aksi takdirde normal kanallar kullanılır.


Channel yapısı aşağıdaki şekle sahiptir:


 type Channel struct { name string // Channel name. This can be a glob pattern string. pattern glob.Glob // Compiled glob pattern. This is nil if the channel is not a pattern channel. subscribersRWMut sync.RWMutex // RWMutex to concurrency control when accessing channel subscribers. subscribers map[*net.Conn]*resp.Conn // Map containing the channel subscribers. messageChan *chan string // Messages published to this channel will be sent to this channel. }


Yeni bir kanal örneği oluşturmak için seçenek modelini kullanacağız. İşte mevcut iki seçenek:


 // WithName option sets the channels name. func WithName(name string) func(channel *Channel) { return func(channel *Channel) { channel.name = name } } // WithPattern option sets the compiled glob pattern for the channel if it's a pattern channel. func WithPattern(pattern string) func(channel *Channel) { return func(channel *Channel) { channel.name = pattern channel.pattern = glob.MustCompile(pattern) } } func NewChannel(options ...func(channel *Channel)) *Channel { messageChan := make(chan string, 4096) // messageChan is buffered. This could be a configurable value. channel := &Channel{ name: "", pattern: nil, subscribersRWMut: sync.RWMutex{}, subscribers: make(map[*net.Conn]*resp.Conn), messageChan: &messageChan, } for _, option := range options { option(channel) } return channel }


Channel yapısının ilk metodu Start metodudur. Bu yöntem, tüm abonelerine yayınlanacak mesajları messageChan dinleyen bir goroutine başlatır. İşte uygulama:


 func (ch *Channel) Start() { go func() { for { message := <-*ch.messageChan ch.subscribersRWMut.RLock() for _, conn := range ch.subscribers { go func(conn *resp.Conn) { if err := conn.WriteArray([]resp.Value{ resp.StringValue("message"), resp.StringValue(ch.name), resp.StringValue(message), }); err != nil { log.Println(err) } }(conn) } ch.subscribersRWMut.RUnlock() } }() }


Goroutine, messageChan bir sonraki mesajı alır, aboneler için bir okuma kilidi elde eder, mesajı her aboneye yayınlar ve ardından okuma kilidini serbest bırakır. Gördüğünüz gibi tidwall/resp paketi RESP değerlerini client bağlantısına kolaylıkla göndermemizi sağlıyor.


Redis istemcilerinin çoğu, pub/sub mesaj formatının şu şekilde bir dizi olmasını bekler: [“mesaj”, <kanal adı>, <mesaj dizesi>]. EchoVault tarafından yayınlanan şey budur.


Mesajı net.Conn değil resp.Conn gönderdiğimizi fark edebilirsiniz. RESP değerlerini yazmak için WriteArray gibi yardımcı yöntemler sağladığı için resp.Conn kullanıyoruz. Subscribe yönteminde göreceğimiz gibi, aslında net.Conn çevreleyen bir sarmalayıcıdır:


 func (ch *Channel) Subscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() // Acquire write-lock because we'll be modifying the subscriber map. defer ch.subscribersRWMut.Unlock() // If the connection does not exist in the subscriber map, add it. if _, ok := ch.subscribers[conn]; !ok { ch.subscribers[conn] = resp.NewConn(*conn) } _, ok := ch.subscribers[conn] return ok }


Abone Ol yönteminin yanı sıra Abonelikten Çık yöntemine de ihtiyacımız var:


 func (ch *Channel) Unsubscribe(conn *net.Conn) bool { ch.subscribersRWMut.Lock() defer ch.subscribersRWMut.Unlock() if _, ok := ch.subscribers[conn]; !ok { return false } delete(ch.subscribers, conn) return true }


Bağlantının bulunması ve abone haritasından silinmesi durumunda Abonelikten Çıkma yöntemi true döndürür. Aksi takdirde false döndürür. Yukarıdaki yöntemlere ek olarak Channel yapısı için birkaç yardımcı yöntem daha vardır:


 func (ch *Channel) Name() string { return ch.name } func (ch *Channel) Pattern() glob.Glob { return ch.pattern } func (ch *Channel) Publish(message string) { *ch.messageChan <- message } // IsActive returns true when the channel has 1 or more subscribers. func (ch *Channel) IsActive() bool { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() active := len(ch.subscribers) > 0 return active } // NumSubs returns the number of subscribers for this channel. func (ch *Channel) NumSubs() int { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() n := len(ch.subscribers) return n } // Subscribers returns a copy of the subscriber map. func (ch *Channel) Subscribers() map[*net.Conn]*resp.Conn { ch.subscribersRWMut.RLock() defer ch.subscribersRWMut.RUnlock() subscribers := make(map[*net.Conn]*resp.Conn, len(ch.subscribers)) for k, v := range ch.subscribers { subscribers[k] = v } return subscribers }

PubSub

Kanallarla etkileşim kurmak için PubSub modülünü kullanacağız. PubSub yapısının şekli aşağıdaki gibidir:


 type PubSub struct { channels []*Channel // Slice of references to channels channelsRWMut sync.RWMutex // RWMutex for concurrency controls when accessing channels } func NewPubSub() *PubSub { return &PubSub{ channels: []*Channel{}, channelsRWMut: sync.RWMutex{}, } }


İlk yöntem Subscribe yöntemidir. Bu yöntem, istemcinin belirtilen kanal(lar)a abone olmasını sağlar.


 func (ps *PubSub) Subscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) { ps.channelsRWMut.Lock() // Acquire write-lock as we may edit the slice of channels. defer ps.channelsRWMut.Unlock() r := resp.NewConn(*conn) // Wrap net.Conn connection with resp.Conn. action := "subscribe" if withPattern { action = "psubscribe" } // Loop through all the channels that the client has requested to subscribe to. for i := 0; i < len(channels); i++ { // Check if channel with given name exists // If it does, subscribe the connection to the channel // If it does not, create the channel and subscribe to it channelIdx := slices.IndexFunc(ps.channels, func(channel *Channel) bool { return channel.name == channels[i] }) if channelIdx == -1 { // If the channel does not exist, create new channel, start it, and subscribe to it. var newChan *Channel if withPattern { newChan = NewChannel(WithPattern(channels[i])) } else { newChan = NewChannel(WithName(channels[i])) } newChan.Start() if newChan.Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(newChan.name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } ps.channels = append(ps.channels, newChan) // Append the new channel to the list of channels. } else { // Subscribe to existing channel if ps.channels[channelIdx].Subscribe(conn) { // Write string array to the client connection confirming the subscription. if err := r.WriteArray([]resp.Value{ resp.StringValue(action), resp.StringValue(ps.channels[channelIdx].name), resp.IntegerValue(i + 1), }); err != nil { log.Println(err) } } } } }


Bir istemciyi bir kanala abone olmak için EchoVault, SUBSCRIBE veya PSUBSCRIBE komutunu dinler. Komut bir istemciden alındığında, istemcinin TCP bağlantısı kanal listesiyle birlikte Subscribe yöntemine iletilir. Bir istemci PSUBSCRIBE komutunu kullanarak abone olduğunda withPatterns parametresi true olacak ve action değişkeninin değeri "psubscribe" olarak ayarlanacaktır.


Unsubscribe yöntemi, bir müşterinin SUBSCRIBE veya PSUBSCRIBE kullanıp kullanmadığına bağlı olarak kanallardan veya kalıplardan aboneliğini iptal etmesine olanak tanır. Uygulaması şu şekildedir:


 func (ps *PubSub) Unsubscribe(_ context.Context, conn *net.Conn, channels []string, withPattern bool) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() action := "unsubscribe" if withPattern { action = "punsubscribe" } unsubscribed := make(map[int]string) // A map of all the channels/patterns successfully unsubscribed from. idx := 1 // idx holds the 1-based index of the channel/pattern unsubscribed from. if len(channels) <= 0 { if !withPattern { // If the channels slice is empty, and no pattern is provided // unsubscribe from all channels. for _, channel := range ps.channels { if channel.pattern != nil { // Skip pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } else { // If the channels slice is empty, and pattern is provided // unsubscribe from all patterns. for _, channel := range ps.channels { if channel.pattern == nil { // Skip non-pattern channels continue } if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } // Unsubscribe from channels where the name exactly matches channel name. // If unsubscribing from a pattern, also unsubscribe from all channel whose // names exactly matches the pattern name. for _, channel := range ps.channels { // For each channel in PubSub for _, c := range channels { // For each channel name provided if channel.name == c && channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } // If withPattern is true, unsubscribe from channels where pattern matches pattern provided, // also unsubscribe from channels where the name matches the given pattern. if withPattern { for _, pattern := range channels { g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If it's a pattern channel, directly compare the patterns if channel.pattern != nil && channel.name == pattern { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } continue } // If this is a regular channel, check if the channel name matches the pattern given if g.Match(channel.name) { if channel.Unsubscribe(conn) { unsubscribed[idx] = channel.name idx += 1 } } } } } // Construct a RESP response confirming the channels/patterns unsubscribed from. res := fmt.Sprintf("*%d\r\n", len(unsubscribed)) for key, value := range unsubscribed { res += fmt.Sprintf("*3\r\n+%s\r\n$%d\r\n%s\r\n:%d\r\n", action, len(value), value, key) } return []byte(res) }


İstemci kanal/kalıp aboneliğinden çıktığında, bir dizi diziyi içeren bir yanıt alacaktır. Her bir iç dizi, eylemi (abonelikten çıkma/aboneliği iptal etme), kanal/kalıp adını ve dizini içerir.


Bir sonraki yöntem Yayınla yöntemidir. Bu yöntem, mesajı ve kanal adını kabul eder ve ardından mesajın dahili olarak yayınlanmasını gerçekleştirir.


 func (ps *PubSub) Publish(_ context.Context, message string, channelName string) { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() // Loop through all of the existing channels. for _, channel := range ps.channels { // If it's a regular channel, check if the channel name matches the name given. if channel.pattern == nil { if channel.name == channelName { channel.Publish(message) // Publish the message to the channel. } continue } // If it's a glob pattern channel, check if the provided channel name matches the pattern. if channel.pattern.Match(channelName) { channel.Publish(message) // Publish the message to the channel } } }


Sonraki yöntem, PUBSUB CHANNELS [pattern] komutunu işleyen Channels yöntemidir:


 func (ps *PubSub) Channels(pattern string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int var res string // If pattern is an empty string, return all the active channels. if pattern == "" { for _, channel := range ps.channels { if channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } res = fmt.Sprintf("*%d\r\n%s", count, res) return []byte(res) } g := glob.MustCompile(pattern) for _, channel := range ps.channels { // If channel is a pattern channel, then directly compare the channel name to pattern. if channel.pattern != nil && channel.name == pattern && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 continue } // Channel is not a pattern channel. Check if the channel name matches the provided glob pattern. if g.Match(channel.name) && channel.IsActive() { res += fmt.Sprintf("$%d\r\n%s\r\n", len(channel.name), channel.name) count += 1 } } // Return a RESP array containing all the active channel names. return []byte(fmt.Sprintf("*%d\r\n%s", count, res)) }


Aşağıdaki 2 yöntem PUBSUB NUMPAT işlemek için NumPat ve NumSub yöntemleridir ve

PUBSUB NUMSUB [channel [channel …]] komutları sırasıyla.


 func (ps *PubSub) NumPat() int { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() var count int for _, channel := range ps.channels { if channel.pattern != nil && channel.IsActive() { count += 1 } } return count } func (ps *PubSub) NumSub(channels []string) []byte { ps.channelsRWMut.RLock() defer ps.channelsRWMut.RUnlock() res := fmt.Sprintf("*%d\r\n", len(channels)) for _, channel := range channels { // If it's a pattern channel, skip it chanIdx := slices.IndexFunc(ps.channels, func(c *Channel) bool { return c.name == channel }) if chanIdx == -1 { res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:0\r\n", len(channel), channel) continue } res += fmt.Sprintf("*2\r\n$%d\r\n%s\r\n:%d\r\n", len(channel), channel, ps.channels[chanIdx].NumSubs()) } return []byte(res) }


Gömülü API

PubSub modülü, bir istemcinin bir kanala abone olması için bir bağlantının iletilmesini bekler. Ancak EchoVault gömülü olduğunda istemciye abonelik için kullanılabilecek bir TCP bağlantısı yoktur. Bunu aşmak için bağlantının her iki ucunu da almak amacıyla net.Pipe kullanırız.


Bağlantının bir ucu, onu belirtilen kanala abone olmak için kullanan komut işleyicisine iletilir. Bağlantının diğer ucu, döndürülen ReadPubSubMessage işlevinde kullanılır.


 type conn struct { readConn *net.Conn writeConn *net.Conn } var connections map[string]conn // ReadPubSubMessage is returned by the SUBSCRIBE and PSUBSCRIBE functions. // // This function is lazy, therefore it needs to be invoked in order to read the next message. // When the message is read, the function returns a string slice with 3 elements. // Index 0 holds the event type which in this case will be "message". Index 1 holds the channel name. // Index 2 holds the actual message. type ReadPubSubMessage func() []string // SUBSCRIBE subscribes the caller to the list of provided channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) SUBSCRIBE(tag string, channels ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"SUBSCRIBE"}, channels...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // UNSUBSCRIBE unsubscribes the caller from the given channels. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `channels` - ...string - The list of channels to unsubscribe from. func (server *EchoVault) UNSUBSCRIBE(tag string, channels ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"UNSUBSCRIBE"}, channels...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) } // PSUBSCRIBE subscribes the caller to the list of provided glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to subscribe to. // // Returns: ReadPubSubMessage function which reads the next message sent to the subscription instance. // This function is blocking. func (server *EchoVault) PSUBSCRIBE(tag string, patterns ...string) ReadPubSubMessage { // Initialize connection tracker if calling subscribe for the first time if connections == nil { connections = make(map[string]conn) } // If connection with this name does not exist, create new connection it var readConn net.Conn var writeConn net.Conn if _, ok := connections[tag]; !ok { readConn, writeConn = net.Pipe() connections[tag] = conn{ readConn: &readConn, writeConn: &writeConn, } } // Subscribe connection to the provided channels cmd := append([]string{"PSUBSCRIBE"}, patterns...) go func() { _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }() return func() []string { r := resp.NewConn(readConn) v, _, _ := r.ReadValue() res := make([]string, len(v.Array())) for i := 0; i < len(res); i++ { res[i] = v.Array()[i].String() } return res } } // PUNSUBSCRIBE unsubscribes the caller from the given glob patterns. // // Parameters: // // `tag` - string - The tag used to identify this subscription instance. // // `patterns` - ...string - The list of glob patterns to unsubscribe from. func (server *EchoVault) PUNSUBSCRIBE(tag string, patterns ...string) { if connections == nil { return } if _, ok := connections[tag]; !ok { return } cmd := append([]string{"PUNSUBSCRIBE"}, patterns...) _, _ = server.handleCommand(server.context, internal.EncodeCommand(cmd), connections[tag].writeConn, false) }


ABONE OL ve YAYINLA API'sinin nasıl kullanılacağına dair bir örnek:


 // Subscribe to multiple EchoVault channels. readMessage := server.SUBSCRIBE("subscriber1", "channel_1", "channel_2", "channel_3") wg.Add(1) go func() { wg.Done() for { message := readMessage() fmt.Printf("EVENT: %s, CHANNEL: %s, MESSAGE: %s\n", message[0], message[1], message[2]) } }() wg.Wait() wg.Add(1) go func() { for i := 1; i <= 3; i++ { // Simulating delay. <-time.After(1 * time.Second) // Publish message to each EchoVault channel. _, _ = server.PUBLISH(fmt.Sprintf("channel_%d", i), "Hello!") } wg.Done() }() wg.Wait()

Çözüm

Bu makale, EchoVault'taki Pub/Sub uygulamasının kısa bir özetiydi. Elbette, burada paylaşılan kodun etrafında tek bir makaleye sığmayacak kadar çok bağlam var. Bağlamla ilgileniyorsanız EchoVault'un GitHub deposuna göz atın ve ne oluşturduğumuza bir göz atın. Gördüğünüzü beğendiyseniz Github yıldızı çok takdir edilecektir!