一、背景
贝壳作为全国领先的房产交易和租赁在线服务平台,有很多业务场景会产出大量实时和离线数据,针对这些数据进行查询分析,对于企业发展和业务拓展至关重要。不同业务线不同查询场景下,单一技术手段很难满足业务方的需求,Druid就是我们在探索之路上发现的比较切合业务方需求的OLAP引擎之一。
表1.1 Druid与其他olap查询引擎对比
通过表格中列举的特性,我们可以大致了解每个引擎适用的查询业务场景,我们选择Druid引擎主要考虑三个方面要素:①查询灵活性②高并发能力③接入和运维成本,这些Druid都支持的比较好,可以解决之前在某些场景下使用Kylin引擎所遇到查询灵活性差、调优成本高、存储占用大等问题。
下图描述了Druid整体架构主要由四部分组成,即主服务节点(Master Servers),查询服务节点(Query Servers),数据服务节点(DataServers)以及外部依赖组件(ExternalDependencies)。
图2.1 Druid技术架构
Druid的三个外部依赖:
- Deep Storage:
每个Druid服务器都可以访问共享文件存储。这通常是一个分布式对象存储如S3,或者是HDFS,或者是一个网络挂载的文件系统,Druid使用它来存储任何已经进入系统的数据,Druid只是使用Deep Storage作为数据备份,并且也作为Druid进程之间传输数据的一种方式。为了快速响应查询,历史进程不从Deep Storage中读取数据,而是从本地磁盘中读取预先存储好的segment,这意味意味着,您必须在Deep Storage和Historical进程中拥有足够的磁盘空间,以便加载数据。
- Metadata Storage:
元数据存储包含各种共享的系统元数据,如segment可用性信息和任务信息,这个通常是一个传统的RDBMS,如PostgreSQL或者MySql。
- Zookeeper:
Druid高性能查询依赖于自身设计合理的数据结构,包括预聚合、bitmap编码 、倒排索引等。下面简单分析一下Druid如何使用以上数据结构,提供一份样例数据 ,这是聚合后的数据,不是原始数据。有四个字段分别是house_type【房屋类型 0:商品房 1:公房 2:经济适用房】,city_name【城市名称】,look_cnt【带看量】,trade_cnt【成交量】。
表2.1 样例数据
在Druid上生成的维度索引结构如下:
图2.2 维度索引结构
以下列查询语句为例:
select city_name,sum(look_cnt) from table_sample where house_type=0 or house_type=1 group by city_name
图2.3 查询流程
三、实践与优化
Druid目前在我们团队中主要支撑离线指标业务,目前20+台规模的Druid集群,承载每日3000w+的查询总量,qps早高峰时段平均值400+,峰值1.2k+, 查询3s内到达率可以在99.9%以上,支撑业务线包括品牌、门店运营、店东、梅林、奥丁、数据看板、海外业务、装修等18+业务线。
3.1.1 数据源构建时间优化
1)数据导入
Druid离线场景下使用的数据源主要以Hive表为主,其中Hive表按照业务分类为全量表和增量表,以时间作为分区字段。全量表每日数据量一般在年级别,表中的时间戳字段按天粒度分布。增量表为每日递增数据,表中的时间戳字段范围为一日。我们目前使用Hadoop index作业方式导入离线数据,整个数据导入步骤如下图所示:
图3.1 离线数据导入Druid关键步骤
在实际应用中,团队遇到每日分区数据量比较大的增量Hive表,在导入数据时候我们按照分区数据行数调整生成parquet文件的数量,从而增加数据摄入阶段map容器个数,同时指定合理的numShards数量,来增加reduce容器个数,通过这些举措提升数据源导入速度。
例如我们针对1亿行非高基数列数据 ,repartition出20个分区,数据导入时候指定5个numShards,下图展示了近七日优化前和优化后数据导入时长对比:
图3.2 近七日优化前后导入数据时长对比
2)shards合并
针对指定numShards导入数据源,这种数据源主要以增量表数据为主,在导入完成后,每个Segment可能分出了若干个shard,每个shard大小,社区推荐是300-700MB左右,如果生产环境中产出每个shard大小比这个小很多,建议可以进行合并操作(compact),因为shard数量小而多的话,查询时候在历史节点上分布的很分散,浪费每个历史节点的CPU资源。相反,如果每个shard大小都比较大,不能很好分布到各个历史节点上,反而不能有效利用集群的CPU资源。说到这里,真是有点宝宝心里苦的感觉。在实际使用中,结合贝壳自身的业务特点,大部分数据源每日数据大小在300M以下,行数也在500w行以内,据此,我们针对每个Segment大小限制在500w行。如果是通过指定numShards导入数据源,每个segment中shards合并后的行数在500w行内。
3)工程实现
我们需要把这两个步骤做的更加智能,来加速不同类型数据源导入,下面完整介绍我们工程上实现思路:
图3.3 加速离线数据导入实现思路
3.1.2 增加集群负载保护策略
刚才已经提到Druid集群承载每日3000w+的查询总量,qps也比较高,为了维护集群的稳定性,查询缓存/动态限流/超时控制是保护集群负载正常的必要手段。
1)查询缓存
查询缓存,是降低Druid集群QPS查询量有效手段。我们对外提供的Druid服务主要由三层缓存构成,分别是指标查询缓存、 queryEngine缓存、Druid自身缓存。指标查询缓存是指我们对外提供api接口的缓存。queryEngine缓存是指上层api接口下发请求会分成若干个指标,这些指标会通过queryEngine翻译成底层引擎可理解的查询语言,queryEngine会缓存引擎的返回结果。Druid缓存是指Druid自身提供的Segment在Broker或者Historical上缓存。
图3.4 查询缓存
使用查询缓存时,需要特别注意查询缓存清理,因为业务方数据有可能针对前一天或者上一小时的数据是有更正的。这块我们之前是通过hadoop index作业运行结束状态,来告知上次服务清理缓存,但随着数据源增多,会凸显出hadoop index作业结束时间和数据源可用于查询时间之间时差问题,当然可以通过扩充historical节点数量以及增加historical节点上druid.segmentCache.numLoadingThreads的线程数量来加速数据源从deep storage到segment cache落盘速度,但并不能从根本上解决问题。我们解决方案是:
图3.5 缓存清理
2)动态限流
限流措施, 我们对Druid源码进行了改造,主要是Broker端去收集HIstorical的节点CPU负载情况, 评估近一分钟CPU SYSTEM LOAD情况,逼近限流CPU阈值,对近段查询量比较大的数据源进行评估,同时结合所属业务线的重要程度,在Broker端进行分级别的查询限流操作,降低Historical的CPU使用率。监测到CPU使用率恢复到警戒水平以下,查询限流操作会自动放开。
图3.6 动态限流实现步骤
某一天早高峰时段Druid各节点CPU负载状态的统计,从下图可以看出“刺突”现象还是非常明显的,如果CPU长时间处于高负载状态【拉平状态】,会影响集群整体的查询效果,适当的资源释放,可以平衡各个业务方的查询需求。
图3.7 高峰期集群CPU负载监控
3)超时控制
目前我们服务接收的查询语句主要集中在近一小时、近一天、近一周、近一月、近一季度、近半年、近一年甚至还会有近两年的聚合查询,时间跨度种类非常多,已经超越了近实时的查询场景。
对于这些查询,查询超时控制的好与坏,直接影响集群整体性能。
Druid原生版本是支持超时控制的druid.server.http.defaultQueryTimeout【默认是5分钟】,我们实际应用中一般配置在15秒左右,但是这个配置并不能结合CPU负载状况灵活控制查询时长,所以我们针对Druid的QueryResource类进行了改进,采用异步线程结合Historical CPU在高峰和非高峰查询时段使用情况调节查询时长。
改动点主要是针对doPost方法中调用的QueryLifecycle实例execute方法和Yielder类的each方法。execute方法用于获取可遍历的序列Sequence,each方法是返回的Yielder类自己内部已经存储好的计算结果,这两个方法在查询高峰期会是耗时的主要环节。
下图反映了我们采用超时控制之后,高峰期查询耗时情况:
图3.8 高峰期查询耗时监控
实时精确去重功能是结合贝壳在实时指标应用中有针对GMV和GSV统计,这些都会要求引擎有精确去重能力。Druid原生版本精确去重能力比较弱,仅支持sql查询类型,native api不具备这项能力。
我们团队借鉴社区对Druid实现的离线精确去重能力,在Druid 0.18.1版本上做了依赖Redis做存储字典实现实时精确去重功能。
社区之前针对Redis作为存储字典也有过讨论,主要担心是Redis的并发能力,目前我们使用多套redis集群,结合公司自身实际情况,可以满足9w/s实时数据编码需求,一般在公司使用的单个Redis集群(cluster模式)160G内存,推荐QPS在5w上下,申请多个Redis集群提升并发能力,将不同数据源hash分配到不同的redis存储上。
图3.9 实时精确去重整体架构
实时精确去重功能主要分为两部分:
1) 字符串编码
所有类型其实都可以看作字符串类型,这里如何快速给字符串进行数值类型编码是解决问题的关键。
当时采用两种方案:一种是使用一个Redis集群提供全局自增id作为数据编码,但是对Redis qps压力会很大,所以就放弃了。另外一种使用Snowflake算法生成数值编码。
需要特别说明的是这里我们提供SnowflakeId服务并不是一个集中式id生成服务器,因为考虑rest接口性能会影响到实时数据的摄取时延,我们采用解决办法是每个kafka index job会启动一个本地生成全局递增id服务,注意kafka index job的并发数量是受到限制的,至于为什么会收到限制,是因为snowflake算法中有10bit是用于工作机器标识,共1024个节点,每一个运行中kafka index job会占用一个节点ID。如果机器开启时间同步功能,为了防止ID出现小概率重复,kafka index job的并发数量还会进一步降低,不过没有关系,因为单个redis集群受到自身qps限制也不可能同时承载太多kafka index job并发访问储存字典编码, 如果要扩容任务并发量可以横向扩充redis存储个数,具体多少个redis合适,需要结合自身实际业务。
2) 新增 CommonUnique 指标存储
这个实现与社区实现离线精确去重Unique指标存储比较类似,唯一比较大的区别在于CommonUnique使用Roaring64NavigableMap,而社区离线版本精确去重使用的是32位的RoaringBitmap。CommonUnique使用Roaring64NavigableMap主要是因为Snowflake算法生成的是Long型数值编码,字典存储编码可以突破21亿基数(int能表示的最大基数)限制。
主要新增类:
- CommonUniqueComplexMetricSerde类继承ComplexMetricSerde基类,用于将该Serde注册到ComplexMetrics中,主要实现extractValue方法,输入行数据,根据metricName获取度量字段的value值,经过数值编码后存储到redis存储中,并返回当前bitmap编码,用于持久化到segment文件中;
- CommonUniqueAggregator类用于实现数据聚合,用于数据查询操作,这里主要实现Buffer版本 BufferAggregator,关键的方法是aggregate和get;
- CommonUniqueAggregatorFactory工厂类,继承AggregatorFactory类,用于定义具体的Aggregator。
实时精确的使用方法:
数据摄入时,定义CommonUnique类型指标:
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "commonUnique",
"name": "date_agent_ucid_unique",
"fieldName": "date_agent_ucid"
},
{
"type": "commonUnique",
"name": "date_open_id_unique",
"fieldName": "date_open_id"
}]
数据查询时,使用CommonUnique类型查询:
{
"queryType": "groupBy",
"dataSource": {
"type": "table",
"name": "user_action_sa_cube_v1"
},
"intervals": {
"type": "intervals",
"intervals":
["2020-09-04T00:00:00.000Z/2020-09-07T00:00:00.000Z"]
},
"granularity": {
"type": "all"
},
"dimensions": ["entity_id"],
"aggregations": [{
"type": "commonUnique",
"name": "date_open_id_unique",
"fieldName": "date_open_id_unique"
}]
}
有了满足业务方查询需求的olap查询引擎,我们怎么才能更好地服务业务方,平台化是我们最好的落地方式。依托Druid作为主引擎,兼容其他引擎能力,从而在上层统一规划设计我们的olap系统,架构如下图所示:
图4.1 OLAP系统整体架构
平台层vili承担着我们整个Olap系统落地实践任务,vili主要提供六方面能力:
- 数仓建模:用于定义离线数据(以hive为主)和实时数据(用Flink加工后输出到kafka的数据),限定业务度量规范,方便各业务线统一使用。
- Cube建模:这里是参考Kylin Cube的设计思想,为了统一底层引擎导入数据源规范。Model用于定义事实表和维度表的join关系,指定维度字段和度量字段。Cube是在Model层上再选取,选取需要作为指标查询的维度和度量,同时指定度量规则,Model和Cube的关系可以是一对多。Cube建立完成后,平台针对离线Cube会帮助用户自动配置例行调度作业,可进行小时级、天级、周级、月级、年级周期配置。针对实时Cube配置完成后,会根据底层引擎类型采用采取不同操作,如果选择的是Druid引擎会启动Kafka index job,生成实时数据源;如果是ClickHouse引擎类型,会直接写入ClickHouse存储中。
- 指标定义:用于定义指标信息,包括指标名称、说明、计算口径、口径描述、聚合方式等内容, 同时可以切换底层查询引擎类型,引擎与引擎之间可以互备。
- 指标加工:基于“指标定义“的指标,再进行自定义计算形成的指标。
- 指标查询API:提供统一的OLAP引擎查询接口,屏蔽底层引擎差异。
- 工具:提供重刷Model元数据、数据补录、不同引擎查询数据对比方便指标迁移引擎等功能。
四、未来的规划
目前我们使用Druid引擎主要应用在离线数据查询场景,在实时指标应用才刚进入轨道,后续我们重点在此方面发力,因为当下实时指标主要承载在ClickHouse上,Clickhouse视图构建等操作对用户来说接入门槛较高,运维起来负担也较重。Doris引擎承载实时指标也在紧罗密布测试当中,这块主要是看中Doris支持数据update功能,Druid这方面是欠缺的,后续考虑在Druid实现此功能。
2.离线数据导入速度进一步加速
研究spark加速离线数据导入可行性,替换现有mapreduce作业导入方式,同时针对index_parallel导入数据方式,增加精确去重编码的能力,尝试利用hive作为全局字典存储,参考kylin4.0版本的实现。
3.数据查询速度优化和物化视图的应用
特别是针对高基数列数据源查询,因为高基维度会影响 Druid 的数据预聚合效果,聚合效果差就会导致索引文件 Size 变大,进而导致查询时的读 IO 变大,整体查询性能变差。针对这种 case 的优化,我们会将低基数列常用维度分别建一个预聚合索引,然后当收到新的查询请求,系统会先分析请求里要查询维度集合,如果要查询的维度集合是刚才新建的专用的索引维度集合的一个子集,则直接访问刚才新建的索引就可以,不需要去访问原始的聚合索引,查询的性能会有一个比较明显的改善。
4.平台化工作进一步深入开展
引擎层与业务有关功能进一步剥离到平台层 :
1)平台侧针对引擎层元数据、数据源、分片信息、任务详情和日志信息等用户关心的信息进一步整合到平台层,方便用户了解数据源和任务执行状态,同时也方便开发人员统一管理多套OLAP集群。
2)针对API限流操作在平台层控制,不同业务线根据业务实际情况设置不同quota数量,底层引擎不感知业务线类型,只关心可以提供服务的总计quota数量,使得底层引擎和业务线解耦,更加专注做好通用底层服务。
转载请注明出处:https://stgod.com/4846/