In this blog, we will build a knowledge graph for product recommendations using taxonomy and complementary taxonomy. CocoIndex makes it easy to build and maintain knowledge graphs with continuous source updates. knowledge graph CocoIndex Product taxonomy is a way to organize product catalogs in a logical and hierarchical structure; a great detailed explanation can be found here. In practice, it is a complicated problem: a product can be part of multiple categories, and a category can have multiple parents. Product taxonomy i here We will use LLM to extract a list of taxonomies from product descriptions. We will also use LLM to generate a list of complementary taxonomies for each product - for example, when someone buys a notepad, they may also buy a pen as a complementary product. We'll use Knowledge Graph to explore the relationships between products that can be further used for product recommendations or labeling. The source code is available at CocoIndex Examples - product_taxonomy. CocoIndex Examples - product_taxonomy We are constantly improving, and more features and examples are coming soon. Stay tuned and follow our progress by starring⭐ our GitHub repo. starring our GitHub repo. GitHub repo Prerequisites Install PostgreSQL. CocoIndex uses PostgreSQL internally for incremental processing. Install Neo4j, a graph database. Configure your OpenAI API key. Alternatively, you can switch to Ollama, which runs LLM models locally - guide. Install PostgreSQL. CocoIndex uses PostgreSQL internally for incremental processing. Install PostgreSQL Install Neo4j, a graph database. Install Neo4j Configure your OpenAI API key. Alternatively, you can switch to Ollama, which runs LLM models locally - guide. Configure your OpenAI API key guide Documentation You can read the official CocoIndex Documentation for Property Graph Targets here. here Data flow to build knowledge graph Overview The core flow is about ~100 lines of python code. ~100 lines of python code We are going to declare a data flow ingest products (in JSON) for each product, parse JSON map & clean up data extract taxonomy from the mapped data collect data export data to neo4j ingest products (in JSON) for each product, parse JSON map & clean up data extract taxonomy from the mapped data parse JSON map & clean up data extract taxonomy from the mapped data parse JSON map & clean up data extract taxonomy from the mapped data collect data export data to neo4j Add documents as source @cocoindex.flow_def(name="StoreProduct") def store_product_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): data_scope["products"] = flow_builder.add_source( cocoindex.sources.LocalFile(path="products", included_patterns=["*.json"]), refresh_interval=datetime.timedelta(seconds=5)) @cocoindex.flow_def(name="StoreProduct") def store_product_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): data_scope["products"] = flow_builder.add_source( cocoindex.sources.LocalFile(path="products", included_patterns=["*.json"]), refresh_interval=datetime.timedelta(seconds=5)) Here flow_builder.add_source creates a KTable. filename is the key of the KTable. flow_builder.add_source KTable filename Add data collectors Add collectors at the root scope to collect the product, taxonomy and complementary taxonomy. product_node = data_scope.add_collector() product_taxonomy = data_scope.add_collector() product_complementary_taxonomy = data_scope.add_collector() product_node = data_scope.add_collector() product_taxonomy = data_scope.add_collector() product_complementary_taxonomy = data_scope.add_collector() Process each product We will parse the JSON file for each product, and transform the data to the format that we need for downstream processing. Data Mapping @cocoindex.op.function(behavior_version=2) def extract_product_info(product: cocoindex.typing.Json, filename: str) -> ProductInfo: return ProductInfo( id=f"{filename.removesuffix('.json')}", url=product["source"], title=product["title"], price=float(product["price"].lstrip("$").replace(",", "")), detail=Template(PRODUCT_TEMPLATE).render(**product), ) @cocoindex.op.function(behavior_version=2) def extract_product_info(product: cocoindex.typing.Json, filename: str) -> ProductInfo: return ProductInfo( id=f"{filename.removesuffix('.json')}", url=product["source"], title=product["title"], price=float(product["price"].lstrip("$").replace(",", "")), detail=Template(PRODUCT_TEMPLATE).render(**product), ) Here we define a function for data mapping, e.g., clean up the id field map title -> title clean up the price field generate a markdown string for the product detail based on all the fields (for LLM to extract taxonomy and complementary taxonomy, we find that markdown works best as context for LLM). clean up the id field id map title -> title title title clean up the price field price generate a markdown string for the product detail based on all the fields (for LLM to extract taxonomy and complementary taxonomy, we find that markdown works best as context for LLM). Flow Within the flow, we plug in the data mapping transformation to process each product JSON. with data_scope["products"].row() as product: data = (product["content"] .transform(cocoindex.functions.ParseJson(), language="json") .transform(extract_product_info, filename=product["filename"])) product_node.collect(id=data["id"], url=data["url"], title=data["title"], price=data["price"]) with data_scope["products"].row() as product: data = (product["content"] .transform(cocoindex.functions.ParseJson(), language="json") .transform(extract_product_info, filename=product["filename"])) product_node.collect(id=data["id"], url=data["url"], title=data["title"], price=data["price"]) The first transform() parses the JSON file. The second transform() performs the defined data mapping. We collect the fields we need for the product node in Neo4j. The first transform() parses the JSON file. transform() The second transform() performs the defined data mapping. transform() We collect the fields we need for the product node in Neo4j. Extract taxonomy and complementary taxonomy using LLM Product Taxonomy Definition Since we are using LLM to extract product taxonomy, we need to provide a detailed instruction at the class-level docstring. @dataclasses.dataclass class ProductTaxonomy: """ Taxonomy for the product. A taxonomy is a concise noun (or short noun phrase), based on its core functionality, without specific details such as branding, style, etc. Always use the most common words in US English. Use lowercase without punctuation, unless it's a proper noun or acronym. A product may have multiple taxonomies. Avoid large categories like "office supplies" or "electronics". Use specific ones, like "pen" or "printer". """ name: str @dataclasses.dataclass class ProductTaxonomy: """ Taxonomy for the product. A taxonomy is a concise noun (or short noun phrase), based on its core functionality, without specific details such as branding, style, etc. Always use the most common words in US English. Use lowercase without punctuation, unless it's a proper noun or acronym. A product may have multiple taxonomies. Avoid large categories like "office supplies" or "electronics". Use specific ones, like "pen" or "printer". """ name: str Define Product Taxonomy Info Basically we want to extract all possible taxonomies for a product, and think about what other products are likely to be bought together with the current product. @dataclasses.dataclass class ProductTaxonomyInfo: """ Taxonomy information for the product. Fields: - taxonomies: Taxonomies for the current product. - complementary_taxonomies: Think about when customers buy this product, what else they might need as complementary products. Put labels for these complentary products. """ taxonomies: list[ProductTaxonomy] complementary_taxonomies: list[ProductTaxonomy] @dataclasses.dataclass class ProductTaxonomyInfo: """ Taxonomy information for the product. Fields: - taxonomies: Taxonomies for the current product. - complementary_taxonomies: Think about when customers buy this product, what else they might need as complementary products. Put labels for these complentary products. """ taxonomies: list[ProductTaxonomy] complementary_taxonomies: list[ProductTaxonomy] For each product, we want some insight about its taxonomy and complementary taxonomy and we could use that as bridge to find related product using knowledge graph. LLM Extraction Finally, we will use cocoindex.functions.ExtractByLlm to extract the taxonomy and complementary taxonomy from the product detail. cocoindex.functions.ExtractByLlm taxonomy = data["detail"].transform(cocoindex.functions.ExtractByLlm( llm_spec=cocoindex.LlmSpec( api_type=cocoindex.LlmApiType.OPENAI, model="gpt-4.1"), output_type=ProductTaxonomyInfo)) taxonomy = data["detail"].transform(cocoindex.functions.ExtractByLlm( llm_spec=cocoindex.LlmSpec( api_type=cocoindex.LlmApiType.OPENAI, model="gpt-4.1"), output_type=ProductTaxonomyInfo)) For example, LLM takes the description of the gel pen, and extracts taxonomy to be gel pen. Meanwhile, it suggests that when people buy gel pen, they may also be interested in notebook etc as complimentary taxonomy. gel pen gel pen gel pen notebook And then we will collect the taxonomy and complementary taxonomy to the collector. with taxonomy['taxonomies'].row() as t: product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) with taxonomy['complementary_taxonomies'].row() as t: product_complementary_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) with taxonomy['taxonomies'].row() as t: product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) with taxonomy['complementary_taxonomies'].row() as t: product_complementary_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) Build knowledge graph Basic concepts All nodes for Neo4j need two things: Label: The type of the node. E.g., Product, Taxonomy. Primary key field: The field that uniquely identifies the node. E.g., id for Product nodes. Label: The type of the node. E.g., Product, Taxonomy. Product Taxonomy Primary key field: The field that uniquely identifies the node. E.g., id for Product nodes. id Product CocoIndex uses the primary key field to match the nodes and deduplicate them. If you have multiple nodes with the same primary key, CocoIndex keeps only one of them. There are two ways to map nodes: When you have a collector just for the node, you can directly export it to Neo4j. For example Product. We've collected each product explicitly. When you have a collector for relationships connecting to the node, you can map nodes from selected fields in the relationship collector. You must declare a node label and primary key field. When you have a collector just for the node, you can directly export it to Neo4j. For example Product. We've collected each product explicitly. Product When you have a collector for relationships connecting to the node, you can map nodes from selected fields in the relationship collector. You must declare a node label and primary key field. For example, product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) product_taxonomy.collect(id=cocoindex.GeneratedField.UUID, product_id=data["id"], taxonomy=t["name"]) Collects a relationship, and taxonomy node is created from the relationship. Configure Neo4j connection: conn_spec = cocoindex.add_auth_entry( "Neo4jConnection", cocoindex.storages.Neo4jConnection( uri="bolt://localhost:7687", user="neo4j", password="cocoindex", )) conn_spec = cocoindex.add_auth_entry( "Neo4jConnection", cocoindex.storages.Neo4jConnection( uri="bolt://localhost:7687", user="neo4j", password="cocoindex", )) Export Product nodes to Neo4j Product product_node.export( "product_node", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Nodes(label="Product") ), primary_key_fields=["id"], ) product_node.export( "product_node", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Nodes(label="Product") ), primary_key_fields=["id"], ) This exports Neo4j nodes with label Product from the product_node collector. Product product_node It declares Neo4j node label Product. It specifies id as the primary key field. It carries all the fields from product_node collector to Neo4j nodes with label Product. It declares Neo4j node label Product. It specifies id as the primary key field. Product id It carries all the fields from product_node collector to Neo4j nodes with label Product. product_node Product Export Taxonomy nodes to Neo4j Taxonomy We don't have explicit collector for Taxonomy nodes. They are part of the product_taxonomy and product_complementary_taxonomy collectors and fields are collected during the taxonomy extraction. Taxonomy product_taxonomy product_complementary_taxonomy To export them as Neo4j nodes, we need to first declare Taxonomy nodes. Taxonomy flow_builder.declare( cocoindex.storages.Neo4jDeclaration( connection=conn_spec, nodes_label="Taxonomy", primary_key_fields=["value"], ) ) flow_builder.declare( cocoindex.storages.Neo4jDeclaration( connection=conn_spec, nodes_label="Taxonomy", primary_key_fields=["value"], ) ) Next, export the product_taxonomy as relationship to Neo4j. product_taxonomy product_taxonomy.export( "product_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) product_taxonomy.export( "product_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) Similarly, we can export the product_complementary_taxonomy as relationship to Neo4j. product_complementary_taxonomy product_complementary_taxonomy.export( "product_complementary_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_COMPLEMENTARY_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) product_complementary_taxonomy.export( "product_complementary_taxonomy", cocoindex.storages.Neo4j( connection=conn_spec, mapping=cocoindex.storages.Relationships( rel_type="PRODUCT_COMPLEMENTARY_TAXONOMY", source=cocoindex.storages.NodeFromFields( label="Product", fields=[ cocoindex.storages.TargetFieldMapping( source="product_id", target="id"), ] ), target=cocoindex.storages.NodeFromFields( label="Taxonomy", fields=[ cocoindex.storages.TargetFieldMapping( source="taxonomy", target="value"), ] ), ), ), primary_key_fields=["id"], ) The cocoindex.storages.Relationships declares how to map relationships in Neo4j. cocoindex.storages.Relationships In a relationship, there's: A source node and a target node. A relationship connecting the source and target. Note that different relationships may share the same source and target nodes. A source node and a target node. A relationship connecting the source and target. Note that different relationships may share the same source and target nodes. NodeFromFields takes the fields from the entity_relationship collector and creates Taxonomy nodes. NodeFromFields entity_relationship Taxonomy Main function Finally, the main function for the flow initializes the CocoIndex flow and runs it. @cocoindex.main_fn() def _run(): pass if __name__ == "__main__": load_dotenv(override=True) _run() @cocoindex.main_fn() def _run(): pass if __name__ == "__main__": load_dotenv(override=True) _run() Query and test your index 🎉 Now you are all set! Install the dependencies: pip install -e . Run following commands to setup and update the index. python main.py cocoindex setup python main.py cocoindex update You'll see the index updates state in the terminal. For example, you'll see the following output: documents: 9 added, 0 removed, 0 updated (Optional) I used CocoInsight to troubleshoot the index generation and understand the data lineage of the pipeline. It is in free beta now, you can give it a try. Run following command to start CocoInsight: python3 main.py cocoindex server -c https://cocoindex.io And then open the url https://cocoindex.io/cocoinsight. It just connects to your local CocoIndex server, with Zero pipeline data retention. Install the dependencies: pip install -e . Install the dependencies: pip install -e . pip install -e . Run following commands to setup and update the index. python main.py cocoindex setup python main.py cocoindex update You'll see the index updates state in the terminal. For example, you'll see the following output: documents: 9 added, 0 removed, 0 updated Run following commands to setup and update the index. python main.py cocoindex setup python main.py cocoindex update python main.py cocoindex setup python main.py cocoindex update You'll see the index updates state in the terminal. For example, you'll see the following output: documents: 9 added, 0 removed, 0 updated documents: 9 added, 0 removed, 0 updated (Optional) I used CocoInsight to troubleshoot the index generation and understand the data lineage of the pipeline. It is in free beta now, you can give it a try. Run following command to start CocoInsight: python3 main.py cocoindex server -c https://cocoindex.io (Optional) I used CocoInsight to troubleshoot the index generation and understand the data lineage of the pipeline. It is in free beta now, you can give it a try. Run following command to start CocoInsight: python3 main.py cocoindex server -c https://cocoindex.io python3 main.py cocoindex server -c https://cocoindex.io And then open the url https://cocoindex.io/cocoinsight. It just connects to your local CocoIndex server, with Zero pipeline data retention. And then open the url https://cocoindex.io/cocoinsight. It just connects to your local CocoIndex server, with Zero pipeline data retention. https://cocoindex.io/cocoinsight Browse the knowledge graph After the knowledge graph is built, you can explore the knowledge graph you built in Neo4j Browser. For the dev environment, you can connect to Neo4j browser using credentials: username: Neo4j password: cocoindex which is pre-configured in our docker compose config.yaml. username: Neo4j Neo4j password: cocoindex which is pre-configured in our docker compose config.yaml. cocoindex config.yaml You can open it at http://localhost:7474, and run the following Cypher query to get all relationships: http://localhost:7474 MATCH p=()-->() RETURN p MATCH p=()-->() RETURN p Support us We are constantly improving, and more features and examples are coming soon. If you love this article, please give us a star ⭐ at GitHub repo to help us grow. GitHub repo Thanks for reading!