paint-brush
Cómo creé un asistente de análisis de datos con BigQuery y Langchainpor@yi
414 lecturas
414 lecturas

Cómo creé un asistente de análisis de datos con BigQuery y Langchain

por Yi Ai11m2024/06/10
Read on Terminal Reader

Demasiado Largo; Para Leer

Aproveche Langchain, OpenAI y BigQuery para automatizar el análisis de datos, optimizar el procesamiento y garantizar la privacidad de los datos con herramientas como Streamlit y Google Cloud DLP.
featured image - Cómo creé un asistente de análisis de datos con BigQuery y Langchain
Yi Ai HackerNoon profile picture



Dado que las empresas generan cantidades masivas de datos diariamente, extraer información útil de toda esta información puede resultar difícil, especialmente con conjuntos de datos complejos y enormes volúmenes de datos. Pero con la IA generativa, podemos optimizar y automatizar el análisis de datos, haciéndolo eficiente y accesible. En este artículo, le mostraré cómo configurar y utilizar un asistente de análisis de datos de IA utilizando Google Langchain , OpenAI, BigQuery y Data Loss Prevention (DLP).


Caso de uso: automatización del análisis de datos con BigQuery

Diseño de solución

La solución implica configurar una aplicación Streamlit utilizando Langchain y OpenAI que interactúa con el conjunto de datos de BigQuery para automatizar el análisis de datos. Este agente utilizará herramientas personalizadas para tareas específicas, como enmascarar atributos de PII del cliente y visualizar datos. Además, el agente se configurará para conservar el historial de chat, lo que garantiza respuestas contextualmente precisas.


A continuación se muestra un diagrama de la arquitectura de la solución:


Consideremos un escenario en el que tenemos un conjunto de datos de BigQuery que contiene las siguientes tablas:

  • Tabla de clientes : Contiene datos de clientes.
  • Tabla de contactos : contiene datos de contacto del cliente.
  • Tabla de direcciones de clientes : vincula a los clientes con las direcciones.
  • Tabla de direcciones : contiene información de direcciones.
  • Tabla de estadísticas de trabajo : registra resúmenes de trabajos por lotes ETL que truncan y cargan datos en las tablas de perfil del cliente.


Configurar Langchain

¿Qué es Langchain?

LangChain proporciona a los desarrolladores de IA herramientas para conectar modelos de lenguaje con fuentes de datos externas. Es de código abierto y está respaldado por una comunidad activa. Las organizaciones pueden utilizar LangChain de forma gratuita y recibir soporte de otros desarrolladores competentes en el marco.


Para realizar análisis de datos con Langchain, primero debemos instalar las bibliotecas Langchain y OpenAI. Esto se puede hacer descargando las bibliotecas necesarias y luego importándolas a su proyecto.


Instalar Langchain:

 pip install langchain matplotlib pandas streamlit pip install -qU langchain-openai langchain-community


Defina el modelo Langchain y configure la conexión de Bigquery:

 import os import re import streamlit as st from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types from langchain.agents import create_sql_agent from langchain_community.vectorstores import FAISS from langchain_core.example_selectors import SemanticSimilarityExampleSelector from langchain_core.messages import AIMessage from langchain_core.prompts import ( SystemMessagePromptTemplate, PromptTemplate, FewShotPromptTemplate, ) from langchain_core.prompts.chat import ( ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder, ) from langchain.memory import ConversationBufferMemory from langchain_experimental.utilities import PythonREPL from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain.sql_database import SQLDatabase from langchain.tools import Tool service_account_file = f"{os.getcwd()}/service-account-key.json" os.environ["OPENAI_API_KEY"] = ( "xxxxxx" ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file model = ChatOpenAI(model="gpt-4o", temperature=0) project = "lively-metrics-295911" dataset = "customer_profiles" sqlalchemy_url = ( f"bigquery://{project}/{dataset}?credentials_path={service_account_file}" ) db = SQLDatabase.from_uri(sqlalchemy_url)


Configurar herramientas personalizadas

Para mejorar las capacidades de nuestro agente, podemos configurar herramientas personalizadas para tareas específicas, como enmascarar datos PII y visualizar datos.


  1. Enmascaramiento de PII con Google Cloud DLP

    La privacidad de los datos es crucial. Para proteger la PII en las salidas, podemos utilizar Google Cloud Data Loss Prevention (DLP). Construiremos una herramienta personalizada que llame a la API de DLP para enmascarar cualquier dato PII presente en la respuesta.


 def mask_pii_data(text): dlp = dlp_v2.DlpServiceClient() project_id = project parent = f"projects/{project_id}" info_types = [ {"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}, {"name": "DATE_OF_BIRTH"}, {"name": "LAST_NAME"}, {"name": "STREET_ADDRESS"}, {"name": "LOCATION"}, ] deidentify_config = types.DeidentifyConfig( info_type_transformations=types.InfoTypeTransformations( transformations=[ types.InfoTypeTransformations.InfoTypeTransformation( primitive_transformation=types.PrimitiveTransformation( character_mask_config=types.CharacterMaskConfig( masking_character="*", number_to_mask=0, reverse_order=False ) ) ) ] ) ) item = {"value": text} inspect_config = {"info_types": info_types} request = { "parent": parent, "inspect_config": inspect_config, "deidentify_config": deidentify_config, "item": item, } response = dlp.deidentify_content(request=request) return response.item.value


  1. REPL de Python

    A continuación, para permitir que LLM realice visualización de datos usando Python, aprovecharemos Python REPL y definiremos una herramienta personalizada para nuestro agente.


 python_repl = PythonREPL()


Ahora, creemos las herramientas del agente, que incluirán mask_pii_data y python_repl:

 def sql_agent_tools(): tools = [ Tool.from_function( func=mask_pii_data, name="mask_pii_data", description="Masks PII data in the input text using Google Cloud DLP.", ), Tool( name="python_repl", description=f"A Python shell. Use this to execute python commands. \ Input should be a valid python command. \ If you want to see the output of a value, \ you should print it out with `print(...)`.", func=python_repl.run, ), ] return tools

Usando ejemplos de pocas tomas

Proporcionar al modelo ejemplos de algunas tomas ayuda a guiar sus respuestas y mejorar el rendimiento.

Definir consultas SQL de muestra,

 # Example Queries sql_examples = [ { "input": "Count of Customers by Source System", "query": f""" SELECT source_system_name, COUNT(*) AS customer_count FROM `{project}.{dataset}.customer` GROUP BY source_system_name ORDER BY customer_count DESC; """, }, { "input": "Average Age of Customers by Gender", "query": f""" SELECT gender, AVG(EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM dob)) AS average_age FROM `{project}.{dataset}.customer` GROUP BY gender; """, }, ... ]


A continuación, agregue ejemplos a la plantilla de mensajes de pocas tomas.

 example_selector = SemanticSimilarityExampleSelector.from_examples( sql_examples, OpenAIEmbeddings(), FAISS, k=2, input_keys=["input"], )


A continuación, defina el prefijo y el sufijo, luego pase few_shot_prompt directamente al método de fábrica from_messages .


Nota: Hay una variable {chat_history} en el SUFIJO, que explicaré en el siguiente paso cuando creemos el agente y agreguemos memoria.

 PREFIX = """ You are a SQL expert. You have access to a BigQuery database. Identify which tables can be used to answer the user's question and write and execute a SQL query accordingly. Given an input question, create a syntactically correct SQL query to run against the dataset customer_profiles, then look at the results of the query and return the answer. Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results. You can order the results by a relevant column to return the most interesting examples in the database. Never query for all the columns from a specific table; only ask for the relevant columns given the question. You have access to tools for interacting with the database. Only use the information returned by these tools to construct your final answer. You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database. If the question does not seem related to the database, just return "I don't know" as the answer. If the user asks for a visualization of the results, use the python_agent tool to create and display the visualization. After obtaining the results, you must use the mask_pii_data tool to mask the results before providing the final answer. """ SUFFIX = """Begin! {chat_history} Question: {input} Thought: I should look at the tables in the database to see what I can query. Then I should query the schema of the most relevant tables. {agent_scratchpad}""" few_shot_prompt = FewShotPromptTemplate( example_selector=example_selector, example_prompt=PromptTemplate.from_template( "User input: {input}\nSQL query: {query}" ), prefix=PREFIX, suffix="", input_variables=["input", "top_k"], example_separator="\n\n", ) messages = [ SystemMessagePromptTemplate(prompt=few_shot_prompt), MessagesPlaceholder(variable_name="chat_history"), HumanMessagePromptTemplate.from_template("{input}"), AIMessage(content=SUFFIX), MessagesPlaceholder(variable_name="agent_scratchpad"), ] prompt = ChatPromptTemplate.from_messages(messages)


Explicación de variables

  • input : La entrada o consulta del usuario.

  • agent_scratchpad : un área de almacenamiento temporal para pasos o pensamientos intermedios.

  • chat_history : realiza un seguimiento de las interacciones anteriores para mantener el contexto.

  • handle_parsing_errors : garantiza que el agente pueda manejar y recuperarse correctamente de los errores de análisis.

  • memoria : el módulo utilizado para almacenar y recuperar el historial de chat.


Es hora de dar el paso final. ¡Construyamos la aplicación!


Creación de una aplicación LLM con Streamlit

Para crear una interfaz interactiva para probar el agente Langchain que acabamos de crear, podemos usar Streamlit.


 st.title("Data Analysis Assistant") if "history" not in st.session_state: st.session_state.history = [] user_input = st.text_input("Ask your question:") if st.button("Run Query"): if user_input: with st.spinner("Processing..."): st.session_state.history.append(f"User: {user_input}") response = agent_executor.run(input=user_input) if "sandbox:" in response: response = response.replace(f"sandbox:", "") match = re.search(r"\((.+\.png)\)", response) if match: image_file_path = match.group(1) if os.path.isfile(image_file_path): st.session_state.history.append({"image": image_file_path}) else: st.error("The specified image file does not exist.") else: st.session_state.history.append(f"Agent: {response}") st.experimental_rerun() else: st.error("Please enter a question.") for message in st.session_state.history: if isinstance(message, str): st.write(message) elif isinstance(message, dict) and "image" in message: st.image(message["image"])


Hemos configurado todo. Ejecutemos la aplicación Streamlit

 streamlit run app.py


y pruébelo haciendo algunas preguntas de análisis.


Conclusión

Al aprovechar Langchain y OpenAI, podemos automatizar tareas complejas de análisis de datos, lo que facilita la obtención de información a partir de grandes conjuntos de datos. Este enfoque no sólo ahorra tiempo sino que también garantiza un análisis preciso y consistente. Ya sea que esté trabajando con perfiles de clientes, información de contacto o estadísticas laborales, un asistente de análisis de datos con tecnología de inteligencia artificial puede mejorar enormemente sus capacidades de procesamiento de datos. Para obtener el código fuente completo, visite el repositorio de GitHub .