Resembling a graceful rendition of Tchaikovsky’s famous ballet, the namesake Swan Lake release of the Ballerina (referred to as Ballerinalang in the article) programming language comes packed with a revamped gRPC library that provides a more elegant way of handling client and bi-directional streaming cases. This article discusses the improved gRPC streaming functionality by working through an example. However, if you are totally new to gRPC in Ballerinalang and are looking for a detailed introduction to the basics and the implementation of a unary application, read this blog on Ballerina + gRPC.

Microservices Bill Calculator with Ballerinalang

Let’s look at a basic microservices-based bill calculator as an example. The client streams the price and quantity (the input values) of various items to be included in the total bill, and the server multiplies and adds them together in order to return the total bill as a reply. In this example, the messages streamed by the client are named InputValue and contain the name of the item, its price, and its quantity. The server responds with a message named Bill containing the type of the bill (final or interim bill), the total quantity of items ordered, and the total price.

This application has two modes of operation, or remote procedure calls, which can be described as follows:

- quickBilling: This is an example of the gRPC client streaming scenario. In this case, the server waits until the client has streamed all the InputValue messages to be included in the bill. Once the client indicates that the stream is completed, one final Bill is sent by the server.
- oneByOneBilling: This is an example of the gRPC bi-directional streaming scenario. For each InputValue streamed by the client, the server replies with the interim Bill (the total calculated so far). Similar to quickBilling, the final Bill is sent once the client indicates the stream’s completion.
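The difference between the two modes can be sketched without any gRPC machinery. The following is a minimal illustration in Python rather than Ballerinalang, so that it runs on its own: plain iterators stand in for the gRPC streams, the function and field names (quick_billing, one_by_one_billing, item_name, and so on) are hypothetical, and the sample items are the ones the example client sends later in this article.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator


@dataclass
class InputValue:
    item_name: str
    quantity: int
    price: float


@dataclass
class Bill:
    bill_type: str
    total_quantity: int
    total_price: float


def quick_billing(items: Iterable[InputValue]) -> Bill:
    """Client streaming: consume the whole stream, then reply once."""
    total_quantity = 0
    total_price = 0.0
    for item in items:
        total_quantity += item.quantity
        total_price += item.quantity * item.price
    return Bill("Final Bill", total_quantity, total_price)


def one_by_one_billing(items: Iterable[InputValue]) -> Iterator[Bill]:
    """Bi-directional streaming: reply after every item, then once more at the end."""
    total_quantity = 0
    total_price = 0.0
    for item in items:
        total_quantity += item.quantity
        total_price += item.quantity * item.price
        yield Bill("Interim Bill", total_quantity, total_price)
    yield Bill("Final Bill", total_quantity, total_price)


# Sample items taken from the example client in this article.
ITEMS = [
    InputValue("Apples", 4, 30.50),
    InputValue("Oranges", 6, 43.4),
    InputValue("Grapes", 20, 11.5),
]

if __name__ == "__main__":
    print(quick_billing(iter(ITEMS)))
    for bill in one_by_one_billing(iter(ITEMS)):
        print(bill)
```

In this sketch quick_billing returns a single value only after the input is exhausted, while one_by_one_billing is a generator that produces a reply per input. That is the same contrast the two remote procedure calls expose over the network.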

Bill Calculator Architecture

In both the client streaming and bi-directional streaming cases, the client must first use the generated gRPC stub to open a connection with the gRPC server and register a message listener, as shown in the diagram below. Using the Streaming Client object, the client can then start streaming messages to the server. The messages sent by the server are captured by the message listener specified in the initialization stage. For each of the two remote procedure calls in this example, the flow of messages is illustrated in the following diagram.

Getting Started

This tutorial is based on the Ballerina Swan Lake Preview 1 release and upwards, which can be downloaded here. The server implementation will not work on earlier Ballerinalang releases.

We start off this tutorial by creating a Ballerinalang project and adding the necessary modules using the Ballerinalang CLI commands. In this example, two modules are created, one for the server and one for the client.

```
// Create a new Ballerina project
$ ballerina new grpc-streaming-example

// Add server module to the project
$ cd grpc-streaming-example
$ ballerina add server

// Add client module to the project
$ ballerina add client
```

Next, the remote procedure calls described above are defined in a protocol buffer file under a single service (which is supported in the Ballerinalang gRPC server since the Swan Lake Preview 1 release).

```protobuf
// streaming-stub.proto
syntax = "proto3";
package service;

service BillingServer {
    rpc quickBilling(stream InputValue) returns (Bill);
    rpc oneByOneBilling(stream InputValue) returns (stream Bill);
}

message InputValue {
    string itemName = 1;
    int64 quantity = 2;
    float price = 3;
}

message Bill {
    string billType = 1;
    int64 totalQuantity = 2;
    float totalPrice = 3;
}
```

Using the above protobuf file and the Ballerinalang CLI gRPC tool, we can generate the required stub file and the server boilerplate code. Use the following command to generate the stub file for the clients.

```
$ ballerina grpc --input streaming-stub.proto --output .

// Copy the generated file to the client module directory
$ cp service/stub_pb.bal <PROJECT_DIRECTORY>/src/client
```

The Ballerinalang gRPC tool generates the following stub file.

```ballerina
import ballerina/grpc;

public type BillingServerClient client object {

    *grpc:AbstractClientEndpoint;

    private grpc:Client grpcClient;

    public function init(string url, grpc:ClientConfiguration? config = ()) {
        // initialize client endpoint.
        self.grpcClient = new(url, config);
        checkpanic self.grpcClient.initStub(self, "non-blocking", ROOT_DESCRIPTOR, getDescriptorMap());
    }

    public remote function quickBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        return self.grpcClient->streamingExecute("service.BillingServer/quickBilling", msgListener, headers);
    }

    public remote function oneByOneBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        return self.grpcClient->streamingExecute("service.BillingServer/oneByOneBilling", msgListener, headers);
    }
};

public type InputValue record {|
    string itemName = "";
    int quantity = 0;
    float price = 0.0;
|};

public type Bill record {|
    string billType = "";
    int totalQuantity = 0;
    float totalPrice = 0.0;
|};

const string ROOT_DESCRIPTOR = "0A1473747265616D696E672D737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33";

function getDescriptorMap() returns map<string> {
    return {
        "streaming-stub.proto":"0A1473747265616D696E672D737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33"
    };
}
```

The server boilerplate code can also be generated using the following command. This generated file can be copied into the server module directory.

```
$ ballerina grpc --input streaming-stub.proto --mode service --output .

// Copy the generated file to the server module directory
$ cp service/BillingServer_sample_service.bal <PROJECT_DIRECTORY>/src/server
```

```ballerina
import ballerina/grpc;

listener grpc:Listener ep = new (9090);

service BillingServer on ep {

    resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
        // Implementation goes here.

        // You should return a Bill
    }
    resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
        // Implementation goes here.

        // You should return a Bill
    }
}

public type InputValue record {|
    string itemName = "";
    int quantity = 0;
    float price = 0.0;
|};

public type Bill record {|
    string billType = "";
    int totalQuantity = 0;
    float totalPrice = 0.0;
|};

const string ROOT_DESCRIPTOR = "0A0A737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33";
function getDescriptorMap() returns map<string> {
    return {
        "stub.proto":"0A0A737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33"
    };
}
```

Lastly, we can generate the client boilerplate code by using the following command. This file must be copied to the client module directory.

```
$ ballerina grpc --input streaming-stub.proto --mode client --output .

// Copy the generated file to the client module directory
$ cp service/BillingServer_sample_client.bal <PROJECT_DIRECTORY>/src/client
```

Server Implementation

Now that the preparations are done, we can start to implement the server. The empty resource functions in the server boilerplate code can be replaced with the resource functions included in this section. The resource function for quickBilling (client streaming) can be defined as follows.

```ballerina
// Make the edit in the server boilerplate code BillingServer_sample_service.bal
// Requires `import ballerina/log;` at the top of the file
resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int totalNumber = 0;
    float totalBill = 0;

    log:printInfo("Starting Quick Billing Service");
    //Iterating through streamed messages here
    error? e = clientStream.forEach(function(InputValue value) {
        log:printInfo("Item:" + value.itemName + " Quantity:" + value.quantity.toString() + " Price:" + value.price.toString());
        totalNumber += value.quantity;
        totalBill += (value.quantity * value.price);
    });
    //Once the client completes stream, a grpc:EOS error is returned to indicate it
    if (e is grpc:EOS) {
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        //Sending the total bill to the client
        grpc:Error? result = caller->send(finalBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " + result.message() + " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Final Bill Total: " + finalBill.totalPrice.toString() + " for " + finalBill.totalQuantity.toString() + " items");
        }
        result = caller->complete();
        if (result is grpc:Error) {
            log:printError("Error occurred when closing the connection: " + result.message() + " - " + <string>result.detail()["message"]);
        }
    }
    //If the client sends an error instead it can be handled here
    else if (e is grpc:Error) {
        log:printError("An unexpected error occurred: " + e.message() + " - " + <string>e.detail()["message"]);
    }
}
```

The client’s streamed messages are made available as a stream object argument, which can be iterated with forEach to process each message sent by the client. Once the client stream has completed, a grpc:EOS error is returned, which can be used to identify when to send the final response message (the final bill) to the client using the caller object.

The resource function for oneByOneBilling (bi-directional streaming) can be defined as follows.

```ballerina
// Make the edit to the server code BillingServer_sample_service.bal
resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int totalNumber = 0;
    float totalBill = 0;

    log:printInfo("Starting One by One Billing Service");
    //Iterating through streamed messages here
    error? e = clientStream.forEach(function(InputValue value) {
        log:printInfo("Item:" + value.itemName + " Quantity:" + value.quantity.toString() + " Price:" + value.price.toString());
        totalNumber += value.quantity;
        totalBill += (value.quantity * value.price);
        Bill tempBill = {
            billType: "Interim Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        //Sending the interim bill to the client
        grpc:Error? result = caller->send(tempBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " + result.message() + " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Interim Bill Total: " + tempBill.totalPrice.toString() + " for " + tempBill.totalQuantity.toString() + " items");
        }
    });
    //Once the client completes stream, a grpc:EOS error is returned to indicate it
    if (e is grpc:EOS) {
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        //Sending the total bill to the client
        grpc:Error? result = caller->send(finalBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " + result.message() + " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Final Bill Total: " + finalBill.totalPrice.toString() + " for " + finalBill.totalQuantity.toString() + " items");
        }
        result = caller->complete();
        if (result is grpc:Error) {
            log:printError("Error occurred when closing the connection: " + result.message() + " - " + <string>result.detail()["message"]);
        }
    }
    //If the client sends an error instead it can be handled here
    else if (e is grpc:Error) {
        log:printError("An unexpected error occurred: " + e.message() + " - " + <string>e.detail()["message"]);
    }
}
```

The code above is similar to the quickBilling resource function code.
The only difference here is that, as this is a bi-directional streaming resource, the interim bill value is sent back to the client using the caller object for each message that is processed.

Client Implementation

In order to test the above gRPC streaming server, I have included an example client implementation in this article. The following can be added to the client code generated using the Ballerinalang gRPC tool.

```ballerina
import ballerina/grpc;
import ballerina/io;

string opMode = "";

public function main (string mode) {
    BillingServerClient ep = new("http://localhost:9090");
    grpc:StreamingClient | grpc:Error streamClient;

    if (mode == "quick") {
        opMode = "Quick";
        // Initialize call with quickBilling resource
        streamClient = ep->quickBilling(BillingServerMessageListener);
    } else if (mode == "oneByOne") {
        opMode = "One by one";
        // Initialize call with oneByOneBilling resource
        streamClient = ep->oneByOneBilling(BillingServerMessageListener);
    } else {
        io:println("Unsupported operation mode entered!");
        return;
    }

    if (streamClient is grpc:Error) {
        io:println("Error from Connector: " + streamClient.message() + " - " + <string>streamClient.detail()["message"]);
        return;
    } else {
        //Start sending messages to the server
        io:println("Starting " + opMode + " billing service");

        // Sending first message
        InputValue item = {
            itemName: "Apples",
            quantity: 4,
            price: 30.50
        };
        grpc:Error? connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - " + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() + " Price: " + item.price.toString());
        }

        // Sending second message
        item = {
            itemName: "Oranges",
            quantity: 6,
            price: 43.4
        };
        connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - " + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() + " Price: " + item.price.toString());
        }

        // Sending third message
        item = {
            itemName: "Grapes",
            quantity: 20,
            price: 11.5
        };
        connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - " + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() + " Price: " + item.price.toString());
        }

        // Sending complete signal
        connErr = streamClient->complete();
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - " + <string>connErr.detail()["message"]);
        }
    }
}

// Message listener for incoming messages
service BillingServerMessageListener = service {

    resource function onMessage(Bill message) {
        io:println("Received " + message.billType + " Total: " + message.totalPrice.toString() + " for " + message.totalQuantity.toString() + " items");
    }

    resource function onError(error err) {
        io:println("Error reported from server: " + err.message() + " - " + <string>err.detail()["message"]);
    }

    resource function onComplete() {
        io:println(opMode + " billing completed");
    }
};
```

When running the above client, an argument is required that indicates whether to use quickBilling or oneByOneBilling. Based on this, the correct call is initialized, returning a streaming client object. Using this object, three messages are streamed to the server, and finally the complete signal is sent to the server. The BillingServerMessageListener, which was registered during the initialization phase, captures the messages and the complete signal sent by the server.
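The listener side of this flow is a plain callback pattern, which can be sketched independently of gRPC. The following is a minimal illustration in Python rather than Ballerinalang, so that it runs on its own: BillingListener and deliver are hypothetical names, deliver stands in for the transport that would normally invoke the callbacks, and the assumption that a call ends with either an error or a completion signal (never both) is mine.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class Bill:
    bill_type: str
    total_quantity: int
    total_price: float


class BillingListener:
    """Records the line each callback would print, mirroring onMessage/onError/onComplete."""

    def __init__(self, op_mode: str) -> None:
        self.op_mode = op_mode
        self.lines: List[str] = []

    def on_message(self, bill: Bill) -> None:
        self.lines.append(
            f"Received {bill.bill_type} Total: {bill.total_price} "
            f"for {bill.total_quantity} items"
        )

    def on_error(self, err: Exception) -> None:
        self.lines.append(f"Error reported from server: {err}")

    def on_complete(self) -> None:
        self.lines.append(f"{self.op_mode} billing completed")


def deliver(
    bills: Iterable[Bill],
    listener: BillingListener,
    failure: Optional[Exception] = None,
) -> None:
    """Hypothetical stand-in for the transport: push replies, then end the call."""
    for bill in bills:
        listener.on_message(bill)
    if failure is not None:
        listener.on_error(failure)  # assumed: an error ends the call without completion
    else:
        listener.on_complete()


if __name__ == "__main__":
    listener = BillingListener("Quick")
    deliver([Bill("Final Bill", 30, 612.4)], listener)
    print("\n".join(listener.lines))
```

The same listener works for both modes because it never needs to know how many replies to expect: it reacts to each message as it arrives and to the terminal signal, which is why a single BillingServerMessageListener can be registered for either call.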

Trying out the example

Finally, it’s time to sit back, relax, try out the code we just implemented, and see the magic in action. We’ll start off by running the server using the following command.

```
$ ballerina run server
```

Once the server is up and running, we can test the client streaming and bi-directional streaming remote procedure calls separately. When running the client, pass the “quick” argument for the former and the “oneByOne” argument for the latter. The commands for running them and the expected results are shown below.

```
// Calling the quickBilling resource
$ ballerina run client quick
Starting Quick billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Final Bill Total: 612.4 for 30 items
Quick billing completed

// Calling the oneByOneBilling resource
$ ballerina run client oneByOne
Starting One by one billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Interim Bill Total: 122.0 for 4 items
Received Interim Bill Total: 382.4 for 10 items
Received Interim Bill Total: 612.4 for 30 items
Received Final Bill Total: 612.4 for 30 items
One by one billing completed
```

Congratulations if you were able to make it to this point. Getting a gRPC streaming service up and running in Ballerinalang is simple, and you have now seen everything you need to get there. If you want to take a look at the entire implementation of the example above, you can find it in this GitHub repository.

Conclusion

With gRPC being increasingly adopted in microservices-based applications, Ballerinalang provides a graceful, language-embedded approach to implementing it.
This is taken a step further in the Ballerinalang Swan Lake Preview 1 release with a server-side overhaul of gRPC streaming, providing a cleaner way to handle client streams and support for multiple resources within a single service. There is no better way to see how quick and easy it is to get a gRPC streaming service up and running using Ballerinalang than trying the above example out yourself.

Here are some resources to get you started with Ballerinalang:

- Ballerina by Examples
- API Documentation