It is often said that in the age of AI, data is your moat. To that end, building a production-grade RAG application demands suitable data infrastructure to store, version, process, evaluate, and query the chunks of data that make up your proprietary corpus. Since MinIO takes a data-first approach to AI, our default initial infrastructure recommendation for a project of this type is to set up a Modern Data Lake (MinIO) and a vector database. While other ancillary tools may need to be plugged in along the way, these two infrastructure units are foundational. They will serve as the center of gravity for the tasks you encounter in getting your RAG application into production.
But you are in a conundrum. You have heard the terms LLM and RAG before, but you have not ventured much further because of the unknown. Wouldn't it be nice if there were a "Hello World" or boilerplate app to help you get started?
Don't worry, I was in the same boat. So in this blog, we will demonstrate how to use MinIO to build a Retrieval Augmented Generation (RAG) based chat application using commodity hardware.
Use MinIO to store all the documents, the processed chunks, and the embeddings held by the vector database.
Use MinIO's bucket notification feature to trigger events when documents are added to or removed from a bucket.
A webhook that consumes the event, processes the documents using Langchain, and saves the metadata and chunked documents to a metadata bucket.
Trigger MinIO bucket notification events for newly added or removed chunked documents.
A webhook that consumes the events, generates embeddings, and saves them to the vector database (LanceDB), which is persisted in MinIO.
Download the MinIO binary, then start the server.
# Run MinIO detached
!minio server ~/dev/data --console-address :9090 &
Download Ollama, then start the server and pull the LLM and the embedding model.
# Start the Server
!ollama serve
# Download Phi-3 LLM
!ollama pull phi3:3.8b-mini-128k-instruct-q8_0
# Download Nomic Embed Text v1.5
!ollama pull nomic-embed-text:v1.5
# List All the Models
!ollama ls
import json
import requests

LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0"
EMBEDDING_MODEL = "nomic-embed-text:v1.5"
LLM_ENDPOINT = "http://localhost:11434/api/chat"
CHAT_API_PATH = "/chat"


def llm_chat(user_question, history):
    history = history or []
    user_message = f"**You**: {user_question}"
    llm_resp = requests.post(LLM_ENDPOINT,
                             json={"model": LLM_MODEL,
                                   "keep_alive": "48h",  # Keep the model in-memory for 48 hours
                                   "messages": [
                                       {"role": "user",
                                        "content": user_question
                                        }
                                   ]},
                             stream=True)
    bot_response = "**AI:** "
    for resp in llm_resp.iter_lines():
        if not resp:  # Skip blank lines in the stream
            continue
        json_data = json.loads(resp)
        bot_response += json_data["message"]["content"]
        yield bot_response
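To see what the streaming loop in llm_chat is doing without a running model, here is a minimal sketch that accumulates a reply from newline-delimited JSON. The sample lines are hypothetical and only mimic the shape of a streamed chat response (a message object with content, plus a done flag); accumulate_stream is a name made up for this illustration.

```python
import json


def accumulate_stream(lines):
    """Yield the growing reply as each JSON line of the stream arrives."""
    reply = ""
    for raw in lines:
        if not raw:  # skip blank lines
            continue
        chunk = json.loads(raw)
        reply += chunk.get("message", {}).get("content", "")
        yield reply
        if chunk.get("done"):
            break


# Hypothetical sample lines, shaped like a streamed chat response
sample_lines = [
    b'{"message": {"role": "assistant", "content": "Min"}, "done": false}',
    b'',
    b'{"message": {"role": "assistant", "content": "IO"}, "done": false}',
    b'{"message": {"role": "assistant", "content": ""}, "done": true}',
]

for partial in accumulate_stream(sample_lines):
    print(partial)
```

Each yielded value is the full reply so far, which is what a chat UI needs in order to redraw the message as it grows.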
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()

with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False
    ch_interface.chatbot.height = 600

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
import numpy as np

EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings"
EMBEDDINGS_DIM = 768


def get_embedding(text):
    resp = requests.post(EMBEDDING_ENDPOINT,
                         json={"model": EMBEDDING_MODEL,
                               "prompt": text})
    # Truncate to EMBEDDINGS_DIM values and store as float16
    return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)

## Test with sample text
get_embedding("What is MinIO?")
Create the buckets with the mc command or from the UI.
!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'
!mc mb myminio/custom-corpus
!mc mb myminio/warehouse
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()


@app.post("/api/v1/document/notification")
async def receive_webhook(request: Request):
    # For now, just print the notification payload
    json_data = await request.json()
    print(json.dumps(json_data, indent=2))


with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
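It helps to know roughly what the webhook receives before adding logic to it. The sketch below uses a hypothetical, trimmed-down payload (real notifications carry many more fields) and a helper we are calling extract_objects to pull out the bucket name and the URL-decoded object key from each record.

```python
from urllib.parse import unquote

# Hypothetical, trimmed-down payload; real events carry more fields
sample_event = {
    "EventName": "s3:ObjectCreated:Put",
    "Key": "custom-corpus/docs/guide.pdf",
    "Records": [
        {
            "eventName": "s3:ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "custom-corpus"},
                # Object keys arrive URL-encoded
                "object": {"key": "docs%2Fguide.pdf"},
            },
        }
    ],
}


def extract_objects(event):
    """Return (bucket, decoded key) pairs for every record in an event."""
    pairs = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = unquote(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


print(sample_event["EventName"], extract_objects(sample_event))
```

Decoding the key matters because the encoded form will not match the object's real name when you try to read it back from the bucket.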
In the Console, go to Events -> Add Event Destination -> Webhook
Fill in the fields with the following values and save
Identifier - doc-webhook
Endpoint - http://localhost:8808/api/v1/document/notification
When prompted, click Restart MinIO at the top
(Note: You can also use mc for this)
In the Console, go to Buckets (Administrator) -> custom-corpus -> Events
Fill in the fields with the following values and save
ARN - Select doc-webhook from the dropdown
Select Events - Check PUT and DELETE
(Note: You can also use mc for this)
We now have our first webhook set up.
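As the notes above say, the same setup can be done with mc instead of the Console. Here is a minimal sketch that only assembles the command lines so you can review them before running anything. It assumes the myminio alias created earlier and a deployment with no region configured (hence the empty region field in the ARN); check the ARN your server actually reports. The helper name webhook_commands is ours, not part of mc.

```python
import shlex


def webhook_commands(alias, identifier, endpoint, bucket, prefix=None, suffix=None):
    """Build the mc command lines that register a webhook and bind it to a bucket."""
    event_cmd = ["mc", "event", "add", f"{alias}/{bucket}",
                 # Assumption: no region is configured, so the region field is empty
                 f"arn:minio:sqs::{identifier}:webhook",
                 "--event", "put,delete"]
    if prefix:
        event_cmd += ["--prefix", prefix]
    if suffix:
        event_cmd += ["--suffix", suffix]
    return [
        # 1. Register the webhook target
        ["mc", "admin", "config", "set", alias,
         f"notify_webhook:{identifier}", f"endpoint={endpoint}"],
        # 2. Restart so the new target is picked up
        ["mc", "admin", "service", "restart", alias],
        # 3. Bind PUT and DELETE events on the bucket to the target
        event_cmd,
    ]


commands = webhook_commands(
    "myminio", "doc-webhook",
    "http://localhost:8808/api/v1/document/notification",
    "custom-corpus",
)
for cmd in commands:
    print(shlex.join(cmd))
```

The optional prefix and suffix arguments map to the --prefix and --suffix flags of mc event add, which is useful when only some objects in a bucket should trigger the webhook.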
We will use Langchain and Unstructured to read an object from MinIO and split the document into multiple chunks.
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import S3FileLoader

MINIO_ENDPOINT = "http://localhost:9000"
MINIO_ACCESS_KEY = "minioadmin"
MINIO_SECRET_KEY = "minioadmin"

# Split Text from a given document using chunk_size number of characters
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024,
                                               chunk_overlap=64,
                                               length_function=len)


def split_doc_by_chunks(bucket_name, object_key):
    loader = S3FileLoader(bucket_name,
                          object_key,
                          endpoint_url=MINIO_ENDPOINT,
                          aws_access_key_id=MINIO_ACCESS_KEY,
                          aws_secret_access_key=MINIO_SECRET_KEY)
    docs = loader.load()
    doc_splits = text_splitter.split_documents(docs)
    return doc_splits

# test the chunking
split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")
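To build intuition for the chunk_size and chunk_overlap settings used above, here is a deliberately simplified sketch that cuts text into fixed windows sharing a few characters. This is not how RecursiveCharacterTextSplitter works internally (it prefers to break on separators such as paragraphs and sentences); fixed_window_chunks is a stand-in written only for illustration.

```python
def fixed_window_chunks(text, chunk_size=1024, chunk_overlap=64):
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = fixed_window_chunks(sample, chunk_size=10, chunk_overlap=3)
print(chunks)
# ['abcdefghij', 'hijklmnopq', 'opqrstuvwx', 'vwxyz']
```

The overlap means a sentence that straddles a boundary still appears whole in at least one chunk, at the cost of storing and embedding a little duplicate text.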
Add the chunking logic to the webhook and save the metadata and chunks to the warehouse bucket.
import urllib.parse
import s3fs

METADATA_PREFIX = "metadata"

# Using s3fs to save and delete objects from MinIO
# Point s3fs at the local MinIO server explicitly instead of the default AWS endpoint
s3 = s3fs.S3FileSystem(key=MINIO_ACCESS_KEY,
                       secret=MINIO_SECRET_KEY,
                       endpoint_url=MINIO_ENDPOINT)


# Split the documents and save the metadata to warehouse bucket
def create_object_task(json_data):
    for record in json_data["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])
        print(record["s3"]["bucket"]["name"],
              record["s3"]["object"]["key"])

        doc_splits = split_doc_by_chunks(bucket_name, object_key)

        # One JSON object per chunk, grouped under the source document's key
        for i, chunk in enumerate(doc_splits):
            source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json"
            with s3.open(source, "w") as f:
                f.write(chunk.json())
    return "Task completed!"


def delete_object_task(json_data):
    for record in json_data["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])
        # Remove every chunk that was written for this document
        s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True)
    return "Task completed!"
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()


@app.post("/api/v1/document/notification")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New object created!")
        background_tasks.add_task(create_object_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Object deleted!")
        background_tasks.add_task(delete_object_task, json_data)
    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
Now that the first webhook is working, the next step is to take all the chunks with their metadata, generate the embeddings, and store them in the vector database.
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()


@app.post("/api/v1/metadata/notification")
async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks):
    # For now, just print the notification payload
    json_data = await request.json()
    print(json.dumps(json_data, indent=2))


@app.post("/api/v1/document/notification")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New object created!")
        background_tasks.add_task(create_object_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Object deleted!")
        background_tasks.add_task(delete_object_task, json_data)
    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
In the Console, go to Events -> Add Event Destination -> Webhook
Fill in the fields with the following values and save
Identifier - metadata-webhook
Endpoint - http://localhost:8808/api/v1/metadata/notification
When prompted, click Restart MinIO at the top
(Note: You can also use mc for this)
In the Console, go to Buckets (Administrator) -> warehouse -> Events
Fill in the fields with the following values and save
ARN - Select metadata-webhook from the dropdown
Prefix - metadata/
Suffix - .json
Select Events - Check PUT and DELETE
(Note: You can also use mc for this)
We now have the metadata webhook set up.
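The prefix and suffix matter because the warehouse bucket will hold more than chunk files. A minimal sketch, reusing the key layout from create_object_task, shows which keys the rule lets through. matches_filter is a simplified stand-in for the server-side rule, and the second key is a made-up example of an unrelated object in the same bucket.

```python
METADATA_PREFIX = "metadata"


def chunk_object_key(bucket_name, object_key, index):
    """Key (inside the warehouse bucket) for the index-th chunk of a document."""
    return f"{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{index:05d}.json"


def matches_filter(key, prefix="metadata/", suffix=".json"):
    """Simplified stand-in for the prefix/suffix rule configured on the bucket."""
    return key.startswith(prefix) and key.endswith(suffix)


key = chunk_object_key("custom-corpus", "guide.pdf", 3)
print(key, matches_filter(key))

# Hypothetical unrelated object in the same bucket: it should not trigger the webhook
print(matches_filter("v-db/docs.lance/data/0.lance"))
```

Without the filter, every object written to the bucket would hit the metadata webhook, including files that are not chunk JSON at all.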
Now that the basic webhook is working, let's set up the LanceDB vector database in the MinIO warehouse bucket, where we will save all the embeddings and additional metadata fields.
import os
import lancedb

# Set these environment variables for the lanceDB to connect to MinIO
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY
os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT
os.environ["ALLOW_HTTP"] = "True"

db = lancedb.connect("s3://warehouse/v-db/")

# list existing tables
db.table_names()
# Create a new table with pydantic schema
from lancedb.pydantic import LanceModel, Vector
import pyarrow as pa

DOCS_TABLE = "docs"
EMBEDDINGS_DIM = 768

table = None


class DocsModel(LanceModel):
    parent_source: str  # Actual object/document source
    source: str  # Chunk/Metadata source
    text: str  # Chunked text
    vector: Vector(EMBEDDINGS_DIM, pa.float16())  # Vector to be stored


def get_or_create_table():
    global table
    if table is None and DOCS_TABLE not in list(db.table_names()):
        # Cache the newly created table so later calls reuse it
        table = db.create_table(DOCS_TABLE, schema=DocsModel)
        return table
    if table is None:
        table = db.open_table(DOCS_TABLE)
    return table

# Check if that worked
get_or_create_table()

# list existing tables
db.table_names()
import multiprocessing

EMBEDDING_DOCUMENT_PREFIX = "search_document"

# Add queue that keeps the processed metadata in memory
add_data_queue = multiprocessing.Queue()
delete_data_queue = multiprocessing.Queue()


def create_metadata_task(json_data):
    for record in json_data["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])
        print(bucket_name,
              object_key)
        with s3.open(f"{bucket_name}/{object_key}", "r") as f:
            data = f.read()
            chunk_json = json.loads(data)
            embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}")
            add_data_queue.put({
                "text": chunk_json["page_content"],
                # Default to an empty dict so the chained .get() cannot fail
                "parent_source": chunk_json.get("metadata", {}).get("source", ""),
                "source": f"{bucket_name}/{object_key}",
                "vector": embeddings
            })
    return "Metadata Create Task Completed!"


def delete_metadata_task(json_data):
    for record in json_data["Records"]:
        bucket_name = record["s3"]["bucket"]["name"]
        object_key = urllib.parse.unquote(record["s3"]["object"]["key"])
        delete_data_queue.put(f"{bucket_name}/{object_key}")
    return "Metadata Delete Task completed!"
from apscheduler.schedulers.background import BackgroundScheduler
import pandas as pd


def add_vector_job():
    data = []
    table = get_or_create_table()

    # Drain everything queued since the last run and add it as one batch
    while not add_data_queue.empty():
        item = add_data_queue.get()
        data.append(item)

    if len(data) > 0:
        df = pd.DataFrame(data)
        table.add(df)
        table.compact_files()
        print(len(table.to_pandas()))


def delete_vector_job():
    table = get_or_create_table()
    source_data = []
    while not delete_data_queue.empty():
        item = delete_data_queue.get()
        source_data.append(item)
    if len(source_data) > 0:
        # Delete all queued chunk sources with a single filter
        filter_data = ", ".join([f'"{d}"' for d in source_data])
        table.delete(f'source IN ({filter_data})')
        table.compact_files()
        table.cleanup_old_versions()
        print(len(table.to_pandas()))


scheduler = BackgroundScheduler()

scheduler.add_job(add_vector_job, 'interval', seconds=10)
scheduler.add_job(delete_vector_job, 'interval', seconds=10)
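The two scheduled jobs follow the same pattern: drain whatever has accumulated in a queue, then apply it to the table in one batch. Here is a minimal, dependency-free sketch of that pattern. It swaps in the standard library's queue.Queue for multiprocessing.Queue (an assumption made so the example runs on its own), and drain and build_delete_filter are illustrative names.

```python
import queue


def drain(q):
    """Pull every item currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def build_delete_filter(sources):
    """Build the same 'source IN (...)' filter string the delete job passes to the table."""
    quoted = ", ".join(f'"{s}"' for s in sources)
    return f"source IN ({quoted})"


pending = queue.Queue()
for i in range(3):
    pending.put(f"warehouse/metadata/custom-corpus/guide.pdf/chunk_{i:05d}.json")

batch = drain(pending)
print(len(batch))
print(build_delete_filter(batch[:2]))
```

Batching like this keeps the number of writes to the table low: one add or delete every few seconds instead of one per chunk. Catching queue.Empty rather than checking empty() first also avoids acting on a stale emptiness check.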
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    get_or_create_table()
    if not scheduler.running:
        scheduler.start()


@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()


@app.post("/api/v1/metadata/notification")
async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New Metadata created!")
        background_tasks.add_task(create_metadata_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Metadata deleted!")
        background_tasks.add_task(delete_metadata_task, json_data)
    return {"status": "success"}


@app.post("/api/v1/document/notification")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New object created!")
        background_tasks.add_task(create_object_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Object deleted!")
        background_tasks.add_task(delete_object_task, json_data)
    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False
    ch_interface.chatbot.height = 600

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
Now that the ingestion pipeline is working, let's integrate the final RAG pipeline.
Now that the documents are ingested into LanceDB, let's add the search capability.
EMBEDDING_QUERY_PREFIX = "search_query"


def search(query, limit=5):
    query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}")
    res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit)
    return res

# Lets test to see if it works
res = search("What is MinIO Enterprise Object Store Lite?")
res.to_list()
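The search above ranks chunks by cosine distance between the query embedding and each stored vector. As a rough illustration of what that ranking means, here is a small pure-Python sketch with two-dimensional toy vectors. It assumes the usual definition of cosine distance (1 minus cosine similarity); the vectors and chunk names are made up, and a real vector database uses far faster machinery than this loop.

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity: 0 for same direction, 1 for orthogonal vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def rank(query_vec, vectors, limit=5):
    """Return the names of the closest vectors, nearest first."""
    scored = sorted(vectors.items(), key=lambda kv: cosine_distance(query_vec, kv[1]))
    return [name for name, _ in scored[:limit]]


# Made-up toy vectors standing in for 768-dimensional embeddings
toy_vectors = {
    "chunk_a": [1.0, 0.0],
    "chunk_b": [0.6, 0.8],
    "chunk_c": [0.0, 1.0],
}

print(rank([1.0, 0.1], toy_vectors, limit=2))
```

Cosine distance depends only on direction, not length, so a long chunk and a short chunk about the same topic can still land close together.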
RAG_PROMPT = """
DOCUMENT:
{documents}

QUESTION:
{user_question}

INSTRUCTIONS:
Answer in detail the user's QUESTION using the DOCUMENT text above.
Keep your answer grounded in the facts of the DOCUMENT. Do not use sentences like "The document states" citing the document.
If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know"
"""
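Here is a minimal sketch of how a template like this gets filled in once search results are available. PROMPT_TEMPLATE is a shortened stand-in for RAG_PROMPT so the example runs on its own, and the sample chunks are invented; only the text field of each result is used.

```python
# Shortened stand-in for RAG_PROMPT so this example is self-contained
PROMPT_TEMPLATE = """
DOCUMENT:
{documents}

QUESTION:
{user_question}
"""


def build_prompt(chunks, user_question, template=PROMPT_TEMPLATE):
    """Join the retrieved chunk texts and substitute them into the template."""
    documents = " ".join(c["text"].strip() for c in chunks)
    return template.format(documents=documents, user_question=user_question)


# Invented search results; real ones come from the vector database
sample_chunks = [
    {"text": "  MinIO is an object store. "},
    {"text": "It is S3 compatible.\n"},
]

prompt = build_prompt(sample_chunks, "What is MinIO?")
print(prompt)
```

Because str.format only interprets braces in the template itself, chunk text that happens to contain curly braces is inserted as-is and does not break the substitution.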
context_df = []


def llm_chat(user_question, history):
    history = history or []
    global context_df
    # Search for relevant document chunks
    res = search(user_question)
    documents = " ".join([d["text"].strip() for d in res.to_list()])
    # Pass the chunks to LLM for grounded response
    llm_resp = requests.post(LLM_ENDPOINT,
                             json={"model": LLM_MODEL,
                                   "messages": [
                                       {"role": "user",
                                        "content": RAG_PROMPT.format(user_question=user_question, documents=documents)
                                        }
                                   ],
                                   "options": {
                                       # "temperature": 0,
                                       "top_p": 0.90,
                                   }},
                             stream=True)
    bot_response = "**AI:** "
    for resp in llm_resp.iter_lines():
        json_data = json.loads(resp)
        bot_response += json_data["message"]["content"]
        yield bot_response
    # Keep the retrieved chunks so the UI can show what context was supplied
    context_df = res.to_pandas()
    context_df = context_df.drop(columns=['source', 'vector'])


def clear_events():
    global context_df
    context_df = []
    return context_df
import json
import gradio as gr
import requests
from fastapi import FastAPI, Request, BackgroundTasks
from pydantic import BaseModel
import uvicorn
import nest_asyncio

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    get_or_create_table()
    if not scheduler.running:
        scheduler.start()


@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()


@app.post("/api/v1/metadata/notification")
async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New Metadata created!")
        background_tasks.add_task(create_metadata_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Metadata deleted!")
        background_tasks.add_task(delete_metadata_task, json_data)
    return {"status": "success"}


@app.post("/api/v1/document/notification")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    json_data = await request.json()
    if json_data["EventName"] == "s3:ObjectCreated:Put":
        print("New object created!")
        background_tasks.add_task(create_object_task, json_data)
    if json_data["EventName"] == "s3:ObjectRemoved:Delete":
        print("Object deleted!")
        background_tasks.add_task(delete_object_task, json_data)
    return {"status": "success"}


with gr.Blocks(gr.themes.Soft()) as demo:
    gr.Markdown("## RAG with MinIO")
    ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear")
    ch_interface.chatbot.show_label = False
    ch_interface.chatbot.height = 600

    # Show the chunks that were passed to the LLM for the latest answer
    gr.Markdown("### Context Supplied")
    context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True)
    ch_interface.clear_btn.click(clear_events, [], context_dataframe)

    @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe])
    def update_chat_context_df(text):
        global context_df
        if context_df is not None:
            return context_df
        return ""

demo.queue()

if __name__ == "__main__":
    nest_asyncio.apply()
    app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH)
    uvicorn.run(app, host="0.0.0.0", port=8808)
Were you able to go through and implement RAG based chat with MinIO as the data lake backend? In the near future we will host a webinar on this same topic, where we will give you a live demo as we build this RAG based chat application.
As a developer focused on AI integration at MinIO, I am constantly exploring how our tools can be seamlessly integrated into modern AI architectures to improve efficiency and scalability. In this article, we showed you how to integrate MinIO with Retrieval-Augmented Generation (RAG) to build a chat application. This is just the tip of the iceberg, meant to give you a boost in your quest to build more unique use cases for RAG and MinIO. You now have the building blocks to do it. Let's do it!
If you have any questions about the MinIO RAG integration, be sure to reach out to us.