paint-brush
BigQuery と Langchain を使ってデータ分析アシスタントを構築する方法@yi
414 測定値
414 測定値

BigQuery と Langchain を使ってデータ分析アシスタントを構築する方法

Yi Ai11m2024/06/10
Read on Terminal Reader

長すぎる; 読むには

Langchain、OpenAI、BigQuery を活用して、Streamlit や Google Cloud DLP などのツールでデータ分析を自動化し、処理を効率化し、データのプライバシーを確保します。
featured image - BigQuery と Langchain を使ってデータ分析アシスタントを構築する方法
Yi Ai HackerNoon profile picture



企業は毎日膨大な量のデータを生成するため、このすべての情報から有用な洞察を引き出すのは、特に複雑なデータセットと膨大な量のデータの場合は困難です。しかし、生成 AI を使用すると、データ分析を合理化および自動化して、効率的でアクセスしやすいものにすることができます。この記事では、Google Langchain 、OpenAI、 BigQuery 、およびデータ損失防止(DLP) を使用して AIデータ分析アシスタントを設定して使用する方法を紹介します。


ユースケース: BigQuery によるデータ分析の自動化

ソリューション設計

このソリューションでは、BigQuery データセットとやり取りしてデータ分析を自動化する Langchain と OpenAI を使用して Streamlit アプリを設定します。このエージェントは、PII 顧客属性のマスキングやデータの視覚化などの特定のタスクにカスタム ツールを使用します。さらに、エージェントはチャット履歴を保持するように構成され、コンテキストに応じた正確な応答が保証されます。


ソリューション アーキテクチャの図を以下に示します。


次のテーブルを含む BigQuery データセットがあるシナリオを考えてみましょう。

  • 顧客テーブル: 顧客データが含まれます。
  • 連絡先テーブル: 顧客の連絡先の詳細が含まれます。
  • 顧客住所テーブル: 顧客と住所をリンクします。
  • アドレス テーブル: アドレス情報が含まれます。
  • ジョブ統計テーブル: データを切り捨てて顧客プロファイル テーブルにロードする ETL バッチ ジョブの概要を記録します。


Langchainのセットアップ

Langchainとは何ですか?

LangChain は、AI 開発者に言語モデルを外部データ ソースに接続するためのツールを提供します。オープンソースであり、アクティブなコミュニティによってサポートされています。組織は LangChain を無料で使用し、フレームワークに精通した他の開発者からサポートを受けることができます。


Langchain を使用してデータ分析を実行するには、まず Langchain と OpenAI ライブラリをインストールする必要があります。これは、必要なライブラリをダウンロードし、プロジェクトにインポートすることで実行できます。


Langchainをインストールします:

 pip install langchain matplotlib pandas streamlit pip install -qU langchain-openai langchain-community


Langchain モデルを定義し、Bigquery 接続を設定します。

 import os import re import streamlit as st from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types from langchain.agents import create_sql_agent from langchain_community.vectorstores import FAISS from langchain_core.example_selectors import SemanticSimilarityExampleSelector from langchain_core.messages import AIMessage from langchain_core.prompts import ( SystemMessagePromptTemplate, PromptTemplate, FewShotPromptTemplate, ) from langchain_core.prompts.chat import ( ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder, ) from langchain.memory import ConversationBufferMemory from langchain_experimental.utilities import PythonREPL from langchain_openai import ChatOpenAI, OpenAIEmbeddings from langchain.sql_database import SQLDatabase from langchain.tools import Tool service_account_file = f"{os.getcwd()}/service-account-key.json" os.environ["OPENAI_API_KEY"] = ( "xxxxxx" ) os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = service_account_file model = ChatOpenAI(model="gpt-4o", temperature=0) project = "lively-metrics-295911" dataset = "customer_profiles" sqlalchemy_url = ( f"bigquery://{project}/{dataset}?credentials_path={service_account_file}" ) db = SQLDatabase.from_uri(sqlalchemy_url)


カスタムツールの設定

エージェントの機能を強化するために、PII データのマスキングやデータの視覚化など、特定のタスク用のカスタム ツールを設定できます。


  1. Google Cloud DLP による PII のマスキング

    データのプライバシーは重要です。出力内の PII を保護するために、Google Cloud Data Loss Prevention (DLP) を利用できます。DLP API を呼び出して応答内に存在する PII データをマスクするカスタム ツールを構築します。


 def mask_pii_data(text): dlp = dlp_v2.DlpServiceClient() project_id = project parent = f"projects/{project_id}" info_types = [ {"name": "EMAIL_ADDRESS"}, {"name": "PHONE_NUMBER"}, {"name": "DATE_OF_BIRTH"}, {"name": "LAST_NAME"}, {"name": "STREET_ADDRESS"}, {"name": "LOCATION"}, ] deidentify_config = types.DeidentifyConfig( info_type_transformations=types.InfoTypeTransformations( transformations=[ types.InfoTypeTransformations.InfoTypeTransformation( primitive_transformation=types.PrimitiveTransformation( character_mask_config=types.CharacterMaskConfig( masking_character="*", number_to_mask=0, reverse_order=False ) ) ) ] ) ) item = {"value": text} inspect_config = {"info_types": info_types} request = { "parent": parent, "inspect_config": inspect_config, "deidentify_config": deidentify_config, "item": item, } response = dlp.deidentify_content(request=request) return response.item.value


  1. Python の REPL

    次に、LLM が Python を使用してデータの視覚化を実行できるようにするために、Python REPL を活用してエージェント用のカスタム ツールを定義します。


 python_repl = PythonREPL()


次に、 mask_pii_datapython_repl:

 def sql_agent_tools(): tools = [ Tool.from_function( func=mask_pii_data, name="mask_pii_data", description="Masks PII data in the input text using Google Cloud DLP.", ), Tool( name="python_repl", description=f"A Python shell. Use this to execute python commands. \ Input should be a valid python command. \ If you want to see the output of a value, \ you should print it out with `print(...)`.", func=python_repl.run, ), ] return tools

少数のショットの例を使用する

モデルに少数のショットの例を提供すると、応答をガイドし、パフォーマンスを向上させるのに役立ちます。

サンプルSQLクエリを定義する、

 # Example Queries sql_examples = [ { "input": "Count of Customers by Source System", "query": f""" SELECT source_system_name, COUNT(*) AS customer_count FROM `{project}.{dataset}.customer` GROUP BY source_system_name ORDER BY customer_count DESC; """, }, { "input": "Average Age of Customers by Gender", "query": f""" SELECT gender, AVG(EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM dob)) AS average_age FROM `{project}.{dataset}.customer` GROUP BY gender; """, }, ... ]


次に、数回のショットのプロンプト テンプレートに例を追加します。

 example_selector = SemanticSimilarityExampleSelector.from_examples( sql_examples, OpenAIEmbeddings(), FAISS, k=2, input_keys=["input"], )


次に、プレフィックスとサフィックスを定義し、 few_shot_prompt from_messagesファクトリ メソッドに直接渡します。


注: SUFFIX には{chat_history}変数があります。これについては、エージェントを作成してメモリを追加する次の手順で説明します。

 PREFIX = """ You are a SQL expert. You have access to a BigQuery database. Identify which tables can be used to answer the user's question and write and execute a SQL query accordingly. Given an input question, create a syntactically correct SQL query to run against the dataset customer_profiles, then look at the results of the query and return the answer. Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most {top_k} results. You can order the results by a relevant column to return the most interesting examples in the database. Never query for all the columns from a specific table; only ask for the relevant columns given the question. You have access to tools for interacting with the database. Only use the information returned by these tools to construct your final answer. You MUST double check your query before executing it. If you get an error while executing a query, rewrite the query and try again.DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database. If the question does not seem related to the database, just return "I don't know" as the answer. If the user asks for a visualization of the results, use the python_agent tool to create and display the visualization. After obtaining the results, you must use the mask_pii_data tool to mask the results before providing the final answer. """ SUFFIX = """Begin! {chat_history} Question: {input} Thought: I should look at the tables in the database to see what I can query. Then I should query the schema of the most relevant tables. {agent_scratchpad}""" few_shot_prompt = FewShotPromptTemplate( example_selector=example_selector, example_prompt=PromptTemplate.from_template( "User input: {input}\nSQL query: {query}" ), prefix=PREFIX, suffix="", input_variables=["input", "top_k"], example_separator="\n\n", ) messages = [ SystemMessagePromptTemplate(prompt=few_shot_prompt), MessagesPlaceholder(variable_name="chat_history"), HumanMessagePromptTemplate.from_template("{input}"), AIMessage(content=SUFFIX), MessagesPlaceholder(variable_name="agent_scratchpad"), ] prompt = ChatPromptTemplate.from_messages(messages)


変数の説明

  • input : ユーザーの入力またはクエリ。

  • agent_scratchpad : 中間ステップや考えを一時的に保存する領域。

  • chat_history : コンテキストを維持するために以前のやり取りを追跡します。

  • handle_parsing_errors : エージェントが解析エラーを適切に処理し、回復できるようにします。

  • メモリ: チャット履歴を保存および取得するために使用されるモジュール。


いよいよ最後のステップです。アプリを構築しましょう。


Streamlit を使用した LLM アプリの構築

先ほど構築した Langchain エージェントをテストするためのインタラクティブ インターフェースを作成するには、Streamlit を使用できます。


 st.title("Data Analysis Assistant") if "history" not in st.session_state: st.session_state.history = [] user_input = st.text_input("Ask your question:") if st.button("Run Query"): if user_input: with st.spinner("Processing..."): st.session_state.history.append(f"User: {user_input}") response = agent_executor.run(input=user_input) if "sandbox:" in response: response = response.replace(f"sandbox:", "") match = re.search(r"\((.+\.png)\)", response) if match: image_file_path = match.group(1) if os.path.isfile(image_file_path): st.session_state.history.append({"image": image_file_path}) else: st.error("The specified image file does not exist.") else: st.session_state.history.append(f"Agent: {response}") st.experimental_rerun() else: st.error("Please enter a question.") for message in st.session_state.history: if isinstance(message, str): st.write(message) elif isinstance(message, dict) and "image" in message: st.image(message["image"])


準備は完了です。Streamlitアプリを実行してみましょう

streamlit run app.py


いくつかの分析質問をしてテストします。


結論

LangchainとOpenAIを活用することで、複雑なデータ分析タスクを自動化し、大規模なデータセットから洞察を得ることがはるかに簡単になります。このアプローチは時間を節約するだけでなく、正確で一貫性のある分析を保証します。顧客プロファイル、連絡先情報、または求人統計を扱う場合でも、AIを搭載したデータ分析アシスタントはデータ処理機能を大幅に向上させることができます。完全なソースコードについては、 GitHub リポジトリ