paint-brush
Développer des lacs de données de streaming avec Hudi et MinIOpar@minio
7,246 lectures
7,246 lectures

Développer des lacs de données de streaming avec Hudi et MinIO

par MinIO14m2023/08/29
Read on Terminal Reader

Trop long; Pour lire

Apache Hudi a été le premier format de table ouvert pour les lacs de données et mérite d'être pris en considération dans les architectures de streaming. L'utilisation de MinIO pour le stockage Hudi ouvre la voie aux lacs de données et aux analyses multi-cloud.
featured image - Développer des lacs de données de streaming avec Hudi et MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Hudi est une plate-forme de lac de données en streaming qui apporte les fonctionnalités de base d'entrepôt et de base de données directement au lac de données. Non content de s'appeler un format de fichier ouvert comme Delta ou Apache Iceberg , Hudi fournit des tables, des transactions, des insertions/suppressions, des index avancés, des services d'ingestion de streaming, des optimisations de regroupement/compactage de données et la concurrence.


Lancé en 2016, Hudi est fermement ancré dans l'écosystème Hadoop, d'où la signification de son nom : Hadoop Upserts and Incrementals. Il a été développé pour gérer le stockage de grands ensembles de données analytiques sur HDFS. L'objectif principal de Hudi est de réduire la latence lors de l'ingestion de données en streaming.


Tableau Hudi


Au fil du temps, Hudi a évolué pour utiliser le stockage cloud et le stockage objet, y compris MinIO. L'abandon par Hudi du HDFS va de pair avec la tendance plus large du monde à abandonner l'ancien HDFS pour un stockage d'objets performant, évolutif et natif du cloud. La promesse de Hudi de fournir des optimisations qui accélèrent les charges de travail analytiques pour Apache Spark, Flink, Presto, Trino et d'autres s'accorde bien avec la promesse de MinIO de performances des applications cloud natives à grande échelle.


Les entreprises utilisant Hudi en production incluent Uber , Amazon , ByteDance et Robinhood . Ce sont quelques-uns des plus grands lacs de données de streaming au monde. La clé de Hudi dans ce cas d’utilisation est qu’il fournit une pile de traitement de données incrémentielle qui effectue un traitement à faible latence sur les données en colonnes. En règle générale, les systèmes écrivent les données une seule fois en utilisant un format de fichier ouvert comme Apache Parquet ou ORC, et les stockent sur un stockage d'objets hautement évolutif ou un système de fichiers distribué. Hudi sert de plan de données pour ingérer, transformer et gérer ces données. Hudi interagit avec le stockage à l'aide de l' API Hadoop FileSystem , qui est compatible (mais pas nécessairement optimale) avec les implémentations allant de HDFS au stockage d'objets en passant par les systèmes de fichiers en mémoire.

Format de fichier Hudi

Hudi utilise un fichier de base et des fichiers journaux delta qui stockent les mises à jour/modifications d'un fichier de base donné. Les fichiers de base peuvent être Parquet (en colonnes) ou HFile (indexé). Les journaux delta sont enregistrés au format Avro (ligne), car il est logique d'enregistrer les modifications apportées au fichier de base au fur et à mesure qu'elles se produisent.


Hudi code toutes les modifications apportées à un fichier de base donné sous la forme d'une séquence de blocs. Les blocs peuvent être des blocs de données, des blocs de suppression ou des blocs d'annulation. Ces blocs sont fusionnés afin de dériver des fichiers de base plus récents. Cet encodage crée également un journal autonome.



Format de fichier Hudi

Source .

Format du tableau Hudi

Un format de tableau comprend la présentation des fichiers de la table, le schéma de la table et les métadonnées qui suivent les modifications apportées à la table. Hudi applique le schéma à l'écriture, conformément à l'accent mis sur le traitement des flux, pour garantir que les pipelines ne soient pas interrompus par des modifications non rétrocompatibles.


Hudi regroupe les fichiers d'une table/partition donnée et mappe entre les clés d'enregistrement et les groupes de fichiers. Comme mentionné ci-dessus, toutes les mises à jour sont enregistrées dans les fichiers journaux delta pour un groupe de fichiers spécifique. Cette conception est plus efficace que Hive ACID, qui doit fusionner tous les enregistrements de données avec tous les fichiers de base pour traiter les requêtes. La conception de Hudi prévoit des insertions et des suppressions rapides basées sur des clés, car elle fonctionne avec les journaux delta pour un groupe de fichiers, et non pour un ensemble de données complet.


Hudi regroupe les fichiers d'une table/partition donnée et mappe entre les clés d'enregistrement et les groupes de fichiers. Comme mentionné ci-dessus, toutes les mises à jour sont enregistrées dans les fichiers journaux delta pour un groupe de fichiers spécifique. Cette conception est plus efficace que Hive ACID, qui doit fusionner tous les enregistrements de données avec tous les fichiers de base pour traiter les requêtes. La conception de Hudi prévoit des insertions et des suppressions rapides basées sur des clés, car elle fonctionne avec les journaux delta pour un groupe de fichiers, et non pour un ensemble de données complet.


Format du tableau Hudi

Source .


La chronologie est essentielle à comprendre car elle sert de source de journal d'événements véridique pour toutes les métadonnées de la table de Hudi. La chronologie est stockée dans le dossier .hoodie , ou bucket dans notre cas. Les événements sont conservés sur la chronologie jusqu'à ce qu'ils soient supprimés. La chronologie existe pour un tableau global ainsi que pour les groupes de fichiers, permettant la reconstruction d'un groupe de fichiers en appliquant les journaux delta au fichier de base d'origine. Afin d'optimiser les écritures/validations fréquentes, la conception de Hudi maintient les métadonnées petites par rapport à la taille de la table entière.


Les nouveaux événements sur la chronologie sont enregistrés dans une table de métadonnées interne et implémentés sous la forme d'une série de tables de fusion lors de la lecture, offrant ainsi une faible amplification d'écriture. En conséquence, Hudi peut rapidement absorber les modifications rapides des métadonnées. De plus, la table de métadonnées utilise le format de fichier de base HFile, optimisant encore davantage les performances avec un ensemble de recherches de clés indexées qui évitent d'avoir à lire l'intégralité de la table de métadonnées. Tous les chemins de fichiers physiques qui font partie du tableau sont inclus dans les métadonnées pour éviter les listes de fichiers cloud coûteuses et fastidieuses.

Écrivains Hudi

Les rédacteurs Hudi facilitent les architectures dans lesquelles Hudi sert de couche d'écriture hautes performances avec prise en charge des transactions ACID qui permet des modifications incrémentielles très rapides telles que des mises à jour et des suppressions.


Une architecture Hudi typique s'appuie sur les pipelines Spark ou Flink pour fournir des données aux tables Hudi. Le chemin d'écriture Hudi est optimisé pour être plus efficace que la simple écriture d'un fichier Parquet ou Avro sur le disque. Hudi analyse les opérations d'écriture et les classe en opérations incrémentielles ( insert , upsert , delete ) ou par lots ( insert_overwrite , insert_overwrite_table , delete_partition , bulk_insert ), puis applique les optimisations nécessaires.


Les rédacteurs Hudi sont également responsables de la maintenance des métadonnées. Pour chaque enregistrement, l'heure de validation et un numéro de séquence unique à cet enregistrement (similaire à un décalage Kafka) sont écrits, ce qui permet de dériver les modifications au niveau de l'enregistrement. Les utilisateurs peuvent également spécifier des champs d'heure d'événement dans les flux de données entrants et les suivre à l'aide de métadonnées et de la chronologie Hudi. Cela peut apporter des améliorations spectaculaires au traitement des flux, car Hudi contient à la fois l'heure d'arrivée et l'heure de l'événement pour chaque enregistrement, ce qui permet de créer des filigranes solides pour les pipelines de traitement de flux complexes.

Lecteurs Hudi

L'isolation des instantanés entre les rédacteurs et les lecteurs permet d'interroger les instantanés de table de manière cohérente à partir de tous les principaux moteurs de requête de lac de données, notamment Spark, Hive, Flink, Prest, Trino et Impala. Comme Parquet et Avro, les tables Hudi peuvent être lues comme des tables externes par Snowflake et SQL Server .


Les lecteurs Hudi sont développés pour être légers. Dans la mesure du possible, des lecteurs vectoriels et une mise en cache spécifiques au moteur, tels que ceux de Presto et Spark, sont utilisés. Lorsque Hudi doit fusionner des fichiers de base et des fichiers journaux pour une requête, Hudi améliore les performances de fusion en utilisant des mécanismes tels que des cartes déversables et une lecture paresseuse, tout en fournissant également des requêtes optimisées en lecture.


Hudi inclut plusieurs capacités d'interrogation incrémentielles remarquablement puissantes. Les métadonnées sont au cœur de tout cela, permettant de consommer des commits importants sous forme de morceaux plus petits et de dissocier complètement l'écriture et l'interrogation incrémentielle des données. Grâce à une utilisation efficace des métadonnées, le voyage dans le temps n'est qu'une autre requête incrémentielle avec un point de départ et un point d'arrêt définis. Hudi mappe atomiquement les clés sur des groupes de fichiers uniques à tout moment donné, prenant en charge toutes les fonctionnalités CDC sur les tables Hudi. Comme indiqué ci-dessus dans la section des rédacteurs Hudi, chaque table est composée de groupes de fichiers et chaque groupe de fichiers possède ses propres métadonnées autonomes.

Hourra pour Hudi !

La plus grande force de Hudi réside dans la rapidité avec laquelle il ingère à la fois les données en streaming et par lots. En offrant la possibilité d' upsert , Hudi exécute des tâches de plusieurs ordres de grandeur plus rapidement que la réécriture de tables ou de partitions entières.


Pour tirer parti de la vitesse d'ingestion de Hudi, les data lakehouses nécessitent une couche de stockage capable d'IOPS et de débit élevés. La combinaison d'évolutivité et de hautes performances de MinIO est exactement ce dont Hudi a besoin. MinIO est plus que capable d'offrir les performances requises pour alimenter un lac de données d'entreprise en temps réel : un récent benchmark a atteint 325 Gio/s (349 Go/s) sur les GET et 165 Gio/s (177 Go/s) sur les PUT avec seulement 32 nœuds de SSD NVMe disponibles dans le commerce.


Un lac de données Hudi d'entreprise actif stocke un nombre massif de petits fichiers Parquet et Avro. MinIO comprend un certain nombre d' optimisations de petits fichiers qui permettent des lacs de données plus rapides. Les petits objets sont enregistrés en ligne avec les métadonnées, réduisant ainsi les IOPS nécessaires à la fois pour lire et écrire de petits fichiers tels que les métadonnées et les index Hudi.


Le schéma est un élément essentiel de chaque table Hudi. Hudi peut appliquer le schéma ou permettre son évolution afin que le pipeline de données en streaming puisse s'adapter sans se rompre. De plus, Hudi applique le schéma sur l'écrivain pour garantir que les modifications ne interrompent pas les pipelines. Hudi s'appuie sur Avro pour stocker, gérer et faire évoluer le schéma d'une table.


Hudi fournit des garanties transactionnelles ACID aux lacs de données. Hudi garantit les écritures atomiques : les validations sont effectuées de manière atomique sur une chronologie et reçoivent un horodatage qui indique l'heure à laquelle l'action est censée s'être produite. Hudi isole les instantanés entre les processus d'écriture, de table et de lecture afin que chacun fonctionne sur un instantané cohérent de la table. Hudi complète cela avec un contrôle de concurrence optimiste (OCC) entre les rédacteurs et un contrôle de concurrence non bloquant basé sur MVCC entre les services de table et les rédacteurs et entre plusieurs services de table.

Tutoriel Hudi et MinIO

Ce didacticiel vous guidera dans la configuration de Spark, Hudi et MinIO et présentera quelques fonctionnalités de base de Hudi. Ce didacticiel est basé sur le guide Apache Hudi Spark , adapté pour fonctionner avec le stockage d'objets MinIO natif cloud.


Notez que travailler avec des buckets versionnés ajoute une certaine surcharge de maintenance à Hudi. Tout objet supprimé crée un marqueur de suppression . À mesure que Hudi nettoie les fichiers à l'aide de l' utilitaire Cleaner, le nombre de marqueurs de suppression augmente avec le temps. Il est important de configurer correctement la gestion du cycle de vie pour nettoyer ces marqueurs de suppression, car l'opération List peut s'étouffer si le nombre de marqueurs de suppression atteint 1 000. Les responsables du projet Hudi recommandent de nettoyer les marqueurs de suppression après une journée en utilisant les règles de cycle de vie.

Conditions préalables

Téléchargez et installez Apache Spark.


Téléchargez et installez MinIO. Enregistrez l'adresse IP, le port TCP de la console, la clé d'accès et la clé secrète.


Téléchargez et installez le client MinIO.


Téléchargez les bibliothèques AWS et AWS Hadoop et ajoutez-les à votre chemin de classe afin d'utiliser S3A pour travailler avec le stockage d'objets.

  • AWS : aws-java-sdk:1.10.34 (ou version ultérieure)

  • Hadoop : hadoop-aws:2.7.3 (ou supérieur)


Téléchargez les fichiers Jar, décompressez-les et copiez-les dans /opt/spark/jars .

Créer un compartiment MinIO

Utilisez le client MinIO pour créer un bucket pour héberger les données Hudi :

 mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Lancez Spark avec Hudi

Démarrez le shell Spark avec Hudi configuré pour utiliser MinIO pour le stockage. Assurez-vous de configurer les entrées pour S3A avec vos paramètres MinIO.


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


Ensuite, initialisez Hudi dans Spark.

 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


Notez que cela simplifiera l'utilisation répétée de Hudi pour créer unfichier de configuration externe .

Créer un tableau

Essayez-le et créez une simple petite table Hudi à l'aide de Scala. Le Hudi DataGenerator est un moyen rapide et facile de générer des exemples d'insertions et de mises à jour basées sur l' exemple de schéma de voyage .


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Insérez des données dans Hudi et écrivez une table dans MinIO

Ce qui suit générera de nouvelles données de voyage, les chargera dans un DataFrame et écrira le DataFrame que nous venons de créer dans MinIO en tant que table Hudi. mode(Overwrite) écrase et recrée la table dans le cas où elle existe déjà. Les données de voyage s'appuient sur une clé d'enregistrement ( uuid ), un champ de partition ( region/country/city ) et une logique ( ts ) pour garantir que les enregistrements de voyage sont uniques pour chaque partition. Nous utiliserons l'opération d'écriture par défaut, upsert . Lorsque vous avez une charge de travail sans mises à jour, vous pouvez utiliser insert ou bulk_insert , ce qui pourrait être plus rapide.


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


Ouvrez un navigateur et connectez-vous à MinIO sur http://<your-MinIO-IP>:<port> avec votre clé d'accès et votre clé secrète. Vous verrez la table Hudi dans le seau.


Console MinIO


Le compartiment contient également un chemin .hoodie qui contient des métadonnées, ainsi que des chemins americas et asia qui contiennent des données.


Métadonnées


Jetez un œil aux métadonnées. Voici à quoi ressemble mon chemin .hoodie après avoir terminé l'intégralité du didacticiel. On voit que j'ai modifié le tableau le mardi 13 septembre 2022 à 9h02, 10h37, 10h48, 10h52 et 10h56.


Le chemin .hoodie après avoir terminé le didacticiel

Données de requête

Chargeons les données Hudi dans un DataFrame et exécutons un exemple de requête.

 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Voyage dans le temps avec Hudi

Non, nous ne parlons pas d'aller voir un concert de Hootie and the Blowfish en 1988.


Chaque écriture dans les tables Hudi crée de nouveaux instantanés. Considérez les instantanés comme des versions de la table qui peuvent être référencées pour les requêtes de voyage dans le temps.


Essayez quelques requêtes de voyage dans le temps (vous devrez modifier les horodatages pour qu'ils soient pertinents pour vous).


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

Mettre à jour les données

Ce processus est similaire à celui utilisé lorsque nous avons inséré de nouvelles données plus tôt. Pour montrer la capacité de Hudi à mettre à jour les données, nous allons générer des mises à jour des enregistrements de voyage existants, les charger dans un DataFrame, puis écrire le DataFrame dans la table Hudi déjà enregistrée dans MinIO.


Notez que nous utilisons le mode de sauvegarde append . Une ligne directrice générale consiste à utiliser le mode append , sauf si vous créez une nouvelle table afin qu'aucun enregistrement ne soit écrasé. Une façon typique de travailler avec Hudi consiste à ingérer des données en streaming en temps réel, en les ajoutant à la table, puis à écrire une logique qui fusionne et met à jour les enregistrements existants en fonction de ce qui vient d'être ajouté. Alternativement, l'écriture en mode overwrite supprime et recrée la table si elle existe déjà.


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


L'interrogation des données affichera les enregistrements de voyage mis à jour.

Requête incrémentielle

Hudi peut fournir un flux d'enregistrements modifiés depuis un horodatage donné à l'aide d'une requête incrémentielle. Tout ce que nous devons faire est de fournir une heure de début à partir de laquelle les modifications seront diffusées pour voir les modifications jusqu'à la validation actuelle, et nous pouvons utiliser une heure de fin pour limiter le flux.


Les requêtes incrémentielles sont un gros problème pour Hudi car elles vous permettent de créer des pipelines de streaming sur des données par lots.


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

Requête ponctuelle

Hudi peut interroger des données à une heure et une date spécifiques.


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

Suppression de données avec des suppressions logicielles

Hudi prend en charge deux manières différentes de supprimer des enregistrements. Une suppression logicielle conserve la clé d'enregistrement et annule les valeurs de tous les autres champs. Les suppressions logicielles sont conservées dans MinIO et supprimées uniquement du lac de données à l'aide d'une suppression matérielle.


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

Suppression de données avec des suppressions matérielles

En revanche, les suppressions définitives sont ce que nous considérons comme des suppressions. La clé d'enregistrement et les champs associés sont supprimés de la table.


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Insérer un remplacement

Le lac de données devient un lac de données lorsqu'il acquiert la capacité de mettre à jour les données existantes. Nous allons générer de nouvelles données de voyage, puis écraser nos données existantes. Cette opération est plus rapide qu'un upsert où Hudi calcule la totalité de la partition cible en même temps pour vous. Ici, nous spécifions la configuration afin de contourner l'indexation, la précombinaison et le repartitionnement automatiques upsert ferait pour vous.


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

Schéma de table Evolve et partitionnement

L'évolution du schéma vous permet de modifier le schéma d'une table Hudi pour l'adapter aux changements qui ont lieu dans les données au fil du temps.


Vous trouverez ci-dessous quelques exemples de la manière d'interroger et de faire évoluer le schéma et le partitionnement. Pour une discussion plus approfondie, veuillez consulter Schema Evolution | Apache Hudi . Notez que si vous exécutez ces commandes, elles modifieront votre schéma de table Hudi pour qu'il diffère de ce didacticiel.


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


Actuellement, SHOW partitions ne fonctionnent que sur un système de fichiers, car elles sont basées sur le chemin de la table du système de fichiers.


Ce didacticiel a utilisé Spark pour présenter les capacités de Hudi. Cependant, Hudi peut prendre en charge plusieurs types de tables/types de requêtes et les tables Hudi peuvent être interrogées à partir de moteurs de requête tels que Hive, Spark, Presto et bien plus encore. Le projet Hudi propose une vidéo de démonstration qui présente tout cela sur une configuration basée sur Docker avec tous les systèmes dépendants exécutés localement.

Huée! Huée! Construisons des lacs de données Hudi sur MinIO !

Apache Hudi a été le premier format de table ouvert pour les lacs de données et mérite d'être pris en considération dans les architectures de streaming. La communauté et l'écosystème Hudi sont vivants et actifs, avec un accent croissant sur le remplacement de Hadoop/HDFS par Hudi/stockage objet pour les lacs de données de streaming natifs dans le cloud. L'utilisation de MinIO pour le stockage Hudi ouvre la voie aux lacs de données et aux analyses multi-cloud. MinIO inclut une réplication active-active pour synchroniser les données entre les emplacements – sur site, dans le cloud public/privé et en périphérie – permettant aux entreprises de bénéficier des fonctionnalités essentielles dont elles ont besoin, comme l'équilibrage de charge géographique et le basculement rapide à chaud.


Essayez Hudi sur MinIO aujourd'hui. Si vous avez des questions ou souhaitez partager des conseils, veuillez nous contacter via notre chaîne Slack .


Également publié ici .