Elasticsearch es un motor de búsqueda y análisis de código abierto basado en Apache Lucene. Al crear aplicaciones a partir de datos de captura de datos modificados (CDC) utilizando Elasticsearch, querrá diseñar el sistema para manejar actualizaciones o modificaciones frecuentes de los documentos existentes en un índice.
En este blog, analizaremos las diferentes opciones disponibles para las actualizaciones, incluidas actualizaciones completas, actualizaciones parciales y actualizaciones programadas. También discutiremos lo que sucede bajo el capó en Elasticsearch al modificar un documento y cómo las actualizaciones frecuentes afectan la utilización de la CPU en el sistema.
Para comprender mejor los casos de uso que tienen actualizaciones frecuentes , veamos una aplicación de búsqueda para un servicio de transmisión de video como Netflix. Cuando un usuario busca un programa, es decir, "thriller político", se le devuelve un conjunto de resultados relevantes basados en palabras clave y otros metadatos.
Veamos un documento de ejemplo en Elasticsearch del programa “House of Cards”:
{ "name": "House of Cards", "description": "Frank Underwood is a Democrat appointed as the Secretary of State. Along with his wife, he sets out on a quest to seek revenge from the people who betrayed him while successfully rising to supremacy.", "genres": ["drama", "thriller"], "views": 100, }
La búsqueda se puede configurar en Elasticsearch para utilizar name
y description
como campos de búsqueda de texto completo. El campo views
, que almacena el número de vistas por título, se puede utilizar para mejorar el contenido y clasificar los programas más populares en una posición más alta. El campo views
aumenta cada vez que un usuario mira un episodio de un programa o una película.
Cuando se utiliza esta configuración de búsqueda en una aplicación de la escala de Netflix , el número de actualizaciones realizadas puede fácilmente superar los millones por minuto, según lo determinado por el Informe de participación de Netflix . Según el informe, los usuarios vieron aproximadamente 100 mil millones de horas de contenido de enero a julio. Suponiendo un tiempo de visualización promedio de 15 minutos por episodio o película, el número de visualizaciones por minuto alcanza los 1,3 millones en promedio. Con la configuración de búsqueda especificada anteriormente, cada vista requeriría una actualización en la escala de millones.
Muchas aplicaciones de búsqueda y análisis pueden experimentar actualizaciones frecuentes, especialmente cuando se basan en datos de los CDC.
Profundicemos en un ejemplo general de cómo realizar una actualización en Elasticsearch con el siguiente código:
- from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Retrieve the current document to get the current 'views' value try: current_doc = es.get(index=index_name, id=document_id) current_views = current_doc['_source']['views'] except Exception as e: print(f"Error retrieving current document: {e}") current_views = 0 # Set a default value if there's an error # Define the update body to increment 'views' by 1 update_body = { "doc": { "views": current_views + 1 # Increment 'views' by 1 } } # Perform the update try: es.update(index=index_name, id=document_id, body=update_body) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
Al realizar una actualización en Elasticsearch, puede usar la API de índice para reemplazar un documento existente o la API de actualización para realizar una actualización parcial de un documento.
La API de índice recupera el documento completo, realiza cambios en el documento y luego lo reindexa. Con la API de actualización, simplemente envía los campos que desea modificar, en lugar del documento completo. Esto aún da como resultado que el documento se reindexe pero minimiza la cantidad de datos enviados a través de la red. La API de actualización es especialmente útil en casos en los que el tamaño del documento es grande y enviar el documento completo a través de la red llevará mucho tiempo.
Veamos cómo funcionan tanto la API de índice como la API de actualización usando código Python.
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Retrieve the existing document existing_document = es.get(index=index_name, id=document_id) # Make your changes to the document existing_document["_source"]["field1"] = "new_value1" existing_document["_source"]["field2"] = "new_value2" # Call the index API to perform the full update es.index(index=index_name, id=document_id, body=existing_document["_source"])
Como puede ver en el código anterior, la API de índice requiere dos llamadas separadas a Elasticsearch, lo que puede resultar en un rendimiento más lento y una mayor carga en su clúster.
Las actualizaciones parciales utilizan internamente la API de reindexación , pero se han configurado para requerir solo una única llamada de red para un mejor rendimiento.
from elasticsearch import Elasticsearch # Connect to Elasticsearch es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID index_name = "your_index" document_id = "1" # Specify the fields to be updated update_fields = { "field1": "new_value1", "field2": "new_value2" } # Use the update API to perform a partial update es.update(index=index_name, id=document_id, body={"doc": update_fields})
Puede utilizar la API de actualización en Elasticsearch para actualizar el recuento de vistas pero, por sí sola, la API de actualización no se puede utilizar para incrementar el recuento de vistas en función del valor anterior. Esto se debe a que necesitamos el recuento de vistas anterior para establecer el nuevo valor de recuento de vistas.
Veamos cómo podemos solucionar este problema utilizando un potente lenguaje de programación, Painless.
Painless es un lenguaje de programación diseñado para Elasticsearch y se puede utilizar para consultas y cálculos de agregación, condicionales complejos, transformaciones de datos y más. Painless también permite el uso de scripts en consultas de actualización para modificar documentos basándose en una lógica compleja.
En el siguiente ejemplo, utilizamos un script Painless para realizar una actualización en una única llamada API e incrementar el recuento de vistas nuevas en función del valor del recuento de vistas anteriores.
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID you want to update index_name = 'movies' document_id = 'your_document_id' # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.views += 1" # Increment 'views' by 1 } } # Perform the update using the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Document updated successfully!") except Exception as e: print(f"Error updating document: {e}")
El script Painless es bastante intuitivo de entender, simplemente incrementa el recuento de vistas en 1 para cada documento.
Los objetos anidados en Elasticsearch son una estructura de datos que permite la indexación de matrices de objetos como documentos separados dentro de un único documento principal. Los objetos anidados son útiles cuando se trata de datos complejos que forman naturalmente una estructura anidada, como objetos dentro de objetos. En un documento típico de Elasticsearch, las matrices de objetos se aplanan, pero el uso del tipo de datos anidados permite indexar y consultar cada objeto de la matriz de forma independiente.
También se pueden utilizar scripts sencillos para actualizar objetos anidados en Elasticsearch.
from elasticsearch import Elasticsearch # Connect to your Elasticsearch instance es = Elasticsearch([{'host': 'localhost', 'port': 9200}]) # Index name and document ID for the example index_name = 'your_index' document_id = 'your_document_id' # Specify the nested field and the updated value nested_field = "nested_field_name" updated_value = "new_value" # Define the Painless script for the update update_script = { "script": { "lang": "painless", "source": "ctx._source.nested_field_name = params.updated_value", "params": { "updated_value": updated_value } } } # Perform the update using the Update API and the Painless script try: es.update(index=index_name, id=document_id, body=update_script) print("Nested object updated successfully!") except Exception as e: print(f"Error updating nested object: {e}")
Se puede agregar un nuevo campo a un documento en Elasticsearch mediante una operación de índice.
Puede actualizar parcialmente un documento existente con el nuevo campo utilizando la API de actualización. Cuando el mapeo dinámico en el índice está habilitado, introducir un nuevo campo es sencillo. Simplemente indexe un documento que contenga ese campo y Elasticsearch determinará automáticamente la asignación adecuada y agregará el nuevo campo a la asignación.
Con el mapeo dinámico en el índice deshabilitado, deberá usar la API de mapeo de actualización. Puede ver un ejemplo a continuación de cómo actualizar la asignación de índice agregando un campo de "categoría" al índice de películas.
PUT /movies/_mapping { "properties": { "category": { "type": "keyword" } } }
Si bien el código es simple, Elasticsearch internamente está haciendo mucho trabajo pesado para realizar estas actualizaciones porque los datos se almacenan en segmentos inmutables. Como resultado, Elasticsearch no puede simplemente realizar una actualización in situ de un documento. La única forma de realizar una actualización es volver a indexar todo el documento, independientemente de la API que se utilice.
Elasticsearch usa Apache Lucene bajo el capó. Un índice de Lucene se compone de uno o más segmentos. Un segmento es una estructura de índice autocontenida e inmutable que representa un subconjunto del índice general. Cuando se agregan o actualizan documentos, se crean nuevos segmentos de Lucene y los documentos más antiguos se marcan para su eliminación temporal. Con el tiempo, a medida que se agregan nuevos documentos o se actualizan los existentes, es posible que se acumulen varios segmentos. Para optimizar la estructura del índice, Lucene fusiona periódicamente segmentos más pequeños en otros más grandes.
Dado que cada operación de actualización es una operación de reindexación, todas las actualizaciones son esencialmente inserciones con eliminaciones temporales.
Hay implicaciones de costos por tratar una actualización como una operación de inserción. Por un lado, la eliminación temporal de datos significa que los datos antiguos aún se conservan durante algún tiempo, lo que infla el almacenamiento y la memoria del índice. Realizar operaciones de eliminación temporal, reindexación y recolección de basura también tiene un alto costo para la CPU, un costo que se agrava al repetir estas operaciones en todas las réplicas.
Las actualizaciones pueden volverse más complicadas a medida que su producto crece y sus datos cambian con el tiempo. Para mantener el rendimiento de Elasticsearch, deberá actualizar los fragmentos, analizadores y tokenizadores de su clúster, lo que requerirá una reindexación de todo el clúster. Para aplicaciones de producción, esto requerirá configurar un nuevo clúster y migrar todos los datos. La migración de clústeres requiere mucho tiempo y es propensa a errores, por lo que no es una operación que deba tomarse a la ligera.
La simplicidad de las operaciones de actualización en Elasticsearch puede enmascarar las pesadas tareas operativas que ocurren bajo el capó del sistema. Elasticsearch trata cada actualización como una inserción, lo que requiere que se vuelva a crear y reindexar el documento completo. Para aplicaciones con actualizaciones frecuentes, esto puede resultar costoso rápidamente, como vimos en el ejemplo de Netflix, donde se realizan millones de actualizaciones cada minuto. Recomendamos realizar actualizaciones por lotes utilizando Bulk API , que agrega latencia a su carga de trabajo, o buscar soluciones alternativas cuando se enfrenta a actualizaciones frecuentes en Elasticsearch.
Rockset, una base de datos de búsqueda y análisis construida en la nube, es una alternativa mutable a Elasticsearch. Al estar construido sobre RocksDB , un almacén de valores clave popularizado por su mutabilidad, Rockset puede realizar actualizaciones in situ de los documentos. Esto da como resultado que solo se actualice y reindexe el valor de los campos individuales en lugar de todo el documento.
Si desea comparar el rendimiento de Elasticsearch y Rockset para cargas de trabajo con muchas actualizaciones, puede iniciar una prueba gratuita de Rockset con $300 en créditos.