Designing a Pipeline for Ethereum DeFi Token Analysis

by Alina Arsamakova, May 19th, 2023


With legacy stocks, a common practice is to perform fundamental analysis to decide whether they're worth investing in; similarly, you can analyze tokens on the Ethereum blockchain.

By analyzing legacy stocks, you gain insight into a company's profitability and its latest rounds of funding, see the performance of the sector the company operates in, and factor in the overall economic conditions to make an informed investment decision.


With crypto tokens, you can do the same and even more. By analyzing crypto tokens, you can also find out the number of token holders, see if big market players are buying the token, and even learn the exact prices they're trading at and copy their actions.


However, extracting and analyzing this data for crypto tokens is no easy task. You need robust data pipelines suitable for aggregating and performing analysis on vast amounts of data.


In this article, I want to share my architectural considerations for building a system that collects data from the Ethereum blockchain and runs real-time analysis of DeFi token data. We will focus on the big picture rather than diving into details, and explore the key decisions that shape such a pipeline.


Table of Contents

  • Background

    • Main Objective
    • Challenges
    • Pipeline Characteristics
  • Pipeline Mechanism

  • Selecting the Tools

    • Cloud Provider
    • Blockchain Node
    • Data Storage
    • Message Broker
  • Running the Analysis

  • Results


Background

Main Objective

The core objective of the data pipeline is to quickly perform complex mathematical operations on Ethereum blockchain data. More specifically, we want to analyze ERC-20 tokens used in DeFi protocols on Ethereum.


To achieve that, we need our pipeline to robustly perform 4 main tasks:


  1. Quickly extract large amounts of data from the Ethereum blockchain;
  2. Filter out the data related to ERC-20 DeFi tokens;
  3. Store this data in a format suitable for fast analysis;
  4. Perform analytics on the stored data efficiently.

Challenges

One of the initial challenges we face is accessing blockchain data. In theory, deploying an Ethereum node on your server allows for unlimited access to this data. However, the data format stored on the node is not suitable for analytics. Hence, it is preferable to organize the data into Postgres tables. To achieve this, we need to extract all historical blockchain data starting from 2015 and continuously gather the latest real-time data.


The Ethereum blockchain data is accessed using the JSON-RPC protocol. There is a vast amount of historical data, so to extract it all into your database, you will either need a lot of computing power, which is quite costly, or accept that the extraction will take a considerable amount of time.


When it comes to real-time blocks, which are generated roughly every 12 seconds, things get a bit easier. There are several ways to get this data. One is to subscribe over the Ethereum JSON-RPC WebSocket, which essentially sends a notification whenever a new block appears on the blockchain. Once you receive such a notification, you can make a direct request to the node to extract all of the data.
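
To make this concrete, here is a minimal sketch of such a subscription using the standard eth_subscribe("newHeads") call over the JSON-RPC WebSocket. The node URL is a placeholder, and reconnection and error handling are omitted.

```python
# A minimal sketch (not production code): subscribe to "newHeads" over the
# Ethereum JSON-RPC WebSocket so the node notifies us about every new block.
# The node URL is a placeholder; reconnection and error handling are omitted.
import asyncio
import json

import websockets  # pip install websockets

NODE_WS_URL = "wss://your-ethereum-node:8546"  # placeholder endpoint


async def listen_for_new_blocks() -> None:
    async with websockets.connect(NODE_WS_URL) as ws:
        # Ask the node to push a notification for every new block header.
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "eth_subscribe",
            "params": ["newHeads"],
        }))
        print("subscription id:", json.loads(await ws.recv())["result"])

        # Each subsequent message contains the header of a freshly mined block.
        async for raw in ws:
            header = json.loads(raw)["params"]["result"]
            print("new block:", int(header["number"], 16))


asyncio.run(listen_for_new_blocks())
```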


Alternatively, instead of subscribing to block notifications, we can receive the data of the latest blocks directly over the same WebSocket connection.


Yet, there is an important caveat: it is crucial for analytics not to miss even a single block or a single transaction. If you miss even one piece of data, it breaks the chain and messes up the analysis. For example, if we overlook a transfer between wallets, our balance information for that wallet would be incorrect, leading to a flawed analysis.


This limitation makes the Ethereum JSON-RPC WebSocket solution ineffective—the WebSocket connection is not reliable, it can break every now and then, leading to data loss during interruptions. And that’s something we want to avoid.


Therefore, we've opted for a more balanced approach. We'll still rely on WebSockets, but not as the sole source of data. Instead, we will use them to receive notifications about new blocks. Then, once we know that a new block is out there, we will make a separate request to the node.


Also, we need to address an important yet somewhat unpleasant aspect of the Ethereum blockchain—forks. When a fork occurs, it splits the blockchain into two separate paths: the original chain and the new chain. This is similar to a road splitting into two different directions.


When we request a block that has just appeared (say, in the last few minutes), we cannot say for sure whether it is part of a fork or not. This only becomes clear after a while, when nodes from all over the world reach a consensus. If we want to perform real-time analytics second-by-second and block-by-block, we need to find a way to handle forks.


To deal with this complication, at least for the initial versions of the pipeline, we’ve found a trade-off: by analyzing blocks that are a couple of dozen blocks behind the latest block we can be reasonably sure that the block came from the main branch of the blockchain.
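
As a rough illustration of this trade-off, the sketch below (using web3.py, with an illustrative confirmation depth and a placeholder node URL) always works on a block that sits a fixed number of blocks behind the chain head.

```python
# A sketch of the confirmation-depth trade-off described above: instead of the
# newest block, we always process one that sits CONFIRMATION_DEPTH blocks
# behind the chain head. The node URL and depth value are illustrative.
from web3 import Web3  # pip install web3

NODE_HTTP_URL = "https://your-ethereum-node:8545"  # placeholder endpoint
CONFIRMATION_DEPTH = 24  # "a couple of dozen" blocks behind the head

w3 = Web3(Web3.HTTPProvider(NODE_HTTP_URL))


def safe_block_number() -> int:
    """Return the most recent block we consider safe from forks/reorgs."""
    return w3.eth.block_number - CONFIRMATION_DEPTH


block = w3.eth.get_block(safe_block_number())
print(block["number"], block["hash"].hex())
```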


Pipeline Characteristics

When designing our pipeline, we want to create a system that:


  1. Processes new blocks as soon as they appear on the blockchain;
  2. Guarantees that data is complete and reliable;
  3. Keeps costs reasonably low;
  4. Is simple and easy to build;
  5. Allows adding new functionality without changing the existing mechanisms;
  6. Leaves room for performance improvement.


  • To make our system flexible and adaptable to change, we need to make sure that its components are as independent from each other as possible.


  • To process data quickly, tasks should be divided among several workers and worked on asynchronously.


  • To guarantee the data is complete, we need to run checks, including cross-checks against nodes and wallet balance verifications, to ensure that no transaction has been missed.


  • To keep the system inexpensive, it is important to properly manage computing resources and data storage, as well as to write optimal code.


That's why event-driven architecture becomes the clear choice.


With each part of the system being completely independent of the others, we gain scalability. We can add as many components as we want, anywhere we want, and configure them to work with specific events.


  • All parts of the system communicate with each other asynchronously through events, providing speed and the capacity to handle large amounts of data.


  • The message inside each event is serialized in byte format, as the broker is solely responsible for conveying the message to the right consumer and doesn't need to bother with its content. This serialization process makes message delivery extremely fast, resulting in a higher throughput of data streams.


  • Messages are not lost during downtime, so we have fault tolerance.


  • Usually, when a service goes down, the entire workflow is interrupted. With an event-driven architecture, the consumer can simply process all the accumulated messages once it is back up.


Pipeline Mechanism

[Diagram: pipeline mechanism]


Let's walk through the flow of our pipeline mechanism:


  1. We receive a WebSocket notification informing us that a new block, #1000, has arrived. This message enters our event pipeline.


  2. The extraction worker picks up the message from the event pipeline. Since we only process blocks that are ∆ blocks away from the latest block (to avoid forks), the worker requests data for block #1000-∆. At this point, we only need the smart contract logs, specifically the transfer logs. Once the data is fetched, the worker sends a message back to the event pipeline stating that the data for block #1000-∆ has been retrieved and is ready for transformation.


  3. The transformation worker scans the event queue and finds the message indicating that new data has been fetched from the blockchain. It then applies the necessary transformations, decoding the raw data into a format that is suitable for analytics, and writes it to the database.


Here are some of the log topics (keccak-256 hashes of event signatures) that we filter for; a fetching sketch follows the list:


  • INIT 0x98636036cb66a9c19a37435efc1e90142190214e8abeb821bdba3f2990dd4c95
  • MINT 0x7a53080ba414158be7ec69b987b5fb7d07dee101fe85488f0853ae16239d0bde
  • TRANSFER 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
  • SWAP 0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67
  • COLLECT 0x70935338e69775456a85ddef226c395fb668b63fa0115f5f20610b388e6ca9c0
  • BURN 0x0c396cd989a39f4459b5fa1aed6a9a8dcdbc45908acfd67e028cd568da98982c
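
As an illustration of the extraction step, the following sketch uses web3.py's eth_getLogs to pull only the logs whose first topic matches the ERC-20 Transfer signature from the list above, then decodes them. The node URL and block number are placeholders, not values from the actual pipeline.

```python
# A sketch of the extraction step under the assumptions above: pull only the
# logs whose first topic is the ERC-20 Transfer signature hash from the list,
# then decode them. The node URL and block number are placeholders.
from web3 import Web3  # pip install web3

w3 = Web3(Web3.HTTPProvider("https://your-ethereum-node:8545"))  # placeholder

TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"


def fetch_transfer_logs(block_number: int) -> list:
    """Request only Transfer logs for a single block."""
    return w3.eth.get_logs({
        "fromBlock": block_number,
        "toBlock": block_number,
        "topics": [TRANSFER_TOPIC],  # filter on the event signature (topic0)
    })


def decode_transfer(log) -> dict:
    """Decode an ERC-20 Transfer(address,address,uint256) log entry."""
    return {
        "block_number": log["blockNumber"],
        "tx_hash": log["transactionHash"].hex(),
        "token": log["address"],
        "from": "0x" + log["topics"][1].hex()[-40:],
        "to": "0x" + log["topics"][2].hex()[-40:],
        "value": int(log["data"].hex(), 16),
    }


for log in fetch_transfer_logs(17_000_000):
    # ERC-20 Transfers carry exactly 3 topics; ERC-721 uses the same hash but 4.
    if len(log["topics"]) == 3:
        print(decode_transfer(log))
```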


The main object that we work with is a block. The block goes through a series of events that mutate its state.


This graph showcases the process of block state transition:

[Diagram: block state transitions]


The new_block event is fired when a new block notification arrives. This event assigns the new state to the block and triggers the extraction worker. Upon receiving the block, the extraction worker starts processing it and changes its state to extraction_in_progress.


Once extraction is complete, the extraction_complete event is fired, which in turn triggers a transformation worker. The transformation worker changes the block state to transformation_in_progress and upon completing the transformation, the block is put into the transformation_complete state.


If a worker is unable to process the block, the block's state is changed to failed and processing is retried; after three unsuccessful attempts, the block is considered permanently failed. If all workers process the block successfully, the block moves to the complete state.


Every part of the system is aware of the state of the object it is working on and knows what to do next. Managing these blocks is also easy. For example, if we need to send several blocks for reprocessing, we can simply change their state from complete to new again.
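
Here is a minimal sketch of that lifecycle as code. The state names and the three-attempt retry limit come from the description above, while the class and function names are illustrative rather than taken from the actual system.

```python
# A minimal sketch of the block lifecycle described above. The state names and
# the three-attempt retry limit come from the text; class and function names
# are illustrative.
from dataclasses import dataclass
from enum import Enum


class BlockState(str, Enum):
    NEW = "new"
    EXTRACTION_IN_PROGRESS = "extraction_in_progress"
    EXTRACTION_COMPLETE = "extraction_complete"
    TRANSFORMATION_IN_PROGRESS = "transformation_in_progress"
    TRANSFORMATION_COMPLETE = "transformation_complete"
    COMPLETE = "complete"
    FAILED = "failed"


MAX_ATTEMPTS = 3


@dataclass
class Block:
    number: int
    state: BlockState = BlockState.NEW
    attempts: int = 0


def record_failed_attempt(block: Block) -> None:
    """Retry a failed block, giving up after MAX_ATTEMPTS."""
    block.attempts += 1
    block.state = BlockState.FAILED if block.attempts >= MAX_ATTEMPTS else BlockState.NEW


def reprocess(block: Block) -> None:
    """Send a finished block through the pipeline again (complete -> new)."""
    block.state = BlockState.NEW
    block.attempts = 0
```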


Selecting the Tools

Cloud Provider

For our pipeline, we've chosen the Google Cloud Platform. It is well-documented, very user-friendly, and offers a vast array of tools for any conceivable task. Also, they provide start-ups and small companies with $100,000 in free credits per year.

Blockchain Node

You can either deploy your own blockchain node or use a ready-made one from a Node-as-a-Service provider. The decision is entirely up to you. If you choose to deploy your own node, you will have to maintain it and allocate the necessary computing resources. On the other hand, opting for a ready-made solution from a provider means you will have to pay for it.

Data Storage

Now, let's discuss the characteristics we need in our database. Here's what we're looking for:

  • Fast performance when working with large tables;

  • Efficient handling of long string indexes;

  • Fast asynchronous access;

  • Data replication, for reliability and for distributing query load across instances.


Initially, we considered NoSQL databases such as MongoDB: its document-oriented key:value format appeared more suitable for storing information about transactions. However, after more in-depth research, we realized that PostgreSQL outperforms MongoDB on all the parameters that matter to us.


  • MongoDB falls behind in terms of performance, speed, and resource consumption, especially under asynchronous access.

  • Even the key:value storage, which seemed like an advantage, is no longer a benefit: Postgres supports the jsonb data type, which is fully indexable, ordered, fast, and often compresses data better than MongoDB does.

  • As a document store, MongoDB is also very inefficient at joins, which we need if we want to calculate metrics on the database side.


Asynchronous database interaction has to be fast, which is why jsonb support and data replication are crucial for us. Ultimately, PostgreSQL offers excellent capabilities for calculating metrics on the database side.
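
To make the storage side a bit more tangible, here is a sketch of how decoded transfers might be written to Postgres with asyncpg, keeping the raw log in a jsonb column with a GIN index. The table name, columns, and DSN are illustrative assumptions, not the actual production schema.

```python
# A sketch of the storage layer under illustrative assumptions: decoded
# transfers land in a regular table, and the raw log is kept in a jsonb column
# with a GIN index. Table name, columns, and DSN are not the real schema.
import json

import asyncpg  # pip install asyncpg

DDL = """
CREATE TABLE IF NOT EXISTS erc20_transfers (
    block_number  BIGINT  NOT NULL,
    tx_hash       TEXT    NOT NULL,
    token         TEXT    NOT NULL,
    from_address  TEXT    NOT NULL,
    to_address    TEXT    NOT NULL,
    value         NUMERIC NOT NULL,
    raw_log       JSONB   NOT NULL
);
CREATE INDEX IF NOT EXISTS erc20_transfers_raw_log_idx
    ON erc20_transfers USING GIN (raw_log);
"""


async def store_transfer(dsn: str, transfer: dict, raw_log: dict) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(DDL)
        await conn.execute(
            """INSERT INTO erc20_transfers
               (block_number, tx_hash, token, from_address, to_address, value, raw_log)
               VALUES ($1, $2, $3, $4, $5, $6, $7)""",
            transfer["block_number"], transfer["tx_hash"], transfer["token"],
            transfer["from"], transfer["to"], transfer["value"],
            json.dumps(raw_log),  # asyncpg accepts jsonb values as JSON strings
        )
    finally:
        await conn.close()
```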

Message Broker

For our message broker, we've chosen Google Pub/Sub, as it meets all our needs. Its delivery semantics allow multiple consumers to read the same stream of events independently, with each subscription receiving every message, which is precisely our requirement. Additionally, Google Pub/Sub comes with built-in deduplication and customizable ordering keys.
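
Below is a minimal sketch of how workers could publish and consume pipeline events through Pub/Sub with the google-cloud-pubsub client. The project, topic, and subscription names are placeholders.

```python
# A minimal sketch of workers exchanging events through Google Pub/Sub with
# the google-cloud-pubsub client. Project, topic, and subscription names are
# placeholders.
import json

from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

PROJECT_ID = "my-defi-project"         # placeholder
TOPIC_ID = "block-events"              # placeholder
SUBSCRIPTION_ID = "extraction-worker"  # placeholder


def publish_event(event_type: str, block_number: int) -> None:
    """Publish a serialized event such as new_block or extraction_complete."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
    payload = json.dumps({"event": event_type, "block": block_number}).encode("utf-8")
    publisher.publish(topic_path, data=payload).result()  # wait for the server ack


def run_worker() -> None:
    """Consume events from a subscription and acknowledge them once handled."""
    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        event = json.loads(message.data.decode("utf-8"))
        print("handling", event)
        message.ack()

    streaming_pull = subscriber.subscribe(sub_path, callback=callback)
    with subscriber:
        streaming_pull.result()  # block and process messages until cancelled
```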

Running the Analysis

We will use Google Cloud Functions to calculate analytics on a given schedule, anywhere from once per block to once an hour or once a day. The shorter the interval between metric calculations, the less new data each run has to process, so each run completes faster and consumes fewer resources. Google Cloud Functions are well-suited for this task because they are short, scheduled compute tasks that only use resources while they run and shut down automatically when finished, saving resources and money.
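
As an illustration, a scheduled metric job could look roughly like the Cloud Function sketched below, assuming Cloud Scheduler publishes to a Pub/Sub topic that triggers it. The SQL, table name, and environment variable are illustrative, and the holder count shown is only a rough proxy.

```python
# A sketch of a scheduled metric job as a Cloud Function, assuming Cloud
# Scheduler publishes to a Pub/Sub topic that triggers it. The SQL, table name,
# and environment variable are illustrative; the holder count is a rough proxy
# (it does not subtract wallets whose balance has dropped to zero).
import os

import functions_framework  # pip install functions-framework
import psycopg2             # pip install psycopg2-binary

HOLDER_COUNT_SQL = """
SELECT token, COUNT(DISTINCT to_address) AS holders
FROM erc20_transfers
GROUP BY token;
"""


@functions_framework.cloud_event
def compute_metrics(cloud_event) -> None:
    """Triggered on a schedule; recomputes per-token holder counts."""
    with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
        with conn.cursor() as cur:
            cur.execute(HOLDER_COUNT_SQL)
            for token, holders in cur.fetchall():
                print(f"{token}: {holders} holders")
```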

Results

We have designed a data pipeline to run on the Google Cloud Platform; it utilizes Cloud Functions and a set of microservices that communicate via Pub/Sub and write results to a PostgreSQL cluster. This pipeline has been running successfully for more than 6 months. During this time it has processed 13M blocks and 40M smart contract logs, with an average block processing time of 250 milliseconds.


Through observations and occasional bug fixes, we have found areas for improvement. By addressing these, we hope to minimize manual maintenance required and decrease failures. The current system serves as a solid foundation; it is flexible and relatively low-cost. Moreover, the improvements are quite straightforward:


  • Implementing monitoring and alerts;
  • Collecting logs across the entire system into traces using OpenTelemetry;
  • Implementing automated and manual switches between data sources (nodes);
  • Running data integrity checks using cron (see the sketch after this list).
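
For the data integrity checks, a cron-driven gap check could look roughly like this. The table name matches the illustrative schema sketched earlier, and blocks that genuinely contain no ERC-20 transfers would also show up here and need a secondary check.

```python
# A sketch of a cron-driven integrity check: find block numbers missing from
# the transfers table. Table and column names match the illustrative schema
# used earlier; blocks that contain no ERC-20 transfers at all would also
# show up here and need a secondary check.
import os

import psycopg2  # pip install psycopg2-binary

GAP_CHECK_SQL = """
SELECT expected.block_number
FROM generate_series(
        (SELECT MIN(block_number) FROM erc20_transfers),
        (SELECT MAX(block_number) FROM erc20_transfers)
     ) AS expected(block_number)
LEFT JOIN (SELECT DISTINCT block_number FROM erc20_transfers) AS seen
       USING (block_number)
WHERE seen.block_number IS NULL;
"""


def find_missing_blocks() -> list:
    with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
        with conn.cursor() as cur:
            cur.execute(GAP_CHECK_SQL)
            return [row[0] for row in cur.fetchall()]


if __name__ == "__main__":
    missing = find_missing_blocks()
    if missing:
        print(f"{len(missing)} blocks may need reprocessing, e.g. {missing[:10]}")
```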


This pipeline is designed in a way that makes it easy to add analytics for any new DeFi token on Ethereum. Adding new components to the pipeline, including native Ethereum token parsers, is also simple.

The results are available at whalemap.io, where you can see analytics for Uniswap, Curve, Sushi, and six other popular tokens.


The featured image for this piece was generated with Kandinsky 2.

Prompt: Illustrate a cryptocurrency pipeline.