Dans un article précédent, j'ai fourni une introduction à Apache Iceberg et montré comment il utilise MinIO pour le stockage. J'ai également montré comment mettre en place une machine de développement. Pour ce faire, j'ai utilisé Docker Compose pour installer un conteneur Apache Spark comme moteur de traitement, un catalogue REST et MinIO pour le stockage. J'ai conclu avec un exemple très simple utilisant Apache Spark pour ingérer des données et PyIceberg pour interroger les données. Si vous êtes nouveau sur Apache Iceberg ou si vous devez configurer Apache Iceberg sur votre machine de développement, lisez cet article d'introduction .
Dans cet article, je vais reprendre là où mon article précédent s'est arrêté et étudier un problème courant du Big Data : la nécessité d'une solution unique pour fournir le stockage des données brutes, des données non structurées et des données structurées (données qui ont été organisées à partir de données brutes). données). De plus, la même solution doit fournir un moteur de traitement permettant d'établir des rapports efficaces sur les données conservées. C'est la promesse des Data Lakehouses - les capacités des Data Warehouses pour les données structurées et les capacités des Data Lakes pour les données non structurées - le tout dans une solution centralisée.
Examinons plus en détail notre scénario Big Data.
Le diagramme ci-dessous décrit un problème courant et une solution hypothétique. Les données arrivent dans un centre de données depuis plusieurs emplacements et dans plusieurs formats. Ce qu'il faut, c'est une solution centralisée qui permette de transformer les données brutes de telle sorte qu'un moteur de traitement puisse prendre en charge efficacement la business intelligence, l'analyse des données et l'apprentissage automatique. Dans le même temps, cette solution doit également être capable de stocker des données non structurées (texte, images, audio et vidéo) pour l'exploration des données et l'apprentissage automatique. Il doit également conserver toutes les données transformées dans leur format d'origine au cas où une transformation devrait être rejouée ou qu'un problème d'intégrité des données devrait être étudié.
À titre d’exemple concret, imaginez une banque dépositaire mondiale qui gère des fonds communs de placement pour ses clients. Les données représentant le livre des registres comptables et le livre des registres d'investissement pour chaque fonds et pour chaque client sont diffusées en permanence dans Data Lakehouse depuis des zones géographiques du monde entier. À partir de là, des contrôles de passage sécurisé doivent avoir lieu (tout ce qui a été envoyé a-t-il été reçu) et des contrôles de qualité des données doivent être effectués. Enfin, les données peuvent être partitionnées et chargées dans un autre magasin qui prendra en charge les rapports de début et de fin de journée.
Alternativement, ce diagramme représente peut-être un scénario IOT dans lequel les stations météorologiques envoient des données de température et d'autres données météorologiques. Quel que soit le scénario, il faut trouver un moyen de stocker les données en toute sécurité dans leur format d'origine, puis de transformer et de traiter toutes les données qui doivent être stockées de manière plus structurée, le tout dans une solution centralisée. C'est la promesse d'un Data Lakehouse : le meilleur d'un Data Warehouse et d'un Data Lake combinés en une seule solution centralisée.
Rendons réelle la solution hypothétique décrite ci-dessus. Ceci est représenté dans le diagramme ci-dessous.
Il y a deux composants logiques dans notre Data Lakehouse. La première est une implémentation d’Apache Iceberg pour les données structurées – l’équivalent d’un Data Warehouse. (C'est ce que j'ai construit dans mon article précédent - je n'entrerai donc pas dans les détails ici.) Le deuxième composant logique est MinIO pour les données non structurées - le côté Data Lake de notre Data Lakehouse. Toutes les données entrant dans Lakehouse sont transmises à cette instance logique de MinIO. En réalité, les deux instances logiques de MinIO présentées ci-dessus pourraient être la même instance de MinIO dans votre centre de données. Si le cluster dans lequel vous exécutez MinIO peut gérer l'ingestion de toutes les données entrantes et les exigences de traitement d'Apache Iceberg, un tel déploiement permettra d'économiser de l'argent. C’est en fait ce que je vais faire dans cet article. J'utiliserai un compartiment dans l'instance MinIO d'Apache Iceberg pour contenir toutes les données non structurées et brutes.
Commençons à jouer avec les données en introduisant l'ensemble de données que je vais utiliser pour cet exercice et en l'ingérant dans MinIO.
L'ensemble de données que nous allons expérimenter dans cet article est un ensemble de données public connu sous le nom de Global Surface Summary of the Day (GSOD), qui est géré par la National Oceanic and Atmospheric Administration (NOAA). La NOAA conserve actuellement les données de plus de 9 000 stations à travers le monde et l'ensemble de données GSOD contient des informations récapitulatives quotidiennes provenant de ces stations. Vous pouvez télécharger les données ici . Il existe un fichier gzip par an. Il commence en 1929 et se termine en 2022 (au moment d’écrire ces lignes). Pour construire notre Data Lakehouse, j'ai téléchargé le fichier de chaque année et l'ai placé dans l'instance MinIO utilisée pour notre Data Lakehouse. J'ai mis tous les fichiers dans un compartiment nommé « lake ». Les deux compartiments de notre instance de MinIO sont présentés ci-dessous. Le bucket « entrepôt » a été créé lorsque nous avons installé Apache Iceberg.
J'ai utilisé la console MinIO pour ingérer manuellement les données brutes. Dans un pipeline professionnel, vous souhaiterez le faire de manière automatisée. Découvrez Comment configurer Kafka et diffuser des données vers MinIO dans Kubernetes pour voir comment utiliser Kafka et Kubernetes pour importer des données dans MinIO.
Ces fichiers sont conditionnés pour faciliter le téléchargement : si vous essayez de les utiliser directement pour créer un rapport ou un graphique, cela représenterait une opération très gourmande en E/S (et potentiellement gourmande en CPU). Imaginez que vous souhaitiez tracer la température moyenne annuelle à partir d'une station spécifiée. Pour ce faire, vous devez ouvrir chaque fichier et parcourir chaque ligne, en recherchant les entrées qui correspondent à votre station le jour qui vous intéresse. Une meilleure option consiste à utiliser nos capacités Data Lakehouses pour organiser les données et créer des rapports sur les données organisées. La première étape consiste à configurer un nouveau notebook Jupyter.
Tout d’abord, accédez au serveur Jupyter Notebook installé dans le moteur de traitement Apache Spark. Il peut être trouvé sur http://localhost:8888 . Créez un nouveau bloc-notes et dans la première cellule, ajoutez les importations indiquées ci-dessous. (Tous les cahiers complétés créés dans cet article peuvent être trouvés ici .)
from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'
Notez que nous importons la bibliothèque MinIO. Le notebook que nous construisons est un pipeline ETL allant du stockage non structuré (MinIO Data Lake) au stockage structuré (Apache Iceberg, qui utilise MinIO sous le capot.) Le début de votre notebook devrait ressembler à ceci.
Maintenant, nous pouvons créer une base de données Iceberg et un tableau pour nos données.
La création de la base de données et de la table pour l'ensemble de données GSOD est simple. Le script ci-dessous créera la base de données que nous nommerons « noaa ». Ajoutez ceci dans une cellule après les importations.
%%sql CREATE DATABASE IF NOT EXISTS noaa;
Le script ci-dessous créera la table gsod
.
%%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)
Lorsque vous jouez avec Apache Iceberg, vous souhaiterez souvent supprimer une table pour pouvoir recommencer une expérience. Le script ci-dessous supprimera la table gsod
si vous souhaitez modifier quoi que ce soit dans sa configuration.
%%sql DROP TABLE IF EXISTS noaa.gsod;
Maintenant que nous avons les fichiers zip bruts par année dans notre Lakehouse, nous pouvons les extraire, les transformer et les charger dans notre Data Lakehouse. Introduisons d'abord quelques fonctions d'assistance. La fonction ci-dessous renverra une liste d'objets MinIO dans un compartiment spécifié qui correspond à un préfixe.
def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list
Notez que dans le code ci-dessus, un fichier d'informations d'identification MinIO est nécessaire. Ceci peut être obtenu à partir de la console MinIO. Si vous ne savez pas comment obtenir les informations d'identification MinIO, une section de cet article explique comment les générer et les télécharger.
Ensuite, nous avons besoin d'une fonction pour obtenir un objet de MinIO. Étant donné que les objets sont des fichiers tar, nous avons également besoin de cette fonction pour extraire les données de l'archive tar et les transformer en Pandas DataFrame. Cela se fait en utilisant la fonction ci-dessous.
def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df
Ces deux fonctions sont des utilitaires génériques qui peuvent être réutilisés indépendamment de ce que vous faites avec MinIO. Pensez à les mettre dans votre collection personnelle d'extraits de code ou dans le Github Gist de votre organisation.
Nous sommes maintenant prêts à envoyer des données du côté entrepôt de notre Lakehouse. Cela peut être fait avec le code ci-dessous, qui démarre une session Spark, parcourt tous les fichiers tar GSOD, les extrait, les transforme et les envoie à notre table Iceberg.
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')
Le code de cette section a chargé manuellement les données d'un compartiment MinIO. Dans un environnement de production, vous souhaiterez déployer ce code dans un service et utiliser les événements MinIO Bucket pour l'ingestion automatisée.
Commençons un nouveau bloc-notes pour les rapports. La cellule ci-dessous importe les utilitaires dont nous aurons besoin. Plus précisément, nous utiliserons PyIceberg pour la récupération de données, Pandas pour la gestion des données et Seaborn pour la visualisation des données.
from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')
Ce que nous voulons faire, c'est calculer la température moyenne annuelle pour une station météo donnée. Cela nous donne un numéro par an et prend en compte toutes les saisons de l'année. La première étape consiste à interroger Iceberg pour toutes les données d’une station donnée. Cela se fait ci-dessous en utilisant PyIceberg.
tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)
L'identifiant de la station utilisé dans le code ci-dessus correspond à une station située à l'aéroport international de Newark Liberty, dans le New Jersey, aux États-Unis. Il est opérationnel depuis 1973 (près de 50 ans de données). Lorsque le code s'exécutera, vous obtiendrez le résultat ci-dessous. (J'utilise la fonction DataFrame head() pour obtenir un échantillon.)
Ensuite, nous devons regrouper par année et calculer la moyenne. En utilisant Pandas, il s'agit de quelques lignes de code. Aucune boucle n’est nécessaire.
df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year
Une fois cette cellule exécutée, vous verrez une valeur unique pour chaque année. Les premières années sont présentées ci-dessous.
Enfin, nous pouvons visualiser nos moyennes annuelles. Nous utiliserons Seaborn pour créer un tracé linéaire. Cela ne prend qu'une seule ligne de code.
sns.lineplot(data=df, x="year", y="temp", errorbar=None)
Le tracé linéaire est présenté ci-dessous.
Une autre commande que vous devez toujours exécuter après avoir exécuté un rapport pour la première fois est ci-dessous.
[task.file.file_path for task in sc.plan_files()]
Il s'agit d'une compréhension de liste qui vous donnera une liste de tous les fichiers de données dans Apache Iceberg dont les données correspondent à votre requête. Il y en aura beaucoup, même si les métadonnées d'Iceberg peuvent en filtrer beaucoup. Voir tous les fichiers impliqués fait ressortir le fait que le stockage d’objets à haute vitesse est un élément important d’un Lakehouse.
Dans cet article, nous avons construit un Data Lakehouse en utilisant MinIO et Apache Iceberg. Nous l'avons fait en utilisant l'ensemble de données GSOD. Tout d’abord, les données brutes ont été téléchargées du côté lac de notre Data Lakehouse (MinIO). À partir de là, nous avons créé une base de données et une table dans Apache Iceberg (le côté Data Warehouse de notre Data Lakehouse). Ensuite, nous avons construit un simple pipeline ETL pour déplacer les données du Lake vers l'entrepôt au sein du Data Lakehouse.
Une fois Apache Iceberg entièrement rempli de données, nous avons pu créer un rapport de température annuelle moyenne et le visualiser.
Gardez à l’esprit que si vous souhaitez créer un Data Lakehouse en production, vous aurez besoin des fonctionnalités d’entreprise de MinIO. Pensez à examiner la gestion du cycle de vie des objets , les meilleures pratiques de sécurité , le streaming Kafka et les événements de compartiment .
Également publié ici .