Matetika no lazaina fa amin'ny vanim-potoanan'ny AI - ny angon-drakitra dia moat anao. Mba hanaovana izany, ny fananganana rindranasa RAG amin'ny ambaratonga famokarana dia mitaky fotodrafitrasa angon-drakitra mety hitahirizana, dikan-teny, fanodinana, fanombanana, ary fangataham-panazavana amin'ny angona misy anao. Koa satria ny MiniIO dia maka ny angona voalohany amin'ny AI, ny tolo-kevitry ny fotodrafitrasa voalohany ho an'ny tetikasa amin'ity karazana ity dia ny fananganana Modern Data Lake (MinIO) sy angona vector. Na dia mety mila ampidirina eny an-dalana aza ireo fitaovana fanampiny hafa, ireo fotodrafitrasa roa ireo dia fototra. Izy ireo dia ho ivon'ny sinton'ny saika ho an'ny asa rehetra atrehana amin'ny fampidirana ny fampiharana RAG anao amin'ny famokarana.
Saingy ao anaty sangisangy ianao. Efa naheno an'ireo teny hoe LLM sy RAG ireo ianao taloha fa ny ankoatr'izay dia tsy mbola nanao ezaka be noho ny tsy fantatra. Fa tsy tsara ve raha misy “Hello World” na app boilerplate afaka manampy anao hanomboka?
Aza manahy fa tao anaty sambo iray ihany aho. Noho izany ato amin'ity bilaogy ity dia hasehontsika ny fomba fampiasana ny MiniIO hananganana rindranasa chat mifototra amin'ny Retrieval Augmented Generation (RAG) amin'ny fampiasana fitaovana entam-barotra.
Ampiasao ny MiniIO mba hitahiry ny antontan-taratasy rehetra, ny ampahany voakarakara ary ny embeddings mampiasa ny angon-drakitra vector.
Ampiasao ny endri-pampandrenesana siny an'i MiniIO hanentanana hetsika rehefa manampy na manaisotra antontan-taratasy ao anaty siny
Webhook izay mandany ny hetsika sy manodina ny antontan-taratasy amin'ny alàlan'ny Langchain ary mitahiry ny metadata sy ny antontan-taratasy voafantina ao anaty siny metadata
Trigger hetsika fampandrenesana siny MiniIO ho an'ny antontan-taratasy vao nampiana na nesorina
Webhook izay mandany ny hetsika sy mamorona embeddings ary mitahiry izany ao amin'ny Vector Database (LanceDB) izay mijanona ao amin'ny MiniIO.
Afaka misintona ny binary ianao raha mbola tsy manana izany avy eto
# Run MinIO detached !minio server ~/dev/data --console-address :9090 &
Ampidino eto ny Ollama
# Start the Server !ollama serve
# Download Phi-3 LLM !ollama pull phi3:3.8b-mini-128k-instruct-q8_0
# Download Nomic Embed Text v1.5 !ollama pull nomic-embed-text:v1.5
# List All the Models !ollama ls
LLM_MODEL = "phi3:3.8b-mini-128k-instruct-q8_0" EMBEDDING_MODEL = "nomic-embed-text:v1.5" LLM_ENDPOINT = "http://localhost:11434/api/chat" CHAT_API_PATH = "/chat" def llm_chat(user_question, history): history = history or [] user_message = f"**You**: {user_question}" llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "keep_alive": "48h", # Keep the model in-memory for 48 hours "messages": [ {"role": "user", "content": user_question } ]}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
import numpy as np EMBEDDING_ENDPOINT = "http://localhost:11434/api/embeddings" EMBEDDINGS_DIM = 768 def get_embedding(text): resp = requests.post(EMBEDDING_ENDPOINT, json={"model": EMBEDDING_MODEL, "prompt": text}) return np.array(resp.json()["embedding"][:EMBEDDINGS_DIM], dtype=np.float16)
## Test with sample text get_embedding("What is MinIO?")
Ampiasao ny baiko mc na ataovy amin'ny UI
!mc alias set 'myminio' 'http://localhost:9000' 'minioadmin' 'minioadmin'
!mc mb myminio/custom-corpus !mc mb myminio/warehouse
import json import gradio as gr import requests from fastapi import FastAPI, Request from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request): json_data = await request.json() print(json.dumps(json_data, indent=2)) with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
## Test with sample text get_embedding("What is MinIO?")
Ao amin'ny Console dia mandehana any amin'ny Events-> Add Event Destination -> Webhook
Fenoy ny sanda manaraka ary tsindrio ny Save
Identifier - doc-webhook
Endpoint - http://localhost:8808/api/v1/document/notification
Tsindrio Restart MiniIO eo an-tampony rehefa pormpted to
( Fanamarihana : Azonao atao koa ny mampiasa mc amin'ity)
Ao amin'ny console dia mandehana any amin'ny Buckets (Administrator) -> custom-corpus -> Events
Fenoy ny sanda manaraka ary tsindrio ny Save
ARN - Safidio ny doc-webhook avy amin'ny dropdown
Mifidiana hetsika - Jereo ny PUT sy DELETE
( Fanamarihana : Azonao atao koa ny mampiasa mc amin'ity)
Manana ny fanamboarana webhook voalohany izahay
Hampiasa Langchain sy Unstructured izahay hamakiana zavatra iray avy amin'ny MiniIO sy Split Documents amin'ny ampahany maromaro.
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import S3FileLoader MINIO_ENDPOINT = "http://localhost:9000" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" # Split Text from a given document using chunk_size number of characters text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=64, length_function=len) def split_doc_by_chunks(bucket_name, object_key): loader = S3FileLoader(bucket_name, object_key, endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) docs = loader.load() doc_splits = text_splitter.split_documents(docs) return doc_splits
# test the chunking split_doc_by_chunks("custom-corpus", "The-Enterprise-Object-Store-Feature-Set.pdf")
Ampio ny lojikan'ny chunk amin'ny webhook ary tehirizo ao amin'ny siny trano fanatobiana ny metadata sy ny sombiny
import urllib.parse import s3fs METADATA_PREFIX = "metadata" # Using s3fs to save and delete objects from MinIO s3 = s3fs.S3FileSystem() # Split the documents and save the metadata to warehouse bucket def create_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]) doc_splits = split_doc_by_chunks(bucket_name, object_key) for i, chunk in enumerate(doc_splits): source = f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}/chunk_{i:05d}.json" with s3.open(source, "w") as f: f.write(chunk.json()) return "Task completed!" def delete_object_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) s3.delete(f"warehouse/{METADATA_PREFIX}/{bucket_name}/{object_key}", recursive=True) return "Task completed!"
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
Amin'izao fotoana izao dia manana ny webhook voalohany miasa amin'ny dingana manaraka isika dia ny maka ny tapany rehetra miaraka amin'ny metadata Mamorona ny Embeddings ary mitahiry izany ao amin'ny tahiry vector.
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() print(json.dumps(json_data, indent=2)) @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
Ao amin'ny Console dia mandehana any amin'ny Events-> Add Event Destination -> Webhook
Fenoy ny sanda manaraka ary tsindrio ny Save
Identifier - metadata-webhook
Endpoint - http://localhost:8808/api/v1/metadata/notification
Tsindrio Restart MiniIO eo an-tampony rehefa asaina
( Fanamarihana : Azonao atao koa ny mampiasa mc amin'ity)
Ao amin'ny console dia mandehana ao amin'ny Buckets (Administrator) -> trano fanatobiana entana -> Events
Fenoy ny sanda manaraka ary tsindrio ny Save
ARN - Safidio ny metadata-webhook avy amin'ny dropdown
Tovona - metadata/
Tovana - .json
Mifidiana hetsika - Jereo ny PUT sy DELETE
( Fanamarihana : Azonao atao koa ny mampiasa mc amin'ity)
Manana ny fanamboarana webhook voalohany izahay
Amin'izao fotoana izao dia manana ny webhook fototra miasa isika, avelao hametraka ny lanceDB vector databse ao amin'ny siny trano fanatobiana entana MiniIO izay hamonjena ny embeddings rehetra sy ny saha metadata fanampiny.
import os import lancedb # Set these environment variables for the lanceDB to connect to MinIO os.environ["AWS_DEFAULT_REGION"] = "us-east-1" os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY os.environ["AWS_ENDPOINT"] = MINIO_ENDPOINT os.environ["ALLOW_HTTP"] = "True" db = lancedb.connect("s3://warehouse/v-db/")
# list existing tables db.table_names()
# Create a new table with pydantic schema from lancedb.pydantic import LanceModel, Vector import pyarrow as pa DOCS_TABLE = "docs" EMBEDDINGS_DIM = 768 table = None class DocsModel(LanceModel): parent_source: str # Actual object/document source source: str # Chunk/Metadata source text: str # Chunked text vector: Vector(EMBEDDINGS_DIM, pa.float16()) # Vector to be stored def get_or_create_table(): global table if table is None and DOCS_TABLE not in list(db.table_names()): return db.create_table(DOCS_TABLE, schema=DocsModel) if table is None: table = db.open_table(DOCS_TABLE) return table
# Check if that worked get_or_create_table()
# list existing tables db.table_names()
import multiprocessing EMBEDDING_DOCUMENT_PREFIX = "search_document" # Add queue that keeps the processed meteadata in memory add_data_queue = multiprocessing.Queue() delete_data_queue = multiprocessing.Queue() def create_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) print(bucket_name, object_key) with s3.open(f"{bucket_name}/{object_key}", "r") as f: data = f.read() chunk_json = json.loads(data) embeddings = get_embedding(f"{EMBEDDING_DOCUMENT_PREFIX}: {chunk_json['page_content']}") add_data_queue.put({ "text": chunk_json["page_content"], "parent_source": chunk_json.get("metadata", "").get("source", ""), "source": f"{bucket_name}/{object_key}", "vector": embeddings }) return "Metadata Create Task Completed!" def delete_metadata_task(json_data): for record in json_data["Records"]: bucket_name = record["s3"]["bucket"]["name"] object_key = urllib.parse.unquote(record["s3"]["object"]["key"]) delete_data_queue.put(f"{bucket_name}/{object_key}") return "Metadata Delete Task completed!"
from apscheduler.schedulers.background import BackgroundScheduler import pandas as pd def add_vector_job(): data = [] table = get_or_create_table() while not add_data_queue.empty(): item = add_data_queue.get() data.append(item) if len(data) > 0: df = pd.DataFrame(data) table.add(df) table.compact_files() print(len(table.to_pandas())) def delete_vector_job(): table = get_or_create_table() source_data = [] while not delete_data_queue.empty(): item = delete_data_queue.get() source_data.append(item) if len(source_data) > 0: filter_data = ", ".join([f'"{d}"' for d in source_data]) table.delete(f'source IN ({filter_data})') table.compact_files() table.cleanup_old_versions() print(len(table.to_pandas())) scheduler = BackgroundScheduler() scheduler.add_job(add_vector_job, 'interval', seconds=10) scheduler.add_job(delete_vector_job, 'interval', seconds=10)
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
Amin'izao fotoana izao dia manana ny fantsona Ingestion miasa isika, andao hampiditra ny fantsona RAG farany.
Ankehitriny rehefa manana ny antontan-taratasy voarakitra ao amin'ny lanceDB dia andao ampio ny fahaiza-mitady
EMBEDDING_QUERY_PREFIX = "search_query" def search(query, limit=5): query_embedding = get_embedding(f"{EMBEDDING_QUERY_PREFIX}: {query}") res = get_or_create_table().search(query_embedding).metric("cosine").limit(limit) return res
# Lets test to see if it works res = search("What is MinIO Enterprise Object Store Lite?") res.to_list()
RAG_PROMPT = """ DOCUMENT: {documents} QUESTION: {user_question} INSTRUCTIONS: Answer in detail the user's QUESTION using the DOCUMENT text above. Keep your answer ground in the facts of the DOCUMENT. Do not use sentence like "The document states" citing the document. If the DOCUMENT doesn't contain the facts to answer the QUESTION only Respond with "Sorry! I Don't know" """
context_df = [] def llm_chat(user_question, history): history = history or [] global context_df # Search for relevant document chunks res = search(user_question) documents = " ".join([d["text"].strip() for d in res.to_list()]) # Pass the chunks to LLM for grounded response llm_resp = requests.post(LLM_ENDPOINT, json={"model": LLM_MODEL, "messages": [ {"role": "user", "content": RAG_PROMPT.format(user_question=user_question, documents=documents) } ], "options": { # "temperature": 0, "top_p": 0.90, }}, stream=True) bot_response = "**AI:** " for resp in llm_resp.iter_lines(): json_data = json.loads(resp) bot_response += json_data["message"]["content"] yield bot_response context_df = res.to_pandas() context_df = context_df.drop(columns=['source', 'vector']) def clear_events(): global context_df context_df = [] return context_df
import json import gradio as gr import requests from fastapi import FastAPI, Request, BackgroundTasks from pydantic import BaseModel import uvicorn import nest_asyncio app = FastAPI() @app.on_event("startup") async def startup_event(): get_or_create_table() if not scheduler.running: scheduler.start() @app.on_event("shutdown") async def shutdown_event(): scheduler.shutdown() @app.post("/api/v1/metadata/notification") async def receive_metadata_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New Metadata created!") background_tasks.add_task(create_metadata_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Metadata deleted!") background_tasks.add_task(delete_metadata_task, json_data) return {"status": "success"} @app.post("/api/v1/document/notification") async def receive_webhook(request: Request, background_tasks: BackgroundTasks): json_data = await request.json() if json_data["EventName"] == "s3:ObjectCreated:Put": print("New object created!") background_tasks.add_task(create_object_task, json_data) if json_data["EventName"] == "s3:ObjectRemoved:Delete": print("Object deleted!") background_tasks.add_task(delete_object_task, json_data) return {"status": "success"} with gr.Blocks(gr.themes.Soft()) as demo: gr.Markdown("## RAG with MinIO") ch_interface = gr.ChatInterface(llm_chat, undo_btn=None, clear_btn="Clear") ch_interface.chatbot.show_label = False ch_interface.chatbot.height = 600 gr.Markdown("### Context Supplied") context_dataframe = gr.DataFrame(headers=["parent_source", "text", "_distance"], wrap=True) ch_interface.clear_btn.click(clear_events, [], context_dataframe) @gr.on(ch_interface.output_components, inputs=[ch_interface.chatbot], outputs=[context_dataframe]) def update_chat_context_df(text): global context_df if context_df is not None: return context_df return "" demo.queue() if __name__ == "__main__": nest_asyncio.apply() app = gr.mount_gradio_app(app, demo, path=CHAT_API_PATH) uvicorn.run(app, host="0.0.0.0", port=8808)
Afaka nandalo sy nampihatra ny chat mifototra amin'ny RAG miaraka amin'i MiniIO ho toy ny backend ny farihy data ve ianao? Hanao webinar amin'ity lohahevitra ity ihany koa izahay ato ho ato izay hanomezanay demo mivantana rehefa manamboatra ity rindranasa chat RAG ity izahay.
Amin'ny maha-mpamolavola mifantoka amin'ny fampidirana AI ao amin'ny MiniIO, dia mikaroka hatrany ny fomba ahafahan'ny fitaovantsika ampidirina amin'ny rafitra AI maoderina aho mba hanatsarana ny fahombiazany sy ny scalability. Ato amin'ity lahatsoratra ity, nasehonay anao ny fomba hampidirana ny MiniIO amin'ny Retrieval-Augmented Generation (RAG) hananganana rindranasa chat. Ity no tendron'ny iceberg, mba hanome anao fampiroboroboana amin'ny fikatsahanao hanangana tranga tsy manam-paharoa ho an'ny RAG sy MiniIO. Ankehitriny ianao dia manana ny trano fanorenana hanaovana izany. Andao hatao!
Raha manana fanontaniana momba ny fampidirana MiniIO RAG ianao dia aza misalasala mifandray aminay