Your Definitive Guide to Lakehouse Architecture with Iceberg and MinIO

by MinIO, August 8th, 2023
Apache Iceberg seems to have taken the data world by storm. Initially incubated at Netflix by Ryan Blue, it was eventually donated to the Apache Software Foundation, where it currently resides. At its core, it is an open table format for at-scale analytic data sets (think hundreds of TBs to hundreds of PBs).


It is a multi-engine compatible format, meaning that Spark, Trino, Flink, Presto, Hive, and Impala can all operate independently and simultaneously on the same data set. It supports the lingua franca of data analysis, SQL, as well as key features like full schema evolution, hidden partitioning, time travel, rollback, and data compaction.


This post focuses on how Iceberg and MinIO complement each other and how various analytic frameworks (Spark, Flink, Trino, Dremio, and Snowflake) can leverage the two.

Background

While Apache Hive was a major step forward for its time, ultimately it began to show cracks as analytics applications became more numerous, diverse, and sophisticated. To achieve performance, data needed to remain in the directories and those directories needed to be constantly managed. This led to a database of directories. That solved the problem of where the data was, but it introduced the problem of what the state was for that table — which was now in two places (database of directories and the file system).


This limited what you could do and what flexibility existed — specifically with regard to changes, which couldn’t be guaranteed in both places with a single operation.


Imagine large amounts of multi-year data partitioned on date: years partitioned into months and weeks, weeks partitioned into days, days into hours, and so on; the directory listing explodes. The Hive Metastore (HMS) is a transactional RDBMS. The file system (HDFS) is non-transactional. When partition information changes, both the partition store and the file system must be recreated.


The problem was unsustainable and no amount of patching was going to resolve the inherent issues. In fact, the challenges were only accelerating with data growth.

Goals for a Modern Open Table Format

One of the key selling points of data lakehouse architecture is that it supports multiple analytical engines and frameworks. For example, you need to support both ELT (Extract, Load, Transform) and ETL (Extract, Transform, Load). You need to support business intelligence, business analytics, and AI/ML workloads. These all need to interface with the same set of tables in a safe and predictable manner. That means multiple engines like Spark, Flink, Trino, Arrow, and Dask all need to be tied, in some way, into a cohesive architecture.


A multi-engine platform that houses data efficiently while enabling each engine to be successful is what the analytical world has been yearning for, and what Iceberg and Data Lakehouse architectures deliver.


This is not simple, and it comes with many challenges; there is no easy way to use multiple engines with reliable updates to the data. Even now that we have two or three formats that provide reliable updates, there is still a lot of confusion and there are problems in this area.



Modern requirements look like this:


  1. Central Table Storage:  Storing the data independently of the compute becomes a critical architectural decision. The reason it matters is that data has gravity, and it pulls us toward the data’s location. So, if our data lives entirely with one vendor or cloud provider, then we are tied to that vendor or cloud provider alone. This is inherently problematic when those systems are closed or specialized in design. Open software becomes the requirement for modern architectures.


  2. Portable Compute: Another modern requirement is the ability to take your compute engines to a different vendor/cloud provider or leverage specialized compute engines. While many focus on the center of gravity (data), the enterprise also needs portability for logic, code, and SQL.


  3. Access Control: Most enterprises face a huge challenge in maintaining a consistent authorization policy across engines. It is more than just architecture, however, as the successful and repeatable enforcement of these policies across multiple engines is an operational imperative.


  4. Maintain Structure: One of the biggest sources of human toil we have seen over the last few years is losing the data structure as data is moved elsewhere. Snowflake used to be a perfect example: the process of moving data into Snowflake was a manual one, and the introduction of third-party data sets also resulted in rework due to different file formats and changes in formats during movement.

Apache Iceberg to the Rescue

Apache Iceberg is designed from the ground up with most of the challenges and goals mentioned above as the basis to implement an open table format. It addresses the following challenges:


  1. Flexible compute
    • Don't move data; multiple engines should work seamlessly

    • Support batch, streaming, and ad hoc jobs

    • Support code from many languages, not just JVM frameworks


  2. SQL Warehouse behavior
    • Reliable transactions on SQL tables, with the ability to perform CRUD operations dependably

    • A real table abstraction that separates these concerns from the files underneath, providing that segregation


Apache Iceberg keeps its records in object storage — unlike Apache Hive. Iceberg enables SQL behavior to be leveraged by multiple engines and it is designed for huge tables. In production, where a single table can contain tens of petabytes of data, this matters greatly. Even multi-petabyte tables can be read from a single node, without needing a distributed SQL engine to sift through table metadata.


Source: https://iceberg.apache.org/spec/


Iceberg has an unwritten rule: to be invisible when used in the big data stack. This philosophy comes from the SQL table space, where we never think about what is underneath the SQL tables. As any practitioner knows, this is simply not the case when working with Hadoop and Hive-like tables.


Iceberg keeps it simple in two ways. First, it avoids unpleasant surprises when changes are made to tables. For example, a change should never bring back data that was deleted and removed. Second, Iceberg reduces context switching: what is underneath the table doesn’t matter; what matters is the work to be done.

Understanding Iceberg FileIO

FileIO is the interface between the core Iceberg library and underlying storage. FileIO was created as a way for Iceberg to function in a world where distributed compute and storage are disaggregated. The legacy Hadoop ecosystem requires the hierarchical pathing and partition structures that are, in practice, the exact opposite of the methods used to achieve speed and scale in the object storage world.


Hadoop and Hive are anti-patterns for high-performance and scalable cloud-native object storage. Data lake applications that rely on the S3 API to interact with MinIO can easily scale to thousands of transactions per second on millions or billions of objects. You can increase read and write performance by processing multiple concurrent requests in parallel. You accomplish this by adding prefixes — a string of characters that is a subset of an object name, starting with the first character — to buckets and then writing parallel operations, each opening a connection per prefix.


In addition, Hadoop's dependency on file system directories does not translate to object storage – it’s hard to physically organize data sets into different directories and address them by path when paths don’t exist. Hadoop relies on a filesystem to define the data set and provide locking mechanisms for concurrency and conflict resolution. In addition, in the Hadoop ecosystem, jobs that process rename operations must be atomic. This is not possible using the S3 API as renames are really two operations: copy and delete. Unfortunately, the result is that there is no isolation between read and write, possibly giving rise to conflicts, collisions, and inconsistencies.


In contrast, Iceberg was designed to run completely abstracted from physical storage using object storage. All locations are “explicit, immutable, and absolute” as defined in metadata. Iceberg tracks the full state of the table without the baggage of referencing directories. It’s dramatically faster to use metadata to find a table than it would be to list the entire hierarchy using the S3 API. There are no renames — a commit simply adds new entries to the metadata table.


The FileIO API performs metadata operations during the planning and commit phases. Tasks use FileIO to read and write the underlying data files, and the locations of these files are included in the table metadata during a commit. Exactly how the engine does this depends on the implementation of FileIO. For legacy environments, HadoopFileIO serves as an adapter layer between an existing Hadoop FileSystem implementation and the FileIO API within Iceberg.


We’re going to focus instead on S3FileIO because it is a native S3 implementation. We don’t need to carry Hadoop cruft with us when we build our cloud-native lakehouse. According to Iceberg FileIO: Cloud Native Tables, the advantages of a native S3 implementation include:


  • Contract Behavior: Hadoop FileSystem implementations have strict contract behavior resulting in additional requests (existence checks, deconflict directories and paths) which add overhead and complexity. Iceberg uses fully addressable and unique paths which avoids additional complexity.

  • Optimized Uploads: S3FileIO optimizes for storage/memory by progressively uploading data to minimize disk consumption for large tasks and preserves low memory consumption when multiple files are open for output.

  • S3 Client Customization: the client uses the latest major AWS SDK version (v2) and allows for users to fully customize the client for use with S3 (including any S3 API compatible endpoint).

  • Serialization Performance: Task processing with HadoopFileIO requires serialization of the Hadoop configuration, which is quite large and in degenerate cases can slow down processing and result in more overhead than data processed.

  • Reduced Dependencies: Hadoop FileSystem implementations introduce a large dependency tree and a simplified implementation reduces overall packaging complexity.


Iceberg provides integration with different AWS services through the iceberg-aws module, bundled with Spark and Flink runtimes for all versions from 0.11.0 onwards. Iceberg allows users to write data to S3 through S3FileIO. When using S3FileIO, catalogs are configured to use the S3 API using the io-impl catalog property. S3FileIO adopts the latest S3 features for optimized security (S3 access control lists, all three S3 server side encryption modes) and performance (progressive multipart uploads) and is therefore recommended for object storage use cases.

Iceberg and MinIO Tutorial

At this time, Spark is the most feature-rich compute engine for working with Iceberg, so this tutorial focuses on using Spark and Spark-SQL to understand Iceberg concepts and features. On Ubuntu 20.04, we will install and configure Java, PostgreSQL as a catalog or metadata pointer, Spark, and MinIO – while carefully downloading and configuring the Java dependencies. Then we’ll run Spark-SQL to create, populate, query and modify a table. We’ll also walk through some of the awesome things that you can do with Iceberg, such as schema evolution, working with hidden partitions, time travel and rollback. After each step, we include a screenshot of the Iceberg bucket in MinIO so you can see what’s going on behind the scenes.

Prerequisites

Download and start MinIO Server. Record the IP address, TCP port, access key, and secret key.
Download and install MinIO Client.


Use the MinIO Client to set an alias and create a bucket for Iceberg


mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key>
mc mb minio/iceberg
Bucket created successfully `minio/iceberg`.


You will need to download and configure Spark to use the required Java Archives (JARs) in order to enable various functionalities like Hadoop, AWS S3, and JDBC. You will also need to have the right version of each required JAR and configuration file in PATH and CLASSPATH. It is, unfortunately, very easy to invoke different versions of JARs and lose track of which JAR you’re running and therefore encounter show-stopping incompatibilities.


Install Java Runtime if you have not done so already. For Ubuntu 20.04, the command is


sudo apt install curl mlocate default-jdk -y


Download and configure PostgreSQL to run as a system service


sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get -y install postgresql
sudo systemctl start postgresql.service


We’ll create a role icebergcat as a superuser, set the password and create a database icebergcat


sudo -u postgres createuser --interactive   # enter icebergcat as the role name and answer yes to superuser
sudo -u postgres psql -c "ALTER ROLE icebergcat PASSWORD 'minio';"
sudo -u postgres createdb icebergcat


Log in to the database to verify it’s working; you’ll be prompted for the password:


psql -U icebergcat -d icebergcat -W -h 127.0.0.1


Download, extract, and move Apache Spark


$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
$ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz
$ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark


Set the Spark environment by adding the following to ~/.bashrc and then restarting the shell to apply the changes.


export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
bash -l


The following .jar files are required. Download and copy the .jar files to a location Spark can load them from on the Spark machine, for example /opt/spark/jars.


The AWS SDK for Java v2 bundle (bundle-2.17.230.jar or higher) is needed to support the S3 protocol.


$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar


iceberg-spark-runtime-3.2_2.12.jar is required.


$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.2_2.12/0.13.2/iceberg-spark-runtime-3.2_2.12-0.13.2.jar


Start Spark

Start a Spark standalone master server


$ start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out


Open a browser and go to the Spark master web UI at http://<Your-IP-address>:8080



Spark is alive at spark://<Your-Machine-Name>:7077


Start a Spark worker process

$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077
starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out

Spark-SQL and Iceberg

Initialize the environment before launching Spark-SQL.


export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin
export AWS_S3_ENDPOINT=10.0.0.10:9000
export AWS_REGION=us-east-1
export MINIO_REGION=us-east-1
export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2"
export AWS_SDK_VERSION=2.17.230
export AWS_MAVEN_GROUP=software.amazon.awssdk
export AWS_PACKAGES=(
"bundle"
"url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION"
done


Run the following command to launch Spark-SQL with Iceberg, using PostgreSQL for metadata and support for the S3 API required by MinIO. Alternatively, you can set the configuration in your local spark-defaults.conf file.


$ spark-sql --packages $DEPENDENCIES \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \
--conf spark.sql.catalog.my_catalog.jdbc.password=minio \
--conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.defaultCatalog=my_catalog \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/home/iceicedata/spark-events \
--conf spark.history.fs.logDirectory=/home/iceicedata/spark-events \
--conf spark.sql.catalogImplementation=in-memory


A few important notes about this configuration


  • We declare a catalog my_catalog that uses JDBC to connect to PostgreSQL on an internal IP address and uses the icebergcat database for metadata.
  • We then set our warehouse location to the MinIO bucket we created earlier and configure Iceberg to use S3FileIO to access it.

Creating a Table

Next, we’ll create a simple table.


CREATE TABLE my_catalog.my_table (
id bigint,
data string,
category string)
USING iceberg
LOCATION 's3://iceberg'
PARTITIONED BY (category);


Here’s a massive performance improvement that Iceberg offers with S3FileIO. It comes as a great relief to those of us who have suffered through slow performance when using a traditional Hive storage layout with S3 as a result of throttling requests based on object prefix. It’s no secret that creating a partitioned Athena/Hive table on AWS S3 can take 30-60 minutes. Iceberg by default uses the Hive storage layout, but can be switched to use the ObjectStoreLocationProvider.


With ObjectStoreLocationProvider, a deterministic hash is generated for each stored file, with the hash appended directly after the write.data.path. This ensures that files written to S3-compatible object storage are equally distributed across multiple prefixes in the S3 bucket, resulting in minimum throttling and maximum throughput for S3-related IO operations. When using ObjectStoreLocationProvider, having a shared and short write.data.path across your Iceberg tables will improve performance. Much more has been done in Iceberg to improve performance and reliability over Hive. To use this layout for my_table, drop the table created above and recreate it with the object storage options:


CREATE TABLE my_catalog.my_table (
    id bigint,
    data string,
    category string)
USING iceberg
OPTIONS (
    'write.object-storage.enabled'=true, 
    'write.data.path'='s3://iceberg')
PARTITIONED BY (category);


Looking at MinIO Console, we see that a path was created under our iceberg bucket for my_table



The bucket contains a metadata path



At this point, there’s no data in the table; there is only metadata describing the table. There is also a pointer to this metadata stored in the Iceberg catalog table in PostgreSQL. Spark-SQL (the query engine) searches the Iceberg catalog (my_catalog) by table name (my_table) and retrieves the URI of the current metadata file.
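
If you’re curious, you can see that pointer directly from the psql session we set up earlier. The following is just an illustrative sketch: it assumes the JDBC catalog’s default backing table, typically named iceberg_tables, whose exact name and columns may vary by Iceberg version.

-- run inside psql: the JDBC catalog keeps one row per table pointing at its current metadata file
SELECT table_namespace, table_name, metadata_location FROM iceberg_tables;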



Let’s take a look at the first metadata file, where information about the table’s schema, partitions, and snapshots is stored. While all snapshots are defined there, the current-snapshot-id tells the query engine which snapshot to use; the query engine then looks up that value in the snapshots array, obtains that snapshot’s manifest-list, and opens the manifest files in that list, in order. Note that our example has no snapshots yet (current-snapshot-id is -1 and the snapshots array is empty) and therefore no manifests, because the table was just created and we have not yet inserted data.


{
  "format-version" : 1,
  "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363",
  "location" : "s3://iceberg/my_table",
  "last-updated-ms" : 1658795119167,
  "last-column-id" : 3,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "category",
      "required" : false,
      "type" : "string"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "data",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "category",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "category",
    "transform" : "identity",
    "source-id" : 3,
    "field-id" : 1000
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "category",
      "transform" : "identity",
      "source-id" : 3,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "option.write.data.path" : "s3://iceberg/my_table",
    "owner" : "msarrel",
    "option.write.object-storage.enabled" : "true",
    "write.data.path" : "s3://iceberg/my_table",
    "write.object-storage.enabled" : "true"
  },

  "current-snapshot-id" : -1,
  "snapshots" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}


Next, let’s insert some mock data and watch the files that Iceberg stores in MinIO. Inside the iceberg bucket, there are now my_table/metadata and my_table/data prefixes.


INSERT INTO my_catalog.my_table VALUES (1, 'a', 'music'), (2, 'b', 'music'), (3, 'c', 'video');



The metadata prefix contains the original metadata file, a manifest list, and manifest files. The manifest list is, you guessed it, a list of manifest files. The manifest list contains information about each manifest file included in a snapshot: the location of the manifest file, the snapshot it was added in, and information about partitioning and the lower and upper bounds of the partition columns of the related data files. During a query, the query engine reads the manifest file locations from the manifest list and opens the appropriate manifest files. The manifest list is in Avro format.
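
You can also see the manifest list location for each snapshot from Spark-SQL by querying the snapshots metadata table (covered further in the Data Engineering section below); the manifest_list column holds the path to the Avro file in MinIO:

spark-sql> SELECT snapshot_id, manifest_list FROM my_catalog.my_table.snapshots;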


Manifest files track data files and include details and pre-calculated statistics about each file. The first thing that’s tracked is file format and location. Manifest files are how Iceberg does away with Hive-style tracking of data by filesystem location. Manifest files improve the efficiency and performance of reading data files by including details like partition membership, record count, and the lower and upper bounds of each column. These statistics are written during write operations and are more likely to be timely, accurate, and up-to-date than Hive statistics.



When a SELECT query is submitted, the query engine obtains the location of the manifest list from the current metadata file. Then the query engine reads the file-path entries for each data-file object and opens the data files to execute the query.


Shown below are the contents of the data prefix, organized by partition.



Inside the partition, there is a data file per table row.



Let’s run an example query:

spark-sql> SELECT count(1) as count, data
FROM my_catalog.my_table
GROUP BY data;
1       a
1       b
1       c
Time taken: 9.715 seconds, Fetched 3 row(s)
spark-sql>


Now that we understand the different components of an Iceberg table and how the query engine works with them, let’s dive into Iceberg’s best features and how to leverage them in your data lake.

Table Evolution

Schema evolution changes like add, drop, rename, and update are metadata changes, which means no data files need to be changed or rewritten to perform updates. Iceberg also guarantees that these schema evolution changes are independent and free from side effects. Iceberg uses unique IDs to track each column in a table, so a newly added column can never reuse an existing ID by mistake.


Iceberg table partitions can be updated in an existing table because queries do not reference partition values directly. When new data is written, it uses the new spec in a new layout; previously written data with a different spec remains unchanged. This results in split planning when you run new queries. To improve performance, Iceberg uses hidden partitioning, so users don’t need to write queries against a specific partition layout for them to be fast. Users focus on writing queries for the data they need and let Iceberg prune the files that don’t contain matching data.


Another very useful evolution is that the Iceberg sort order can also be updated in an existing table, just like the partition spec. Different engines can choose to write data in the latest sort order, or unsorted when sorting is prohibitively expensive; old data written with the previous sort order remains unchanged.
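
For example, with the Iceberg Spark SQL extensions enabled (as they are in our spark-sql launch command), a statement along these lines sets a new table-level sort order; treat it as a sketch, since exact syntax support depends on your Iceberg version:

spark-sql> ALTER TABLE my_catalog.my_table WRITE ORDERED BY category, id;


Next, let’s rename the table: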


spark-sql> ALTER TABLE my_catalog.my_table
> RENAME TO my_catalog.my_table_2;


The first few times you do this, you’ll be blown away by how fast it is. This is because you are not rewriting a table; you’re simply operating on metadata. In this case, we’ve only changed the table name, and Iceberg did this for us in about one-tenth of a second.



Other schema changes are equally painless:

spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity;
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity;
spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;
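
Dropping a column is also a metadata-only change. As an illustrative follow-on to the statements above (which added the buyer column), you could later remove it without rewriting any data files:

spark-sql> ALTER TABLE my_catalog.my_table DROP COLUMN buyer;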

Partitions

As we mentioned earlier, partitioning is supported by other Hive-style formats; however, Iceberg supports hidden partitioning, which handles the tedious and error-prone task of producing partition values for rows in a table. Users focus on adding filters to their queries that solve business problems and don’t worry about how the table is partitioned. Iceberg automatically avoids reads from unnecessary partitions.


Iceberg handles the intricacies of partitioning and changing the partition scheme of a table for you, greatly simplifying the process for end users. You can define partitioning or let Iceberg take care of it for you. Iceberg likes to partition on a timestamp, such as event time. Partitions are tracked by snapshots in manifests. Queries no longer depend on a table’s physical layout. Because of this separation between physical and logical tables, Iceberg tables can evolve partitions over time as more data is added. For example, repartitioning a Hive table would require creating a new table and reading old data into it. You would also have to change the PARTITION value in every single query you’ve already written — not fun.
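
As an illustration of hidden partitioning, here is a hypothetical table partitioned by a transform on a timestamp column; queries filter on event_ts itself and never mention the derived daily partitions (the table and column names here are made up for the example):

CREATE TABLE my_catalog.events (
    id bigint,
    event_ts timestamp,
    payload string)
USING iceberg
PARTITIONED BY (days(event_ts));


Returning to my_table, let’s evolve its partitioning by adding a new partition field: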


spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category;
spark-sql> ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;


We now have two partitioning schemes for the same table. What was impossible in Hive has taken place transparently in Iceberg. From now on, query plans are split, using the old partition scheme to query old data, and the new partition scheme to query new data. Iceberg takes care of this for you — people querying the table don’t need to know that data is stored using two partition schemes. Iceberg does this through a combination of behind-the-scenes WHERE clauses and partition filters that prune out data files without matches.
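
In practice, readers simply filter on the columns they care about, and Iceberg prunes data files under both the old and the new partition spec. For example (illustrative, assuming rows with month values have since been inserted):

spark-sql> SELECT count(1) FROM my_catalog.my_table WHERE month = 7 AND category = 'music';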

Time Travel and Rollback

Every write to an Iceberg table creates a new snapshot. Snapshots are versions of the table and can be used for time travel and rollback, much like MinIO’s object versioning capabilities. Snapshots are managed by expiring them (expireSnapshots) so the system stays well maintained. Time travel enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes. Version rollback allows users to quickly correct problems by resetting tables to a good state.
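
As a sketch, snapshot expiration can be run with Iceberg’s Spark procedure; adjust the timestamp and the number of snapshots to retain to your needs, keeping in mind that expired snapshots are no longer available for time travel:

spark-sql> CALL my_catalog.system.expire_snapshots('my_table', TIMESTAMP '2022-07-25 00:00:00.000', 1);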


As tables are changed, Iceberg tracks each version as a snapshot and then provides the ability to time travel to any snapshot when querying the table. This can be very useful if you want to run historical queries or reproduce the results of previous queries, perhaps for reporting. Time travel can also be helpful when testing new code changes because you can test new code with a query of known results.


To see the snapshots that have been saved for a table:

spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
2022-07-25 17:26:47.53  527713811620162549      NULL    append  s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro  {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"}
Time taken: 7.236 seconds, Fetched 1 row(s)


Some examples:

-- time travel to October 26, 1986 at 01:21:00
spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00';
-- time travel to snapshot with id 10963874102873
spark-sql> SELECT * FROM my_catalog.my_table VERSION AS OF 10963874102873;


You can do incremental reads using snapshots, but you must use Spark, not Spark-SQL. For example:

scala> spark.read
  .format("iceberg")
  .option("start-snapshot-id", "10963874102873")
  .option("end-snapshot-id", "10963874102994")
  .load("s3://iceberg/my_table")


You can also roll back the table to a point in time or to a specific snapshot, as in these two examples:

spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000');
spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);

Expressive SQL

Iceberg supports all the expressive SQL commands like row-level delete, merge, and update, and the biggest thing to highlight is that Iceberg supports both eager and lazy strategies. We can encode all the things we need to delete (for example, for GDPR or CCPA) without rewriting all of those data files immediately; we can lazily collect garbage as needed, and that really helps with efficiency on the huge tables Iceberg supports.


For example, you can delete all records in a table that match a specific predicate. The following will remove all rows from the video category:

spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';


Alternatively, you could use CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT to accomplish this:

spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';


You can merge two tables very easily:

spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;
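
Row-level updates work the same way. A quick illustrative example (the new value here is made up):

spark-sql> UPDATE my_catalog.my_table SET category = 'media' WHERE id = 3;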

Data Engineering

Iceberg is the foundation of an open standard for analytic tables. Unlike other Hive-style table formats, it uses SQL behavior and a real table abstraction, and it applies data warehouse fundamentals to fix problems before we know we have them. With declarative data engineering, we can configure tables and not worry about changing each engine to fit the needs of the data. This unlocks automatic optimization and recommendations. With safe commits, data services are possible, which helps avoid humans babysitting data workloads.


To inspect a table’s history, snapshots and other metadata, Iceberg supports querying metadata. Metadata tables are identified by adding the metadata table name (for example, history) after the original table name in your query.


To display a table’s data files:

spark-sql> SELECT * FROM my_catalog.my_table.files;


To display manifests:

spark-sql> SELECT * FROM my_catalog.my_table.manifests;


To display table history:

spark-sql> SELECT * FROM my_catalog.my_table.history;


To display snapshots:

spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
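
You can also summarize data files per partition with the partitions metadata table (available in recent Iceberg versions):

spark-sql> SELECT * FROM my_catalog.my_table.partitions;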


You can also join snapshots and table history to see the application that wrote each snapshot:

spark-sql> select
              h.made_current_at,
              s.operation,
              h.snapshot_id,
              h.is_current_ancestor,
              s.summary['spark.app.id']
           from my_catalog.my_table.history h
           join my_catalog.my_table.snapshots s
              on h.snapshot_id = s.snapshot_id
           order by made_current_at;


Now that you’ve learned the basics, load some of your data into Iceberg, then learn more from the Spark and Iceberg Quickstart and the Iceberg Documentation.

Integrations

Apache Iceberg has integrations with various query and execution engines, whose connectors can create and manage Apache Iceberg tables. The engines that support Iceberg include Spark, Flink, Hive, Presto, Trino, Dremio, and Snowflake.

It’s Cool to Build Data Lakes with Iceberg and MinIO

Apache Iceberg gets a lot of attention as a table format for data lakes. The growing open source community and increasing number of integrations from multiple cloud providers and application frameworks means that it’s time to take Iceberg seriously, start experimenting, learning, and planning on integrating it into existing data lake architecture. Pair Iceberg with MinIO for multi-cloud data lakes and analytics.


As you get started with Iceberg and MinIO, please reach out and share your experiences or ask questions through our Slack channel.

