paint-brush
MinIO を使用して検索拡張生成チャット アプリケーションを構築する@minio
5,705 測定値
5,705 測定値

MinIO を使用して検索拡張生成チャット アプリケーションを構築する

MinIO21m2024/09/18
Read on Terminal Reader

長すぎる; 読むには

本番環境レベルの RAG アプリケーションを構築するには、独自のコーパスを構成する大量のデータを保存、バージョン管理、処理、評価、およびクエリするための適切なデータ インフラストラクチャが必要です。
featured image - MinIO を使用して検索拡張生成チャット アプリケーションを構築する
MinIO HackerNoon profile picture
0-item


AI の時代では、データが堀であるとよく言われます。そのため、本番レベルの RAG アプリケーションを構築するには、独自のコーパスを構成する大量のデータを保存、バージョン管理、処理、評価、およびクエリするための適切なデータ インフラストラクチャが必要です。MinIO は AI に対してデータ ファーストのアプローチを採用しているため、このタイプのプロジェクトに対するデフォルトの初期インフラストラクチャの推奨事項は、Modern Data Lake (MinIO) とベクター データベースをセットアップすることです。途中で他の補助ツールを接続する必要がある場合もありますが、これら 2 つのインフラストラクチャ ユニットは基礎的なものです。これらは、RAG アプリケーションを本番環境に導入する際にその後に発生するほぼすべてのタスクの中心として機能します。


しかし、あなたは困惑しています。LLM と RAG という用語については聞いたことがありますが、それ以上は未知のため、あまり詳しく調べていません。しかし、始めるのに役立つ「Hello World」または定型アプリがあれば便利だと思いませんか?


心配しないでください。私も同じ状況でした。そこでこのブログでは、MinIO を使用して、市販のハードウェアを使用して Retrieval Augmented Generation (RAG) ベースのチャット アプリケーションを構築する方法を説明します。


  • MinIO を使用して、ベクター データベースを使用してすべてのドキュメント、処理されたチャンク、および埋め込みを保存します。


  • MinIO のバケット通知機能を使用して、バケットにドキュメントを追加または削除するときにイベントをトリガーします。


  • イベントを消費し、Langchain を使用してドキュメントを処理し、メタデータとチャンクされたドキュメントをメタデータ バケットに保存する Webhook


  • 新しく追加または削除されたチャンクドキュメントに対して MinIO バケット通知イベントをトリガーします。


  • イベントを消費して埋め込みを生成し、それをMinIOに永続化されているベクターデータベース(LanceDB)に保存するWebhook


使用された主なツール

  • MinIO - すべてのデータを永続化するオブジェクトストア
  • LanceDB - オブジェクト ストアにデータを保存するサーバーレス オープンソース ベクトル データベース
  • Ollama - LLM と埋め込みモデルをローカルで実行する (OpenAI API 互換)
  • Gradio - RAG アプリケーションと対話するためのインターフェース
  • FastAPI - MinIOからバケット通知を受信し、Gradioアプリを公開するWebhooksのサーバー
  • LangChain & Unstructured - ドキュメントから有用なテキストを抽出し、埋め込み用にチャンク化します


使用モデル

  • LLM - Phi-3-128K (3.8B パラメータ)
  • 埋め込み- Nomic Embed Text v1.5 ( Matryoshka 埋め込み/ 768 Dim、8K コンテキスト)

MinIOサーバーを起動する

バイナリをまだお持ちでない場合は、ここからダウンロードできます。


 # Run MinIO detached !minio server ~/dev/data --console-address :9090 &


Ollama Serverを起動し、LLMと埋め込みモデルをダウンロードする

Ollamaをここからダウンロード


# Start the Server !ollama serve


 # Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0


 # Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5


 # List All the Models !ollama ls


FastAPI を使用して基本的な Gradio アプリを作成し、モデルをテストする

LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response


 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

埋め込みモデルのテスト

import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)


 ## Test with sample text get_embedding("What is MinIO?")


取り込みパイプラインの概要

MinIO バケットを作成する

mcコマンドを使用するか、UIから実行します

  • カスタムコーパス - すべての文書を保存する
  • ウェアハウス - すべてのメタデータ、チャンク、ベクトル埋め込みを保存する


!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'


 !mc mb myminio/custom-corpus !mc mb myminio/warehouse

カスタムコーパスバケットからバケット通知を消費するWebhookを作成する

import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


 ## Test with sample text get_embedding("What is MinIO?")


MinIO イベント通知を作成し、それをカスタムコーパス バケットにリンクする

Webhookイベントを作成する

コンソールでイベント -> イベントの宛先を追加 -> Webhookに移動します。


以下の値をフィールドに入力し、保存をクリックします。


識別子- doc-webhook


エンドポイント- http://localhost:8808/api/v1/document/notification


プロンプトが表示されたら、上部の「MinIOを再起動」をクリックします。


(: これには mc も使用できます)

Webhookイベントをカスタムコーパスバケットイベントにリンクする

コンソールで、バケット(管理者)->カスタムコーパス->イベントに移動します。


以下の値をフィールドに入力し、保存をクリックします。


ARN - ドロップダウンからdoc-webhookを選択します


イベントを選択- PUTとDELETEをチェック


(: これには mc も使用できます)


最初のWebhookの設定が完了しました

オブジェクトを追加したり削除したりしてテストしてみましょう

ドキュメントとチャンクからデータを抽出する

LangchainとUnstructuredを使用してMinIOからオブジェクトを読み取り、ドキュメントを複数のチャンクに分割します。


 from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits


 # test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")

Webhookにチャンクロジックを追加する

チャンクロジックをWebhookに追加し、メタデータとチャンクをウェアハウスバケットに保存します。


 import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"

新しいロジックでFastAPIサーバーを更新する

import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

ドキュメントのメタデータ/チャンクを処理するための新しい Webhook を追加する

最初のWebhookが機能するようになったので、次のステップはメタデータを含むすべてのチャンクを取得し、埋め込みを生成してベクターデータベースに保存することです。



 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


MinIO イベント通知を作成し、それをウェアハウス バケットにリンクする

Webhookイベントの作成

コンソールでイベント -> イベントの宛先を追加 -> Webhookに移動します。


以下の値をフィールドに入力し、保存をクリックします。


識別子- メタデータ Webhook


エンドポイント- http://localhost:8808/api/v1/metadata/notification


プロンプトが表示されたら、上部の「MinIOを再起動」をクリックします。


(: これには mc も使用できます)

Webhookイベントをカスタムコーパスバケットイベントにリンクする

コンソールで、バケット(管理者)-> ウェアハウス -> イベントに移動します。


以下の値をフィールドに入力し、保存をクリックします。


ARN - ドロップダウンからメタデータウェブフックを選択します


プレフィックス- metadata/


サフィックス- .json


イベントを選択- PUTとDELETEをチェック


(: これには mc も使用できます)


最初のWebhookの設定が完了しました

次に、カスタムコーパスにオブジェクトを追加および削除して、このWebhookがトリガーされるかどうかを確認します。

MinIO で LanceDB ベクトル データベースを作成する

基本的なWebhookが動作するようになったので、MinIOウェアハウスバケットにlanceDBベクターデータベースを設定し、そこにすべての埋め込みと追加のメタデータフィールドを保存します。


 import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")


 # list existing tables db.table_names()


 # Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table


 # Check if that worked get_or_create_table()


 # list existing tables db.table_names()

lanceDB からのデータの保存/削除をメタデータ Webhook に追加

import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"

キューからデータを処理するスケジューラを追加する

from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)

ベクトル埋め込みの変更に合わせて FastAPI を更新

import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808) 




取り込みパイプラインが動作するようになったので、最終的な RAG パイプラインを統合しましょう。

ベクトル検索機能の追加

文書をlanceDBに取り込んだので、検索機能を追加しましょう。


 EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res


 # Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()

LLMに関連文書の使用を促す

RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """


 context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df

RAG を使用するように FastAPI チャット エンドポイントを更新する

import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


MinIO をデータ レイク バックエンドとして使用して RAG ベースのチャットを実装できましたか? 近い将来、この同じトピックに関するウェビナーを開催し、この RAG ベースのチャット アプリケーションを構築しながらライブ デモをお届けする予定です。

ラグザラス

MinIO で AI 統合に注力している開発者として、私は効率性とスケーラビリティを高めるために、当社のツールを最新の AI アーキテクチャにシームレスに統合する方法を常に模索しています。この記事では、MinIO と Retrieval-Augmented Generation (RAG) を統合してチャット アプリケーションを構築する方法を説明しました。これは氷山の一角に過ぎず、RAG と MinIO のよりユニークなユースケースを構築するための探求を後押しします。これで、それを実現するための構成要素が揃いました。さあ、始めましょう!


MinIO RAG統合についてご質問がある場合は、お気軽にお問い合わせください。スラック