Some of us are faced with a situation where we need to create a web socket server that will distribute a message to all clients subscribed to an event.
Most applications are built on a microservice architecture, which causes some peculiarities when creating a web socket server.
Let's imagine a small system:
Its work scenario is very simple. The user on the front end changes some data via the REST API, and the DataService notifies its subscribers that something has changed. Sounds simple, right? But what if we remember that all our services work through Kubernetes, which means that each of them can have several pods and DataService is no exception? This means that our circuit will look like this:
Question: what happens if the frontend calls DataService (pod 2)? Will all subscribers receive notifications about data changes? If you do nothing, then only subscribers to pod 2 will receive messages, because under 1 is unaware of data changes. But we need subscribers of all pods to receive a notification.
In this matter, socket.io adapters come to our aid. The simplest and one of the most popular of them is the Redis Adapter (https://socket.io/docs/v4/redis-adapter/). Implemented based on the Redis pub/sub mechanism (https://redis.io/docs/interact/pubsub/). In short, a pod that has learned about data changes publishes a message about this in Redis, this message is read by other pods and sends a notification to its subscribers.
Let's try to test the Redis Adapter in a simple NestJs application.
First, let's write a simple gateway.
@WebSocketGateway()
export class WebSocketApiGateway {
@WebSocketServer()
server: Server;
@SubscribeMessage('join')
async join(@ConnectedSocket() client: Socket): Promise<void> {
console.log(`web-socket api join`);
client.join(Rooms.dataRoom);
client.emit('join', 'successful join');
}
async updateData(data: DataDto): Promise<void> {
console.log(`web-socket api emit data '${JSON.stringify(data)}'`);
this.server.to(Rooms.dataRoom).emit(Rooms.dataRoom, data);
}
}
It will send a message to all subscribers when the function is called.
Next, I created a controller and a service for updating data.
@Controller('data')
export class DataController {
constructor(
private readonly dataService: DataService
) {
}
@Post()
async createData(@Body() data: DataDto): Promise<void> {
await this.dataService.createData(data);
}
}
@Injectable()
export class DataService{
constructor(
private readonly socketGateway: WebSocketApiGateway
) {
}
async createData(data: DataDto): Promise<void> {
await this.socketGateway.updateData(data);
}
}
Here's some basic logic to help you focus on the main thing.
Then I wrote client
export let socketClient: Socket;
@Injectable()
export class WebSocketClient implements OnModuleInit, OnModuleDestroy {
private readonly socket: Socket;
private readonly onConnected?: () => void;
constructor(
options: Options,
private readonly webSocketService: WebSocketService
) {
this.socket = io(options.serverUrl,{
reconnection: true,
reconnectionDelay: 2000,
autoConnect: true
});
socketClient = this.socket;
this.onConnected = options.onConnected;
this.socket.on('connect_error', (err) => {
console.log('connect_error: ', undefined, err)
});
}
onModuleInit(): void {
this.connect();
}
onModuleDestroy(): void {
this.socket.disconnect();
}
connect(): void{
this.socket.on('join', (body: string): void => {
if (this.onConnected) {
this.onConnected();
}
console.log(`join message '${body}'`);
});
this.socket.on(Rooms.dataRoom, (data: DataDto): void => {
console.log(`web-socket client on data '${JSON.stringify(data)}'`);
this.webSocketService.updateData(data);
});
this.socket.on('connect', async () => {
this.socket.emit('join');
console.log('connected')
});
}
}
Pay attention to the onConnected
and socketClient
variable. We will need them for tests. I will explain their purpose a little later. The main thing is that this client subscribes to Rooms.dataRoom
room events and calls the updateData
method on the export service, which simply updates this data in inMemory.
@Injectable()
export class WebSocketService{
private data: DataDto;
updateData(data: DataDto): void{
this.data = data
};
getData(): DataDto{
return this.data;
}
}
Now the fun part - the redis adapter. I created theRedisIoAdapter
class - a descendant of IoAdapter
, everything is as in the documentation.
export class RedisIoAdapter extends IoAdapter {
private readonly adapterConstructor: ReturnType<typeof createAdapter>;
constructor(app: INestApplication, connectionKeeper: RedisConnectionsKeeper) {
super(app);
const subClient = connectionKeeper.createConnect();
const pubClient = connectionKeeper.createConnect();
this.adapterConstructor = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions): unknown {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}
And I connected it when creating our application
async function bootstrap() {
const app = await NestFactory.create(AppModule, {
bufferLogs: true,
});
const connectionsKeeper = app.get(RedisConnectionsKeeper);
app.useWebSocketAdapter(new RedisIoAdapter(app, connectionsKeeper));
await app.listen(config.get('app.port'));
};
void bootstrap();
Look like that's it. But no! The difficulties began when writing tests for this code.
In tests, I lift a container with redis to check the functionality of the adapter. Everything is typical here.
export async function redisSetup(): Promise<StartedTestContainer> {
const redisContainer = await new GenericContainer(REDIS_IMAGE)
.withEnvironment({
REDIS_PASSWORD,
REDIS_DISABLE_COMMANDS: 'FLUSHDB,FLUSHALL',
})
.withExposedPorts(REDIS_PORT)
.withWaitStrategy(Wait.forListeningPorts())
.start();
// (await redisContainer.logs())
// .on('data', (line) => console.log(line))
// .on('err', (line) => console.error(line))
// .on('end', () => console.log('Stream closed'));
const redisPort = redisContainer.getMappedPort(REDIS_PORT);
config.set('redis.host', 'localhost');
config.set('redis.port', redisPort);
config.set('redis.password', REDIS_PASSWORD);
config.set('redis.db', REDIS_DB);
return redisContainer;
}
I call this function inbeforeAll
, and in afterAll
I do redisContainer.stop({ removeVolumes: true })
. This is where the first problem appears. The redis adapter uses connections to it that need to be closed. That's why I created the RedisConnectionsKeeper
class, which stores these connections and closes them when the application is destroyed.
@Injectable()
export class RedisConnectionsKeeper implements OnModuleDestroy {
private clients: Redis[] = [];
async onModuleDestroy(): Promise<void> {
for (const client of this.clients) {
await client.quit();
}
}
createConnect(): Redis {
const client = new Redis({
host: config.get('redis.host'),
port: config.get('redis.port'),
db: config.get('redis.db'),
password: config.get('redis.password'),
});
client.on('error', (err) => {
console.log(`redis client error ${`${err.message}`}`)
});
this.clients.push(client);
return client;
}
}
Then the second problem emerged. In the test, the value check could occur BEFORE the socket logic is executed. It’s time to take the variable with the connection to the socket server outside and check that the message actually arrived. In the end my tests look like this:
describe('websocket api gateway', () => {
let redisContainer: StartedTestContainer;
let appServer1: INestApplication;
let appServer2: INestApplication;
let appClient: INestApplication;
beforeAll(async () => {
redisContainer = await redisSetup();
const server1Port = 3000;
appServer1 = await initServer(server1Port);
const server2Port = 4000;
appServer2 = await initServer(server2Port);
appClient = await initClient(`http://localhost:${server1Port}`);
}, 15 * 1000);
afterAll(async () => {
await Promise.all([
appServer1.close(),
appServer2.close(),
appClient.close(),
redisContainer.stop({ removeVolumes: true }),
]);
}, 15 * 1000);
describe('join', () => {
it('should update data, if new data created', async () => {
const first = new Promise((resolve) => {
socketClient.once(Rooms.dataRoom, resolve);
});
const data: DataDto = {
data: 'hello'
};
const dataService: DataService = appServer1.get(DataService);
await dataService.createData(data);
await first;
const webSocketService = appClient.get(WebSocketService);
expect(webSocketService.getData().data).toBe(data.data);
});
it('should update data, if new data created on other server', async () => {
const first = new Promise((resolve) => {
socketClient.once(Rooms.dataRoom, resolve);
});
const data: DataDto = {
data: 'hello'
};
const dataService: DataService = appServer2.get(DataService);
await dataService.createData(data);
await first;
const webSocketService = appClient.get(WebSocketService);
expect(webSocketService.getData().data).toBe(data.data);
});
});
});
That's all. Both of our tests worked great. In the first of them, I called the method for changing information on the server to which the client was subscribed, and in the second, the changes were called on another server. This shows that the adapter successfully copes with the task.
Link to the project in GitHub -https://github.com/waksund/redis-adapter-example