Designing Robust Real‑Time Pipelines with Flink, Kafka, and an OLAP Store

Written by rohitmuthyala | Published 2026/01/07
Tech Story Tags: kafka | real-time-event-pipeline | flink | olap-store | union-and-load | pinot-+-warehouse | kafka-guide | kafka-tutorial

TLDRDesign a real-time event pipeline with Kafka, Flink, an OLAP store, and a warehouse where every event is processed only once.via the TL;DR App

Introduction:

If you’re building ad or billing pipelines, “close enough” is not good enough.

Every impression, click, and conversion event represents real money. Losing events under‑charges. Double‑counting over‑charges. Both are unacceptable.

At the same time, downstream systems (budget pacing, bidding, dashboards) want near real‑time data, not yesterday’s batch.

This post walks through a concrete architecture and implementation for:

  • Ingesting raw ad events into Kafka
  • Cleaning and aggregating them in near real‑time with Flink
  • Attributing conversions to ads
  • Powering low‑latency analytics in Pinot and long‑term reporting in a warehouse

…while maintaining exactly‑once semantics end‑to‑end.

The Problem: Money Events in Motion

Typical event types:

  • Impression – an ad was shown
  • Click – a user clicked
  • Conversion – an order, signup, or some billable event

We want to:

  • Ingest events from clients/services
  • Clean & dedupe noisy streams (retries, client bugs, late arrival)
  • Aggregate metrics (impressions, clicks, CTR) in small time buckets
  • Attribute conversions back to ads
  • Serve:
    • Low‑latency analytics for dashboards and internal services
    • Durable history for offline reporting and ML
  • With:
    • Sub‑minute to a few‑minute latency
    • No silent data loss
    • Exactly‑once accounting

Building Blocks

We’ll use:

  • Apache Kafka – durable event log, supports transactions + read_committed
  • Apache Flink – stream processing with exactly‑once state and sinks
  • Apache Pinot – real‑time OLAP store, low‑latency queries, upserts
  • Warehouse / lake (Hive / Iceberg / Delta) – long‑term history & SQL

You can swap components (e.g., Druid for Pinot, BigQuery/Snowflake for the warehouse); the patterns stay the same.

Event Model and Kafka Topics

We’ll assume Protobuf or Avro for schema‑managed messages. For simplicity, here’s a JSON/Avro‑style schema for the raw ad events topic:

// Topic: ad_events_raw
{
  "type": "record",
  "name": "AdEvent",
  "fields": [
    { "name": "event_id", "type": "string" },          // client-side UUID
    { "name": "user_id",  "type": ["null", "string"], "default": null },
    { "name": "ad_id",    "type": "string" },
    { "name": "campaign_id", "type": "string" },
    { "name": "event_type", "type": { "type": "enum", "name": "EventType",
      "symbols": ["IMPRESSION", "CLICK"] } },
    { "name": "ts",       "type": "long" },            // event time (ms)
    { "name": "region",   "type": "string" },
    { "name": "meta",     "type": ["null", { "type": "map", "values": "string" }],
      "default": null }
  ]
}

Kafka configs for transactional producers (Flink’s Kafka sinks will use this):

# Kafka broker config (cluster-wide)
transaction.state.log.min.isr=3
transaction.state.log.replication.factor=3

# Producer config (used by Flink Kafka sink)
enable.idempotence=true
acks=all
retries=10
max.in.flight.requests.per.connection=1
transactional.id=flink-ad-agg-job-1

High-Level Job Graph

We will have three Flink jobs:

  1. Aggregation Job
    • Consumes ad_events_raw
    • Cleans & dedupes
    • Aggregates into 1‑minute buckets
    • Writes aggregated metrics to ad_metrics_agg (Kafka) with record UUIDs
  2. Attribution Job
    • Consumes conversion_events (orders)
    • Looks up prior ad events
    • Emits attributed conversions to ad_conversions_attr
  3. Union & Load Job
    • Consumes ad_metrics_agg + ad_conversions_attr
    • Writes to Pinot (real‑time) and the warehouse

All Kafka ↔ Flink paths use exactly‑once semantics via Flink checkpoints + Kafka transactions.

We rely on three mechanisms:

  1. Flink’s exactly‑once state + two‑phase commit sink

In code (Scala/Java):

env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
env.getCheckpointConfig().setCheckpointTimeout(5 * 60_000L);
env.setStateBackend(new RocksDBStateBackend("hdfs://checkpoints/ad-pipelines"));

  1. Kafka transactions + read_committed

Flink’s KafkaSink writes in transactions that are committed when a checkpoint completes:

KafkaSink<AggregatedMetric> aggSink =
    KafkaSink.<AggregatedMetric>builder()
        .setBootstrapServers(kafkaBrokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("ad_metrics_agg")
            .setValueSerializationSchema(new AggregatedMetricSerde())
            .build())
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-ad-agg-")
        .build();

Downstream consumers (Pinot, ETL jobs) must use:

isolation.level=read_committed

  1. Record‑level idempotency

Aggregated/attributed records use stable IDs:

String recordId = UUID.nameUUIDFromBytes(
    (adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
).toString();

We store this record_id field and use it as:

  • Pinot primary key (upsert)
  • Dedup key in the warehouse ETL

So if Flink replays, we just overwrite the same logical record.

Job 1: Aggregation – From Noisy Stream to Clean Buckets

Let’s look at the Flink job in more detail (Scala, but Java is similar).

1. Source and basic cleansing

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)

val source: KafkaSource[AdEvent] =
  KafkaSource.builder[AdEvent]()
    .setBootstrapServers(kafkaBrokers)
    .setTopics("ad_events_raw")
    .setGroupId("ad-agg-job")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new AdEventDeserializer)
    .build()

val rawEvents: DataStream[AdEvent] = env.fromSource(
  source,
  WatermarkStrategy
    .forBoundedOutOfOrderness[AdEvent](java.time.Duration.ofSeconds(30))
    .withTimestampAssigner((e: AdEvent, _: Long) => e.ts),
  "ad_events_raw"
)

// Filter invalid / stale events
val validEvents = rawEvents
  .filter(e => e.adId != null && e.eventType != null)
  .filter(e => isRecent(e.ts))  // e.g., within 7 days

2. Deduplication with keyed state

We assume event_id is client‑side unique. We’ll keep a small TTL cache per user/ad to drop duplicates.

case class DedupedAdEvent(e: AdEvent)

val deduped: DataStream[AdEvent] = validEvents
  .keyBy(_.eventId)
  .process(new KeyedProcessFunction[String, AdEvent, AdEvent] {
    lazy val seen: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("seen-ts", classOf[Long])
    )

    override def processElement(
        value: AdEvent,
        ctx: KeyedProcessFunction[String, AdEvent, AdEvent]#Context,
        out: Collector[AdEvent]): Unit = {

      val lastSeen = seen.value()
      if (lastSeen == 0L || isNewer(value.ts, lastSeen)) {
        seen.update(value.ts)
        out.collect(value)
      }
      // else: drop duplicate / older retry
    }
  })

In production, you’d use a more space‑efficient structure (Bloom filter, TTL, etc.), but this illustrates the idea.

3. Aggregation into 1‑minute buckets

Define a case class for the aggregate:

case class AdMetricAgg(
  recordId: String,
  adId: String,
  campaignId: String,
  region: String,
  minuteBucket: Long,
  impressions: Long,
  clicks: Long,
  updatedAt: Long
)

Aggregation:

val keyed = deduped
  .map(e => (e, truncateToMinute(e.ts)))
  .keyBy { case (e, minuteBucket) => (e.adId, minuteBucket, e.region) }

val aggregated: DataStream[AdMetricAgg] =
  keyed
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction[(AdEvent, Long), AdMetricAgg, (String, Long, String), TimeWindow] {
      override def process(
          key: (String, Long, String),
          context: Context,
          elements: Iterable[(AdEvent, Long)],
          out: Collector[AdMetricAgg]): Unit = {

        val (adId, minuteBucket, region) = key
        var imps = 0L
        var clicks = 0L

        elements.foreach { case (e, _) =>
          e.eventType match {
            case IMPRESSION => imps += 1
            case CLICK      => clicks += 1
          }
        }

        val recordId = UUID.nameUUIDFromBytes(
          (adId + "|" + minuteBucket + "|" + region).getBytes(StandardCharsets.UTF_8)
        ).toString

        out.collect(AdMetricAgg(
          recordId = recordId,
          adId = adId,
          campaignId = elements.head._1.campaignId,
          region = region,
          minuteBucket = minuteBucket,
          impressions = imps,
          clicks = clicks,
          updatedAt = System.currentTimeMillis()
        ))
      }
    })

4. Transactional sink to Kafka

val aggSink: KafkaSink[AdMetricAgg] =
  KafkaSink.builder[AdMetricAgg]()
    .setBootstrapServers(kafkaBrokers)
    .setRecordSerializer(
      KafkaRecordSerializationSchema.builder[AdMetricAgg]()
        .setTopic("ad_metrics_agg")
        .setValueSerializationSchema(new AdMetricAggSerializer)
        .build()
    )
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-ad-agg-")
    .build()

aggregated.sinkTo(aggSink)

env.execute("ad-aggregation-job")

At this point, we have exactly‑once, deduped, 1‑minute aggregates in Kafka.

Job 2: Attribution – Connecting Orders to Ads

We’ll assume a separate Kafka topic conversion_events:

// Topic: conversion_events
{
  "conversion_id": "string",
  "user_id": "string",
  "order_id": "string",
  "amount": "double",
  "ts": "long",
  "region": "string"
}

We need to join conversions with prior ad events. The simplest design is:

  • Write clean ad events into a TTL‑backed KV store (ad_event_id → context)
  • In the attribution job, for each conversion:
    • Look up matching ad events
    • Emit attributed conversion records with deterministic IDs

Illustrative Flink snippet (pseudocode for KV lookup):

case class AttributedConversion(
  recordId: String,
  adId: String,
  campaignId: String,
  conversionId: String,
  orderId: String,
  amount: Double,
  minuteBucket: Long,
  region: String,
  ts: Long
)

val conversions: DataStream[ConversionEvent] = // from Kafka

val attributed: DataStream[AttributedConversion] =
  conversions
    .process(new ProcessFunction[ConversionEvent, AttributedConversion] {
      override def processElement(
          value: ConversionEvent,
          ctx: Context,
          out: Collector[AttributedConversion]): Unit = {

        // Example: lookup by user_id in external KV
        val adEvents: List[AdEvent] = adEventStore.lookupByUser(value.userId)

        adEvents.foreach { ad =>
          val minuteBucket = truncateToMinute(value.ts)
          val idBytes = (ad.adId + "|" + value.orderId + "|" + minuteBucket).getBytes("UTF-8")
          val recordId = UUID.nameUUIDFromBytes(idBytes).toString

          out.collect(AttributedConversion(
            recordId = recordId,
            adId = ad.adId,
            campaignId = ad.campaignId,
            conversionId = value.conversionId,
            orderId = value.orderId,
            amount = value.amount,
            minuteBucket = minuteBucket,
            region = value.region,
            ts = value.ts
          ))
        }
      }
    })

Then we sink attributed to a Kafka topic ad_conversions_attr with the same EXACTLY_ONCE sink pattern.

Job 3: Union & Load – Pinot + Warehouse

Pinot real-time table config (upsert)

Pinot table (simplified):

{
  "tableName": "ad_metrics_rt",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "minuteBucket",
    "timeType": "MILLISECONDS",
    "replication": "3"
  },
  "fieldConfigList": [],
  "schemaName": "ad_metrics_rt",
  "ingestionConfig": {
    "transformConfigs": [],
    "streamIngestionConfig": {
      "type": "kafka",
      "config": {
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "ad_metrics_agg",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.prop.isolation.level": "read_committed"
      }
    }
  },
  "upsertConfig": {
    "mode": "FULL",
    "primaryKeyColumns": ["recordId"]
  }
}

Now, each aggregate (and later updates) will upsert on recordId, ensuring idempotency.

Warehouse ETL (dedupe on recordId)

If you stage Kafka data into a lake table (e.g., Iceberg), you can dedupe like:

CREATE TABLE ad_metrics_cleaned AS
SELECT *
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY recordId ORDER BY updatedAt DESC) AS rn
  FROM ad_metrics_staging
)
WHERE rn = 1;

This preserves exactly one row per logical record, even if Flink had to replay some windows.

Latency and Trade-Offs

Rough latency components:

  • Kafka ingest: ms–seconds
  • Flink window + checkpoint: e.g., 1 min window + 60s checkpoint interval
  • Pinot ingestion: seconds

You can push this down by:

  • Shorter windows
  • More frequent checkpoints
  • Tuning Kafka/Pinot flush intervals

Trade-offs:

  • Lower latency = more overhead, more small files/segments
  • Larger windows = less overhead, but slower feedback

Reliability comes from:

  • Kafka retention for replay
  • Flink checkpoints for state + offsets
  • Idempotent downstream ingestion via primary keys and read_committed

Exactly‑once doesn’t mean “no failures”; it means when you recover, your numbers are consistent.

Takeaways

If you’re designing ad, billing, or any money‑sensitive event pipeline:

  • Use Flink + Kafka transactions with EXACTLY_ONCE and read_committed
  • Assign stable record IDs and treat downstream streams as upsert logs
  • Keep aggregation / attribution / serving as separate, testable jobs
  • Use an OLAP store (Pinot/Druid) with upsert on primary key for real‑time analytics
  • Deduplicate on those same IDs in your warehouse/lake

The key is to design for exactly‑once at every hop:

  • Source → Flink: Kafka offsets + checkpoints
  • Flink → Kafka: transactional sinks
  • Kafka → OLAP/warehouse: primary keys + idempotent ingestion



Written by rohitmuthyala | Principal Software Engineer, specializing in petabyte-scale data platforms, ML, and entity resolution systems.
Published by HackerNoon on 2026/01/07