这篇文章是我和我的同事戴凯共同撰写的。我们都是腾讯音乐(纽约证券交易所代码:TME)的数据平台工程师,腾讯音乐是一家每月活跃用户高达 8 亿的音乐流媒体服务提供商。把这个数字放在这里并不是吹牛,而是暗示我和我可怜的同事每天都要处理的海量数据。
腾讯音乐的音乐库包含各种形式和类型的数据:录制的音乐、现场音乐、音频、视频等。作为数据平台工程师,我们的工作是从数据中提取信息,我们的队友可以根据这些信息做出更好的决策支持我们的用户和音乐合作伙伴。
具体来说,我们对歌曲、歌词、旋律、专辑、艺术家进行全方位的分析,将这些信息全部转化为数据资产,传递给我们内部的数据用户,用于库存盘点、用户画像、指标分析和群体定位。 .
我们在腾讯数据仓库 (TDW) 中存储和处理大部分数据,这是一个离线数据平台,我们将数据放入各种标签和指标系统,然后创建以每个对象(歌曲、艺术家等)为中心的平面表格。
然后我们将平面表导入 ClickHouse 进行分析,并导入 Elasticsearch 进行数据搜索和群组定位。
之后,我们的数据分析师将他们需要的tags和metrics下的数据,做成不同使用场景的数据集,期间可以创建自己的tags和metrics。
数据处理管道如下所示:
在使用上述管道时,我们遇到了一些困难:
部分更新:不支持列的部分更新。因此,任何一个数据源的任何延迟都可能延迟平面表的创建,从而破坏数据的及时性。
高存储成本:不同标签和指标下的数据以不同的频率更新。尽管 ClickHouse 在处理平面表方面表现出色,但仅将所有数据倒入一个平面表中并按天分区是对存储资源的巨大浪费,更不用说随之而来的维护成本。
维护成本高:从架构上讲,ClickHouse的特点是存储节点和计算节点的强耦合。它的组件高度相互依赖,增加了集群不稳定的风险。此外,对于跨 ClickHouse 和 Elasticsearch 的联合查询,我们必须处理大量的连接问题。那太乏味了。
Apache Doris是一个实时分析数据库,拥有一些正是我们解决问题所需的功能:
部分更新:Doris 支持多种数据模型,其中聚合模型支持列的实时部分更新。在此基础上,我们可以直接将原始数据提取到 Doris 中并在那里创建平面表。摄取过程是这样的:首先,我们使用 Spark 将数据加载到 Kafka 中;然后,任何增量数据都会通过 Flink 更新到 Doris 和 Elasticsearch。同时,Flink 会对数据进行预聚合,减轻 Doris 和 Elasticsearch 的负担。
存储成本:Doris 支持跨 Hive、Iceberg、Hudi、MySQL 和 Elasticsearch 的多表连接查询和联合查询。这允许我们将大型平面表拆分为较小的平面表,并按更新频率对它们进行分区。这样做的好处包括减轻存储负担和增加查询吞吐量。
维护成本:Doris架构简单,兼容MySQL协议。部署 Doris 仅涉及 FE 和 BE 两个进程,不依赖其他系统,易于运维。另外,Doris 支持查询外部 ES 数据表。它可以轻松地与 ES 中的元数据进行接口,并自动映射来自 ES 的表模式,因此我们可以通过 Doris 对 Elasticsearch 数据进行查询,而无需处理复杂的连接。
更重要的是,Doris 支持多种数据摄取方式,包括从 HDFS 和 S3 等远程存储批量导入,从 MySQL binlog 和 Kafka 读取数据,以及从 MySQL、Oracle 和 PostgreSQL 实时数据同步或批量导入。它通过一致性协议保证服务可用性和数据可靠性,并具有自动调试功能。这对我们的操作员和维护人员来说是个好消息。
据统计,这些功能使我们的存储成本降低了 42%,开发成本降低了 40%。
在我们使用 Doris 的过程中,得到了开源 Apache Doris 社区的大力支持和 SelectDB 团队的及时帮助,目前该团队正在运行 Apache Doris 的商业版本。
说到数据集,好的一面是,我们的数据分析师可以在方便时自由地重新定义和组合标签和指标。但不利的一面是,标签和度量系统的高度异质性导致它们的使用和管理更加困难。
我们的解决方案是在我们的数据处理管道中引入一个语义层。语义层是将所有技术术语翻译成我们内部数据用户更容易理解的概念的地方。换句话说,我们正在将标签和指标转变为数据定义和管理的一等公民。
为什么这会有帮助?
对于数据分析师,所有标签和指标都将在语义层创建和共享,因此会减少混乱并提高效率。
对于数据使用者而言,他们不再需要创建自己的数据集或确定每个场景适用的数据集,而是可以简单地对他们指定的标签集和指标集进行查询。
在语义层明确定义标签和指标是不够的。为了构建一个标准化的数据处理系统,我们的下一个目标是确保整个数据处理管道中标签和指标定义的一致性。
为此,我们将语义层作为数据管理系统的核心:
它是如何工作的?
TDW 中的所有计算逻辑都将以单个标签或度量的形式在语义层定义。
语义层接收应用端的逻辑查询,据此选择引擎,生成SQL。然后它将SQL命令发送给TDW执行。同时,它还可能向 Doris 发送配置和数据摄取任务,并决定应该加速哪些指标和标签。
通过这种方式,我们使标签和指标更易于管理。美中不足的是,由于每个标签和指标都是单独定义的,我们正在努力为查询自动生成有效的 SQL 语句。如果您对此有任何想法,我们非常欢迎您与我们交谈。
如您所见,Apache Doris 在我们的解决方案中起到了举足轻重的作用。优化 Doris 的使用,可以在很大程度上提升我们整体的数据处理效率。所以在这一部分,我们将与大家分享我们用 Doris 做了什么来加速数据摄取和查询并降低成本。
我们想要什么?
目前,我们有 800 多个标签和 1300 多个指标来自 TDW 中的 80 多个源表。从TDW导入数据到Doris时,我们希望实现:
实时可用性:除了传统的 T+1 离线数据摄取,我们还需要实时标记。
部分更新:每个源表通过自己的 ETL 任务以不同的速度生成数据,并且只涉及部分标签和指标,因此我们需要支持列的部分更新。
高性能:在群体定位、分析和报告场景中,我们只需要几秒钟的响应时间。
低成本:我们希望尽可能地降低成本。
在 Flink 中生成平面表而不是 TDW
高存储成本:TDW 必须在离散的 80 多个源表之外维护一个额外的平面表。这是巨大的冗余。
实时性低:源表中的任何延迟都会增加并延迟整个数据链路。
高开发成本:要实现实时性需要额外的开发工作和资源。
相反,在 Doris 中生成平面表要容易得多,成本也更低。过程如下:
这可以大大降低存储成本,因为 TDW 不再需要维护两个数据副本,而 KafKa 只需要存储等待摄取的新数据。更重要的是,我们可以将任何我们想要的 ETL 逻辑添加到 Flink 中,并重用大量开发逻辑来进行离线和实时数据摄取。
正如我们提到的,Doris 的聚合模型允许对列进行部分更新。这里我们对 Doris 中的其他数据模型做一个简单的介绍,供大家参考:
Unique Model :适用于需要主键唯一性的场景。它只保留相同主键 ID 的最新数据。 (据我们所知,Apache Doris 社区也计划在唯一模型中包含部分列更新。)
复制模型:该模型将所有原始数据完全原封不动地存储,没有任何预聚合或重复数据删除。
确定了数据模型后,我们不得不考虑如何命名列。使用标签或指标作为列名不是一种选择,因为:
一、我们内部数据用户可能需要重命名指标或标签,但 Doris 1.1.3 不支持修改列名。
二.标签可能会频繁地在线和离线。如果涉及到列的增删改查,不仅耗时,而且不利于查询性能。相反,我们执行以下操作:
为了灵活地重命名标签和指标,我们使用 MySQL 表来存储元数据(名称、全局唯一 ID、状态等)。名称的任何更改只会发生在元数据中,而不会影响 Doris 中的表架构。比如给一个song_name
取一个ID为4,那么在Doris中会以a4的列名存储。那么如果song_name
参与了query,在SQL中会转为a4。
对于标签的上线和下线,我们按照标签的使用频率进行分类。最少使用的将在其元数据中给出离线标记。不会将新数据放在离线标签下,但这些标签下的现有数据仍然可用。
为了实时获取新添加的标签和指标,我们根据名称 ID 的映射,在 Doris 表中预建了一些 ID 列。这些保留的 ID 列将分配给新添加的标签和指标。因此,我们可以避免表架构更改和随之而来的开销。我们的经验表明,标签和指标添加后10分钟,其下的数据就可用了。
值得一提的是,最近发布的 Doris 1.2.0 支持 Light Schema Change,这意味着添加或删除列,只需要修改 FE 中的元数据即可。此外,只要为表启用了 Light Schema Change,就可以重命名数据表中的列。这对我们来说是一个很大的麻烦。
以下是一些实践,它们将我们每天的离线数据摄取时间减少了 75%,并将我们的 CUMU 压缩分数从 600+ 减少到 100。
Flink pre-aggregation:上面说了。
写batch自动调整大小:为了减少Flink的资源占用,我们将一个Kafka Topic中的数据写到不同的Doris表中,实现batch size根据数据量自动调整。
Doris 数据写入优化:微调 tablets 和 bucket 的大小以及每个场景的 compaction 参数:
max_XXXX_compaction_thread max_cumulative_compaction_num_singleton_deltas
BE提交逻辑优化:定期缓存BE列表,批量提交到BE节点,负载均衡粒度更细。
在查询中使用 Dori-on-ES
我们大约 60% 的数据查询涉及群体定位。 Group Targeting就是通过一组标签作为过滤器来找到我们的目标数据。它对我们的数据处理架构提出了一些要求:
与APP用户相关的群体定位可能涉及非常复杂的逻辑。这意味着系统必须同时支持数百个标签作为过滤器。
大多数组目标场景只需要最新的标记数据。但是,指标查询需要支持历史数据。
数据用户可能需要在群体定位后对指标数据进行进一步的聚合分析。
数据用户可能还需要在群体定位后对标签和指标进行详细查询。
经过考虑,我们决定采用Doris-on-ES。 Doris 是我们将每个场景的 metric 数据存储为分区表,而 Elasticsearch 存储所有标签数据。 Doris-on-ES 方案结合了 Doris 的分布式查询规划能力和 Elasticsearch 的全文搜索能力。查询模式如下:
SELECT tag, agg(metric) FROM Doris WHERE id in (select id from Es where tagFilter) GROUP BY tag
如图所示,位于Elasticsearch中的ID数据将用于Doris中的子查询中进行指标分析。在实践中,我们发现查询响应时间与目标群体的大小有关。如果目标组包含超过一百万个对象,则查询最多需要 60 秒。如果它更大,可能会发生超时错误。经过调查,我们确定了两个最大的时间浪费:
I. Doris BE 在从 Elasticsearch 中拉取数据时(默认一次拉取 1024 行),对于超过一百万个对象的目标组,网络 I/O 开销可能非常大。
二。数据拉取后,Doris BE 需要通过 SHUFFLE/BROADCAST 与本地的 metric 表进行 Join 操作,开销较大。
因此,我们进行以下优化:
添加指定是否启用优化的查询会话变量es_optimize
。
在写入ES的数据中,增加一个BK列,用于存储主键ID哈希后的桶号。该算法与 Doris 中的分桶算法相同(CRC32)。
使用Doris BE生成Bucket Join执行计划,将桶号派发给BE ScanNode,下推给ES。
使用ES对查询到的数据进行压缩;将多个数据获取合并为一个并减少网络 I/O 开销。
确保 Doris BE 只拉取本地 metric 表相关的 bucket 数据,直接进行本地 Join 操作,避免 Doris BE 之间的数据混洗。
因此,我们将针对大型群体定位的查询响应时间从 60 秒减少到惊人的 3.7 秒。社区信息显示,Doris 从 2.0.0 版本开始支持倒排索引,即将发布。有了这个新版本,我们将可以对文本类型进行全文搜索,对文本、数字、日期时间进行等值或范围过滤,并且由于倒排索引支持数组类型,可以方便地结合AND、OR、NOT逻辑进行过滤。 Doris 的这一新特性有望在相同任务上提供比 Elasticsearch 高 3~5 倍的性能。
Doris 的冷热数据分离能力为我们在数据处理方面的成本降低策略提供了基础。
基于 Doris 的 TTL 机制,我们只在 Doris 中存储当年的数据,将之前的历史数据放在 TDW 中,以降低存储成本。
我们改变不同数据分区的副本数。比如最近三个月的数据,经常用到的,我们设置三份,六个月以上的数据一份,中间的数据两份。
Doris 支持将热数据转换为冷数据,因此我们只将过去 7 天的数据存储在 SSD 中,并将比这更早的数据转移到 HDD 以降低存储成本。
感谢您一直向下滚动到这里并完成这篇冗长的阅读。在我们从 ClickHouse 过渡到 Doris 的过程中,我们分享了我们的欢呼和泪水、经验教训以及一些可能对您有一定价值的实践。我们非常感谢 Apache Doris 社区和 SelectDB 团队的帮助,但由于我们试图实现冷热数据的自动识别,常用标签/指标的预计算,我们可能还要追他们一段时间,使用物化视图简化代码逻辑,等等。