Elasticsearch是基于 Apache Lucene 的开源搜索和分析引擎。在使用 Elasticsearch 构建基于变更数据捕获 (CDC) 数据的应用程序时,您需要设计系统来处理索引中现有文档的频繁更新或修改。
在本篇博文中,我们将介绍可用于更新的不同选项,包括完整更新、部分更新和脚本更新。我们还将讨论修改文档时Elasticsearch内部发生的情况,以及频繁更新如何影响系统的 CPU 利用率。
为了更好地理解频繁更新的用例,让我们看一下 Netflix 等视频流服务的搜索应用程序。当用户搜索节目(即“政治惊悚片”)时,系统会根据关键字和其他元数据返回一组相关结果。
让我们看一下 Elasticsearch 中电视剧《纸牌屋》的示例文档:
{ "name": "House of Cards", "description": "Frank Underwood is a Democrat appointed as the Secretary of State. Along with his wife, he sets out on a quest to seek revenge from the people who betrayed him while successfully rising to supremacy.", "genres": ["drama", "thriller"], "views": 100, }
可以在 Elasticsearch 中配置搜索,以使用name
和description
作为全文搜索字段。views views
存储每个标题的观看次数,可用于提升内容,使更受欢迎的节目排名更高。每次用户观看节目或电影的一集时, views
字段都会递增。
在像 Netflix这样规模的应用程序中使用此搜索配置时,根据Netflix 参与度报告,执行的更新次数很容易超过每分钟数百万次。根据该报告,用户从 1 月到 7 月观看了约 1000 亿小时的内容。假设每集或一部电影的平均观看时间为 15 分钟,则每分钟的观看次数平均达到 130 万次。使用上面指定的搜索配置,每次观看都需要进行数百万次的更新。
许多搜索和分析应用程序都会经历频繁更新,尤其是基于 CDC 数据构建的应用程序。
让我们深入研究如何使用以下代码在 Elasticsearch 中执行更新的一般示例:
- from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Retrieve the current document to get the current 'views' value try: current_doc = es.get(index=index_name, id=document_id) current_views = current_doc['_source']['views'] except Exception as e: print(f"Error retrieving current document: {e}") current_views = 0 # Set a default value if there's an error # Define the update body to increment 'views' by 1 update_body = { "doc": { "views": current_views + 1 # Increment 'views' by 1 } } # Perform the update try: es.update(index=index_name, id=document_id, body=update_body) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
在 Elasticsearch 中执行更新时,您可以使用索引 API替换现有文档,或使用更新 API对文档进行部分更新。
索引 API 会检索整个文档,对文档进行更改,然后重新索引该文档。使用更新 API,您只需发送要修改的字段,而不是整个文档。这仍然会导致文档被重新索引,但会最大限度地减少通过网络发送的数据量。更新 API 在文档很大并且通过网络发送整个文档会很耗时的情况下特别有用。
让我们使用 Python 代码看看索引 API 和更新 API 是如何工作的。
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Retrieve the existing document existing_document = es.get(index=index_name, id=document_id) # Make your changes to the document existing_document["_source"]["field1"] = "new_value1" existing_document["_source"]["field2"] = "new_value2" # Call the index API to perform the full update es.index(index=index_name, id=document_id, body=existing_document["_source"])
正如您在上面的代码中看到的,索引 API 需要对 Elasticsearch 进行两次单独的调用,这可能会导致集群性能下降和负载增加。
部分更新内部使用重新索引 API ,但已配置为只需要一次网络调用以获得更好的性能。
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Specify the fields to be updated update_fields = { "field1": "new_value1", "field2": "new_value2" } # Use the update API to perform a partial update es.update(index=index_name, id=document_id, body={"doc": update_fields})
您可以使用 Elasticsearch 中的更新 API 来更新查看次数,但更新 API 本身无法用于根据先前的值增加查看次数。这是因为我们需要旧的查看次数来设置新的查看次数值。
让我们看看如何使用强大的脚本语言 Painless 来解决这个问题。
Painless是一种为 Elasticsearch 设计的脚本语言,可用于查询和聚合计算、复杂条件、数据转换等。Painless 还支持在更新查询中使用脚本来根据复杂逻辑修改文档。
在下面的示例中,我们使用 Painless 脚本在单个 API 调用中执行更新,并根据旧视图计数的值增加新的视图计数。
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.views += 1" # Increment 'views' by 1 } } # Perform the update using the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
Painless 脚本非常直观易懂,它只是将每个文档的视图计数增加 1。
Elasticsearch 中的嵌套对象是一种数据结构,允许将对象数组作为单个父文档中的单独文档进行索引。处理自然形成嵌套结构的复杂数据(如对象中的对象)时,嵌套对象非常有用。在典型的 Elasticsearch 文档中,对象数组是扁平的,但使用嵌套数据类型可以单独索引和查询数组中的每个对象。
Painless 脚本还可用于更新 Elasticsearch 中的嵌套对象。
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID for the example index_name = 'your_index' document_id = 'your_document_id' # Specify the nested field and the updated value nested_field = "nested_field_name" updated_value = "new_value" # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.nested_field_name = params.updated_value", "params": { "updated_value": updated_value } } } # Perform the update using the Update API and the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Nested object updated successfully!") except Exception as e: print(f"Error updating nested object: {e}")
可以通过索引操作向 Elasticsearch 中的文档添加新字段。
您可以使用 Update API 使用新字段部分更新现有文档。启用索引上的动态映射后,引入新字段非常简单。只需索引包含该字段的文档,Elasticsearch 就会自动找出合适的映射并将新字段添加到映射中。
如果索引上的动态映射已禁用,您将需要使用更新映射 API。您可以在下面看到一个示例,了解如何通过向电影索引添加“类别”字段来更新索引映射。
PUT /movies/_mapping { "properties": { "category": { "type": "keyword" } } }
虽然代码很简单,但 Elasticsearch 内部需要做很多繁重的工作来执行这些更新,因为数据存储在不可变的段中。因此,Elasticsearch 无法简单地对文档进行就地更新。执行更新的唯一方法是重新索引整个文档,无论使用哪种 API。
Elasticsearch 的底层使用了 Apache Lucene。Lucene 索引由一个或多个段组成。段是一个独立的、不可变的索引结构,代表整体索引的子集。添加或更新文档时,会创建新的 Lucene 段,并将旧文档标记为软删除。随着时间的推移,随着新文档的添加或现有文档的更新,可能会累积多个段。为了优化索引结构,Lucene 会定期将较小的段合并为较大的段。
由于每个更新操作都是重新索引操作,因此所有更新本质上都是带有软删除的插入。
将更新视为插入操作会产生成本影响。一方面,软删除数据意味着旧数据仍会保留一段时间,从而导致索引的存储和内存膨胀。执行软删除、重新索引和垃圾收集操作也会对 CPU 造成严重影响,而在所有副本上重复这些操作会加剧这种影响。
随着产品的增长和数据随时间的变化,更新会变得更加棘手。为了保持 Elasticsearch 的性能,您需要更新集群中的分片、分析器和标记器,这需要对整个集群进行重新索引。对于生产应用程序,这将需要设置一个新集群并迁移所有数据。迁移集群既耗时又容易出错,因此这不是一项可以掉以轻心的操作。
Elasticsearch 中更新操作的简单性可以掩盖系统底层发生的繁重操作任务。Elasticsearch 将每个更新视为插入,需要重新创建和重新索引整个文档。对于频繁更新的应用程序,这很快就会变得昂贵,正如我们在 Netflix 示例中看到的那样,每分钟都会发生数百万次更新。我们建议使用Bulk API批量更新,这会增加工作负载的延迟,或者在面对 Elasticsearch 中的频繁更新时寻找替代解决方案。
Rockset 是一款在云端构建的搜索和分析数据库,是 Elasticsearch 的可变替代方案。Rockset 建立在RocksDB之上,后者是一种以可变性而广受欢迎的键值存储,因此可以对文档进行就地更新。这样一来,只会更新和重新索引单个字段的值,而不是整个文档。
如果您想比较 Elasticsearch 和 Rockset 在更新密集型工作负载下的性能,您可以获得 300 美元的信用额度开始Rockset 的免费试用。