在上一篇文章中,我介绍了 Apache Iceberg 并展示了它如何使用 MinIO 进行存储。我还展示了如何设置开发机器。为此,我使用 Docker Compose 安装 Apache Spark 容器作为处理引擎、REST 目录以及用于存储的 MinIO。我以一个非常简单的示例作为结束,该示例使用 Apache Spark 提取数据并使用 PyIceberg 查询数据。如果您是 Apache Iceberg 的新手,或者需要在开发计算机上设置 Apache Iceberg,请阅读这篇介绍性文章。
在这篇文章中,我将继续我上一篇文章留下的内容,并研究一个常见的大数据问题——需要一个单一的解决方案来为原始数据、非结构化数据和结构化数据(从原始数据中整理出来的数据)提供存储。数据)。此外,相同的解决方案应该提供一个处理引擎,允许根据整理的数据进行有效的报告。这是数据湖屋的承诺 - 数据仓库用于结构化数据的功能和数据湖用于非结构化数据的功能 - 全部都在一个集中式解决方案中。
让我们更详细地看看我们的大数据场景。
下图描述了一个常见问题和假设的解决方案。数据从多个位置以多种格式进入数据中心。我们需要一个集中式解决方案,允许转换原始数据,以便处理引擎可以有效地支持商业智能、数据分析和机器学习。同时,该解决方案还必须能够存储非结构化数据(文本、图像、音频和视频),用于数据探索和机器学习。它还应保留以原始格式转换的任何数据,以防需要重播转换或需要调查数据完整性问题。
举一个具体的例子,想象一家为其客户管理共同基金的全球托管银行。代表每个客户的每个基金的会计记录簿和投资记录簿的数据不断从世界各地流入数据湖屋。从那里开始,需要进行安全通道检查(发送的所有内容是否已收到),并且需要运行数据质量检查。最后,数据可以被分区并加载到另一个支持日开始和日结束报告的存储中。
或者,也许该图代表了一个物联网场景,其中气象站发送温度和其他与天气相关的数据。无论哪种情况,都需要一种方法,以原始格式安全地存储数据,然后转换和处理需要以更结构化的方式存储的任何数据 - 所有这些都在一个集中式解决方案中。这是数据湖屋的承诺 - 将最好的数据仓库和数据湖结合到一个集中式解决方案中。
让我们将上述假设的解决方案变为现实。如下图所示。
我们的 Data Lakehouse 有两个逻辑组件。第一个是 Apache Iceberg 的结构化数据实现 - 相当于数据仓库。 (这是我在上一篇文章中构建的内容 - 因此我不会在这里详细介绍。)第二个逻辑组件是用于非结构化数据的 MinIO - 我们的 Data Lakehouse 的 Data Lake 端。所有进入 Lakehouse 的数据都会传送到 MinIO 的这个逻辑实例。实际上,上面显示的两个 MinIO 逻辑实例可能是数据中心中的同一个 MinIO 实例。如果您运行 MinIO 的集群可以处理所有传入数据的摄取和 Apache Iceberg 的处理要求,那么这样的部署将节省资金。事实上,这就是我在这篇文章中要做的事情。我将在 Apache Iceberg 的 MinIO 实例中使用一个存储桶来保存所有非结构化数据和原始数据。
让我们开始处理数据,介绍我将用于本练习的数据集并将其引入 MinIO。
我们将在本文中试验的数据集是一个公共数据集,称为全球表面每日摘要 (GSOD),由美国国家海洋和大气管理局 (NOAA) 管理。 NOAA 目前维护着来自全球 9000 多个站点的数据,GSOD 数据集包含这些站点每天的摘要信息。您可以在此处下载数据。每年有一个 gzip 文件。它从 1929 年开始,到 2022 年结束(在撰写本文时)。为了构建我们的 Data Lakehouse,我下载了每年的文件并将其放入用于我们的 Data Lakehouse 的 MinIO 实例中。我将所有文件放入名为“lake”的存储桶中。MinIO 实例中的两个存储桶如下所示。 “warehouse”存储桶是在我们安装 Apache Iceberg 时创建的。
我使用 MinIO 控制台手动提取原始数据。在专业的管道中,您将希望以自动化的方式完成此操作。查看如何在 Kubernetes 中设置 Kafka 并将数据流式传输到 MinIO ,了解如何使用 Kafka 和 Kubernetes 将数据获取到 MinIO。
这些文件被打包以方便下载 - 如果您尝试直接使用它们来创建报告或图表,那么这将是一个非常 IO 密集型的操作(并且可能是 CPU 密集型的)。想象一下,您想要绘制指定站点的年平均气温图表。为此,您必须打开每个文件并搜索每一行,查找与您感兴趣的当天的电台相匹配的条目。更好的选择是使用我们的 Data Lakehouses 功能来整理数据并报告整理的数据。第一步是设置一个新的 Jupyter 笔记本。
首先,导航到安装在 Apache Spark 处理引擎中的 Jupyter Notebook 服务器。它可以在http://localhost:8888找到。创建一个新笔记本,然后在第一个单元格中添加如下所示的导入。 (可以在此处找到本文中创建的所有已完成的笔记本。)
from collections import namedtuple import csv import json import logging import tarfile from time import time from typing import List from minio import Minio from minio.error import S3Error import pandas as pd import pyarrow as pa import pyarrow.parquet as pq pd.options.mode.chained_assignment = None bucket_name = 'lake'
请注意,我们正在导入 MinIO 库。我们正在构建的笔记本是一个从非结构化存储(MinIO Data Lake)到结构化存储(Apache Iceberg,它在底层使用 MinIO)的 ETL 管道。笔记本的开头应该如下所示。
现在,我们可以为我们的数据创建一个 Iceberg 数据库和表。
为 GSOD 数据集创建数据库和表非常简单。下面的脚本将创建数据库,我们将其命名为“noaa”。导入后将其添加到单元格中。
%%sql CREATE DATABASE IF NOT EXISTS noaa;
下面的脚本将创建gsod
表。
%%sql CREATE TABLE IF NOT EXISTS noaa.gsod ( station string, date timestamp, latitude double, longitude double, name string, temp double ) USING iceberg PARTITIONED BY (station)
当您使用 Apache Iceberg 时,您经常会想要删除一个表,以便可以重新开始实验。如果您希望更改有关其设置的任何内容,下面的脚本将删除gsod
表。
%%sql DROP TABLE IF EXISTS noaa.gsod;
现在我们的 Lakehouse 中有基于年份的原始 zip 文件,我们可以提取、转换它们并将其加载到我们的 Data Lakehouse 中。我们先介绍一些辅助函数。下面的函数将返回指定存储桶中与前缀匹配的 MinIO 对象列表。
def get_object_list(bucket_name: str, prefix: str) -> List[str]: ''' Gets a list of objects from a bucket. ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_list = [] objects = client.list_objects(bucket_name, prefix=prefix, recursive=True) for obj in objects: object_list.append(obj.object_name) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return object_list
请注意,在上面的代码中,需要一个 MinIO 凭证文件。这可以从 MinIO 控制台获取。如果您不知道如何获取 MinIO 凭据,那么本文的一部分将介绍如何生成和下载它们。
接下来,我们需要一个函数来从 MinIO 获取对象。由于对象是 tar 文件,我们还需要此函数从 tar 存档中提取数据并将其转换为 Pandas DataFrame。这是使用下面的函数完成的。
def tar_to_df(bucket_name: str, object_name: str) -> pd.DataFrame: ''' This function will take a tarfile reference in MinIO and do the following: - unzip the tarfile - turn the data into a single DataFrame object ''' logger = logging.getLogger('gsod_logger') logger.setLevel(logging.INFO) # Temp file to use for processing the tar files. temp_file_name = 'temp.tar.gz' # Load the credentials and connection information. with open('credentials.json') as f: credentials = json.load(f) # Get data of an object. try: # Create client with access and secret key client = Minio(credentials['url'], # host.docker.internal credentials['accessKey'], credentials['secretKey'], secure=False) object_info = client.fget_object(bucket_name, object_name, temp_file_name) Row = namedtuple('Row', ('station', 'date', 'latitude', 'longitude', 'elevation', 'name', 'temp', 'temp_attributes', 'dewp', 'dewp_attributes', 'slp', 'SLP_attributes', 'stp', 'stp_attributes', 'visib', 'visib_attributes', 'wdsp', 'wdsp_attributes', 'mxspd', 'gust', 'max', 'max_attributes', 'min', 'min_attributes', 'prcp', 'prcp_attributes', 'sndp', 'frshtt')) # Columns of interest and their data types. dtypes={ 'station': 'string', 'date': 'datetime64[ns]', 'latitude': 'float64', 'longitude': 'float64', 'name': 'string', 'temp': 'float64' } tar = tarfile.open(temp_file_name, 'r:gz') all_rows = [] for member in tar.getmembers(): member_handle = tar.extractfile(member) byte_data = member_handle.read() decoded_string = byte_data.decode() lines = decoded_string.splitlines() reader = csv.reader(lines, delimiter=',') # Get all the rows in the member. Skip the header. _ = next(reader) file_rows = [Row(*l) for l in reader] all_rows += file_rows df = pd.DataFrame.from_records(all_rows, columns=Row._fields) df = df[list(dtypes.keys())] for c in df.columns: if dtypes[c] == 'float64': df[c] = pd.to_numeric(df[c], errors='coerce') df = df.astype(dtype=dtypes) except S3Error as s3_err: logger.error(f'S3 Error occurred: {s3_err}.') raise s3_err except Exception as err: logger.error(f'Error occurred: {err}.') raise err return df
这两个函数都是通用实用程序,无论您使用 MinIO 做什么,都可以重复使用。考虑将它们放入您个人的代码片段集合或您组织的 Github Gist 中。
现在,我们准备将数据发送到 Lakehouse 的仓库端。这可以通过下面的代码来完成,该代码启动 Spark 会话,循环遍历所有 GSOD tar 文件,提取、转换并将其发送到我们的 Iceberg 表。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Jupyter').getOrCreate() objects = get_object_list(bucket_name, 'noaa/gsod') for obj in reversed(objects): print(obj) df = tar_to_df(bucket_name, obj) table = pa.Table.from_pandas(df) pq.write_table(table, 'temp.parquet') df = spark.read.parquet('temp.parquet') df.write.mode('append').saveAsTable('noaa.gsod')
本节中的代码手动从 MinIO 存储桶加载数据。在生产环境中,您需要将此代码部署在服务中并使用MinIO Bucket Events进行自动摄取。
让我们开始一个新的笔记本来进行报告。下面的单元格导入了我们需要的实用程序。具体来说,我们将使用 PyIceberg 进行数据检索,使用 Pandas 进行数据整理,使用 Seaborn 进行数据可视化。
from pyiceberg.catalog import load_catalog from pyiceberg.expressions import GreaterThanOrEqual, EqualTo import pandas as pd import seaborn as sns pd.options.mode.chained_assignment = None catalog = load_catalog('default')
我们想要做的是计算给定气象站的每年平均气温。这为我们提供了每年一个数字,并考虑了一年中的所有季节。第一步是查询 Iceberg 以获取给定站点的所有数据。这是使用 PyIceberg 完成的。
tbl = catalog.load_table('noaa.gsod') sc = tbl.scan(row_filter="station == '72502014734'") df = sc.to_arrow().to_pandas() df.head(10)
上面代码中使用的车站 ID 适用于位于美国新泽西州纽瓦克自由国际机场的车站。它自 1973 年开始运行(近 50 年的数据)。当代码运行时,您将得到以下输出。 (我正在使用 DataFrame head() 函数来获取示例。)
接下来,我们需要按年份分组并计算平均值。使用 Pandas,这是几行代码。不需要循环。
df['year'] = df['date'].dt.year df = df[['year','temp']] grouped_by_year = df.groupby('year') average_by_year = grouped_by_year.mean() average_by_year
该单元运行后,您将看到每年的单个值。排名靠前的几年如下所示。
最后,我们可以可视化我们的年平均值。我们将使用 Seaborn 创建线图。这只需要一行代码。
sns.lineplot(data=df, x="year", y="temp", errorbar=None)
线图如下所示。
第一次运行报告后应始终运行的另一个命令如下。
[task.file.file_path for task in sc.plan_files()]
这是一个列表理解,将为您提供 Apache Iceberg 中具有与您的查询匹配的数据的所有数据文件的列表。会有很多,尽管 Iceberg 的元数据可以过滤掉很多。看到涉及的所有文件,您就会明白高速对象存储是 Lakehouse 的重要组成部分。
在这篇文章中,我们使用 MinIO 和 Apache Iceberg 构建了一个 Data Lakehouse。我们使用 GSOD 数据集做到了这一点。首先,原始数据被上传到我们的 Data Lakehouse (MinIO) 的湖边。从那里,我们在 Apache Iceberg(我们的 Data Lakehouse 的数据仓库一侧)中创建了一个数据库和一个表。然后,我们构建了一个简单的 ETL 管道,将数据从湖移动到 Data Lakehouse 内的仓库。
一旦 Apache Iceberg 完全填充了数据,我们就能够创建年平均温度报告并将其可视化。
请记住,如果您想在生产中构建 Data Lakehouse,那么您将需要 MinIO 的企业功能。考虑查看对象生命周期管理、安全最佳实践、 Kafka 流和存储桶事件。
也发布在这里。