在这个博客中,我们将建立一个 使用分类和补充分类的产品建议。 通过不断的源代码更新,轻松构建和维护知识图表。 knowledge graph 可口可乐 以一个逻辑和等级的结构来组织产品目录的方法;可以找到一个非常详细的解释。 在实践中,这是一个复杂的问题:一个产品可以属于多个类别,一个类别可以有多个父母。 Product taxonomy i 这里 我们还将使用LLM来生成每个产品的补充性分类列表 - 例如,当有人购买笔记本时,他们也可以作为补充产品购买笔记本。 源代码可在 . CocoIndex 示例 - product_taxonomy 我们正在不断改进,更多的功能和示例即将到来。 starring our . GitHub 回复 GitHub 回复 前提条件 安装 PostgreSQL. CocoIndex 使用 PostgreSQL 内部进行增量处理。 安装 Neo4j,一个图形数据库。 另一种方式是,你可以切换到Ollama,它在本地运行LLM模型 - 指南。 文档 您可以阅读房地产图表目标的官方CocoIndex文档 . 这里 数据流构建知识图表 概览 核心流量是关于 . 百行Python代码 我们将宣布数据流 输入产品(在 JSON 中) for each product, parse JSON map & clean up data extract taxonomy from the mapped data 收集数据 将数据导出到 neo4j 将文件添加为来源 @cocoindex.flow_def(name="StoreProduct") def store_product_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): data_scope["products"] = flow_builder.add_source( cocoindex.sources.LocalFile(path="products", included_patterns=["*.json"]), refresh_interval=datetime.timedelta(seconds=5)) 这里 创建 A 是的 这是Ktable的关键。 flow_builder.add_source 可见 filename 添加数据收集器 在根范围内添加收集器来收集产品、分类和补充分类。 product_node = data_scope.add_collector() product_taxonomy = data_scope.add_collector() product_complementary_taxonomy = data_scope.add_collector() 每个产品过程 我们将对每个产品的JSON文件进行分析,并将数据转换为我们需要的下游处理格式。 数据地图 @cocoindex.op.function(behavior_version=2) def extract_product_info(product: cocoindex.typing.Json, filename: str) -> ProductInfo: return ProductInfo( id=f"{filename.removesuffix('.json')}", url=product["source"], title=product["title"], price=float(product["price"].lstrip("$").replace(",", "")), detail=Template(PRODUCT_TEMPLATE).render(**product), ) 在这里,我们定义了数据映射的函数,例如: 清理 id 字段 地图标题 -> 标题 清理价格领域 生成基于所有领域的产品细节的标记字符串(对于LLM提取分类和补充分类,我们发现标记最好作为LLM的背景)。 流 在流中,我们插入数据映射转换来处理每个产品JSON。 with data_scope["products"].row() as product: data = (product["content"] .transform(cocoindex.functions.ParseJson(), language="json") .transform(extract_product_info, filename=product["filename"])) product_node.collect(id=data["id"], url=data["url"], title=data["title"], price=data["price"]) 第一個轉換() 解析 JSON 檔案。 第二个转换() 执行定义数据映射。 我们收集了 Neo4j 中的产品节点所需的字段。 使用LLM提取分类和补充分类 产品 Taxonomy 定义 由于我们正在使用LLM来提取产品分类,我们需要在班级水平的教程中提供详细的指导。 @dataclasses.dataclass class ProductTaxonomy: """ Taxonomy for the product. A taxonomy is a concise noun (or short noun phrase), based on its core functionality, without specific details such as branding, style, etc. Always use the most common words in US English. Use lowercase without punctuation, unless it's a proper noun or acronym. A product may have multiple taxonomies. Avoid large categories like "office supplies" or "electronics". Use specific ones, like "pen" or "printer". """ name: str 定义产品税收信息 基本上,我们想提取一个产品的所有可能的分类,并考虑哪些其他产品可能会与当前的产品一起购买。 @dataclasses.dataclass class ProductTaxonomyInfo: """ Taxonomy information for the product. Fields: - taxonomies: Taxonomies for the current product. - complementary_taxonomies: Think about when customers buy this product, what else they might need as complementary products. Put labels for these complentary products. """ taxonomies: list[ProductTaxonomy] complementary_taxonomies: list[ProductTaxonomy] 对于每个产品,我们想要一些关于其分类和补充分类的见解,我们可以使用这一点作为找到相关产品的桥梁,使用知识图表。 LLM提取 最后,我们将使用 从产品细节中提取分类和补充分类。 cocoindex.functions.ExtractByLlm taxonomy = data["detail"].transform(cocoindex.functions.ExtractByLlm( llm_spec=cocoindex.LlmSpec( api_type=cocoindex.LlmApiType.OPENAI, model="gpt-4.1"), output_type=ProductTaxonomyInfo)) 例如,LLM采取了描述的 ,并提取计量为 与此同时,它表明,当人们购买 他们也可能对 比如称赞的税率。 钢笔 钢笔 钢笔 笔记本 然后我们将收集到收集者的分类和补充分类。 with taxonomy['taxonomies'].row() as t: product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) with taxonomy['complementary_taxonomies'].row() as t: product_complementary_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) 构建知识图形 基本概念 Neo4j 的所有节点都需要两件事: 标签:节点的类型. 例如,产品,计量学。 主要密钥字段:唯一识别节点的字段,例如,产品节点的ID。 CocoIndex 使用主要密钥字段来匹配节点并将其重复,如果您有多个节点具有相同的主要密钥,则 CocoIndex 只保留其中一个。 There are two ways to map nodes: 当你有一个仅用于节点的收集器时,你可以直接将其导出到Neo4j。 当您有一个连接到节点的关系收集器时,您可以从关系收集器中的选定字段中绘制节点,您必须声明节点标签和主要密钥字段。 例如, product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) 收集一个关系,并从该关系中创建分类节点。 配置 Neo4j 连接: conn_spec = cocoindex.add_auth_entry( "Neo4jConnection", cocoindex.storages.Neo4jConnection( uri="bolt://localhost:7687", user="neo4j", password="cocoindex", )) 出口 關於 Neo4j Product 产品 product_node.export( "product_node", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Nodes(label="Product") ), primary_key_fields=["id"], ) 此出口 Neo4j 节点与标签 從The 收藏家 Product product_node 它声明 Neo4j 节点标签产品. 它指定 id 作为主要密钥字段。 它携带了从 product_node 收集器到具有产品标签的 Neo4j 节点的所有字段。 出口 關於 Neo4j 税务 税务 我们没有明确的收藏家为 ,他们是其中的一部分。 和 收集器和领域在计量学提取过程中收集。 Taxonomy product_taxonomy product_complementary_taxonomy 要将它们导出为Neo4j节点,我们需要先声明 节点 Taxonomy flow_builder.declare( cocoindex.storages.Neo4jDeclaration( connection=conn_spec, nodes_label="Taxonomy", primary_key_fields=["value"], ) ) 其次,出口 与NEO4J的关系 product_taxonomy product_taxonomy.export( "product_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) 同样,我们可以出口 与NEO4J的关系 product_complementary_taxonomy product_complementary_taxonomy.export( "product_complementary_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_COMPLEMENTARY_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) 该 如何在 Neo4j 中绘制关系。 cocoindex.storages.Relationships 在一个关系中,有: 一个源节点和一个目标节点 连接源和目标的关系 请注意,不同的关系可能共享相同的源和目标节点。 把田野从 收藏家和创造者 节点 NodeFromFields entity_relationship Taxonomy Main function 最后,流的主要函数启动了CocoIndex流并运行它。 @cocoindex.main_fn() def _run(): pass if __name__ == "__main__": load_dotenv(override=True) _run() 查询并测试您的索引 现在你们都设定了! Install the dependencies: pip install -e . Run following commands to setup and update the index. python main.py cocoindex setup python main.py cocoindex update You'll see the index updates state in the terminal. For example, you'll see the following output: documents: 9 added, 0 removed, 0 updated (Optional) I used CocoInsight to troubleshoot the index generation and understand the data lineage of the pipeline. It is in free beta now, you can give it a try. Run following command to start CocoInsight: python3 main.py cocoindex server -c https://cocoindex.io And then open the url . It just connects to your local CocoIndex server, with Zero pipeline data retention. https://cocoindex.io/cocoinsight 浏览知识图表 知识图构建后,您可以探索您在 Neo4j 浏览器中构建的知识图。 对于开发环境,您可以使用凭证连接到 Neo4j 浏览器: 用户名: Neo4j 密码: cocoindex 预先配置在我们的 docker 组成 config.yaml。 你可以打开它在 ,然后运行以下数字查询以获取所有关系: http://localhost:7474 MATCH p=()-->() RETURN p 支持我们 我们不断改进,更多功能和示例即将到来.如果你喜欢这篇文章,请给我们一颗星 帮助我们成长。 GitHub 回复 谢谢你的阅读!