paint-brush
How to Implement Client-Side Interceptors for Logging GRPC Requestsby@dmitriiantonov90
987 reads
987 reads

How to Implement Client-Side Interceptors for Logging GRPC Requests

by Dmitrii AntonovDecember 7th, 2023
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

Our implementation will be straightforward; we will store requests and responses in Queues, and then put this data to MDC. There are default implementations such as SimpleForwardingClientCall and SimpleForwardingClientCallListener in which all methods are implemented. If we look at their implementation, we can see that they invoke a next implementation ClientCall or ClientCall.Listener.

Company Mentioned

Mention Thumbnail
featured image - How to Implement Client-Side Interceptors for Logging GRPC Requests
Dmitrii Antonov HackerNoon profile picture

In the previous article, we figured out how to implement server-side interceptors. In this article, we try to figure out how to log GRPC requests on the client side. We must implement three interfaces ClientInterceptor, ClientCall, and ClientCall.Listener.


There are default implementations such as SimpleForwardingClientCall and SimpleForwardingClientCallListener in which all methods are implemented. If we look at their implementation, we can see that they invoke a next implementation ClientCall or ClientCall.Listener.

The Implementation

Our implementation will be straightforward; we will store requests and responses in Queues, and then put this data to MDC. First, let’s implement ClientCall.Listener.


import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.util.JsonFormat
import io.grpc.ClientCall
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.Status
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import java.util.Queue

class LoggingClientCallListener<ReqT, ResT>(
    delegate: ClientCall.Listener<ResT>,
    private val requestQueue: Queue<ReqT>,
    private val responseQueue: Queue<ResT>
) : SimpleForwardingClientCallListener<ResT>(delegate) {

    override fun onMessage(message: ResT) {
        responseQueue.offer(message)
        super.onMessage(message)
    }

    override fun onClose(status: Status, trailers: Metadata) {
        val request = requestQueue
            .filterIsInstance<GeneratedMessageV3>()
            .joinToString(separator = ", ", prefix = "[", postfix = "]", transform = PROTOBUF_TO_JSON_PRINTER::print)

        val response = responseQueue
            .filterIsInstance<GeneratedMessageV3>()
            .joinToString(separator = ", ", prefix = "[", postfix = "]", transform = PROTOBUF_TO_JSON_PRINTER::print)

        if (status.isOk) {
            MDC.put("request", request)
            MDC.put("response", response)
            logger.info("The request was processed successfully")
            MDC.clear()
        } else if (UNSUCCESSFUL_STATUS_CODES.contains(status.code)) {
            MDC.put("request", request)
            logger.error("An error occurred while processing the request", status.asRuntimeException())
            MDC.clear()
        }

        super.onClose(status, trailers)
    }

    companion object {
        private val logger = LoggerFactory.getLogger(LoggingServerCallListener::class.java)
        private val PROTOBUF_TO_JSON_PRINTER = JsonFormat.printer()
        private val UNSUCCESSFUL_STATUS_CODES = listOf(
            Status.INVALID_ARGUMENT.code,
            Status.INTERNAL.code,
            Status.NOT_FOUND.code
        )
    }
}



We have overridden the two methods. First, we have overridden the onMessage method. The method will be invoked before the client will receive data. The method can be invoked from zero to n times if the method result is defined as Stream.


Second, we have overridden the onClose method.


The method will be called before the client closes the connection, and no data will be retrieved after that. After that, we must implement the interface ClientCall.


import io.grpc.ClientCall
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall
import io.grpc.Metadata
import java.util.*

class LoggingClientCall<ReqT, RespT>(
    delegate: ClientCall<ReqT, RespT>,
    private val requestQueue: Queue<ReqT>,
    private val responseQueue: Queue<RespT>
) : SimpleForwardingClientCall<ReqT, RespT>(delegate) {

    override fun sendMessage(message: ReqT) {
        requestQueue.offer(message)
        super.sendMessage(message)
    }

    override fun start(responseListener: Listener<RespT>, headers: Metadata) {
        super.start(LoggingClientCallListener(responseListener, requestQueue, responseQueue), headers)
    }
}



We have overridden two methods also. First, we have overridden the sendMessage method. This method will be invoked before a message will be sent to a service. We push our message to the queue. In the next method, we create our implementation of our LoggingClientCallListener.


This method will be invoked before all methods. Next, we must implement our client interceptor and register this interceptor in the stub.


import io.grpc.*
import java.util.concurrent.LinkedBlockingDeque

class LoggingClientInterceptor : ClientInterceptor {
    override fun <ReqT : Any, RespT : Any> interceptCall(
        method: MethodDescriptor<ReqT, RespT>,
        callOptions: CallOptions,
        next: Channel
    ): ClientCall<ReqT, RespT> {
        return LoggingClientCall(next.newCall(method, callOptions), LinkedBlockingDeque(), LinkedBlockingDeque())
    }
}


fun main(args: Array<String>) {
	val client = OurServiceGrpc
		.newBlockingStub(ManagedChannelBuilder.forAddress("localhost", 8080).build())
		.withInterceptors(LoggingClientInterceptor())
}


I hope I managed to show you how to use server and client interceptors in GRPC.