ډیری وختونه ویل شوي چې د AI په عمر کې - ډاټا ستاسو خټک دی. د دې لپاره ، د تولید درجې RAG غوښتنلیک رامینځته کول د ډیټا ذخیره کولو ، نسخې ، پروسس کولو ، ارزونې او پوښتنې کولو لپاره د مناسب ډیټا زیربنا غوښتنه کوي چې ستاسو د ملکیت کارپس پکې شامل دي. څرنګه چې MinIO AI ته د معلوماتو لومړۍ طریقه غوره کوي، د دې ډول پروژې لپاره زموږ د ابتدايي زیربناوو وړاندیز د عصري ډیټا لیک (MinIO) او د ویکتور ډیټابیس رامینځته کول دي. په داسې حال کې چې نور اړین وسایل باید د لارې په اوږدو کې ولګول شي، دا دوه زیربنایي واحدونه بنسټیز دي. دوی به د نږدې ټولو دندو لپاره د جاذبې مرکز په توګه خدمت وکړي چې وروسته ستاسو د RAG غوښتنلیک تولید ته راوړلو کې ورسره مخ شوي.
مګر تاسو په یوه ناندرۍ کې یاست. تاسو دمخه د دې شرایطو LLM او RAG په اړه اوریدلي یاست مګر له دې هاخوا تاسو د نامعلوم له امله ډیر څه نه دي کړي. مګر ایا دا به ښه نه وي که چیرې د "هیلو ورلډ" یا بویلر پلیټ ایپ شتون ولري چې تاسو سره د پیل کولو کې مرسته کولی شي؟
اندیښنه مه کوئ، زه په هماغه کښتۍ کې وم. نو پدې بلاګ کې به موږ وښیو چې څنګه د کموډیټ هارډویر په کارولو سره د بیا ترلاسه کولو وده شوي نسل (RAG) پراساس چیٹ غوښتنلیک جوړولو لپاره MinIO وکاروو.
د ویکتور ډیټابیس په کارولو سره د ټولو اسنادو ، پروسس شوي ټوټې او سرایت کولو لپاره MinIO وکاروئ.
په بالټ کې د اسنادو اضافه کولو یا لرې کولو په وخت کې د پیښو رامینځته کولو لپاره د MinIO د بالټ خبرتیا فیچر وکاروئ
ویب بوک چې پیښه مصرفوي او د لینګچین په کارولو سره اسناد پروسس کوي او میټاډاټا او ټوټه شوي سندونه د میټاډاټا بالټ کې خوندي کوي
د نوي اضافه شوي یا حذف شوي سندونو لپاره د MinIO بالټ خبرتیا پیښې رامینځته کړئ
یو ویب هک چې پیښې مصرفوي او سرایتونه رامینځته کوي او د ویکتور ډیټابیس (LanceDB) ته یې خوندي کوي چې په MinIO کې دوام لري
تاسو کولی شئ بائنری ډاونلوډ کړئ که تاسو دا دمخه نه لرئ له دې ځایه
# Run MinIO detached !minio server ~/dev/data --console-address :9090 &
اولاما له دې ځایه ډاونلوډ کړئ
# Start the Server !ollama serve
# Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0
# Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5
# List All the Models !ollama ls
LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)
## Test with sample text get_embedding("What is MinIO?")
د mc کمانډ وکاروئ یا د UI څخه یې وکړئ
!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'
!mc mb myminio/custom-corpus !mc mb myminio/warehouse
import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
## Test with sample text get_embedding("What is MinIO?")
په کنسول کې پیښو ته لاړ شئ-> د پیښې ځای اضافه کړئ -> ویب هک
ساحې د لاندې ارزښتونو سره ډک کړئ او خوندي کلیک وکړئ
پیژندونکی - doc-webhook
پای ټکی - http://localhost:8808/api/v1/document/notification
په پورتنۍ برخه کې د MinIO Restart کلیک وکړئ کله چې پورمپ شوی وي
( یادونه : تاسو کولی شئ د دې لپاره mc هم وکاروئ)
په کنسول کې بالټ (اډمینسټر) ته لاړ شئ -> دودیز کارپس -> پیښې
ساحې د لاندې ارزښتونو سره ډک کړئ او خوندي کلیک وکړئ
ARN - د ډراپ ډاون څخه د ډاک ویب بوک غوره کړئ
پیښې غوره کړئ - PUT او DELETE چیک کړئ
( یادونه : تاسو کولی شئ د دې لپاره mc هم وکاروئ)
موږ زموږ لومړی ویب هک ترتیب لرو
موږ به د MinIO څخه یو څیز لوستلو لپاره Langchain او Unstructured وکاروو او په څو څو برخو کې د اسنادو ویشلو
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits
# test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")
په ویب هک کې د ټوټې منطق اضافه کړئ او میټاډاټا او ټوټې د ګودام بالټ کې خوندي کړئ
import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
اوس چې موږ لومړی ویب هک لرو چې بل ګام یې کار کوي د میټاډاټا سره ټولې برخې ترلاسه کړئ ایمبیډینګونه رامینځته کړئ او په ویکتور ډیټابیس کې یې ذخیره کړئ
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
په کنسول کې پیښو ته لاړ شئ-> د پیښې ځای اضافه کړئ -> ویب هک
ساحې د لاندې ارزښتونو سره ډک کړئ او خوندي کلیک وکړئ
پیژندونکی - metadata-webhook
پای ټکی - http://localhost:8808/api/v1/metadata/notification
کله چې غوښتل شوي وي په پورتنۍ برخه کې MinIO بیا پیل کړئ کلیک وکړئ
( یادونه : تاسو کولی شئ د دې لپاره mc هم وکاروئ)
په کنسول کې بالټ (اډمینسټریټر) ته لاړ شئ -> ګودام -> پیښې
ساحې د لاندې ارزښتونو سره ډک کړئ او خوندي کلیک وکړئ
ARN - د ډراپ ډاون څخه میټاډاټا ویب بوک غوره کړئ
مخفف - میټاډاټا/
مخفی - .json
پیښې غوره کړئ - PUT او DELETE چیک کړئ
( یادونه : تاسو کولی شئ د دې لپاره mc هم وکاروئ)
موږ زموږ لومړی ویب هک ترتیب لرو
اوس چې موږ بنسټیز ویب هک کار کوو، اجازه راکړئ د MinIO ګودام بالټ کې د لانس ډی بی ویکتور ډیټابیس تنظیم کړو چې په کې به موږ ټول ایمبیډینګونه او اضافي میټاډاټا ساحې خوندي کړو.
import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")
# list existing tables db.table_names()
# Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table
# Check if that worked get_or_create_table()
# list existing tables db.table_names()
import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"
from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
اوس چې موږ د انجیکشن پایپ لاین کار کوو راځئ چې وروستی RAG پایپ لاین مدغم کړو.
اوس چې موږ سند په لانس ډی بی کې داخل کړی دی راځئ چې د لټون وړتیا اضافه کړو
EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res
# Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()
RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """
context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
ایا تاسو کولی شئ د ډیټا لیک بیکینډ په توګه د MinIO سره د RAG پراساس چیټ ته لاړ شئ او پلي کړئ؟ موږ به په نږدې راتلونکي کې په ورته موضوع یو ویبینار وکړو چیرې چې موږ به تاسو ته یو ژوندی ډیمو درکړو ځکه چې موږ د دې RAG پراساس چیټ غوښتنلیک جوړوو.
د یو پراختیا کونکي په توګه چې په MinIO کې د AI ادغام باندې تمرکز کوي ، زه په دوامداره توګه سپړم چې څنګه زموږ وسیلې په بې ساري ډول په عصري AI جوړښتونو کې مدغم کیدی شي ترڅو موثریت او پیمانه لوړ کړي. پدې مقاله کې ، موږ تاسو ته وښودله چې څنګه د چیټ غوښتنلیک جوړولو لپاره MinIO د Retrieval-Augmented Generation (RAG) سره مدغم کړو. دا یوازې د یخ برګ یوه برخه ده ، ترڅو تاسو ته د RAG او MinIO لپاره د نورو ځانګړي کارول شوي قضیې رامینځته کولو لپاره ستاسو په لټون کې وده درکړي. اوس تاسو د دې کولو لپاره د ودانۍ بلاکونه لرئ. راځئ چې دا وکړو!
که تاسو د MinIO RAG ادغام په اړه کومه پوښتنه لرئ ډاډه اوسئ چې موږ سره اړیکه ونیسئ