大家好!最近,我在实践中应用了一个有趣的解决方案,我长期以来一直想尝试该解决方案,现在我准备好解释如何为任何其他任务创建类似的解决方案。我们将讨论创建一个回答问题的 ChatGPT 的定制版本,考虑到一个庞大的知识库,其长度不受提示大小的限制(这意味着您无法简单地将每个问题之前的所有信息添加到 ChatGPT)。
为了实现这一目标,我们将使用 OpenAI 的上下文嵌入(从知识库中真正高质量地搜索相关问题)和 ChatGPT API 本身(以自然人类语言格式化答案)。
此外,假设助理不仅可以回答明确提出的问答问题,还可以回答熟悉问答的人可以回答的问题。如果您有兴趣学习如何创建使用大型知识库进行响应的简单机器人,欢迎了解详细信息。
我想指出的是,有一些库项目试图以框架的形式来解决这个任务,例如LangChain ,我也尝试过使用它。然而,就像任何处于早期开发阶段的框架一样,在某些情况下,它往往会限制而不是简化事情。特别是,从解决这个任务的一开始,我就明白我想用数据做什么,并且知道如何自己去做(包括基于上下文的搜索、在提示中设置正确的上下文、结合信息源)。
但我无法配置框架以达到可接受的质量水平,并且调试框架对于这项任务来说似乎有点矫枉过正。最后,我创建了自己的样板代码,并对这种方法感到满意。
让我简单描述一下我正在处理的任务,您可以在自己的任务中使用相同的代码,用适合您的替换数据源和提示。您仍然可以完全控制机器人的逻辑。
在编写代码时,我经常使用 ChatGPT(而且我并不为此感到羞耻)。然而,由于缺乏2022年以上的数据,相对较新的技术有时会出现问题。
特别是,在为 The Graph 协议开发子图时(构建ETL 的最流行方法,用于从 EVM 兼容的区块链检索索引数据,您可以在我之前的文章 [ 1 ] 和 [ 2 ] 中阅读更多相关信息),库本身经历了几次破坏性的兼容性更改。 ChatGPT 的“旧”答案不再有帮助,我必须在稀缺的文档中搜索正确的答案,最坏的情况是在开发人员的 Discord 中搜索,这不是很方便(它不像 StackOverflow)。
问题的第二部分是,每次你需要正确提供对话上下文,因为 ChatGPT 经常偏离子图主题,跳到 GraphQL、SQL 或高等数学(“图”、“子图”等不是唯一术语,有许多不同的解释和主题)。
因此,在与 ChatGPT 进行短暂的斗争以纠正子图代码中的错误之后,我决定创建自己的SubgraphGPT机器人,它将始终在正确的上下文中并尝试回答,同时考虑到知识库和来自开发人员不和谐的消息。
附言。我在 Web3 基础设施提供商chainstack.com担任首席产品经理,负责子图托管服务的开发。因此,我必须大量使用子图,帮助用户理解这项相对较新的技术。
最后,为了解决这个问题,我决定使用两个来源:
手动编译的问答知识库,以半盲模式选择(通常我将文档中的主题标题作为问题,将整段信息作为答案)。
从协议开发者 Discord 导出过去 2 年的消息(以弥补 2021 年底以来的缺失时期)。
接下来,每个源使用不同的方法来编写对 ChatGPT API 的请求,具体来说:
对于手动编译的问答,
对于每个问题,都会生成一个上下文嵌入(在多维状态下描述该问题的向量),通过text-embedding-ada-002模型获得,
然后,使用余弦距离搜索函数,找到知识库中最相似的前 3 个问题(可以使用最适合您的数据集的数字,而不是 3 个),
这 3 个问题的答案将添加到最终提示中,并带有大致描述“仅当与给定问题相关时才使用此问答片段”。
对于从 Discord 导出的消息,使用了以下算法:
对于每条包含问号的消息,还会生成上下文嵌入(使用相同的模型),
然后,以类似的方式,选择前 5 个最相似的问题,
作为答案的上下文,添加该问题后面的 20 条消息,假定这些消息有一定概率包含该问题的答案,
此信息大致如下添加到最终提示中:“如果您在随附的问答片段中没有找到问题的明确答案,则开发人员提供的以下聊天片段可能对您回答原始问题有用......”
此外,如果未明确给出主题,则问答片段和聊天的存在可能会导致答案含糊不清,例如,可能如下所示:
因此,它理解问题与上下文无关,并且答案也被接受与上下文无关。然后就被告知可以用这样的数据,总结如下:
为了避免这种情况,我们引入了主题的概念,它被显式定义并插入到提示符的开头,如下所示:
“我需要得到与‘图子图开发’主题相关的问题的答案:{{{什么是子图?}}}”
另外,在最后一句中,我还补充一句:
最后,只有当上述信息不充分时,您才可以使用“图子图开发”主题中的知识来回答问题。
最终完整的提示(不包括聊天获取的部分)如下:
==I need to get an answer to the question related to the topic of "The Graph subgraph development": {{{what is a subgraph?}}}.== ==Possibly, you might find an answer in these Q&As \[use the information only if it is actually relevant and useful for the question answering\]:== ==Q: <What is a subgraph?>== ==A: <A subgraph is a custom API built on blockchain data. Subgraphs are queried using the GraphQL query language and are deployed to a Graph Node using the Graph CLI. Once deployed and published to The Graph's decentralized network, Indexers process subgraphs and make them available to be queried by subgraph consumers.>== ==Q: <Am I still able to create a subgraph if my smart contracts don't have events?>== ==A: <It is highly recommended that you structure your smart contracts to have events associated with data you are interested in querying. Event handlers in the subgraph are triggered by contract events and are by far the fastest way to retrieve useful data. If the contracts you are working with do not contain events, your subgraph can use call and block handlers to trigger indexing. Although this is not recommended, as performance will be significantly slower.>== ==Q: <How do I call a contract function or access a public state variable from my subgraph mappings?>== ==A: <Take a look at Access to smart contract state inside the section AssemblyScript API. https://thegraph.com/docs/en/developing/assemblyscript-api/>== ==Finally, only if the information above was not enough you can use your knowledge in the topic of "The Graph subgraph development" to answer the question.==
在输入处使用半自动生成的提示对上述请求的响应从一开始看起来就是正确的:
在这种情况下,机器人会立即使用正确的密钥进行响应,并添加更多相关信息,因此答案看起来不像问答中那么简单(我提醒您,这个问题恰好在问题和答案列表中),但有合理的解释,部分解决了以下问题。
我应该立即注意到,最后会有一个指向存储库的链接,因此您可以按原样运行机器人,将“主题”替换为您自己的,将问答知识库文件替换为您自己的,并为 OpenAI 和 Telegram 机器人提供您自己的 API 密钥。所以这里的描述并不是为了完全对应GitHub上的源代码,而是为了突出代码的主要方面。
让我们创建一个新的虚拟环境并安装requirements.txt中的依赖项:
virtualenv -p python3.8 .venv source .venv/bin/activate pip install -r requirements.txt
如上所述,假设有一个问题和答案列表,在本例中采用以下类型的 Excel 文件格式:
为了找到与给定问题最相似的问题,我们需要向该文件的每一行添加问题的嵌入(状态空间中的多维向量)。为此,我们将使用add_embeddings.py文件。该脚本由几个简单的部分组成。
导入库并读取命令行参数:
import pandas as pd import openai import argparse # Create an Argument Parser object parser = argparse.ArgumentParser(description='Adding embeddings for each line of csv file') # Add the arguments parser.add_argument('--openai_api_key', type=str, help='API KEY of OpenAI API to create contextual embeddings for each line') parser.add_argument('--file', type=str, help='A source CSV file with the text data') parser.add_argument('--colname', type=str, help='Column name with the texts') # Parse the command-line arguments args = parser.parse_args() # Access the argument values openai.api_key = args.openai_api_key file = args.file colname = args.colname
接下来,将文件读入 pandas 数据帧并根据问号的存在过滤问题。此代码片段通常用于处理知识库以及来自 Discord 的原始消息流,因此假设问题经常重复,我决定保留这种简单的粗略非问题过滤方法。
if file[-4:] == '.csv': df = pd.read_csv(file) else: df = pd.read_excel(file) # filter NAs df = df[~df[colname].isna()] # Keep only questions df = df[df[colname].str.contains('\?')]
最后 - 通过调用模型text-embedding-ada-002的 API 来生成嵌入的函数,由于 API 偶尔会过载并可能响应错误,因此会产生几个重复请求,并将此函数应用于数据帧的每一行。
def get_embedding(text, model="text-embedding-ada-002"): i = 0 max_try = 3 # to avoid random OpenAI API fails: while i < max_try: try: text = text.replace("\n", " ") result = openai.Embedding.create(input=[text], model=model)['data'][0]['embedding'] return result except: i += 1 def process_row(x): return get_embedding(x, model='text-embedding-ada-002') df['ada_embedding'] = df[colname].apply(process_row) df.to_csv(file[:-4]+'_question_embed.csv', index=False)
最后,可以使用以下命令调用该脚本:
python add_embeddings.py \ --openai_api_key="xxx" \ --file="./subgraphs_faq.xlsx" \ --colname="Question"
设置 OpenAI API 密钥、知识库文件以及问题文本所在列的名称。最终创建的文件subgraphs_faq._question_embed.csv包含“Question”、“Answer”和“ada_embedding ”列。
如果您对仅根据手动收集的知识库进行响应的简单机器人感兴趣,则可以跳过本节和以下部分。不过,我将在这里简要提供代码示例,用于从 Discord 频道和 Telegram 群组收集数据。文件discord-channel-data-collection.py由两部分组成。第一部分包括导入库和初始化命令行参数:
import requests import json import pandas as pd import argparse # Create an Argument Parser object parser = argparse.ArgumentParser(description='Discord Channel Data Collection Script') # Add the arguments parser.add_argument('--channel_id', type=str, help='Channel ID from the URL of a channel in browser https://discord.com/channels/xxx/{CHANNEL_ID}') parser.add_argument('--authorization_key', type=str, help='Authorization Key. Being on the discord channel page, start typing anything, then open developer tools -> Network -> Find "typing" -> Headers -> Authorization.') # Parse the command-line arguments args = parser.parse_args() # Access the argument values channel_id = args.channel_id authorization_key = args.authorization_key
第二个是从通道检索数据并将其保存到 pandas 数据帧中的函数,以及使用指定参数进行的调用。
def retrieve_messages(channel_id, authorization_key): num = 0 limit = 100 headers = { 'authorization': authorization_key } last_message_id = None # Create a pandas DataFrame df = pd.DataFrame(columns=['id', 'dt', 'text', 'author_id', 'author_username', 'is_bot', 'is_reply', 'id_reply']) while True: query_parameters = f'limit={limit}' if last_message_id is not None: query_parameters += f'&before={last_message_id}' r = requests.get( f'https://discord.com/api/v9/channels/{channel_id}/messages?{query_parameters}', headers=headers ) jsonn = json.loads(r.text) if len(jsonn) == 0: break for value in jsonn: is_reply = False id_reply = '0' if 'message_reference' in value and value['message_reference'] is not None: if 'message_id' in value['message_reference'].keys(): is_reply = True id_reply = value['message_reference']['message_id'] text = value['content'] if 'embeds' in value.keys(): if len(value['embeds'])>0: for x in value['embeds']: if 'description' in x.keys(): if text != '': text += ' ' + x['description'] else: text = x['description'] df_t = pd.DataFrame({ 'id': value['id'], 'dt': value['timestamp'], 'text': text, 'author_id': value['author']['id'], 'author_username': value['author']['username'], 'is_bot': value['author']['bot'] if 'bot' in value['author'].keys() else False, 'is_reply': is_reply, 'id_reply': id_reply, }, index=[0]) if len(df) == 0: df = df_t.copy() else: df = pd.concat([df, df_t], ignore_index=True) last_message_id = value['id'] num = num + 1 print('number of messages we collected is', num) # Save DataFrame to a CSV file df.to_csv(f'../discord_messages_{channel_id}.csv', index=False) if __name__ == '__main__': retrieve_messages(channel_id, authorization_key)
从这里的有用信息来看,有一个细节我每次需要都找不到——获取授权密钥。考虑到channel_id可以从浏览器中打开的Discord频道的URL(链接中最后一个长数字)获取,因此authorization_key只能通过在频道中开始输入消息,然后使用开发者工具在Network部分中找到名为“ typing ”的事件并从标头中提取参数来找到。
收到这些参数后,您可以运行以下命令来收集频道中的所有消息(替换为您自己的值):
python discord-channel-data-collection.py \ --channel_id=123456 \ --authorization_key="123456qwerty"
由于我经常从 Telegram 中的聊天/频道下载各种数据,因此我还决定为此提供代码,该代码会生成类似格式(与add_embeddings.py脚本兼容)的 CSV 文件。因此, telegram-group-data-collection.py脚本如下所示。从命令行导入库并初始化参数:
import pandas as pd import argparse from telethon import TelegramClient # Create an Argument Parser object parser = argparse.ArgumentParser(description='Telegram Group Data Collection Script') # Add the arguments parser.add_argument('--app_id', type=int, help='Telegram APP id from https://my.telegram.org/apps') parser.add_argument('--app_hash', type=str, help='Telegram APP hash from https://my.telegram.org/apps') parser.add_argument('--phone_number', type=str, help='Telegram user phone number with the leading "+"') parser.add_argument('--password', type=str, help='Telegram user password') parser.add_argument('--group_name', type=str, help='Telegram group public name without "@"') parser.add_argument('--limit_messages', type=int, help='Number of last messages to download') # Parse the command-line arguments args = parser.parse_args() # Access the argument values app_id = args.app_id app_hash = args.app_hash phone_number = args.phone_number password = args.password group_name = args.group_name limit_messages = args.limit_messages
如您所见,如果不将自己授权为第一人称,您无法简单地下载聊天中的所有消息。换句话说,除了通过 https://my.telegram.org/apps 创建应用程序(获取 APP_ID 和 APP_HASH)之外,您还需要使用您的电话号码和密码从 Telethon 库创建 TelegramClient 类的实例。
此外,您将需要 Telegram 聊天的公共 group_name 并明确指定要检索的最新消息的数量。总的来说,我已经多次对任意数量的导出消息执行此过程,而没有收到来自 Telegram API 的任何临时或永久禁令,这与从一个帐户过于频繁地发送消息不同。
脚本的第二部分包含导出消息的实际函数及其执行(进行必要的过滤以避免导致收集中途停止的严重错误):
async def main(): messages = await client.get_messages(group_name, limit=limit_messages) df = pd.DataFrame(columns=['date', 'user_id', 'raw_text', 'views', 'forwards', 'text', 'chan', 'id']) for m in messages: if m is not None: if 'from_id' in m.__dict__.keys(): if m.from_id is not None: if 'user_id' in m.from_id.__dict__.keys(): df = pd.concat([df, pd.DataFrame([{'date': m.date, 'user_id': m.from_id.user_id, 'raw_text': m.raw_text, 'views': m.views, 'forwards': m.forwards, 'text': m.text, 'chan': group_name, 'id': m.id}])], ignore_index=True) df = df[~df['user_id'].isna()] df = df[~df['text'].isna()] df['date'] = pd.to_datetime(df['date']) df = df.sort_values('date').reset_index(drop=True) df.to_csv(f'../telegram_messages_{group_name}.csv', index=False) client = TelegramClient('session', app_id, app_hash) client.start(phone=phone_number, password=password) with client: client.loop.run_until_complete(main())
最后,可以使用以下命令执行此脚本(将值替换为您自己的值):
python telegram-group-data-collection.py \ --app_id=123456 --app_hash="123456qwerty" \ --phone_number="+xxxxxx" --password="qwerty123" \ --group_name="xxx" --limit_messages=10000
大多数时候,我将我喜欢的项目打包到 Telegram 机器人中,因为它只需最少的努力即可启动并立即显示出潜力。在这种情况下,我也做了同样的事情。我必须说,机器人代码并不包含我在SubgraphGPT机器人的生产版本中使用的所有极端情况,因为它有相当多的继承逻辑来自我的另一个宠物项目。相反,我留下了最少量的基本代码,这些代码应该很容易根据您的需要进行修改。
telegram-bot.py脚本由几个部分组成。首先,和以前一样,导入库并初始化命令行参数。
import threading import telegram from telegram.ext import Updater, CommandHandler, MessageHandler, Filters import openai from openai.embeddings_utils import cosine_similarity import numpy as np import pandas as pd import argparse import functools # Create an Argument Parser object parser = argparse.ArgumentParser(description='Run the bot which uses prepared knowledge base enriched with contextual embeddings') # Add the arguments parser.add_argument('--openai_api_key', type=str, help='API KEY of OpenAI API to create contextual embeddings for each line') parser.add_argument('--telegram_bot_token', type=str, help='A telegram bot token obtained via @BotFather') parser.add_argument('--file', type=str, help='A source CSV file with the questions, answers and embeddings') parser.add_argument('--topic', type=str, help='Write the topic to add a default context for the bot') parser.add_argument('--start_message', type=str, help="The text that will be shown to the users after they click /start button/command", default="Hello, World!") parser.add_argument('--model', type=str, help='A model of ChatGPT which will be used', default='gpt-3.5-turbo-16k') parser.add_argument('--num_top_qa', type=str, help="The number of top similar questions' answers as a context", default=3) # Parse the command-line arguments args = parser.parse_args() # Access the argument values openai.api_key = args.openai_api_key token = args.telegram_bot_token file = args.file topic = args.topic model = args.model num_top_qa = args.num_top_qa start_message = args.start_message
请注意,在这种情况下,您还需要一个 OpenAI API 密钥,因为为了找到与用户刚刚从知识库中输入的问题最相似的问题,您首先需要通过调用 API 来获取该问题的嵌入,就像我们对知识库本身所做的那样。
此外,您将需要:
然后是知识库文件的加载和问题嵌入的初始化。
# reading QA file with embeddings df_qa = pd.read_csv(file) df_qa['ada_embedding'] = df_qa.ada_embedding.apply(eval).apply(np.array)
为了向 ChatGPT API 发出请求,我知道它有时会因过载而响应错误,因此我使用了一个在出现错误时自动请求重试的函数。
def retry_on_error(func): @functools.wraps(func) def wrapper(*args, **kwargs): max_retries = 3 for i in range(max_retries): try: return func(*args, **kwargs) except Exception as e: print(f"Error occurred, retrying ({i+1}/{max_retries} attempts)...") # If all retries failed, raise the last exception raise e return wrapper @retry_on_error def call_chatgpt(*args, **kwargs): return openai.ChatCompletion.create(*args, **kwargs)
根据 OpenAI 的建议,在将文本转换为嵌入之前,应将新行替换为空格。
def get_embedding(text, model="text-embedding-ada-002"): text = text.replace("\n", " ") return openai.Embedding.create(input=[text], model=model)['data'][0]['embedding']
为了搜索最相似的问题,我们计算两个问题嵌入之间的余弦距离,直接取自 openai 库。
def search_similar(df, question, n=3, pprint=True): embedding = get_embedding(question, model='text-embedding-ada-002') df['similarities'] = df.ada_embedding.apply(lambda x: cosine_similarity(x, embedding)) res = df.sort_values('similarities', ascending=False).head(n) return res
在收到与给定问题最相似的问题-答案对的列表后,您可以将它们编译成一个文本,并以 ChatGPT 可以明确确定什么是什么的方式对其进行标记。
def collect_text_qa(df): text = '' for i, row in df.iterrows(): text += f'Q: <'+row['Question'] + '>\nA: <'+ row['Answer'] +'>\n\n' print('len qa', len(text.split(' '))) return text
之后,就已经有必要将文章开头描述的提示的“片段”收集成一个整体。
def collect_full_prompt(question, qa_prompt, chat_prompt=None): prompt = f'I need to get an answer to the question related to the topic of "{topic}": ' + "{{{"+ question +"}}}. " prompt += '\n\nPossibly, you might find an answer in these Q&As [use the information only if it is actually relevant and useful for the question answering]: \n\n' + qa_prompt # edit if you need to use this also if chat_prompt is not None: prompt += "---------\nIf you didn't find a clear answer in the Q&As, possibly, these talks from chats might be helpful to answer properly [use the information only if it is actually relevant and useful for the question answering]: \n\n" + chat_prompt prompt += f'\nFinally, only if the information above was not enough you can use your knowledge in the topic of "{topic}" to answer the question.' return prompt
在本例中,我使用 Discord 中的消息删除了部分,但如果 chat_prompt != None,您仍然可以遵循逻辑。
此外,我们需要一个函数将从 ChatGPT API 收到的响应拆分为 Telegram 消息(不超过 4096 个字符):
def telegram_message_format(text): max_message_length = 4096 if len(text) > max_message_length: parts = [] while len(text) > max_message_length: parts.append(text[:max_message_length]) text = text[max_message_length:] parts.append(text) return parts else: return [text]
该机器人以典型的步骤序列开始,分配由/start命令触发的两个功能并接收来自用户的个人消息:
bot = telegram.Bot(token=token) updater = Updater(token=token, use_context=True) dispatcher = updater.dispatcher dispatcher.add_handler(CommandHandler("start", start, filters=Filters.chat_type.private)) dispatcher.add_handler(MessageHandler(~Filters.command & Filters.text, message_handler)) updater.start_polling()
响应/start的代码很简单:
def start(update, context): user = update.effective_user context.bot.send_message(chat_id=user.id, text=start_message)
对于回复自由格式的消息,还不太清楚。
首先,为了避免阻塞来自不同用户的线程,让我们立即使用线程库将它们“分离”为独立的进程。
def message_handler(update, context): thread = threading.Thread(target=long_running_task, args=(update, context)) thread.start()
其次,所有逻辑都将发生在long_running_task函数内。我特意将主要片段包装在try/ except中,以便在修改机器人代码时轻松定位错误。
def long_running_task(update, context): user = update.effective_user context.bot.send_message(chat_id=user.id, text='🕰️⏰🕙⏱️⏳...') try: question = update.message.text.strip() except Exception as e: context.bot.send_message(chat_id=user.id, text=f"🤔It seems like you're sending not text to the bot. Currently, the bot can only work with text requests.") return try: qa_found = search_similar(df_qa, question, n=num_top_qa) qa_prompt = collect_text_qa(qa_found) full_prompt = collect_full_prompt(question, qa_prompt) except Exception as e: context.bot.send_message(chat_id=user.id, text=f"Search failed. Debug needed.") return
由于用自己的知识库和主题替换知识库和主题时可能会出现错误,例如由于格式原因,因此会显示人类可读的错误。
接下来,请求被发送到 ChatGPT API,并带有一条已经证明了自己的领先系统消息:“你是一个有用的助手。 ”如有必要,结果输出会被分成多条消息并发回给用户。
try: print(full_prompt) completion = call_chatgpt( model=model, n=1, messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": full_prompt}] ) result = completion['choices'][0]['message']['content'] except Exception as e: context.bot.send_message(chat_id=user.id, text=f'It seems like the OpenAI service is responding with errors. Try sending the request again.') return parts = telegram_message_format(result) for part in parts: update.message.reply_text(part, reply_to_message_id=update.message.message_id)
代码部分到此结束。
现在,此类机器人的原型可以通过以下链接以有限的格式获得。由于 API 是付费的,你每天最多可以发出 3 个请求,但我不认为这会限制任何人,因为最有趣的不是专注于某个狭窄主题的专门机器人,而是 AnythingGPT 项目的代码,该代码可以在GitHub上找到,其中包含一个简短的说明,说明如何创建自己的机器人,以基于此示例使用你的知识库解决特定任务。如果您已经阅读到最后,我希望这篇文章对您有所帮助。