Управление потоковой передачей данных из исходной системы, такой как PostgreSQL, MongoDB или DynamoDB, в нижестоящую систему для поиска и анализа в реальном времени является проблемой для многих команд. Поток данных часто включает в себя сложные инструменты ETL, а также самоуправляемую интеграцию, чтобы гарантировать, что большие объемы записи, включая обновления и удаления, не нагружают ЦП и не влияют на производительность конечного приложения.
Для такой системы, как Elasticsearch , инженеры должны иметь глубокие знания базовой архитектуры, чтобы эффективно принимать потоковые данные . Elasticsearch был разработан для анализа журналов, где данные меняются нечасто, что создает дополнительные проблемы при работе с транзакционными данными.
Rockset, с другой стороны, представляет собой облачную базу данных, устраняющую множество инструментов и накладных расходов, необходимых для загрузки данных в систему. Поскольку Rockset специально создан для поиска и аналитики в реальном времени, он также был разработан для обеспечения возможности изменения на уровне полей , что позволяет снизить нагрузку на ЦП, необходимую для обработки вставок, обновлений и удалений.
В этом блоге мы сравним и противопоставим, как Elasticsearch и Rockset обрабатывают прием данных, а также предоставим практические методы использования этих систем для аналитики в реальном времени.
Хотя существует множество способов загрузки данных в Elasticsearch, мы рассмотрим три распространенных метода поиска и аналитики в реальном времени:
Принимайте данные из реляционной базы данных в Elasticsearch с помощью плагина ввода Logstash JDBC.
Прием данных из Kafka в Elasticsearch с помощью соединителя приемника службы Kafka Elasticsearch.
Принимайте данные непосредственно из приложения в Elasticsearch с помощью REST API и клиентских библиотек.
Принимайте данные из реляционной базы данных в Elasticsearch с помощью плагина ввода Logstash JDBC. Плагин ввода Logstash JDBC можно использовать для выгрузки данных из реляционной базы данных, такой как PostgreSQL или MySQL, в Elasticsearch для поиска и аналитики.
Logstash — это конвейер обработки событий, который принимает и преобразует данные перед отправкой их в Elasticsearch. Logstash предлагает плагин ввода JDBC , который периодически опрашивает реляционную базу данных, такую как PostgreSQL или MySQL, на предмет вставок и обновлений. Чтобы использовать эту службу, ваша реляционная база данных должна предоставлять записи с отметками времени, которые Logstash может прочитать, чтобы определить, какие изменения произошли.
Этот подход к приему хорошо работает для вставок и обновлений, но при удалении необходимы дополнительные соображения. Это связано с тем, что Logstash не может определить, что было удалено в вашей базе данных OLTP. Пользователи могут обойти это ограничение, реализовав обратимое удаление, при котором к удаленной записи применяется флаг, который используется для фильтрации данных во время запроса. Или они могут периодически сканировать свою реляционную базу данных, чтобы получить доступ к самым последним записям и переиндексировать данные в Elasticsearch.
Принимайте данные из Kafka в Elasticsearch с помощью Kafka Elasticsearch Sink Connector . Также часто используется платформа потоковой передачи событий, такая как Kafka, для отправки данных из исходных систем в Elasticsearch для поиска и анализа в реальном времени.
Confluent и Elastic совместно выпустили Kafka Elasticsearch Service Sink Connector , доступный компаниям, использующим как управляемые предложения Confluent Kafka, так и Elastic Elasticsearch. Соединитель требует установки и управления дополнительным инструментом Kafka Connect.
Используя соединитель, вы можете сопоставить каждую тему в Kafka с одним типом индекса в Elasticsearch. Если в качестве типа индекса используется динамическая типизация, Elasticsearch поддерживает некоторые изменения схемы, такие как добавление полей, удаление полей и изменение типов.
Одна из проблем, которая возникает при использовании Kafka, — это необходимость переиндексации данных в Elasticsearch, когда вы хотите изменить анализатор, токенизатор или индексированные поля. Это связано с тем, что сопоставление нельзя изменить, если оно уже определено. Чтобы выполнить переиндексацию данных, вам потребуется дважды записать исходный индекс и новый индекс, переместить данные из исходного индекса в новый индекс, а затем остановить исходное задание соединителя.
Если вы не используете управляемые сервисы от Confluent или Elastic, вы можете использовать плагин Kafka с открытым исходным кодом для Logstash для отправки данных в Elasticsearch.
Принимайте данные непосредственно из приложения в Elasticsearch с помощью REST API и клиентских библиотек. Elasticsearch предлагает возможность использовать поддерживаемые клиентские библиотеки, включая Java, Javascript, Ruby, Go, Python и другие, для приема данных через REST API непосредственно из вашего приложения. Одна из проблем при использовании клиентской библиотеки заключается в том, что ее необходимо настроить для работы с очередями и противодавлением в случае, когда Elasticsearch не может справиться с нагрузкой приема. Без системы очередей существует вероятность потери данных в Elasticsearch.
Elasticsearch имеет API обновлений , который можно использовать для обработки обновлений и удалений. API обновления уменьшает количество сетевых отключений и вероятность конфликтов версий. API обновления извлекает существующий документ из индекса, обрабатывает изменения, а затем снова индексирует данные. Тем не менее, Elasticsearch не предлагает обновления или удаления на месте. Таким образом, весь документ по-прежнему необходимо переиндексировать, а это ресурсоемкая операция.
Под капотом данные Elasticsearch хранятся в индексе Lucene, и этот индекс разбивается на более мелкие сегменты. Каждый сегмент является неизменяемым, поэтому документы не могут быть изменены. При обновлении старый документ помечается для удаления, а новый документ объединяется в новый сегмент. Чтобы использовать обновленный документ, необходимо запустить все анализаторы, что также может увеличить загрузку ЦП. Клиенты, работающие с постоянно меняющимися данными, часто видят, что слияния индексов съедают значительную часть их общих счетов за вычисления Elasticsearch.
Учитывая количество необходимых ресурсов, Elastic рекомендует ограничить количество обновлений Elasticsearch. Показательный клиент Elasticsearch, Bol.com , использовал Elasticsearch для поиска по сайту как часть своей платформы электронной коммерции. Bol.com ежедневно делал около 700 тысяч обновлений своих предложений, включая изменения в контенте, ценах и доступности. Изначально им хотелось найти решение, которое синхронизировалось бы с любыми изменениями по мере их возникновения. Но, учитывая влияние обновлений на производительность системы Elasticsearch, они решили допустить 15-20-минутные задержки. Пакетная обработка документов в Elasticsearch обеспечивала стабильную производительность запросов.
В Elasticsearch могут возникнуть проблемы, связанные с удалением старых документов и освобождением места.
Elasticsearch выполняет объединение сегментов в фоновом режиме, если в индексе имеется большое количество сегментов или в сегменте много документов, помеченных для удаления. Объединение сегментов — это копирование документов из существующих сегментов во вновь сформированный сегмент, а оставшиеся сегменты удаляются. К сожалению, Lucene не умеет определять размеры сегментов, которые необходимо объединить, что может привести к созданию неравномерных сегментов, которые повлияют на производительность и стабильность.
Это связано с тем, что Elasticsearch предполагает, что все документы имеют одинаковый размер, и принимает решения об объединении на основе количества удаленных документов. При работе с документами разного размера, как это часто бывает в мультитенантных приложениях, размер некоторых сегментов будет расти быстрее, чем других, что снижает производительность крупнейших клиентов приложения. В таких случаях единственным выходом является переиндексация большого объема данных.
Elasticsearch использует для репликации модель основного резервного копирования . Первичная реплика обрабатывает входящую операцию записи, а затем пересылает ее своим репликам. Каждая реплика получает эту операцию и повторно индексирует данные локально. Это означает, что каждая реплика независимо друг от друга тратит дорогостоящие вычислительные ресурсы на повторную индексацию одного и того же документа снова и снова. Если имеется n реплик, Elastic потратит n раз больше ресурсов процессора для индексации одного и того же документа. Это может увеличить объем данных, которые необходимо переиндексировать при обновлении или вставке.
Хотя вы можете использовать API обновления в Elasticsearch, обычно рекомендуется группировать частые изменения с помощью Bulk API . При использовании Bulk API инженерным группам часто приходится создавать очередь и управлять ею, чтобы оптимизировать обновления в системе.
Очередь не зависит от Elasticsearch, ее необходимо настраивать и управлять ею. Очередь будет объединять вставки, обновления и удаления в системе в течение определенного интервала времени, скажем, 15 минут, чтобы ограничить влияние на Elasticsearch. Система организации очередей также применяет регулирование при высокой скорости вставки, чтобы обеспечить стабильность приложения. Хотя очереди полезны для обновлений, они не подходят для определения большого количества изменений данных, требующих полного переиндексирования данных. Это может произойти в любой момент, если в системе много обновлений. В командах, использующих Elastic в больших масштабах, обычно есть выделенные сотрудники, которые ежедневно управляют и настраивают свои очереди.
Как упоминалось в предыдущем разделе, когда имеется множество обновлений или вам нужно изменить сопоставления индексов, происходит переиндексация данных. Переиндексация подвержена ошибкам и может привести к выходу из строя кластера. Что еще более страшно, так это то, что переиндексация может произойти в любой момент.
Если вы хотите изменить сопоставления, у вас будет больше контроля над временем, в течение которого происходит переиндексация. Elasticsearch имеет API переиндексации для создания нового индекса и API псевдонимов, чтобы гарантировать отсутствие простоев при создании нового индекса. При использовании API псевдонимов запросы перенаправляются к псевдониму или старому индексу по мере создания нового индекса. Когда новый индекс будет готов, API псевдонимов преобразует данные для чтения из нового индекса.
С API псевдонимов по-прежнему сложно синхронизировать новый индекс с последними данными. Это связано с тем, что Elasticsearch может записывать данные только в один индекс. Итак, вам нужно будет настроить восходящий конвейер данных для двойной записи в новый и старый индекс.
Rockset использует встроенные коннекторы для синхронизации ваших данных с исходными системами. Управляемые соединители Rockset настроены для каждого типа источника данных, поэтому данные можно получить и сделать запросами в течение 2 секунд. Это позволяет избежать ручных конвейеров, которые увеличивают задержку или могут принимать данные только микропакетами, скажем, каждые 15 минут.
На высоком уровне Rockset предлагает встроенные соединители для баз данных OLTP, потоков данных, озер и хранилищ данных. Вот как они работают:
Встроенные соединители с базами данных OLTP Rockset выполняет первоначальное сканирование ваших таблиц в вашей базе данных OLTP, а затем использует потоки CDC для синхронизации с последними данными, при этом данные становятся доступными для запроса в течение 2 секунд с момента их создания исходная система.
Встроенные соединители для потоков данных. Благодаря таким потокам данных, как Kafka или Kinesis, Rockset постоянно принимает любые новые темы, используя интеграцию на основе извлечения, которая не требует настройки в Kafka или Kinesis.
Встроенные соединители с озерами данных и хранилищами Rockset постоянно отслеживает обновления и принимает любые новые объекты из озер данных, таких как корзины S3. Обычно мы обнаруживаем, что команды хотят объединить потоки данных в реальном времени с данными из своих озер данных для аналитики в реальном времени.
Rockset имеет распределенную архитектуру, оптимизированную для эффективного индексирования данных параллельно на нескольких машинах.
Rockset — это база данных с сегментированием документов , поэтому она записывает целые документы на одну машину, а не разделяет их на части и отправляет разные поля на разные машины. Благодаря этому можно быстро добавлять новые документы для вставки или находить существующие документы на основе первичного ключа _id для обновлений и удалений.
Подобно Elasticsearch, Rockset использует индексы для быстрого и эффективного получения данных при их запросе. Однако, в отличие от других баз данных или поисковых систем, Rockset индексирует данные во время приема в конвергентном индексе , индексе, который сочетает в себе хранилище столбцов, индекс поиска и хранилище строк. Конвергентный индекс хранит все значения полей в виде серии пар ключ-значение. В примере ниже вы можете увидеть документ и то, как он хранится в Rockset.
Под капотом Rockset использует RocksDB , высокопроизводительное хранилище значений ключей, которое делает мутации тривиальными. RocksDB поддерживает атомарную запись и удаление по разным ключам. Если поступает обновление поля name
документа, необходимо обновить ровно 3 ключа, по одному на каждый индекс. Индексы для других полей в документе не затрагиваются, а это означает, что Rockset может эффективно обрабатывать обновления вместо того, чтобы каждый раз тратить циклы на обновление индексов для всего документа.
Вложенные документы и массивы также являются первоклассными типами данных в Rockset, то есть к ним применим тот же процесс обновления, что делает Rockset хорошо подходящим для обновлений данных, хранящихся в современных форматах, таких как JSON и Avro.
Команда Rockset также создала несколько пользовательских расширений для RocksDB для обработки большого количества операций записи и чтения, что является обычным явлением в рабочих нагрузках аналитики в реальном времени. Одним из таких расширений является удаленное сжатие , которое обеспечивает четкое разделение вычислений запросов и вычислений индексации в RocksDB Cloud. Это позволяет Rockset избежать вмешательства записи в чтение. Благодаря этим улучшениям Rockset может масштабировать операции записи в соответствии с потребностями клиентов и предоставлять свежие данные для запросов, даже если в фоновом режиме происходят мутации.
Пользователи Rockset могут использовать поле _id по умолчанию или указать конкретное поле в качестве первичного ключа. Это поле позволяет перезаписать документ или его часть. Разница между Rockset и Elasticsearch заключается в том, что Rockset может обновлять значение отдельного поля, не требуя переиндексации всего документа.
Чтобы обновить существующие документы в коллекции с помощью API Rockset, вы можете отправлять запросы к конечной точке Patch Documents. Для каждого существующего документа, который вы хотите обновить, вы просто указываете поле _id и список операций исправления, которые будут применены к документу.
API Rockset также предоставляет конечную точку «Добавить документы», чтобы вы могли вставлять данные непосредственно в свои коллекции из кода приложения. Чтобы удалить существующие документы, просто укажите поля _id документов, которые вы хотите удалить, и сделайте запрос к конечной точке «Удалить документы» Rockset API.
В отличие от Elasticsearch, только одна реплика в Rockset выполняет индексацию и сжатие с использованием удаленного сжатия RocksDB. Это уменьшает количество ресурсов ЦП, необходимых для индексации, особенно когда для обеспечения надежности используются несколько реплик.
Во время приема в Rockset вы можете использовать преобразование приема, чтобы указать желаемые преобразования данных, которые будут применяться к необработанным исходным данным. Если позже вы захотите изменить преобразование приема, вам потребуется переиндексировать данные.
Тем не менее, Rockset обеспечиваетбессхемный прием и динамически типизирует значения каждого поля данных. Если размер и форма данных или запросов изменятся, Rockset продолжит работать и не потребует переиндексации данных.
Rockset может масштабироваться до сотен терабайт данных без необходимости переиндексации. Это восходит к стратегии шардинга Rockset. Когда объем вычислений, которые клиент выделяет в своем виртуальном экземпляре, увеличивается, подмножество сегментов перемешивается для достижения лучшего распределения по кластеру, что обеспечивает более распараллеленную, более быструю индексацию и выполнение запросов. В результате в этих сценариях переиндексация не требуется.
Elasticsearch был разработан для анализа журналов, где данные не часто обновляются, вставляются или удаляются. Со временем команды расширили использование Elasticsearch, часто используя Elasticsearch в качестве вторичного хранилища данных и механизма индексирования для анализа в реальном времени постоянно меняющихся транзакционных данных. Это может оказаться дорогостоящим мероприятием, особенно для команд, оптимизирующих прием данных в режиме реального времени, а также повлечь за собой значительные затраты на управление.
Rockset, с другой стороны, был разработан для анализа в реальном времени и предоставления новых данных для запроса в течение 2 секунд с момента их создания. Чтобы решить эту проблему, Rockset поддерживает вставку, обновление и удаление на месте, экономя вычислительные ресурсы и ограничивая использование переиндексации документов. Rockset также осознает затраты на управление коннекторами и приемом данных и использует платформенный подход, включая коннекторы реального времени в свои облачные предложения.
В целом мы видели, как компании, которые переходят с Elasticsearch на Rockset для аналитики в реальном времени, экономят 44% только на счетах за вычисления. Присоединяйтесь к волне инженерных команд, перешедших с Elasticsearch на Rockset за считанные дни. Начните бесплатную пробную версию сегодня.