这篇文章是我和我的同事戴凯共同撰写的。我们都是腾讯音乐(NYSE:TME)的数据平台工程师,腾讯音乐是一家音乐流媒体服务提供商,月活跃用户高达8亿。把这个数字放在这里并不是为了吹嘘,而是为了暗示我和我可怜的同事每天都要处理的海量数据。
我们使用ClickHouse的目的
腾讯音乐的音乐库包含各种形式和类型的数据:录音音乐、现场音乐、音频、视频等。作为数据平台工程师,我们的工作是从数据中提取信息,在此基础上,我们的队友可以做出更好的决策,支持我们的用户和音乐合作伙伴。
具体而言,我们对歌曲、歌词、旋律、专辑和艺术家进行全方位分析,将所有这些信息转化为数据资产,并将其传递给我们的内部数据用户,以进行库存盘点、用户分析、指标分析和群体定位。
我们将大部分数据存储和处理在腾讯数据仓库(TDW)中,这是一个离线数据平台,我们将数据放入各种标签和度量系统中,然后创建以每个对象(歌曲、艺术家等)为中心的平面表。
然后,我们将平面表导入ClickHouse进行分析,并将Elasticsearch导入数据搜索和组定位。
之后,我们的数据分析师使用他们需要的标签和指标下的数据来形成不同使用场景的数据集,在此期间,他们可以创建自己的标签和标准。
数据处理管道如下所示:
ClickHouse的问题
在使用上述管道时,我们遇到了一些困难:
- 部分更新:不支持对列进行部分更新。因此,任何一个数据源的任何延迟都可能延迟平面表的创建,从而破坏数据的及时性。
- 高存储成本:不同标签和指标下的数据以不同的频率更新。尽管ClickHouse在处理平表方面表现出色,但将所有数据倒入平表并按天进行分区是对存储资源的巨大浪费,更不用说随之而来的维护成本了。
- 高维护成本:从架构上讲,ClickHouse的特点是存储节点和计算节点的强耦合。其组成部分相互依存程度很高,增加了集群不稳定的风险。此外,对于ClickHouse和Elasticsearch之间的联合查询,我们必须处理大量的连接问题。那太乏味了。
过渡到Apache Doris
Apache Doris是一个实时分析数据库,它拥有一些功能,正是我们解决问题所需要的:
- 部分更新:Doris支持多种数据模型,其中Aggregate Model支持列的实时部分更新。在此基础上,我们可以直接将原始数据摄取到Doris中,并在那里创建平面表。摄取过程如下:首先,我们使用Spark将数据加载到Kafka中;然后,任何增量数据都将通过Flink更新到Doris和Elasticsearch。同时,Flink将对数据进行预聚合,以减轻Doris和Elasticsearch的负担。
- 存储成本:Doris支持跨Hive、Iceberg、Hudi、MySQL和Elasticsearch的多表联接查询和联合查询。这使我们能够将大的平面表拆分为小的平面表,并根据更新频率对其进行分区。这样做的好处包括减轻存储负担和提高查询吞吐量。
- 维护成本:Doris架构简单,兼容MySQL协议。部署Doris只涉及两个过程(FE和BE),不依赖于其他系统,因此易于操作和维护。此外,Doris支持查询外部ES数据表。它可以很容易地与ES中的元数据接口,并自动映射ES中的表模式,这样我们就可以通过Doris对Elasticsearch数据进行查询,而无需处理复杂的连接。
此外,Doris支持多种数据接收方法,包括从HDFS和S3等远程存储批量导入,从MySQL binlog和Kafka读取数据,以及从MySQL、Oracle和PostgreSQL实时同步或批量导入数据。它通过一致性协议确保服务可用性和数据可靠性,并能够自动调试。这对我们的操作员和维护人员来说是个好消息。
从统计数据来看,这些功能使我们的存储成本降低了42%,开发成本降低了40%。
在我们使用Doris的过程中,我们得到了开源Apache Doris社区的大量支持,以及SelectDB团队的及时帮助,该团队目前正在运行Apache Doris的商业版本。
进一步改进以满足我们的需求
引入语义层
说到数据集,从好的方面来看,我们的数据分析师可以自由地重新定义和组合标签和指标。但在黑暗的一面,标签和度量系统的高度异质性导致了它们的使用和管理更加困难。
我们的解决方案是在数据处理管道中引入语义层。语义层是将所有技术术语翻译成我们内部数据用户更容易理解的概念的地方。换句话说,我们正在将标签和指标转变为数据定义和管理的一流公民。
为什么这会有帮助?
对于数据分析师来说,所有标签和度量都将在语义层创建和共享,这样就可以减少混乱,提高效率。
对于数据用户来说,他们不再需要创建自己的数据集或确定哪一个数据集适用于每个场景,而是可以简单地对指定的标签集和度量集进行查询。
升级语义层
仅仅在语义层明确定义标记和度量是不够的。为了建立一个标准化的数据处理系统,我们的下一个目标是确保在整个数据处理管道中标签和度量的定义一致。
为此,我们将语义层作为数据管理系统的核心:
它是如何工作的?
TDW中的所有计算逻辑将以单个标签或度量的形式在语义层定义。
语义层接收来自应用程序端的逻辑查询,相应地选择引擎,并生成SQL。然后,它将SQL命令发送到TDW以供执行。同时,它也可能向Doris发送配置和数据接收任务,并决定应该加速哪些度量和标签。
通过这种方式,我们使标签和度量更加易于管理。美中不足的是,由于每个标记和度量都是单独定义的,我们正在努力为查询自动生成有效的SQL语句。如果你对此有任何想法,欢迎你与我们交谈。
充分发挥Apache Doris的作用
正如您所看到的,ApacheDoris在我们的解决方案中发挥了关键作用。优化Doris的使用可以在很大程度上提高我们的整体数据处理效率。因此,在这一部分中,我们将与您分享我们如何使用Doris来加速数据接收和查询,并降低成本。
我们想要什么?
目前,我们有800多个标签和1300多个度量,它们来自TDW中的80多个源表。将数据从TDW导入Doris时,我们希望实现:
- 实时可用性:除了传统的T+1离线数据接收外,我们还需要实时标记。
- 部分更新:每个源表通过其自己的ETL任务以不同的速度生成数据,并且只涉及部分标记和度量,因此我们需要支持列的部分更新。
- 高性能:在群体定位、分析和报告场景中,我们只需要几秒钟的响应时间。
- 低成本:我们希望尽可能降低成本。
我们做什么?
1.用Flink而不是TDW生成平面表
在TDW中生成平面表有几个缺点:
- 高存储成本:除了离散的80多个源表之外,TDW还必须维护一个额外的平面表。这是巨大的冗余。
- 实时性低:源表中的任何延迟都将被增加并延迟整个数据链路。
- 开发成本高:要实现真正的及时性,需要额外的开发努力和资源。
相反,用多丽丝制作平板桌子要容易得多,也要便宜得多。过程如下:
- 使用Spark以离线方式将新数据导入Kafka。
- 使用Flink消费Kafka数据。
- 通过主键ID创建一个平面表。
- 将平面桌子导入Doris。如下所示,Flink将其中“ID”=1的五行数据聚合为Doris中的一行,从而减轻了Doris的数据写入压力。
这可以在很大程度上降低存储成本,因为TDW不再需要维护两个数据副本,并且KafKa只需要存储等待接收的新数据。更重要的是,我们可以将我们想要的任何ETL逻辑添加到Flink中,并将大量开发逻辑用于离线和实时数据接收。
2.明智地命名列
正如我们提到的,Doris的聚合模型允许对列进行部分更新。在这里,我们简单介绍了Doris中的其他数据模型,供您参考:
- 唯一模型:适用于需要主键唯一性的场景。它只保留相同主键ID的最新数据。(据我们所知,Apache Doris社区也计划在Unique Model中包含列的部分更新。)
- 重复模型:此模型完全按原样存储所有原始数据,无需任何预聚合或重复数据消除。
在确定了数据模型之后,我们必须考虑如何命名列。不能选择使用标记或度量作为列名,因为:
- 一我们的内部数据用户可能需要重命名度量或标记,但Doris 1.1.3不支持修改列名。
- 二标签可能会经常在线和离线获取。如果这涉及到添加和删除列,那么这不仅耗时,而且对查询性能也不利。相反,我们会执行以下操作:
- 为了灵活地重命名标签和度量,我们使用MySQL表来存储元数据(名称、全局唯一ID、状态等)。名称的任何更改都只会发生在元数据中,但不会影响Doris中的表模式。例如,如果一个song_name的ID为4,那么它将以Doris中的列名a4存储。然后,如果查询中涉及到song_name,那么它将在SQL中转换为a4。
对于标签的上线和下线,我们根据标签的使用频率对标签进行分类。使用最少的将在其元数据中给予离线标记。脱机标签下不会有新数据,但这些标签下的现有数据仍然可用。
为了实时获得新添加的标签和度量,我们在Doris表中基于名称ID的映射预构建了一些ID列。这些保留的ID列将被分配给新添加的标签和度量。因此,我们可以避免表模式的更改和随之而来的开销。我们的经验表明,添加标签和指标后仅10分钟,其下的数据就可以使用。
值得注意的是,最近发布的Doris 1.2.0支持Light Schema Change,这意味着要添加或删除列,只需要修改FE中的元数据。此外,只要为数据表启用了“Light Schema Change”,就可以重命名数据表中的列。这对我们来说是个大麻烦。
3.优化日期写入
以下是一些实践,这些实践将我们的每日离线数据摄入时间减少了75%,CUMU压缩得分从600+降低到100。
- Flink预聚合:如上所述。
- 写入批量的自动调整:为了减少Flink资源的使用,我们允许将一个Kafka Topic中的数据写入各种Doris表,并实现基于数据量的批量大小自动更改。
- Doris数据写入的优化:微调平板电脑和水桶的尺寸以及每个场景的压实参数:
max_XXXX_compaction_thread
max_cumulative_compaction_num_singleton_deltas
- BE提交逻辑优化:定期缓存BE列表,逐批提交到BE节点,使用更精细的负载均衡粒度。
4.在查询中使用Dori on ES
我们大约60%的数据查询涉及群体定位。群组定位是通过使用一组标签作为过滤器来查找我们的目标数据。它对我们的数据处理架构提出了一些要求:
- 与APP用户相关的群体定位可能涉及非常复杂的逻辑。这意味着系统必须同时支持数百个标签作为过滤器。
- 大多数群体定位场景只需要最新的标签数据。但是,度量查询需要支持历史数据。
- 数据用户可能需要在确定组目标后对度量数据进行进一步的汇总分析。
- 在确定组目标后,数据用户可能还需要对标签和度量进行详细查询。
经过考虑,我们决定在ES上采用Doris。Doris是我们将每个场景的度量数据存储为分区表的地方,而Elasticsearch存储所有标记数据。Doris on ES解决方案结合了Doris的分布式查询规划能力和Elasticsearch的全文搜索能力。查询模式如下:
SELECT tag, agg(metric)
FROM Doris
WHERE id in (select id from Es where tagFilter)
GROUP BY tag
如图所示,Elasticsearch中的ID数据将用于Doris中的子查询,用于度量分析。在实践中,我们发现查询响应时间与目标组的大小有关。如果目标组包含超过一百万个对象,则查询将耗时60秒。如果它更大,可能会发生超时错误。经过调查,我们确定了两个最大的时间浪费者:
- 当Doris BE从Elasticsearch中提取数据(默认情况下一次1024行)时,对于超过一百万个对象的目标组,网络I/O开销可能是巨大的。
- 数据提取后,Doris BE需要通过SHUFFREE/BROADCAST对本地度量表进行Join操作,这可能会花费大量成本。
因此,我们进行了以下优化:
- 添加一个查询会话变量es_optimize,用于指定是否启用优化。
- 在向ES写入数据时,在对主键ID进行散列后,添加一个BK列来存储存储桶编号。该算法与Doris(CRC32)中的bucketing算法相同。
- 使用Doris BE生成Bucket Join执行计划,将Bucket编号发送到BE ScanNode并将其下推到ES。
- 使用ES压缩查询到的数据;将多个数据提取转换为一个数据提取,并减少网络I/O开销。
- 确保Doris BE只提取与本地度量表相关的bucket的数据,并直接进行本地Join操作,以避免Doris BE之间的数据混洗。
因此,我们将大型群定位的查询响应时间从60秒减少到了令人惊讶的3.7秒。社区信息显示,Doris将从即将发布的2.0.0版本开始支持反向索引。有了这个新版本,我们将能够对文本类型进行全文搜索,对文本、数字和日期时间进行等价或范围过滤,并在过滤中方便地组合and、or、NOT逻辑,因为反向索引支持数组类型。Doris的这一新功能预计在同一任务上的性能将比Elasticsearch高3~5倍。
5.完善数据管理
Doris的冷热数据分离能力为我们的数据处理成本降低策略奠定了基础。
- 基于Doris的TTL机制,我们只将当年的数据存储在Doris中,并将之前的历史数据放在TDW中,以降低存储成本。
- 我们会为不同的数据分区更改拷贝数。例如,我们为最近三个月的数据设置了三个副本,这是经常使用的,一个副本用于六个月以上的数据,两个副本用于其间的数据。
Doris支持将热数据转换为冷数据,因此我们只将过去七天的数据存储在SSD中,并将较旧的数据传输到HDD,以获得更便宜的存储。
结论
感谢您一直滚动到这里并完成这篇长篇阅读。在我们从ClickHouse过渡到Doris的过程中,我们分享了我们的欢呼和泪水、经验教训,以及一些可能对您有价值的实践。我们非常感谢Apache Doris社区和SelectDB团队的帮助,但由于我们试图实现冷数据和热数据的自动识别、常用标签/度量的预计算、使用物化视图简化代码逻辑等等,我们可能仍在追逐他们一段时间。
最新内容
- 6 days 23 hours ago
- 6 days 23 hours ago
- 1 week ago
- 1 week ago
- 1 week ago
- 1 week 5 days ago
- 1 week 6 days ago
- 2 weeks 2 days ago
- 2 weeks 2 days ago
- 2 weeks 2 days ago