paint-brush
Phát triển các hồ dữ liệu phát trực tuyến với Hudi và MinIOtừ tác giả@minio
7,238 lượt đọc
7,238 lượt đọc

Phát triển các hồ dữ liệu phát trực tuyến với Hudi và MinIO

từ tác giả MinIO14m2023/08/29
Read on Terminal Reader

dài quá đọc không nổi

Apache Hudi là định dạng bảng mở đầu tiên dành cho các hồ dữ liệu và đáng được xem xét trong các kiến trúc phát trực tuyến. Việc sử dụng MinIO để lưu trữ Hudi sẽ mở đường cho các hồ dữ liệu và phân tích trên nhiều đám mây.
featured image - Phát triển các hồ dữ liệu phát trực tuyến với Hudi và MinIO
MinIO HackerNoon profile picture
0-item
1-item
2-item

Apache Hudi là một nền tảng hồ dữ liệu phát trực tuyến mang chức năng cơ sở dữ liệu và kho cốt lõi trực tiếp vào hồ dữ liệu. Không hài lòng với việc tự gọi mình là một định dạng tệp mở như Delta hay Apache Iceberg , Hudi cung cấp các bảng, giao dịch, cập nhật/xóa, chỉ mục nâng cao, dịch vụ nhập trực tuyến, tối ưu hóa phân cụm/nén dữ liệu và tính đồng thời.


Được giới thiệu vào năm 2016, Hudi có nguồn gốc vững chắc từ hệ sinh thái Hadoop, giải thích ý nghĩa đằng sau cái tên: Hadoop Upserts andD Incrementals. Nó được phát triển để quản lý việc lưu trữ các bộ dữ liệu phân tích lớn trên HDFS. Mục đích chính của Hudi là giảm độ trễ trong quá trình nhập dữ liệu truyền phát.


Bàn Hồ Đề


Theo thời gian, Hudi đã phát triển để sử dụng lưu trữ đám mây và lưu trữ đối tượng, bao gồm cả MinIO. Việc Hudi rời xa HDFS đi đôi với xu hướng lớn hơn của thế giới là để lại HDFS kế thừa để lưu trữ đối tượng dựa trên nền tảng đám mây, có khả năng mở rộng và hiệu suất cao. Lời hứa của Hudi về việc cung cấp các tính năng tối ưu hóa giúp khối lượng công việc phân tích nhanh hơn cho Apache Spark, Flink, Presto, Trino và các công cụ khác phù hợp tuyệt vời với lời hứa của MinIO về hiệu suất ứng dụng gốc trên nền tảng đám mây trên quy mô lớn.


Các công ty sử dụng Hudi trong sản xuất bao gồm Uber , Amazon , ByteDanceRobinhood . Đây là một số hồ dữ liệu phát trực tuyến lớn nhất trên thế giới. Chìa khóa của Hudi trong trường hợp sử dụng này là nó cung cấp ngăn xếp xử lý dữ liệu gia tăng để tiến hành xử lý có độ trễ thấp trên dữ liệu cột. Thông thường, các hệ thống ghi dữ liệu ra một lần bằng cách sử dụng định dạng tệp mở như Apache Parquet hoặc ORC và lưu trữ dữ liệu này trên hệ thống tệp phân tán hoặc bộ lưu trữ đối tượng có khả năng mở rộng cao. Hudi đóng vai trò là mặt phẳng dữ liệu để nhập, chuyển đổi và quản lý dữ liệu này. Hudi tương tác với bộ lưu trữ bằng API Hệ thống tệp Hadoop , tương thích với (nhưng không nhất thiết phải tối ưu cho) các triển khai từ HDFS đến lưu trữ đối tượng đến hệ thống tệp trong bộ nhớ.

Định dạng tệp Hudi

Hudi sử dụng tệp cơ sở và tệp nhật ký delta để lưu trữ các bản cập nhật/thay đổi đối với tệp cơ sở nhất định. Các tệp cơ sở có thể là Parquet (cột) hoặc HFile (được lập chỉ mục). Nhật ký delta được lưu dưới dạng Avro (hàng) vì việc ghi lại các thay đổi đối với tệp cơ sở khi chúng xảy ra là hợp lý.


Hudi mã hóa tất cả các thay đổi đối với một tệp cơ sở nhất định dưới dạng một chuỗi các khối. Các khối có thể là khối dữ liệu, khối xóa hoặc khối khôi phục. Các khối này được hợp nhất để tạo ra các tệp cơ sở mới hơn. Mã hóa này cũng tạo ra một bản ghi độc lập.



Định dạng tệp Hudi

Nguồn .

Định dạng bảng Hudi

Định dạng bảng bao gồm bố cục tệp của bảng, lược đồ của bảng và siêu dữ liệu theo dõi các thay đổi đối với bảng. Hudi thực thi lược đồ khi ghi, nhất quán với việc nhấn mạnh vào xử lý luồng, để đảm bảo các đường dẫn không bị hỏng do những thay đổi không tương thích ngược.


Hudi nhóm các tệp cho một bảng/phân vùng nhất định lại với nhau và ánh xạ giữa các khóa bản ghi và nhóm tệp. Như đã đề cập ở trên, tất cả các bản cập nhật đều được ghi vào tệp nhật ký delta cho một nhóm tệp cụ thể. Thiết kế này hiệu quả hơn Hive ACID, thiết kế này phải hợp nhất tất cả các bản ghi dữ liệu với tất cả các tệp cơ sở để xử lý truy vấn. Thiết kế của Hudi dự đoán việc cập nhật và xóa nhanh chóng dựa trên khóa khi nó hoạt động với nhật ký delta cho một nhóm tệp chứ không phải cho toàn bộ tập dữ liệu.


Hudi nhóm các tệp cho một bảng/phân vùng nhất định lại với nhau và ánh xạ giữa các khóa bản ghi và nhóm tệp. Như đã đề cập ở trên, tất cả các bản cập nhật đều được ghi vào tệp nhật ký delta cho một nhóm tệp cụ thể. Thiết kế này hiệu quả hơn Hive ACID, thiết kế này phải hợp nhất tất cả các bản ghi dữ liệu với tất cả các tệp cơ sở để xử lý truy vấn. Thiết kế của Hudi dự đoán việc cập nhật và xóa nhanh chóng dựa trên khóa khi nó hoạt động với nhật ký delta cho một nhóm tệp chứ không phải cho toàn bộ tập dữ liệu.


Định dạng bảng Hudi

Nguồn .


Cần phải hiểu rõ dòng thời gian vì nó đóng vai trò là nguồn nhật ký sự kiện thực tế cho tất cả siêu dữ liệu trong bảng của Hudi. Dòng thời gian được lưu trữ trong thư mục .hoodie hoặc nhóm trong trường hợp của chúng tôi. Các sự kiện được giữ lại trên dòng thời gian cho đến khi chúng bị xóa. Dòng thời gian tồn tại cho một bảng tổng thể cũng như cho các nhóm tệp, cho phép xây dựng lại nhóm tệp bằng cách áp dụng nhật ký delta cho tệp cơ sở gốc. Để tối ưu hóa cho việc ghi/xác nhận thường xuyên, thiết kế của Hudi giữ siêu dữ liệu nhỏ so với kích thước của toàn bộ bảng.


Các sự kiện mới trên dòng thời gian được lưu vào bảng siêu dữ liệu nội bộ và được triển khai dưới dạng một loạt các bảng hợp nhất khi đọc, do đó mang lại khả năng khuếch đại ghi thấp. Kết quả là Hudi có thể nhanh chóng tiếp thu những thay đổi nhanh chóng đối với siêu dữ liệu. Ngoài ra, bảng siêu dữ liệu sử dụng định dạng tệp cơ sở HFile, tối ưu hóa hơn nữa hiệu suất với một tập hợp các khóa tra cứu được lập chỉ mục giúp tránh phải đọc toàn bộ bảng siêu dữ liệu. Tất cả các đường dẫn tệp vật lý là một phần của bảng đều được đưa vào siêu dữ liệu để tránh danh sách tệp đám mây tốn kém thời gian.

Nhà văn Hồ Địch

Người viết Hudi tạo điều kiện thuận lợi cho các kiến trúc trong đó Hudi đóng vai trò là lớp ghi hiệu suất cao với hỗ trợ giao dịch ACID cho phép thực hiện các thay đổi gia tăng rất nhanh như cập nhật và xóa.


Kiến trúc Hudi điển hình dựa trên đường dẫn Spark hoặc Flink để cung cấp dữ liệu đến các bảng Hudi. Đường dẫn ghi Hudi được tối ưu hóa để hiệu quả hơn việc chỉ ghi tệp Parquet hoặc Avro vào đĩa. Hudi phân tích các thao tác ghi và phân loại chúng thành các thao tác tăng dần ( insert , upsert , delete ) hoặc thao tác hàng loạt ( insert_overwrite , insert_overwrite_table , delete_partition , bulk_insert ) rồi áp dụng các tối ưu hóa cần thiết.


Người viết Hudi cũng chịu trách nhiệm duy trì siêu dữ liệu. Đối với mỗi bản ghi, thời gian cam kết và số thứ tự duy nhất cho bản ghi đó (tương tự như phần bù Kafka) được viết để có thể rút ra các thay đổi cấp độ bản ghi. Người dùng cũng có thể chỉ định các trường thời gian sự kiện trong luồng dữ liệu đến và theo dõi chúng bằng siêu dữ liệu và dòng thời gian Hudi. Điều này có thể mang lại những cải tiến đáng kể trong quá trình xử lý luồng vì Hudi chứa cả thời gian đến và thời gian sự kiện cho mỗi bản ghi, giúp có thể tạo hình mờ mạnh cho các đường ống xử lý luồng phức tạp.

Độc giả Hudi

Cách ly ảnh chụp nhanh giữa người viết và người đọc cho phép truy vấn ảnh chụp nhanh bảng một cách nhất quán từ tất cả các công cụ truy vấn hồ dữ liệu chính, bao gồm Spark, Hive, Flink, Prest, Trino và Impala. Giống như Parquet và Avro, các bảng Hudi có thể được đọc dưới dạng bảng bên ngoài như SnowflakeSQL Server .


Đầu đọc Hudi được phát triển để có trọng lượng nhẹ. Bất cứ khi nào có thể, các trình đọc và bộ nhớ đệm được vector hóa dành riêng cho công cụ, chẳng hạn như trong Presto và Spark, sẽ được sử dụng. Khi Hudi phải hợp nhất các tệp cơ sở và nhật ký cho một truy vấn, Hudi sẽ cải thiện hiệu suất hợp nhất bằng cách sử dụng các cơ chế như bản đồ có thể tràn và đọc lười, đồng thời cung cấp các truy vấn được tối ưu hóa cho việc đọc.


Hudi bao gồm nhiều khả năng truy vấn gia tăng mạnh mẽ đáng kể. Siêu dữ liệu là cốt lõi của vấn đề này, cho phép sử dụng các cam kết lớn dưới dạng các phần nhỏ hơn và tách rời hoàn toàn việc ghi và truy vấn dữ liệu gia tăng. Thông qua việc sử dụng siêu dữ liệu một cách hiệu quả, du hành thời gian chỉ là một truy vấn gia tăng khác có điểm bắt đầu và điểm dừng được xác định. Hudi ánh xạ các khóa vào các nhóm tệp đơn lẻ tại bất kỳ thời điểm nào, hỗ trợ đầy đủ các khả năng của CDC trên các bảng Hudi. Như đã thảo luận ở trên trong phần người viết Hudi, mỗi bảng bao gồm các nhóm tệp và mỗi nhóm tệp có siêu dữ liệu độc lập riêng.

Hoan hô Hudi!

Điểm mạnh lớn nhất của Hudi là tốc độ xử lý cả dữ liệu truyền phát và dữ liệu hàng loạt. Bằng cách cung cấp khả năng upsert , Hudi thực hiện các nhiệm vụ có cường độ nhanh hơn việc viết lại toàn bộ bảng hoặc phân vùng.


Để tận dụng tốc độ truyền dữ liệu của Hudi, các kho lưu trữ dữ liệu yêu cầu một lớp lưu trữ có khả năng IOPS và thông lượng cao. Sự kết hợp giữa khả năng mở rộng và hiệu suất cao của MinIO chính là điều Hudi cần. MinIO thừa sức đáp ứng hiệu suất cần thiết để cung cấp năng lượng cho hồ dữ liệu doanh nghiệp thời gian thực - điểm chuẩn gần đây đạt được 325 GiB/s (349 GB/s) trên GET và 165 GiB/s (177 GB/s) trên PUT chỉ với 32 nút SSD NVMe có sẵn.


Hồ dữ liệu Hudi dành cho doanh nghiệp đang hoạt động lưu trữ số lượng lớn các tệp Parquet và Avro nhỏ. MinIO bao gồm một số tối ưu hóa tệp nhỏ giúp cho các hồ dữ liệu nhanh hơn. Các đối tượng nhỏ được lưu nội tuyến cùng với siêu dữ liệu, giảm IOPS cần thiết để đọc và ghi các tệp nhỏ như siêu dữ liệu và chỉ mục Hudi.


Lược đồ là một thành phần quan trọng của mỗi bảng Hudi. Hudi có thể thực thi lược đồ hoặc có thể cho phép phát triển lược đồ để đường dẫn dữ liệu truyền trực tuyến có thể thích ứng mà không bị gián đoạn. Ngoài ra, Hudi thực thi lược đồ trên trình ghi để đảm bảo các thay đổi không làm gián đoạn quy trình. Hudi dựa vào Avro để lưu trữ, quản lý và phát triển lược đồ của bảng.


Hudi cung cấp đảm bảo giao dịch ACID cho các hồ dữ liệu. Hudi đảm bảo việc ghi nguyên tử: các cam kết được thực hiện nguyên tử theo dòng thời gian và được cấp dấu thời gian biểu thị thời gian mà hành động được coi là đã xảy ra. Hudi tách biệt các ảnh chụp nhanh giữa các quy trình ghi, bảng và trình đọc để mỗi quy trình hoạt động trên một ảnh chụp nhanh nhất quán của bảng. Hudi hoàn thiện điều này bằng khả năng kiểm soát đồng thời lạc quan (OCC) giữa người viết và kiểm soát đồng thời dựa trên MVCC không chặn giữa các dịch vụ bảng và người viết cũng như giữa nhiều dịch vụ bảng.

Hướng dẫn Hudi và MinIO

Hướng dẫn này sẽ hướng dẫn bạn cách thiết lập Spark, Hudi và MinIO cũng như giới thiệu một số tính năng cơ bản của Hudi. Hướng dẫn này dựa trên Hướng dẫn Apache Hudi Spark , được điều chỉnh để hoạt động với bộ lưu trữ đối tượng MinIO gốc trên đám mây.


Lưu ý rằng việc làm việc với các nhóm được phiên bản sẽ bổ sung thêm một số chi phí bảo trì cho Hudi. Bất kỳ đối tượng nào bị xóa sẽ tạo ra một điểm đánh dấu xóa . Khi Hudi dọn dẹp các tập tin bằng tiện ích Cleaner, số lượng dấu xóa sẽ tăng lên theo thời gian. Điều quan trọng là phải định cấu hình Quản lý vòng đời một cách chính xác để dọn sạch các điểm đánh dấu xóa này vì thao tác Danh sách có thể bị hỏng nếu số lượng điểm đánh dấu xóa đạt tới 1000. Những người bảo trì dự án Hudi khuyên bạn nên dọn dẹp các điểm đánh dấu xóa sau một ngày bằng cách sử dụng các quy tắc vòng đời.

Điều kiện tiên quyết

Tải xuống và cài đặt Apache Spark.


Tải xuống và cài đặt MinIO. Ghi lại địa chỉ IP, cổng TCP cho bảng điều khiển, khóa truy cập và khóa bí mật.


Tải xuống và cài đặt ứng dụng khách MinIO.


Tải xuống thư viện AWS và AWS Hadoop rồi thêm chúng vào đường dẫn lớp của bạn để sử dụng S3A làm việc với bộ lưu trữ đối tượng.

  • AWS: aws-java-sdk:1.10.34 (hoặc cao hơn)

  • Hadoop: hadoop-aws:2.7.3 (hoặc cao hơn)


Tải xuống các tệp Jar, giải nén chúng và sao chép chúng vào /opt/spark/jars .

Tạo nhóm MinIO

Sử dụng Ứng dụng khách MinIO để tạo nhóm chứa dữ liệu Hudi:

 mc alias set myminio http://<your-MinIO-IP:port> <your-MinIO-access-key> <your-MinIO-secret-key> mc mb myminio/hudi

Khởi chạy Spark với Hudi

Khởi động Spark shell với Hudi được định cấu hình để sử dụng MinIO để lưu trữ. Đảm bảo định cấu hình các mục cho S3A bằng cài đặt MinIO của bạn.


 spark-shell \ --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0,org.apache.hadoop:hadoop-aws:3.3.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.hadoop.fs.s3a.access.key=<your-MinIO-access-key>' \ --conf 'spark.hadoop.fs.s3a.secret.key=<your-MinIO-secret-key>'\ --conf 'spark.hadoop.fs.s3a.endpoint=<your-MinIO-IP>:9000' \ --conf 'spark.hadoop.fs.s3a.path.style.access=true' \ --conf 'fs.s3a.signing-algorithm=S3SignerType'


Sau đó, khởi tạo Hudi trong Spark.

 import org.apache.hudi.QuickstartUtils._ import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.common.model.HoodieRecord


Lưu ý rằng nó sẽ đơn giản hóa việc sử dụng Hudi nhiều lần để tạotệp cấu hình bên ngoài .

Tạo một bảng

Hãy dùng thử và tạo một bảng Hudi nhỏ đơn giản bằng Scala. Hudi DataGenerator là một cách nhanh chóng và dễ dàng để tạo các phần chèn và cập nhật mẫu dựa trên lược đồ chuyến đi mẫu .


 val tableName = "hudi_trips_cow" val basePath = "s3a://hudi/hudi_trips_cow" val dataGen = new DataGenerator

Chèn dữ liệu vào Hudi và ghi bảng vào MinIO

Phần sau đây sẽ tạo dữ liệu chuyến đi mới, tải chúng vào DataFrame và ghi DataFrame mà chúng ta vừa tạo vào MinIO dưới dạng bảng Hudi. mode(Overwrite) ghi đè và tạo lại bảng trong trường hợp nó đã tồn tại. Dữ liệu chuyến đi dựa trên khóa bản ghi ( uuid ), trường phân vùng ( region/country/city ) và logic ( ts ) để đảm bảo các bản ghi chuyến đi là duy nhất cho mỗi phân vùng. Chúng ta sẽ sử dụng thao tác ghi mặc định, upsert . Khi bạn có khối lượng công việc không có bản cập nhật, bạn có thể sử dụng insert hoặc bulk_insert để có thể nhanh hơn.


 val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Overwrite). save(basePath)


Mở trình duyệt và đăng nhập vào MinIO tại http://<your-MinIO-IP>:<port> bằng khóa truy cập và khóa bí mật của bạn. Bạn sẽ thấy bảng Hudi trong thùng.


Bảng điều khiển MiniIO


Nhóm này cũng chứa đường dẫn .hoodie chứa siêu dữ liệu cũng như các đường dẫn americasasia chứa dữ liệu.


metadata


Hãy nhìn vào siêu dữ liệu. Đây là đường dẫn .hoodie của tôi sau khi hoàn thành toàn bộ hướng dẫn. Chúng ta có thể thấy rằng tôi đã sửa đổi bảng vào Thứ Ba ngày 13 tháng 9 năm 2022 lúc 9:02, 10:37, 10:48, 10:52 và 10:56.


Đường dẫn .hoodie sau khi hoàn thành phần hướng dẫn

Truy vấn dữ liệu

Hãy tải dữ liệu Hudi vào DataFrame và chạy truy vấn mẫu.

 // spark-shell val tripsSnapshotDF = spark. read. format("hudi"). load(basePath) tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

Du hành thời gian cùng Hudi

Không, chúng tôi không nói về việc đi xem buổi hòa nhạc Hootie and the Blowfish vào năm 1988.


Mỗi lần ghi vào bảng Hudi sẽ tạo ra ảnh chụp nhanh mới. Hãy coi ảnh chụp nhanh là phiên bản của bảng có thể được tham chiếu cho các truy vấn du hành thời gian.


Hãy thử một số truy vấn du hành thời gian (bạn sẽ phải thay đổi dấu thời gian để phù hợp với mình).


 spark.read. format("hudi"). option("as.of.instant", "2022-09-13 09:02:08.200"). load(basePath)

Cập nhật dữ liệu

Quá trình này tương tự như khi chúng tôi chèn dữ liệu mới trước đó. Để thể hiện khả năng cập nhật dữ liệu của Hudi, chúng tôi sẽ tạo các bản cập nhật cho các bản ghi chuyến đi hiện có, tải chúng vào DataFrame rồi ghi DataFrame vào bảng Hudi đã được lưu trong MinIO.


Lưu ý rằng chúng tôi đang sử dụng chế độ lưu append . Nguyên tắc chung là sử dụng chế độ append trừ khi bạn đang tạo bảng mới để không có bản ghi nào bị ghi đè. Cách làm việc thông thường với Hudi là nhập dữ liệu phát trực tuyến theo thời gian thực, thêm chúng vào bảng, sau đó viết một số logic hợp nhất và cập nhật các bản ghi hiện có dựa trên những gì vừa được thêm vào. Ngoài ra, việc ghi bằng chế độ overwrite sẽ xóa và tạo lại bảng nếu nó đã tồn tại.


 // spark-shell val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) df.write.format("hudi"). options(getQuickstartWriteConfigs). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath)


Truy vấn dữ liệu sẽ hiển thị các bản ghi chuyến đi được cập nhật.

Truy vấn gia tăng

Hudi có thể cung cấp luồng bản ghi đã thay đổi kể từ dấu thời gian nhất định bằng cách sử dụng truy vấn gia tăng. Tất cả những gì chúng ta cần làm là cung cấp thời gian bắt đầu mà từ đó các thay đổi sẽ được truyền phát để xem các thay đổi thông qua cam kết hiện tại và chúng ta có thể sử dụng thời gian kết thúc để giới hạn luồng.


Truy vấn gia tăng là một vấn đề khá lớn đối với Hudi vì nó cho phép bạn xây dựng các đường truyền phát trực tuyến trên dữ liệu hàng loạt.


 // spark-shell // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) val beginTime = commits(commits.length - 2) // commit time we are interested in // incrementally query data val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()

Truy vấn thời điểm

Hudi có thể truy vấn dữ liệu theo ngày và giờ cụ thể.


 // spark-shell val beginTime = "000" // Represents all commits > this time. val endTime = commits(commits.length - 2) // commit time we are interested in //incrementally query data val tripsPointInTimeDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). option(END_INSTANTTIME_OPT_KEY, endTime). load(basePath) tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

Xóa dữ liệu bằng tính năng xóa mềm

Hudi hỗ trợ hai cách khác nhau để xóa hồ sơ. Xóa mềm sẽ giữ lại khóa bản ghi và loại bỏ các giá trị cho tất cả các trường khác. Thao tác xóa mềm vẫn được duy trì trong MinIO và chỉ bị xóa khỏi hồ dữ liệu bằng thao tác xóa cứng.


 // spark-shell spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count() // fetch two records for soft deletes val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2) // prepare the soft deletes by ensuring the appropriate fields are nullified val nullifyColumns = softDeleteDs.schema.fields. map(field => (field.name, field.dataType.typeName)). filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1) && !Array("ts", "uuid", "partitionpath").contains(pair._1))) val softDeleteDf = nullifyColumns. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))( (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2))) // simply upsert the table after setting these fields to null softDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY, "upsert"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // reload data spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") // This should return the same total count as before spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // This should return (total - 2) count as two records are updated with nulls spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

Xóa dữ liệu bằng xóa cứng

Ngược lại, xóa cứng là những gì chúng ta nghĩ là xóa. Khóa bản ghi và các trường liên quan sẽ bị xóa khỏi bảng.


 // spark-shell // fetch total records count spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() // fetch two records to be deleted val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) // issue deletes val deletes = dataGen.generateDeletes(ds.collectAsList()) val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) hardDeleteDf.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION_OPT_KEY,"delete"). option(PRECOMBINE_FIELD_OPT_KEY, "ts"). option(RECORDKEY_FIELD_OPT_KEY, "uuid"). option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). option(TABLE_NAME, tableName). mode(Append). save(basePath) // run the same read query as above. val roAfterDeleteViewDF = spark. read. format("hudi"). load(basePath) roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") // fetch should return (total - 2) records spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

Chèn ghi đè

Hồ dữ liệu sẽ trở thành một kho dữ liệu khi nó có khả năng cập nhật dữ liệu hiện có. Chúng tôi sẽ tạo một số dữ liệu chuyến đi mới và sau đó ghi đè lên dữ liệu hiện có của chúng tôi. Thao tác này nhanh hơn upsert trong đó Hudi tính toán toàn bộ phân vùng mục tiêu cùng một lúc cho bạn. Ở đây chúng tôi chỉ định cấu hình để bỏ qua việc lập chỉ mục tự động, kết hợp trước và phân vùng lại mà upsert sẽ làm cho bạn.


 // spark-shell spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false) val inserts = convertToStringList(dataGen.generateInserts(10)) val df = spark. read.json(spark.sparkContext.parallelize(inserts, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") df.write.format("hudi"). options(getQuickstartWriteConfigs). option(OPERATION.key(),"insert_overwrite"). option(PRECOMBINE_FIELD.key(), "ts"). option(RECORDKEY_FIELD.key(), "uuid"). option(PARTITIONPATH_FIELD.key(), "partitionpath"). option(TBL_NAME.key(), tableName). mode(Append). save(basePath) // Should have different keys now for San Francisco alone, from query before. spark. read.format("hudi"). load(basePath). select("uuid","partitionpath"). sort("partitionpath","uuid"). show(100, false)

Phát triển lược đồ và phân vùng bảng

Quá trình phát triển lược đồ cho phép bạn thay đổi lược đồ của bảng Hudi để thích ứng với những thay đổi diễn ra trong dữ liệu theo thời gian.


Dưới đây là một số ví dụ về cách truy vấn và phát triển lược đồ cũng như phân vùng. Để thảo luận sâu hơn, vui lòng xem Schema Evolution | Apache Hudi . Lưu ý rằng nếu bạn chạy các lệnh này, chúng sẽ thay đổi lược đồ bảng Hudi của bạn để khác với hướng dẫn này.


 -- Alter table name ALTER TABLE oldTableName RENAME TO newTableName -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*) -- Alter table column type ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType -- Alter table properties ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value') #Alter table examples --rename to: ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2; --add column: ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string); --change column: ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint; --set properties; alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');


Hiện tại, SHOW partitions chỉ hoạt động trên hệ thống tệp vì nó dựa trên đường dẫn bảng hệ thống tệp.


Hướng dẫn này sử dụng Spark để giới thiệu khả năng của Hudi. Tuy nhiên, Hudi có thể hỗ trợ nhiều loại bảng/loại truy vấn và bảng Hudi có thể được truy vấn từ các công cụ truy vấn như Hive, Spark, Presto, v.v. Dự án Hudi có một video demo giới thiệu tất cả những điều này trên thiết lập dựa trên Docker với tất cả các hệ thống phụ thuộc chạy cục bộ.

Hú! Hú! Hãy cùng xây dựng hồ dữ liệu Hudi trên MinIO!

Apache Hudi là định dạng bảng mở đầu tiên dành cho các hồ dữ liệu và đáng được xem xét trong các kiến trúc phát trực tuyến. Cộng đồng và hệ sinh thái Hudi vẫn tồn tại và hoạt động với sự chú trọng ngày càng tăng xung quanh việc thay thế Hadoop/HDFS bằng bộ lưu trữ đối tượng/Hudi cho các hồ dữ liệu phát trực tuyến trên nền tảng đám mây. Việc sử dụng MinIO để lưu trữ Hudi sẽ mở đường cho các hồ dữ liệu và phân tích trên nhiều đám mây. MinIO bao gồm tính năng sao chép chủ động-chủ động để đồng bộ hóa dữ liệu giữa các vị trí — tại chỗ, trên đám mây công cộng/riêng tư và ở biên — hỗ trợ những nội dung tuyệt vời mà doanh nghiệp cần như cân bằng tải theo địa lý và chuyển đổi dự phòng nhanh nóng.


Hãy thử Hudi trên MinIO ngay hôm nay. Nếu bạn có bất kỳ câu hỏi nào hoặc muốn chia sẻ mẹo, vui lòng liên hệ qua kênh Slack của chúng tôi .


Cũng được xuất bản ở đây .