Apache Iceberg semble avoir pris d'assaut le monde des données. Initialement incubé chez Netflix par Ryan Blue, il a finalement été transmis à l'Apache Software Foundation où il réside actuellement. À la base, il s'agit d'un format de tableau ouvert pour les ensembles de données analytiques à grande échelle (pensez à des centaines de To à des centaines de Po).
C'est un format compatible avec plusieurs moteurs. Cela signifie que Spark, Trino, Flink, Presto, Hive et Impala peuvent tous fonctionner indépendamment et simultanément sur l'ensemble de données. Il prend en charge la lingua franca de l'analyse de données, SQL, ainsi que des fonctionnalités clés telles que l'évolution complète du schéma, le partitionnement caché, le voyage dans le temps, la restauration et le compactage des données.
Cet article se concentre sur la façon dont Iceberg et MinIO se complètent et comment divers cadres analytiques (Spark, Flink, Trino, Dremio et Snowflake) peuvent tirer parti des deux.
Alors qu'Apache Hive était une avancée majeure pour son époque, il a finalement commencé à montrer des fissures à mesure que les applications d'analyse devenaient plus nombreuses, diversifiées et sophistiquées. Pour atteindre les performances, les données devaient rester dans les répertoires et ces répertoires devaient être gérés en permanence. Cela a conduit à une base de données de répertoires. Cela a résolu le problème de l'emplacement des données, mais a introduit le problème de l'état de cette table - qui se trouvait maintenant à deux endroits (la base de données des répertoires et le système de fichiers).
Cela limitait ce que vous pouviez faire et la flexibilité qui existait, notamment en ce qui concerne les changements, qui ne pouvaient être garantis aux deux endroits avec une seule opération.
Imaginez de grandes quantités de données pluriannuelles partitionnées à la date. Les années sont divisées en mois et en semaines et, si les semaines sont divisées en jours, et les jours en heures, etc., la liste des répertoires explose. Le Hive Metastore (HMS) est un SGBDR transactionnel. Le système de fichiers (HDFS) est non transactionnel. Lorsque les informations de partition sont modifiées, cela nécessite la recréation du magasin de partitions et du système de fichiers.
Le problème était insoutenable et aucune quantité de correctifs n'allait résoudre les problèmes inhérents. En fait, les défis ne faisaient que s'accélérer avec la croissance des données.
L'un des principaux arguments de vente de l'architecture Data Lakehouse est qu'elle prend en charge plusieurs moteurs et cadres d'analyse. Par exemple, vous devez prendre en charge à la fois ELT (Extract, Load, Transform) et ETL (Extract, Transform, Load). Vous devez prendre en charge l'informatique décisionnelle, l'analyse commerciale et les types de charges de travail IA/ML. Vous devez vous connecter avec succès au même ensemble de tables de manière sûre et prévisible. Cela signifie que plusieurs moteurs tels que Spark, Flink, Trino, Arrow et Dask doivent tous être liés d'une manière ou d'une autre à une architecture cohérente.
Une plate-forme multi-moteurs qui héberge les données de manière efficace tout en permettant à chaque moteur de réussir est ce à quoi aspire le monde de l'analyse et ce que les architectures Iceberg et Data Lakehouse offrent.
Ce n'est pas simple et il y a beaucoup de défis à relever; il n'y a pas de moyen facile d'utiliser plusieurs moteurs avec une mise à jour fiable des données. Mais même maintenant que nous avons deux ou trois formats qui fournissent des mises à jour fiables, il y a encore beaucoup de confusion et il y a des problèmes dans ce domaine.
Les exigences modernes ressemblent à ceci :
Central Table Storage : Stocker les données indépendamment du calcul devient une décision architecturale critique. La raison pour laquelle c'est important est que les données ont une gravité et qu'elles nous attirent vers l'emplacement des données. Ainsi, si nos données se trouvent entièrement chez un seul fournisseur ou fournisseur de cloud, nous ne sommes liés qu'à ce fournisseur ou fournisseur de cloud. Ceci est intrinsèquement problématique lorsque ces systèmes sont fermés ou spécialisés dans la conception. Les logiciels ouverts deviennent l'exigence des architectures modernes.
Calcul portable : Une autre exigence moderne est la possibilité de transférer vos moteurs de calcul vers un autre fournisseur/fournisseur de cloud ou de tirer parti de moteurs de calcul spécialisés. Alors que beaucoup se concentrent sur le centre de gravité (les données), l'entreprise a également besoin de portabilité pour la logique, le code et SQL.
Contrôle d'accès : La plupart des entreprises ont un énorme défi pour avoir une politique d'autorisation cohérente sur tous les moteurs. Cependant, il ne s'agit pas seulement d'architecture, car l'application réussie et reproductible de ces politiques sur plusieurs moteurs devient un impératif opérationnel.
Maintenir la structure : L'une des plus grandes sources de travail humain que nous avons vues au cours des dernières années est la perte de la structure des données car elle est déplacée ailleurs. Un exemple parfait était Snowflake. Le processus de déplacement des données vers Snowflake était manuel et l'introduction d'ensembles de données tiers a également entraîné des retouches en raison de différents formats de fichiers et de changements de formats pendant le déplacement.
Apache Iceberg est conçu dès le départ avec la plupart des défis et objectifs mentionnés ci-dessus comme base pour implémenter un format de table ouverte. Il relève les défis suivants :
Ne déplacez pas les données ; plusieurs moteurs doivent fonctionner de manière transparente
Prend en charge les tâches par lots, en continu et ad hoc
Prend en charge le code de nombreux langages, pas seulement les frameworks JVM
Transactions fiables avec des tables SQL où nous avons la capacité d'effectuer des opérations CRUD de manière fiable
Séparer les préoccupations des tables réelles fournit cette ségrégation
Apache Iceberg conserve ses enregistrements dans un stockage d'objets, contrairement à Apache Hive. Iceberg permet au comportement SQL d'être exploité par plusieurs moteurs et il est conçu pour les tables volumineuses. En production, où une seule table peut contenir des dizaines de pétaoctets de données, cela est très important. Même les tables de plusieurs pétaoctets peuvent être lues à partir d'un seul nœud, sans avoir besoin d'un moteur SQL distribué pour parcourir les métadonnées des tables.
Source : https://iceberg.apache.org/spec/
Iceberg a une règle non écrite, pour être invisible lorsqu'il est utilisé dans la pile Big Data. Cette philosophie vient de l'espace table SQL, où nous ne pensons jamais à ce qui se trouve sous les tables SQL. Comme tout praticien le sait, ce n'est tout simplement pas le cas lorsque vous travaillez avec des tables de type Hadoop et Hive.
Iceberg reste simple de deux manières. Tout d'abord, évitez les mauvaises surprises lorsque des modifications sont apportées aux tables. Par exemple, une modification ne doit jamais ramener des données qui ont été supprimées et supprimées. Deuxièmement, Iceberg réduit le changement de contexte car ce qui se trouve sous la table n'a pas d'importance — ce qui compte, c'est le travail à faire.
FileIO est l'interface entre la bibliothèque centrale Iceberg et le stockage sous-jacent. FileIO a été créé pour permettre à Iceberg de fonctionner dans un monde où le calcul et le stockage distribués sont désagrégés. L'ancien écosystème Hadoop nécessite des structures de cheminement et de partition hiérarchiques qui sont, en pratique, l'exact opposé des méthodes utilisées pour atteindre la vitesse et l'échelle dans le monde du stockage d'objets.
Hadoop et Hive sont des anti-modèles pour un stockage d'objets cloud natif hautes performances et évolutif. Les applications de lac de données qui s'appuient sur l'API S3 pour interagir avec MinIO peuvent facilement évoluer jusqu'à des milliers de transactions par seconde sur des millions ou des milliards d'objets. Vous pouvez augmenter les performances de lecture et d'écriture en traitant plusieurs requêtes simultanées en parallèle. Pour ce faire, ajoutez des préfixes (une chaîne de caractères qui est un sous-ensemble d'un nom d'objet, en commençant par le premier caractère) aux compartiments, puis en écrivant des opérations parallèles, chacune ouvrant une connexion par préfixe.
De plus, la dépendance de Hadoop vis-à-vis des répertoires du système de fichiers ne se traduit pas par un stockage d'objets : il est difficile d'organiser physiquement les ensembles de données dans différents répertoires et de les traiter par chemin lorsque les chemins n'existent pas. Hadoop s'appuie sur un système de fichiers pour définir l'ensemble de données et fournir des mécanismes de verrouillage pour la concurrence et la résolution des conflits. De plus, dans l'écosystème Hadoop, les tâches qui traitent les opérations de changement de nom doivent être atomiques. Ce n'est pas possible avec l'API S3 car les changements de nom sont en réalité deux opérations : copier et supprimer. Malheureusement, le résultat est qu'il n'y a pas d'isolement entre la lecture et l'écriture, ce qui peut entraîner des conflits, des collisions et des incohérences.
En revanche, Iceberg a été conçu pour fonctionner complètement indépendamment du stockage physique en utilisant le stockage d'objets. Tous les emplacements sont "explicites, immuables et absolus" comme défini dans les métadonnées. Iceberg suit l'état complet de la table sans les bagages des répertoires de référencement. Il est beaucoup plus rapide d'utiliser des métadonnées pour trouver une table que de répertorier toute la hiérarchie à l'aide de l'API S3. Il n'y a pas de renommage - un commit ajoute simplement de nouvelles entrées à la table de métadonnées.
L'API FileIO effectue des opérations de métadonnées pendant les phases de planification et de validation. Les tâches utilisent FileIO pour lire et écrire les fichiers de données sous-jacents, et les emplacements de ces fichiers sont inclus dans les métadonnées de la table lors d'une validation. La manière exacte dont le moteur procède dépend de l'implémentation de FileIO. Pour les environnements hérités, HadoopFileIO
sert de couche d'adaptation entre une implémentation Hadoop FileSystem existante et l'API FileIO dans Iceberg.
Nous allons plutôt nous concentrer sur S3FileIO
car il s'agit d'une implémentation S3 native. Nous n'avons pas besoin de transporter Hadoop Cruft avec nous lorsque nous construisons notre Lakehouse cloud-native. Selon Iceberg FileIO : Cloud Native Tables , les avantages d'une implémentation S3 native incluent :
Comportement contractuel : les implémentations Hadoop FileSystem ont un comportement contractuel strict, ce qui entraîne des requêtes supplémentaires (vérifications d'existence, suppression des conflits de répertoires et de chemins) qui ajoutent de la surcharge et de la complexité. Iceberg utilise des chemins entièrement adressables et uniques, ce qui évite une complexité supplémentaire.
Téléchargements optimisés : S3FileIO
optimise le stockage/la mémoire en téléchargeant progressivement les données afin de minimiser la consommation de disque pour les tâches volumineuses et préserve une faible consommation de mémoire lorsque plusieurs fichiers sont ouverts pour la sortie.
Personnalisation du client S3 : le client utilise la dernière version majeure du SDK AWS (v2) et permet aux utilisateurs de personnaliser entièrement le client pour une utilisation avec S3 (y compris tout point de terminaison compatible avec l'API S3).
Performances de sérialisation : le traitement des tâches avec HadoopFileIO
nécessite la sérialisation de la configuration Hadoop, qui est assez volumineuse et, dans les cas dégénérés, peut ralentir le traitement et entraîner plus de surcharge que les données traitées.
Dépendances réduites : les implémentations Hadoop FileSystem introduisent une grande arborescence de dépendances et une implémentation simplifiée réduit la complexité globale de l'emballage.
Iceberg fournit une intégration avec différents services AWS via le module iceberg-aws , fourni avec les runtimes Spark et Flink pour toutes les versions à partir de la version 0.11.0
. Iceberg permet aux utilisateurs d'écrire des données sur S3 via S3FileIO
. Lors de l'utilisation S3FileIO
, les catalogues sont configurés pour utiliser l'API S3 à l'aide de la propriété de catalogue io-impl
. S3FileIO
adopte les dernières fonctionnalités S3 pour une sécurité optimisée (listes de contrôle d'accès S3, les trois modes de chiffrement côté serveur S3) et des performances (téléchargements progressifs en plusieurs parties) et est donc recommandé pour les cas d'utilisation du stockage d'objets.
À l'heure actuelle, Spark est le moteur de calcul le plus riche en fonctionnalités pour travailler avec Iceberg. Ce didacticiel se concentre donc sur l'utilisation de Spark et de Spark-SQL pour comprendre les concepts et les fonctionnalités d'Iceberg. Sur Ubuntu 20.04, nous allons installer et configurer Java, PostgreSQL en tant que pointeur de catalogue ou de métadonnées, Spark et MinIO - tout en téléchargeant et en configurant soigneusement les dépendances Java. Ensuite, nous exécuterons Spark-SQL pour créer, remplir, interroger et modifier une table. Nous passerons également en revue certaines des choses impressionnantes que vous pouvez faire avec Iceberg, telles que l'évolution du schéma, le travail avec des partitions cachées, le voyage dans le temps et la restauration. Après chaque étape, nous incluons une capture d'écran du seau Iceberg dans MinIO afin que vous puissiez voir ce qui se passe dans les coulisses.
Téléchargez et démarrez MinIO Server. Enregistrez l'adresse IP, le port TCP, la clé d'accès et la clé secrète.
Téléchargez et installez le client MinIO.
Utilisez le client MinIO pour définir un alias et créer un compartiment pour Iceberg
mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb minio/iceberg Bucket created successfully `myminio/iceberg`.
Vous devrez télécharger et configurer Spark pour utiliser les archives Java (JAR) requises afin d'activer diverses fonctionnalités telles que Hadoop, AWS S3 et JDBC. Vous devrez également disposer de la bonne version de chaque fichier JAR et fichier de configuration requis dans PATH et CLASSPATH. Il est, malheureusement, très facile d'invoquer différentes versions de JAR et de perdre la trace du JAR que vous utilisez et donc de rencontrer des incompatibilités époustouflantes.
Installez Java Runtime si vous ne l'avez pas déjà fait. Pour Ubuntu 20.04, la commande est
sudo apt install curl mlocate default-jdk -y
Téléchargez et configurez PostgreSQL pour qu'il s'exécute en tant que service système
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update sudo apt-get -y install postgresql sudo systemctl start postgresql.service
Nous allons créer un rôle icebergcat
en tant que superutilisateur, définir le mot de passe et créer une base de données icebergcat
sudo -u postgres createuser --interactive ALTER ROLE icebergcat PASSWORD 'minio'; sudo -u postgres createdb icebergcat
Connectez-vous à la base de données pour vérifier son fonctionnement, vous serez invité à saisir le mot de passe :
psql -U icebergcat -d icebergcat -W -h 127.0.0.1
Télécharger, extraire et déplacer Apache Spark
$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz $ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz $ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark
Définissez l'environnement Spark en ajoutant ce qui suit à ~/.bashrc
, puis en redémarrant le shell pour appliquer les modifications.
export SPARK_HOME=/opt/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin bash -l
Les fichiers .jar suivants sont requis. Téléchargez et copiez les fichiers .jar dans n'importe quel emplacement requis sur la machine Spark, par exemple /opt/spark/jars
.
aws-java-sdk-bundle/1.11.901.jar (ou supérieur) est nécessaire pour prendre en charge le protocole S3.
$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar
iceberg-spark-runtime-3.2_2.12.jar est requis.
$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar
Démarrer un serveur maître autonome Spark
$ start-master.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out
Ouvrez un navigateur et allez sur http: // Votre-adresseIP:7077
Spark est actif sur spark://<Your-Machine-Name>:7077
Démarrer un processus de travail Spark
$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077 starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out
Initialisez l'environnement avant de lancer Spark-SQL.
export AWS_ACCESS_KEY_ID=minioadmin export AWS_SECRET_ACCESS_KEY=minioadmin export AWS_S3_ENDPOINT=10.0.0.10:9000 export AWS_REGION=us-east-1 export MINIO_REGION=us-east-1 export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" export AWS_SDK_VERSION=2.17.230 export AWS_MAVEN_GROUP=software.amazon.awssdk export AWS_PACKAGES=( "bundle" "url-connection-client" ) for pkg in "${AWS_PACKAGES[@]}"; do export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" done
Exécutez la commande suivante pour lancer Spark-SQL avec Iceberg en utilisant PostgreSQL pour les métadonnées et la prise en charge de l'API S3, requise pour MinIO. Vous pouvez également définir la configuration à l'aide de votre fichier local spark-defaults.conf
$ spark-sql --packages $DEPENDENCIES \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \ --conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.password=minio \ --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=/home/iceicedata/spark-events \ --conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \ --conf spark.sql.catalogImplementation=in-memory
Quelques remarques importantes sur cette configuration
my_catalog
qui utilise JDBC pour se connecter à PostgreSQL sur une adresse IP interne et utilisons la table icebergcat
pour les métadonnées.S3FileIO
pour y accéder.Ensuite, nous allons créer un tableau simple.
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg LOCATION 's3://iceberg' PARTITIONED BY (category);
Voici une amélioration massive des performances offerte par Iceberg avec S3FileIO. C'est un grand soulagement pour ceux d'entre nous qui ont souffert de performances lentes lors de l'utilisation d'une configuration de stockage Hive traditionnelle avec S3 en raison de demandes de limitation basées sur le préfixe d'objet. Ce n'est un secret pour personne que la création d'une table Athena/Hive partitionnée sur AWS S3 peut prendre 30 à 60 minutes. Iceberg utilise par défaut la disposition de stockage Hive, mais peut être commuté pour utiliser ObjectStoreLocationProvider
.
Avec ObjectStoreLocationProvider
, un hachage déterministe est généré pour chaque fichier stocké, avec le hachage ajouté directement après le write.data.path
. Cela garantit que les fichiers écrits dans le stockage d'objets compatible S3 sont répartis de manière égale sur plusieurs préfixes dans le compartiment S3, ce qui entraîne une limitation minimale et un débit maximal pour les opérations d'E/S liées à S3. Lorsque vous utilisez ObjectStoreLocationProvider
, le fait d'avoir un write.data.path
partagé et court sur vos tables Iceberg améliorera les performances. Beaucoup plus a été fait dans Iceberg pour améliorer les performances et la fiabilité par rapport à Hive .
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.object-storage.enabled'=true, 'write.data.path'='s3://iceberg') PARTITIONED BY (category);
En regardant la console MinIO, nous voyons qu'un chemin a été créé sous notre seau iceberg
pour my_table
Le bucket contient un chemin metadata
À ce stade, il n'y a pas de données dans la table, il n'y a que des métadonnées décrivant la table. Il existe également un pointeur vers ces métadonnées stockées dans la table du catalogue Iceberg dans PostgreSQL. Spark-SQL (le moteur de requête) recherche le catalogue Iceberg ( my_catalog
) par nom de table ( my_table
) et récupère l'URI dans le fichier de métadonnées actuel.
Examinons le premier fichier de métadonnées, où sont stockées les informations sur le schéma, les partitions et les instantanés de la table. Alors que tous les instantanés sont définis, le current-snapshot-id
indique au moteur de requête quel instantané utiliser, puis le moteur de requête recherche cette valeur dans le tableau snapshots
, obtient la valeur de manifest-list
de cet instantané et ouvre les fichiers manifestes dans ce liste, dans l'ordre. Notez que notre exemple n'a qu'un seul instantané car la table vient d'être créée, et aucun manifeste car nous n'avons pas encore inséré de données.
{ "format-version" : 1, "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363", "location" : "s3://iceberg/my_table", "last-updated-ms" : 1658795119167, "last-column-id" : 3, "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] }, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] } ], "partition-spec" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "option.write.data.path" : "s3://iceberg/my_table", "owner" : "msarrel", "option.write.object-storage.enabled" : "true", "write.data.path" : "s3://iceberg/my_table", "write.object-storage.enabled" : "true" }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }
Ensuite, insérons quelques données fictives et regardons les fichiers qu'Iceberg stocke dans MinIO. À l'intérieur du seau iceberg
, il y a maintenant les préfixes my_table/metadata
et my_table/data
.
INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video");
Le préfixe de métadonnées contient le fichier de métadonnées d'origine, une liste de manifestes et des fichiers manifestes. La liste manifeste est - vous l'avez deviné - une liste de fichiers manifestes. La liste des manifestes contient des informations sur chaque fichier manifeste inclus dans chaque instantané : l'emplacement du fichier manifeste, l'instantané auquel il a été ajouté, des informations sur le partitionnement et les limites inférieure et supérieure des colonnes de partition des fichiers de données associés. Au cours d'une requête, le moteur de requête lit la valeur des emplacements des fichiers manifestes dans la liste des manifestes et ouvre les fichiers manifestes appropriés. La liste des manifestes est au format AVRO.
Les fichiers manifestes suivent les fichiers de données et incluent des détails et des statistiques précalculées sur chaque fichier. La première chose qui est suivie est le format et l'emplacement du fichier. Les fichiers manifestes sont la façon dont Iceberg supprime les données de suivi de style Hive par emplacement du système de fichiers. Les fichiers manifestes améliorent l'efficacité et les performances de lecture des fichiers de données en incluant des détails tels que l'appartenance à la partition, le nombre d'enregistrements et les limites inférieure et supérieure de chaque colonne. Les statistiques sont écrites pendant les opérations d'écriture et sont plus susceptibles d'être opportunes, précises et à jour que les statistiques Hive.
Lorsqu'une requête SELECT est soumise, le moteur de requête obtient l'emplacement de la liste de manifestes à partir de la base de données de métadonnées. Ensuite, le moteur de requête lit la valeur des entrées file-path
pour chaque objet data-file
, puis ouvre les fichiers de données pour exécuter la requête.
Vous trouverez ci-dessous le contenu du préfixe data
, organisé par partition.
A l'intérieur de la partition, il y a un fichier de données par ligne de table.
Exécutons un exemple de requête :
spark-sql> SELECT count(1) as count, data FROM my_catalog.my_table GROUP BY data; 1 a 1 b 1 c Time taken: 9.715 seconds, Fetched 3 row(s) spark-sql>
Maintenant que nous comprenons les différents composants d'une table Iceberg et comment le moteur de requête fonctionne avec eux, plongeons dans les meilleures fonctionnalités d'Iceberg et comment les exploiter dans votre lac de données.
Les modifications d'évolution du schéma telles que Ajouter, Supprimer, Renommer et Mettre à jour sont des modifications de métadonnées, ce qui signifie qu'aucun fichier de données n'a besoin d'être modifié/réécrit pour effectuer des mises à jour. Iceberg garantit également que ces changements d'évolution de schéma sont indépendants et sans effets secondaires. Iceberg utilise des identifiants uniques pour suivre chaque colonne d'une table. Si une nouvelle colonne est ajoutée, elle ne tirera jamais parti d'un identifiant existant par erreur.
Les partitions de table Iceberg peuvent être mises à jour dans une table existante car les requêtes ne référencent pas directement les valeurs de partition. Lorsque de nouvelles données sont écrites, elles utilisent une nouvelle spécification dans une nouvelle mise en page, les données précédemment écrites avec une spécification différente restent inchangées. Cela provoque une planification fractionnée lorsque vous écrivez de nouvelles requêtes. Pour améliorer les performances, Iceberg utilise le partitionnement caché afin que les utilisateurs n'aient pas besoin d'écrire des requêtes pour qu'une disposition de partition spécifique soit rapide. Les utilisateurs se concentrent sur l'écriture de requêtes pour les données dont ils ont besoin et laissent Iceberg élaguer les fichiers qui ne contiennent pas les données correspondantes.
Une autre évolution très utile est que l'ordre de tri Iceberg peut également être mis à jour dans la table existante, tout comme la spécification de partition. Différents moteurs peuvent choisir d'écrire des données dans le dernier ordre de tri sur un ordre non trié lorsque le tri est d'un coût prohibitif, les anciennes données écrites avec l'ordre de tri précédent restent inchangées.
spark-sql> ALTER TABLE my_catalog.my_table > RENAME my_catalog.my_table_2;
Les premières fois que vous faites cela, vous serez époustouflé par la rapidité. C'est parce que vous ne réécrivez pas une table, vous travaillez simplement sur des métadonnées. Dans ce cas, nous n'avons changé que table_name
et Iceberg l'a fait pour nous en environ un dixième de seconde.
D'autres changements de schéma sont tout aussi indolores :
spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity; spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity; spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;
Comme nous l'avons mentionné précédemment, les partitions sont prises en charge par d'autres formats de ruche, mais Iceberg prend en charge le partitionnement caché qui peut gérer les tâches fastidieuses et sujettes aux erreurs de production de valeurs de partition pour les lignes d'une table. Les utilisateurs se concentrent sur l'ajout de filtres aux requêtes qui résolvent les problèmes de l'entreprise et ne se soucient pas de la façon dont la table est partitionnée. Iceberg s'occupe d'éviter automatiquement les lectures de partitions inutiles.
Iceberg gère pour vous les subtilités du partitionnement et de la modification du schéma de partition d'une table, ce qui simplifie considérablement le processus pour les utilisateurs finaux. Vous pouvez définir le partitionnement ou laisser Iceberg s'en charger pour vous. Iceberg aime partitionner sur un horodatage, comme l'heure de l'événement. Les partitions sont suivies par des instantanés dans les manifestes. Les requêtes ne dépendent plus de la disposition physique d'une table. En raison de cette séparation entre les tables physiques et logiques, les tables Iceberg peuvent faire évoluer les partitions au fil du temps à mesure que de nouvelles données sont ajoutées. Par exemple, repartitionner une table Hive nécessiterait de créer une nouvelle table et d'y lire les anciennes données. Vous devrez également modifier la valeur PARTITION dans chaque requête que vous avez déjà écrite - ce n'est pas amusant.
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category; ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;
Nous avons maintenant deux schémas de partitionnement pour la même table. Ce qui était impossible dans Hive s'est déroulé de manière transparente dans Iceberg. À partir de maintenant, les plans de requête sont divisés, en utilisant l'ancien schéma de partition pour interroger les anciennes données et le nouveau schéma de partition pour interroger les nouvelles données. Iceberg s'en charge pour vous - les personnes qui interrogent la table n'ont pas besoin de savoir que les données sont stockées à l'aide de deux schémas de partition. Iceberg le fait grâce à une combinaison de clauses WHERE et de filtres de partition en arrière-plan qui éliminent les fichiers de données sans correspondance.
Chaque écriture dans les tables Iceberg crée de nouveaux instantanés. Les instantanés sont comme les versions et peuvent être utilisés pour voyager dans le temps et revenir en arrière, tout comme nous le faisons avec les capacités de gestion des versions de MinIO. La façon dont les instantanés sont gérés consiste à définir expireSnapshot
afin que le système soit bien entretenu. Le voyage dans le temps permet des requêtes reproductibles qui utilisent exactement le même instantané de table ou permet aux utilisateurs d'examiner facilement les modifications. La restauration de version permet aux utilisateurs de corriger rapidement les problèmes en réinitialisant les tables à un bon état.
Au fur et à mesure que les tables sont modifiées, Iceberg suit chaque version sous forme d'instantané, puis offre la possibilité de voyager dans le temps vers n'importe quel instantané lors de l'interrogation de la table. Cela peut être très utile si vous souhaitez exécuter des requêtes historiques ou reproduire les résultats de requêtes précédentes, par exemple pour la création de rapports. Le voyage dans le temps peut également être utile lors du test de nouvelles modifications de code, car vous pouvez tester un nouveau code avec une requête de résultats connus.
Pour voir les instantanés qui ont été enregistrés pour une table :
spark-sql> SELECT * FROM my_catalog.my_table.snapshots; 2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"} Time taken: 7.236 seconds, Fetched 1 row(s)
Quelques exemples:
-- time travel to October 26, 1986 at 01:21:00 spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00'; -- time travel to snapshot with id 10963874102873 spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
Vous pouvez effectuer des lectures incrémentielles à l'aide d'instantanés, mais vous devez utiliser Spark, pas Spark-SQL. Par exemple:
scala> spark.read() .format(“iceberg”) .option(“start-snapshot-id”, “10963874102873”) .option(“end-snapshot-id”, “10963874102994”) .load(“s3://iceberg/my_table”)
Vous pouvez également restaurer la table à un moment donné ou à un instantané spécifique, comme dans ces deux exemples :
spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000'); spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);
Iceberg prend en charge toutes les commandes SQL expressives telles que la suppression, la fusion et la mise à jour au niveau des lignes, et la chose la plus importante à souligner est qu'Iceberg prend en charge les stratégies Eager et paresseuses. Nous pouvons encoder toutes les choses que nous devons supprimer (par exemple, GDPR ou CCPA), mais ne pas aller réécrire tous ces fichiers de données immédiatement, nous pouvons collecter paresseusement les ordures au besoin et cela aide vraiment à l'efficacité sur les énormes tables prises en charge par Iceberg.
Par exemple, vous pouvez supprimer tous les enregistrements d'une table qui correspondent à un prédicat spécifique. Ce qui suit supprimera toutes les lignes de la catégorie vidéo :
spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';
Alternativement, vous pouvez utiliser CREATE TABLE AS SELECT ou REPLACE TABLE AS SELECT pour accomplir ceci :
spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';
Vous pouvez fusionner deux tables très facilement :
spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;
Iceberg est la base de la norme de table analytique ouverte et utilise le comportement SQL et une véritable abstraction de table contrairement aux autres formats de table de ruche et applique les principes fondamentaux de l'entrepôt de données pour résoudre les problèmes avant que nous sachions que nous les avons. Avec l'ingénierie déclarative des données, nous pouvons configurer des tables sans nous soucier de changer chaque moteur pour répondre aux besoins des données. Cela déverrouille l'optimisation et les recommandations automatiques. Avec des validations sûres, des services de données sont possibles, ce qui permet d'éviter que les humains gardent les charges de travail de données.
Pour inspecter l'historique, les instantanés et d'autres métadonnées d'une table, Iceberg prend en charge l'interrogation des métadonnées. Les tables de métadonnées sont identifiées en ajoutant le nom de la table de métadonnées (par exemple, historique) après le nom de la table d'origine dans votre requête.
Pour afficher les fichiers de données d'une table :
spark-sql> SELECT * FROM my_catalog.my_table.files;
Pour afficher les manifestes :
spark-sql> SELECT * FROM my_catalog.my_table.manifests;
Pour afficher l'historique des tables :
spark-sql> SELECT * FROM my_catalog.my_table.history;
Pour afficher des instantanés :
spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
Vous pouvez également joindre des instantanés et l'historique des tables pour voir l'application qui a écrit chaque instantané :
spark-sql> select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from my_catalog.my_table.history h join my_catalog.my_table.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at;
Maintenant que vous avez appris les bases, chargez certaines de vos données dans Iceberg, puis apprenez-en plus grâce au démarrage rapide Spark et Iceberg et à la documentation Iceberg .
Apache Iceberg a des intégrations avec divers moteurs de requête et d'exécution, où les tables Apache Iceberg peuvent être créées et gérées par ces connecteurs. Les moteurs prenant en charge Iceberg sont Spark , Flink , Hive , Presto , Trino , Dremio , Snowflake .
Apache Iceberg attire beaucoup l'attention en tant que format de table pour les lacs de données. La communauté open source croissante et le nombre croissant d'intégrations de plusieurs fournisseurs de cloud et de cadres d'application signifient qu'il est temps de prendre Iceberg au sérieux, de commencer à expérimenter, à apprendre et à planifier son intégration dans l'architecture existante du lac de données. Associez Iceberg à MinIO pour des lacs de données et des analyses multi-cloud.
Lorsque vous démarrez avec Iceberg et MinIO, contactez-nous et partagez vos expériences ou posez des questions via notre chaîne Slack .
Également publié ici .