Apache Flink monitoring support was recently made available in the open source OpenTelemetry (OTel) collector. You can check out the OpenTelemetry repo here! You can use this receiver with any OTel collector, including the upstream OpenTelemetry Collector and observIQ’s distribution of the collector. Below are quick instructions for setting up observIQ’s OpenTelemetry distribution and shipping Apache Flink telemetry to a backend; in our case, we are using Google Cloud Ops.
You can find out more on observIQ’s GitHub page: https://github.com/observIQ/observiq-otel-collector
Apache Flink is an open source, unified stream-processing and batch-processing framework. The Apache Flink receiver records 29 unique metrics, so there is a lot of data to pay attention to. Some specific metrics that users find valuable are:
If you don’t already have an OpenTelemetry collector installed that includes the latest Apache Flink receiver, you’ll need to do that first. I suggest using the observIQ OpenTelemetry Collector distro, which includes the Apache Flink receiver (and many others) and is simple to install with a one-line installer for Linux, Windows, and macOS. For deploying the collector on Kubernetes, see the documentation in the observiq-otel-collector-k8s repository.
Navigate to your OpenTelemetry configuration file. If you’re using the observIQ Collector, you’ll find it in one of the following locations:
For the observIQ OpenTelemetry Collector, edit the configuration file to include the Apache Flink receiver as shown below:
receivers:
  flinkmetrics:
    # Flink REST API endpoint (8081 is Flink's default rest.port)
    endpoint: http://localhost:8081
    collection_interval: 10s

processors:
  # Resourcedetection is used to add a unique (host.name)
  # to the metric resource(s),... target_key: namespace
  resourcedetection:
    detectors: ["system"]

exporters:
  # Placeholder: replace nop with the exporter for your preferred destination(s)
  nop:

service:
  pipelines:
    metrics:
      receivers: [flinkmetrics]
      processors: [resourcedetection]
      exporters: [nop]
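Since we ship to Google Cloud Ops in this example, here is a minimal sketch of the exporter portion. It assumes the `googlecloud` exporter from opentelemetry-collector-contrib is included in your collector build and that the host can authenticate to Google Cloud (for example, through Application Default Credentials). The project ID `my-gcp-project` is a hypothetical placeholder.

```yaml
exporters:
  # Google Cloud exporter (opentelemetry-collector-contrib).
  # "my-gcp-project" is a hypothetical placeholder; omit "project"
  # to fall back to the project associated with your credentials.
  googlecloud:
    project: my-gcp-project

service:
  pipelines:
    metrics:
      receivers: [flinkmetrics]
      # Keep the processors list from your existing pipeline here.
      exporters: [googlecloud]
```

The rest of the configuration stays the same; only the exporter definition and the pipeline's `exporters` list change.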
If you’re using the Google Ops Agent instead, you can find the relevant config file here.
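For reference, a minimal sketch of what the Ops Agent side might look like, assuming the Ops Agent's built-in `flink` metrics receiver type and its default config path on Linux, `/etc/google-cloud-ops-agent/config.yaml`. Treat the key names as assumptions and check them against Google's Ops Agent documentation for your agent version.

```yaml
metrics:
  receivers:
    # Assumed Ops Agent receiver type for Apache Flink.
    flink:
      type: flink
      # Flink REST API endpoint; localhost:8081 is assumed here.
      endpoint: http://localhost:8081
  service:
    pipelines:
      # Pipeline that routes the flink receiver's metrics to Cloud Monitoring.
      flink:
        receivers:
          - flink
```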
If you followed the steps detailed above, the following Apache Flink metrics will now be delivered to your preferred destination.
| Metric | Description |
|---|---|
| flink.jvm.cpu.load | The CPU usage of the JVM for a jobmanager or taskmanager. |
| flink.jvm.cpu.time | The CPU time used by the JVM for a jobmanager or taskmanager. |
| flink.jvm.memory.heap.used | The amount of heap memory currently used. |
| flink.jvm.memory.heap.committed | The amount of heap memory guaranteed to be available to the JVM. |
| flink.jvm.memory.heap.max | The maximum amount of heap memory that can be used for memory management. |
| flink.jvm.memory.nonheap.used | The amount of non-heap memory currently used. |
| flink.jvm.memory.nonheap.committed | The amount of non-heap memory guaranteed to be available to the JVM. |
| flink.jvm.memory.nonheap.max | The maximum amount of non-heap memory that can be used for memory management. |
| flink.jvm.memory.metaspace.used | The amount of memory currently used in the Metaspace memory pool. |
| flink.jvm.memory.metaspace.committed | The amount of memory guaranteed to be available to the JVM in the Metaspace memory pool. |
| flink.jvm.memory.metaspace.max | The maximum amount of memory that can be used in the Metaspace memory pool. |
| flink.jvm.memory.direct.used | The amount of memory used by the JVM for the direct buffer pool. |
| flink.jvm.memory.direct.total_capacity | The total capacity of all buffers in the direct buffer pool. |
| flink.jvm.memory.mapped.used | The amount of memory used by the JVM for the mapped buffer pool. |
| flink.jvm.memory.mapped.total_capacity | The total capacity of all buffers in the mapped buffer pool. |
| flink.memory.managed.used | The amount of managed memory currently used. |
| flink.memory.managed.total | The total amount of managed memory. |
| flink.jvm.threads.count | The total number of live threads. |
| flink.jvm.gc.collections.count | The total number of collections that have occurred. |
| flink.jvm.gc.collections.time | The total time spent performing garbage collection. |
| flink.jvm.class_loader.classes_loaded | The total number of classes loaded since the start of the JVM. |
| flink.job.restart.count | The total number of restarts since this job was submitted, including full restarts and fine-grained restarts. |
| flink.job.last_checkpoint.time | The end-to-end duration of the last checkpoint. |
| flink.job.last_checkpoint.size | The total size of the last checkpoint. |
| flink.job.checkpoint.count | The number of checkpoints completed or failed. |
| flink.job.checkpoint.in_progress | The number of checkpoints in progress. |
| flink.task.record.count | The number of records a task has. |
| flink.operator.record.count | The number of records an operator has. |
| flink.operator.watermark.output | The last watermark this operator has emitted. |
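Not every metric in the table may be useful for your dashboards. Receivers built with the collector's metadata tooling (flinkmetrics among them, as far as we know) let you toggle individual metrics under a `metrics` key. Here is a sketch assuming that convention, with two metrics from the table used purely as examples:

```yaml
receivers:
  flinkmetrics:
    endpoint: http://localhost:8081
    collection_interval: 10s
    # Assumed per-metric toggle; enabled: false stops the receiver
    # from emitting that metric.
    metrics:
      flink.jvm.memory.mapped.used:
        enabled: false
      flink.jvm.memory.mapped.total_capacity:
        enabled: false
```

Disabling metrics at the receiver, rather than filtering them later in the pipeline, avoids collecting data you never intend to ship.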