paint-brush
Použití MinIO k vytvoření aplikace pro chat s rozšířenou generací načítánípodle@minio
5,705 čtení
5,705 čtení

Použití MinIO k vytvoření aplikace pro chat s rozšířenou generací načítání

podle MinIO21m2024/09/18
Read on Terminal Reader

Příliš dlouho; Číst

Vytvoření aplikace RAG na produkční úrovni vyžaduje vhodnou datovou infrastrukturu pro ukládání, verzi, zpracování, hodnocení a dotazování na části dat, které tvoří váš proprietární korpus.
featured image - Použití MinIO k vytvoření aplikace pro chat s rozšířenou generací načítání
MinIO HackerNoon profile picture
0-item


Často se říká, že ve věku umělé inteligence jsou data vaším příkopem. Za tímto účelem vyžaduje vytvoření aplikace RAG na produkční úrovni vhodnou datovou infrastrukturu pro ukládání, verzi, zpracování, vyhodnocování a dotazování na části dat, které tvoří váš proprietární korpus. Vzhledem k tomu, že MinIO používá k AI přístup na prvním místě na datech, naším výchozím doporučením pro počáteční infrastrukturu pro projekt tohoto typu je nastavení Modern Data Lake (MinIO) a vektorové databáze. Zatímco ostatní pomocné nástroje může být nutné zapojit během cesty, tyto dvě jednotky infrastruktury jsou základní. Budou sloužit jako těžiště pro téměř všechny úkoly, se kterými se následně setkáte při uvedení vaší aplikace RAG do výroby.


Ale jste v rébusu. O těchto pojmech LLM a RAG jste již slyšeli, ale kromě toho jste se kvůli neznámému moc neodvážili. Nebylo by ale hezké, kdyby existovala aplikace „Hello World“ nebo standardní aplikace, která vám pomůže začít?


Neboj, byl jsem na stejné lodi. V tomto blogu si tedy ukážeme, jak používat MinIO k vytvoření chatovací aplikace založené na Retrieval Augmented Generation (RAG) pomocí komoditního hardwaru.


  • Použijte MinIO k uložení všech dokumentů, zpracovaných kusů a vložení pomocí vektorové databáze.


  • Použijte funkci oznámení o bucketu MinIO ke spuštění událostí při přidávání nebo odebírání dokumentů do bucketu


  • Webhook, který spotřebovává událost a zpracovává dokumenty pomocí Langchain a ukládá metadata a rozdělené dokumenty do segmentu metadat


  • Spouštět události upozornění MinIO bucket pro nově přidané nebo odstraněné blokované dokumenty


  • Webhook, který spotřebovává události a generuje vložení a ukládá je do vektorové databáze (LanceDB), která je uložena v MinIO


Použité klíčové nástroje

  • MinIO - Object Store pro zachování všech dat
  • LanceDB - Serverless open-source vektorová databáze, která přetrvává data v úložišti objektů
  • Ollama – Chcete-li lokálně spustit LLM a model vkládání (kompatibilní s OpenAI API)
  • Gradio - Rozhraní, jehož prostřednictvím lze komunikovat s aplikací RAG
  • FastAPI – Server pro webhooky, který přijímá oznámení o bucketu z MinIO a zpřístupňuje aplikaci Gradio
  • LangChain & Unstructured - extrahovat užitečný text z našich dokumentů a rozdělit je pro vkládání


Použité modely

  • LLM – Phi-3-128K (3,8B parametrů)
  • Embeddings – Nomic Embed Text v1.5 ( Matryoshka Embeddings / 768 Dim, 8K kontext)

Spusťte MinIO Server

Zde si můžete stáhnout binární soubor, pokud jej ještě nemáte


 # Run MinIO detached !minio server ~/dev/data --console-address :9090 &


Spustit Ollama Server + stáhnout LLM & Embedding Model

Stáhněte si Ollama odtud


 # Start the Server !ollama serve


 # Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0


 # Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5


 # List All the Models !ollama ls


Vytvořte základní aplikaci Gradio pomocí FastAPI k testování modelu

 LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response


 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

Testovací model vkládání

 import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)


 ## Test with sample text get_embedding("What is MinIO?")


Přehled potrubí příjmu

Vytvářejte MiniIO buckety

Použijte příkaz mc nebo to proveďte z uživatelského rozhraní

  • custom-corpus - Pro uložení všech dokumentů
  • sklad – pro uložení všech metadat, bloků a vektorových vložení


 !mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'


 !mc mb myminio/custom-corpus !mc mb myminio/warehouse

Vytvořte webhook, který spotřebovává oznámení o segmentu z vlastního korpusového segmentu

 import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


 ## Test with sample text get_embedding("What is MinIO?")


Vytvořte upozornění na události MinIO a propojte je s vlastním korpusovým bucketem

Vytvořit událost Webhooku

V konzole přejděte na Události-> Přidat cíl události -> Webhook


Vyplňte pole následujícími hodnotami a stiskněte Uložit


Identifikátor - doc-webhook


Koncový bodhttp://localhost:8808/api/v1/document/notification


Po zobrazení výzvy klikněte na Restartovat MinIO v horní části


( Poznámka : K tomu můžete také použít mc)

Propojte událost webhooku s událostmi vlastního korpusu

V konzoli přejděte na Buckets (Administrator) -> custom-corpus -> Events


Vyplňte pole následujícími hodnotami a stiskněte Uložit


ARN – Z rozevírací nabídky vyberte webhook dokumentu


Vyberte Události - Zaškrtněte PUT a DELETE


( Poznámka : K tomu můžete také použít mc)


Máme za sebou první nastavení webhooku

Nyní otestujte přidáním a odebráním objektu

Extrahujte data z dokumentů a bloku

Použijeme Langchain a Nestrukturované ke čtení objektu z MinIO a rozdělených dokumentů na více částí


 from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits


 # test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")

Přidejte do Webhooku logiku Chunking

Přidejte logiku bloků do webhooku a uložte metadata a bloky do segmentu skladu


 import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"

Aktualizujte server FastAPI pomocí nové logiky

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)

Přidejte nový webhook ke zpracování metadat/dílů dokumentu

Nyní, když funguje první webhook, dalším krokem je získat všechny části s metadaty, vygenerovat vložení a uložit je do vektorové databáze



 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


Vytvořte oznámení o událostech MinIO a propojte je s kbelíkem skladu

Vytvořit událost Webhooku

V konzole přejděte na Události-> Přidat cíl události -> Webhook


Vyplňte pole následujícími hodnotami a stiskněte Uložit


Identifikátor – metadata-webhook


Koncový bodhttp://localhost:8808/api/v1/metadata/notification


Po zobrazení výzvy klikněte na Restartovat MinIO v horní části


( Poznámka : K tomu můžete také použít mc)

Propojte událost webhooku s událostmi vlastního korpusu

V konzoli přejděte do Buckets (Administrátor) -> sklad -> Události


Vyplňte pole následujícími hodnotami a stiskněte Uložit


ARN – Z rozbalovací nabídky vyberte webhook metadat


Prefix - metadata/


Přípona – .json


Vyberte Události - Zaškrtněte PUT a DELETE


( Poznámka : K tomu můžete také použít mc)


Máme za sebou první nastavení webhooku

Nyní otestujte přidáním a odebráním objektu ve vlastním korpusu a zjistěte, zda se tento webhook spustí

Vytvořte vektorovou databázi LanceDB v MinIO

Nyní, když máme funkční základní webhook, pojďme nastavit vektorovou databázi lanceDB v kbelíku skladu MinIO, do kterého uložíme všechna vložení a další pole metadat.


 import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")


 # list existing tables db.table_names()


 # Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table


 # Check if that worked get_or_create_table()


 # list existing tables db.table_names()

Přidejte Ukládání/odebírání dat z lanceDB do metadata-webhook

 import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"

Přidejte plánovač, který zpracovává data z front

 from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)

Aktualizujte FastAPI pomocí změn Vector Embedding Changes

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808) 




Nyní, když máme funkční potrubí Ingestion, pojďme integrovat finální potrubí RAG.

Přidejte možnost vektorového vyhledávání

Nyní, když máme dokument přijatý do lanceDB, přidejte možnost vyhledávání


 EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res


 # Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()

Vyzvěte LLM, aby použila příslušné dokumenty

 RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """


 context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df

Aktualizujte FastAPI Chat Endpoint pro použití RAG

 import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)


Podařilo se vám projít a implementovat chat založený na RAG s MinIO jako backendem datového jezera? V blízké budoucnosti uspořádáme webový seminář na stejné téma, kde vám poskytneme živé demo, když vytváříme tuto chatovací aplikaci založenou na RAG.

RAGs-R-Us

Jako vývojář zaměřený na integraci AI ve společnosti MinIO neustále zkoumám, jak lze naše nástroje hladce integrovat do moderních architektur AI, aby se zvýšila efektivita a škálovatelnost. V tomto článku jsme vám ukázali, jak integrovat MinIO s Retrieval-Augmented Generation (RAG) a vytvořit chatovací aplikaci. Toto je jen špička ledovce, která vám poskytne podporu ve vaší snaze vytvořit více unikátních použitých pouzder pro RAG a MinIO. Nyní k tomu máte stavební kameny. Pojďme na to!


Pokud máte nějaké dotazy ohledně integrace MinIO RAG, určitě se na nás obraťte Slack !

L O A D I N G
. . . comments & more!

About Author

MinIO HackerNoon profile picture
MinIO@minio
MinIO is a high-performance, cloud-native object store that runs anywhere (public cloud, private cloud, colo, onprem).

ZAVĚŠIT ZNAČKY

TENTO ČLÁNEK BYL PŘEDSTAVEN V...