O Apache Iceberg parece ter conquistado o mundo dos dados. Inicialmente incubado na Netflix por Ryan Blue, acabou sendo transmitido para a Apache Software Foundation, onde reside atualmente. Em sua essência, é um formato de tabela aberta para conjuntos de dados analíticos em escala (pense em centenas de TBs a centenas de PBs).
É um formato compatível com vários mecanismos. Isso significa que Spark, Trino, Flink, Presto, Hive e Impala podem operar de forma independente e simultânea no conjunto de dados. Ele suporta a língua franca da análise de dados, SQL, bem como recursos importantes como evolução completa do esquema, particionamento oculto, viagem no tempo e reversão e compactação de dados.
Esta postagem se concentra em como o Iceberg e o MinIO se complementam e como várias estruturas analíticas (Spark, Flink, Trino, Dremio e Snowflake) podem aproveitar os dois.
Embora o Apache Hive tenha sido um grande passo à frente para a época, ele começou a mostrar falhas à medida que os aplicativos analíticos se tornavam mais numerosos, diversos e sofisticados. Para obter desempenho, os dados precisavam permanecer nos diretórios e esses diretórios precisavam ser constantemente gerenciados. Isso levou a um banco de dados de diretórios. Isso resolveu o problema de onde estavam os dados, mas introduziu o problema de qual era o estado dessa tabela — que agora estava em dois locais (banco de dados de diretórios e sistema de arquivos).
Isso limitava o que você podia fazer e a flexibilidade que existia — especificamente no que diz respeito às mudanças, que não podiam ser garantidas em ambos os lugares com uma única operação.
Imagine grandes quantidades de dados de vários anos particionados na data. Anos particionados em meses e semanas e, se as semanas forem particionadas em dias, e os dias em horas e assim por diante — a lista de diretórios explode. O Hive Metastore (HMS) é um RDBMS transacional. O sistema de arquivos (HDFS) não é transacional. Quando as informações da partição são alteradas, é necessário recriar o armazenamento da partição e o sistema de arquivos.
O problema era insustentável e nenhum patch resolveria os problemas inerentes. Na verdade, os desafios estavam apenas se acelerando com o crescimento dos dados.
Um dos principais pontos de venda em torno da arquitetura de data lakehouse é que ela oferece suporte a vários mecanismos e estruturas analíticas. Por exemplo, você precisa oferecer suporte a ELT (Extrair, Carregar, Transformar) e ETL (Extrair, Transformar, Carregar). Você precisa oferecer suporte a inteligência de negócios, análise de negócios e tipos de cargas de trabalho de IA/ML. Você precisa interagir com sucesso com o mesmo conjunto de tabelas de maneira segura e previsível. Isso significa que vários motores, como Spark, Flink, Trino, Arrow e Dask, precisam estar de alguma forma ligados a uma arquitetura coesa.
Uma plataforma de vários mecanismos que armazena dados de forma eficiente e, ao mesmo tempo, permite que cada mecanismo seja bem-sucedido é o que o mundo analítico anseia e o que as arquiteturas Iceberg e Data Lakehouse oferecem.
Isso não é simples e há muitos desafios; não há uma maneira fácil de usar vários mecanismos com atualização confiável dos dados. Mas mesmo agora que temos dois ou três formatos que fornecem atualizações confiáveis, ainda há muita confusão e problemas nessa área.
Os requisitos modernos são assim:
Armazenamento de tabela central : armazenar os dados independentemente da computação torna-se uma decisão arquitetônica crítica. A razão pela qual isso importa é porque os dados têm gravidade e nos puxam para o local dos dados. Portanto, se nossos dados estiverem inteiramente em um fornecedor ou provedor de nuvem, estaremos vinculados apenas a esse fornecedor ou provedor de nuvem. Isso é inerentemente problemático quando esses sistemas são fechados ou especializados em design. Software aberto torna-se o requisito para arquiteturas modernas.
Computação portátil : outro requisito moderno é a capacidade de levar seus mecanismos de computação para um fornecedor/fornecedor de nuvem diferente ou alavancar mecanismos de computação especializados. Embora muitos se concentrem no centro de gravidade (dados), a empresa também precisa de portabilidade para lógica, código e SQL.
Controle de Acesso : A maioria das empresas tem um grande desafio de ter uma política de autorização consistente entre os mecanismos. No entanto, é mais do que apenas arquitetura, pois a aplicação bem-sucedida e repetível dessas políticas em vários mecanismos torna-se um imperativo operacional.
Manter a estrutura : uma das maiores fontes de trabalho humano que vimos nos últimos anos é a perda da estrutura de dados conforme ela é movida para outro lugar. Um exemplo perfeito costumava ser Snowflake. O processo de mover dados para o Snowflake era manual e a introdução de conjuntos de dados de terceiros também resultou em retrabalho devido a diferentes formatos de arquivo e alterações nos formatos durante a movimentação.
O Apache Iceberg foi projetado desde o início com a maioria dos desafios e objetivos mencionados acima como base para implementar um formato de tabela aberta. Ele aborda os seguintes desafios:
Não mova dados; vários mecanismos devem funcionar perfeitamente
Oferece suporte a trabalhos em lote, streaming e ad hoc
Código de suporte de vários idiomas, não apenas estruturas JVM
Transações confiáveis com tabelas SQL onde temos a capacidade de realizar operações CRUD de forma confiável
Separar preocupações de tabelas reais fornece essa segregação
O Apache Iceberg mantém seus registros no armazenamento de objetos — ao contrário do Apache Hive. O Iceberg permite que o comportamento do SQL seja aproveitado por vários mecanismos e é projetado para tabelas enormes. Na produção, onde uma única tabela pode conter dezenas de petabytes de dados, isso é muito importante. Mesmo tabelas de vários petabytes podem ser lidas de um único nó, sem a necessidade de um mecanismo SQL distribuído para filtrar os metadados da tabela.
Fonte: https://iceberg.apache.org/spec/
O Iceberg tem uma regra não escrita, para ser invisível ao ser usado na pilha de Big Data. Essa filosofia vem do espaço de tabelas SQL, onde nunca pensamos no que está por baixo das tabelas SQL. Como qualquer profissional sabe, esse simplesmente não é o caso ao trabalhar com tabelas do tipo Hadoop e Hive.
Iceberg simplifica de duas maneiras. Primeiro, evite surpresas desagradáveis quando forem feitas alterações nas tabelas. Por exemplo, uma alteração nunca deve trazer de volta dados que foram excluídos e removidos. Em segundo lugar, o Iceberg reduz a troca de contexto, pois o que está embaixo da mesa não importa — o que importa é o trabalho a ser feito.
FileIO é a interface entre a biblioteca principal do Iceberg e o armazenamento subjacente. O FileIO foi criado como uma maneira de o Iceberg funcionar em um mundo onde a computação e o armazenamento distribuídos são desagregados. O ecossistema legado do Hadoop requer caminhos hierárquicos e estruturas de partição que são, na prática, exatamente o oposto dos métodos usados para obter velocidade e escala no mundo do armazenamento de objetos.
Hadoop e Hive são antipadrões para armazenamento de objetos nativos da nuvem escalonável e de alto desempenho. Os aplicativos de data lake que dependem da API do S3 para interagir com o MinIO podem escalar facilmente para milhares de transações por segundo em milhões ou bilhões de objetos. Você pode aumentar o desempenho de leitura e gravação processando várias solicitações simultâneas em paralelo. Você consegue isso adicionando prefixos — uma cadeia de caracteres que é um subconjunto de um nome de objeto, começando com o primeiro caractere — a blocos e, em seguida, escrevendo operações paralelas, cada uma abrindo uma conexão por prefixo.
Além disso, a dependência do Hadoop em diretórios do sistema de arquivos não se traduz em armazenamento de objetos – é difícil organizar fisicamente conjuntos de dados em diretórios diferentes e endereçá-los por caminho quando caminhos não existem. O Hadoop depende de um sistema de arquivos para definir o conjunto de dados e fornecer mecanismos de bloqueio para simultaneidade e resolução de conflitos. Além disso, no ecossistema Hadoop, os trabalhos que processam as operações de renomeação devem ser atômicos. Isso não é possível usando a API do S3, pois as renomeações são, na verdade, duas operações: copiar e excluir. Infelizmente, o resultado é que não há isolamento entre ler e escrever, possivelmente dando origem a conflitos, colisões e inconsistências.
Em contraste, o Iceberg foi projetado para ser executado completamente abstraído do armazenamento físico usando o armazenamento de objetos. Todos os locais são “explícitos, imutáveis e absolutos”, conforme definido nos metadados. O Iceberg rastreia o estado completo da tabela sem a bagagem de diretórios de referência. É muito mais rápido usar metadados para localizar uma tabela do que listar toda a hierarquia usando a API do S3. Não há renomeações — um commit simplesmente adiciona novas entradas à tabela de metadados.
A API FileIO realiza operações de metadados durante as fases de planejamento e confirmação. As tarefas usam FileIO para ler e gravar os arquivos de dados subjacentes, e os locais desses arquivos são incluídos nos metadados da tabela durante uma confirmação. Exatamente como o mecanismo faz isso depende da implementação do FileIO. Para ambientes legados, HadoopFileIO
serve como uma camada de adaptador entre uma implementação Hadoop FileSystem existente e a API FileIO no Iceberg.
Em vez disso, vamos nos concentrar no S3FileIO
porque é uma implementação nativa do S3. Não precisamos carregar lixo do Hadoop conosco quando construímos nossa casa do lago nativa da nuvem. De acordo com Iceberg FileIO: Cloud Native Tables , as vantagens de uma implementação S3 nativa incluem:
Comportamento do contrato: as implementações do Hadoop FileSystem têm um comportamento de contrato rígido, resultando em solicitações adicionais (verificações de existência, diretórios e caminhos de eliminação de conflitos) que adicionam sobrecarga e complexidade. O Iceberg usa caminhos totalmente endereçáveis e exclusivos que evitam complexidade adicional.
Uploads otimizados: S3FileIO
otimiza o armazenamento/memória ao fazer o upload progressivo de dados para minimizar o consumo de disco para grandes tarefas e preserva o baixo consumo de memória quando vários arquivos são abertos para saída.
Personalização do cliente S3: o cliente usa a versão principal mais recente do AWS SDK (v2) e permite que os usuários personalizem totalmente o cliente para uso com o S3 (incluindo qualquer endpoint compatível com a API do S3).
Desempenho de serialização: o processamento de tarefas com HadoopFileIO
requer a serialização da configuração do Hadoop, que é bastante grande e, em casos degenerados, pode retardar o processamento e resultar em mais sobrecarga do que os dados processados.
Dependências reduzidas: as implementações do Hadoop FileSystem introduzem uma grande árvore de dependências e uma implementação simplificada reduz a complexidade geral do empacotamento.
O Iceberg fornece integração com diferentes serviços da AWS por meio do módulo iceberg-aws , junto com os tempos de execução Spark e Flink para todas as versões de 0.11.0
em diante. O Iceberg permite que os usuários gravem dados no S3 por meio S3FileIO
. Ao usar S3FileIO
, os catálogos são configurados para usar a API S3 usando a propriedade de catálogo io-impl
. S3FileIO
adota os recursos S3 mais recentes para segurança otimizada (listas de controle de acesso S3, todos os três modos de criptografia do lado do servidor S3) e desempenho (uploads multipartes progressivos) e, portanto, é recomendado para casos de uso de armazenamento de objetos.
No momento, o Spark é o mecanismo de computação com mais recursos para trabalhar com o Iceberg, portanto, este tutorial se concentra no uso do Spark e do Spark-SQL para entender os conceitos e os recursos do Iceberg. No Ubuntu 20.04, instalaremos e configuraremos Java, PostgreSQL como um ponteiro de catálogo ou metadados, Spark e MinIO – enquanto baixamos e configuramos cuidadosamente as dependências do Java. Em seguida, executaremos o Spark-SQL para criar, preencher, consultar e modificar uma tabela. Também abordaremos algumas das coisas incríveis que você pode fazer com o Iceberg, como evolução de esquema, trabalho com partições ocultas, viagem no tempo e reversão. Após cada etapa, incluímos uma captura de tela do balde Iceberg no MinIO para que você possa ver o que está acontecendo nos bastidores.
Baixe e inicie o MinIO Server. Registre o endereço IP, a porta TCP, a chave de acesso e a chave secreta.
Baixe e instale o MinIO Client.
Use o MinIO Client para definir um alias e criar um balde para o Iceberg
mc alias set minio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb minio/iceberg Bucket created successfully `myminio/iceberg`.
Você precisará baixar e configurar o Spark para usar os Java Archives (JARs) necessários para habilitar várias funcionalidades como Hadoop, AWS S3 e JDBC. Você também precisará ter a versão correta de cada JAR necessário e arquivo de configuração em PATH e CLASSPATH. É, infelizmente, muito fácil invocar diferentes versões de JARs e perder o controle de qual JAR você está executando e, portanto, encontrar incompatibilidades de parar o show.
Instale o Java Runtime, caso ainda não o tenha feito. Para o Ubuntu 20.04, o comando é
sudo apt install curl mlocate default-jdk -y
Baixe e configure o PostgreSQL para ser executado como um serviço do sistema
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update sudo apt-get -y install postgresql sudo systemctl start postgresql.service
Vamos criar uma função icebergcat
como superusuário, definir a senha e criar um banco de dados icebergcat
sudo -u postgres createuser --interactive ALTER ROLE icebergcat PASSWORD 'minio'; sudo -u postgres createdb icebergcat
Faça login no banco de dados para verificar se está funcionando, você será solicitado a fornecer a senha:
psql -U icebergcat -d icebergcat -W -h 127.0.0.1
Baixe, extraia e mova o Apache Spark
$ wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz $ tar zxvf spark-3.2.1-bin-hadoop3.2.tgz $ sudo mv spark-3.2.1-bin-hadoop3.2/ /opt/spark
Defina o ambiente Spark adicionando o seguinte a ~/.bashrc
e reiniciando o shell para aplicar as alterações.
export SPARK_HOME=/opt/spark export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin bash -l
Os arquivos .jar a seguir são necessários. Faça download e copie os arquivos .jar em qualquer local necessário na máquina Spark, por exemplo /opt/spark/jars
.
aws-java-sdk-bundle/1.11.901.jar (ou superior) é necessário para dar suporte ao protocolo S3.
$ wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.230/bundle-2.17.230.jar
iceberg-spark-runtime-3.2_2.12.jar é necessário.
$ wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.1_2.12/0.13.2/iceberg-spark-runtime-3.1_2.12-0.13.2.jar
Inicie um servidor mestre autônomo do Spark
$ start-master.sh starting org.apache.spark.deploy.master.Master, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.master.Master-1-<Your-Machine-Name>.out
Abra um navegador e vá para http: // Your-IPaddress:7077
O Spark está ativo em spark://<Your-Machine-Name>:7077
Iniciar um processo de trabalho do Spark
$ /opt/spark/sbin/start-worker.sh spark://<Your-Machine-Name>:7077 starting org.apache.spark.deploy.worker.Worker, logging to /opt/spark/logs/spark-msarrel-org.apache.spark.deploy.worker.Worker-1-<Your-Machine-Name>.out
Inicialize o ambiente antes de iniciar o Spark-SQL.
export AWS_ACCESS_KEY_ID=minioadmin export AWS_SECRET_ACCESS_KEY=minioadmin export AWS_S3_ENDPOINT=10.0.0.10:9000 export AWS_REGION=us-east-1 export MINIO_REGION=us-east-1 export DEPENDENCIES="org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" export AWS_SDK_VERSION=2.17.230 export AWS_MAVEN_GROUP=software.amazon.awssdk export AWS_PACKAGES=( "bundle" "url-connection-client" ) for pkg in "${AWS_PACKAGES[@]}"; do export DEPENDENCIES+=",$AWS_MAVEN_GROUP:$pkg:$AWS_SDK_VERSION" done
Execute o seguinte comando para iniciar o Spark-SQL com Iceberg usando PostgreSQL para metadados e suporte para a API S3, necessária para MinIO. Como alternativa, você pode definir a configuração usando seu arquivo spark-defaults.conf
local
$ spark-sql --packages $DEPENDENCIES \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \ --conf spark.sql.catalog.my_catalog.uri=jdbc:postgresql://127.0.0.1:5432/icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.user=icebergcat \ --conf spark.sql.catalog.my_catalog.jdbc.password=minio \ --conf spark.sql.catalog.my_catalog.warehouse=s3://iceberg \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.s3.endpoint=http://10.0.0.10:9000 \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.eventLog.enabled=true \ --conf spark.eventLog.dir=/home/iceicedata/spark-events \ --conf spark.history.fs.logDirectory= /home/iceicedata/spark-events \ --conf spark.sql.catalogImplementation=in-memory
Algumas notas importantes sobre esta configuração
my_catalog
que usa JDBC para se conectar ao PostgreSQL em um endereço IP interno e usamos a tabela icebergcat
para metadados.S3FileIO
para acessá-lo.Em seguida, criaremos uma tabela simples.
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg LOCATION 's3://iceberg' PARTITIONED BY (category);
Aqui está uma enorme melhoria de desempenho que o Iceberg oferece com o S3FileIO. É um grande alívio para aqueles de nós que sofreram com o desempenho lento ao usar um layout de armazenamento Hive tradicional com S3 como resultado da limitação de solicitações com base no prefixo do objeto. Não é nenhum segredo que a criação de uma tabela Athena/Hive particionada no AWS S3 pode levar de 30 a 60 minutos. Por padrão, o Iceberg usa o layout de armazenamento Hive, mas pode ser alternado para usar o ObjectStoreLocationProvider
.
Com ObjectStoreLocationProvider
, um hash determinístico é gerado para cada arquivo armazenado, com o hash anexado diretamente após write.data.path
. Isso garante que os arquivos gravados no armazenamento de objetos compatível com S3 sejam igualmente distribuídos em vários prefixos no bucket S3, resultando em limitação mínima e taxa de transferência máxima para operações de E/S relacionadas ao S3. Ao usar ObjectStoreLocationProvider
, ter um write.data.path
compartilhado e curto em suas tabelas Iceberg melhorará o desempenho. Muito mais foi feito no Iceberg para melhorar o desempenho e a confiabilidade do Hive .
CREATE TABLE my_catalog.my_table ( id bigint, data string, category string) USING iceberg OPTIONS ( 'write.object-storage.enabled'=true, 'write.data.path'='s3://iceberg') PARTITIONED BY (category);
Olhando para MinIO Console, vemos que um caminho foi criado sob nosso balde iceberg
para my_table
O bucket contém um caminho metadata
Neste ponto, não há dados na tabela, há apenas metadados descrevendo a tabela. Há também um ponteiro para esses metadados armazenados na tabela do catálogo Iceberg no PostgreSQL. Spark-SQL (o mecanismo de consulta) pesquisa o catálogo Iceberg ( my_catalog
) pelo nome da tabela ( my_table
) e recupera o URI para o arquivo de metadados atual.
Vamos dar uma olhada no primeiro arquivo de metadados, onde as informações sobre o esquema, as partições e os instantâneos da tabela são armazenados. Enquanto todos os instantâneos são definidos, o current-snapshot-id
informa ao mecanismo de consulta qual instantâneo usar, então o mecanismo de consulta procura esse valor na matriz snapshots
, obtém o valor da manifest-list
desse instantâneo e abre os arquivos de manifesto nesse lista, em ordem. Observe que nosso exemplo possui apenas um instantâneo porque a tabela acabou de ser criada e nenhum manifesto porque ainda não inserimos dados.
{ "format-version" : 1, "table-uuid" : "b72c46d1-0648-4e02-aab3-0d2853c97363", "location" : "s3://iceberg/my_table", "last-updated-ms" : 1658795119167, "last-column-id" : 3, "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] }, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "id", "required" : false, "type" : "long" }, { "id" : 2, "name" : "data", "required" : false, "type" : "string" }, { "id" : 3, "name" : "category", "required" : false, "type" : "string" } ] } ], "partition-spec" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "category", "transform" : "identity", "source-id" : 3, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "option.write.data.path" : "s3://iceberg/my_table", "owner" : "msarrel", "option.write.object-storage.enabled" : "true", "write.data.path" : "s3://iceberg/my_table", "write.object-storage.enabled" : "true" }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] }
Em seguida, vamos inserir alguns dados fictícios e observar os arquivos que o Iceberg armazena no MinIO. Dentro do balde iceberg
, agora existem os prefixos my_table/metadata
e my_table/data
.
INSERT INTO my_catalog.my_table VALUES (1, 'a', "music"), (2, 'b', "music"), (3, 'c', "video");
O prefixo de metadados contém o arquivo de metadados original, uma lista de manifesto e arquivos de manifesto. A lista de manifesto é - você adivinhou - uma lista de arquivos de manifesto. A lista de manifestos contém informações sobre cada arquivo de manifesto incluído em cada instantâneo: o local do arquivo de manifesto, o instantâneo do qual foi adicionado, informações sobre particionamento e os limites inferior e superior para colunas de partição de arquivos de dados relacionados. Durante uma consulta, o mecanismo de consulta lê o valor dos locais do arquivo de manifesto na lista de manifestos e abre os arquivos de manifesto apropriados. A lista de manifestos está no formato AVRO.
Arquivos de manifesto rastreiam arquivos de dados e incluem detalhes e estatísticas pré-calculadas sobre cada arquivo. A primeira coisa rastreada é o formato e a localização do arquivo. Os arquivos de manifesto são como o Iceberg elimina os dados de rastreamento no estilo Hive por localização do sistema de arquivos. Os arquivos de manifesto melhoram a eficiência e o desempenho da leitura de arquivos de dados, incluindo detalhes como participação na partição, contagem de registros e os limites inferior e superior de cada coluna. As estatísticas são gravadas durante as operações de gravação e têm mais probabilidade de serem oportunas, precisas e atualizadas do que as estatísticas do Hive.
Quando uma consulta SELECT é enviada, o mecanismo de consulta obtém a localização da lista de manifestos do banco de dados de metadados. Em seguida, o mecanismo de consulta lê o valor das entradas file-path
para cada objeto data-file
e abre os arquivos de dados para executar a consulta.
Abaixo está o conteúdo do prefixo data
, organizado por partição.
Dentro da partição, há um arquivo de dados por linha da tabela.
Vamos executar uma consulta de exemplo:
spark-sql> SELECT count(1) as count, data FROM my_catalog.my_table GROUP BY data; 1 a 1 b 1 c Time taken: 9.715 seconds, Fetched 3 row(s) spark-sql>
Agora que entendemos os diferentes componentes de uma tabela Iceberg e como o mecanismo de consulta funciona com eles, vamos nos aprofundar nos melhores recursos do Iceberg e como aproveitá-los em seu data lake.
Alterações de evolução de esquema como Adicionar, Eliminar, Renomear e Atualizar são alterações de metadados, o que significa que nenhum arquivo de dados precisa ser alterado/reescrito para realizar atualizações. O Iceberg também garante que essas mudanças na evolução do esquema sejam independentes e livres de efeitos colaterais. O Iceberg usa IDs exclusivos para rastrear cada coluna em uma tabela com isso, se uma nova coluna for adicionada, nunca alavancará um ID existente por engano.
As partições da tabela Iceberg podem ser atualizadas em uma tabela existente porque as consultas não fazem referência diretamente aos valores da partição. Quando novos dados são gravados, eles usam uma nova especificação em um novo layout, os dados gravados anteriormente com uma especificação diferente permanecem inalterados. Isso causa planejamento dividido quando você escreve novas consultas. Para melhorar o desempenho, o Iceberg usa particionamento oculto para que os usuários não precisem escrever consultas para um layout de partição específico para ser rápido. Os usuários se concentram em escrever consultas para os dados de que precisam e permitem que o Iceberg remova os arquivos que não contêm os dados correspondentes.
Outra evolução muito útil é que a ordem de classificação do Iceberg também pode ser atualizada na tabela existente, assim como a especificação da partição. Diferentes mecanismos podem optar por gravar dados na ordem de classificação mais recente em ordem não classificada quando a classificação é proibitivamente cara, os dados antigos gravados com a ordem de classificação anterior permanecem inalterados.
spark-sql> ALTER TABLE my_catalog.my_table > RENAME my_catalog.my_table_2;
Nas primeiras vezes que você fizer isso, ficará surpreso com a rapidez. Isso ocorre porque você não está reescrevendo uma tabela, está simplesmente operando em metadados. Nesse caso, alteramos apenas table_name
e o Iceberg fez isso por nós em cerca de um décimo de segundo.
Outras alterações de esquema são igualmente indolores:
spark-sql> ALTER TABLE my_catalog.my_table RENAME COLUMN data TO quantity; spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN buyer string AFTER quantity; spark-sql> ALTER TABLE my_catalog.my_table ALTER COLUMN quantity AFTER buyer;
Como mencionamos anteriormente, as partições são suportadas por outros formatos hive, no entanto, o Iceberg oferece suporte ao particionamento oculto que pode lidar com as tarefas tediosas e propensas a erros de produzir valores de partição para linhas em uma tabela. Os usuários se concentram em adicionar filtros às consultas que resolvem problemas de negócios e não se preocupam com a forma como a tabela é particionada. O Iceberg evita leituras de partições desnecessárias automaticamente.
O Iceberg lida com as complexidades do particionamento e da alteração do esquema de partição de uma tabela para você, simplificando bastante o processo para os usuários finais. Você pode definir o particionamento ou deixar que o Iceberg cuide disso para você. Iceberg gosta de particionar em um carimbo de data/hora, como a hora do evento. As partições são rastreadas por instantâneos em manifestos. As consultas não dependem mais do layout físico de uma tabela. Devido a essa separação entre tabelas físicas e lógicas, as tabelas Iceberg podem desenvolver partições ao longo do tempo à medida que mais dados são adicionados. Por exemplo, reparticionar uma tabela do Hive exigiria a criação de uma nova tabela e a leitura de dados antigos nela. Você também teria que alterar o valor PARTITION em cada consulta que você já escreveu - não é divertido.
spark-sql> ALTER TABLE my_catalog.my_table ADD COLUMN month int AFTER category; ALTER TABLE my_catalog.my_table ADD PARTITION FIELD month;
Agora temos dois esquemas de particionamento para a mesma tabela. O que era impossível no Hive aconteceu de forma transparente no Iceberg. A partir de agora, os planos de consulta são divididos, usando o esquema de partição antigo para consultar dados antigos e o novo esquema de partição para consultar novos dados. O Iceberg cuida disso para você — as pessoas que consultam a tabela não precisam saber que os dados são armazenados usando dois esquemas de partição. O Iceberg faz isso por meio de uma combinação de cláusulas WHERE de bastidores e filtros de partição que excluem arquivos de dados sem correspondências.
Cada gravação em tabelas Iceberg cria novos instantâneos. Os instantâneos são como as versões e podem ser usados para viajar no tempo e reverter, da mesma forma que fazemos com os recursos de versão do MinIO. A maneira como os instantâneos são gerenciados é definindo expireSnapshot
para que o sistema seja bem mantido. A viagem no tempo permite consultas reproduzíveis que usam exatamente o mesmo instantâneo de tabela ou permite que os usuários examinem facilmente as alterações. A reversão de versão permite que os usuários corrijam rapidamente os problemas, redefinindo as tabelas para um bom estado.
À medida que as tabelas são alteradas, o Iceberg rastreia cada versão como um instantâneo e, em seguida, fornece a capacidade de viajar no tempo para qualquer instantâneo ao consultar a tabela. Isso pode ser muito útil se você quiser executar consultas históricas ou reproduzir os resultados de consultas anteriores, talvez para geração de relatórios. A viagem no tempo também pode ser útil ao testar novas alterações de código porque você pode testar o novo código com uma consulta de resultados conhecidos.
Para ver os instantâneos que foram salvos para uma tabela:
spark-sql> SELECT * FROM my_catalog.my_table.snapshots; 2022-07-25 17:26:47.53 527713811620162549 NULL append s3://iceberg/my_table/metadata/snap-527713811620162549-1-c16452b4-b384-42bc-af07-b2731299e2b8.avro {"added-data-files":"3","added-files-size":"2706","added-records":"3","changed-partition-count":"2","spark.app.id":"local-1658795082601","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2706","total-position-deletes":"0","total-records":"3"} Time taken: 7.236 seconds, Fetched 1 row(s)
Alguns exemplos:
-- time travel to October 26, 1986 at 01:21:00 spark-sql> SELECT * FROM my_catalog.my_table TIMESTAMP AS OF '1986-10-26 01:21:00'; -- time travel to snapshot with id 10963874102873 spark-sql> SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
Você pode fazer leituras incrementais usando instantâneos, mas deve usar o Spark, não o Spark-SQL. Por exemplo:
scala> spark.read() .format(“iceberg”) .option(“start-snapshot-id”, “10963874102873”) .option(“end-snapshot-id”, “10963874102994”) .load(“s3://iceberg/my_table”)
Você também pode reverter a tabela para um ponto no tempo ou para um instantâneo específico, como nestes dois exemplos:
spark-sql> CALL my_catalog.system.rollback_to_timestamp('my_table', TIMESTAMP '2022-07-25 12:15:00.000'); spark-sql> CALL my_catalog.system.rollback_to_snapshot('my_table', 527713811620162549);
O Iceberg oferece suporte a todos os comandos SQL expressivos, como exclusão, mesclagem e atualização de nível de linha, e o mais importante a destacar é que o Iceberg oferece suporte às estratégias Eager e lazy. Podemos codificar todas as coisas que precisamos excluir (por exemplo, GDPR ou CCPA), mas não reescrever todos esses arquivos de dados imediatamente, podemos coletar preguiçosamente o lixo conforme necessário e isso realmente ajuda na eficiência nas enormes tabelas suportadas pelo Iceberg.
Por exemplo, você pode excluir todos os registros em uma tabela que correspondem a um predicado específico. O seguinte removerá todas as linhas da categoria de vídeo:
spark-sql> DELETE FROM my_catalog.my_table WHERE category = 'video';
Como alternativa, você pode usar CREATE TABLE AS SELECT ou REPLACE TABLE AS SELECT para fazer isso:
spark-sql> CREATE TABLE my_catalog.my_table_music AS SELECT * FROM my_catalog.my_table WHERE category = 'music';
Você pode mesclar duas tabelas com muita facilidade:
spark-sql> MERGE INTO my_catalog.my_data pt USING (SELECT * FROM my_catalog.my_data_new) st ON pt.id = st.id WHEN NOT MATCHED THEN INSERT *;
Iceberg é a base para o padrão de tabela analítica aberta e usa o comportamento SQL e uma abstração de tabela real, ao contrário dos outros formatos de tabela hive, e aplica os fundamentos do data warehouse para corrigir os problemas antes que saibamos que os temos. Com a engenharia de dados declarativa, podemos configurar tabelas e não nos preocupar em alterar cada mecanismo para atender às necessidades dos dados. Isso desbloqueia otimização automática e recomendações. Com commits seguros, os serviços de dados são possíveis, o que ajuda a evitar cargas de trabalho de dados de babá de humanos.
Para inspecionar o histórico, instantâneos e outros metadados de uma tabela, o Iceberg oferece suporte à consulta de metadados. As tabelas de metadados são identificadas adicionando o nome da tabela de metadados (por exemplo, histórico) após o nome da tabela original em sua consulta.
Para exibir os arquivos de dados de uma tabela:
spark-sql> SELECT * FROM my_catalog.my_table.files;
Para exibir manifestos:
spark-sql> SELECT * FROM my_catalog.my_table.manifests;
Para exibir o histórico da tabela:
spark-sql> SELECT * FROM my_catalog.my_table.history;
Para exibir instantâneos:
spark-sql> SELECT * FROM my_catalog.my_table.snapshots;
Você também pode juntar instantâneos e histórico de tabela para ver o aplicativo que gravou cada instantâneo:
spark-sql> select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id'] from my_catalog.my_table.history h join my_catalog.my_table.snapshots s on h.snapshot_id = s.snapshot_id order by made_current_at;
Agora que você aprendeu o básico, carregue alguns de seus dados no Iceberg e saiba mais no Início rápido do Spark e do Iceberg e na Documentação do Iceberg .
O Apache Iceberg possui integrações com diversos engines de consulta e execução, onde as tabelas do Apache Iceberg podem ser criadas e gerenciadas por esses conectores. Os motores que suportam Iceberg são Spark , Flink , Hive , Presto , Trino , Dremio , Snowflake .
O Apache Iceberg recebe muita atenção como um formato de tabela para data lakes. A crescente comunidade de código aberto e o número crescente de integrações de vários provedores de nuvem e estruturas de aplicativos significam que é hora de levar o Iceberg a sério, começar a experimentar, aprender e planejar integrá-lo à arquitetura de data lake existente. Combine o Iceberg com o MinIO para data lakes e análises em várias nuvens.
Ao começar a usar o Iceberg e o MinIO, entre em contato e compartilhe suas experiências ou faça perguntas por meio de nosso canal Slack .
Publicado também aqui .