With legacy stocks, a common practice is to perform fundamental analysis to decide whether they're worth investing in; similarly, you can analyze tokens on the Ethereum blockchain.
By analyzing a legacy stock, you gain insight into the company's profitability and its latest rounds of funding, see how the company's sector is performing, and factor in overall economic conditions to make an informed investment decision.
With crypto tokens, you can do the same and even more. By analyzing crypto tokens, you can also find out the number of token holders, see if big market players are buying the token, and even learn the exact prices they're trading at and copy their actions.
However, extracting and analyzing this data for crypto tokens is no easy task. You need robust data pipelines suitable for aggregating and performing analysis on vast amounts of data.
In this article, I want to share my architectural considerations for building a system that helps to collect data from the Ethereum blockchain and runs a real-time analysis of DeFi tokens data. We will focus on the big picture rather than diving into details and explore the key decisions that shape such a pipeline.
Background
Pipeline Mechanism
Selecting the Tools
Running the Analysis
Results
The core objective of the data pipeline is to quickly perform complex maths operations on Ethereum blockchain data. More specifically, we want to analyze ERC-20 tokens used in DeFi protocols on Ethereum.
To achieve that, we need our pipeline to robustly perform four main tasks:
Extract all historical blockchain data, starting from 2015;
Continuously collect new blocks in real time;
Transform the raw data and load it into a database suitable for analytics;
Calculate metrics on top of the collected data.
One of the initial challenges we face is accessing blockchain data. In theory, deploying an Ethereum node on your server allows for unlimited access to this data. However, the data format stored on the node is not suitable for analytics. Hence, it is preferable to organize the data into Postgres tables. To achieve this, we need to extract all historical blockchain data starting from 2015 and continuously gather the latest real-time data.
The Ethereum blockchain data is accessed using the JSON-RPC protocol. There is a vast amount of historical data, so in order to extract it all to your database, you will either need a lot of computing power, which is quite costly, or it will take a considerable amount of time.
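As a rough illustration, here is a minimal sketch of pulling historical blocks over JSON-RPC with Python. The NODE_URL endpoint is a placeholder, and a real backfill would be chunked and parallelized across workers rather than run in a single loop; eth_blockNumber and eth_getBlockByNumber are the standard Ethereum JSON-RPC methods involved.

```python
import requests

NODE_URL = "https://your-ethereum-node:8545"  # placeholder endpoint

def rpc(method, params):
    """Make a single Ethereum JSON-RPC call against the node."""
    resp = requests.post(
        NODE_URL,
        json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["result"]

def fetch_block(number: int) -> dict:
    """Fetch one block with full transaction objects."""
    return rpc("eth_getBlockByNumber", [hex(number), True])

latest = int(rpc("eth_blockNumber", []), 16)

# Backfill the full history (in practice this loop is chunked and spread
# across many workers — running it serially would take a very long time).
for n in range(0, latest + 1):
    block = fetch_block(n)
    # ... write `block` to staging tables in Postgres here
```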
When it comes to real-time blocks, which are generated roughly every 12 seconds, things get a bit easier. There are several ways to get this data. One is to use the Ethereum JSON-RPC WebSocket, which essentially sends a notification whenever a new block appears on the blockchain. Once you receive such a notification, you can make a direct request to the node to extract all of the data.
Alternatively, instead of subscribing to block notifications, we can directly receive data from the latest blocks over a WebSocket.
Yet, there is an important caveat: it is crucial for analytics not to miss even a single block or a single transaction. If you miss even one piece of data, it breaks the chain and messes up the analysis. For example, if we overlook a transfer between wallets, our balance information for that wallet would be incorrect, leading to a flawed analysis.
This limitation makes the Ethereum JSON-RPC WebSocket solution ineffective—the WebSocket connection is not reliable, it can break every now and then, leading to data loss during interruptions. And that’s something we want to avoid.
Therefore, we've opted for a more balanced approach. We'll still rely on WebSockets, but not as the sole source of data. Instead, we will use the WebSocket only to receive notifications about new blocks. Then, once we know that a new block is out there, we will make a separate request to the node.
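A minimal sketch of that approach might look like this, assuming a WebSocket endpoint (WS_URL is a placeholder) and the standard eth_subscribe("newHeads") method. The callback here only prints the block number, while a real worker would publish it into the event pipeline described later.

```python
import asyncio
import json

import websockets  # pip install websockets

WS_URL = "wss://your-ethereum-node:8546"  # placeholder WebSocket endpoint

async def watch_new_blocks(handle_block_number):
    """Subscribe to newHeads and hand each new block number to a callback."""
    async with websockets.connect(WS_URL) as ws:
        await ws.send(json.dumps({
            "jsonrpc": "2.0", "id": 1,
            "method": "eth_subscribe", "params": ["newHeads"],
        }))
        await ws.recv()  # subscription confirmation
        async for raw in ws:
            msg = json.loads(raw)
            head = msg["params"]["result"]
            block_number = int(head["number"], 16)
            # Only the notification comes over the socket; the block itself
            # is fetched later with a separate eth_getBlockByNumber request.
            handle_block_number(block_number)

asyncio.run(watch_new_blocks(lambda n: print("new block", n)))
```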
Also, we need to address an important yet somewhat unpleasant aspect of the Ethereum blockchain—forks. When a fork occurs, it splits the blockchain into two separate paths: the original chain and the new chain. This is similar to a road splitting into two different directions.
When we request a block that has just appeared (say, in the last few minutes), we cannot say for sure whether it is part of a fork or not. This only becomes clear after a while, when nodes from all over the world reach a consensus. If we want to perform real-time analytics second-by-second and block-by-block, we need to find a way to handle forks.
To deal with this complication, at least for the initial versions of the pipeline, we’ve found a trade-off: by analyzing blocks that are a couple of dozen blocks behind the latest block we can be reasonably sure that the block came from the main branch of the blockchain.
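In code, this trade-off is just an offset from the chain head. CONFIRMATION_DEPTH below is an illustrative value ("a couple of dozen" blocks), not the exact production setting.

```python
# Process blocks that sit CONFIRMATION_DEPTH blocks behind the chain head,
# so we can be reasonably sure they belong to the main branch.
CONFIRMATION_DEPTH = 24  # illustrative tuning choice

def block_to_process(latest_block_number: int) -> int:
    return latest_block_number - CONFIRMATION_DEPTH

# e.g. if the head is #1000, we extract block #1000 - 24 = #976
```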
When designing our pipeline, we want to create a system that:
is flexible and adaptable to change;
processes data quickly;
guarantees that the data is complete;
is inexpensive to run.
To make our system flexible and adaptable to change, we need to make sure that its components are as independent from each other as possible.
To process data quickly, tasks should be divided among several workers and worked on asynchronously.
To guarantee the data is complete, we need to run checks which include cross-checks with nodes and wallet balance verifications, to ensure that no transaction has been missed.
To keep the system inexpensive, it is important to properly manage computing resources and data storage, as well as to write optimal code.
That's why event-driven architecture becomes the clear choice.
With each part of the system being completely independent of the others, we gain scalability. We can add as many components as we want, anywhere we want, and configure them to work with specific events.
All parts of the system communicate with each other asynchronously through events, providing speed and the capacity to handle large amounts of data.
The message inside each event is serialized in byte format, as the broker is solely responsible for conveying the message to the right consumer and doesn't need to bother with its content. This serialization process makes message delivery extremely fast, resulting in a higher throughput of data streams.
Messages are not lost during downtime, so we have fault tolerance.
When a service goes down in a tightly coupled system, the entire workflow is usually interrupted. With event-driven architecture, the consumer simply processes all accumulated messages once its working capacity is restored.
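To make this concrete, here is a small sketch of what an event payload could look like before it is handed to the broker. The BlockEvent fields are illustrative, not the exact production schema; the point is that the broker only ever sees opaque bytes.

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class BlockEvent:
    """Illustrative event payload passed between pipeline workers."""
    block_number: int
    state: str       # e.g. "new", "extraction_complete", ...
    attempt: int = 0

def to_bytes(event: BlockEvent) -> bytes:
    # The broker only moves opaque bytes; it never inspects the content.
    return json.dumps(asdict(event)).encode("utf-8")

def from_bytes(payload: bytes) -> BlockEvent:
    return BlockEvent(**json.loads(payload.decode("utf-8")))

msg = to_bytes(BlockEvent(block_number=17_000_000, state="new"))
```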
Let's walk through the flow of our pipeline mechanism:
We receive a WebSocket notification informing us that a new block, #1000, has arrived. This message enters our event pipeline.
The extraction worker picks up the message from the event pipeline. Since we only process blocks that are ∆ blocks away from the latest block (to avoid forks), the worker requests data for block #1000-∆. At this point, we only need the smart contract logs, specifically the transfer logs. Once the data is fetched, the worker sends a message back to the event pipeline stating that the data for block #1000-∆ has been retrieved and is ready for transformation.
The transformation worker scans the event queue and finds the message indicating that new data has been fetched from the blockchain. It then applies the necessary transformations, decoding the raw data into a format that is suitable for analytics, and writes it to the database.
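As an illustration of the transformation step, the sketch below decodes a raw ERC-20 Transfer log (as returned by eth_getLogs) into a flat row. The output field names are assumptions, but the Transfer topic hash is the standard keccak256 of the event signature.

```python
# keccak256("Transfer(address,address,uint256)") — the standard ERC-20 Transfer topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

def decode_transfer(log: dict) -> dict | None:
    """Turn a raw log entry into an analytics-friendly row (sketch)."""
    if log["topics"][0].lower() != TRANSFER_TOPIC:
        return None
    return {
        "token":        log["address"],
        "from_address": "0x" + log["topics"][1][-40:],  # last 20 bytes of the indexed topic
        "to_address":   "0x" + log["topics"][2][-40:],
        "raw_amount":   int(log["data"], 16),           # still needs the token's decimals applied
        "block_number": int(log["blockNumber"], 16),
        "tx_hash":      log["transactionHash"],
    }
```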
Here are some of the topics:
The main object that we work with is a block. The block goes through a series of events that mutate its state.
This graph showcases the process of block state transition:
The new_block event is fired when a new block notification arrives. This event assigns the new state to the block and triggers the extraction worker. Upon receiving the block, the extraction worker picks it up and changes its state to extraction_in_progress.
Once extraction is complete, the extraction_complete event is fired, which in turn triggers a transformation worker. The transformation worker changes the block state to transformation_in_progress and, upon completing the transformation, puts the block into the transformation_complete state.
If any worker is unable to process the block, the block's state is changed to failed and processing is retried; after three failed attempts, the block is considered to have failed for good. If all workers process the block successfully, the block ends up in the complete state.
Every part of the system is aware of the state of the object it is working on and knows what to do next. Managing these blocks is also easy. For example, if we need to send several blocks for reprocessing, we can simply change their state from complete to new again.
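A minimal sketch of this state machine, using the state names from the description above and the three-attempt retry rule; the helper names are illustrative.

```python
from enum import Enum

class BlockState(str, Enum):
    NEW = "new"
    EXTRACTION_IN_PROGRESS = "extraction_in_progress"
    EXTRACTION_COMPLETE = "extraction_complete"
    TRANSFORMATION_IN_PROGRESS = "transformation_in_progress"
    TRANSFORMATION_COMPLETE = "transformation_complete"
    COMPLETE = "complete"
    FAILED = "failed"

MAX_ATTEMPTS = 3  # a block is retried up to three times before it is left as failed

def mark_failed_or_retry(block: dict) -> dict:
    """If a worker fails, either send the block back for another attempt or give up."""
    block["attempts"] = block.get("attempts", 0) + 1
    block["state"] = BlockState.FAILED if block["attempts"] >= MAX_ATTEMPTS else BlockState.NEW
    return block

def reprocess(block: dict) -> dict:
    """Send an already-processed block through the pipeline again."""
    block["state"] = BlockState.NEW
    block["attempts"] = 0
    return block
```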
For our pipeline, we've chosen the Google Cloud Platform. It is well-documented, very user-friendly, and offers a vast array of tools for any conceivable task. Also, they provide start-ups and small companies with $100,000 in free credits per year.
You can either deploy your own blockchain node or use a ready-made one from a Node-as-a-Service provider. The decision is entirely up to you. If you choose to deploy your own Node, you will have to support it and allocate the necessary computing resources. On the other hand, opting for a ready-made solution from a provider means you will have to pay for it.
Now, let's discuss the characteristics we need in our database. Here's what we're looking for:
Fast performance when working with large tables;
Efficient handling of long string indexes;
Fast asynchronous access;
Data replication, for reliability and for distributing query load across instances.
Initially, we considered NoSQL databases such as MongoDB, whose document-oriented key:value format appeared more suitable for storing information about transactions. However, after more in-depth research, we realized that PostgreSQL outperforms MongoDB on all the key parameters.
MongoDB falls behind in terms of performance, speed, and resource consumption, especially with asynchronous access.
Even the key:value storage, which appeared to be an advantage, is no longer a benefit, since Postgres supports the jsonb data type: a full-fledged data type that can be indexed and ordered, is fast to query, and compresses data better than Mongo.
As a key:value store, Mongo is incredibly inefficient at joins, which are necessary if we want to calculate metrics on the database side.
Asynchronous database interaction requires speed, which is why jsonb support and data replication are crucial. Ultimately, PostgreSQL offers excellent capabilities for calculating metrics on the database side.
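For illustration, a table that mixes decoded columns with a raw jsonb payload might be created like this with psycopg2. The schema, index names, and connection string are assumptions, not the production setup.

```python
import psycopg2  # pip install psycopg2-binary

# Placeholder DSN; the real cluster would be a managed Postgres instance.
conn = psycopg2.connect("dbname=ethereum user=pipeline host=localhost")

with conn, conn.cursor() as cur:
    # Illustrative schema: decoded transfers plus the raw log kept as jsonb.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS transfers (
            id            bigserial PRIMARY KEY,
            block_number  bigint  NOT NULL,
            token         text    NOT NULL,
            from_address  text    NOT NULL,
            to_address    text    NOT NULL,
            raw_amount    numeric NOT NULL,
            raw_log       jsonb   NOT NULL
        );
        CREATE INDEX IF NOT EXISTS transfers_raw_log_gin ON transfers USING gin (raw_log);
        CREATE INDEX IF NOT EXISTS transfers_block_idx   ON transfers (block_number);
    """)
```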
For our message broker, we've chosen Google Pub/Sub as it perfectly meets all our needs. Specifically, its message delivery semantics allow multiple consumers to read the same queue independently, which is precisely our requirement. Additionally, Google Pub/Sub comes with built-in deduplication and customizable keys.
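A minimal Pub/Sub sketch using the google-cloud-pubsub client; the project, topic, and subscription IDs are placeholders. Each worker type gets its own subscription, so several consumers can read the same stream of block events independently.

```python
from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

PROJECT_ID = "my-project"               # placeholder
TOPIC_ID = "block-events"               # placeholder
SUBSCRIPTION_ID = "extraction-worker"   # placeholder

# Publish a byte-serialized event.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
publisher.publish(topic_path, data=b'{"block_number": 17000000, "state": "new"}').result()

# Consume it in a worker; other subscriptions on the same topic get their own copy.
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print("received:", message.data)
    message.ack()

streaming_pull = subscriber.subscribe(subscription_path, callback=callback)
# streaming_pull.result() would block here and keep the worker consuming.
```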
We will use Google Cloud Functions to calculate analytics on a given schedule: at least once per block, once an hour, or once a day. The shorter the interval between metric calculations, the less data each run has to process, so each run completes faster and consumes fewer resources. Google Cloud Functions are well suited for this task because they are short, scheduled compute tasks that allocate memory only while running and automatically shut down when finished, saving resources and money.
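As a hedged sketch, a scheduled metric job could look roughly like this when written with functions-framework and triggered by Cloud Scheduler over HTTP. The metric, table names, and connection string are illustrative, not the production schema.

```python
import functions_framework  # pip install functions-framework
import psycopg2

@functions_framework.http
def compute_holder_counts(request):
    """Recompute a simplified metric on a schedule.

    Counts every address that has ever received each token — a deliberately
    simplified stand-in for a real holder-count metric.
    """
    conn = psycopg2.connect("dbname=ethereum user=pipeline host=localhost")  # placeholder DSN
    with conn, conn.cursor() as cur:
        cur.execute("""
            INSERT INTO holder_counts (token, holders, computed_at)
            SELECT token, COUNT(DISTINCT to_address), now()
            FROM transfers
            GROUP BY token;
        """)
    conn.close()
    return "ok"
```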
We have designed a data pipeline to run on the Google Cloud Platform; it utilizes Cloud Functions and a set of microservices that communicate via Pub/Sub and write results to a PostgreSQL cluster. This pipeline has been running successfully for more than 6 months. During this time it has processed 13M blocks and 40M smart contract logs, with an average block processing time of 250 milliseconds.
Through observations and occasional bug fixes, we have found areas for improvement. By addressing these, we hope to minimize the manual maintenance required and decrease failures. The current system serves as a solid foundation; it is flexible and relatively low-cost, and the improvements themselves are quite straightforward.
This pipeline is designed in a way that makes it easy to integrate analytics to any new DeFi token on Ethereum. Adding new components to the pipeline, including native Ethereum token parsers, is also simple.
The results are available at whalemap.io, where you can see analytics for Uniswap, Curve, Sushi, and six other popular tokens.
The featured image for this piece was generated with Kandinsky 2.
Prompt: Illustrate a cryptocurrency pipeline.