Data Collection and Data Ingestion are the processes of fetching data from any data source which we can perform in two ways -
In today's world, enterprises generate data from many different sources. To build a real-time data lake, we need to integrate these various sources of data into one stream.
In this blog, we share how to ingest, store, and process Twitter data using Apache NiFi. In upcoming blogs, we will cover data collection and ingestion from the sources below.
Apache NiFi provides an easy-to-use, powerful, and reliable system to process and distribute data across several resources.
Apache NiFi is used for routing and processing data from any source to any destination. The flow can also perform data transformation along the way.
It is a UI-based platform where we define the source from which we want to collect data, the processors that convert the data, and the destination where we want to store it.
Each processor in NiFi has relationships such as success, retry, failed, invalid data, etc., which we use when connecting one processor to another. These connections let us route the data to another storage or processor even after a processor fails.
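To make the idea of relationships concrete, here is a minimal sketch in plain Python. It is not NiFi code: the `route` helper, the `parse_tweet` processor, and the destination names are hypothetical and only model how each relationship sends data to a different downstream connection.

```python
from collections import defaultdict

def route(flowfiles, processor, connections):
    """Run `processor` on each item and queue it for the destination
    connected to the relationship the processor returns."""
    queues = defaultdict(list)
    for flowfile in flowfiles:
        relationship = processor(flowfile)
        destination = connections.get(relationship)
        if destination is not None:  # unconnected relationships are dropped here
            queues[destination].append(flowfile)
    return dict(queues)

def parse_tweet(flowfile):
    # Hypothetical processor: anything that does not look like JSON fails.
    return "success" if flowfile.strip().startswith("{") else "failure"

# Hypothetical wiring: one downstream destination per relationship.
connections = {"success": "PublishKafka", "failure": "error-archive"}
queues = route(['{"text": "hi"}', "not json"], parse_tweet, connections)
print(queues)
```

Because the failure relationship has its own connection, the bad record is not lost; it simply lands in a different queue.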
When we need to move a large amount of data, a single instance of NiFi is not enough to handle it. To handle this, we can cluster the NiFi servers, which helps us scale.
We just need to create the data flow on one node, and NiFi copies this data flow to each node in the cluster.
Apache NiFi 1.0.0 introduced the Zero-Master Clustering paradigm. Previous versions of Apache NiFi were based on a single “Master Node” (more formally known as the NiFi Cluster Manager).
In those versions, if the master node was lost, data continued to flow, but the application was unable to show the topology of the flow or any stats. With Zero-Master Clustering, we can make changes from any node of the cluster.
If the master node disconnects, an active node is automatically elected as the new master node.
Each node has the same data flow, so every node performs the same tasks, but each operates on a different set of data.
In a NiFi cluster, one node is elected as the master (Cluster Coordinator), and the other nodes send heartbeats/status information to it. The master node is responsible for disconnecting nodes that do not send any heartbeat/status information.
This election of the master node is done via Apache ZooKeeper. When the master node gets disconnected, Apache ZooKeeper elects an active node as the new master node.
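The heartbeat check described above can be sketched as follows. This is only an illustration of the idea, not NiFi's implementation: the `find_stale_nodes` function, the node names, and the 40-second timeout are assumptions made for the example.

```python
def find_stale_nodes(last_heartbeat, now, timeout_seconds):
    """Return the ids of nodes whose last heartbeat is older than the timeout."""
    return sorted(
        node
        for node, seen_at in last_heartbeat.items()
        if now - seen_at > timeout_seconds
    )

# Hypothetical timestamps (in seconds) of the last heartbeat from each node.
last_heartbeat = {"node-1": 100.0, "node-2": 95.0, "node-3": 60.0}

# The coordinator would disconnect every node returned here.
stale = find_stale_nodes(last_heartbeat, now=101.0, timeout_seconds=40.0)
print(stale)
```

Here only `node-3` has been silent for longer than the timeout, so it is the only candidate for disconnection.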
NiFi’s ‘GetTwitter’ processor is used to fetch tweets. It uses the Twitter Streaming API to retrieve tweets. In this processor, we define which endpoint to use. We can also apply filters by location, hashtags, and particular IDs.
- Sample Endpoint: Fetches a sample of public tweets from all over the world.
- Firehose Endpoint: The same as the streaming API, but it delivers 100% of tweets and can be combined with filters.
- Filter Endpoint: Used when we want to filter by hashtags or keywords.
Now the GetTwitter processor is ready to transmit the data (tweets). From here we can move our data stream anywhere, such as Amazon S3, Apache Kafka, Elasticsearch, Amazon Redshift, HDFS, Hive, Cassandra, etc. NiFi can move data to multiple destinations in parallel.
To send the tweets to Apache Kafka, we use the NiFi processor ‘PublishKafka_0_10’.
In the Scheduling tab, we can configure how many concurrent tasks to execute and how the processor is scheduled.
In the Properties tab, we can set up our Kafka broker URLs, topic name, request size, etc. The processor writes data to the given topic. For best results, we can create the Kafka topic manually with a defined number of partitions.
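As a rough illustration of creating the topic manually, the sketch below builds a `kafka-topics.sh` command line. The topic name `tweets`, the partition count, and the ZooKeeper address are assumptions for the example. The `--zookeeper` flag matches Kafka 0.10-era tooling; newer Kafka releases use `--bootstrap-server` instead.

```python
import shlex

def kafka_create_topic_command(topic, partitions, replication_factor, zookeeper):
    """Build the argument list for creating a topic with a fixed partition count."""
    return [
        "kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]

# Hypothetical values; adjust them to your own cluster.
command = kafka_create_topic_command("tweets", 3, 1, "localhost:2181")
command_line = " ".join(shlex.quote(arg) for arg in command)
print(command_line)
```

The printed line can be run from the Kafka `bin` directory before starting the NiFi flow, so the processor writes to a topic whose partitioning we chose ourselves.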
Once the data is in Apache Kafka, it can be processed with Apache Beam, Apache Flink, or Apache Spark.
Now we integrate Apache NiFi with Amazon Redshift. NiFi uses an Amazon Kinesis Firehose delivery stream to store data in Amazon Redshift.
A delivery stream can be used to move data to Amazon Redshift, Amazon S3, or Amazon Elasticsearch Service. We need to specify the destination while creating the Amazon Kinesis Firehose delivery stream.
Since we want to move data to Amazon Redshift, we first need to configure the Amazon Kinesis Firehose delivery stream. When delivering data to Amazon Redshift, the data is first written to an Amazon S3 bucket, and then the Amazon Redshift COPY command is used to load it into the Amazon Redshift cluster.
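To show roughly what the second step looks like, the sketch below assembles a Redshift COPY statement for JSON data sitting in the intermediate bucket. In practice the delivery stream issues the COPY for us; the table name, bucket, prefix, and IAM role ARN here are hypothetical placeholders.

```python
def build_copy_statement(table, bucket, prefix, iam_role_arn):
    """Assemble a Redshift COPY statement that loads JSON objects from S3."""
    return (
        f"COPY {table}\n"
        f"FROM 's3://{bucket}/{prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS JSON 'auto';"
    )

# Hypothetical identifiers, used only to show the shape of the statement.
statement = build_copy_statement(
    table="tweets",
    bucket="my-intermediate-bucket",
    prefix="tweets/",
    iam_role_arn="arn:aws:iam::123456789012:role/ExampleRedshiftCopyRole",
)
print(statement)
```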
We can also enable data transformation while creating the Kinesis Firehose delivery stream. In addition, we can back up the data to another Amazon S3 bucket, separate from the intermediate bucket.
For this, we use the PutKinesisFirehose processor. This processor uses the Kinesis Firehose delivery stream to deliver data to Amazon Redshift. Here we configure our AWS credentials and the Kinesis Firehose delivery stream.
PutKinesisFirehose sends data to Amazon Redshift and uses Amazon S3 as the intermediary. If someone wants to use only Amazon S3 as storage, NiFi can also send data to Amazon S3 alone.
For this, we use the NiFi processor PutS3Object. In it, we configure our AWS credentials, bucket name, path, etc.
The most important aspect of storing data in S3 is partitioning. Here we can partition our data using NiFi Expression Language in the Object Key field. For now, we have used day-wise partitioning.
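As an assumption about what such a key could look like, an Object Key expression along the lines of `${now():format('yyyy/MM/dd')}/${filename}` would place each object under a year/month/day prefix. The Python sketch below mirrors that layout; the `tweets` prefix and the file name are hypothetical.

```python
from datetime import datetime, timezone

def daily_object_key(filename, when, prefix="tweets"):
    """Return an S3 object key partitioned by year/month/day."""
    return f"{prefix}/{when:%Y/%m/%d}/{filename}"

# Hypothetical file name and timestamp for the example.
key = daily_object_key(
    "tweet-0001.json",
    datetime(2017, 3, 5, 14, 30, tzinfo=timezone.utc),
)
print(key)  # tweets/2017/03/05/tweet-0001.json
```

With keys shaped like this, all objects written on the same day share one prefix, which makes it easy for downstream jobs to read a single day of data.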