paint-brush
Como construí um assistente de análise de dados com BigQuery e Langchainpor@yi
966 leituras
966 leituras

Como construí um assistente de análise de dados com BigQuery e Langchain

por Yi Ai11m2024/06/10
Read on Terminal Reader

Muito longo; Para ler

Aproveite Langchain, OpenAI e BigQuery para automatizar a análise de dados, agilizar o processamento e garantir a privacidade dos dados com ferramentas como Streamlit e Google Cloud DLP.
featured image - Como construí um assistente de análise de dados com BigQuery e Langchain
Yi Ai HackerNoon profile picture



Como as empresas geram enormes quantidades de dados diariamente, pode ser difícil extrair insights úteis de todas essas informações, especialmente com conjuntos de dados complexos e grandes volumes de dados. Mas com a IA generativa, podemos agilizar e automatizar a análise de dados, tornando-a eficiente e acessível. Neste artigo, mostrarei como configurar e usar um assistente de análise de dados de IA usando Google Langchain , OpenAI, BigQuery e Data Loss Prevention (DLP).


Caso de uso: automatização da análise de dados com BigQuery

Projeto de solução

A solução envolve a configuração de um aplicativo Streamlit usando Langchain e OpenAI que interage com o conjunto de dados BigQuery para automatizar a análise de dados. Este agente usará ferramentas personalizadas para tarefas específicas, como mascarar atributos de clientes PII e visualizar dados. Além disso, o agente será configurado para reter o histórico do chat, garantindo respostas contextualmente precisas.


Aqui está um diagrama da arquitetura da solução:


Vamos considerar um cenário em que temos um conjunto de dados do BigQuery contendo as seguintes tabelas:

  • Tabela Cliente : Contém dados do cliente.
  • Tabela de contato : contém detalhes de contato do cliente.
  • Tabela de endereços do cliente : vincula os clientes aos endereços.
  • Tabela de endereços : contém informações de endereço.
  • Tabela de estatísticas de trabalho : registra resumos de trabalhos em lote ETL que truncam e carregam dados nas tabelas de perfil do cliente


Configurar Langchain

O que é Langchain?

LangChain fornece aos desenvolvedores de IA ferramentas para conectar modelos de linguagem com fontes de dados externas. É de código aberto e apoiado por uma comunidade ativa. As organizações podem usar o LangChain gratuitamente e receber suporte de outros desenvolvedores com experiência na estrutura.


Para realizar a análise de dados usando Langchain, primeiro precisamos instalar as bibliotecas Langchain e OpenAI. Isso pode ser feito baixando as bibliotecas necessárias e importando-as para o seu projeto.


Instale Langchain:

 pip install langchain matplotlib pandas streamlit pip install -qU langchain-openai langchain-community


Defina o modelo Langchain e configure a conexão do BigQuery:

 import os import re import streamlit as st from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types from langchain.agents import create_sql_agent from langchain_community.vectorstores import FAISS from langchain_core.example_selectors import SemanticSimilarityExampleSelector from langchain_core.messages import AIMessage from langchain_core.prompts import ( SystemMessagePromptTemplate, PromptTemplate, FewShotPromptTemplate, ) from langchain_core.prompts.chat import ( ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder, ) from langchain.memory import ConversationBufferMemory from langchain_experimental.utilities import PythonREPL from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain.sql_database import SQLDatabase from langchain.tools import Tool service_account_file = f"{os.getcwd()}/service-account-key.json" os.environ["OPENAI_API_KEY"] = ( "xxxxxx" ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file model = ChatOpenAI(model="gpt-4o", temperature=0) project = "lively-metrics-295911" dataset = "customer_profiles" sqlalchemy_url = ( f"bigquery://{project}/{dataset}?credentials_path={service_account_file}" ) db = SQLDatabase.from_uri(sqlalchemy_url)


Configurando ferramentas personalizadas

Para aprimorar as capacidades do nosso agente, podemos configurar ferramentas personalizadas para tarefas específicas, como mascarar dados PII e visualizar dados.


  1. Mascarando PII com Google Cloud DLP

    A privacidade dos dados é crucial. Para proteger PII nas saídas, podemos utilizar o Google Cloud Data Loss Prevention (DLP). Criaremos uma ferramenta personalizada que chama a API DLP para mascarar quaisquer dados PII presentes na resposta.


 def mask_pii_data(text): dlp = dlp_v2.DlpServiceClient() project_id = project parent = f"projects/{project_id}" info_types = [ {"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}, {"name": "DATE_OF_BIRTH"}, {"name": "LAST_NAME"}, {"name": "STREET_ADDRESS"}, {"name": "LOCATION"}, ] deidentify_config = types.DeidentifyConfig( info_type_transformations=types.InfoTypeTransformations( transformations=[ types.InfoTypeTransformations.InfoTypeTransformation( primitive_transformation=types.PrimitiveTransformation( character_mask_config=types.CharacterMaskConfig( masking_character="*", number_to_mask=0, reverse_order=False ) ) ) ] ) ) item = {"value": text} inspect_config = {"info_types": info_types} request = { "parent": parent, "inspect_config": inspect_config, "deidentify_config": deidentify_config, "item": item, } response = dlp.deidentify_content(request=request) return response.item.value


  1. REPL Python

    A seguir, para permitir que o LLM execute a visualização de dados usando Python, aproveitaremos o Python REPL e definiremos uma ferramenta personalizada para nosso agente.


 python_repl = PythonREPL()


Agora, vamos criar as ferramentas do agente, que incluirão mask_pii_data e python_repl:

 def sql_agent_tools(): tools = [ Tool.from_function( func=mask_pii_data, name="mask_pii_data", description="Masks PII data in the input text using Google Cloud DLP.", ), Tool( name="python_repl", description=f"A Python shell. Use this to execute python commands. \ Input should be a valid python command. \ If you want to see the output of a value, \ you should print it out with `print(...)`.", func=python_repl.run, ), ] return tools

Usando exemplos de poucas fotos

Fornecer ao modelo exemplos de poucas cenas ajuda a orientar suas respostas e melhorar o desempenho.

Definir exemplos de consultas SQL,

 # Example Queries sql_examples = [ { "input": "Count of Customers by Source System", "query": f""" SELECT source_system_name, COUNT(*) AS customer_count FROM `{project}.{dataset}.customer` GROUP BY source_system_name ORDER BY customer_count DESC; """, }, { "input": "Average Age of Customers by Gender", "query": f""" SELECT gender, AVG(EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM dob)) AS average_age FROM `{project}.{dataset}.customer` GROUP BY gender; """, }, ... ]


A seguir, adicione exemplos ao modelo de prompt de poucas tentativas.

 example_selector = SemanticSimilarityExampleSelector.from_examples( sql_examples, OpenAIEmbeddings(), FAISS, k=2, input_keys=["input"], )


Em seguida, defina o prefixo e o sufixo e passe few_shot_prompt diretamente para o método de fábrica from_messages .


Nota: Existe uma variável {chat_history} no SUFFIX, que explicarei na próxima etapa quando criarmos o agente e adicionarmos memória.

 PREFIX = """ You are a SQL expert. You have access to a BigQuery database. Identify which tables can be used to answer the user's question and write and execute a SQL query accordingly. Given an input question, create a syntactically correct SQL query to run against the dataset customer_profiles, then look at the results of the query and return the answer. Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results. You can order the results by a relevant column to return the most interesting examples in the database. Never query for all the columns from a specific table; only ask for the relevant columns given the question. You have access to tools for interacting with the database. Only use the information returned by these tools to construct your final answer. You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database. If the question does not seem related to the database, just return "I don't know" as the answer. If the user asks for a visualization of the results, use the python_agent tool to create and display the visualization. After obtaining the results, you must use the mask_pii_data tool to mask the results before providing the final answer. """ SUFFIX = """Begin! {chat_history} Question: {input} Thought: I should look at the tables in the database to see what I can query. Then I should query the schema of the most relevant tables. {agent_scratchpad}""" few_shot_prompt = FewShotPromptTemplate( example_selector=example_selector, example_prompt=PromptTemplate.from_template( "User input: {input}\nSQL query: {query}" ), prefix=PREFIX, suffix="", input_variables=["input", "top_k"], example_separator="\n\n", ) messages = [ SystemMessagePromptTemplate(prompt=few_shot_prompt), MessagesPlaceholder(variable_name="chat_history"), HumanMessagePromptTemplate.from_template("{input}"), AIMessage(content=SUFFIX), MessagesPlaceholder(variable_name="agent_scratchpad"), ] prompt = ChatPromptTemplate.from_messages(messages)


Explicação das Variáveis

  • input : a entrada ou consulta do usuário.

  • agent_scratchpad : Uma área de armazenamento temporário para etapas ou pensamentos intermediários.

  • chat_history : rastreia interações anteriores para manter o contexto.

  • handle_parsing_errors : garante que o agente possa manipular e se recuperar normalmente de erros de análise.

  • memória : O módulo usado para armazenar e recuperar o histórico de bate-papo.


É hora da etapa final. Vamos construir o aplicativo!


Construindo um aplicativo LLM com Streamlit

Para criar uma interface interativa para testar o agente Langchain que acabamos de construir, podemos usar Streamlit.


 st.title("Data Analysis Assistant") if "history" not in st.session_state: st.session_state.history = [] user_input = st.text_input("Ask your question:") if st.button("Run Query"): if user_input: with st.spinner("Processing..."): st.session_state.history.append(f"User: {user_input}") response = agent_executor.run(input=user_input) if "sandbox:" in response: response = response.replace(f"sandbox:", "") match = re.search(r"\((.+\.png)\)", response) if match: image_file_path = match.group(1) if os.path.isfile(image_file_path): st.session_state.history.append({"image": image_file_path}) else: st.error("The specified image file does not exist.") else: st.session_state.history.append(f"Agent: {response}") st.experimental_rerun() else: st.error("Please enter a question.") for message in st.session_state.history: if isinstance(message, str): st.write(message) elif isinstance(message, dict) and "image" in message: st.image(message["image"])


Nós configuramos tudo. Vamos executar o aplicativo Streamlit

 streamlit run app.py


e teste-o fazendo algumas perguntas de análise.


Conclusão

Ao aproveitar Langchain e OpenAI, podemos automatizar tarefas complexas de análise de dados, tornando muito mais fácil obter insights de grandes conjuntos de dados. Essa abordagem não apenas economiza tempo, mas também garante análises precisas e consistentes. Esteja você trabalhando com perfis de clientes, informações de contato ou estatísticas de empregos, um assistente de análise de dados com tecnologia de IA pode melhorar muito seus recursos de processamento de dados. Para o código-fonte completo, visite o Repositório GitHub .