A Guide For Data Quality Monitoring with Amazon Deequ

by Alexey Artemov, February 6th, 2024

Too Long; Didn't Read

Monitor data quality with Amazon Deequ, InfluxDB, and Grafana in a Dockerized environment using Scala/Java and Apache Spark.

Intro

Data quality is a broad topic with many parts, including:

  • Schema validation;
  • Data cleaning;
  • Data profiling;
  • Unit tests for Data;
  • Monitoring;
  • and others.


In this article, I will explain how Amazon Deequ can be used to monitor data quality through regular profiling. In the example, we will:

  • Take a CSV file with data;
  • Make profiling rules with Deequ;
  • Write results to InfluxDB;
  • Visualise results in Grafana.

Introduction to Amazon Deequ & Metric Repository

According to its developers, Deequ is a library built on top of Apache Spark for defining "unit tests for data" that measure data quality in large datasets. The project is hosted on GitHub (awslabs/deequ).


We can use Deequ to calculate different metrics (such as CountDistinct, Distinctness, Maximum, Mean, Minimum, Uniqueness, and others) against various datasets. All calculations run on top of Apache Spark, which makes the library efficient and scalable.
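To make a few of these metric names concrete, here is a minimal plain-Scala sketch of what they measure. It does not call Deequ at all: the object name, the helper functions, and the sample values are made up for illustration, and null handling for the distinct-value metrics is left out.

```scala
// Plain-Scala illustration of what four Deequ metrics measure.
// This is NOT Deequ code; names and sample data are hypothetical.
object MetricDefinitions {
  // Number of distinct values in the column.
  def countDistinct(values: Seq[String]): Long =
    values.distinct.size.toLong

  // Fraction of distinct values among all rows.
  def distinctness(values: Seq[String]): Double =
    values.distinct.size.toDouble / values.size

  // Fraction of rows whose value occurs exactly once.
  def uniqueness(values: Seq[String]): Double = {
    val occurringOnce = values.groupBy(identity).count { case (_, rows) => rows.size == 1 }
    occurringOnce.toDouble / values.size
  }

  // Fraction of rows where the value is present (not null).
  def completeness(values: Seq[Option[String]]): Double =
    values.count(_.isDefined).toDouble / values.size

  def main(args: Array[String]): Unit = {
    val customerIds = Seq("c1", "c1", "c2", "c3")
    println(countDistinct(customerIds)) // 3
    println(distinctness(customerIds))  // 0.75
    println(uniqueness(customerIds))    // 0.5 (only c2 and c3 occur once)
    println(completeness(Seq(Some("2024-01-01"), None, Some("2024-01-02"), Some("2024-01-03")))) // 0.75
  }
}
```

Note the difference between the two ratios: Distinctness counts every distinct value, while Uniqueness counts only values that appear exactly once.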


The Metric Repository is an interface for saving computation results to different systems and formats. Out of the box, the library can keep results in memory or write them to a file system as JSON, and it can expose them as a DataFrame. You can extend this interface with implementations for other systems and formats. In this article, we will save our results to InfluxDB for further visualization in Grafana.
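The idea behind a repository can be sketched in a few lines of plain Scala. Everything below is a simplified stand-in and an assumption made for illustration: Key, MetricValue, SimpleMetricsRepository, and InMemoryRepository are hypothetical names, not Deequ's types. Deequ's own interface works with its ResultKey and AnalyzerContext types instead.

```scala
import scala.collection.mutable

// Simplified stand-ins for illustration; these are NOT Deequ's types.
final case class Key(dataSetDate: Long, tags: Map[String, String])
final case class MetricValue(entity: String, instance: String, name: String, value: Double)

// Hypothetical, reduced version of the repository idea:
// save a set of metrics under a key and load them back later.
trait SimpleMetricsRepository {
  def save(key: Key, metrics: Seq[MetricValue]): Unit
  def loadByKey(key: Key): Option[Seq[MetricValue]]
}

// One possible backend: keep everything in memory.
final class InMemoryRepository extends SimpleMetricsRepository {
  private val store = mutable.Map.empty[Key, Seq[MetricValue]]

  override def save(key: Key, metrics: Seq[MetricValue]): Unit =
    store.update(key, metrics)

  override def loadByKey(key: Key): Option[Seq[MetricValue]] =
    store.get(key)
}
```

Supporting another storage system then comes down to writing one more class that implements the same two operations, which is the approach the InfluxDB repository in this article takes.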

Environment

To run the example that we will create in the article, you need the following:

  • Apache Spark versions 2.2.x to 2.4.x and 3.0.x.
  • Docker
  • Scala/Java

Let’s run InfluxDB (run the following commands in the console):

# pull influxDB docker image
docker pull influxdb:1.8.4
# run docker container
docker run --rm --name=influxdb -d -p 8086:8086 influxdb:1.8.4
# connect to docker container and run influxDB console
docker exec -it influxdb influx
# create database which we will use for storing our metrics
create database example

Let’s run Grafana (use a new console tab):

# pull grafana docker image
docker pull grafana/grafana
# run Grafana docker container & connect it with influxDB container
docker run -d --rm --name=grafana -p 3000:3000 --link influxdb grafana/grafana

Now, you can clone this repository from GitHub and open it in your favorite IDE.

Under src/main/scala/lexaneon/amazon/deequ/repository/influxdb, you can find the MetricRepository for InfluxDB, which we will use for our task.

The full example, discussed below, is in src/main/scala/lexaneon/amazon/deequ/example/InfluxDBMetricRepository.scala:

package lexaneon.amazon.deequ.example

import com.amazon.deequ.analyzers.{Completeness, CountDistinct, Distinctness, Size, Uniqueness}
import com.amazon.deequ.analyzers.runners.AnalysisRunner
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.repository.ResultKey
import lexaneon.amazon.deequ.repository.influxdb.{InfluxDBConnectionProperties, InfluxDBMetricsRepository}
import org.apache.spark.sql.SparkSession

object InfluxDBMetricRepository extends App{

  val spark = initSpark()
  // data file used for this example; you can substitute your own file
  val filePath = "src/main/resources/dataForExample/data.csv"

  val df = spark.read.option("header", "true").csv(filePath).toDF()

  /**
  * "http://localhost:8086" - InfluxDB URL; keep it as-is if you started the container with the command above
  * "example" - name of the database created earlier
  * "InfluxDBMetricsRepository" - name of the table (measurement) that stores the metrics
  */
  val influxDBConnectionProperties = InfluxDBConnectionProperties("http://localhost:8086", "example", "InfluxDBMetricsRepository")

  /**
  * resultKey holds the information that uniquely identifies an AnalysisResult
  * @param dataSetDate A date related to the AnalysisResult
  * @param tags        A map with additional annotations, you can add your own tags, 
  * they will be stored in column (tag) with name: tags_
  */
  val resultKey = ResultKey(
    System.currentTimeMillis(),
    Map("dataSetFilePath" -> filePath, "dataSetName" -> "orders"))

  val analysisResult = AnalysisRunner
    .onData(df) // pass our DataFrame
    .useRepository(new InfluxDBMetricsRepository(influxDBConnectionProperties))
    .saveOrAppendResult(resultKey) 
    .addAnalyzer(Size()) // here we setup which metrics should be calculated
    .addAnalyzer(Distinctness("customer_id"))
    .addAnalyzer(CountDistinct("customer_id"))
    .addAnalyzer(Uniqueness(Seq("customer_id", "id")))
    .addAnalyzer(Uniqueness("id"))
    .addAnalyzer(Completeness("trans_date"))
    .addAnalyzer(Completeness("id"))
    .run()

  val metric = successMetricsAsDataFrame(spark, analysisResult)

  metric.show(false) // output to console

  spark.close()

  def initSpark(isLocalRun: Boolean = true): SparkSession = {
    val sparkSessionBuilder =
      SparkSession
        .builder
        .appName(this.getClass.getSimpleName)

    val spark =
      if (isLocalRun){
        sparkSessionBuilder
          .master("local[*]")
          .getOrCreate()
      }else
        sparkSessionBuilder.getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set("spark.sql.session.timeZone", "UTC")

    spark
  }

}

After running the InfluxDBMetricRepository object, go to the console where we connected to InfluxDB and check the results:

use example
select * from InfluxDBMetricsRepository


You should see the stored metrics in the output.

If you run this example several times against the same data, you will find results for each execution. Note that InfluxDB lets you define a retention period (how long data is stored).
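Each execution shows up separately because every point written to InfluxDB carries its own timestamp. As a rough illustration, the sketch below formats one metric as a line in InfluxDB line protocol (measurement, tags, field, timestamp). The tag and field layout, the object name, and the toLine helper are assumptions made for this example; the repository used in this article may lay out its points differently. The timestamp here is in milliseconds, which assumes the write request declares millisecond precision.

```scala
// Hypothetical sketch: format one metric as an InfluxDB line protocol entry.
// The tag/field layout is an assumption, not the layout used by the article's repository.
object LineProtocolSketch {
  // Tag keys, tag values and field keys must escape commas, equals signs and spaces.
  private def escapeKey(s: String): String =
    s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")

  // Measurement names must escape commas and spaces.
  private def escapeMeasurement(s: String): String =
    s.replace(",", "\\,").replace(" ", "\\ ")

  def toLine(measurement: String,
             tags: Map[String, String],
             field: String,
             value: Double,
             timestampMs: Long): String = {
    // Tags are sorted by key so the output is deterministic.
    val tagPart = tags.toSeq.sortBy(_._1)
      .map { case (k, v) => "," + escapeKey(k) + "=" + escapeKey(v) }
      .mkString
    escapeMeasurement(measurement) + tagPart + " " + escapeKey(field) + "=" + value + " " + timestampMs
  }

  def main(args: Array[String]): Unit = {
    val line = toLine(
      "InfluxDBMetricsRepository",
      Map("name" -> "Size", "dataSetName" -> "orders"),
      "value", 5.0, 1707177600000L)
    println(line)
    // InfluxDBMetricsRepository,dataSetName=orders,name=Size value=5.0 1707177600000
  }
}
```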

In the IDE console, metric.show(false) prints a table of the computed metrics with the columns entity, instance, name, and value.

Now it is time to create a dashboard in Grafana. Open http://localhost:3000/ in your browser (default user/password: admin/admin).

  • Let’s create a new data source for our InfluxDB. Go to Configuration -> Data Sources -> Add data source and fill in:
Name: InfluxDB
URL: http://influxdb:8086
Database: example
User: admin
Password: admin

Press “Save & Test”

Then create a dashboard panel and define its query parameters. You can also experiment with other dashboard parameters to get the results you need.

Conclusion

You can use Amazon Deequ not only as a library for unit tests on data but also as the basis for a regular data quality monitoring process. As an example, I’ve created an adapter for InfluxDB. You can write your own adapter, use my InfluxDB MetricRepository, or use the FileSystemMetricsRepository that ships with Deequ and process the resulting JSON files with a tool of your choice. You can also build derived metrics and use them for visualization, or to send notifications to data stewards.
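As a minimal sketch of the notification idea, the code below compares metric values against minimum thresholds and returns a message for each violation. The Metric and Rule case classes, the thresholds, and the message format are all hypothetical; how the messages reach a data steward (email, chat, Grafana alerting) is left open.

```scala
// Hypothetical threshold check on computed metrics; not part of Deequ or the article's repository.
object ThresholdCheck {
  final case class Metric(name: String, instance: String, value: Double)
  final case class Rule(name: String, instance: String, minValue: Double)

  // Return one message for every rule whose metric is missing or below the minimum.
  def violations(metrics: Seq[Metric], rules: Seq[Rule]): Seq[String] =
    rules.flatMap { rule =>
      metrics.find(m => m.name == rule.name && m.instance == rule.instance) match {
        case Some(m) if m.value >= rule.minValue => None
        case Some(m) =>
          Some(s"${rule.name}(${rule.instance}) = ${m.value}, expected >= ${rule.minValue}")
        case None =>
          Some(s"${rule.name}(${rule.instance}) was not computed")
      }
    }

  def main(args: Array[String]): Unit = {
    val metrics = Seq(Metric("Completeness", "trans_date", 0.9), Metric("Uniqueness", "id", 1.0))
    val rules = Seq(Rule("Completeness", "trans_date", 0.95), Rule("Uniqueness", "id", 1.0))
    violations(metrics, rules).foreach(println)
    // Completeness(trans_date) = 0.9, expected >= 0.95
  }
}
```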

References

  • Amazon Deequ project page
  • Test data quality at scale with Deequ
  • InfluxDB MetricRepository & example
  • InfluxDB key concepts
