对于许多团队来说,将源系统(如 PostgreSQL、MongoDB 或 DynamoDB)中的流数据管理到下游系统以进行实时搜索和分析是一项挑战。数据流通常涉及复杂的 ETL 工具以及自我管理集成,以确保大量写入(包括更新和删除)不会占用 CPU 或影响最终应用程序的性能。
对于像Elasticsearch这样的系统,工程师需要深入了解底层架构,以便高效地提取流数据。Elasticsearch 是为数据不经常变化的日志分析而设计的,这在处理事务数据时带来了额外的挑战。
另一方面,Rockset 是一个云原生数据库,它省去了将数据导入系统所需的大量工具和开销。由于 Rockset 专为实时搜索和分析而设计,因此它还针对字段级可变性进行了设计,从而减少了处理插入、更新和删除所需的 CPU。
在这篇博客中,我们将比较和对比Elasticsearch 和 Rockset如何处理数据提取,并提供使用这些系统进行实时分析的实用技术。
虽然将数据导入 Elasticsearch 的方法有很多,但我们只介绍三种常用的实时搜索和分析方法:
使用 Logstash JDBC 输入插件将关系数据库中的数据导入 Elasticsearch
使用 Kafka Elasticsearch Service Sink Connector 将数据从 Kafka 导入 Elasticsearch
使用 REST API 和客户端库将数据直接从应用程序提取到 Elasticsearch
使用 Logstash JDBC 输入插件将数据从关系数据库导入 Elasticsearch。Logstash JDBC 输入插件可用于将数据从关系数据库(如 PostgreSQL 或 MySQL)卸载到 Elasticsearch 进行搜索和分析。
Logstash 是一个事件处理管道,它会提取和转换数据,然后再将其发送到 Elasticsearch。Logstash 提供了一个JDBC 输入插件,它会定期轮询关系数据库(如 PostgreSQL 或 MySQL)以进行插入和更新。要使用此服务,您的关系数据库需要提供带时间戳的记录,Logstash 可以读取这些记录以确定发生了哪些更改。
这种采集方法对于插入和更新非常有效,但对于删除则需要额外的考虑。这是因为 Logstash 无法确定 OLTP 数据库中删除了什么。用户可以通过实施软删除来解决此限制,即对已删除的记录应用一个标志,并在查询时使用该标志过滤数据。或者,他们可以定期扫描关系数据库以访问最新的记录并在 Elasticsearch 中重新索引数据。
使用Kafka Elasticsearch Sink Connector将数据从 Kafka 导入 Elasticsearch 。使用像 Kafka 这样的事件流平台将数据从源系统发送到 Elasticsearch 进行实时搜索和分析也很常见。
Confluent 和 Elastic 合作发布了Kafka Elasticsearch Service Sink Connector ,可供同时使用托管 Confluent Kafka 和 Elastic Elasticsearch 产品的公司使用。该连接器确实需要安装和管理额外的工具 Kafka Connect。
使用连接器,您可以将 Kafka 中的每个主题映射到 Elasticsearch 中的单个索引类型。如果使用动态类型作为索引类型,则 Elasticsearch 支持一些架构更改,例如添加字段、删除字段和更改类型。
使用 Kafka 时确实会出现的一个挑战是,当您想要修改分析器、标记器或索引字段时,需要重新索引 Elasticsearch 中的数据。这是因为映射一旦定义就无法更改。要对数据执行重新索引,您需要对原始索引和新索引进行双重写入,将数据从原始索引移动到新索引,然后停止原始连接器作业。
如果您不使用 Confluent 或 Elastic 的托管服务,则可以使用 Logstash 的开源 Kafka 插件将数据发送到 Elasticsearch。
使用 REST API 和客户端库将数据直接从应用程序提取到 Elasticsearch Elasticsearch 提供了使用受支持的客户端库(包括 Java、Javascript、Ruby、Go、Python 等)的功能,可以通过 REST API 直接从应用程序提取数据。使用客户端库的挑战之一是,必须将其配置为在 Elasticsearch 无法处理提取负载的情况下使用排队和背压。如果没有排队系统,Elasticsearch 可能会丢失数据。
Elasticsearch 有一个Update API ,可用于处理更新和删除。Update API 减少了网络通信次数和版本冲突的可能性。Update API 从索引中检索现有文档,处理更改,然后再次索引数据。也就是说,Elasticsearch 不提供就地更新或删除。因此,仍然必须重新索引整个文档,这是一项 CPU 密集型操作。
在底层,Elasticsearch 数据存储在 Lucene 索引中,该索引被分解为更小的段。每个段都是不可变的,因此文档无法更改。进行更新时,旧文档将被标记为删除,新文档将合并以形成新段。为了使用更新的文档,需要运行所有分析器,这也会增加 CPU 使用率。对于数据不断变化的客户来说,索引合并通常会占用其整体 Elasticsearch 计算费用的相当一部分。
考虑到所需的资源量,Elastic 建议限制 Elasticsearch 中的更新次数。Elasticsearch 的参考客户Bol.com使用 Elasticsearch 作为其电子商务平台的一部分进行站点搜索。Bol.com 每天对其产品进行大约 70 万次更新,包括内容、定价和可用性更改。他们最初想要一种能够与发生的任何更改保持同步的解决方案。但是,考虑到更新对 Elasticsearch 系统性能的影响,他们选择允许 15-20 分钟的延迟。将文档批量放入 Elasticsearch 可确保一致的查询性能。
在 Elasticsearch 中,可能会存在与删除旧文档和回收空间相关的挑战。
当索引中有大量段或段中有大量标记为删除的文档时,Elasticsearch 会在后台完成段合并。段合并是指将文档从现有段复制到新形成的段,并删除剩余的段。不幸的是,Lucene 不擅长确定需要合并的段的大小,可能会创建不均匀的段,从而影响性能和稳定性。
这是因为 Elasticsearch 假设所有文档的大小都是统一的,并根据删除的文档数量做出合并决策。在处理异构文档大小时(多租户应用程序中经常出现这种情况),某些段的大小增长速度会比其他段快,从而降低应用程序上最大客户的性能。在这些情况下,唯一的补救措施是重新索引大量数据。
Elasticsearch 使用主备份模型进行复制。主副本处理传入的写入操作,然后将操作转发到其副本。每个副本接收此操作并再次在本地重新索引数据。这意味着每个副本都独立花费昂贵的计算资源来一遍又一遍地重新索引同一文档。如果有 n 个副本,Elastic 将花费 n 倍的 CPU 来索引同一文档。这可能会加剧更新或插入时需要重新索引的数据量。
虽然您可以在 Elasticsearch 中使用 Update API,但通常建议使用Bulk API批量处理频繁更改。使用 Bulk API 时,工程团队通常需要创建和管理队列以简化系统中的更新。
队列独立于 Elasticsearch,需要进行配置和管理。队列将在特定时间间隔(例如 15 分钟)内整合对系统的插入、更新和删除,以限制对 Elasticsearch 的影响。当插入率较高时,队列系统还将应用节流以确保应用程序稳定性。虽然队列对更新很有帮助,但它们不擅长确定何时有大量数据更改需要对数据进行完全重新索引。如果系统有大量更新,则可能随时发生这种情况。大规模运行 Elastic 的团队通常会让专门的运营成员每天管理和调整他们的队列。
如上一节所述,当有大量更新或需要更改索引映射时,就会发生数据重新索引。 重新索引很容易出错,并且有可能导致集群崩溃。更可怕的是,重新索引可能随时发生。
如果您确实想更改映射,则可以更好地控制重新索引发生的时间。Elasticsearch 有一个用于创建新索引的重新索引 API 和一个用于确保在创建新索引时不会停机的别名 API 。使用别名 API,在创建新索引时,查询将被路由到别名或旧索引。当新索引准备就绪时,别名 API 将转换为从新索引读取数据。
使用别名 API,保持新索引与最新数据同步仍然很棘手。这是因为 Elasticsearch 只能将数据写入一个索引。因此,您需要配置上游数据管道以双重写入新索引和旧索引。
Rockset 使用内置连接器来保持数据与源系统同步。Rockset 的托管连接器针对每种类型的数据源进行了调整,因此可以在 2 秒内提取数据并使其可查询。这避免了手动管道增加延迟或只能以微批次(例如每 15 分钟)提取数据。
从高层次来看,Rockset 为 OLTP 数据库、数据流、数据湖和仓库提供了内置连接器。它们的工作原理如下:
OLTP 数据库的内置连接器Rockset 对 OLTP 数据库中的表进行初始扫描,然后使用CDC 流与最新数据保持同步,数据在源系统生成后 2 秒内即可供查询。
内置数据流连接器借助Kafka或 Kinesis 等数据流,Rockset 使用基于拉取的集成持续提取任何新主题,无需在 Kafka 或 Kinesis 中进行调整。
内置数据湖和仓库连接器Rockset 不断监控更新并从数据湖(如 S3 存储桶)中提取任何新对象。我们通常发现团队希望将实时流与数据湖中的数据连接起来以进行实时分析。
Rockset 具有经过优化的分布式架构,可以在多台机器上高效地并行索引数据。
Rockset 是一个文档分片数据库,因此它将整个文档写入一台机器,而不是将其拆分并将不同的字段发送到不同的机器。因此,可以快速添加新文档进行插入或定位现有文档,并根据主键 _id 进行更新和删除。
与 Elasticsearch 类似,Rockset 使用索引在查询数据时快速高效地检索数据。但与其他数据库或搜索引擎不同的是,Rockset 在摄取时将数据索引到融合索引中,该索引结合了列存储、搜索索引和行存储。融合索引将字段中的所有值存储为一系列键值对。在下面的示例中,您可以看到一个文档以及它如何存储在 Rockset 中。
在底层, Rockset 使用 RocksDB ,这是一种高性能的键值存储,可使更改变得轻而易举。RocksDB 支持跨不同键的原子写入和删除。如果对文档的name
字段进行更新,则需要更新恰好 3 个键,每个索引一个。文档中其他字段的索引不受影响,这意味着 Rockset 可以高效地处理更新,而不是每次都浪费周期更新整个文档的索引。
嵌套文档和数组也是 Rockset 中的一流数据类型,这意味着相同的更新过程也适用于它们,这使得 Rockset 非常适合更新以 JSON 和 Avro 等现代格式存储的数据。
Rockset 团队还为 RocksDB 构建了多个自定义扩展,以处理高写入和高读取量,这是实时分析工作负载中的常见模式。其中一个扩展是远程压缩,它将查询计算和索引计算完全分离到 RocksDB Cloud。这使 Rockset 能够避免写入干扰读取。由于这些增强功能,Rockset 可以根据客户的需求扩展其写入,并在后台发生突变时提供新数据以供查询。
Rockset 的用户可以使用默认的 _id 字段,也可以指定特定字段作为主键。此字段允许覆盖文档或文档的一部分。Rockset 和 Elasticsearch 之间的区别在于,Rockset 可以更新单个字段的值,而无需重新索引整个文档。
要使用 Rockset API 更新集合中的现有文档,您可以向 Patch Documents 端点发出请求。对于要更新的每个现有文档,只需指定 _id 字段和要应用于文档的修补操作列表。
Rockset API 还公开了一个添加文档端点,以便您可以从应用程序代码中将数据直接插入到集合中。要删除现有文档,只需指定要删除的文档的 _id 字段,然后向 Rockset API 的删除文档端点发出请求。
与 Elasticsearch 不同,Rockset 中只有一个副本使用 RocksDB 远程压缩进行索引和压缩。这减少了索引所需的 CPU 数量,尤其是在使用多个副本实现持久性时。
在 Rockset 中采集数据时,您可以使用采集转换来指定要应用于原始源数据的所需数据转换。如果您希望在以后更改采集转换,则需要重新索引数据。
尽管如此,Rockset 支持无模式提取并动态输入每个数据字段的值。如果数据或查询的大小和形状发生变化,Rockset 将继续保持高性能,并且不需要重新索引数据。
Rockset 可以扩展到数百 TB 的数据,而无需重新编制索引。这可以追溯到 Rockset 的分片策略。当客户在其虚拟实例中分配的计算量增加时,分片的子集将被打乱,以在集群中实现更好的分布,从而实现更并行、更快的索引和查询执行。因此,在这些情况下不需要重新编制索引。
Elasticsearch 专为不经常更新、插入或删除数据的日志分析而设计。随着时间的推移,团队扩大了对 Elasticsearch 的使用范围,通常将 Elasticsearch 用作辅助数据存储和索引引擎,以实时分析不断变化的事务数据。这可能是一项昂贵的工作,尤其是对于优化实时数据提取的团队而言,并且涉及大量的管理开销。
而 Rockset 则专为实时分析而设计,旨在使新数据在生成后 2 秒内可供查询。为了解决这一用例,Rockset 支持就地插入、更新和删除,从而节省计算资源并限制对文档进行重新索引的使用。Rockset 还认识到连接器和提取的管理开销,并采用平台方法,将实时连接器整合到其云产品中。
总体而言,我们发现从 Elasticsearch 迁移到 Rockset进行实时分析的公司仅在计算费用上就节省了 44%。加入在几天内从 Elasticsearch 转向 Rockset 的工程团队浪潮吧。立即开始免费试用。