I am creating an AWS online course. My goal is to give people the opportunity to learn DevOps technologies through quality courses and practical learning paths.
If you are interested in the Practical AWS training, you can preorder it now and get it at almost half price.
You can also download our mini ebook 8 Great Tips to Learn AWS.
This is part 2 of the previous post, Creating A Realtime Analytics & Event Processing System For Big Data Using Amazon Kinesis.
After this tutorial, you will be able to send large amounts of data to your AWS infrastructure while sending a Hipchat notification in real time every time an event is detected.
We will use: Amazon Kinesis, Python and AWS Lambda.
The previous tutorial is addressed to engineers, developers and architects who would like to build a realtime analytics and event processing system for large amounts of data collected from multiple sources: IoT devices, logs from servers, routers, distributed processing systems, etc.
This one focuses more on the event processing part, even though the two tutorials are logically linked. I recommend at least a quick read of the previous post.
In this post we are going to see how to process and respond to real-time events as fast as possible. There are multiple use cases, but let's imagine thousands of IoT devices sending logs, where an operation should be triggered whenever a certain condition is met.
In a few words, data will travel through Kinesis, will be checked by AWS Lambda, and the latter may trigger other Lambda functions.
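To make the idea concrete, here is a minimal sketch of what such a checking function could look like, assuming the records are JSON documents with a "level" field and a hypothetical downstream function named ProcessAlert (the actual Hipchat-notifying function is built later in this post). It uses boto3, which is available in the Lambda Python runtime:

import base64
import json
import boto3

# Hypothetical downstream function name; replace with your own
DOWNSTREAM_FUNCTION = "ProcessAlert"

lambda_client = boto3.client("lambda")

def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis record payloads arrive base64-encoded
        payload = base64.b64decode(record["kinesis"]["data"])
        data = json.loads(payload)
        # Example condition: fire another Lambda when an error event is seen
        if data.get("level") == "error":
            lambda_client.invoke(
                FunctionName=DOWNSTREAM_FUNCTION,
                InvocationType="Event",  # asynchronous, fire-and-forget
                Payload=payload,
            )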
If you are used to Apache Kafka, Kinesis is a cloud-based managed alternative to Kafka.
In other words, Kinesis is a system for building real-time data pipelines and streaming apps, and for storing the same data to Amazon Redshift or S3.
AWS Lambda is a compute service that allows developers and engineers to create a serverless architecture to execute uploaded code.
AWS Lambda can be used as:
a push-based service, where another AWS service (such as Amazon S3 or SNS) publishes events that invoke your function directly;
a pull-based service, where Lambda polls a stream (a Kinesis or DynamoDB stream) and invokes your function with the new records.
In our tutorial, the second scenario is used with Kinesis streams.
At this step, you should have a Kinesis stream set up.
The Kinesis stream will collect and stream data for ordered, replayable, real-time processing. Go to your console and create a stream.
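If you prefer code to the console, here is a minimal sketch using the same boto library used in the rest of this tutorial (one shard is enough for our purposes):

from boto import kinesis

conn = kinesis.connect_to_region("eu-west-1")
# Create a stream with a single shard; it takes a few seconds
# to move from CREATING to ACTIVE
conn.create_stream("TestStream", 1)
print(conn.describe_stream("TestStream"))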
This is an example of the output of the describe_stream() function (already seen in the last tutorial):
{'StreamDescription': {'StreamARN': 'arn:aws:kinesis:eu-west-1:95312312312:stream/TestStream',
                       'EnhancedMonitoring': [{'ShardLevelMetrics': []}],
                       'StreamStatus': 'ACTIVE',
                       'RetentionPeriodHours': 24,
                       'HasMoreShards': False,
                       'StreamName': 'TestStream',
                       'Shards': [{'SequenceNumberRange': {'StartingSequenceNumber': '495662365138256951538110533149564956654'},
                                   'HashKeyRange': {'EndingHashKey': '4956312312314956123124956124956',
                                                    'StartingHashKey': '0'},
                                   'ShardId': 'shardId-000000000000'}]}}
Then go to the Lambda dashboard and skip this step:
Choose Kinesis as the upstream service, enable the trigger, and do not forget to allow the popups so that the permissions can be added.
Now click on Next and set up your code in the next screen.
You can also choose other options like allocated memory, timeout or VPC, etc.
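The same trigger can also be created from the command line. This is a sketch with the AWS CLI, assuming your function is named TestFunction (use your own function name) and using the stream ARN from the describe_stream() output above:

aws lambda create-event-source-mapping \
    --function-name TestFunction \
    --event-source-arn arn:aws:kinesis:eu-west-1:95312312312:stream/TestStream \
    --batch-size 100 \
    --starting-position LATEST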
I am now going to execute the producer function, a Python script that connects to an API, gets the data (in this example, a list of users) and sends it to Kinesis:
from api.users import Users
from boto import kinesis

stream_name = "TestStream"

# Connect to Kinesis once, outside the loop
conn = kinesis.connect_to_region("eu-west-1")

# Users (from the api module) wraps the users API; list() returns a
# streaming response whose lines we forward to Kinesis one by one
u = Users()
x = u.list()

for line in x.iter_lines():
    if line:
        conn.put_record(stream_name, line, "partitionkey")
        print(line)
For testing purposes, I kept executing this script in my terminal while watching the Lambda monitoring dashboard, and the result was:
If you see that the invocation count increased, you are on the right path. This is normal, because every record transiting through the stream triggers the function's code.
Now that everything is set up, we are going to upload a Python application that will be executed every time a record arrives on the Kinesis stream.
In this example, I will notify a Hipchat room, and I will use a tool that I developed: Hippy — A Python Wrapper For Hipchat Api v2.
Download the code from Github to your app directory, install the dependencies in the same folder and rename example.py to lambda_function.py:
mkdir app
pip install -r requirements.txt -t app/
cd app
mv example.py lambda_function.py
This is the original code of example.py (renamed to lambda_function.py):
from api.notify import Notify

key = ""      # your Hipchat API token
room_id = ""  # the target room id

n = Notify(key, room_id)
n.notify("Testing Hippy", "red", "false", "text")
This is not enough for our application to work with Lambda; the code should look like this:
from api.notify import Notify

def lambda_handler(event, context):
    key = ""      # your Hipchat API token
    room_id = ""  # the target room id
    n = Notify(key, room_id)
    n.notify("Testing Hippy From AWS Lambda", "red", "false", "text")
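If you want the notification to include the contents of the records that triggered the function, a possible variant (still a sketch) is to decode the Kinesis event passed to the handler:

import base64
from api.notify import Notify

def lambda_handler(event, context):
    key = ""      # your Hipchat API token
    room_id = ""  # the target room id
    n = Notify(key, room_id)
    for record in event["Records"]:
        # Each Kinesis record payload arrives base64-encoded
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        n.notify("Event received: " + payload, "red", "false", "text")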
We installed the dependencies in the same folder and changed the name and the structure of example.py; now everything should go into a zip archive:
zip -r lambda.zip .
Go to the Lambda dashboard, choose to upload a zip, then upload lambda.zip:
Now, every time a record is detected on the stream, a message will be sent to your Hipchat room.
You can debug your application using CloudWatch and see its logs.
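Anything your handler prints or logs ends up in the function's log group in CloudWatch Logs (/aws/lambda/<function-name>). For example, a minimal logging setup could look like this:

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # This message will show up in the function's CloudWatch Logs
    logger.info("Received %d records", len(event["Records"]))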
That’s all folks :-)
If this article resonated with you, please subscribe to DevOpsLinks: An Online Community Of Diverse & Passionate DevOps, SysAdmins & Developers From All Over The World.
You can find me on Twitter, Clarity or my blog, and you can also check out my books: SaltStack For DevOps, The Jumpstart Up & Painless Docker.
If you liked this post, please recommend it and share it with your followers.