最新DolphinScheduler海豚调度教程_海豚调度使用说明,2024年最新大数据开发基础教程ppt
/示例:token:刚刚生成的Token1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。token: 刚刚生成的 Token/***/...@Override/**


网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
resource.upload.path=/dolphinscheduler
# resource storage type : HDFS,S3,NONE
resource.storage.type=HDFS
# whether kerberos starts
hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
# if resource.storage.type is HDFS,and your Hadoop Cluster NameNode has HA enabled, you need to put core-site.xml and hdfs-site.xml in the installPath/conf directory. In this example, it is placed under /opt/soft/dolphinscheduler/conf, and configure the namenode cluster name; if the NameNode is not HA, modify it to a specific IP or host name.
# if resource.storage.type is S3,write S3 address,HA,for example :s3a://dolphinscheduler,
# Note,s3 be sure to create the root directory /dolphinscheduler
fs.defaultFS=hdfs://mycluster:8020
#resourcemanager ha note this need ips , this empty if single
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s
### 文件管理
>
> 是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、上传jar包等各种类型文件,可进行编辑、重命名、下载、删除等操作。
>
>
>

* 创建文件
>
> 文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties
>
>
>

* 上传文件
>
> 上传文件:点击"上传文件"按钮进行上传,将文件拖拽到上传区域,文件名会自动以上传的文件名称补全
>
>
>

* 文件查看
>
> 对可查看的文件类型,点击文件名称,可查看文件详情
>
>
>
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rUFXelKK-1683205976507)(null)]
* 下载文件
>
> 点击文件列表的"下载"按钮下载文件或者在文件详情中点击右上角"下载"按钮下载文件
>
>
>
* 文件重命名

* 删除
>
> 文件列表->点击"删除"按钮,删除指定文件
>
>
>
### UDF管理
#### 资源管理
>
> 资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、下载、删除。
>
>
>
* 上传udf资源
>
> 和上传文件相同。
>
>
>
#### 函数管理
* 创建udf函数
>
> 点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。 目前只支持HIVE的临时UDF函数
>
>
>
* UDF函数名称:输入UDF函数时的名称
* 包名类名:输入UDF函数的全路径
* UDF资源:设置创建的UDF对应的资源文件

## 监控中心
### 服务管理
* 服务管理主要是对系统中的各个服务的健康状况和基本信息的监控和显示
#### master监控
* 主要是master的相关信息。

#### worker监控
* 主要是worker的相关信息。

#### Zookeeper监控
* 主要是zookpeeper中各个worker和master的相关配置信息。

#### DB监控
* 主要是DB的健康状况

### 统计管理

* 待执行命令数:统计t\_ds\_command表的数据
* 执行失败的命令数:统计t\_ds\_error\_command表的数据
* 待运行任务数:统计Zookeeper中task\_queue的数据
* 待杀死任务数:统计Zookeeper中task\_kill的数据
## 安全中心(权限系统)
* 安全中心只有管理员账户才有权限操作,分别有队列管理、租户管理、用户管理、告警组管理、worker分组管理、令牌管理等功能,在用户管理模块可以对资源、数据源、项目等授权
* 管理员登录,默认用户名密码:admin/dolphinscheduler123
### 创建队列
* 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。
* 管理员进入安全中心->队列管理页面,点击“创建队列”按钮,创建队列。

### 添加租户
* 租户对应的是Linux的用户,用于worker提交作业所使用的用户。如果linux没有这个用户,则会导致任务运行失败。你可以通过修改 `worker.properties` 配置文件中参数 `worker.tenant.auto.create=true` 实现当 linux 用户不存在时自动创建该用户。`worker.tenant.auto.create=true` 参数会要求 worker 可以免密运行 `sudo` 命令。
* 租户编码:**租户编码是Linux上的用户,唯一,不能重复**
* 管理员进入安全中心->租户管理页面,点击“创建租户”按钮,创建租户。

### 创建普通用户
* 用户分为**管理员用户**和**普通用户**
+ 管理员有授权和用户管理等权限,没有创建项目和工作流定义的操作的权限。
+ 普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。
+ 注意:如果该用户切换了租户,则该用户所在租户下所有资源将复制到切换的新租户下。
* 进入安全中心->用户管理页面,点击“创建用户”按钮,创建用户。

#### 编辑用户信息
* 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息。
* 普通用户登录后,点击用户名下拉框中的用户信息,进入用户信息页面,点击"编辑"按钮,编辑用户信息。
#### 修改用户密码
* 管理员进入安全中心->用户管理页面,点击"编辑"按钮,编辑用户信息时,输入新密码修改用户密码。
* 普通用户登录后,点击用户名下拉框中的用户信息,进入修改密码页面,输入密码并确认密码后点击"编辑"按钮,则修改密码成功。
### 创建告警组
* 告警组是在启动时设置的参数,在流程结束以后会将流程的状态和其他信息以邮件形式发送给告警组。
* 管理员进入安全中心->告警组管理页面,点击“创建告警组”按钮,创建告警组。

### 令牌管理
>
> 由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。
>
>
>
* 管理员进入安全中心->令牌管理页面,点击“创建令牌”按钮,选择失效时间与用户,点击"生成令牌"按钮,点击"提交"按钮,则选择用户的token创建成功。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQj8OzXt-1683205975817)(null)]
* 普通用户登录后,点击用户名下拉框中的用户信息,进入令牌管理页面,选择失效时间,点击"生成令牌"按钮,点击"提交"按钮,则该用户创建token成功。
* 调用示例:
/\*\*
* test token
*/
public void doPOSTParam()throws Exception{
// create HttpClient
CloseableHttpClient httpclient = HttpClients.createDefault();
// create http post request
HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create");
httpPost.setHeader("token", "123");
// set parameters
List<NameValuePair> parameters = new ArrayList<NameValuePair>();
parameters.add(new BasicNameValuePair("projectName", "qzw"));
parameters.add(new BasicNameValuePair("desc", "qzw"));
UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
httpPost.setEntity(formEntity);
CloseableHttpResponse response = null;
try {
// execute
response = httpclient.execute(httpPost);
// response status code 200
if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
}
} finally {
if (response != null) {
response.close();
}
httpclient.close();
}
}
### 授予权限
* 授予权限包括项目权限,资源权限,数据源权限,UDF函数权限。
* 管理员可以对普通用户进行非其创建的项目、资源、数据源和UDF函数进行授权。因为项目、资源、数据源和UDF函数授权方式都是一样的,所以以项目授权为例介绍。
* 注意:对于用户自己创建的项目,该用户拥有所有的权限。则项目列表和已选项目列表中不会显示。
* 管理员进入安全中心->用户管理页面,点击需授权用户的“授权”按钮,如下图所示:

* 选择项目,进行项目授权。

* 资源、数据源、UDF函数授权同项目授权。
### Worker分组
每个worker节点都会归属于自己的Worker分组,默认分组为default.
在任务执行时,可以将任务分配给指定worker分组,最终由该组中的worker节点执行该任务.
>
> 新增/更新 worker分组
>
>
>
* 打开要设置分组的worker节点上的"conf/worker.properties"配置文件. 修改worker.groups参数.
* worker.groups参数后面对应的为该worker节点对应的分组名称,默认为default.
* 如果该worker节点对应多个分组,则以逗号隔开.
示例:
worker.groups=default,test
* 也可以在运行中修改worker所属的worker分组,如果修改成功,worker就会使用这个新建的分组,忽略`worker.properties`中的配置。修改步骤为"安全中心 -> worker分组管理 -> 点击 ‘新建worker分组’ -> 输入’组名称’ -> 选择已有worker -> 点击’提交’"
### 环境管理
* 在线配置Worker运行环境,一个Worker可以指定多个环境,每个环境等价于dolphinscheduler\_env.sh文件.
* 默认环境为dolphinscheduler\_env.sh文件.
* 在任务执行时,可以将任务分配给指定worker分组,根据worker分组选择对应的环境,最终由该组中的worker节点执行环境后执行该任务.
>
> 创建/更新 环境
>
>
>
* 环境配置等价于dolphinscheduler\_env.sh文件内配置

>
> 使用 环境
>
>
>
* 在工作流定义中创建任务节点选择Worker分组和Worker分组对应的环境,任务执行时Worker会先执行环境在执行任务.

## API 调用
### 背景
一般都是通过页面来创建项目、流程等,但是与第三方系统集成就需要通过调用 API 来管理项目、流程
### 操作步骤
#### 创建 token
1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

#### 使用 Token
1. 打开 API文档页面
>
> 地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh\_CN&lang=cn
>
>
>

1. 选一个测试的接口,本次测试选取的接口是:查询所有项目
>
> projects/query-project-list
>
>
>
2. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token:刚刚生成的Token

#### 创建项目
这里以创建名为 “wudl-flink-test” 的项目为例



返回 msg 信息为 “success”,说明我们已经成功通过 API 的方式创建了项目。
如果您对创建项目的源码感兴趣,欢迎继续阅读下面内容
#### 附:创建项目源码


## Flink调用
### 调用 flink 操作步骤
#### 创建队列
1. 登录调度系统,点击 “安全中心”,再点击左侧的 “队列管理”,点击 “队列管理” 创建队列
2. 填写队列名称和队列值,然后点击 “提交”
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QOXUFbby-1683205974909)(null)]
#### 创建租户
1.租户对应的是 linux 用户, 用户 worker 提交作业所使用的的用户, 如果 linux 没有这个用户, worker 会在执行脚本的时候创建这个用户
2.租户和租户编码都是唯一不能重复,好比一个人有名字有身份证号。
3.创建完租户会在 hdfs 对应的目录上有相关的文件夹。

#### 创建用户

#### 创建 Token
1. 登录调度系统,点击 “安全中心”,再点击左侧的 “令牌管理”,点击 “令牌管理” 创建令牌

1. 选择 “失效时间” (Token有效期),选择 “用户” (以指定的用户执行接口操作),点击 “生成令牌” ,拷贝 Token 字符串,然后点击 “提交”

#### 使用 Token
1. 打开 API文档页面
>
> 地址:http://{api server ip}:12345/dolphinscheduler/doc.html?language=zh\_CN&lang=cn
>
>
>

1. 选一个测试的接口,本次测试选取的接口是:查询所有项目
>
> projects/query-project-list
>
>
>
2. 打开 Postman,填写接口地址,并在 Headers 中填写 Token,发送请求后即可查看结果
token: 刚刚生成的 Token

#### 用户授权

#### 用户登录
http://192.168.1.163:12345/dolphinscheduler/ui/#/monitor/servers/master

#### 资源上传

#### 创建工作流




#### 查看执行结果

#### 查看日志结果

## (二)高级指南
### 系统架构设计
本章节介绍Apache DolphinScheduler调度系统架构
#### 1.系统架构
##### 1.1 系统架构图

*系统架构图*
##### 1.2 启动流程活动图

*启动流程活动图*
##### 1.3 架构说明
* **MasterServer**
MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。
###### 该服务内主要包含:
+ **Distributed Quartz**分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作
+ **MasterSchedulerService**是一个扫描线程,定时扫描数据库中的 **command** 表,生成工作流实例,根据不同的**命令类型**进行不同的业务操作
+ **WorkflowExecuteThread**主要是负责DAG任务切分、任务提交、各种不同命令类型的逻辑处理,处理任务状态和工作流状态事件
+ **EventExecuteService**处理master负责的工作流实例所有的状态变化事件,使用线程池处理工作流的状态事件
+ **StateWheelExecuteThread**处理依赖任务和超时任务的定时状态更新
* **WorkerServer**
WorkerServer也采用分布式无中心设计理念,支持自定义任务插件,主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。
###### 该服务包含:
+ **WorkerManagerThread**主要通过netty领取master发送过来的任务,并根据不同任务类型调用**TaskExecuteThread**对应执行器。
+ **RetryReportTaskStatusThread**主要通过netty向master汇报任务状态,如果汇报失败,会一直重试汇报
+ **LoggerServer**是一个日志服务,提供日志分片查看、刷新和下载等功能
* **Registry**
注册中心,使用插件化实现,默认支持Zookeeper, 系统中的MasterServer和WorkerServer节点通过注册中心来进行集群管理和容错。另外系统还基于注册中心进行事件监听和分布式锁。
* **Alert**
提供告警相关功能,仅支持单机服务。支持自定义告警插件。
* **API**
API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。
* **UI**
系统的前端页面,提供系统的各种可视化操作界面,详见[系统使用手册]( )部分。
##### 1.4 架构设计思想
###### 一、去中心化vs中心化
###### 中心化思想
中心化的设计理念比较简单,分布式集群中的节点按照角色分工,大体上分为两种角色:

* Master的角色主要负责任务分发并监督Slave的健康状态,可以动态的将任务均衡到Slave上,以致Slave节点不至于“忙死”或”闲死”的状态。
* Worker的角色主要负责任务的执行工作并维护和Master的心跳,以便Master可以分配任务给Slave。
中心化思想设计存在的问题:
* 一旦Master出现了问题,则群龙无首,整个集群就会崩溃。为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性。
* 另外一个问题是如果Scheduler在Master上,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,则一个DAG中所有的任务都只能在某一台机器上进行作业提交,则并行任务比较多的时候,Slave的压力可能会比较大。
###### 去中心化

* 在去中心化设计里,通常没有Master/Slave的概念,所有的角色都是一样的,地位是平等的,全球互联网就是一个典型的去中心化的分布式系统,联网的任意节点设备down机,都只会影响很小范围的功能。
* 去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的”管理者”,因此不存在单点故障问题。但由于不存在” 管理者”节点所以每个节点都需要跟其他节点通信才得到必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。
* 实际上,真正去中心化的分布式系统并不多见。反而动态中心化分布式系统正在不断涌出。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的,并且集群在发生故障的时候,集群的节点会自发的举行"会议"来选举新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go语言实现的Etcd。
* DolphinScheduler的去中心化是Master/Worker注册到Zookeeper中,实现Master集群和Worker集群无中心,使用分片机制,公平分配工作流在master上执行,并通过不同的发送策略将任务发送给worker执行具体的任务
###### 二、Master执行流程
1. DolphinScheduler使用分片算法将command取模,根据master的排序id分配,master将拿到的command转换成工作流实例,使用线程池处理工作流实例
2. DolphinScheduler对工作流的处理流程:
* 通过UI或者API调用,启动工作流,持久化一条command到数据库中
* Master通过分片算法,扫描Command表,生成工作流实例ProcessInstance,同时删除Command数据
* Master使用线程池运行WorkflowExecuteThread,执行工作流实例的流程,包括构建DAG,创建任务实例TaskInstance,将TaskInstance通过netty发送给worker
* Worker收到任务以后,修改任务状态,并将执行信息返回Master
* Master收到任务信息,持久化到数据库,并且将状态变化事件存入EventExecuteService事件队列
* EventExecuteService根据事件队列调用WorkflowExecuteThread进行后续任务的提交和工作流状态的修改
###### 三、容错设计
容错分为服务宕机容错和任务重试,服务宕机容错又分为Master容错和Worker容错两种情况
###### 1. 宕机容错
服务容错设计依赖于ZooKeeper的Watcher机制,实现原理如图:

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。
* Master容错流程图:

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到”正在运行”和“提交成功”的任务,对”正在运行”的任务监控其任务实例的状态,对”提交成功”的任务需要判断Task Queue中是否已经存在,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例。
* Worker容错流程图:

Master Scheduler线程一旦发现任务实例为” 需要容错”状态,则接管任务并进行重新提交。
注意:由于” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。对于这种情况,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接,则直接将Master或Worker服务停掉。
###### 2.任务失败重试
这里首先要区分任务失败重试、流程失败恢复、流程失败重跑的概念:
* 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自己再最多尝试运行3次
* 流程失败恢复是流程级别的,是手动进行的,恢复是从只能**从失败的节点开始执行**或**从当前节点开始执行**
* 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行
接下来说正题,我们将工作流中的任务节点分了两种类型。
* 一种是业务节点,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点,MR节点、Spark节点、依赖节点等。
* 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节等。
所有任务都可以配置失败重试的次数,当该任务节点失败,会自动重试,直到成功或者超过配置的重试次数。
如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作
###### 四、任务优先级设计
在早期调度设计中,如果没有优先级设计,采用公平调度设计的话,会遇到先行提交的任务可能会和后继提交的任务同时完成的情况,而不能做到设置流程或者任务的优先级,因此我们对此进行了重新设计,目前我们设计如下:
* 按照
不同流程实例优先级
优先于
同一个流程实例优先级
优先于
同一流程内任务优先级
优先于
同一流程内任务
提交顺序依次从高到低进行任务处理。
+ 具体实现是根据任务实例的json解析优先级,然后把**流程实例优先级\_流程实例id\_任务优先级\_任务id**信息保存在ZooKeeper任务队列中,当从任务队列获取的时候,通过字符串比较即可得出最需要优先执行的任务
- 其中流程定义的优先级是考虑到有些流程需要先于其他流程进行处理,这个可以在流程启动或者定时启动时配置,共有5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

- 任务的优先级也分为5级,依次为HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下图

###### 五、Logback和netty实现日志访问
* 由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查询本地文件那样。有两种方案:
* 将日志放到ES搜索引擎上
* 通过netty通信获取远程日志信息
* 介于考虑到尽可能的DolphinScheduler的轻量级性,所以选择了gRPC实现远程访问日志信息。

* 我们使用自定义Logback的FileAppender和Filter功能,实现每个任务实例生成一个日志文件。
* FileAppender主要实现如下:
/**
* task log appender
*/
public class TaskLogAppender extends FileAppender {
...
@Override
protected void append(ILoggingEvent event) {
if (currentlyActiveFile == null){
currentlyActiveFile = getFile();
}
String activeFile = currentlyActiveFile;
// thread name: taskThreadName-processDefineId\_processInstanceId\_taskInstanceId
String threadName = event.getThreadName();
String[] threadNameArr = threadName.split("-");
// logId = processDefineId\_processInstanceId\_taskInstanceId
String logId = threadNameArr[1];
...
super.subAppend(event);
}
}
以/流程定义id/流程实例id/任务实例id.log的形式生成日志
* 过滤匹配以TaskLogInfo开始的线程名称:
* TaskLogFilter实现如下:
/**
* task log filter
*/
public class TaskLogFilter extends Filter {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith(“TaskLogInfo-”)){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
}
### Dolphin Scheduler 2.0元数据文档
#### 表概览
| 表名 | 表信息 |
| --- | --- |
| t\_ds\_access\_token | 访问ds后端的token |
| t\_ds\_alert | 告警信息 |
| t\_ds\_alertgroup | 告警组 |
| t\_ds\_command | 执行命令 |
| t\_ds\_datasource | 数据源 |
| t\_ds\_error\_command | 错误命令 |
| t\_ds\_process\_definition | 流程定义 |
| t\_ds\_process\_instance | 流程实例 |
| t\_ds\_project | 项目 |
| t\_ds\_queue | 队列 |
| t\_ds\_relation\_datasource\_user | 用户关联数据源 |
| t\_ds\_relation\_process\_instance | 子流程 |
| t\_ds\_relation\_project\_user | 用户关联项目 |
| t\_ds\_relation\_resources\_user | 用户关联资源 |
| t\_ds\_relation\_udfs\_user | 用户关联UDF函数 |
| t\_ds\_relation\_user\_alertgroup | 用户关联告警组 |
| t\_ds\_resources | 资源文件 |
| t\_ds\_schedules | 流程定时调度 |
| t\_ds\_session | 用户登录的session |
| t\_ds\_task\_instance | 任务实例 |
| t\_ds\_tenant | 租户 |
| t\_ds\_udfs | UDF资源 |
| t\_ds\_user | 用户 |
| t\_ds\_version | ds版本信息 |
#### 用户 队列 数据源

* 一个租户下可以有多个用户
* t\_ds\_user中的queue字段存储的是队列表中的queue\_name信息,t\_ds\_tenant下存的是queue\_id,在流程定义执行过程中,用户队列优先级最高,用户队列为空则采用租户队列
* t\_ds\_datasource表中的user\_id字段表示创建该数据源的用户,t\_ds\_relation\_datasource\_user中的user\_id表示,对数据源有权限的用户
#### 项目 资源 告警

* 一个用户可以有多个项目,用户项目授权通过t\_ds\_relation\_project\_user表完成project\_id和user\_id的关系绑定
* t\_ds\_projcet表中的user\_id表示创建该项目的用户,t\_ds\_relation\_project\_user表中的user\_id表示对项目有权限的用户
* t\_ds\_resources表中的user\_id表示创建该资源的用户,t\_ds\_relation\_resources\_user中的user\_id表示对资源有权限的用户
* t\_ds\_udfs表中的user\_id表示创建该UDF的用户,t\_ds\_relation\_udfs\_user表中的user\_id表示对UDF有权限的用户
#### 命令 流程 任务


* 一个项目有多个流程定义,一个流程定义可以生成多个流程实例,一个流程实例可以生成多个任务实例
* t\_ds\_schedulers表存放流程定义的定时调度信息
* t\_ds\_relation\_process\_instance表存放的数据用于处理流程定义中含有子流程的情况,parent\_process\_instance\_id表示含有子流程的主流程实例id,process\_instance\_id表示子流程实例的id,parent\_task\_instance\_id表示子流程节点的任务实例id,流程实例表和任务实例表分别对应t\_ds\_process\_instance表和t\_ds\_task\_instance表
#### 核心表Schema
##### t\_ds\_process\_definition
| 字段 | 类型 | 注释 |
| --- | --- | --- |
| id | int | 主键 |
| name | varchar | 流程定义名称 |
| version | int | 流程定义版本 |
| release\_state | tinyint | 流程定义的发布状态:0 未上线 1已上线 |
| project\_id | int | 项目id |
| user\_id | int | 流程定义所属用户id |
| process\_definition\_json | longtext | 流程定义json串 |
| description | text | 流程定义描述 |
| global\_params | text | 全局参数 |
| flag | tinyint | 流程是否可用:0 不可用,1 可用 |
| locations | text | 节点坐标信息 |
| connects | text | 节点连线信息 |
| receivers | text | 收件人 |
| receivers\_cc | text | 抄送人 |
| create\_time | datetime | 创建时间 |
| timeout | int | 超时时间 |
| tenant\_id | int | 租户id |
| update\_time | datetime | 更新时间 |
| modify\_by | varchar | 修改用户 |
| resource\_ids | varchar | 资源id集 |
##### t\_ds\_process\_instance
| 字段 | 类型 | 注释 |
| --- | --- | --- |
| id | int | 主键 |
| name | varchar | 流程实例名称 |
| process\_definition\_id | int | 流程定义id |
| state | tinyint | 流程实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
| recovery | tinyint | 流程实例容错标识:0 正常,1 需要被容错重启 |
| start\_time | datetime | 流程实例开始时间 |
| end\_time | datetime | 流程实例结束时间 |
| run\_times | int | 流程实例运行次数 |
| host | varchar | 流程实例所在的机器 |
| command\_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
| command\_param | text | 命令的参数(json格式) |
| task\_depend\_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
| max\_try\_times | tinyint | 最大重试次数 |
| failure\_strategy | tinyint | 失败策略 0 失败后结束,1 失败后继续 |
| warning\_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
| warning\_group\_id | int | 告警组id |
| schedule\_time | datetime | 预期运行时间 |
| command\_start\_time | datetime | 开始命令时间 |
| global\_params | text | 全局参数(固化流程定义的参数) |
| process\_instance\_json | longtext | 流程实例json(copy的流程定义的json) |
| flag | tinyint | 是否可用,1 可用,0不可用 |
| update\_time | timestamp | 更新时间 |
| is\_sub\_process | int | 是否是子工作流 1 是,0 不是 |
| executor\_id | int | 命令执行用户 |
| locations | text | 节点坐标信息 |
| connects | text | 节点连线信息 |
| history\_cmd | text | 历史命令,记录所有对流程实例的操作 |
| dependence\_schedule\_times | text | 依赖节点的预估时间 |
| process\_instance\_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
| worker\_group | varchar | 任务指定运行的worker分组 |
| timeout | int | 超时时间 |
| tenant\_id | int | 租户id |
##### t\_ds\_task\_instance
| 字段 | 类型 | 注释 |
| --- | --- | --- |
| id | int | 主键 |
| name | varchar | 任务名称 |
| task\_type | varchar | 任务类型 |
| process\_definition\_id | int | 流程定义id |
| process\_instance\_id | int | 流程实例id |
| task\_json | longtext | 任务节点json |
| state | tinyint | 任务实例状态:0 提交成功,1 正在运行,2 准备暂停,3 暂停,4 准备停止,5 停止,6 失败,7 成功,8 需要容错,9 kill,10 等待线程,11 等待依赖完成 |
| submit\_time | datetime | 任务提交时间 |
| start\_time | datetime | 任务开始时间 |
| end\_time | datetime | 任务结束时间 |
| host | varchar | 执行任务的机器 |
| execute\_path | varchar | 任务执行路径 |
| log\_path | varchar | 任务日志路径 |
| alert\_flag | tinyint | 是否告警 |
| retry\_times | int | 重试次数 |
| pid | int | 进程pid |
| app\_link | varchar | yarn app id |
| flag | tinyint | 是否可用:0 不可用,1 可用 |
| retry\_interval | int | 重试间隔 |
| max\_retry\_times | int | 最大重试次数 |
| task\_instance\_priority | int | 任务实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
| worker\_group | varchar | 任务指定运行的worker分组 |
##### t\_ds\_schedules
| 字段 | 类型 | 注释 |
| --- | --- | --- |
| id | int | 主键 |
| process\_definition\_id | int | 流程定义id |
| start\_time | datetime | 调度开始时间 |
| end\_time | datetime | 调度结束时间 |
| crontab | varchar | crontab 表达式 |
| failure\_strategy | tinyint | 失败策略: 0 结束,1 继续 |
| user\_id | int | 用户id |
| release\_state | tinyint | 状态:0 未上线,1 上线 |
| warning\_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
| warning\_group\_id | int | 告警组id |
| process\_instance\_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
| worker\_group | varchar | 任务指定运行的worker分组 |
| create\_time | datetime | 创建时间 |
| update\_time | datetime | 更新时间 |
##### t\_ds\_command
| 字段 | 类型 | 注释 |
| --- | --- | --- |
| id | int | 主键 |
| command\_type | tinyint | 命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程 |
| process\_definition\_id | int | 流程定义id |
| command\_param | text | 命令的参数(json格式) |
| task\_depend\_type | tinyint | 节点依赖类型:0 当前节点,1 向前执行,2 向后执行 |
| failure\_strategy | tinyint | 失败策略:0结束,1继续 |
| warning\_type | tinyint | 告警类型:0 不发,1 流程成功发,2 流程失败发,3 成功失败都发 |
| warning\_group\_id | int | 告警组 |
| schedule\_time | datetime | 预期运行时间 |
| start\_time | datetime | 开始时间 |
| executor\_id | int | 执行用户id |
| dependence | varchar | 依赖字段 |
| update\_time | datetime | 更新时间 |
| process\_instance\_priority | int | 流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest |
| worker\_group | varchar | 任务指定运行的worker分组 |
### 配置文件
#### 前言
本文档为dolphinscheduler配置文件说明文档,针对版本为 dolphinscheduler-1.3.x 版本.
#### 目录结构
目前dolphinscheduler 所有的配置文件都在 [conf ] 目录中. 为了更直观的了解[conf]目录所在的位置以及包含的配置文件,请查看下面dolphinscheduler安装目录的简化说明. 本文主要讲述dolphinscheduler的配置文件.其他部分先不做赘述.
[注:以下 dolphinscheduler 简称为DS.]
├─bin DS命令存放目录
│ ├─dolphinscheduler-daemon.sh 启动/关闭DS服务脚本
│ ├─start-all.sh 根据配置文件启动所有DS服务
│ ├─stop-all.sh 根据配置文件关闭所有DS服务
├─conf 配置文件目录
│ ├─application-api.properties api服务配置文件
│ ├─datasource.properties 数据库配置文件
│ ├─zookeeper.properties zookeeper配置文件
│ ├─master.properties master服务配置文件
│ ├─worker.properties worker服务配置文件
│ ├─quartz.properties quartz服务配置文件
│ ├─common.properties 公共服务[存储]配置文件
│ ├─alert.properties alert服务配置文件
│ ├─config 环境变量配置文件夹
│ ├─install_config.conf DS环境变量配置脚本[用于DS安装/启动]
│ ├─env 运行脚本环境变量配置目录
│ ├─dolphinscheduler_env.sh 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME …]
│ ├─org mybatis mapper文件目录
│ ├─i18n i18n配置文件目录
│ ├─logback-api.xml api服务日志配置文件
│ ├─logback-master.xml master服务日志配置文件
│ ├─logback-worker.xml worker服务日志配置文件
│ ├─logback-alert.xml alert服务日志配置文件
├─sql DS的元数据创建升级sql文件
│ ├─create 创建SQL脚本目录
│ ├─upgrade 升级SQL脚本目录
│ ├─dolphinscheduler_postgre.sql postgre数据库初始化脚本
│ ├─dolphinscheduler_mysql.sql mysql数据库初始化脚本
│ ├─soft_version 当前DS版本标识文件
├─script DS服务部署,数据库创建/升级脚本目录
│ ├─create-dolphinscheduler.sh DS数据库初始化脚本
│ ├─upgrade-dolphinscheduler.sh DS数据库升级脚本
│ ├─monitor-server.sh DS服务监控启动脚本
│ ├─scp-hosts.sh 安装文件传输脚本
│ ├─remove-zk-node.sh 清理zookeeper缓存文件脚本
├─ui 前端WEB资源目录
├─lib DS依赖的jar存放目录
├─install.sh 自动安装DS服务脚本
#### 配置文件详解
| 序号 | 服务分类 | 配置文件 |
| --- | --- | --- |
| 1 | 启动/关闭DS服务脚本 | [dolphinscheduler-daemon.sh]( ) |
| 2 | 数据库连接配置 | datasource.properties |
| 3 | zookeeper连接配置 | zookeeper.properties |
| 4 | 公共[存储]配置 | common.properties |
| 5 | API服务配置 | application-api.properties |
| 6 | Master服务配置 | master.properties |
| 7 | Worker服务配置 | worker.properties |
| 8 | Alert 服务配置 | alert.properties |
| 9 | Quartz配置 | quartz.properties |
| 10 | DS环境变量配置脚本[用于DS安装/启动] | install\_config.conf |
| 11 | 运行脚本加载环境变量配置文件 [如: JAVA\_HOME,HADOOP\_HOME, HIVE\_HOME …] | dolphinscheduler\_env.sh |
| 12 | 各服务日志配置文件 | api服务日志配置文件 : logback-api.xml master服务日志配置文件 : logback-master.xml worker服务日志配置文件 : logback-worker.xml alert服务日志配置文件 : logback-alert.xml |
##### [1.dolphinscheduler-daemon.sh]( ) [启动/关闭DS服务脚本]
dolphinscheduler-daemon.sh脚本负责DS的启动&关闭. [start-all.sh/stop-all.sh最终也是通过dolphinscheduler-daemon.sh对集群进行启动/关闭操作]( ). 目前DS只是做了一个基本的设置,JVM参数请根据各自资源的实际情况自行设置.
默认简化参数如下:
export DOLPHINSCHEDULER_OPTS="
-server
-Xmx16g
-Xms1g
-Xss512k
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:+UseFastAccessorMethods
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=70
"
>
> 不建议设置"-XX:DisableExplicitGC" , DS使用Netty进行通讯,设置该参数,可能会导致内存泄漏.
>
>
>
##### 2.datasource.properties [数据库连接]
在DS中使用Druid对数据库连接进行管理,默认简化配置如下.
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| spring.datasource.driver-class-name | | 数据库驱动 |
| spring.datasource.url | | 数据库连接地址 |
| spring.datasource.username | | 数据库用户名 |
| spring.datasource.password | | 数据库密码 |
| spring.datasource.initialSize | 5 | 初始连接池数量 |
| spring.datasource.minIdle | 5 | 最小连接池数量 |
| spring.datasource.maxActive | 5 | 最大连接池数量 |
| spring.datasource.maxWait | 60000 | 最大等待时长 |
| spring.datasource.timeBetweenEvictionRunsMillis | 60000 | 连接检测周期 |
| spring.datasource.timeBetweenConnectErrorMillis | 60000 | 重试间隔 |
| spring.datasource.minEvictableIdleTimeMillis | 300000 | 连接保持空闲而不被驱逐的最小时间 |
| spring.datasource.validationQuery | SELECT 1 | 检测连接是否有效的sql |
| spring.datasource.validationQueryTimeout | 3 | 检测连接是否有效的超时时间[seconds] |
| spring.datasource.testWhileIdle | true | 申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 |
| spring.datasource.testOnBorrow | true | 申请连接时执行validationQuery检测连接是否有效 |
| spring.datasource.testOnReturn | false | 归还连接时执行validationQuery检测连接是否有效 |
| spring.datasource.defaultAutoCommit | true | 是否开启自动提交 |
| spring.datasource.keepAlive | true | 连接池中的minIdle数量以内的连接,空闲时间超过minEvictableIdleTimeMillis,则会执行keepAlive操作。 |
| spring.datasource.poolPreparedStatements | true | 开启PSCache |
| spring.datasource.maxPoolPreparedStatementPerConnectionSize | 20 | 要启用PSCache,必须配置大于0,当大于0时,poolPreparedStatements自动触发修改为true。 |
##### 3.zookeeper.properties [zookeeper连接配置]
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| zookeeper.quorum | localhost:2181 | zk集群连接信息 |
| zookeeper.dolphinscheduler.root | /dolphinscheduler | DS在zookeeper存储根目录 |
| zookeeper.session.timeout | 60000 | session 超时 |
| zookeeper.connection.timeout | 30000 | 连接超时 |
| zookeeper.retry.base.sleep | 100 | 基本重试时间差 |
| zookeeper.retry.max.sleep | 30000 | 最大重试时间 |
| zookeeper.retry.maxtime | 10 | 最大重试次数 |
##### 4.common.properties [hadoop、s3、yarn配置]
common.properties配置文件目前主要是配置hadoop/s3a相关的配置.
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| data.basedir.path | /tmp/dolphinscheduler | 本地工作目录,用于存放临时文件 |
| resource.storage.type | NONE | 资源文件存储类型: HDFS,S3,NONE |
| resource.upload.path | /dolphinscheduler | 资源文件存储路径 |
| hadoop.security.authentication.startup.state | false | hadoop是否开启kerberos权限 |
| java.security.krb5.conf.path | /opt/krb5.conf | kerberos配置目录 |
| login.user.keytab.username | [hdfs-mycluster@ESZ.COM](mailto:hdfs-mycluster@ESZ.COM) | kerberos登录用户 |
| login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos登录用户keytab |
| kerberos.expire.time | 2 | kerberos过期时间,整数,单位为小时 |
| resource.view.suffixs | txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties | 资源中心支持的文件格式 |
| hdfs.root.user | hdfs | 如果存储类型为HDFS,需要配置拥有对应操作权限的用户 |
| fs.defaultFS | hdfs://mycluster:8020 | 请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录 |
| fs.s3a.endpoint | | s3 endpoint地址 |
| fs.s3a.access.key | | s3 access key |
| fs.s3a.secret.key | | s3 secret key |
| yarn.resourcemanager.ha.rm.ids | | yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可 |
| yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | 如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname |
| dolphinscheduler.env.path | env/dolphinscheduler\_env.sh | 运行脚本加载环境变量配置文件[如: JAVA\_HOME,HADOOP\_HOME, HIVE\_HOME …] |
| development.state | false | 是否处于开发模式 |
##### 5.application-api.properties [API服务配置]
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| server.port | 12345 | api服务通讯端口 |
| server.servlet.session.timeout | 7200 | session超时时间 |
| server.servlet.context-path | /dolphinscheduler | 请求路径 |
| spring.servlet.multipart.max-file-size | 1024MB | 最大上传文件大小 |
| spring.servlet.multipart.max-request-size | 1024MB | 最大请求大小 |
| server.jetty.max-http-post-size | 5000000 | jetty服务最大发送请求大小 |
| spring.messages.encoding | UTF-8 | 请求编码 |
| spring.jackson.time-zone | GMT+8 | 设置时区 |
| spring.messages.basename | i18n/messages | i18n配置 |
| security.authentication.type | PASSWORD | 权限校验类型 |
##### 6.master.properties [Master服务配置]
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| master.listen.port | 5678 | master监听端口 |
| master.exec.threads | 100 | master工作线程数量,用于限制并行的流程实例数量 |
| master.exec.task.num | 20 | master每个流程实例的并行任务数量 |
| master.dispatch.task.num | 3 | master每个批次的派发任务数量 |
| master.host.selector | LowerWeight | master host选择器,用于选择合适的worker执行任务,可选值: Random, RoundRobin, LowerWeight |
| master.heartbeat.interval | 10 | master心跳间隔,单位为秒 |
| master.task.commit.retryTimes | 5 | 任务重试次数 |
| master.task.commit.interval | 1000 | 任务提交间隔,单位为毫秒 |
| master.max.cpuload.avg | -1 | master最大cpuload均值,只有高于系统cpuload均值时,master服务才能调度任务. 默认值为-1: cpu cores \* 2 |
| master.reserved.memory | 0.3 | master预留内存,只有低于系统可用内存时,master服务才能调度任务,单位为G |
##### 7.worker.properties [Worker服务配置]
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| worker.listen.port | 1234 | worker监听端口 |
| worker.exec.threads | 100 | worker工作线程数量,用于限制并行的任务实例数量 |
| worker.heartbeat.interval | 10 | worker心跳间隔,单位为秒 |
| worker.max.cpuload.avg | -1 | worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores \* 2 |
| worker.reserved.memory | 0.3 | worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G |
| worker.groups | default | worker分组配置,逗号分隔,例如’worker.groups=default,test’ worker启动时会根据该配置自动加入对应的分组 |
##### 8.alert.properties [Alert 告警服务配置]
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| alert.type | EMAIL | 告警类型 |
| mail.protocol | SMTP | 邮件服务器协议 |
| mail.server.host | [xxx.xxx.com]( ) | 邮件服务器地址 |
| mail.server.port | 25 | 邮件服务器端口 |
| mail.sender | [xxx@xxx.com](mailto:xxx@xxx.com) | 发送人邮箱 |
| mail.user | [xxx@xxx.com](mailto:xxx@xxx.com) | 发送人邮箱名称 |
| mail.passwd | 111111 | 发送人邮箱密码 |
| mail.smtp.starttls.enable | true | 邮箱是否开启tls |
| mail.smtp.ssl.enable | false | 邮箱是否开启ssl |
| mail.smtp.ssl.trust | [xxx.xxx.com]( ) | 邮箱ssl白名单 |
| xls.file.path | /tmp/xls | 邮箱附件临时工作目录 |
| | 以下为企业微信配置[选填] | |
| enterprise.wechat.enable | false | 企业微信是否启用 |
| [enterprise.wechat.corp.id]( ) | xxxxxxx | |
| enterprise.wechat.secret | xxxxxxx | |
| [enterprise.wechat.agent.id]( ) | xxxxxxx | |
| enterprise.wechat.users | xxxxxxx | |
| enterprise.wechat.token.url | https://qyapi.weixin.qq.com/cgi-bin/gettoken? corpid=corpId&corpsecret=secret | |
| enterprise.wechat.push.url | https://qyapi.weixin.qq.com/cgi-bin/message/send? access\_token=$token | |
| enterprise.wechat.user.send.msg | | 发送消息格式 |
| enterprise.wechat.team.send.msg | | 群发消息格式 |
| plugin.dir | /Users/xx/your/path/to/plugin/dir | 插件目录 |
##### 9.quartz.properties [Quartz配置]
这里面主要是quartz配置,请结合实际业务场景&资源进行配置,本文暂时不做展开.
| 参数 | 默认值 | 描述 |
| --- | --- | --- |
| org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.StdJDBCDelegate | |
| org.quartz.jobStore.driverDelegateClass | org.quartz.impl.jdbcjobstore.PostgreSQLDelegate | |
| org.quartz.scheduler.instanceName | DolphinScheduler | |
| org.quartz.scheduler.instanceId | AUTO | |
| org.quartz.scheduler.makeSchedulerThreadDaemon | true | |
| org.quartz.jobStore.useProperties | false | |
| org.quartz.threadPool.class | org.quartz.simpl.SimpleThreadPool | |
| org.quartz.threadPool.makeThreadsDaemons | true | |
| org.quartz.threadPool.threadCount | 25 | |
| org.quartz.threadPool.threadPriority | 5 | |
| org.quartz.jobStore.class | org.quartz.impl.jdbcjobstore.JobStoreTX | |
| org.quartz.jobStore.tablePrefix | QRTZ\_ | |
| org.quartz.jobStore.isClustered | true | |
| org.quartz.jobStore.misfireThreshold | 60000 | |
| org.quartz.jobStore.clusterCheckinInterval | 5000 | |
| org.quartz.jobStore.acquireTriggersWithinLock | true | |
| org.quartz.jobStore.dataSource | myDs | |
| org.quartz.dataSource.myDs.connectionProvider.class | org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider | |
##### 10.install\_config.conf [DS环境变量配置脚本[用于DS安装/启动]]
install\_config.conf这个配置文件比较繁琐,这个文件主要有两个地方会用到.
* 1.DS集群的自动安装.
>
> 调用install.sh脚本会自动加载该文件中的配置.并根据该文件中的内容自动配置上述的配置文件中的内容. 比如:dolphinscheduler-daemon.sh、datasource.properties、zookeeper.properties、common.properties、application-api.properties、master.properties、worker.properties、alert.properties、quartz.properties 等文件.
>
>
>
* 2.DS集群的启动&关闭.
>
> DS集群在启动&关闭的时候,会加载该配置文件中的masters,workers,alertServer,apiServers等参数,启动/关闭DS集群.
>
>
>
文件内容如下:
注意: 该配置文件中如果包含特殊字符,如: .\*[]^${}\+?|()@#&, 请转义,
示例: [ 转义为 \[
数据库类型, 目前仅支持 postgresql 或者 mysql
dbtype=“mysql”
数据库 地址 & 端口
dbhost=“192.168.xx.xx:3306”
数据库 名称
dbname=“dolphinscheduler”
数据库 用户名
username=“xx”
数据库 密码
password=“xx”
Zookeeper地址
zkQuorum=“192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181”
将DS安装到哪个目录,如: /data1_1T/dolphinscheduler,
installPath=“/data1_1T/dolphinscheduler”
使用哪个用户部署
注意: 部署用户需要sudo 权限, 并且可以操作 hdfs .
如果使用hdfs的话,根目录必须使用该用户进行创建.否则会有权限相关的问题.
deployUser=“dolphinscheduler”
以下为告警服务配置
邮件服务器地址
mailServerHost=“smtp.exmail.qq.com”
邮件服务器 端口
mailServerPort=“25”
发送者
mailSender=“xxxxxxxxxx”
发送用户
mailUser=“xxxxxxxxxx”
邮箱密码
mailPassword=“xxxxxxxxxx”
TLS协议的邮箱设置为true,否则设置为false
starttlsEnable=“true”
开启SSL协议的邮箱配置为true,否则为false。注意: starttlsEnable和sslEnable不能同时为true
sslEnable=“false”
邮件服务地址值,同 mailServerHost
sslTrust=“smtp.exmail.qq.com”
#业务用到的比如sql等资源文件上传到哪里,可以设置:HDFS,S3,NONE。如果想上传到HDFS,请配置为HDFS;如果不需要资源上传功能请选择NONE。
resourceStorageType=“NONE”
if S3,write S3 address,HA,for example :s3a://dolphinscheduler,
Note,s3 be sure to create the root directory /dolphinscheduler
defaultFS=“hdfs://mycluster:8020”
如果resourceStorageType 为S3 需要配置的参数如下:
s3Endpoint=“http://192.168.xx.xx:9010”
s3AccessKey=“xxxxxxxxxx”
s3SecretKey=“xxxxxxxxxx”
如果ResourceManager是HA,则配置为ResourceManager节点的主备ip或者hostname,比如"192.168.xx.xx,192.168.xx.xx",否则如果是单ResourceManager或者根本没用到yarn,请配置yarnHaIps=““即可,如果没用到yarn,配置为””
yarnHaIps=“192.168.xx.xx,192.168.xx.xx”
如果是单ResourceManager,则配置为ResourceManager节点ip或主机名,否则保持默认值即可。
singleYarnIp=“yarnIp1”
资源文件在 HDFS/S3 存储路径
resourceUploadPath=“/dolphinscheduler”
HDFS/S3 操作用户
hdfsRootUser=“hdfs”
以下为 kerberos 配置
kerberos是否开启
kerberosStartUp=“false”
kdc krb5 config file path
krb5ConfPath=“$installPath/conf/krb5.conf”
keytab username
keytabUserName=“hdfs-mycluster@ESZ.COM”
username keytab path
keytabPath=“$installPath/conf/hdfs.headless.keytab”
api 服务端口
apiServerPort=“12345”
部署DS的所有主机hostname
ips=“ds1,ds2,ds3,ds4,ds5”
ssh 端口 , 默认 22
sshPort=“22”
部署master服务主机
masters=“ds1,ds2”
部署 worker服务的主机
注意: 每一个worker都需要设置一个worker 分组的名称,默认值为 “default”
workers=“ds1:default,ds2:default,ds3:default,ds4:default,ds5:default”
部署alert服务主机
alertServer=“ds3”
部署api服务主机
apiServers=“ds1”
##### 11.dolphinscheduler\_env.sh [环境变量配置]
通过类似shell方式提交任务的的时候,会加载该配置文件中的环境变量到主机中. 涉及到的任务类型有: Shell任务、Python任务、Spark任务、Flink任务、Datax任务等等
export HADOOP_HOME=/opt/soft/hadoop
export HADOOP_CONF_DIR=/opt/soft/hadoop/etc/hadoop
export SPARK_HOME1=/opt/soft/spark1
export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/opt/soft/python
export JAVA_HOME=/opt/soft/java
export HIVE_HOME=/opt/soft/hive
export FLINK_HOME=/opt/soft/flink
export DATAX_HOME=/opt/soft/datax/bin/datax.py
export PATH= H A D O O P _ H O M E / b i n : HADOOP\_HOME/bin: HADOOP_HOME/bin:SPARK_HOME1/bin: S P A R K _ H O M E 2 / b i n : SPARK\_HOME2/bin: SPARK_HOME2/bin:PYTHON_HOME: J A V A _ H O M E / b i n : JAVA\_HOME/bin: JAVA_HOME/bin:HIVE_HOME/bin: P A T H : PATH: PATH:FLINK_HOME/bin: D A T A X _ H O M E : DATAX\_HOME: DATAX_HOME:PATH
##### 12.各服务日志配置文件
| 对应服务服务名称 | 日志文件名 |
| --- | --- |
| api服务日志配置文件 | logback-api.xml |
| master服务日志配置文件 | logback-master.xml |
| worker服务日志配置文件 | logback-worker.xml |
| alert服务日志配置文件 | logback-alert.xml |
### 任务总体存储结构
在dolphinscheduler中创建的所有任务都保存在t\_ds\_process\_definition 表中.
该数据库表结构如下表所示:
| 序号 | 字段 | 类型 | 描述 |
| --- | --- | --- | --- |
| 1 | id | int(11) | 主键 |
| 2 | name | varchar(255) | 流程定义名称 |
| 3 | version | int(11) | 流程定义版本 |
| 4 | release\_state | tinyint(4) | 流程定义的发布状态:0 未上线 , 1已上线 |
| 5 | project\_id | int(11) | 项目id |
| 6 | user\_id | int(11) | 流程定义所属用户id |
| 7 | process\_definition\_json | longtext | 流程定义JSON |
| 8 | description | text | 流程定义描述 |
| 9 | global\_params | text | 全局参数 |
| 10 | flag | tinyint(4) | 流程是否可用:0 不可用,1 可用 |
| 11 | locations | text | 节点坐标信息 |
| 12 | connects | text | 节点连线信息 |
| 13 | receivers | text | 收件人 |
| 14 | receivers\_cc | text | 抄送人 |
| 15 | create\_time | datetime | 创建时间 |
| 16 | timeout | int(11) | 超时时间 |
| 17 | tenant\_id | int(11) | 租户id |
| 18 | update\_time | datetime | 更新时间 |
| 19 | modify\_by | varchar(36) | 修改用户 |
| 20 | resource\_ids | varchar(255) | 资源ids |
其中process\_definition\_json 字段为核心字段, 定义了 DAG 图中的任务信息.该数据以JSON 的方式进行存储.
公共的数据结构如下表.
| 序号 | 字段 | 类型 | 描述 |
| --- | --- | --- | --- |
| 1 | globalParams | Array | 全局参数 |
| 2 | tasks | Array | 流程中的任务集合 [ 各个类型的结构请参考如下章节] |
| 3 | tenantId | int | 租户id |
| 4 | timeout | int | 超时时间 |
数据示例:
{
“globalParams”:[
{
“prop”:“golbal_bizdate”,
“direct”:“IN”,
“type”:“VARCHAR”,
“value”:“${system.biz.date}”
}
],
“tasks”:Array[1],
“tenantId”:0,
“timeout”:0
}
### 任务结构
**各任务类型存储结构详解**
#### Shell节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SHELL |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | rawScript | String | Shell脚本 | |
| 6 | | localParams | Array | 自定义参数 | |
| 7 | | resourceList | Array | 资源文件 | |
| 8 | description | | String | 描述 | |
| 9 | runFlag | | String | 运行标识 | |
| 10 | conditionResult | | Object | 条件分支 | |
| 11 | | successNode | Array | 成功跳转节点 | |
| 12 | | failedNode | Array | 失败跳转节点 | |
| 13 | dependence | | Object | 任务依赖 | 与params互斥 |
| 14 | maxRetryTimes | | String | 最大重试次数 | |
| 15 | retryInterval | | String | 重试间隔 | |
| 16 | timeout | | Object | 超时控制 | |
| 17 | taskInstancePriority | | String | 任务优先级 | |
| 18 | workerGroup | | String | Worker 分组 | |
| 19 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“SHELL”,
“id”:“tasks-80760”,
“name”:“Shell Task”,
“params”:{
“resourceList”:[
{
“id”:3,
“name”:“run.sh”,
“res”:“run.sh”
}
],
“localParams”:[
],
"rawScript":"echo "This is a shell script""
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### SQL节点
通过 SQL对指定的数据源进行数据查询、更新操作.
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SQL |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | type | String | 数据库类型 | |
| 6 | | datasource | Int | 数据源id | |
| 7 | | sql | String | 查询SQL语句 | |
| 8 | | udfs | String | udf函数 | UDF函数id,以逗号分隔. |
| 9 | | sqlType | String | SQL节点类型 | 0 查询 , 1 非查询 |
| 10 | | title | String | 邮件标题 | |
| 11 | | receivers | String | 收件人 | |
| 12 | | receiversCc | String | 抄送人 | |
| 13 | | showType | String | 邮件显示类型 | TABLE 表格 , ATTACHMENT附件 |
| 14 | | connParams | String | 连接参数 | |
| 15 | | preStatements | Array | 前置SQL | |
| 16 | | postStatements | Array | 后置SQL | |
| 17 | | localParams | Array | 自定义参数 | |
| 18 | description | | String | 描述 | |
| 19 | runFlag | | String | 运行标识 | |
| 20 | conditionResult | | Object | 条件分支 | |
| 21 | | successNode | Array | 成功跳转节点 | |
| 22 | | failedNode | Array | 失败跳转节点 | |
| 23 | dependence | | Object | 任务依赖 | 与params互斥 |
| 24 | maxRetryTimes | | String | 最大重试次数 | |
| 25 | retryInterval | | String | 重试间隔 | |
| 26 | timeout | | Object | 超时控制 | |
| 27 | taskInstancePriority | | String | 任务优先级 | |
| 28 | workerGroup | | String | Worker 分组 | |
| 29 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“SQL”,
“id”:“tasks-95648”,
“name”:“SqlTask-Query”,
“params”:{
“type”:“MYSQL”,
“datasource”:1,
“sql”:“select id , namge , age from emp where id = ${id}”,
“udfs”:“”,
“sqlType”:“0”,
“title”:“xxxx@xxx.com”,
“receivers”:“xxxx@xxx.com”,
“receiversCc”:“”,
“showType”:“TABLE”,
“localParams”:[
{
“prop”:“id”,
“direct”:“IN”,
“type”:“INTEGER”,
“value”:“1”
}
],
“connParams”:“”,
“preStatements”:[
“insert into emp ( id,name ) value (1,‘Li’ )”
],
“postStatements”:[
]
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### PROCEDURE[存储过程]节点
**节点数据结构如下:** **节点数据样例:**
#### SPARK节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SPARK |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | mainClass | String | 运行主类 | |
| 6 | | mainArgs | String | 运行参数 | |
| 7 | | others | String | 其他参数 | |
| 8 | | mainJar | Object | 程序 jar 包 | |
| 9 | | deployMode | String | 部署模式 | local,client,cluster |
| 10 | | driverCores | String | driver核数 | |
| 11 | | driverMemory | String | driver 内存数 | |
| 12 | | numExecutors | String | executor数量 | |
| 13 | | executorMemory | String | executor内存 | |
| 14 | | executorCores | String | executor核数 | |
| 15 | | programType | String | 程序类型 | JAVA,SCALA,PYTHON |
| 16 | | sparkVersion | String | Spark 版本 | SPARK1 , SPARK2 |
| 17 | | localParams | Array | 自定义参数 | |
| 18 | | resourceList | Array | 资源文件 | |
| 19 | description | | String | 描述 | |
| 20 | runFlag | | String | 运行标识 | |
| 21 | conditionResult | | Object | 条件分支 | |
| 22 | | successNode | Array | 成功跳转节点 | |
| 23 | | failedNode | Array | 失败跳转节点 | |
| 24 | dependence | | Object | 任务依赖 | 与params互斥 |
| 25 | maxRetryTimes | | String | 最大重试次数 | |
| 26 | retryInterval | | String | 重试间隔 | |
| 27 | timeout | | Object | 超时控制 | |
| 28 | taskInstancePriority | | String | 任务优先级 | |
| 29 | workerGroup | | String | Worker 分组 | |
| 30 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“SPARK”,
“id”:“tasks-87430”,
“name”:“SparkTask”,
“params”:{
“mainClass”:“org.apache.spark.examples.SparkPi”,
“mainJar”:{
“id”:4
},
“deployMode”:“cluster”,
“resourceList”:[
{
“id”:3,
“name”:“run.sh”,
“res”:“run.sh”
}
],
“localParams”:[
],
"driverCores":1,
"driverMemory":"512M",
"numExecutors":2,
"executorMemory":"2G",
"executorCores":2,
"mainArgs":"10",
"others":"",
"programType":"SCALA",
"sparkVersion":"SPARK2"
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### MapReduce(MR)节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | MR |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | mainClass | String | 运行主类 | |
| 6 | | mainArgs | String | 运行参数 | |
| 7 | | others | String | 其他参数 | |
| 8 | | mainJar | Object | 程序 jar 包 | |
| 9 | | programType | String | 程序类型 | JAVA,PYTHON |
| 10 | | localParams | Array | 自定义参数 | |
| 11 | | resourceList | Array | 资源文件 | |
| 12 | description | | String | 描述 | |
| 13 | runFlag | | String | 运行标识 | |
| 14 | conditionResult | | Object | 条件分支 | |
| 15 | | successNode | Array | 成功跳转节点 | |
| 16 | | failedNode | Array | 失败跳转节点 | |
| 17 | dependence | | Object | 任务依赖 | 与params互斥 |
| 18 | maxRetryTimes | | String | 最大重试次数 | |
| 19 | retryInterval | | String | 重试间隔 | |
| 20 | timeout | | Object | 超时控制 | |
| 21 | taskInstancePriority | | String | 任务优先级 | |
| 22 | workerGroup | | String | Worker 分组 | |
| 23 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“MR”,
“id”:“tasks-28997”,
“name”:“MRTask”,
“params”:{
“mainClass”:“wordcount”,
“mainJar”:{
“id”:5
},
“resourceList”:[
{
“id”:3,
“name”:“run.sh”,
“res”:“run.sh”
}
],
“localParams”:[
],
"mainArgs":"/tmp/wordcount/input /tmp/wordcount/output/",
"others":"",
"programType":"JAVA"
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### Python节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | PYTHON |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | rawScript | String | Python脚本 | |
| 6 | | localParams | Array | 自定义参数 | |
| 7 | | resourceList | Array | 资源文件 | |
| 8 | description | | String | 描述 | |
| 9 | runFlag | | String | 运行标识 | |
| 10 | conditionResult | | Object | 条件分支 | |
| 11 | | successNode | Array | 成功跳转节点 | |
| 12 | | failedNode | Array | 失败跳转节点 | |
| 13 | dependence | | Object | 任务依赖 | 与params互斥 |
| 14 | maxRetryTimes | | String | 最大重试次数 | |
| 15 | retryInterval | | String | 重试间隔 | |
| 16 | timeout | | Object | 超时控制 | |
| 17 | taskInstancePriority | | String | 任务优先级 | |
| 18 | workerGroup | | String | Worker 分组 | |
| 19 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“PYTHON”,
“id”:“tasks-5463”,
“name”:“Python Task”,
“params”:{
“resourceList”:[
{
“id”:3,
“name”:“run.sh”,
“res”:“run.sh”
}
],
“localParams”:[
],
"rawScript":"print("This is a python script")"
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### Flink节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | FLINK |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | mainClass | String | 运行主类 | |
| 6 | | mainArgs | String | 运行参数 | |
| 7 | | others | String | 其他参数 | |
| 8 | | mainJar | Object | 程序 jar 包 | |
| 9 | | deployMode | String | 部署模式 | local,client,cluster |
| 10 | | slot | String | slot数量 | |
| 11 | | taskManager | String | taskManager数量 | |
| 12 | | taskManagerMemory | String | taskManager内存数 | |
| 13 | | jobManagerMemory | String | jobManager内存数 | |
| 14 | | programType | String | 程序类型 | JAVA,SCALA,PYTHON |
| 15 | | localParams | Array | 自定义参数 | |
| 16 | | resourceList | Array | 资源文件 | |
| 17 | description | | String | 描述 | |
| 18 | runFlag | | String | 运行标识 | |
| 19 | conditionResult | | Object | 条件分支 | |
| 20 | | successNode | Array | 成功跳转节点 | |
| 21 | | failedNode | Array | 失败跳转节点 | |
| 22 | dependence | | Object | 任务依赖 | 与params互斥 |
| 23 | maxRetryTimes | | String | 最大重试次数 | |
| 24 | retryInterval | | String | 重试间隔 | |
| 25 | timeout | | Object | 超时控制 | |
| 26 | taskInstancePriority | | String | 任务优先级 | |
| 27 | workerGroup | | String | Worker 分组 | |
| 38 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“FLINK”,
“id”:“tasks-17135”,
“name”:“FlinkTask”,
“params”:{
“mainClass”:“com.flink.demo”,
“mainJar”:{
“id”:6
},
“deployMode”:“cluster”,
“resourceList”:[
{
“id”:3,
“name”:“run.sh”,
“res”:“run.sh”
}
],
“localParams”:[
],
"slot":1,
"taskManager":"2",
"jobManagerMemory":"1G",
"taskManagerMemory":"2G",
"executorCores":2,
"mainArgs":"100",
"others":"",
"programType":"SCALA"
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### HTTP节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | HTTP |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | url | String | 请求地址 | |
| 6 | | httpMethod | String | 请求方式 | GET,POST,HEAD,PUT,DELETE |
| 7 | | httpParams | Array | 请求参数 | |
| 8 | | httpCheckCondition | String | 校验条件 | 默认响应码200 |
| 9 | | condition | String | 校验内容 | |
| 10 | | localParams | Array | 自定义参数 | |
| 11 | description | | String | 描述 | |
| 12 | runFlag | | String | 运行标识 | |
| 13 | conditionResult | | Object | 条件分支 | |
| 14 | | successNode | Array | 成功跳转节点 | |
| 15 | | failedNode | Array | 失败跳转节点 | |
| 16 | dependence | | Object | 任务依赖 | 与params互斥 |
| 17 | maxRetryTimes | | String | 最大重试次数 | |
| 18 | retryInterval | | String | 重试间隔 | |
| 19 | timeout | | Object | 超时控制 | |
| 20 | taskInstancePriority | | String | 任务优先级 | |
| 21 | workerGroup | | String | Worker 分组 | |
| 22 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“HTTP”,
“id”:“tasks-60499”,
“name”:“HttpTask”,
“params”:{
“localParams”:[
],
"httpParams":[
{
"prop":"id",
"httpParametersType":"PARAMETER",
"value":"1"
},
{
"prop":"name",
"httpParametersType":"PARAMETER",
"value":"Bo"
}
],
"url":"https://www.xxxxx.com:9012",
"httpMethod":"POST",
"httpCheckCondition":"STATUS\_CODE\_DEFAULT",
"condition":""
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### DataX节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | DATAX |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | customConfig | Int | 自定义类型 | 0定制 , 1自定义 |
| 6 | | dsType | String | 源数据库类型 | |
| 7 | | dataSource | Int | 源数据库ID | |
| 8 | | dtType | String | 目标数据库类型 | |
| 9 | | dataTarget | Int | 目标数据库ID | |
| 10 | | sql | String | SQL语句 | |
| 11 | | targetTable | String | 目标表 | |
| 12 | | jobSpeedByte | Int | 限流(字节数) | |
| 13 | | jobSpeedRecord | Int | 限流(记录数) | |
| 14 | | preStatements | Array | 前置SQL | |
| 15 | | postStatements | Array | 后置SQL | |
| 16 | | json | String | 自定义配置 | customConfig=1时生效 |
| 17 | | localParams | Array | 自定义参数 | customConfig=1时生效 |
| 18 | description | | String | 描述 | |
| 19 | runFlag | | String | 运行标识 | |
| 20 | conditionResult | | Object | 条件分支 | |
| 21 | | successNode | Array | 成功跳转节点 | |
| 22 | | failedNode | Array | 失败跳转节点 | |
| 23 | dependence | | Object | 任务依赖 | 与params互斥 |
| 24 | maxRetryTimes | | String | 最大重试次数 | |
| 25 | retryInterval | | String | 重试间隔 | |
| 26 | timeout | | Object | 超时控制 | |
| 27 | taskInstancePriority | | String | 任务优先级 | |
| 28 | workerGroup | | String | Worker 分组 | |
| 29 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“DATAX”,
“id”:“tasks-91196”,
“name”:“DataxTask-DB”,
“params”:{
“customConfig”:0,
“dsType”:“MYSQL”,
“dataSource”:1,
“dtType”:“MYSQL”,
“dataTarget”:1,
“sql”:"select id, name ,age from user “,
“targetTable”:“emp”,
“jobSpeedByte”:524288,
“jobSpeedRecord”:500,
“preStatements”:[
“truncate table emp "
],
“postStatements”:[
“truncate table user”
]
},
“description”:””,
“runFlag”:“NORMAL”,
“conditionResult”:{
“successNode”:[
“”
],
“failedNode”:[
“”
]
},
“dependence”:{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### Sqoop节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SQOOP |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | JSON 格式 |
| 5 | | concurrency | Int | 并发度 | |
| 6 | | modelType | String | 流向 | import,export |
| 7 | | sourceType | String | 数据源类型 | |
| 8 | | sourceParams | String | 数据源参数 | JSON格式 |
| 9 | | targetType | String | 目标数据类型 | |
| 10 | | targetParams | String | 目标数据参数 | JSON格式 |
| 11 | | localParams | Array | 自定义参数 | |
| 12 | description | | String | 描述 | |
| 13 | runFlag | | String | 运行标识 | |
| 14 | conditionResult | | Object | 条件分支 | |
| 15 | | successNode | Array | 成功跳转节点 | |
| 16 | | failedNode | Array | 失败跳转节点 | |
| 17 | dependence | | Object | 任务依赖 | 与params互斥 |
| 18 | maxRetryTimes | | String | 最大重试次数 | |
| 19 | retryInterval | | String | 重试间隔 | |
| 20 | timeout | | Object | 超时控制 | |
| 21 | taskInstancePriority | | String | 任务优先级 | |
| 22 | workerGroup | | String | Worker 分组 | |
| 23 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“SQOOP”,
“id”:“tasks-82041”,
“name”:“Sqoop Task”,
“params”:{
“concurrency”:1,
“modelType”:“import”,
“sourceType”:“MYSQL”,
“targetType”:“HDFS”,
“sourceParams”:“{“srcType”:“MYSQL”,“srcDatasource”:1,“srcTable”:”“,“srcQueryType”:“1”,“srcQuerySql”:“selec id , name from user”,“srcColumnType”:“0”,“srcColumns”:”“,“srcConditionList”:[],“mapColumnHive”:[{“prop”:“hivetype-key”,“direct”:“IN”,“type”:“VARCHAR”,“value”:“hivetype-value”}],“mapColumnJava”:[{“prop”:“javatype-key”,“direct”:“IN”,“type”:“VARCHAR”,“value”:“javatype-value”}]}”,
“targetParams”:“{“targetPath”:”/user/hive/warehouse/ods.db/user",“deleteTargetDir”:false,“fileType”:“–as-avrodatafile”,“compressionCodec”:“snappy”,“fieldsTerminated”:“,”,“linesTerminated”:“@”}",
“localParams”:[
]
},
“description”:“”,
“runFlag”:“NORMAL”,
“conditionResult”:{
“successNode”:[
“”
],
“failedNode”:[
“”
]
},
“dependence”:{
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
}
#### 条件分支节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SHELL |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | null |
| 5 | description | | String | 描述 | |
| 6 | runFlag | | String | 运行标识 | |
| 7 | conditionResult | | Object | 条件分支 | |
| 8 | | successNode | Array | 成功跳转节点 | |
| 9 | | failedNode | Array | 失败跳转节点 | |
| 10 | dependence | | Object | 任务依赖 | 与params互斥 |
| 11 | maxRetryTimes | | String | 最大重试次数 | |
| 12 | retryInterval | | String | 重试间隔 | |
| 13 | timeout | | Object | 超时控制 | |
| 14 | taskInstancePriority | | String | 任务优先级 | |
| 15 | workerGroup | | String | Worker 分组 | |
| 16 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“CONDITIONS”,
“id”:“tasks-96189”,
“name”:“条件”,
“params”:{
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
"test04"
],
"failedNode":[
"test05"
]
},
"dependence":{
"relation":"AND",
"dependTaskList":[
]
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
"test01",
"test02"
]
}
#### 子流程节点
**节点数据结构如下:**
| 序号 | 参数名 | | 类型 | 描述 | 描述 |
| --- | --- | --- | --- | --- | --- |
| 1 | id | | String | 任务编码 | |
| 2 | type | | String | 类型 | SHELL |
| 3 | name | | String | 名称 | |
| 4 | params | | Object | 自定义参数 | Json 格式 |
| 5 | | processDefinitionId | Int | 流程定义id | |
| 6 | description | | String | 描述 | |
| 7 | runFlag | | String | 运行标识 | |
| 8 | conditionResult | | Object | 条件分支 | |
| 9 | | successNode | Array | 成功跳转节点 | |
| 10 | | failedNode | Array | 失败跳转节点 | |
| 11 | dependence | | Object | 任务依赖 | 与params互斥 |
| 12 | maxRetryTimes | | String | 最大重试次数 | |
| 13 | retryInterval | | String | 重试间隔 | |
| 14 | timeout | | Object | 超时控制 | |
| 15 | taskInstancePriority | | String | 任务优先级 | |
| 16 | workerGroup | | String | Worker 分组 | |
| 17 | preTasks | | Array | 前置任务 | |
**节点数据样例:**
{
“type”:“SUB_PROCESS”,
“id”:“tasks-14806”,
“name”:“SubProcessTask”,
“params”:{
“processDefinitionId”:2
},
“description”:“”,
“runFlag”:“NORMAL”,
“conditionResult”:{
“successNode”:[
“”
],
“failedNode”:[
“”
]
},
“dependence”:{
},
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]



既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
| Array | 前置任务 | |
节点数据样例:
{
"type":"CONDITIONS",
"id":"tasks-96189",
"name":"条件",
"params":{
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
"test04"
],
"failedNode":[
"test05"
]
},
"dependence":{
"relation":"AND",
"dependTaskList":[
]
},
"maxRetryTimes":"0",
"retryInterval":"1",
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
"test01",
"test02"
]
}
子流程节点
节点数据结构如下:
| 序号 | 参数名 | 类型 | 描述 | 描述 | |
|---|---|---|---|---|---|
| 1 | id | String | 任务编码 | ||
| 2 | type | String | 类型 | SHELL | |
| 3 | name | String | 名称 | ||
| 4 | params | Object | 自定义参数 | Json 格式 | |
| 5 | processDefinitionId | Int | 流程定义id | ||
| 6 | description | String | 描述 | ||
| 7 | runFlag | String | 运行标识 | ||
| 8 | conditionResult | Object | 条件分支 | ||
| 9 | successNode | Array | 成功跳转节点 | ||
| 10 | failedNode | Array | 失败跳转节点 | ||
| 11 | dependence | Object | 任务依赖 | 与params互斥 | |
| 12 | maxRetryTimes | String | 最大重试次数 | ||
| 13 | retryInterval | String | 重试间隔 | ||
| 14 | timeout | Object | 超时控制 | ||
| 15 | taskInstancePriority | String | 任务优先级 | ||
| 16 | workerGroup | String | Worker 分组 | ||
| 17 | preTasks | Array | 前置任务 |
节点数据样例:
{
"type":"SUB\_PROCESS",
"id":"tasks-14806",
"name":"SubProcessTask",
"params":{
"processDefinitionId":2
},
"description":"",
"runFlag":"NORMAL",
"conditionResult":{
"successNode":[
""
],
"failedNode":[
""
]
},
"dependence":{
},
"timeout":{
"strategy":"",
"interval":null,
"enable":false
},
"taskInstancePriority":"MEDIUM",
"workerGroup":"default",
"preTasks":[
]
[外链图片转存中...(img-jmkSjYDy-1715105410266)]
[外链图片转存中...(img-5cTbBv3o-1715105410266)]
[外链图片转存中...(img-EyMXWhH6-1715105410266)]
**既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!**
**由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新**
**[需要这份系统化资料的朋友,可以戳这里获取](https://bbs.csdn.net/topics/618545628)**
更多推荐


所有评论(0)