paint-brush
Creando un Data Lakehouse usando Apache Iceberg y MinIOpor@minio
6,998 lecturas
6,998 lecturas

Creando un Data Lakehouse usando Apache Iceberg y MinIO

por MinIO12m2023/09/14
Read on Terminal Reader

Demasiado Largo; Para Leer

La promesa de Data Lakehouses está en sus capacidades para datos estructurados y no estructurados, todo en una solución centralizada que utiliza Apache Iceberg y MinIO.
featured image - Creando un Data Lakehouse usando Apache Iceberg y MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

En una publicación anterior, proporcioné una introducción a Apache Iceberg y mostré cómo usa MinIO para el almacenamiento. También mostré cómo configurar una máquina de desarrollo. Para hacer esto, utilicé Docker Compose para instalar un contenedor Apache Spark como motor de procesamiento, un catálogo REST y MinIO para almacenamiento. Concluí con un ejemplo muy simple que usaba Apache Spark para ingerir datos y PyIceberg para consultarlos. Si es nuevo en Apache Iceberg o si necesita configurar Apache Iceberg en su máquina de desarrollo, lea esta publicación introductoria .


En esta publicación, continuaré donde lo dejó mi publicación anterior e investigaré un problema común de big data: la necesidad de una solución única para proporcionar almacenamiento para datos sin procesar, datos no estructurados y datos estructurados (datos que han sido seleccionados a partir de datos sin procesar). datos). Además, la misma solución debería proporcionar un motor de procesamiento que permita generar informes eficientes sobre los datos seleccionados. Esta es la promesa de Data Lakehouses: las capacidades de Data Warehouses para datos estructurados y las capacidades de Data Lakes para datos no estructurados, todo en una solución centralizada.


Veamos nuestro escenario de big data con más detalle.

Un problema común

El siguiente diagrama muestra un problema común y una solución hipotética. Los datos llegan a un centro de datos desde múltiples ubicaciones y en múltiples formatos. Lo que se necesita es una solución centralizada que permita transformar los datos sin procesar de modo que un motor de procesamiento pueda respaldar de manera eficiente la inteligencia empresarial, el análisis de datos y el aprendizaje automático. Al mismo tiempo, esta solución también debe ser capaz de almacenar datos no estructurados (texto, imágenes, audio y video) para la exploración de datos y el aprendizaje automático. También debe conservar los datos transformados en su formato original en caso de que sea necesario reproducir una transformación o investigar un problema de integridad de los datos.


Datos estructurados


Como ejemplo concreto, imaginemos un banco custodio global que gestiona fondos mutuos para sus clientes. Los datos que representan el libro de registros contables y el libro de registros de inversiones de cada fondo para cada cliente ingresan constantemente a Data Lakehouse desde geografías de todo el mundo. A partir de ahí, se deben realizar verificaciones de paso seguro (si se recibió todo lo enviado) y se deben ejecutar verificaciones de calidad de los datos. Finalmente, los datos se pueden dividir y cargar en otro almacén que admita informes de inicio y final del día.


Alternativamente, tal vez este diagrama represente un escenario de IOT donde las estaciones meteorológicas envían datos de temperatura y otros datos relacionados con el clima. Independientemente del escenario, lo que se necesita es una forma de almacenar los datos de forma segura en su formato original y luego transformar y procesar cualquier dato que deba almacenarse de una manera más estructurada, todo en una solución centralizada. Ésta es la promesa de Data Lakehouse: lo mejor de un Data Warehouse y un Data Lake combinados en una solución centralizada.


Hagamos real la solución hipotética descrita anteriormente. Esto se muestra en el siguiente diagrama.


Creando una casa de lago de datos


Hay dos componentes lógicos en nuestro Data Lakehouse. La primera es una implementación de Apache Iceberg para datos estructurados, el equivalente a un almacén de datos. (Esto es lo que construí en mi publicación anterior , por lo que no entraré en detalles aquí). El segundo componente lógico es MinIO para datos no estructurados: el lado del lago de datos de nuestro Data Lakehouse. Todos los datos que ingresan a Lakehouse se entregan a esta instancia lógica de MinIO. En realidad, las dos instancias lógicas de MinIO que se muestran arriba podrían ser la misma instancia de MinIO en su centro de datos. Si el clúster en el que está ejecutando MinIO puede manejar la ingesta de todos los datos entrantes y los requisitos de procesamiento de Apache Iceberg, entonces dicha implementación ahorrará dinero. De hecho, esto es lo que haré en este post. Usaré un depósito dentro de la instancia de MinIO de Apache Iceberg para contener todos los datos sin procesar y no estructurados.


Comencemos a jugar con los datos introduciendo el conjunto de datos que usaré para este ejercicio e incorporándolo a MinIO.

El conjunto de datos del resumen global del día

El conjunto de datos con el que experimentaremos en esta publicación es un conjunto de datos públicos conocido como Resumen de superficie global del día (GSOD), administrado por la Administración Nacional Oceánica y Atmosférica (NOAA). Actualmente, la NOAA mantiene datos de más de 9000 estaciones en todo el mundo y el conjunto de datos GSOD contiene información resumida por día de estas estaciones. Puedes descargar los datos aquí . Hay un archivo gzip por año. Comienza en 1929 y finaliza en 2022 (en el momento de escribir este artículo). Para construir nuestro Data Lakehouse, descargué el archivo de cada año y lo puse en la instancia MinIO que se utiliza para nuestro Data Lakehouse. Puse todos los archivos en un depósito llamado "lago". Los dos depósitos dentro de nuestra instancia de MinIO se muestran a continuación. El depósito "almacén" se creó cuando instalamos Apache Iceberg.


Conjunto de datos del día



Utilicé la consola MinIO para ingerir los datos sin procesar manualmente. En un proceso profesional, querrás hacer esto de forma automatizada. Consulte Cómo configurar Kafka y transmitir datos a MinIO en Kubernetes para ver cómo usar Kafka y Kubernetes para obtener datos en MinIO.


Estos archivos están empaquetados para facilitar su descarga: si intenta utilizarlos directamente para crear un informe o un gráfico, sería una operación que consumiría mucha E/S (y potencialmente un uso intensivo de la CPU). Imagine que desea trazar la temperatura promedio por año desde una estación específica. Para hacer esto, debe abrir cada archivo y buscar en cada fila, buscando las entradas que coincidan con su estación en el día de interés. Una mejor opción es utilizar nuestras capacidades de Data Lakehouses para seleccionar los datos e informar sobre los datos seleccionados. El primer paso es configurar un nuevo cuaderno Jupyter.

Configurar un cuaderno Jupyter

Primero, navegue hasta el servidor Jupyter Notebook que está instalado en el motor de procesamiento Apache Spark. Se puede encontrar en http://localhost:8888 . Cree un nuevo cuaderno y en la primera celda, agregue las importaciones que se muestran a continuación. (Todos los cuadernos completos creados en esta publicación se pueden encontrar aquí ).


 from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'


Observe que estamos importando la biblioteca MinIO. La computadora portátil que estamos construyendo es una canalización ETL desde el almacenamiento no estructurado (MinIO Data Lake) al almacenamiento estructurado (Apache Iceberg, que usa MinIO bajo el capó). El inicio de su computadora portátil debería verse así.


Importando la biblioteca MinIO


Ahora, podemos crear una base de datos y una tabla Iceberg para nuestros datos.

Crear una base de datos y una tabla Iceberg

Crear la base de datos y la tabla para el conjunto de datos GSOD es sencillo. El siguiente script creará la base de datos a la que llamaremos "noaa". Agregue esto en una celda después de las importaciones.


 %%sql CREATE DATABASE IF NOT EXISTS noaa;


El siguiente script creará la tabla gsod .

 %%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)


Mientras juegas con Apache Iceberg, a menudo querrás dejar caer una mesa para poder empezar un experimento de nuevo. El siguiente script eliminará la tabla gsod si desea cambiar algo en su configuración.


 %%sql DROP TABLE IF EXISTS noaa.gsod;

Ingesta de datos de MinIO a Iceberg

Ahora que tenemos los archivos zip sin formato basados en años en nuestro Lakehouse, podemos extraerlos, transformarlos y cargarlos en nuestro Data Lakehouse. Primero, introduzcamos algunas funciones auxiliares. La siguiente función devolverá una lista de objetos MinIO en un depósito específico que coincida con un prefijo.


 def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list


Tenga en cuenta que en el código anterior, se necesita un archivo de credenciales MinIO. Esto se puede obtener desde la consola MinIO. Si no sabe cómo obtener credenciales MinIO, hay una sección de esta publicación que muestra cómo generarlas y descargarlas.


A continuación, necesitamos una función para obtener un objeto de MinIO. Dado que los objetos son archivos tar, también necesitamos esta función para extraer datos del archivo tar y transformarlos en un Pandas DataFrame. Esto se hace usando la siguiente función.


 def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df


Ambas funciones son utilidades genéricas que se pueden reutilizar independientemente de lo que esté haciendo con MinIO. Considere ponerlos en su colección personal de fragmentos de código o en el Github Gist de su organización.


Ahora estamos listos para enviar datos al lado del almacén de nuestro Lakehouse. Esto se puede hacer con el siguiente código, que inicia una sesión de Spark, recorre todos los archivos tar de GSOD, los extrae, los transforma y los envía a nuestra tabla Iceberg.


 from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')


El código de esta sección cargó datos manualmente desde un depósito MinIO. En un entorno de producción, querrá implementar este código en un servicio y utilizar MinIO Bucket Events para la ingesta automatizada.

Consultando Iceberg Data Lakehouse usando PyIceberg

Comencemos un nuevo cuaderno para informar. La siguiente celda importa las utilidades que necesitaremos. Específicamente, usaremos PyIceberg para la recuperación de datos, Pandas para la manipulación de datos y Seaborn para visualizar datos.


 from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')


Lo que queremos hacer es calcular la temperatura media anual para una estación meteorológica determinada. Esto nos da un número por año y tiene en cuenta todas las estaciones del año. El primer paso es consultar a Iceberg todos los datos de una estación determinada. Esto se hace a continuación usando PyIceberg.


 tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)


La identificación de la estación utilizada en el código anterior es para una estación ubicada en el Aeropuerto Internacional Newark Liberty, Nueva Jersey, EE. UU. Está operativo desde 1973 (casi 50 años de datos). Cuando se ejecute el código, obtendrá el siguiente resultado. (Estoy usando la función head() de DataFrame para obtener una muestra).


Producción


A continuación, debemos agrupar por año y calcular la media. Usando Pandas, estas son unas pocas líneas de código. No se necesita ningún bucle.


 df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year


Una vez que se ejecute esta celda, verá un valor único para cada año. Los mejores años se muestran a continuación.


Agrupación por año


Finalmente, podemos visualizar nuestros promedios anuales. Usaremos Seaborn para crear un diagrama de líneas. Esto requiere solo una línea de código.


 sns.lineplot(data=df, x="year", y="temp", errorbar=None)


El gráfico de líneas se muestra a continuación.


Trazado lineal


A continuación se muestra otro comando que siempre debe ejecutar después de ejecutar un informe por primera vez.


 [task.file.file_path for task in sc.plan_files()]


Esta es una lista de comprensión que le brindará una lista de todos los archivos de datos en Apache Iceberg que tienen datos que coinciden con su consulta. Habrá muchos, aunque los metadatos de Iceberg pueden filtrar muchos. Ver todos los archivos involucrados nos recuerda el hecho de que el almacenamiento de objetos de alta velocidad es una parte importante de Lakehouse.

Resumen

En esta publicación, construimos un Data Lakehouse usando MinIO y Apache Iceberg. Hicimos esto usando el conjunto de datos GSOD. Primero, los datos sin procesar se cargaron en el lado del lago de nuestro Data Lakehouse (MinIO). A partir de ahí, creamos una base de datos y una tabla en Apache Iceberg (el lado del almacén de datos de nuestro Data Lakehouse). Luego, creamos una canalización ETL simple para mover datos del lago al almacén dentro de Data Lakehouse.


Una vez que tuvimos Apache Iceberg completamente lleno de datos, pudimos crear un informe de temperatura promedio anual y visualizarlo.


Tenga en cuenta que si desea crear un Data Lakehouse en producción, necesitará las funciones empresariales de MinIO. Considere la posibilidad de consultar la gestión del ciclo de vida de los objetos , las mejores prácticas de seguridad , la transmisión Kafka y los eventos de depósito .


También publicado aquí .