Like a graceful rendition of Tchaikovsky’s famous ballet, the Swan Lake release of the Ballerina programming language (referred to as Ballerinalang in this article) comes with a revamped gRPC library that offers a more elegant way of handling client streaming and bi-directional streaming. This article walks through the improved gRPC streaming functionality using an example. If you are new to gRPC in Ballerinalang and want detailed coverage of the basics and the implementation of a unary application, read this blog on Ballerina + gRPC.
Let’s look at a basic microservices-based bill calculator as an example. The client streams the price and quantity (the input values) of the items to be included in the bill, and the server multiplies each price by its quantity and adds up the results to return the total bill as a reply.
In this example, the messages streamed by the client are named InputValue and contain the name of the item, its price, and its quantity. The server responds with a message named Bill, which contains the type of the bill (final or interim), the total quantity of items ordered, and the total price.
This application has two modes of operation, or remote procedure calls, which can be described as follows:
quickBilling: This is an example of the gRPC client streaming scenario. The server waits until the client has streamed all the InputValue messages to be included in the bill. Once the client indicates that the stream is complete, the server sends one final Bill.
oneByOneBilling: This is an example of the gRPC bi-directional streaming scenario. For each InputValue streamed by the client, the server replies with an interim Bill (the total calculated so far). As with quickBilling, the final Bill is sent once the client indicates the stream’s completion.
For both client streaming and bi-directional streaming, the client must first use the generated gRPC stub to open a connection with the gRPC server and register a message listener, as shown in the diagram below.
Using the Streaming Client object, the client can start streaming messages to the server. The messages sent by the server are captured by the message listener specified in the initialization stage. For each of the two remote procedure calls in this example, the flow of the message is illustrated in the following diagram.
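To make the difference between the two remote procedure calls concrete, here is a minimal, language-neutral sketch in Python. It is not Ballerinalang code and does not use gRPC; the names quick_billing and one_by_one_billing, and the two tuple types, are hypothetical stand-ins for the messages and calls described above. A plain iterator plays the role of the client stream.

```python
from typing import Iterable, Iterator, NamedTuple


class InputValue(NamedTuple):
    # Hypothetical stand-in for the InputValue message
    item_name: str
    quantity: int
    price: float


class Bill(NamedTuple):
    # Hypothetical stand-in for the Bill message
    bill_type: str
    total_quantity: int
    total_price: float


def quick_billing(items: Iterable[InputValue]) -> Bill:
    """Client streaming: consume the whole stream, then reply once."""
    total_quantity = 0
    total_price = 0.0
    for item in items:
        total_quantity += item.quantity
        total_price += item.quantity * item.price
    return Bill("Final Bill", total_quantity, total_price)


def one_by_one_billing(items: Iterable[InputValue]) -> Iterator[Bill]:
    """Bi-directional streaming: reply after each item, then send a final bill."""
    total_quantity = 0
    total_price = 0.0
    for item in items:
        total_quantity += item.quantity
        total_price += item.quantity * item.price
        yield Bill("Interim Bill", total_quantity, total_price)
    # The end of the input iterator plays the role of the client's complete signal
    yield Bill("Final Bill", total_quantity, total_price)
```

In this sketch, quick_billing produces exactly one reply per stream, while one_by_one_billing produces one reply per input plus a final one, which is the same shape as the message flow of the two calls.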
This tutorial is based on the Ballerina Swan Lake Preview 1 release and later, which can be downloaded here. The server implementation will not work on earlier Ballerinalang releases.
We start off this tutorial by creating a Ballerinalang project and adding the necessary modules using the Ballerinalang CLI commands. In this example, two modules are created: one for the server and one for the client.
// Create a new Ballerina project
$ ballerina new grpc-streaming-example
$ cd grpc-streaming-example
// Add server module to the project
$ ballerina add server
// Add client module to the project
$ ballerina add client
Next, the remote procedure calls of the example described above are defined in a protocol buffer file under a single service (multiple resources within a single service are supported in the Ballerinalang gRPC server since the Swan Lake Preview 1 release).
// streaming-stub.proto
syntax = "proto3";

package service;

service BillingServer {
    rpc quickBilling(stream InputValue) returns (Bill);
    rpc oneByOneBilling(stream InputValue) returns (stream Bill);
}

message InputValue {
    string itemName = 1;
    int64 quantity = 2;
    float price = 3;
}

message Bill {
    string billType = 1;
    int64 totalQuantity = 2;
    float totalPrice = 3;
}
Using the above protobuf file and the Ballerinalang CLI gRPC tool, we can generate the required stub file and the server boilerplate code. Use the following command to generate the stub file for the clients.
$ ballerina grpc --input streaming-stub.proto --output .
// Copy the generated file to the client module directory
$ cp service/stub_pb.bal <PROJECT_DIRECTORY>/src/client
The Ballerinalang gRPC tool generates the following stub file.
import ballerina/grpc;

public type BillingServerClient client object {

    *grpc:AbstractClientEndpoint;

    private grpc:Client grpcClient;

    public function init(string url, grpc:ClientConfiguration? config = ()) {
        // initialize client endpoint.
        self.grpcClient = new(url, config);
        checkpanic self.grpcClient.initStub(self, "non-blocking", ROOT_DESCRIPTOR, getDescriptorMap());
    }

    public remote function quickBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        return self.grpcClient->streamingExecute("service.BillingServer/quickBilling", msgListener, headers);
    }

    public remote function oneByOneBilling(service msgListener, grpc:Headers? headers = ()) returns (grpc:StreamingClient|grpc:Error) {
        return self.grpcClient->streamingExecute("service.BillingServer/oneByOneBilling", msgListener, headers);
    }
};

public type InputValue record {|
    string itemName = "";
    int quantity = 0;
    float price = 0.0;
|};

public type Bill record {|
    string billType = "";
    int totalQuantity = 0;
    float totalPrice = 0.0;
|};
const string ROOT_DESCRIPTOR = "0A1473747265616D696E672D737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33";
function getDescriptorMap() returns map<string> {
return {
"streaming-stub.proto":"0A1473747265616D696E672D737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33"
};
}
The server boilerplate code can also be generated using the following command. This generated file can be copied into the server module directory.
$ ballerina grpc --input streaming-stub.proto --mode service --output .
// Copy the generated file to the server module directory
$ cp service/BillingServer_sample_service.bal <PROJECT_DIRECTORY>/src/server
import ballerina/grpc;

listener grpc:Listener ep = new (9090);

service BillingServer on ep {

    resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
        // Implementation goes here.
        // You should return a Bill
    }

    resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
        // Implementation goes here.
        // You should return a Bill
    }
}

public type InputValue record {|
    string itemName = "";
    int quantity = 0;
    float price = 0.0;
|};

public type Bill record {|
    string billType = "";
    int totalQuantity = 0;
    float totalPrice = 0.0;
|};
const string ROOT_DESCRIPTOR = "0A0A737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33";
function getDescriptorMap() returns map<string> {
return {
"stub.proto":"0A0A737475622E70726F746F120773657276696365225A0A0A496E70757456616C7565121A0A086974656D4E616D6518012001280952086974656D4E616D65121A0A087175616E7469747918022001280352087175616E7469747912140A0570726963651803200128025205707269636522680A0442696C6C121A0A0862696C6C54797065180120012809520862696C6C5479706512240A0D746F74616C5175616E74697479180220012803520D746F74616C5175616E74697479121E0A0A746F74616C5072696365180320012802520A746F74616C50726963653280010A0D42696C6C696E6753657276657212340A0C717569636B42696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C280112390A0F6F6E6542794F6E6542696C6C696E6712132E736572766963652E496E70757456616C75651A0D2E736572766963652E42696C6C28013001620670726F746F33"
};
}
Lastly, we can generate the client boilerplate code by using the following command. This file must be copied to the client module directory.
$ ballerina grpc --input streaming-stub.proto --mode client --output .
// Copy the generated file to the client module directory
$ cp service/BillingServer_sample_client.bal <PROJECT_DIRECTORY>/src/client
Now that the preparations are done, we can start to implement the server. The empty resource functions in the server boilerplate code can be replaced with the implementations included in this section.
The resource function for quickBilling (client streaming) can be defined as follows.
// Make the edit in the server boilerplate code BillingServer_sample_service.bal
// This code also needs `import ballerina/log;` at the top of the file.
resource function quickBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int totalNumber = 0;
    float totalBill = 0;
    log:printInfo("Starting Quick Billing Service");
    // Iterating through streamed messages here
    error? e = clientStream.forEach(function(InputValue value) {
        log:printInfo("Item:" + value.itemName + " Quantity:" +
            value.quantity.toString() + " Price:" + value.price.toString());
        totalNumber += value.quantity;
        totalBill += (value.quantity * value.price);
    });
    // Once the client completes the stream, a grpc:EOS error is returned to indicate it
    if (e is grpc:EOS) {
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        // Sending the total bill to the client
        grpc:Error? result = caller->send(finalBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " +
                result.message() + " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Final Bill Total: " + finalBill.totalPrice.toString() +
                " for " + finalBill.totalQuantity.toString() + " items");
        }
        result = caller->complete();
        if (result is grpc:Error) {
            log:printError("Error occurred when closing the connection: " + result.message() +
                " - " + <string>result.detail()["message"]);
        }
    }
    // If the client sends an error instead, it can be handled here
    else if (e is grpc:Error) {
        log:printError("An unexpected error occurred: " + e.message() + " - " + <string>e.detail()["message"]);
    }
}
The client’s streamed messages are made available as a stream object argument, which can be iterated over with forEach to process each message sent by the client. Once the client stream has completed, a grpc:EOS error is returned, which can be used to identify when to send the final response message (the final bill) to the client using the caller object.
The resource function oneByOneBilling (bi-directional streaming) can be defined as follows.
// Make the edit to the server code BillingServer_sample_service.bal
resource function oneByOneBilling(grpc:Caller caller, stream<InputValue,error> clientStream) {
    int totalNumber = 0;
    float totalBill = 0;
    log:printInfo("Starting One by One Billing Service");
    // Iterating through streamed messages here
    error? e = clientStream.forEach(function(InputValue value) {
        log:printInfo("Item:" + value.itemName + " Quantity:" + value.quantity.toString() +
            " Price:" + value.price.toString());
        totalNumber += value.quantity;
        totalBill += (value.quantity * value.price);
        Bill tempBill = {
            billType: "Interim Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        // Sending the interim bill to the client
        grpc:Error? result = caller->send(tempBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " + result.message() +
                " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Interim Bill Total: " + tempBill.totalPrice.toString() +
                " for " + tempBill.totalQuantity.toString() + " items");
        }
    });
    // Once the client completes the stream, a grpc:EOS error is returned to indicate it
    if (e is grpc:EOS) {
        Bill finalBill = {
            billType: "Final Bill",
            totalQuantity: totalNumber,
            totalPrice: totalBill
        };
        // Sending the total bill to the client
        grpc:Error? result = caller->send(finalBill);
        if (result is grpc:Error) {
            log:printError("Error occurred when sending the bill: " + result.message() +
                " - " + <string>result.detail()["message"]);
        } else {
            log:printInfo("Sending Final Bill Total: " + finalBill.totalPrice.toString() +
                " for " + finalBill.totalQuantity.toString() + " items");
        }
        result = caller->complete();
        if (result is grpc:Error) {
            log:printError("Error occurred when closing the connection: " + result.message() +
                " - " + <string>result.detail()["message"]);
        }
    }
    // If the client sends an error instead, it can be handled here
    else if (e is grpc:Error) {
        log:printError("An unexpected error occurred: " + e.message() + " - " + <string>e.detail()["message"]);
    }
}
The code above is similar to the quickBilling resource function code. The only difference is that, because this is a bi-directional streaming resource, the interim bill value is sent back to the client using the caller object for each message that is processed.
To test the above gRPC streaming server, I have included an example client implementation in this article. The following can be added to the client code generated using the Ballerinalang gRPC tool.
import ballerina/grpc;
import ballerina/io;

string opMode = "";

public function main (string mode) {
    BillingServerClient ep = new("http://localhost:9090");
    grpc:StreamingClient | grpc:Error streamClient;

    if (mode == "quick") {
        opMode = "Quick";
        // Initialize call with quickBilling resource
        streamClient = ep->quickBilling(BillingServerMessageListener);
    } else if (mode == "oneByOne") {
        opMode = "One by one";
        // Initialize call with oneByOneBilling resource
        streamClient = ep->oneByOneBilling(BillingServerMessageListener);
    } else {
        io:println("Unsupported operation mode entered!");
        return;
    }

    if (streamClient is grpc:Error) {
        io:println("Error from Connector: " + streamClient.message() + " - "
            + <string>streamClient.detail()["message"]);
        return;
    } else {
        // Start sending messages to the server
        io:println("Starting " + opMode + " billing service");

        // Sending first message
        InputValue item = {
            itemName: "Apples",
            quantity: 4,
            price: 30.50
        };
        grpc:Error? connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - "
                + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() +
                " Price: " + item.price.toString());
        }

        // Sending second message
        item = {
            itemName: "Oranges",
            quantity: 6,
            price: 43.4
        };
        connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - "
                + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() +
                " Price: " + item.price.toString());
        }

        // Sending third message
        item = {
            itemName: "Grapes",
            quantity: 20,
            price: 11.5
        };
        connErr = streamClient->send(item);
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - "
                + <string>connErr.detail()["message"]);
        } else {
            io:println("Sent item: " + item.itemName + " Quantity: " + item.quantity.toString() +
                " Price: " + item.price.toString());
        }

        // Sending complete signal
        connErr = streamClient->complete();
        if (connErr is grpc:Error) {
            io:println("Error from Connector: " + connErr.message() + " - "
                + <string>connErr.detail()["message"]);
        }
    }
}

// Message listener for incoming messages
service BillingServerMessageListener = service {

    resource function onMessage(Bill message) {
        io:println("Received " + message.billType + " Total: " + message.totalPrice.toString() +
            " for " + message.totalQuantity.toString() + " items");
    }

    resource function onError(error err) {
        io:println("Error reported from server: " + err.message() + " - "
            + <string>err.detail()["message"]);
    }

    resource function onComplete() {
        io:println(opMode + " billing completed");
    }
};
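The listener pattern used by the client above (one callback per server message, one for errors, one for completion) can be illustrated with a small, language-neutral sketch in Python. This is only an in-memory simulation under stated assumptions: BillListener and FakeStreamingClient are hypothetical names, there is no network or gRPC involved, and the fake client imitates the quickBilling mode only.

```python
from typing import List


class BillListener:
    """Records server replies, mirroring onMessage / onError / onComplete."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_message(self, bill_type: str, total: float, quantity: int) -> None:
        self.events.append(
            f"Received {bill_type} Total: {total:.1f} for {quantity} items")

    def on_error(self, reason: str) -> None:
        self.events.append(f"Error reported from server: {reason}")

    def on_complete(self) -> None:
        self.events.append("billing completed")


class FakeStreamingClient:
    """Hypothetical stand-in for a streaming connection (quick mode only)."""

    def __init__(self, listener: BillListener) -> None:
        self._listener = listener
        self._quantity = 0
        self._total = 0.0
        self._closed = False

    def send(self, quantity: int, price: float) -> None:
        # Sending after completion is reported through the error callback
        if self._closed:
            self._listener.on_error("stream already completed")
            return
        self._quantity += quantity
        self._total += quantity * price

    def complete(self) -> None:
        # On completion the simulated server replies once, then closes
        self._closed = True
        self._listener.on_message("Final Bill", self._total, self._quantity)
        self._listener.on_complete()


listener = BillListener()
client = FakeStreamingClient(listener)
client.send(4, 30.5)
client.send(6, 43.4)
client.complete()
print(listener.events)
```

The point of the sketch is that the sender never reads a return value for the bill; replies arrive only through the listener registered when the call was opened.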
When running the above client, an argument is required to indicate whether to use quickBilling or oneByOneBilling. Based on this, the correct call is initialized, returning a streaming client object. Using this object, three messages are streamed to the server, followed by the complete signal. The BillingServerMessageListener, which was registered during the initialization phase, captures the messages and the complete signal sent by the server.
Finally, it’s time to sit back, relax, try out the code we just implemented, and see the magic in action. We’ll start off by running the server using the following command.
$ ballerina run server
Once the server is up and running, we can test out the client streaming and bi-directional streaming remote procedure calls separately. When running the client, pass quick as the argument for the former and oneByOne for the latter. The commands for running them and the expected results are shown below.
// Calling the quickBilling resource
$ ballerina run client quick
Starting Quick billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Final Bill Total: 612.4 for 30 items
Quick billing completed
// Calling the oneByOneBilling resource
$ ballerina run client oneByOne
Starting One by one billing service
Sent item: Apples Quantity: 4 Price: 30.5
Sent item: Oranges Quantity: 6 Price: 43.4
Sent item: Grapes Quantity: 20 Price: 11.5
Received Interim Bill Total: 122.0 for 4 items
Received Interim Bill Total: 382.4 for 10 items
Received Interim Bill Total: 612.4 for 30 items
Received Final Bill Total: 612.4 for 30 items
One by one billing completed
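The totals in the output above can be checked by hand: 4 × 30.5 = 122.0, then 122.0 + 6 × 43.4 = 382.4, then 382.4 + 20 × 11.5 = 612.4. The short Python snippet below repeats that arithmetic as a sanity check; it is independent of Ballerinalang and gRPC, and the function name running_totals is hypothetical.

```python
# The three items sent by the example client: (name, quantity, price)
items = [("Apples", 4, 30.5), ("Oranges", 6, 43.4), ("Grapes", 20, 11.5)]


def running_totals(entries):
    """Return (total quantity, total price) after each item."""
    quantity, total = 0, 0.0
    rows = []
    for _name, qty, price in entries:
        quantity += qty
        total += qty * price
        # Round to one decimal place to hide floating-point noise
        rows.append((quantity, round(total, 1)))
    return rows


for quantity, total in running_totals(items):
    print(f"Interim Bill Total: {total} for {quantity} items")
```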
Congratulations if you made it to this point. Getting a gRPC streaming service up and running in Ballerinalang is straightforward, and you have now seen everything you need to get there.
If you want to take a look at the entire implementation of the example above, you can find it in this GitHub repository.
With gRPC being increasingly adopted in microservices-based applications, Ballerinalang provides a graceful, language-embedded approach to implementing it. This is taken a step further in the Ballerinalang Swan Lake Preview 1 release with a server-side overhaul of gRPC streaming, providing a cleaner way to handle client streams and support for multiple resources within a single service.
There is no better way than trying the above example out yourself to see how easy and quick it is to implement and get a gRPC streaming service up and running using Ballerinalang.
Here are some resources to get you started with Ballerinalang: