目录
3)稀疏索引、bloomfilter、Bitmap Index
5)使用BITMAP / HyperLogLog 数据类型进行去重
原文大佬介绍的这篇StarRocks数仓建设实践有借鉴意义的,这些摘抄下来用作沉淀学习。如有侵权,请告知~
前言
多点 DMALL 成立于2015年,是一站式全渠道数字零售解决方案服务商。 多点大数据部门使用 StarRocks逐步替代了 Impala、Impala on Kudu、Apache Kylin等存储引擎,实现了存储引擎的收敛,简化了实时数据处理链路,同时也能保障较高的查询并发以及较低的响应延迟要求。
一、背景介绍
多点大数据部门为内部业务研发团队、数据分析师、外部用户以及合作伙伴,提供了基础的大数据产品、平台服务,帮助零售企业解决了从基本的数据汇总管理、统一的数据计算应用、到各种场景下对数据的多模式使用的需求,可覆盖零售企业绝大部分数据诉求。
技术层面,多点大数据部门基于 Hadoop 开源技术栈,并进行了部分二次开发后构建起了以下的一个技术架构全景图。从下到上分为基础设施层、数据源层、数据集成层、离线/实时计算层、集市层、分析存储层、数据服务/应用层,数据开发,数据模型中心与运维管理层对各层提供支持。
- 基础设施层:包括超大带宽的专线网络;公有云、私有云、机房托管的混合云部署;
- 数据源层:包括企业OLTP 数据库、业务数据、日志数据、三方接入数据;
- 数据集成层:DataBus 是多点自研数据同步平台,解决企业内各业务线之间,跨企业组织之间以及跨行业的数据汇聚,融合等问题,将不同系统的数据互相打通,实现数据自由流动;
- 离线计算层:利用Hive /Spark高可扩展的批处理能力承担离线数仓的ETL和数据模型加工;
- 实时计算层:利用Flink /Spark Streaming完成实时数据的ETL(包括维度扩充,多流join,实时汇总)等;
- 离线/实时集市层:使用数仓分层模型构建ODS(原始数据层),DWD(数据明细层)、DWS(汇总层)、DIM(维度层)、DWT(主题层)、ADS(应用层),并根据公司业务拆不同的数据域;
- 分析存储层:主要依赖Druid、ClickHouse、Impala on Kudu、Apache Kylin、Elasticsearch、HBase、MySQL、StarRocks提供OLAP查询能力;
- 数据服务/应用层:该层通过提供BI分析产品、数据服务接口,营销,报表类产品,向内部运营人员、外部客户,合作伙伴提供数据分析决策能力;
二、原有架构的痛点
上述架构解决了多点绝大部分数据诉求,在整个架构中,无论是基于Hive,Spark的离线计算,基于Flink ,Spark Streaming 的实时计算;基于HDFS,kafka的存储;基于数仓分层模型建设等方案都已基本成熟。但是在OLAP领域,无论是多点还是业界仍然处于百家争鸣,各有所长的状态。纵观多点在OLAP引擎的探索实践中,遇到了各种各样问题,总结如下:
2.1 技术成本
由于上层业务场景复杂,各个场景的技术难点,核心点均不一样。多点生活在整个技术架构升级的过程中先后引入了HBase、Elasticsearch、Druid、ClickHouse、Impala on Kudu、Kylin等OLAP引擎。但是随着技术栈增多,技术曲线陡峭,没有充足的资源进行多技术栈的维护,造成了比较高的技术成本。
2.2 开发成本
多点数据分析场景大致可以分为离线+1更新分析场景,实时更新分析场景,固定维度分析场景。
2.2.1 离线 T+1 更新的分析场景
例如多点的精细化用户运营平台,其核心的功能是基于用户、消费、行为、设备等属性,提供多维度筛选条件,并通过自定义条件实现用户分层,便于进行精细化用户运营。
针对数据更新为T+1的分析场景,原主要使用的分析引擎为ClickHouse。利用ClickHouse构建“大宽表”模型,将事实表与维度表提前进行关联,对外提供单表聚合的SQL查询,以及通过构建DWT主题宽表,提供Ad-Hoc查询;该场景面临的问题是:虽然ClickHouse单表查询强悍,但是Join能力不强,需要提前进行关联,将多表关联成单表,会存在额外的开发成本。
2.2.2 实时更新分析场景
实时更新场景主要是实时监控经营的各项指标,如当前时间段内的 GMV、下单数量、妥投数量、指标达成、对比,环比等指标,为客户的经营决策提供更具备时效性的参考依据。
针对数据为实时(秒级)更新的场景,原主要使用Impala on Kudu 引擎,采用Lambda架构,基于相同的主键,将流式的预计算的结果数据,批计算的结果数据,基于相同的主键进行merge。
上述方案中的 Flink AGG部分, 该程序的功能包括窗口内的预计算,多流Join等操作。当业务需求变更或者上游数据结构变更的时候,需要升级 Flink AGG程序,以及离线ETL的任务,类似于“烟囱式”的迭代开发,开发效率低下。资源消耗层面,在Flink里面做预计算,时间窗口的选取以及内存占用之间也需要平衡。
2.2.3 固定维度分析场景
固定维度的分析场景主要针对固化的,标准的业务场景进行分析,多维分析可通过多维形式组织起来的数据进行上卷, 下下钻,切片,切块,旋转等各种分析操作,以便剖析数据,使分析者、决策者能从多个角度、多个侧面观察数据仓库中的数据,深入了解包含在数据中的信息和内涵。
针对分析维度固定的分析场景,按照业务上常用的分析指标以及维度,此前使用Kylin进行cube预计算,但是使用 Kylin也会遇到如下问题:
- 由于多点业务场景涉及的维度比较多,各种类目、营运组织的组合,会导致cube膨胀,占用比较多的存储资源;
- 当数据重跑以及新增维度,指标的时候,针对已经在线上运行的cube模型,为了保障数据重跑时候服务依然可用,需要新增cube模型,并行提供支持,造成存储重复;
- 由于目前使用的 Kylin v3.1.2 是使用HBase作为后端存储,row key顺序设计以及分区键的选择会严重的影响查询性能,对开发不友好。
2.2.4 运维成本
多点大数据产品系统的接入可以大致分为SaaS 化接入、私有云以及本地化部署。针对私有云,本地化部署的客户,OLAP 引擎易部署、易维护、极简的架构尤其重要,像 HBase、Impala on Kudu、Kylin等强依赖 Hadoop生态的OLAP引擎,会增加部署的复杂性;ClickHouse集群不能自动感知集群拓扑变化,有不能自动balance数据,会增加缩容,扩容等的维护成本。
三、选择StarRocks的原因
多点大数据部门从2021年年初开始,在调研市面上常用的存储引擎时发现了StarRocks。StarRocks 架构设计融合了 MPP 数据库,以及分布式系统的设计思想,具备架构精简,支持全面向量化引擎、智能查询优化、高效更新、智能物化视图、标准SQL、流批一体,高可用易扩展等特性,天然的解决了上述的问题。
3.1 引擎收敛
原有系统的多维分析,高并发查询,预计算,实时分析,Ad-hoc查询等场景系使用了多套系统,基本上可以使用一套 StarRocks 解决。多点大数据平台、产品逐步形成以 StarRocks为主,其他 OLAP引擎为辅的存储架构,解决维护多套引擎的技术成本问题。
3.2 “大宽表”模型替换
StarRocks 支持Broadcast Join、Colocate Join等分布式 Join 的特性,可以在查询性能可接受的范围内,使用星型、星座模型替代“大宽表”模型,节约提前关联关联的开发成本,同时针对事实表中历史数据变更,需要重新“跑数”的场景,可以只需重跑(overwrite)部分表的数据,提高整体的“跑数”效率。
3.3 简化Lambda架构
StarRocks支持明细、聚合、更新、主键模型,可以基于StarRocks自带预聚合的特性,优化掉现有Lambda架构中的预聚合部分。
StarRocks 直接拉取/订阅Hive或者 Kafka 中的数据,在 StarRocks 中进行聚合运算;StarRocks的数据模型是Aggregate模型,通过MAX、SUM、MIN、BITMAP_UNION 等聚合函数在StarRocks中进行预聚合。
3.4 模型持续迭代
针对已在线上运行的模型,如果有需求上的变更,比如增加、删除、变更字段,可以使用简单SQL命令动态地修改表的定义,在表结构变更的过程中,线上的服务不受任何的影响。
3.5 明细、汇总一体化
在实际的业务场景中,通常存在两种场景并存的分析需求:对固定维度的聚合分析和对原始明细数据的查询。在这种情况下,StarRocks支持对原表构建物化视图,数据更新的时候,物化视图跟随原表一起进行更新,保证数据的一致性。当用户查询时,并不感知物化视图的存在,不必显式的指定物化视图的名称,查询优化器可以根据查询条件自动判断是否可以路由到相应的物化视图上。
3.6 外表能力
StarRocks支持以外部表的形式,接入其他数据源包括 MySQL、HDFS、Elasticsearch、Hive 等。比如可以使用 StarRocks 建立Elasticsearch的外表,为Elasticsearch 提供SQL 查询的能力。
3.7 单表聚合查询
在现有的数据 T+1 更新的汇总业务场景中,选取了多点报表业务中的“单品销售分析”场景进行测试,单表单天数据亿级别,上百个维度和分析指标,属于典型的基于“大宽表”的 Ad-hoc 查询场景。在相同情况(机器配置、数据量、sql)下进行ClickHouse 对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)
从查询响应时长来看,单表的聚合查询,ClickHouse 与 StarRocks 的查询响应时长相差不多。
3.8 多表关联查询
在现有的数据 T+1 更新多表关联的汇总分析业务场景中,选取了现在多点报表业务中的“门店销售分析”场景进行测试,事实表单天数据亿级别,多个维表数据量在十万级别,属于典型的高维分析场景。在相同情况(机器配置、数据量、sql)下进行ClickHouse 对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)
从查询响应时长来看,多表关联聚合查询,StarRocks的性能要优于ClickHouse。
3.9 实时更新读写查询
在现有的数据准实时更新(边写边读)的汇总查询业务场景中,选取了“实时销售分析”场景进行测试,订单数据实时更新,单天数据量亿级别。属于典型的“实时更新,实时查询”场景。在相同情况(机器配置、数据量、SQL)下进行Impala on Kudu对比 StarRocks 的性能测试:
横坐标:分区(天)数-并发数;纵坐标:响应时长(ms)。
从查询响应时长来看,在边读边写的情况下,聚合查询的 SQL,StarRocks 的性能要优于 Impala on Kudu。
四、实践经验
多点目前已经在高维业务指标报表,Ad-hoc分析、实时全链路监控等场景中引入了 StarRocks,在使用中总结出以下经验:
4.1 集群拆分
由于 StarRocks极简的架构设计,易于运维部署。根据一定的规则,搭建了多套集群,避免业务之间的相互影响。
4.2 按照数据更新频率进行拆分
例如数据是T+1更新,且单表数据量在百亿级别以上的场景(例如高维业务指标报表、Ad-hoc 分析),构建了离线分析集群。通过提高StarRocks的查询并发(parallel_fragment_exec_instance_num))、单节点内存限制(exec_mem_limit)等对复杂查询有好的参数,提高集群的查询性能;
针对数据是准实时更新,写多读多的场景(实时报表,实时全链路监控),构建了实时分析集群,通过调整StarRocks的compaction(cumulative_compaction_num_threads_per_disk、base_compaction_num_threads_per_disk)等对写入友好的参数,加快数据版本合并。
4.3 按照业务域进行拆分
多点客户的接入方式不同,且各种SLA要求也不同,会按照不同的需求搭建不同的StarRocks就请你,尽量满足多种客户需求。
4.4 调优手段
针对在线服务、系统,为了提高系统整体的查询性能,可以从不同的维度进行优化:
4.4.1 优化表结构定义
1)模型选择
StarRocks 的模型包括明细模型、聚合模型、更新模型、主键模型。
如果需要对原始的数据(例如订单流水,原始操作记录等)来进行分析,可以选择明细模型;
如果业务方进行的查询为汇总类查询,比如 SUM、COUNT、MAX 等类型的查询,可以选择聚合模型,提前进行预聚合,查询的时候直接获取结果;
如果数据需要频繁的进行状态更新(比如订单的状态变更),可以选择更新模型。
2)分区和分桶
StarRocks可以对表进行分区(parition) 和分桶(bucket),分区在逻辑上把表划分成了多个子表,可以按照时间进行分区;分桶可以按照不同的策略将数据划分为不同的 tablet,分布在不同的 BE 节点上。按照目前多点大数据集群的机器配置(64C+256G+12TB SSD),通常将一个 tablet 保持在200MB~1GB的大小,会有比较好的性能。
3)稀疏索引、bloomfilter、Bitmap Index
为了提高查询的性能,可以对 StarRocks 的表结构额外构建索引。稀疏索引:可以将查询中常见的过滤字段放在 schema 的前面, 区分度越大,频次越高的查询字段越往前放;同时对区分度比较大(高基数列)的列构建 bloomfilter;对区分度不大(低基数列)的列构建 Bitmap Index。
4)物化视图
针对实际查询场景中经常用到的查询 SQL,可以对原始表构建物化视图,其本质为原始表 (base table)的一个物化索引,通过物化视图提前进行索引排序,指标预计算,查询的时候子自动路由到物化视图进行查询。
5)使用BITMAP / HyperLogLog 数据类型进行去重
在交易场景中进行会计算交易次数,使用常规的方式(COUNT DISTRINCT order_id)去重,其缺点是需要消耗极大的计算和存储资源,对大规模数据集和查询延迟敏感的去重场景支持不够友好。通过定义 BITMAP 的数据类型,可以减少传统 COUNT DISTINCT 去重的执行需要的内存空间、执行时长;而对于像流量统计场景中针对 UV 的计算,在允许有部分统计偏差的前提下,可以定义 HyperLogLog 的数据类型,提高去重效率。
4.4.2 优化查询SQL
1)Broadcast Join
当大表与小表进行 Join的时候,可以使用 Broadcast Join(StarRocks 针对小表的默认 Join 方式为 Broadcast Join),小表向大表广播的方式进行Join。该方式可以用于事实表与维度表进行关联查询;
2)Colocation Join
当大表与大表进行 Join 的时候,为了加速查询,相关表可以采用共同的分桶列(colocate_with)进行分桶。当分桶列相同,相关表进行 Join 操作时,可以直接在本地进行 Join,再将结果数据进行合并,避免数据在中间计算的时候就在集群中的传输。
3)并行度调整
当机器资源比较充裕时,可以将增加执行并行度( parallel_fragment_exec_instance_num)让更多的执行实例同时处理一组数据扫描,从而提升查询效率。但是并行度设置为较大的数值会消耗更多的机器资源,例如CPU、内存、磁盘 IO,影响整体的 QPS。需要根据实际上的查询场景来设置并行度,一般建议占用机器核数的50%。
4)CBO 优化器
针对复杂Ad-hoc 场景,可以开启 StarRocks的基于成本(Cost-based Optimizer,CBO)的查询规划器,在众多查询计划空间中快速找到最优计划,提高查询优化器。
4.5 工具集成
为了与多点的大数据平台进行打通,对StartRocks进行了一些集成封装。
4.5.1 数据集成
通过封装 StarRocks 的Broker Load 以及 Stream Load 接口,与多点的大数据平台打通,实现通过配置的方式将数据从Hive批量同步到StarRocks,或者订阅MQ将实时数据同步到StarRocks。
4.5.2 监控预警
通过集成Prometheus和Grafana,与监控平台打通。对多个StarRocks集群的运行情况进行监控,当集群的某些指标超过一定阈值的时候进行报警。
五、总结
多点从2021年上半年开始调研引入 StarRocks,当前已有四个集群在稳定运行提供线上服务,逐步替代 Impala、Impala on Kudu、Apache Kylin 等存储引擎,实现了存储引擎的收敛,简化了实时数据处理链路,同时也能保障较高的查询并发以及较低的响应延迟要求。
参考文章: