
资源数据指标体系建设
本文介绍了基于YARN的数据指标体系建设,涉及数据收集、数仓建设等实践调优内容
1 项目背景
1.1 数据体系的需求与问题
Yarn 指标数据用户分析及需求
对于Yarn指标数据数据用户可概括为两类内部用户和外部用户。
用户类型 | 数据使用者 | 需求 |
---|---|---|
内部用户 | Yarn 开发者、Yarn SRE 、Yarn 资源管理者 | 集群资源的监控(资源分布、资源使用率、集群资源预警) 集群故障排查 (App异常排查、NM&RM 故障排查) 测试与性能优化 (上线前压测回归、优化结果评估) |
外部用户 | 资源管理平台、数据分析平台、实时与离线平台等 | 资源管理平台(基于用户视角的资源查询与分析) 数据分析平台,服务健康治理分析需求 用户自建资源分析与监控 |
这里总结一下目前数仓遇到的相关问题。
历史问题
- 数据分布在Mysql 、Hive 中,Mysql 集群无法支撑大数据量的指标数据,也不能利用平台现有的报表工具
- 原始数据表呈现数据内容稀少,导致查询极其困难,需要大量的关联查询(join操作),效率极低。
- 历史的数据报表统计分析都是基于Mysql,不适用现有集群、App 体量无力再支撑对应的统计分析
- 没有明确的分层设计,导致分析SQL 复杂难以维护
未来面对的问题
总结来说,问题可以总结为问题排查、性能测试和上层需求三个层面。
- 异常作业打满带宽、部分集群被异常作业打挂的问题,由于数据的时效性不能及时定位发现问题,该场景对数据时效性提出更高的要求
- 对于RM 性能测试以及回归没有基础数据的支持和对比,对于的性能指标难以反映生产问题评估优化改造结果,迫切的需要生产数据支持对比
- 管理平台(外部用户)具有查询需求,资源层需要理解上层用户需求,再进行设计开发,这样拉长了开发链条同时也增加了RD的工作量.
1.2 目标
数好用:数据准确、及时、使用便捷
把数用好:从数据中分析推演出更多潜在的信息, 提取知识经验、反哺业务和系统,为决策和优化提供支撑
2 数据收集
2.1 总述
目前原始数据分为三个方面,mysql的元数据,yarn日志,Yarn API接口,如下图:
Yarn新数据指标建设中,所有原始数据都来源于数据收集部分。简单来说,数据源就下面三个,并且也对应着下面三条收集链路。下表对每一个层级进行了说明:
数据链路 | 原始数据 | 预处理 | 数据收集 | 数据存储 | 数据内容 |
---|---|---|---|---|---|
元数据表清洗备份 | mthdp MySQL库中数据表 | 使用定时调度平台定时调度,对MySQL中数据字段名进行清洗、规整化,重新写入MySQL | 使用平台DB接入功能 | MySQL数据写入Hive数仓 | yarn_ods_cluster yarn_ods_zone yarn_ods_groups yarn_ods_queue |
日志收集 | Yarn RM/NM/Router组件日志 | - | 本地LogAgent接入Kafka | Kafka接入实时数仓,以实时流的形式作为Yarn ods层数据。后续在实时数仓平台实时作业平台中执行生产任务,最终仍落地到Hive/Doris/ES中 Kafka数据接入ES/Doris,直接供实时分析 监控指标数据,直接上报给监控平台 | yarn_app_static_info yarn_app_attempt_static_info yarn_rm_container_static_info yarn_nm_container_static_info yarn_app_final_info yarn_container_final_info yarn_app_attempt_final_info yarn_nm_state_record yarn_app_state_record yarn_app_attempt_state_record yarn_nm_container_state_record yarn_container_resource_realtime_info |
YarnCollector | Yarn RM API | - | YarnCollector作为常驻服务,使用定时调度平台定时调度,调用RM接口,解析数据,通过Socket方式上报Kafka | 同上 | yarn_app_resource_realtime_info yarn_nm_node_static_info yarn_nm_resource_realtime_info |
具体表数据量评估
收集方式 | 类别 | 数据表 | 数据量级(天)(Yarn日志仅收集离线集群) |
---|---|---|---|
YarnCollector | 资源 | yarn_app_resource_realtime_info | 7千万 |
资源 | app_monitor_res | 未接入hive | |
资源 | app_tag_info | 未接入hive | |
资源 | yarn_nm_resource_realtime_info | 6千万 | |
静态 | yarn_nm_node_static_info | 4w | |
Yarn日志 | 静态 | yarn_app_static_info | 50w |
静态 | yarn_app_attempt_static_info | 60w | |
静态 | yarn_rm_container_static_info | 3亿 | |
终态 | yarn_app_final_info | ||
终态 | yarn_app_attempt_final_info | ||
终态 | yarn_container_final_info | 3亿 | |
状态转移 | yarn_nm_state_record | ||
状态转移 | yarn_app_state_record | 400w | |
状态转移 | yarn_app_attempt_state_record | 500w | |
资源 | yarn_container_resource_realtime_info | 12亿 |
问题: nm_container和rm_container的区别
2.2 Yarn collector组件
2.2.1 项目概述
该项目包含四个收集层面,元数据、app作业实时数据、RM监控数据和NM节点资源实时数据。
-
DataPreProcess 层的:即MysqlDataPreValidator 程序,将mysql的元数据规范化,并同步到hive。
-
Yarn-collector 层的:RMAppCollector (App打点数据收集)、RMJmxCollector(RM的Jmx数据收集)、NMInfoCollector (NM节点信息收集)
-
依赖的外部服务为:定时调度平台
总体流程如下图:
2.2.2 项目实现
对于四个模块,我们采用MVC的项目思想,具体controller去调用相应的service函数,去处理相应处理作业。
MysqlDataPreValidator设计
流程概述
1)首先由定时调度平台发起每天的定时任务
2)MysqlDataPreValidator首先会从{table}_validator_meta_data.xml读取需要进行转换的表的元数据信息
3)通过DataPreValidator统一转换逻辑将当天数据导出到目标表中
RMAppCollector设计
流程概述:
- 首先由定时调度平台发起每分钟的定时任务
- 从yarn-api-server接口获取集群列表,RMAppCollector将同机房的RM添加的收集列表中
- 得到收集集群列表后并行的从各集群中拉取App数据,再并行的将数据推送到S3、落地本地磁盘、上报到日志中心(Log-Collector)
- 最后将结果告知定时调度平台
RMJmxCollector设计
流程概述:
- 首先由定时调度平台发起每分钟的定时任务
- 从yarn接口获取集群列表,RMAppCollector将同机房的RM添加的收集列表中
- 再读取采集”强制约束“配置,对获取的同机房的集群列表结果进行过滤,其次收集节点与”强制约束“匹配则将匹配到的集群添加到收集列表中
- 得到集群列表后并行的从各集群中拉取App数据,再并行的将数据推送到S3、解析上报到指标监控平台、解析上报到日志中心(Log-Collector)
- 最后将结果告知定时调度平台
NMCollector设计
静态信息收集
- 首先由定时调度平台定时调起节点信息收集任务
- NMCollector首先会向RM拉取非lost节点列表
- 接下来NMCollector会执行ssh 命令远程执行GetNodeInfo.py脚本,获取节点信息
- NMCollector接收到节点信息后解析格式化为上报到日志中心
- 当单集群获取节点信息失败率大于1%则认为获取该集群节点信息失败,最后返回定时调度平台执行失败
- 否则返回成功
动态信息收集
-
流程大致同上
-
定时调度平台每分钟发起任务调度,拉取集群RM接口,解析Node信息,同步S3、本地文件和日志中心
作业调度关系
-
抽取CollectorJob抽象类,并根据具体类型ServiceEnum实例化。其实就是个简单工厂。
-
每一个Job使用单独的线程池,对线程池进行心跳监控
-
在具体Service中开放接口,处理重导逻辑
公共模块
common模块,主要是需要将定时调度平台调度的逻辑抽出来:对每一个服务进行定时任务调度时,都会有如下逻辑:
- 获取该执行机的目标集群
- 对每个集群,调用不同的任务(app、jmx、nm静态、nm动态)
- 但任务的传参都一样:集群信息、任务Id(任务的唯一标识)
- 在集群的维度下,执行子任务(to 本地文件、to S3、to 日志中心…)
- 每一个子任务都会返回一个状态码结果。而集群维度的状态码结果,又依赖子任务的状态码结果:若子任务上传S3、写入本地文件均失败,则该集群任务需及时感知,并返回:S3失败+本地文件失败
- 集群维度同理,需要获取每个集群的执行结果。若集群出现子任务失败,必要时需抛出异常,使得定时调度平台服务端能感知,从而发送报警
因此需要将这部分逻辑抽取出来:对每一种定时调度平台任务,只需要区分具体的任务即可。
数据收集总结
服务名 | 说明 | 数据表 | 定时调度任务名 | 定时任务频率 | 是否写入本地文件 | 是否上传S3 | 是否上报日志中心 | 是否上报Falcon |
---|---|---|---|---|---|---|---|---|
RMApp | RM App打点数据 | yarn_app_resource_realtime_info | yarncollector.rm.app | 每分钟 | 是 | 是 | 是 | 否 |
RMApp | RM App作业配置tag数据 | yarn_app_tag_info | yarncollector.rm.app | 每分钟 | 是 | 是 | 是 | 否 |
RMApp | RM App资源监控信息 | yarn_app_monitor_res | yarncollector.rm.app | 每分钟 | 是 | 是 | 是 | 否 |
RMJmx | RM JMX打点数据 | yarn_rm_jmx_metrics | yarncollector.rm.jmx | 每分钟(第30秒) | 是 | 是 | 是 | 是 |
NMStatic | NM静态信息收集 | yarn_nm_node_static_info | yarncollector.nm.static | 每天 | 是 | 是 | 是 | 否 |
NMResource | NM资源指标收集 | yarn_nm_resource_realtime_info | yarncollector.nm.resource | 每5分钟 | 是 | 否 | 是 | 否 |
问题:
1)完善日志输出的源码分析
2)为什么yarn_container_resource_realtime_info可以通过日志获取,而app与app_attampt是通过打点定时yarn-collector调接口?
3)为什么要部署在router
2.2 Yarn日志文件
总结日志数据的链路为,在本地进行标准规格的日志打印,再通过log-agent同步到kafka,通过不同topic分发,进行使用,目前在离线集群全部上线落地。具体流程如下图:
这里可以给出RM日志存储数据的相关信息,如下表:
数据表名 | 具体数据内容含义 |
---|---|
yarn_app_static_info | app作业静态信息,如提交信息等 |
yarn_rm_container_static_info | container静态信息 |
yarn_app_final_info | app作业结束信息,描述其生命周期状态 |
yarn_container_final_info | container结束信息,描述生命周期状态 |
yarn_app_attempt_final_info | app_attempt结束信息,描述生命周期状态 |
yarn_container_resource_realtime_info | container的打点资源数据信息 |
3 实时数仓建设
在完成数据收集的建设后,我们来进行数仓的总体设计。
3.1 架构思想
3.1.1 数仓理论
数仓的核心能力在于数据的存储和计算,数据计算又分为离线计算(批处理)和实时计算(流式处理)。一个功能完备的数仓应当支持这两种计算能力。为此工业界有两种架构方式:
此架构的思想是将离线计算和实时计算分离,如上图所示,包含三个模块:
- Batch Layer(批处理层):对历史的全量数据进行批处理,准确性高但时效性低(一般有小时级或天级延迟);
- Real-time Layer(实时层):对短期的增量数据进行流式处理或微批处理,实现秒级或分钟级的低延迟,弥补批处理层的滞后性;
- Serving Layer(服务层):汇总以上两层模块的处理结果,暴露给外部服务调用。
业界的大多数数仓采用的是Spark/Hive离线计算引擎 + Storm/Flink实时计算引擎 + OLAP/OLTP存储引擎的组件选型,这本质上就是Lambda架构。
- Kappa架构
此架构的思想是离线计算和实时计算合二为一,如上图所示,包含两个模块:
- Real-time Layer(实时层):扩展了Lambda架构的实时层,既处理短期增量数据,也处理历史全量数据,兼顾准确性和时效性;
- Serving Layer(服务层):与Lambda架构相同,收集实时层的计算结果并对外提供服务。
Kappa架构适用于侧重实时数据处理的场景,典型的技术选型是Kafka消息中间件 + Flink实时计算引擎,业界也有越来越多的领域开始基于Kappa架构建设数仓。
Lambda与Kappa架构对比
架构方式 | 优点 | 缺点 |
---|---|---|
Lambda架构 | 离线和实时计算分别采用最合适的技术选型,同时具备高准确性和低延迟性离线数据容易订正 | 需要搭建两套计算引擎,维护成本高需要确保两套引擎处理逻辑和结果完全一致,难度较高 |
Kappa架构 | 只需搭建一套引擎,维护成本低无需合并离线和实时处理结果 | 实时引擎处理历史大量数据时,难以达到与离线引擎相同的性能在进行复杂逻辑计算时,实时引擎可能会存在数据丢失、计算偏差等问题,准确性较差 |
数仓选型我们选择lambda架构,这里需要将数仓进行分层
首先可以通过数据分层来实现较粗粒度的组织,每层的数据专注于特定的职责。下图所示的「四层模型」,就是一种典型的分层方式:
- ODS层:位于最底层,将原始数据(包含业务DB、业务日志、第三方采买等来源)经过清洗和规范化后入仓,在入仓前需要进行去噪、去重、去除脏数据、量纲统一等初步转换;
- DW层:它是数仓的主体,围绕业务主题建立各种数据模型(通常是维度模型)。这一层由ODS层数据汇总加工产生,包含历史全量的、明细的数据;
- DM层:它对DW层的数据进一步加工,生成轻度汇总的非明细数据。这一层的数据主要面向后续的业务查询、OLAP分析等;
- APP层:它基于前面三层的数据,计算出数据产品最终使用的结果数据。这一层是高度汇总的最终产出数据,一般会同步到MySQL、ES、Druid等其他OLTP/OLAP存储引擎中供查询分析使用。
除了四层模型外,还有三层模型等其他划分方式,思想都大同小异,遵循“原始数据 -> 基础明细数据 -> 半成品数据 -> 成品数据”的路径进行层次划分。在实际构建数仓时,也需要结合具体的业务场景,灵活制定分层结构。
分层模型的好处显而易见,主要有以下几点:
- 提升可维护性:将复杂的数据生产逻辑拆解成各层的子逻辑,便于维护和定位问题;
- 简化血缘关系:只允许上层依赖下层,不允许反向依赖,避免混乱的数据流向;
- 减少重复开发:通过主题建模,开发一些通用的中间数据(例如维度表),实现复用;
- 屏蔽原始数据:通过设计良好的ODS层,数仓使用方不必了解原始数据的细节和异常即可接入。
3.1.2 数仓总体架构设计
这里我们给出yarn实时数仓建设的总体方案图,后一节进行详细介绍,图如下:
3.2 ODS数据接入
这一层主要是由数据收集层接入实时作业平台,具体来讲这层任务是起一个flink作业,将收集层的具体kafka topic中的数据接入平台,并存入hive当中,对数据进行粗粒度的规整化,属于无状态作业,转化效率极高,占用资源少。
这部分数据与log数据并没有较大的形态变化,主要作用在于接入kafka topic,用于转化为符合实时平台格式的topic,用于后续生产操作。
对于ods层的数据进行归纳,首先是三个yarn主体,app、app_attampt、container。
- 实时资源信息,主要包含各资源信息,如已分配资源量,为分配资源量等。
- 静态信息,主要包含维度信息,即作业和提交者信息,如队列和作业id。
- 结束信息,主要包含结束状态、动作时间点和诊断信息等。
3.3 DW层数据生产
3.3.1 dw层作业总述
DW层的表以以下三个维度进行划分:
- 场景:实时表(Doris、ES)/ 离线表(Hive)
- 类型:资源表(每分钟打点的资源信息) / 生命周期表(静态信息 + 终态信息)
- 单位:App/AppAttempt/Container/NM
表的命名由2、3两者决定:yarn_(单位)_(类型)_detail
对于实时/离线,由于字段基本相同,接入方式也统一:(Kafka -> 实时作业平台 SQL -> Kafka -> Hive / Doris / ES),因此统一使用相同的表名称。
类型 | 资源表 | 生命周期表 |
---|---|---|
App | yarn_app_resource_detail | yarn_app_life_detail |
AppAttempt | - | yarn_app_attempt_life_detail |
Container | yarn_container_resource_detail | yarn_container_life_detail |
NM | yarn_nm_resource_detail | - |
RM | - | - |
作业处理流程
整体dw层的作业可以分为实时资源信息任务和生命周期任务,只有生命周期任务设计union kafka,下图完整的展现了实时任务流程。
3.3.2 实时资源信息
作业设计
这部分作业的数据生产目标,是拿到作业属性与资源信息的宽表,涉及到流维关联,用于掌握各yarn主体的实时资源情况。
App维度关联 yarn_app_resource_realtime_info * yarn_app_static_info -> yarn_app_resource_detail
Container维度关联 yarn_container_resource_realtime_info * yarn_app_static_info -> yarn_container_resource_detail
Node维度关联 yarn_nm_resource_realtime_info * yarn_nm_node_static_info -> yarn_nm_resource_detail
数据生产
未完成流维关联,我们采用将维表信息存储同步到tair中,时效为36小时,这部分用于关联时进行查询。关联项主要是对每个yarn主体的ID进行关联,如app_id、container_id和节点host等,最终落地到新的dw层kafka topic中,用于后续数据生产和数据落地存储。
这里给出yarn_app_resource_detail的生产逻辑,作为例子:
配置netbuffer相关参数
set `taskmanager.memory.network.max`= `1gb`;
set `taskmanager.memory.network.fraction`= `0.2`;
创建app资源流表
CREATE TABLE app_resource(
app_id VARCHAR,
elapsed_ms DOUBLE comment '作业已运行时长',
allocated_memory DOUBLE comment '已分配的内存(MB)',
allocated_vcores INTEGER comment '已分配的vcores',
pending_memory DOUBLE comment 'Pending的内存(MB)',
pending_vcores INTEGER comment 'Pending的vcores',
collect_time TIMESTAMP comment '数据收集时间(yyyy-MM-dd HH:mm:ss)',
process_time as ProcessTime()
) WITH (
source = 'hdp-yarn.ods_org.yarn_app_resource_realtime_info',
process_time_field = 'process_time'
);
创建app维表,从tair中读出
CREATE TEMPORAL TABLE app_static(
app_id VARCHAR,
cluster_id VARCHAR comment '提交集群',
queue_code VARCHAR comment 'app所在子队列',
hadoop_user VARCHAR comment '提交作业的Hadoop账号',
app_name VARCHAR comment '作业名称',
app_type VARCHAR comment '作业类型',
priority INTEGER comment 'app配置的优先级',
max_app_attempt INTEGER comment 'app失败重试次数',
app_tags VARCHAR,
client_info VARCHAR comment '引擎提交上来的客户端信息,包括hostname信息',
submit_time TIMESTAMP comment 'yyyy-MM-dd HH:mm:ss:SSS'
) WITH (
source = 'hdp-yarn.yarn_app_static_info',
local_appkey = 'com.sankuai.yarn.yarncollector',
timeout = 100,
retry_fail_strategy = 'SKIP',
batch_return_percent = 90
);
//生产落地表yarn_app_resource_detail
CREATE TABLE sink_table (
dt VARCHAR comment '日期(YYYYMMDD)',
`hour` VARCHAR comment '数据收集时间,所在小时',
collect_min VARCHAR comment '最小收集粒度,分钟级别',
collect_time TIMESTAMP comment '数据收集时间(yyyy-MM-dd HH:mm:ss)',
app_id VARCHAR,
app_name VARCHAR comment '作业名称',
app_type VARCHAR comment '作业类型',
cluster_id VARCHAR comment '提交集群',
zone_name VARCHAR comment '作业所在的Zone',
group_code VARCHAR comment '作业所在父队列',
queue_code VARCHAR comment 'app所在子队列',
hadoop_user VARCHAR comment '提交作业的Hadoop账号',
priority INTEGER comment 'app配置的优先级',
max_app_attempt INTEGER comment 'app失败重试次数',
node_constraint_exp VARCHAR comment 'app_tags',
tags VARCHAR comment 'clientInfo:引擎提交上来的客户端信息,包括hostname信息',
submit_time TIMESTAMP comment 'yyyy-MM-dd HH:mm:ss:SSS',
elapsed_ms DOUBLE comment '作业已运行时长',
allocated_memory DOUBLE comment '已分配的内存(MB)',
allocated_vcores INTEGER comment '已分配的vcores',
pending_memory DOUBLE comment 'Pending的内存(MB)',
pending_vcores INTEGER comment 'Pending的vcores',
create_time TIMESTAMP comment 'join生成该条记录的时间(yyyy-MM-dd HH:mm:ss)'
) WITH (
sink = 'hdp-yarn.yarn_app_resource_detail'
);
//主要关联逻辑sql
insert into
sink_table
select
TimestampFormat(s.collect_time, 'yyyyMMdd'),
TimestampFormat(s.collect_time, 'HH'),
TimestampFormat(s.collect_time, 'yyyy-MM-dd HH:mm'),
s.collect_time,
r.app_id ,
r.app_name,
r.app_type,
r.cluster_id,
SPLIT_INDEX(r.queue_code,'.',1),
CONCAT(SPLIT_INDEX(r.queue_code,'.',0) ,'.',SPLIT_INDEX(r.queue_code,'.',1),'.' ,SPLIT_INDEX(r.queue_code,'.',2)),
r.queue_code,
r.hadoop_user,
r.priority ,
r.max_app_attempt,
r.app_tags,
r.client_info,
r.submit_time,
s.elapsed_ms,
s.allocated_memory,
s.allocated_vcores,
s.pending_memory,
s.pending_vcores,
LOCALTIMESTAMP
from app_resource as s join app_static
for system_time as of s.process_time as r
on r.app_id = s.app_id
数据存储
这部分数据主要是同步到hive与doris,同步hive并没有什么特别的更改,目的就是数据落地存储,解决查询需求。
对于存入doris,由于doris具有聚合和rollup的特性,这部分需要进行设计。
首先我们选择聚合类型,维度选择和主体相关的信息(如appid、队列信息、时间信息等),指标设置为资源相关信息,聚合类型为replace,这里对于聚合类型的考虑,在正常情况下,是不应该出现replace操作的,因为对于同一信息的时间维度字段是不同的,但是考虑到如果由于上层导致数据重复的情况,可通过该方式去重。另一方面,对于统计的要求,我们需要对于资源信息进行聚合统计,例如在队列或集群维度下,有多少作业资源未被满足。该需求就需要对pending资源项做sum,这需要将指标项设置为sum类型。经过取舍,最终放弃replace的去重保护,选择sum操作。对于去重需求,选择在kafka侧保证。
对于rollup的设置,主要是解决队列维度或集群维度的作业资源统计时进行加速操作,所以设置rollup的维度分别为queue_code(队列维度),cluster_id(集群维度),collect_time(时间点)等,指标为资源信息。
3.3.3 生命周期信息任务
作业设计
这部分作业的数据生产目标,是拿到作业的属性与生命周期信息,涉及到流维关联;与资源信息不同的是,这部分还需要保证在跑的作业与跑完的作业信息在同一表中可以查到,也就是说对于没跑完的作业,我们要能查到其维度信息。对于跑完的作业,要能查到其维度与终态信息,这是具有一定难度的。e.g 在实时场景下,我们需要实时查询出App/AppAttempt/Container的生命周期情况。比如一个App提交于10:00,AM启动为10:30,结束于11:00,那么我们需要在10:45的时候就能查询到App的状态是RUNNING,并且查出它的一系列静态信息。
为解决上述需求,我们考虑两个方案,即维度topic数据和关联后的topic同时接入doris,但是doris侧单表只支持单一kafka topic,该方案放弃。另一方案是,将维度topic和关联后的topic进行一层union操作,然后存储到doris当中,Doris采用REPLACE方式,在同一个导入批次中的数据,对于 REPLACE 这种聚合方式,替换顺序不做保证。如在上述例子中,最终保存下来的,也有可能是 2017-10-01 06:00:00。而对于不同导入批次中的数据,可以保证,后一批次的数据会替换前一批次,这样就保证了数据的正确性。
流程图可参考3.3.1节的实时任务流程。
数据生产
app维度:yarn_app_static_info * yarn_app_final_info -> yarn_app_life_detail
app_attampt维度:yarn_app_attampt_static_info * yarn_app_attampt_final_info -> yarn_app_attampt_life_detail
container维度:yarn_container_static_info * yarn_container_final_info -> yarn_container_life_detail
以app生命周期数据生产举例,如下是关联操作:
CREATE TABLE app_final (
app_id VARCHAR comment 'AppId',
started_time TIMESTAMP comment '开始时间(yyyy-MM-dd HH:mm:ss:SSS)',
finished_time TIMESTAMP comment '结束时间(yyyy-MM-dd HH:mm:ss:SSS)',
elapsed_ms INTEGER comment 'App持续运行时间',
engine_state VARCHAR comment '引擎侧返回的状态',
yarn_state VARCHAR comment 'Yarn App状态',
diagnostics_info VARCHAR comment '诊断信息',
create_time TIMESTAMP comment '创建时间(yyyy-MM-dd HH:mm:ss:SSS)',
final_app_attempt_id VARCHAR comment '最终的AppAttempt Id',
process_time as ProcessTime()
) WITH (
source = 'hdp-yarn.ods_org.yarn_app_final_info',
process_time_field = 'process_time'
);
CREATE TEMPORAL TABLE app_static (
app_id VARCHAR,
cluster_id VARCHAR comment '提交集群',
queue_code VARCHAR comment 'app所在子队列',
hadoop_user VARCHAR comment '提交作业的Hadoop账号',
app_name VARCHAR comment '作业名称',
app_type VARCHAR comment '作业类型',
priority INTEGER comment 'app配置的优先级',
max_app_attempt INTEGER comment 'app失败重试次数',
app_tags VARCHAR,
client_info VARCHAR comment '客户端传入的Tag信息',
submit_time TIMESTAMP comment 'yyyy-MM-dd HH:mm:ss:SSS'
) WITH (
source = 'hdp-yarn.yarn_app_static_info',
local_appkey = 'com.sankuai.yarn.yarncollector',
timeout = 100,
batch_return_percent = 90
-- retry_fail_strategy = 'SKIP'
);
CREATE TABLE sink_table (
dt VARCHAR comment '日期',
`hour` VARCHAR comment '小时',
app_id VARCHAR comment 'AppId',
app_name VARCHAR comment '作业名称',
cluster_id VARCHAR comment '作业所在集群',
zone_name VARCHAR comment '作业所在Zone',
group_code VARCHAR comment '作业所在组队列',
queue_code VARCHAR comment '作业所在子队列',
hadoop_user VARCHAR comment '提交运行作业的Hadoop账号',
app_type VARCHAR comment '作业类型',
tags VARCHAR comment '作业Tag',
node_constraint_exp VARCHAR comment 'App NodeConstraints表达式',
priority INTEGER comment 'App提交时优先级',
final_app_attempt VARCHAR comment '最终的AppAttempt Id',
engine_state VARCHAR comment '引擎侧返回的状态',
yarn_state VARCHAR comment 'Yarn上的App状态',
diagnostics_info VARCHAR comment '失败诊断信息',
create_time TIMESTAMP comment '数据创建时间',
submit_time TIMESTAMP comment 'App提交时间',
started_time TIMESTAMP comment 'App开始运行时间',
finished_time TIMESTAMP comment 'App运行结束时间',
elapsed_ms BIGINT comment 'App运行时长',
launch_ms BIGINT comment 'App启动时长'
) WITH (
sink = 'hdp-yarn.yarn_app_life_detail'
);
insert into
sink_table
select
TimestampFormat(r.create_time, 'yyyyMMdd'),
TimestampFormat(r.create_time, 'HH'),
s.app_id,
s.app_name,
s.cluster_id,
SPLIT_INDEX(s.queue_code, '.', 1),
CONCAT(SPLIT_INDEX(s.queue_code, '.', 0), '.', SPLIT_INDEX(s.queue_code, '.', 1), '.' , SPLIT_INDEX(s.queue_code, '.', 2)),
s.queue_code,
s.hadoop_user,
s.app_type,
s.client_info,
s.app_tags,
s.priority,
r.final_app_attempt_id,
r.engine_state,
r.yarn_state,
r.diagnostics_info,
LOCALTIMESTAMP,
s.submit_time,
r.started_time,
r.finished_time,
Timestamp2MilliSec(r.finished_time) - Timestamp2MilliSec(r.started_time),
Timestamp2MilliSec(r.started_time) - Timestamp2MilliSec(s.submit_time)
from app_final as r join app_static
for system_time as of r.process_time as s
on s.app_id = r.app_id
union操作
//已完成的app生命周期信息(省略部分字段)
CREATE TABLE app_detail (
dt VARCHAR comment '日期',
`hour` VARCHAR comment '小时',
app_id VARCHAR comment 'AppId',
app_name VARCHAR comment '作业名称',
cluster_id VARCHAR comment '作业所在集群',
tags VARCHAR comment '作业Tag',
queue_code VARCHAR comment 'app所在子队列',
yarn_state VARCHAR comment 'Yarn上的App状态',
diagnostics_info VARCHAR comment '失败诊断信息',
create_time TIMESTAMP comment '数据创建时间',
finished_time TIMESTAMP comment 'App运行结束时间',
) WITH (
source = 'hdp-yarn.yarn_app_life_detail'
);
//app作业维度信息(省略部分字段)
CREATE TABLE app_static (
app_id VARCHAR,
app_name VARCHAR comment '作业名称',
cluster_id VARCHAR comment '提交集群',
tags VARCHAR comment '作业Tag',
queue_code VARCHAR comment 'app所在子队列',
submit_time TIMESTAMP comment 'yyyy-MM-dd HH:mm:ss:SSS',
create_time TIMESTAMP comment '数据创建时间(yyyy-MM-dd HH:mm:ss:SSS)',
) WITH (
source = 'hdp-yarn.ods_org.yarn_app_static_info'
);
//用于合并的union表
CREATE TABLE sink_table (
dt VARCHAR comment '日期',
`hour` VARCHAR comment '小时',
app_id VARCHAR comment 'AppId',
app_name VARCHAR comment '作业名称',
cluster_id VARCHAR comment '作业所在集群',
queue_code VARCHAR comment '作业所在子队列',
tags VARCHAR comment '作业Tag',
yarn_state VARCHAR comment 'Yarn上的App状态',
diagnostics_info VARCHAR comment '失败诊断信息',
create_time TIMESTAMP comment '数据创建时间',
submit_time TIMESTAMP comment 'App提交时间',
started_time TIMESTAMP comment 'App开始运行时间',
finished_time TIMESTAMP comment 'App运行结束时间'
) WITH (
sink = 'hdp-yarn.yarn_app_life_search'
);
//合并信息sql,对未跑完作业的终态信息赋值
insert into
sink_table
select
dt,
`hour`,
app_id,
app_name,
cluster_id,
queue_code,
tags,
yarn_state,
diagnostics_info,
create_time,
submit_time,
started_time,
finished_time
FROM
(
(SELECT
dt,
`hour`,
app_id,
app_name,
cluster_id,
queue_code,
tags,
yarn_state,
substring(diagnostics_info,0,1000) as diagnostics_info, 此处对诊断信息进行了限长
create_time,
submit_time,
started_time,
finished_time
FROM
app_detail
)UNION ALL(
SELECT
TimestampFormat(localtimestamp, 'yyyyMMdd') as dt,
TimestampFormat(localtimestamp, 'HH') as `hour`,
app_id,
app_name,
cluster_id,
queue_code,
app_tags,
'null' as yarn_state,
'null' as diagnostics_info,
localtimestamp as create_time,
submit_time,
cast('' as TIMESTAMP) as started_time,
cast('' as TIMESTAMP) as finished_time
FROM
app_static
)
)
数据存储
这部分最开始的设计是将数据落地到es当中,主要解决其他存储引擎对于字段的长度限制,由于生命周期信息都包含对应的诊断字段和tag字段,这两个字段的长度都是不固定的,但是由于平台侧不支持同步es,最终选择doris进行代替,在flink的union任务当中,对这两个进行限长截取。
对于doris的存储设计,主要是需要将指标设置为replace类型,这样可以将后产生的作业生命周期信息替换掉之前的空值项,起到数据更新的效果。
另一方面,存入hive,为后一阶段的查询与生产做准备。
3.3.4 维度数据生产
背景
目前Yarn数据指标体系中,对于维度表和事实表的关联,全部基于实时作业平台作建设:维度数据(App、AppAttempt、Container、NM元数据),在tair中保留36小时。因此在做实时聚合时,相当于只能关联到36小时内的维度数据。
而在实时场景下,一个作业很可能运行几个月甚至几年,仅保留36小时内的维度数据是无法接受的。
举个实际的例子便于理解:App1在1号00:00提交,一直持续运行。它的维度数据(静态信息)会在tair中存储36小时。在2号的12:00,
目标
将历史的维度数据同步到当天,使得事实表在基于当天的数据聚合时,可以准确找到维度数据。
方案
新增「维度表」概念:
- mart_hdp_yarn.yarn_app_dim
- mart_hdp_yarn.yarn_app_attempt_dim
- mart_hdp_yarn.yarn_app_container_dim
维度表中,存储着App/AppAttempt/Container的维度数据,按照dt字段分区。
以App为例,假设我们查询dt = 20210902的数据,实际查到的是:所有截至「2021-09-02 00:00:00」仍然「正在运行」的App维度数据。
那么,如何得到上述提到的维度数据呢?每次都重导之前的所有数据,数据量极大,不可行。我们的方式是:每一天都保存「昨天」没有跑完的维度数据,然后不断进行累加。
举例
- App1在2021-09-01进行提交,一直运行到3号
- 在2021-09-02,我们发现App1没有跑完,记录到维度表中(dt = 20210902)
- 在2021-09-03,我们回溯维度表,发现App1仍没有跑完,再次记录到维度表中(dt = 20210903)
- 在2021-09-04,我们回溯维度表,发现App1跑完了,这个时候就不再记录了
因此,实际的生产逻辑相当于是:「昨天提交的App」 + 「昨天之前的所有App」 - 「昨天跑完的所有App」。(在第一天任务刚启动调度时,「昨天之前的所有App」是一个空集合)
下图以「App」为例,描述了整个逻辑链路,cellar可以理解为tair:
因此,对于每一类维度数据,我们需要维护2个任务,一个Hive2Hive,生产维度数据;一个Hive2tair任务,将生产的维度数据同步到tair中。
3.4 作业设计与调试
3.4.1 flink sql操作
这里附上对于核心join操作的语法说明,这里可以理解为先给流表加一个处理时间字段,然后以这个字段值去获取此时间点的维表内容,然后on条件,进行join。
for system_time as of的理解
join操作举例
CREATE TABLE source_table (
id BIGINT,
waybill_status INTEGER,
process_time as ProcessTime() -- 注意:必须先通过计算列声明虚拟字段`process_time`(字段名称可自定义)
) With (
source = 'rt-实时作业平台.sj_test_json01',
process_time_field = 'process_time',
parallelism = 1
);
CREATE TEMPORAL TABLE dim_table (
id BIGINT,
dim_id_value VARCHAR
) With (
source = 'rt-实时作业平台.local_test_tair_09',
local_appkey = 'com.sankuai.data.rt.petra'
);
CREATE TABLE sink_table (
-- 此处sink_table仅作为用户在数仓平台已注册模型的引用。
id BIGINT,
waybill_status INTEGER,
dim_id_value VARCHAR
) With (
sink = 'rt-实时作业平台.sj_test_json02', -- sink为必填项。
parallelism = 1
);
USE FUNCTION ProcessTime -- 必须声明此UDF作为处理时间
WITH (
projectName = '实时作业平台-udf-public',
funcName = 'ProcessTime',
version = '2.7'
);
INSERT INTO
sink_table
SELECT
s.id,
s.waybill_status,
r.dim_id_value
from
source_table as s
join dim_table
for system_time as of s.process_time as r
on r.id = s.id
union all操作举例
insert into
output
SELECT
ts
FROM
(
(
SELECT
cast(request_time as TIMESTAMP) AS ts
FROM
input
WHERE
mode = 4
)
UNION ALL
(
SELECT
cast(request_time as TIMESTAMP) AS ts
FROM
input
WHERE
mode = 4
)
)
这里给出一些参考文档:
3.4.2 实时作业资源配置理解
资源调度背景
-
当前的所有Flink作业是部署在YARN集群的一个队列上;
-
Flink作业的JobManager(Master)和TaskManager(Worker)都运行在YARN Container中;
-
每个JobManager和TaskManager的最小/最大资源申请量分别为:1vcore+1G内存 / 8vcore+16G内存;
-
作业的内存大小(Mem)决定了作业会不会挂掉,并发数( Vcore )决定了作业的处理性能;
-
若内存开够、并发数开少,可能导致作业处理数据出现延迟,但作业不会挂掉;
-
若内存开少、并发数足够,若内存一旦满了,仍会报OOM一类的异常,导致作业被集群kill掉;
-
资源配置项
-
并发度(VCore/Slot数) :可粗略理解为作业当前最大并行线程数(Flink启动后的slot数 = vcore数(并发度与tm数成倍时))
-
TM数: 实际执行计算任务的Worker进程数(与Flink中taskManager数、物理进程数相等);
-
TM内存:Worker所使用的内存大小(Flink on YARN模式会有25%的内存是Container预留的,所以用户作业实际使用内存会比设置的要少);
-
JM内存:JM会管理作业的所有Task运行状态,以及指标收集和统计;
建议:TM越多,JM的内存越大(具体资源值需要根据经验进行调试)
eg:在flink中启一个作业,已为作业配置5个总并发数、TM内存2G、JM内存1G,则
- 作业总并发数: ceil (5 / 2) * 2 = 6(即Yarn为此作业分配6个作业总并发)
- 作业内存总量:1024M * 1 + 2048M * 2 = 5120M
资源用量
- 理想状况:不多也不少,刚刚满足我的流量需求。
- 现实情况:有状态计算很难做到动态资源调度,所以必须在上线时就一次性申请足量资源。
目前应申请多少资源没有硬性规定,平台方仅根据经验给出如下建议:
**Step1:**在作业上前先起一个作业运行一段时间,不断调整资源,直至作业可用性符合预期,从而压测出不会出现延迟的最小资源使用量;
**Step2:**预估业务流量近期(月为单位)的增长趋势;
**Step3:*正式上线时按当前“最小资源使用量(1+增量%)”去申请资源;
进一步理解,可以参考这个blog: Flink 原理与实现: 理解 Flink 中的计算资源
官方关于 slot & 并行度 的解释:flink官方文档
3.4.3 常见实时问题
问题名称 | 问题定义 | 问题影响 | 解决方案 |
---|---|---|---|
资源不足问题 | 作业资源不足而导致启动失败场景,常见如I/O读取文件问题、日志报yarn侧分配资源失败 | 作业启动失败 | 对于日志报资源分配失败的场景,应分析集群是否存在足量的资源对于I/O读取文件问题,其实本质是netbuffer相关资源配置不合理的问题,需要扩大配置netbuffer对应参数 |
消息挤压 | 首先消息挤压的现象是发生在kafka端的,消费者的消费速度无法达到生产者的生产速度,导致消息积压在kafka,这在流式实时作业当中是常见的问题。 | 对于实时性要求很高的作业,消息延迟的影响很大,直接的后果就是生产数据未完成。但是对于实时性要求不高且存在流量变化的场景下,如果资源量有限,是可以容忍一部分的消息延迟的。当流量放缓时,作业顺序消费是可以将积压的消息处理完,这和ResourceManager中对资源的reserved处理思想很像。 | 本质上都是提升kafka的消息消费能力,这部分为两个方面。1)增加下游作业的消费能力 这部分是解决消息挤压的核心方法,上游消息积压极可能是作业的性能问题,提升处理性能可通过扩并发或扩资源实现。 2)增加消息读取能力这部分可总结为下游作业对kafka的读取消息能力,有时候消息消费能力差,并不是作业处理能力差,而是对Kafka消息读取的并行度过低导致的。具体操作,可以通过提升Kafka的分区数,并提升下游作业的读取并行度。![]() |
反压 | 反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。 | 反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。 | 定位反压节点要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:通过 Flink Web UI 自带的反压监控面板;通过 Flink Task Metrics。前者比较容易上手,适合简单分析,后者则提供了更加丰富的信息,适合用于监控系统。因为反压会向上游传导,这两种方式都要求我们从 Source 节点到 Sink 的逐一排查,直到找到造成反压的根源原因。 |
数据异常 | 这个问题包含很多层面,例如flink作业定义的数据类型与上游kafka topic当中不匹配,或者数据精度不匹配导致的。面对这种问题,建议首先排查上游的收集程序,是脏数据导致还是收集逻辑有误导致的。 | 数据敏感的场景下会导致作业无法运行,反复重启。 | 对于数据敏感的场景,需要在数据收集层进行规范性判断,保证上层生产不出现数据异常问题。对于数据流量敏感的场景,可以调整处理的容忍性,将不规范字段置为null或将该条数据丢失。 |
3.5 大作业调试案例
3.5.1 作业背景
这是在实时平台上的一个实时作业,总结来说任务是将维表存储到tair当中,用于下游的流维关联作业。作业上线后出现了上游topic消息挤压的问题,进而导致反压等现象。
另一方面,由于该kafka收集的数据是container信息,由于夜间离线任务增多,数据量会增大,总结为任务数据量在夜间会突增,12点瞬时会为4倍,夜间持续为高峰期。
flink作业关系图如下:
延迟现象图:
3.5.2 排查过程
总体来说,一个实时任务导致的消息挤压的表面原因可以归纳为:下游的实时任务消费能力满足不了上游kafka的生产能力。而本质原因却比较复杂,可以是几个维度。
1)kafka本身维度:即kafka考虑由于分区数受限,夜间流量突增,数据处理能力不够,分区消费延迟指标高。其次,夜间kafka执行磁盘均衡会附加压力,也属于高峰期,最终的现象体现在Kafka的分区延迟指标抖动极高。
Kafka写入指标图:
kafka分区延迟指标图:
进一步分析kafka细粒度指标
responseSendTime指标图:
这个responseSendTime对于消费请求而言指的是把待消费的数据发送给consumer用的时间,这部分耗时决定于数据传输的速率以及consumer消费到之后处理这部分数据的时间,可以发现显著时间变长,子阶段耗时在responseSendTime。
总结来讲,这里其实就可以怀疑是下游消费能力导致的问题了,为进一步验证原因,首先在当晚限制该部分的磁盘迁移,看生产情况。这部分给出一个解决方案,按细粒度情况调整迁移限速,白天并发加速,夜间做适当限制。
最终问题复现,排除该原因。
2)实时作业本身的运算生产能力(即对上有kafka消息的消费能力):这部分其实也可以分为两个部分,首先是对数据的运算能力,其次是对运算后的写入下游能力,例如本作业就是将数据写入外部存储tair中。
对于提升作业处理性能,最先想到就是扩并发,根据队列的现状,我们将TM数提升设置为27,并发度81,即每个TM有3个vcore,TM内存为12G。资源大幅提升,但是问题并没有解决,在夜间高峰处理依然呈现消息积压的情况。
其次,考虑作业写数据到tair端的能力瓶颈,查看当天的tair监控,发现写tair在高峰期有长时间瓶颈,维持在10k,与tair同学沟通后,结论为不存在tair服务端的流量限制。
tair写监控图:
这时其实忽略了一个关键点,即作业写到tair的能力,即sink parallelism。作业生产出数据,但是将数据同步到tair的配置限制了sink性能。换句话说,就是tair客户端的限制。
之前作业sink并发配置为10,远小于上游并发度,所以该部分可能为核心瓶颈。至于kafka指标延迟,是由于下游写入有瓶颈,sink 算子反压上游,ckp时间显著变长,最终导致消费延迟上升。
ckp时间指标图:
3.5.3 解决方案
对于该flink作业,核心算子并发度是81,sink开1或者开81都不影响资源占用,所以将sink parallelism改为81,观察现象。
调整结果图:
上图可见,flink作业生产能力已覆盖kafka中数据流,调试完成。
4 总结
项目成果
- 稳定进行数据收集,RM与yarnCollector正常工作,打点操作未影响性能。
- 完成实时任务ods层10个,dw的层8个,日生产数据量15亿;离线作业8个,稳定完成维度数据生产。
- 建设起对资源层主体的实时信息与生命周期的应用,提供上层资源分析与本层问题排查的数据支持。
下图描述了全部的数据指标收集链路,作为新数据指标体系的全景图:
更多推荐
所有评论(0)