paint-brush
Wie ich mit BigQuery und Langchain einen Datenanalyse-Assistenten erstellt habevon@yi
956 Lesungen
956 Lesungen

Wie ich mit BigQuery und Langchain einen Datenanalyse-Assistenten erstellt habe

von Yi Ai11m2024/06/10
Read on Terminal Reader

Zu lang; Lesen

Nutzen Sie Langchain, OpenAI und BigQuery, um die Datenanalyse zu automatisieren, die Verarbeitung zu optimieren und den Datenschutz mit Tools wie Streamlit und Google Cloud DLP zu gewährleisten.
featured image - Wie ich mit BigQuery und Langchain einen Datenanalyse-Assistenten erstellt habe
Yi Ai HackerNoon profile picture



Da Unternehmen täglich riesige Datenmengen generieren, kann es schwierig sein, aus all diesen Informationen nützliche Erkenntnisse zu gewinnen, insbesondere bei komplexen Datensätzen und riesigen Datenmengen. Aber mit generativer KI können wir die Datenanalyse rationalisieren und automatisieren und sie so effizient und zugänglich machen. In diesem Artikel zeige ich Ihnen, wie Sie einen KI- Datenanalyseassistenten mit Google Langchain , OpenAI, BigQuery und Data Loss Prevention (DLP) einrichten und verwenden.


Anwendungsfall: Datenanalyse mit BigQuery automatisieren

Lösungsdesign

Die Lösung umfasst die Einrichtung einer Streamlit-App mit Langchain und OpenAI, die mit dem BigQuery-Datensatz interagiert, um die Datenanalyse zu automatisieren. Dieser Agent verwendet benutzerdefinierte Tools für bestimmte Aufgaben wie das Maskieren von PII-Kundenattributen und das Visualisieren von Daten. Darüber hinaus wird der Agent so konfiguriert, dass er den Chatverlauf speichert und kontextbezogen genaue Antworten gewährleistet.


Hier ist ein Diagramm der Lösungsarchitektur:


Betrachten wir ein Szenario, in dem wir einen BigQuery-Datensatz mit den folgenden Tabellen haben:

  • Kundentabelle : Enthält Kundendaten.
  • Kontakttabelle : Enthält die Kontaktdaten des Kunden.
  • Kundenadresstabelle : Verknüpft Kunden mit Adressen.
  • Adresstabelle : Enthält Adressinformationen.
  • Job-Statistiktabelle : Protokolliert ETL-Batch-Job-Zusammenfassungen, die Daten kürzen und in die Kundenprofiltabellen laden.


Langchain einrichten

Was ist Langchain?

LangChain bietet KI-Entwicklern Tools, um Sprachmodelle mit externen Datenquellen zu verbinden. Es ist Open Source und wird von einer aktiven Community unterstützt. Organisationen können LangChain kostenlos nutzen und erhalten Unterstützung von anderen Entwicklern, die sich mit dem Framework auskennen.


Um Datenanalysen mit Langchain durchzuführen, müssen wir zunächst die Bibliotheken Langchain und OpenAI installieren. Dies können Sie tun, indem Sie die erforderlichen Bibliotheken herunterladen und dann in Ihr Projekt importieren.


Installieren Sie Langchain:

 pip install langchain matplotlib pandas streamlit pip install -qU langchain-openai langchain-community


Definieren Sie das Langchain-Modell und richten Sie die BigQuery-Verbindung ein:

 import os import re import streamlit as st from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types from langchain.agents import create_sql_agent from langchain_community.vectorstores import FAISS from langchain_core.example_selectors import SemanticSimilarityExampleSelector from langchain_core.messages import AIMessage from langchain_core.prompts import ( SystemMessagePromptTemplate, PromptTemplate, FewShotPromptTemplate, ) from langchain_core.prompts.chat import ( ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder, ) from langchain.memory import ConversationBufferMemory from langchain_experimental.utilities import PythonREPL from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain.sql_database import SQLDatabase from langchain.tools import Tool service_account_file = f"{os.getcwd()}/service-account-key.json" os.environ["OPENAI_API_KEY"] = ( "xxxxxx" ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file model = ChatOpenAI(model="gpt-4o", temperature=0) project = "lively-metrics-295911" dataset = "customer_profiles" sqlalchemy_url = ( f"bigquery://{project}/{dataset}?credentials_path={service_account_file}" ) db = SQLDatabase.from_uri(sqlalchemy_url)


Einrichten benutzerdefinierter Tools

Um die Fähigkeiten unseres Agenten zu erweitern, können wir benutzerdefinierte Tools für bestimmte Aufgaben einrichten, etwa zum Maskieren von PII-Daten und zum Visualisieren von Daten.


  1. Maskieren von PII mit Google Cloud DLP

    Datenschutz ist entscheidend. Um PII in den Ausgaben zu schützen, können wir Google Cloud Data Loss Prevention (DLP) nutzen. Wir werden ein benutzerdefiniertes Tool erstellen, das die DLP-API aufruft, um alle in der Antwort vorhandenen PII-Daten zu maskieren.


 def mask_pii_data(text): dlp = dlp_v2.DlpServiceClient() project_id = project parent = f"projects/{project_id}" info_types = [ {"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}, {"name": "DATE_OF_BIRTH"}, {"name": "LAST_NAME"}, {"name": "STREET_ADDRESS"}, {"name": "LOCATION"}, ] deidentify_config = types.DeidentifyConfig( info_type_transformations=types.InfoTypeTransformations( transformations=[ types.InfoTypeTransformations.InfoTypeTransformation( primitive_transformation=types.PrimitiveTransformation( character_mask_config=types.CharacterMaskConfig( masking_character="*", number_to_mask=0, reverse_order=False ) ) ) ] ) ) item = {"value": text} inspect_config = {"info_types": info_types} request = { "parent": parent, "inspect_config": inspect_config, "deidentify_config": deidentify_config, "item": item, } response = dlp.deidentify_content(request=request) return response.item.value


  1. Python REPL

    Um dem LLM die Datenvisualisierung mit Python zu ermöglichen, nutzen wir als Nächstes die Python REPL und definieren ein benutzerdefiniertes Tool für unseren Agenten.


 python_repl = PythonREPL()


Erstellen wir nun die Agent-Tools, die mask_pii_data und python_repl:

 def sql_agent_tools(): tools = [ Tool.from_function( func=mask_pii_data, name="mask_pii_data", description="Masks PII data in the input text using Google Cloud DLP.", ), Tool( name="python_repl", description=f"A Python shell. Use this to execute python commands. \ Input should be a valid python command. \ If you want to see the output of a value, \ you should print it out with `print(...)`.", func=python_repl.run, ), ] return tools

Verwenden von Few-Shot-Beispielen

Indem Sie dem Modell einige Beispielversuche bereitstellen, können Sie seine Reaktionen steuern und die Leistung verbessern.

Definieren Sie Beispiel-SQL-Abfragen,

 # Example Queries sql_examples = [ { "input": "Count of Customers by Source System", "query": f""" SELECT source_system_name, COUNT(*) AS customer_count FROM `{project}.{dataset}.customer` GROUP BY source_system_name ORDER BY customer_count DESC; """, }, { "input": "Average Age of Customers by Gender", "query": f""" SELECT gender, AVG(EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM dob)) AS average_age FROM `{project}.{dataset}.customer` GROUP BY gender; """, }, ... ]


Fügen Sie als Nächstes Beispiele zur Vorlage mit den wenigen Eingabeaufforderungen hinzu.

 example_selector = SemanticSimilarityExampleSelector.from_examples( sql_examples, OpenAIEmbeddings(), FAISS, k=2, input_keys=["input"], )


Definieren Sie als Nächstes das Präfix und das Suffix und übergeben Sie dann few_shot_prompt direkt an die Factory-Methode from_messages .


Hinweis: Im SUFFIX gibt es eine Variable {chat_history} , die ich im nächsten Schritt erkläre, wenn wir den Agenten erstellen und Speicher hinzufügen.

 PREFIX = """ You are a SQL expert. You have access to a BigQuery database. Identify which tables can be used to answer the user's question and write and execute a SQL query accordingly. Given an input question, create a syntactically correct SQL query to run against the dataset customer_profiles, then look at the results of the query and return the answer. Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results. You can order the results by a relevant column to return the most interesting examples in the database. Never query for all the columns from a specific table; only ask for the relevant columns given the question. You have access to tools for interacting with the database. Only use the information returned by these tools to construct your final answer. You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database. If the question does not seem related to the database, just return "I don't know" as the answer. If the user asks for a visualization of the results, use the python_agent tool to create and display the visualization. After obtaining the results, you must use the mask_pii_data tool to mask the results before providing the final answer. """ SUFFIX = """Begin! {chat_history} Question: {input} Thought: I should look at the tables in the database to see what I can query. Then I should query the schema of the most relevant tables. {agent_scratchpad}""" few_shot_prompt = FewShotPromptTemplate( example_selector=example_selector, example_prompt=PromptTemplate.from_template( "User input: {input}\nSQL query: {query}" ), prefix=PREFIX, suffix="", input_variables=["input", "top_k"], example_separator="\n\n", ) messages = [ SystemMessagePromptTemplate(prompt=few_shot_prompt), MessagesPlaceholder(variable_name="chat_history"), HumanMessagePromptTemplate.from_template("{input}"), AIMessage(content=SUFFIX), MessagesPlaceholder(variable_name="agent_scratchpad"), ] prompt = ChatPromptTemplate.from_messages(messages)


Erklärung der Variablen

  • Eingabe : Die Eingabe oder Abfrage des Benutzers.

  • agent_scratchpad : Ein temporärer Speicherbereich für Zwischenschritte oder Gedanken.

  • chat_history : Behält den Überblick über vorherige Interaktionen, um den Kontext beizubehalten.

  • handle_parsing_errors : Stellt sicher, dass der Agent Analysefehler ordnungsgemäß verarbeiten und beheben kann.

  • Speicher : Das Modul, das zum Speichern und Abrufen des Chatverlaufs verwendet wird.


Es ist Zeit für den letzten Schritt. Lassen Sie uns die App erstellen!


Erstellen einer LLM-App mit Streamlit

Um eine interaktive Schnittstelle zum Testen des gerade erstellten Langchain-Agenten zu erstellen, können wir Streamlit verwenden.


 st.title("Data Analysis Assistant") if "history" not in st.session_state: st.session_state.history = [] user_input = st.text_input("Ask your question:") if st.button("Run Query"): if user_input: with st.spinner("Processing..."): st.session_state.history.append(f"User: {user_input}") response = agent_executor.run(input=user_input) if "sandbox:" in response: response = response.replace(f"sandbox:", "") match = re.search(r"\((.+\.png)\)", response) if match: image_file_path = match.group(1) if os.path.isfile(image_file_path): st.session_state.history.append({"image": image_file_path}) else: st.error("The specified image file does not exist.") else: st.session_state.history.append(f"Agent: {response}") st.experimental_rerun() else: st.error("Please enter a question.") for message in st.session_state.history: if isinstance(message, str): st.write(message) elif isinstance(message, dict) and "image" in message: st.image(message["image"])


Wir haben alles eingerichtet. Lassen Sie uns die Streamlit-App ausführen

 streamlit run app.py


und testen Sie es, indem Sie einige Analysefragen stellen.


Abschluss

Durch die Nutzung von Langchain und OpenAI können wir komplexe Datenanalyseaufgaben automatisieren, wodurch es viel einfacher wird, Erkenntnisse aus großen Datensätzen zu gewinnen. Dieser Ansatz spart nicht nur Zeit, sondern gewährleistet auch eine genaue und konsistente Analyse. Egal, ob Sie mit Kundenprofilen, Kontaktinformationen oder Jobstatistiken arbeiten, ein KI-gestützter Datenanalyseassistent kann Ihre Datenverarbeitungsfunktionen erheblich verbessern. Den vollständigen Quellcode finden Sie unter GitHub-Repository .