Elasticsearch — это поисковая и аналитическая система с открытым исходным кодом, основанная на Apache Lucene. При создании приложений на основе данных системы отслеживания измененных данных (CDC) с использованием Elasticsearch вам потребуется спроектировать систему так, чтобы она обрабатывала частые обновления или изменения существующих документов в индексе.
В этом блоге мы рассмотрим различные варианты обновлений, включая полные обновления, частичные обновления и обновления по сценарию. Мы также обсудим, что происходит внутри Elasticsearch при изменении документа и как частые обновления влияют на загрузку ЦП в системе.
Чтобы лучше понять варианты использования с частыми обновлениями , давайте рассмотрим приложение поиска для службы потокового видео, например Netflix. Когда пользователь ищет шоу, например «политический триллер», ему возвращается набор релевантных результатов на основе ключевых слов и других метаданных.
Давайте посмотрим на пример документа в Elasticsearch шоу «Карточный домик»:
{ "name": "House of Cards", "description": "Frank Underwood is a Democrat appointed as the Secretary of State. Along with his wife, he sets out on a quest to seek revenge from the people who betrayed him while successfully rising to supremacy.", "genres": ["drama", "thriller"], "views": 100, }
В Elasticsearch можно настроить поиск для использования name
и description
в качестве полей полнотекстового поиска. Поле views
, в котором хранится количество просмотров каждого заголовка, можно использовать для продвижения контента, повышая рейтинг более популярных шоу. Поле views
увеличивается каждый раз, когда пользователь смотрит серию шоу или фильма.
При использовании этой конфигурации поиска в приложении масштаба Netflix количество выполняемых обновлений может легко превысить миллионы в минуту, как это определено в отчете Netflix Engagement Report . Согласно отчету, с января по июль пользователи просмотрели около 100 миллиардов часов контента. Если предположить, что среднее время просмотра серии или фильма составляет 15 минут, то количество просмотров в минуту в среднем достигает 1,3 миллиона. При указанной выше конфигурации поиска каждое представление потребует обновления в масштабе миллионов.
Многие поисковые и аналитические приложения могут часто обновляться, особенно если они созданы на основе данных CDC.
Давайте углубимся в общий пример того, как выполнить обновление в Elasticsearch, с помощью кода ниже:
- from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Retrieve the current document to get the current 'views' value try: current_doc = es.get(index=index_name, id=document_id) current_views = current_doc['_source']['views'] except Exception as e: print(f"Error retrieving current document: {e}") current_views = 0 # Set a default value if there's an error # Define the update body to increment 'views' by 1 update_body = { "doc": { "views": current_views + 1 # Increment 'views' by 1 } } # Perform the update try: es.update(index=index_name, id=document_id, body=update_body) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
При обновлении в Elasticsearch вы можете использовать API индекса для замены существующего документа или API обновления для частичного обновления документа.
API индексирования извлекает весь документ, вносит в него изменения, а затем переиндексирует документ. С помощью API обновления вы просто отправляете поля, которые хотите изменить, а не весь документ. Это по-прежнему приводит к переиндексации документа, но сводит к минимуму объем данных, передаваемых по сети. API обновления особенно полезен в тех случаях, когда размер документа велик и отправка всего документа по сети займет много времени.
Давайте посмотрим, как работают API индексирования и API обновления с использованием кода Python.
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Retrieve the existing document existing_document = es.get(index=index_name, id=document_id) # Make your changes to the document existing_document["_source"]["field1"] = "new_value1" existing_document["_source"]["field2"] = "new_value2" # Call the index API to perform the full update es.index(index=index_name, id=document_id, body=existing_document["_source"])
Как вы можете видеть в приведенном выше коде, индексный API требует двух отдельных вызовов Elasticsearch, что может привести к снижению производительности и увеличению нагрузки на ваш кластер.
Частичные обновления внутренне используют API переиндексации , но настроены так, что для повышения производительности требуется только один сетевой вызов.
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Specify the fields to be updated update_fields = { "field1": "new_value1", "field2": "new_value2" } # Use the update API to perform a partial update es.update(index=index_name, id=document_id, body={"doc": update_fields})
Вы можете использовать API обновления в Elasticsearch для обновления количества просмотров, но сам по себе API обновления не может использоваться для увеличения количества просмотров на основе предыдущего значения. Это связано с тем, что нам нужно старое количество просмотров, чтобы установить новое значение количества просмотров.
Давайте посмотрим, как мы можем это исправить, используя мощный язык сценариев Painless.
Painless — это язык сценариев, разработанный для Elasticsearch, который можно использовать для вычислений запросов и агрегирования, сложных условных операторов, преобразований данных и многого другого. Painless также позволяет использовать сценарии в запросах на обновление для изменения документов на основе сложной логики.
В приведенном ниже примере мы используем сценарий Painless для выполнения обновления за один вызов API и увеличения количества новых просмотров на основе значения старого количества просмотров.
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.views += 1" # Increment 'views' by 1 } } # Perform the update using the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
Сценарий Painless довольно интуитивно понятен: он просто увеличивает количество просмотров на 1 для каждого документа.
Вложенные объекты в Elasticsearch — это структура данных, которая позволяет индексировать массивы объектов как отдельные документы в одном родительском документе. Вложенные объекты полезны при работе со сложными данными, которые естественным образом образуют вложенную структуру, например объекты внутри объектов. В типичном документе Elasticsearch массивы объектов сглажены, но использование вложенного типа данных позволяет индексировать и запрашивать каждый объект в массиве независимо.
Безболезненные скрипты также можно использовать для обновления вложенных объектов в Elasticsearch.
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID for the example index_name = 'your_index' document_id = 'your_document_id' # Specify the nested field and the updated value nested_field = "nested_field_name" updated_value = "new_value" # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.nested_field_name = params.updated_value", "params": { "updated_value": updated_value } } } # Perform the update using the Update API and the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Nested object updated successfully!") except Exception as e: print(f"Error updating nested object: {e}")
Добавление нового поля в документ в Elasticsearch можно выполнить с помощью индексной операции.
Вы можете частично обновить существующий документ, добавив новое поле, с помощью API обновления. Если динамическое сопоставление индекса включено, ввести новое поле очень просто. Просто проиндексируйте документ, содержащий это поле, и Elasticsearch автоматически определит подходящее сопоставление и добавит к нему новое поле.
Если динамическое сопоставление индекса отключено, вам потребуется использовать API сопоставления обновлений. Ниже вы можете увидеть пример того, как обновить сопоставление индекса, добавив поле «категория» в индекс фильмов.
PUT /movies/_mapping { "properties": { "category": { "type": "keyword" } } }
Хотя код прост, Elasticsearch выполняет большую работу по выполнению этих обновлений, поскольку данные хранятся в неизменяемых сегментах. В результате Elasticsearch не может просто обновить документ на месте. Единственный способ выполнить обновление — переиндексировать весь документ, независимо от того, какой API используется.
Elasticsearch использует Apache Lucene под капотом. Индекс Lucene состоит из одного или нескольких сегментов. Сегмент — это автономная неизменяемая структура индекса, представляющая подмножество общего индекса. При добавлении или обновлении документов создаются новые сегменты Lucene, а старые документы помечаются для обратимого удаления. Со временем, по мере добавления новых документов или обновления существующих, может накапливаться несколько сегментов. Чтобы оптимизировать структуру индекса, Lucene периодически объединяет меньшие сегменты в более крупные.
Поскольку каждая операция обновления является операцией переиндексации, все обновления по существу представляют собой вставки с обратимым удалением.
Обработка обновления как операции вставки требует затрат. С одной стороны, мягкое удаление данных означает, что старые данные все еще сохраняются в течение некоторого времени, что приводит к раздуванию хранилища и памяти индекса. Выполнение мягкого удаления, переиндексации и операций по сборке мусора также сильно нагружает ЦП, и эта нагрузка усугубляется повторением этих операций на всех репликах.
Обновления могут стать более сложными по мере роста вашего продукта и изменения данных с течением времени. Чтобы сохранить производительность Elasticsearch, вам потребуется обновить сегменты, анализаторы и токенизаторы в вашем кластере, что потребует переиндексации всего кластера. Для производственных приложений это потребует настройки нового кластера и переноса всех данных. Миграция кластеров требует много времени и подвержена ошибкам, поэтому к этой операции нельзя относиться легкомысленно.
Простота операций обновления в Elasticsearch может маскировать тяжелые эксплуатационные задачи, происходящие под капотом системы. Elasticsearch рассматривает каждое обновление как вставку, требуя воссоздания и переиндексации всего документа. Для приложений с частыми обновлениями это может быстро стать дорогостоящим, как мы видели на примере Netflix, где каждую минуту происходят миллионы обновлений. Мы рекомендуем либо группировать обновления с помощью Bulk API , который увеличивает задержку вашей рабочей нагрузки, либо искать альтернативные решения при частых обновлениях в Elasticsearch.
Rockset, база данных поиска и аналитики, созданная в облаке, является изменяемой альтернативой Elasticsearch. Будучи построенным на базе RocksDB , хранилища ключей и значений, популярного благодаря своей изменчивости, Rockset может обновлять документы на месте. В результате обновляются и переиндексируются только значения отдельных полей, а не всего документа.
Если вы хотите сравнить производительность Elasticsearch и Rockset для рабочих нагрузок с большим количеством обновлений, вы можете начать бесплатную пробную версию Rockset с 300 долларами США в виде кредитов.