غالبًا ما يُقال إنه في عصر الذكاء الاصطناعي - البيانات هي خندقك. ولتحقيق هذه الغاية، يتطلب بناء تطبيق RAG من الدرجة الإنتاجية بنية تحتية مناسبة للبيانات لتخزين وإصدار ومعالجة وتقييم واستعلام أجزاء البيانات التي تشكل مجموعتك الملكية. ونظرًا لأن MinIO يتبنى نهجًا يضع البيانات أولاً في الاعتبار فيما يتعلق بالذكاء الاصطناعي، فإن توصيتنا الأولية الافتراضية للبنية التحتية لمشروع من هذا النوع هي إعداد بحيرة بيانات حديثة (MinIO) وقاعدة بيانات متجهة. وبينما قد يلزم توصيل أدوات مساعدة أخرى على طول الطريق، فإن وحدتي البنية التحتية هاتين أساسيتان. وسوف تعملان كمركز ثقل لجميع المهام تقريبًا التي تواجهها لاحقًا في إدخال تطبيق RAG الخاص بك إلى الإنتاج.
لكنك في حيرة من أمرك. لقد سمعت عن هذين المصطلحين LLM وRAG من قبل، ولكنك لم تخوض الكثير من التجارب بسبب عدم معرفتك بهما. ولكن ألن يكون من الرائع أن يكون هناك تطبيق "Hello World" أو تطبيق عادي يمكن أن يساعدك في البدء؟
لا تقلق، كنت في نفس الموقف. لذا في هذه المدونة، سنوضح كيفية استخدام MinIO لبناء تطبيق دردشة يعتمد على Retrieval Augmented Generation (RAG) باستخدام أجهزة متوفرة.
استخدم MinIO لتخزين كافة المستندات والأجزاء المعالجة والتضمينات باستخدام قاعدة بيانات المتجهات.
استخدم ميزة إشعارات الدلو في MinIO لتشغيل الأحداث عند إضافة مستندات إلى دلو أو إزالتها منه
Webhook الذي يستهلك الحدث ويعالج المستندات باستخدام Langchain ويحفظ البيانات الوصفية والمستندات المجزأة في دلو البيانات الوصفية
تشغيل أحداث إشعار دلو MinIO للمستندات المجزأة المضافة حديثًا أو المحذوفة
Webhook يستهلك الأحداث ويولد التضمينات ويحفظها في قاعدة بيانات Vector (LanceDB) التي يتم الاحتفاظ بها في MinIO
يمكنك تنزيل الملف الثنائي إذا لم يكن لديك بالفعل من هنا
# Run MinIO detached !minio server ~/dev/data --console-address :9090 &
تحميل برنامج Ollama من هنا
# Start the Server !ollama serve
# Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0
# Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5
# List All the Models !ollama ls
LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)
## Test with sample text get_embedding("What is MinIO?")
استخدم الأمر mc أو قم به من واجهة المستخدم
!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'
!mc mb myminio/custom-corpus !mc mb myminio/warehouse
import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
## Test with sample text get_embedding("What is MinIO?")
في وحدة التحكم، انتقل إلى الأحداث-> إضافة وجهة الحدث-> Webhook
املأ الحقول بالقيم التالية واضغط على حفظ
المعرف - doc-webhook
نقطة النهاية - http://localhost:8808/api/v1/document/notification
انقر فوق إعادة تشغيل MinIO في الأعلى عند الالتقاط
( ملاحظة : يمكنك أيضًا استخدام mc لهذا الغرض)
في وحدة التحكم، انتقل إلى Buckets (المسؤول) -> custom-corpus -> Events
املأ الحقول بالقيم التالية واضغط على حفظ
ARN - حدد doc-webhook من القائمة المنسدلة
حدد الأحداث - حدد PUT و DELETE
( ملاحظة : يمكنك أيضًا استخدام mc لهذا الغرض)
لقد قمنا بإعداد خطاف الويب الأول لدينا
سنستخدم Langchain وUnstructured لقراءة كائن من MinIO وتقسيم المستندات إلى أجزاء متعددة
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits
# test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")
أضف منطق القطعة إلى webhook واحفظ البيانات الوصفية والقطع في دلو المستودع
import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
الآن بعد أن أصبح لدينا أول خطاف ويب يعمل، فإن الخطوة التالية هي الحصول على جميع القطع التي تحتوي على بيانات وصفية وإنشاء التضمينات وتخزينها في قاعدة بيانات المتجهات
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
في وحدة التحكم، انتقل إلى الأحداث-> إضافة وجهة الحدث-> Webhook
املأ الحقول بالقيم التالية واضغط على حفظ
المعرف - metadata-webhook
نقطة النهاية - http://localhost:8808/api/v1/metadata/notification
انقر فوق إعادة تشغيل MinIO في الأعلى عند مطالبتك بذلك
( ملاحظة : يمكنك أيضًا استخدام mc لهذا الغرض)
في وحدة التحكم، انتقل إلى الدلاء (المسؤول) -> المستودع -> الأحداث
املأ الحقول بالقيم التالية واضغط على حفظ
ARN - حدد webhook metadata من القائمة المنسدلة
البادئة - البيانات الوصفية/
اللاحقة - .json
حدد الأحداث - حدد PUT و DELETE
( ملاحظة : يمكنك أيضًا استخدام mc لهذا الغرض)
لقد قمنا بإعداد خطاف الويب الأول لدينا
الآن بعد أن أصبح لدينا خطاف الويب الأساسي يعمل، فلنقم بإعداد قاعدة بيانات متجه lanceDB في دلو مستودع MinIO حيث سنحفظ جميع التضمينات وحقول البيانات الوصفية الإضافية
import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")
# list existing tables db.table_names()
# Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table
# Check if that worked get_or_create_table()
# list existing tables db.table_names()
import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"
from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
الآن بعد أن أصبح خط أنابيب الاستيعاب يعمل، فلنبدأ في دمج خط أنابيب RAG النهائي.
الآن بعد أن قمنا بإدخال المستند إلى lanceDB، فلنقم بإضافة إمكانية البحث
EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res
# Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()
RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """
context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
هل تمكنت من تنفيذ الدردشة القائمة على RAG باستخدام MinIO كبرنامج أساسي لبحيرة البيانات؟ سنقوم في المستقبل القريب بإجراء ندوة عبر الإنترنت حول نفس الموضوع حيث سنقدم لك عرضًا توضيحيًا مباشرًا أثناء بناء تطبيق الدردشة القائم على RAG.
بصفتي مطورًا يركز على تكامل الذكاء الاصطناعي في MinIO، فأنا أستكشف باستمرار كيف يمكن دمج أدواتنا بسلاسة في بنيات الذكاء الاصطناعي الحديثة لتعزيز الكفاءة وقابلية التوسع. في هذه المقالة، أوضحنا لك كيفية دمج MinIO مع Retrieval-Augmented Generation (RAG) لبناء تطبيق دردشة. هذا مجرد غيض من فيض، لمنحك دفعة في سعيك لبناء المزيد من الحالات المستخدمة الفريدة لـ RAG وMinIO. الآن لديك اللبنات الأساسية للقيام بذلك. دعنا نفعل ذلك!
إذا كانت لديك أي أسئلة حول تكامل MinIO RAG، فتأكد من التواصل معنا على