paint-brush
Su guía definitiva para la arquitectura Lakehouse con Iceberg y MinIOpor@minio
9,088 lecturas
9,088 lecturas

Su guía definitiva para la arquitectura Lakehouse con Iceberg y MinIO

por MinIO22m2023/08/08
Read on Terminal Reader

Demasiado Largo; Para Leer

Apache Iceberg parece haber conquistado el mundo de los datos. Inicialmente incubado en Netflix por Ryan Blue, finalmente se transmitió a Apache Software Foundation, donde reside actualmente. En esencia, es un formato de tabla abierta para conjuntos de datos analíticos a escala (piense en cientos de TB a cientos de PB). Es un formato compatible con varios motores. Lo que eso significa es que Spark, Trino, Flink, Presto, Hive e Impala pueden operar de forma independiente y simultánea en el conjunto de datos. Admite la lengua franca del análisis de datos, SQL, así como características clave como la evolución completa del esquema, la partición oculta, el viaje en el tiempo y la reversión y compactación de datos. Esta publicación se centra en cómo Iceberg y MinIO se complementan entre sí y cómo varios marcos analíticos (Spark, Flink, Trino, Dremio y Snowflake) pueden aprovechar los dos.
featured image - Su guía definitiva para la arquitectura Lakehouse con Iceberg y MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Iceberg parece haber conquistado el mundo de los datos. Inicialmente incubado en Netflix por Ryan Blue, finalmente se transmitió a Apache Software Foundation, donde reside actualmente. En esencia, es un formato de tabla abierta para conjuntos de datos analíticos a escala (piense en cientos de TB a cientos de PB).


Es un formato compatible con varios motores. Lo que eso significa es que Spark, Trino, Flink, Presto, Hive e Impala pueden operar de forma independiente y simultánea en el conjunto de datos. Admite la lengua franca del análisis de datos, SQL, así como características clave como la evolución completa del esquema, la partición oculta, el viaje en el tiempo y la reversión y compactación de datos.


Esta publicación se centra en cómo Iceberg y MinIO se complementan entre sí y cómo varios marcos analíticos (Spark, Flink, Trino, Dremio y Snowflake) pueden aprovechar los dos.

Fondo

Si bien Apache Hive fue un gran paso adelante para su época, finalmente comenzó a mostrar grietas a medida que las aplicaciones de análisis se volvieron más numerosas, diversas y sofisticadas. Para lograr el rendimiento, los datos debían permanecer en los directorios y esos directorios debían administrarse constantemente. Esto condujo a una base de datos de directorios. Eso resolvió el problema de dónde estaban los datos, pero introdujo el problema de cuál era el estado de esa tabla, que ahora estaba en dos lugares (la base de datos de directorios y el sistema de archivos).


Esto limitaba lo que podía hacer y la flexibilidad que existía, específicamente con respecto a los cambios, que no podían garantizarse en ambos lugares con una sola operación.


Imagine grandes cantidades de datos de varios años particionados en la fecha. Años divididos en meses y semanas y, si las semanas se dividen en días, y los días en horas y así sucesivamente, la lista del directorio explota. Hive Metastore (HMS) es un RDBMS transaccional. El sistema de archivos (HDFS) no es transaccional. Cuando se cambia la información de la partición, se requiere la recreación tanto del almacén de particiones como del sistema de archivos.


El problema era insostenible y ninguna cantidad de parches iba a resolver los problemas inherentes. De hecho, los desafíos solo se aceleraban con el crecimiento de los datos.

Objetivos para un formato moderno de mesa abierta

Uno de los puntos de venta clave en torno a la arquitectura de lago de datos es que admite múltiples motores y marcos analíticos. Por ejemplo, debe admitir tanto ELT (Extraer, Cargar, Transformar) como ETL (Extraer, Transformar, Cargar). Debe ser compatible con la inteligencia comercial, el análisis comercial y los tipos de cargas de trabajo de IA/ML. Debe interactuar con éxito con el mismo conjunto de tablas de una manera segura y predecible. Esto significa que múltiples motores como Spark, Flink, Trino, Arrow y Dask deben estar vinculados de alguna manera en una arquitectura cohesiva.


Una plataforma de múltiples motores que aloja datos de manera eficiente y permite que cada motor tenga éxito es lo que el mundo analítico ha estado anhelando y lo que ofrecen las arquitecturas Iceberg y Data Lakehouse.


Esto no es simple y conlleva muchos desafíos; no existe una forma sencilla de utilizar varios motores con una actualización fiable de los datos. Pero incluso ahora que tenemos dos o tres formatos que brindan actualizaciones confiables, todavía hay mucha confusión y hay problemas en esta área.



Los requisitos modernos se ven así:


  1. Almacenamiento de tabla central : almacenar los datos independientemente de la computación se convierte en una decisión arquitectónica crítica. La razón por la que es importante es porque los datos tienen gravedad y nos atraen hacia la ubicación de los datos. Entonces, si nuestros datos están completamente en un proveedor o proveedor de la nube, entonces estamos vinculados solo a ese proveedor o proveedor de la nube. Esto es inherentemente problemático cuando esos sistemas están cerrados o especializados en diseño. El software abierto se convierte en el requisito para las arquitecturas modernas.


  2. Cómputo portátil : otro requisito moderno es la capacidad de llevar sus motores de cómputo a un proveedor diferente/proveedor de la nube o aprovechar motores de cómputo especializados. Si bien muchos se enfocan en el centro de gravedad (datos), la empresa también necesita portabilidad para la lógica, el código y SQL.


  3. Control de acceso : la mayoría de las empresas enfrentan un gran desafío para tener una política de autorización consistente en todos los motores. Sin embargo, es más que una simple arquitectura, ya que la aplicación exitosa y repetible de estas políticas en múltiples motores se convierte en un imperativo operativo.


  4. Mantener la estructura : una de las mayores fuentes de trabajo humano que hemos visto en los últimos años es perder la estructura de datos a medida que se traslada a otro lugar. Un ejemplo perfecto solía ser Snowflake. El proceso de mover datos a Snowflake fue manual y la introducción de conjuntos de datos de terceros también resultó en un retrabajo debido a los diferentes formatos de archivo y cambios en los formatos durante el movimiento.

Apache Iceberg al rescate

Apache Iceberg está diseñado desde cero con la mayoría de los desafíos y objetivos mencionados anteriormente como base para implementar un formato de mesa abierta. Aborda los siguientes desafíos:


  1. Cómputo flexible
    • No mueva datos; varios motores deberían funcionar a la perfección

    • Admite trabajos por lotes, de transmisión y ad hoc

    • Admite código de muchos idiomas, no solo marcos JVM


  2. Comportamiento del almacén de SQL
    • Transacciones confiables con tablas SQL donde tenemos la capacidad de realizar operaciones CRUD de manera confiable

    • Separar las preocupaciones de las tablas reales proporciona que la segregación


Apache Iceberg mantiene sus registros en el almacenamiento de objetos, a diferencia de Apache Hive. Iceberg permite que varios motores aprovechen el comportamiento de SQL y está diseñado para tablas enormes. En producción, donde una sola tabla puede contener decenas de petabytes de datos, esto es muy importante. Incluso las tablas de varios petabytes se pueden leer desde un solo nodo, sin necesidad de un motor SQL distribuido para filtrar los metadatos de la tabla.


Fuente: https://iceberg.apache.org/spec/


Iceberg tiene una regla no escrita, ser invisible cuando se usa en la pila de Big Data. Esta filosofía proviene del espacio de tablas SQL, donde nunca pensamos en lo que hay debajo de las tablas SQL. Como cualquier profesional sabe, este simplemente no es el caso cuando se trabaja con tablas similares a Hadoop y Hive.


Iceberg lo mantiene simple de dos maneras. En primer lugar, evite sorpresas desagradables cuando se realicen cambios en las tablas. Por ejemplo, un cambio nunca debería recuperar datos que se eliminaron y eliminaron. En segundo lugar, Iceberg reduce el cambio de contexto, ya que no importa lo que hay debajo de la mesa, lo que importa es el trabajo que hay que hacer.

Entendiendo Iceberg FileIO

FileIO es la interfaz entre la biblioteca central de Iceberg y el almacenamiento subyacente. FileIO se creó como una forma de que Iceberg funcione en un mundo donde la computación y el almacenamiento distribuidos están desagregados. El ecosistema heredado de Hadoop requiere rutas jerárquicas y estructuras de partición que son, en la práctica, exactamente lo contrario de los métodos utilizados para lograr velocidad y escala en el mundo del almacenamiento de objetos.


Hadoop y Hive son antipatrones para el almacenamiento de objetos nativos de la nube escalable y de alto rendimiento. Las aplicaciones de lago de datos que dependen de la API de S3 para interactuar con MinIO pueden escalar fácilmente a miles de transacciones por segundo en millones o miles de millones de objetos. Puede aumentar el rendimiento de lectura y escritura procesando varias solicitudes simultáneas en paralelo. Esto se logra agregando prefijos (una cadena de caracteres que es un subconjunto del nombre de un objeto, comenzando con el primer carácter) a los depósitos y luego escribiendo operaciones paralelas, cada una abriendo una conexión por prefijo.


Además, la dependencia de Hadoop de los directorios del sistema de archivos no se traduce en almacenamiento de objetos; es difícil organizar físicamente conjuntos de datos en diferentes directorios y abordarlos por ruta cuando no existen rutas. Hadoop se basa en un sistema de archivos para definir el conjunto de datos y proporcionar mecanismos de bloqueo para la concurrencia y la resolución de conflictos. Además, en el ecosistema de Hadoop, los trabajos que procesan operaciones de cambio de nombre deben ser atómicos. Esto no es posible con la API de S3, ya que los cambios de nombre son en realidad dos operaciones: copiar y eliminar. Desafortunadamente, el resultado es que no hay aislamiento entre lectura y escritura, lo que posiblemente dé lugar a conflictos, colisiones e incoherencias.


Por el contrario, Iceberg fue diseñado para ejecutarse completamente abstraído del almacenamiento físico mediante el almacenamiento de objetos. Todas las ubicaciones son "explícitas, inmutables y absolutas" tal como se definen en los metadatos. Iceberg realiza un seguimiento del estado completo de la tabla sin el equipaje de los directorios de referencia. Es mucho más rápido usar metadatos para encontrar una tabla que enumerar toda la jerarquía con la API de S3. No hay cambios de nombre: una confirmación simplemente agrega nuevas entradas a la tabla de metadatos.


La API de FileIO realiza operaciones de metadatos durante las fases de planificación y confirmación. Las tareas usan FileIO para leer y escribir los archivos de datos subyacentes, y las ubicaciones de estos archivos se incluyen en los metadatos de la tabla durante una confirmación. Exactamente cómo el motor hace esto depende de la implementación de FileIO. Para entornos heredados, HadoopFileIO sirve como una capa de adaptador entre una implementación existente de Hadoop FileSystem y la API de FileIO dentro de Iceberg.


En cambio, nos centraremos en S3FileIO porque es una implementación nativa de S3. No necesitamos llevar Hadoop Cruft con nosotros cuando construimos nuestra casa del lago nativa de la nube. Según Iceberg FileIO: Cloud Native Tables , las ventajas de una implementación nativa de S3 incluyen:


  • Comportamiento del contrato: las implementaciones de Hadoop FileSystem tienen un comportamiento de contrato estricto que da como resultado solicitudes adicionales (comprobaciones de existencia, directorios y rutas sin conflicto) que agregan sobrecarga y complejidad. Iceberg utiliza rutas totalmente direccionables y únicas que evitan una complejidad adicional.

  • Cargas optimizadas: S3FileIO optimiza el almacenamiento/memoria mediante la carga progresiva de datos para minimizar el consumo de disco para tareas grandes y conserva un bajo consumo de memoria cuando se abren varios archivos para la salida.

  • Personalización del cliente de S3: el cliente utiliza la última versión principal del SDK de AWS (v2) y permite a los usuarios personalizar completamente el cliente para usarlo con S3 (incluido cualquier punto de conexión compatible con la API de S3).

  • Rendimiento de serialización: el procesamiento de tareas con HadoopFileIO requiere la serialización de la configuración de Hadoop, que es bastante grande y, en casos degenerados, puede ralentizar el procesamiento y generar más gastos generales que los datos procesados.

  • Dependencias reducidas: las implementaciones de Hadoop FileSystem introducen un gran árbol de dependencias y una implementación simplificada reduce la complejidad general del empaquetado.


Iceberg brinda integración con diferentes servicios de AWS a través del módulo iceberg-aws , incluido con los tiempos de ejecución de Spark y Flink para todas las versiones desde la 0.11.0 en adelante. Iceberg permite a los usuarios escribir datos en S3 a través de S3FileIO . Cuando se usa S3FileIO , los catálogos se configuran para usar la API de S3 mediante la propiedad de catálogo io-impl . S3FileIO adopta las últimas funciones de S3 para optimizar la seguridad (listas de control de acceso de S3, los tres modos de cifrado del lado del servidor de S3) y el rendimiento (cargas progresivas de varias partes) y, por lo tanto, se recomienda para casos de uso de almacenamiento de objetos.

Tutorial de Iceberg y MinIO

En este momento, Spark es el motor informático con más funciones para trabajar con Iceberg, por lo que este tutorial se centra en el uso de Spark y Spark-SQL para comprender los conceptos y las funciones de Iceberg. En Ubuntu 20.04, instalaremos y configuraremos Java, PostgreSQL como puntero de catálogo o metadatos, Spark y MinIO, mientras descargamos y configuramos cuidadosamente las dependencias de Java. Luego, ejecutaremos Spark-SQL para crear, completar, consultar y modificar una tabla. También veremos algunas de las cosas asombrosas que puede hacer con Iceberg, como la evolución del esquema, el trabajo con particiones ocultas, el viaje en el tiempo y la reversión. Después de cada paso, incluimos una captura de pantalla del cubo Iceberg en MinIO para que pueda ver lo que sucede detrás de escena.

requisitos previos

Descargue e inicie el servidor MinIO. Registre la dirección IP, el puerto TCP, la clave de acceso y la clave secreta.
Descargue e instale el cliente MinIO.


Use MinIO Client para establecer un alias y crear un depósito para Iceberg


 mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb minio/iceberg Bucket created successfully `myminio/iceberg`.


Deberá descargar y configurar Spark para usar los archivos Java (JAR) requeridos para habilitar varias funcionalidades como Hadoop, AWS S3 y JDBC. También deberá tener la versión correcta de cada archivo JAR y configuración requerido en PATH y CLASSPATH. Desafortunadamente, es muy fácil invocar diferentes versiones de JAR y perder el rastro de qué JAR está ejecutando y, por lo tanto, encontrar incompatibilidades que detengan el espectáculo.


Instale Java Runtime si aún no lo ha hecho. Para Ubuntu 20.04, el comando es


 sudo apt install curl mlocate default-jdk -y


Descargue y configure PostgreSQL para que se ejecute como un servicio del sistema


 sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update sudo apt-get -y install postgresql sudo systemctl start postgresql.service


Crearemos un rol icebergcat como superusuario, estableceremos la contraseña y crearemos una base de datos icebergcat


 sudo -u postgres createuser --interactive ALTER ROLE icebergcat PASSWORD 'minio'; sudo -u postgres createdb icebergcat


Inicie sesión en la base de datos para verificar que funcione, se le pedirá la contraseña:


 psql -U icebergcat -d icebergcat -W -h 127.0.0.1


Descargue, extraiga y mueva Apache Spark


 $ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz $ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz $ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark


Configure el entorno Spark agregando lo siguiente a ~/.bashrc y luego reiniciando el shell para aplicar los cambios.


 export SPARK_HOME=/opt/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin bash -l


Se requieren los siguientes archivos .jar. Descargue y copie los archivos .jar en cualquier ubicación requerida en la máquina Spark, por ejemplo /opt/spark/jars .


Se necesita aws-java-sdk-bundle/1.11.901.jar (o superior) para admitir el protocolo S3.


 $ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar


Se requiere iceberg-spark-runtime-3.2_2.12.jar .


 $ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar


Iniciar chispa

Inicie un servidor maestro independiente de Spark


 $ start-master.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out


Abra un navegador y vaya a http: // Your-IPaddress:7077



Spark está vivo en spark://<Your-Machine-Name>:7077


Iniciar un proceso de trabajo de Spark

 $ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077 starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out

Spark SQL y Iceberg

Inicialice el entorno antes de iniciar Spark-SQL.


 export AWS_ACCESS_KEY_ID=minioadmin export AWS_SECRET_ACCESS_KEY=minioadmin export AWS_S3_ENDPOINT=10.0.0.10:9000 export AWS_REGION=us-east-1 export MINIO_REGION=us-east-1 export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" export AWS_SDK_VERSION=2.17.230 export AWS_MAVEN_GROUP=software.amazon.awssdk export AWS_PACKAGES=( "bundle" "url-connection-client" ) for pkg in "${AWS_PACKAGES[@]}"; do export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" done


Ejecute el siguiente comando para iniciar Spark-SQL con Iceberg usando PostgreSQL para metadatos y compatibilidad con la API de S3, necesaria para MinIO. Alternativamente, puede establecer la configuración usando su archivo local spark-defaults.conf


 $ spark-sql --packages $DEPENDENCIES \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \ --conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.password=minio \ --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=/home/iceicedata/spark-events \ --conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \ --conf spark.sql.catalogImplementation=in-memory


Algunas notas importantes sobre esta configuración


  • Declaramos un catálogo my_catalog que usa JDBC para conectarse a PostgreSQL en una dirección IP interna y usa la tabla icebergcat para los metadatos.
  • Luego configuramos la ubicación de nuestro almacén en el depósito MinIO que creamos anteriormente y configuramos Iceberg para usar S3FileIO para acceder a él.

Crear una tabla

A continuación, crearemos una tabla simple.


 CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg LOCATION 's3://iceberg' PARTITIONED BY (category);


Aquí hay una mejora de rendimiento masiva que ofrece Iceberg con S3FileIO. Es un gran alivio para aquellos de nosotros que sufrimos un rendimiento lento cuando usamos un diseño de almacenamiento Hive tradicional con S3 como resultado de la limitación de las solicitudes basadas en el prefijo del objeto. No es ningún secreto que la creación de una tabla Athena/Hive particionada en AWS S3 puede llevar entre 30 y 60 minutos. Iceberg usa de forma predeterminada el diseño de almacenamiento de Hive, pero se puede cambiar para usar ObjectStoreLocationProvider .


Con ObjectStoreLocationProvider , se genera un hash determinista para cada archivo almacenado, con el hash añadido directamente después de write.data.path . Esto garantiza que los archivos escritos en el almacenamiento de objetos compatibles con S3 se distribuyan por igual entre varios prefijos en el depósito de S3, lo que da como resultado una limitación mínima y un rendimiento máximo para las operaciones de E/S relacionadas con S3. Al usar ObjectStoreLocationProvider , tener un write.data.path breve y compartido en las tablas de Iceberg mejorará el rendimiento. Se ha hecho mucho más en Iceberg para mejorar el rendimiento y la confiabilidad con respecto a Hive .


 CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.object-storage.enabled'=true, 'write.data.path'='s3://iceberg') PARTITIONED BY (category);


Mirando la consola MinIO, vemos que se creó una ruta debajo de nuestro depósito iceberg para my_table



El depósito contiene una ruta metadata



En este punto, no hay datos en la tabla, solo hay metadatos que describen la tabla. También hay un puntero a estos metadatos almacenados en la tabla de catálogo Iceberg en PostgreSQL. Spark-SQL (el motor de consultas) busca en el catálogo de Iceberg ( my_catalog ) por nombre de tabla ( my_table ) y recupera el URI del archivo de metadatos actual.



Echemos un vistazo al primer archivo de metadatos, donde se almacena la información sobre el esquema, las particiones y las instantáneas de la tabla. Mientras se definen todas las instantáneas, el current-snapshot-id le dice al motor de consulta qué instantánea usar, luego el motor de consulta busca ese valor en la matriz snapshots , obtiene el valor de la manifest-list de esa instantánea y abre los archivos de manifiesto en ese lista, en orden. Tenga en cuenta que nuestro ejemplo solo tiene una instantánea porque la tabla se acaba de crear y ningún manifiesto porque aún no hemos insertado datos.


 { "format-version" : 1, "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363", "location" : "s3://iceberg/my_table", "last-updated-ms" : 1658795119167, "last-column-id" : 3, "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] }, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] } ], "partition-spec" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "option.write.data.path" : "s3://iceberg/my_table", "owner" : "msarrel", "option.write.object-storage.enabled" : "true", "write.data.path" : "s3://iceberg/my_table", "write.object-storage.enabled" : "true" }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }


A continuación, insertemos algunos datos simulados y observemos los archivos que Iceberg almacena en MinIO. Dentro del cubo iceberg , ahora hay prefijos my_table/metadata y my_table/data .


 INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video"); 



El prefijo de metadatos contiene el archivo de metadatos original, una lista de manifiestos y archivos de manifiestos. La lista de manifiesto es, lo adivinó, una lista de archivos de manifiesto. La lista de manifiestos contiene información sobre cada archivo de manifiesto que se incluye en cada instantánea: la ubicación del archivo de manifiesto, la instantánea que se agregó como resultado, información sobre la partición y los límites inferior y superior para las columnas de partición de los archivos de datos relacionados. Durante una consulta, el motor de consultas lee el valor de las ubicaciones de los archivos de manifiesto de la lista de manifiestos y abre los archivos de manifiesto apropiados. La lista de manifiestos está en formato AVRO.


Los archivos de manifiesto realizan un seguimiento de los archivos de datos e incluyen detalles y estadísticas precalculadas sobre cada archivo. Lo primero que se rastrea es el formato y la ubicación del archivo. Los archivos de manifiesto son la forma en que Iceberg elimina los datos de seguimiento al estilo de Hive por ubicación del sistema de archivos. Los archivos de manifiesto mejoran la eficiencia y el rendimiento de la lectura de archivos de datos al incluir detalles como la pertenencia a la partición, el recuento de registros y los límites inferior y superior de cada columna. Las estadísticas se escriben durante las operaciones de escritura y es más probable que sean oportunas, precisas y actualizadas que las estadísticas de Hive.



Cuando se envía una consulta SELECT, el motor de consultas obtiene la ubicación de la lista de manifiestos de la base de datos de metadatos. Luego, el motor de consulta lee el valor de las entradas file-path para cada objeto data-file y luego abre los archivos de datos para ejecutar la consulta.


A continuación se muestra el contenido del prefijo data , organizado por partición.



Dentro de la partición, hay un archivo de datos por fila de la tabla.



Ejecutemos una consulta de ejemplo:

 spark-sql> SELECT count(1) as count, data FROM my_catalog.my_table GROUP BY data; 1 a 1 b 1 c Time taken: 9.715 seconds, Fetched 3 row(s) spark-sql>


Ahora que comprendemos los diferentes componentes de una tabla Iceberg y cómo funciona el motor de consultas con ellos, profundicemos en las mejores funciones de Iceberg y cómo aprovecharlas en su lago de datos.

Evolución de la tabla

Los cambios de evolución del esquema como Agregar, Soltar, Cambiar nombre y Actualizar son cambios de metadatos, lo que significa que no es necesario cambiar/reescribir archivos de datos para realizar actualizaciones. Iceberg también garantiza que estos cambios en la evolución del esquema son independientes y no tienen efectos secundarios. Iceberg usa ID únicos para rastrear cada columna en una tabla con esto, si se agrega una nueva columna, nunca aprovechará una ID existente por error.


Las particiones de la tabla Iceberg se pueden actualizar en una tabla existente porque las consultas no hacen referencia directamente a los valores de partición. Cuando se escriben nuevos datos, utiliza una nueva especificación en un nuevo diseño, los datos escritos previamente con una especificación diferente permanecen sin cambios. Esto provoca una planificación dividida cuando escribe nuevas consultas. Para mejorar el rendimiento, Iceberg utiliza particiones ocultas para que los usuarios no necesiten escribir consultas para que un diseño de partición específico sea rápido. Los usuarios se enfocan en escribir consultas para los datos que necesitan y dejan que Iceberg elimine los archivos que no contienen los datos coincidentes.


Otra evolución que es muy útil es que el orden de clasificación de Iceberg también se puede actualizar en la tabla existente al igual que la especificación de partición. Diferentes motores pueden optar por escribir datos en el orden de clasificación más reciente en orden no clasificado cuando la clasificación es prohibitivamente costosa, los datos antiguos escritos con el orden de clasificación anterior permanecen sin cambios.


 spark-sql> ALTER TABLE my_catalog.my_table > RENAME my_catalog.my_table_2;


Las primeras veces que hagas esto, te sorprenderá lo rápido que es. Esto se debe a que no está reescribiendo una tabla, simplemente está operando con metadatos. En este caso, solo hemos cambiado table_name e Iceberg lo hizo por nosotros en aproximadamente una décima de segundo.



Otros cambios de esquema son igualmente indoloros:

 spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity; spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity; spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;

Particiones

Como mencionamos anteriormente, las particiones son compatibles con otros formatos de subárbol, sin embargo, Iceberg admite particiones ocultas que pueden manejar las tareas tediosas y propensas a errores de producir valores de partición para filas en una tabla. Los usuarios se enfocan en agregar filtros a las consultas que resuelven problemas comerciales y no se preocupan por cómo se divide la tabla. Iceberg se encarga de evitar lecturas de particiones innecesarias de forma automática.


Iceberg maneja las complejidades de la partición y el cambio del esquema de partición de una tabla por usted, lo que simplifica enormemente el proceso para los usuarios finales. Puede definir la partición o dejar que Iceberg se encargue de ello por usted. A Iceberg le gusta particionar en una marca de tiempo, como la hora del evento. Las particiones se rastrean mediante instantáneas en los manifiestos. Las consultas ya no dependen del diseño físico de una tabla. Debido a esta separación entre tablas físicas y lógicas, las tablas Iceberg pueden evolucionar en particiones con el tiempo a medida que se agregan más datos. Por ejemplo, volver a particionar una tabla de Hive requeriría crear una nueva tabla y leer datos antiguos en ella. También tendría que cambiar el valor de PARTICIÓN en cada consulta que ya haya escrito, no es divertido.


 spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category; ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;


Ahora tenemos dos esquemas de partición para la misma tabla. Lo que era imposible en Hive ha tenido lugar de forma transparente en Iceberg. A partir de ahora, los planes de consulta se dividen, utilizando el esquema de partición anterior para consultar datos antiguos y el esquema de partición nuevo para consultar datos nuevos. Iceberg se encarga de esto por usted: las personas que consultan la tabla no necesitan saber que los datos se almacenan utilizando dos esquemas de partición. Iceberg hace esto a través de una combinación de cláusulas WHERE detrás de escena y filtros de partición que eliminan los archivos de datos sin coincidencias.

Viaje en el tiempo y reversión

Cada escritura en las tablas de Iceberg crea nuevas instantáneas. Las instantáneas son como las versiones y se pueden usar para viajar en el tiempo y retroceder, tal como lo hacemos con las capacidades de control de versiones de MinIO. La forma en que se administran las instantáneas es configurando expireSnapshot para que el sistema se mantenga bien. El viaje en el tiempo permite realizar consultas reproducibles que utilizan exactamente la misma instantánea de la tabla o permite a los usuarios examinar fácilmente los cambios. La reversión de la versión permite a los usuarios corregir problemas rápidamente restableciendo las tablas a un buen estado.


A medida que se cambian las tablas, Iceberg realiza un seguimiento de cada versión como una instantánea y luego ofrece la posibilidad de viajar en el tiempo a cualquier instantánea al consultar la tabla. Esto puede ser muy útil si desea ejecutar consultas históricas o reproducir los resultados de consultas anteriores, tal vez para generar informes. El viaje en el tiempo también puede ser útil al probar nuevos cambios de código porque puede probar el nuevo código con una consulta de resultados conocidos.


Para ver las instantáneas que se han guardado para una tabla:

 spark-sql> SELECT * FROM my_catalog.my_table.snapshots; 2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"} Time taken: 7.236 seconds, Fetched 1 row(s)


Algunos ejemplos:

 -- time travel to October 26, 1986 at 01:21:00 spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00'; -- time travel to snapshot with id 10963874102873 spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;


Puede realizar lecturas incrementales mediante instantáneas, pero debe utilizar Spark, no Spark-SQL. Por ejemplo:

 scala> spark.read() .format(“iceberg”) .option(“start-snapshot-id”, “10963874102873”) .option(“end-snapshot-id”, “10963874102994”) .load(“s3://iceberg/my_table”)


También puede revertir la tabla a un punto en el tiempo o a una instantánea específica, como en estos dos ejemplos:

 spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000'); spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);

SQL expresivo

Iceberg es compatible con todos los comandos SQL expresivos, como eliminar, combinar y actualizar el nivel de fila, y lo más importante a destacar es que Iceberg es compatible con las estrategias Eager y Lazy. Podemos codificar todas las cosas que necesitamos eliminar (por ejemplo, GDPR o CCPA), pero no volver a escribir todos esos archivos de datos de inmediato, podemos recolectar basura perezosamente según sea necesario y eso realmente ayuda a la eficiencia en las enormes tablas compatibles con Iceberg.


Por ejemplo, puede eliminar todos los registros de una tabla que coincidan con un predicado específico. Lo siguiente eliminará todas las filas de la categoría de video:

 spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';


Alternativamente, puede usar CREAR TABLA COMO SELECCIONAR o REEMPLAZAR TABLA COMO SELECCIONAR para lograr esto:

 spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';


Puede fusionar dos tablas muy fácilmente:

 spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;

Ingeniería de datos

Iceberg es la base para el estándar de tablas analíticas abiertas y utiliza el comportamiento de SQL y una abstracción de tabla real a diferencia de otros formatos de tablas Hive y aplica los fundamentos del almacén de datos para solucionar los problemas antes de que sepamos que los tenemos. Con la ingeniería de datos declarativa podemos configurar tablas y no preocuparnos por cambiar cada motor para que se ajuste a las necesidades de los datos. Esto desbloquea la optimización y las recomendaciones automáticas. Con confirmaciones seguras, los servicios de datos son posibles, lo que ayuda a evitar que los humanos vigilen las cargas de trabajo de datos.


Para inspeccionar el historial, las instantáneas y otros metadatos de una tabla, Iceberg admite la consulta de metadatos. Las tablas de metadatos se identifican agregando el nombre de la tabla de metadatos (por ejemplo, historial) después del nombre de la tabla original en su consulta.


Para mostrar los archivos de datos de una tabla:

 spark-sql> SELECT * FROM my_catalog.my_table.files;


Para mostrar manifiestos:

 spark-sql> SELECT * FROM my_catalog.my_table.manifests;


Para mostrar el historial de la tabla:

 spark-sql> SELECT * FROM my_catalog.my_table.history;


Para mostrar instantáneas:

 spark-sql> SELECT * FROM my_catalog.my_table.snapshots;


También puede unir instantáneas e historial de tablas para ver la aplicación que escribió cada instantánea:

 spark-sql> select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from my_catalog.my_table.history h join my_catalog.my_table.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at;


Ahora que ha aprendido los conceptos básicos, cargue algunos de sus datos en Iceberg, luego obtenga más información de Spark and Iceberg Quickstart y Iceberg Documentation .

integraciones

Apache Iceberg tiene integraciones con varios motores de consulta y ejecución, donde estos conectores pueden crear y administrar las tablas de Apache Iceberg. Los motores que soportan Iceberg son Spark , Flink , Hive , Presto , Trino , Dremio , Snowflake .

Es genial construir lagos de datos con Iceberg y MinIO

Apache Iceberg recibe mucha atención como formato de tabla para lagos de datos. La creciente comunidad de código abierto y el creciente número de integraciones de múltiples proveedores de nube y marcos de aplicaciones significan que es hora de tomar Iceberg en serio, comenzar a experimentar, aprender y planificar su integración en la arquitectura de lago de datos existente. Combine Iceberg con MinIO para lagos de datos y análisis de múltiples nubes.


A medida que comience a usar Iceberg y MinIO, comuníquese y comparta sus experiencias o haga preguntas a través de nuestro canal de Slack .


También publicado aquí .