M. Cetin

@manorie

Communicating Go Applications through Redis Pub/Sub Messaging Paradigm

The Redis Pub/Sub messaging paradigm allows applications to talk to each other through channels. Subscribers declare their interest in specific channels, publishers send messages to a channel without knowing which subscribers (if any) exist, and Redis pushes each message to every client subscribed to that channel. You can read more about the Publish/Subscribe pattern on Wikipedia.
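
To make the pattern concrete before Redis enters the picture, here is a minimal in-memory sketch of publish/subscribe in Go. The Broker type and its methods are hypothetical names used only for illustration; they are not part of Redis or Redigo, and a real broker does much more.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical in-memory stand-in for Redis Pub/Sub.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string // channel name -> subscriber queues
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers interest in a channel and returns a queue of messages.
func (b *Broker) Subscribe(channel string) <-chan string {
	ch := make(chan string, 8) // small buffer so Publish does not block in this demo
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[channel] = append(b.subs[channel], ch)
	return ch
}

// Publish pushes msg to every subscriber of channel and returns how many
// subscribers received it. The publisher never learns who they are.
func (b *Broker) Publish(channel, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := 0
	for _, ch := range b.subs[channel] {
		select {
		case ch <- msg:
			n++
		default: // subscriber queue is full; drop the message
		}
	}
	return n
}

func main() {
	b := NewBroker()
	alice := b.Subscribe("news")
	bob := b.Subscribe("news")

	delivered := b.Publish("news", "hello")
	fmt.Println("delivered to", delivered, "subscribers")
	fmt.Println("alice got:", <-alice)
	fmt.Println("bob got:", <-bob)
	fmt.Println("nobody listening:", b.Publish("sports", "goal"))
}
```

As in Redis, publishing to a channel with no subscribers is not an error; the message is simply delivered to nobody.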

In our case, replicas of our application run on different machines as a Docker Swarm service, and we need them to talk to each other. We handle many users who are connected to our system through websocket connections. Each connection is held by whichever replica accepted it, so a message for a user may arrive at a replica that does not hold that user's connection.

We will use Gorilla WebSocket, Redigo (a Redis client) and a UUID package for unique ID creation. The simple application we are going to demonstrate forwards JSON messages between clients. Note that this is a simpler version of what we run in production; for the sake of simplicity and readability, I will skip possible improvements.

We start by declaring a user struct and a store to keep track of connected users.

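// User is a connected client, identified by a unique ID.
type User struct {
	ID   string
	conn *websocket.Conn
}

// Store keeps track of the users connected to this replica.
type Store struct {
	Users []*User
	sync.Mutex
}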

As you may have noticed, we protect the store with a Mutex: our application runs an HTTP server, which serves each request in its own goroutine, so several handlers may touch the store at the same time. Next we define the message we are going to exchange between users.

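// Message is the JSON payload exchanged with clients.
type Message struct {
	DeliveryID string `json:"id"`      // ID of the user the message is addressed to
	Content    string `json:"content"` // message body
}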
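
To see what the struct tags do on the wire, here is a small sketch that round-trips a Message through encoding/json. The encode and decode helpers and the ID value are made up for the example; the application itself relies on the websocket library's JSON helpers instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Message struct {
	DeliveryID string `json:"id"`
	Content    string `json:"content"`
}

// encode returns the JSON wire form of m.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

// decode parses a JSON payload such as the one a client would send.
func decode(raw string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(raw), &m)
	return m, err
}

func main() {
	out, _ := encode(Message{DeliveryID: "some-user-id", Content: "hello"})
	fmt.Println(out) // {"id":"some-user-id","content":"hello"}

	in, _ := decode(`{"id":"some-user-id","content":"hi"}`)
	fmt.Println(in.DeliveryID, in.Content) // some-user-id hi
}
```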

It would be better to use a connection pool for Redis communication, but we will skip it for simplicity and initialize the app as follows.

var (
	gStore      *Store
	gPubSubConn *redis.PubSubConn

	// gRedisConn dials a new connection to the local Redis server.
	gRedisConn = func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	}
)

func init() {
	gStore = &Store{
		Users: make([]*User, 0, 1),
	}
}

At this point, we can declare a function that creates a new user and subscribes to a Redis Pub/Sub channel named after the user's unique ID.

func (s *Store) newUser(conn *websocket.Conn) *User {
	u := &User{
		ID:   uuid.NewV4().String(),
		conn: conn,
	}

	// Subscribe to the channel named after the user's ID.
	if err := gPubSubConn.Subscribe(u.ID); err != nil {
		panic(err)
	}

	s.Lock()
	defer s.Unlock()

	s.Users = append(s.Users, u)
	return u
}

Normally we do not want to panic on a pub/sub connection error. You can redial Redis or borrow another connection from the pool instead; I leave this as an improvement point. (An example unique ID that we assign here is ce6df22e-b497-4a71-81c5-2da31a5566e8.)

Now we have reached the point where we will listen to published messages and deliver them to the users.

func deliverMessages() {
	for {
		switch v := gPubSubConn.Receive().(type) {
		case redis.Message:
			// The channel name is the ID of the user to deliver to.
			gStore.findAndDeliver(v.Channel, string(v.Data))

		case redis.Subscription:
			log.Printf("subscription message: %s: %s %d\n", v.Channel, v.Kind, v.Count)

		case error:
			log.Println("error pub/sub, delivery has stopped")
			return
		}
	}
}

func (s *Store) findAndDeliver(userID string, content string) {
	m := Message{
		Content: content,
	}

	// Guard the slice against concurrent appends from newUser.
	s.Lock()
	defer s.Unlock()

	for _, u := range s.Users {
		if u.ID == userID {
			if err := u.conn.WriteJSON(m); err != nil {
				log.Printf("error on message delivery e: %s\n", err)
			} else {
				log.Printf("user %s found at our store, message sent\n", userID)
			}
			return
		}
	}

	log.Printf("user %s not found at our store\n", userID)
}

We will call the deliverMessages function from another goroutine so that it does not block our application. Now it's time to declare our main function.

var serverAddress = ":8080"

func main() {
	// Dedicated connection for subscriptions. The local name differs from
	// gRedisConn so that it does not shadow the dial function.
	c, err := gRedisConn()
	if err != nil {
		panic(err)
	}

	gPubSubConn = &redis.PubSubConn{Conn: c}
	defer gPubSubConn.Close() // also closes the underlying connection

	go deliverMessages()

	http.HandleFunc("/ws", wsHandler)
	log.Printf("server started at %s\n", serverAddress)

	log.Fatal(http.ListenAndServe(serverAddress, nil))
}

And our handler.

var upgrader = websocket.Upgrader{
	// Accept every origin. For demonstration only.
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("upgrader error %s\n", err)
		return
	}
	u := gStore.newUser(conn)
	log.Printf("user %s joined\n", u.ID)

	for {
		var m Message
		if err := u.conn.ReadJSON(&m); err != nil {
			// Stop on any read error (closed connection or malformed
			// message) instead of looping on a dead connection.
			log.Printf("error on ws. message %s\n", err)
			return
		}

		// Publish on a separate connection from the subscribed one.
		c, err := gRedisConn()
		if err != nil {
			log.Printf("error on redis conn. %s\n", err)
			continue
		}
		if _, err := c.Do("PUBLISH", m.DeliveryID, m.Content); err != nil {
			log.Printf("error on publish %s\n", err)
		}
		c.Close()
	}
}

WebSockets can perform cross-domain communication and are not limited by the SOP (Same Origin Policy). The default upgrader compares the Origin header of the incoming request with the Host header and allows the request only if they match. If you want to use the default CheckOrigin function, make sure that your client sends a matching Origin header with its request. Again, for easier demonstration we override the function, but you should not do this in production, for security reasons.

That is basically all you need. You can check the code snippet here.

If you have reached this point, let's try it. I started my server, opened Chrome Developer Tools and executed the following commands in the console.

> var con0 = new WebSocket('ws://localhost:8080/ws')
> undefined
> var con1 = new WebSocket('ws://localhost:8080/ws')
> undefined

Our server logs those connections as follows:

2017/01/21 17:26:46 server started at :8080
2017/01/21 17:28:24 user 7c27943d-dd98-4bfe-829f-7bd9834f9f63 joined
2017/01/21 17:28:24 subscription message: 7c27943d-dd98-4bfe-829f-7bd9834f9f63: subscribe 1
2017/01/21 17:28:28 user ffbe9040-b424-4bdc-89a9-f8b045c878c6 joined
2017/01/21 17:28:28 subscription message: ffbe9040-b424-4bdc-89a9-f8b045c878c6: subscribe 2

As you can see, our first and second websocket connections created two users:

User 1 ID: 7c27943d-dd98-4bfe-829f-7bd9834f9f63

User 2 ID: ffbe9040-b424-4bdc-89a9-f8b045c878c6

Let's send a message to User 1 (represented by the first websocket connection) from User 2.

> con0.onmessage = function(e) { console.log("connection 0 received message", e.data) }

> var mes = new Object()
> mes.id = "7c27943d-dd98-4bfe-829f-7bd9834f9f63"
> mes.content = "hello"
> con1.send(JSON.stringify(mes))
> VM154:1 connection 0 received message {"id":"","content":"hello"}

The id field arrives empty because findAndDeliver fills in only Content. Our server logs:

2017/01/21 17:32:58 user 7c27943d-dd98-4bfe-829f-7bd9834f9f63 found at our store, message sent

Conclusion

The good side of this implementation is that it scales with ease when you run your application in a distributed or clustered manner. Whether you use Docker Swarm (like us), Kubernetes or a similar clustering service, any replica can publish a message, and the replica that holds the recipient's websocket connection delivers it.

Note 
You should handle websocket connection closings (browser closed, etc.) and Redis connection failures. This is just a demo script.
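
As a starting point for handling closed connections, the sketch below removes a user from the store by ID. The removeUser method is a hypothetical addition, and User is simplified to omit the websocket field. In the real handler you would also unsubscribe from the user's channel (Redigo's PubSubConn has an Unsubscribe method) and close the websocket connection.

```go
package main

import (
	"fmt"
	"sync"
)

// User is simplified here: the websocket connection field is omitted.
type User struct {
	ID string
}

type Store struct {
	Users []*User
	sync.Mutex
}

// removeUser deletes the user with the given ID and reports whether it was found.
func (s *Store) removeUser(id string) bool {
	s.Lock()
	defer s.Unlock()
	for i, u := range s.Users {
		if u.ID == id {
			s.Users = append(s.Users[:i], s.Users[i+1:]...)
			return true
		}
	}
	return false
}

func main() {
	s := &Store{Users: []*User{{ID: "a"}, {ID: "b"}, {ID: "c"}}}

	ok := s.removeUser("b")
	fmt.Println(ok, len(s.Users)) // true 2

	ok = s.removeUser("missing")
	fmt.Println(ok, len(s.Users)) // false 2
}
```

A natural place to call it is right where the read loop in wsHandler stops, for example through a defer placed after newUser.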
