大数据平台 —— 调度系统之Azkaban

网友投稿 401 2022-11-25

大数据平台 —— 调度系统之Azkaban

Azkaban介绍

常见的开源调度框架:

Linux Crontab:针对个人用户及小任务量 Apache Oozie:功能强大,配置复杂的Hadoop任务调度框架 Azkaban:开源的工作流管理器,轻量级调度框架 AirFlow:基于Python开发的通用批处理调度框架 Zenus:阿里开源的基于Hadoop的工作流调度系统 EasyScheduler:国内开源的分布式工作流任务调度系统

Azkaban简介:

Linkedin公司开源的分布式批量工作流任务调度器 通过简单的KV的方式,生成Job,并构建依赖关系 通过插件化的任务提交模块,支持可扩展的多任务提交 官方文档:https://azkaban.readthedocs.io/en/latest/

Azkaban优点:

可通过job配置文件,快速建立任务和任务之间的依赖关系 提供模块化和可插拔的插件机制,原生支持shell、 java、 hive等 基于Java开发,提供Ajax Api,易于二次开发

Azkaban适用场景:

通过Azkaban结合Datax实现定时的数据采集服务 通过Azkaban调度执行Shell、Java、 Hive、 Hadoop等 任务 开发可复用的程序,通过Azkaban编排成工作流,执行批处理任务 对Azkaban进行二次开发通过接口创建任务、调度任务、管理任务 将Azkaban作为数据平台的- -部分,提供任务调度的能力 基于Azkaban的异常处理、监控报警、审计日志完善数据平台功能

Azkaban架构与调度流程

AzkabanServer:Azkaban的管理服务,提供WebUI,负责Project管理、权限管理、定时执行、跟踪进度、审计日志等等功能 AzkabanExecutor:负责工作流的提交和执行,搜集执行日志,也就是具体干活的节点 MySQL:存储工作流详情及节点和任务的状态信息等

其中AzkabanWebServer可以说是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。

同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。

并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。

Azkaban有三种部署模式:

Solo mode:内置数据库,Server和Executor在同一个 进程中 Two mode:基于Mysq|数据库,启动一个Server和一个Executor Multi mode:分布式模式,一个Server和多个Executor

用户通过界面或者API提交任务到Webserver,Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个合适的executor下发工作流; executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点; 分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业; 被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。

AzkabanServer主动调用Executor的API获取状态信息 根据计算规则选择执行的Executor Server(任务数量、内存和CPU等资源、最近分配的时间) 调度WorkFlow到Executor执行,Executor执行并监控任务

Azkaban安装部署

Azkaban官网:https://azkaban.github.io 软件下载地址:https://github.com/azkaban/azkaban 官方插件地址:https://github.com/azkaban/azkaban-plugins 官方文档地址:mode部署模式,因为Multi mode只不过是在该基础上部署了多个ExecutorServer,也就是说在Two mode基础上增加ExecutorServer节点就是Multi mode了。

编译Azkaban源码

首先,准备好Java和Maven:

[root@azkaban01 ~]# java -version java version "1.8.0_261" Java(TM) SE Runtime Environment (build 1.8.0_261-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode) [root@azkaban01 ~]# mvn -v Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: /usr/local/maven Java version: 1.8.0_261, vendor: Oracle Corporation, runtime: /usr/local/jdk/1.8/jre Default locale: zh_CN, platform encoding: UTF-8 OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix" [root@azkaban01 ~]#

安装一些工具:

[root@azkaban01 ~]# yum install -y git gcc-c++

然后从GitHub上拉取Azkaban的源码:

[root@hadoop01 ~]# cd /usr/local/src [root@hadoop01 /usr/local/src]# git clone /usr/local/src]# cd azkaban [root@azkaban01 /usr/local/src/azkaban]# vim settings.gradle pluginManagement { repositories { maven { url 'https://maven.aliyun.com/repository/gradle-plugin' } gradlePluginPortal() } } ...

然后修改build.gradle文件中的仓库配置:

[root@azkaban01 /usr/local/src/azkaban]# vim build.gradle buildscript { repositories { maven { url '} maven { url '} maven { url '} maven { url '} maven { url '} } ... } ... allprojects { apply plugin: 'jacoco' repositories { mavenLocal() maven { url '} maven { url '} maven { url '} maven { url '} maven { url '} } }

gradle/wrapper/gradle-wrapper.properties文件中会定义从远程下载gradle,如果下载不下来的话,可以通过别的方式下载,然后上传到相应的目录下,并在该文件指定从本地文件系统中加载gradle的安装包:

[root@azkaban01 /usr/local/src/azkaban]# vim gradle/wrapper/gradle-wrapper.properties distributionUrl=file:///usr/local/src/gradle-4.6-all.zip

完成以上的修改后,就可以执行如下命令开始编译安装了:

[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

打包编译的过程中,有可能会报如下错误:

FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':azkaban-web-server:nodeSetup'. > Could not resolve all files for configuration ':azkaban-web-server:detachedConfiguration1'. > Could not download node-linux-x64.tar.gz (org.nodejs:node:8.10.0) > Could not get resource 'https://nodejs.org/dist/v8.10.0/node-v8.10.0-linux-x64.tar.gz'. > Read timed out

这是因为系统中没有安装NodeJS,而azkaban-web-server这个模块需要用到NodeJS来编译web代码。由于无法通过远程下载NodeJS的安装包就会报这个错。解决方式也简单,在系统中安装NodeJS即可。步骤如下:

[root@azkaban01 /usr/local/src/azkaban]# curl --silent --location | bash - [root@azkaban01 /usr/local/src/azkaban]# yum install -y nodejs [root@azkaban01 /usr/local/src/azkaban]# npm -v 6.14.8 [root@azkaban01 /usr/local/src/azkaban]# node -v v14.15.0 [root@azkaban01 /usr/local/src/azkaban]#

设置npm使用淘宝镜像仓库:

[root@azkaban01 /usr/local/src/azkaban]# npm config set registry https://registry.npm.taobao.org [root@azkaban01 /usr/local/src/azkaban]# npm config get registry https://registry.npm.taobao.org/ [root@azkaban01 /usr/local/src/azkaban]#

打开azkaban-web-server模块下的build.gradle文件,修改原本的仓库配置,并注释掉node相关的配置。如下所示:

[root@azkaban01 /usr/local/src/azkaban]# vim azkaban-web-server/build.gradle buildscript { repositories { maven { url '} maven { url '} maven { url '} maven { url '} maven { url '} mavenCentral() } ... } ... //node { // Version of node to use. //version = '8.10.0' // Version of npm to use. //npmVersion = '5.6.0' // Base URL for fetching node distributions (change if you have a mirror). //distBaseUrl = 'https://nodejs.org/dist' // If true, it will download node using above parameters. // If false, it will try to use globally installed node. //download = true // Set the work directory for unpacking node //workDir = file("${project.buildDir}/nodejs") // Set the work directory where node_modules should be located //nodeModulesDir = file("${project.projectDir}") //}

然后重新执行打包编译命令:

[root@azkaban01 /usr/local/src/azkaban]# ./gradlew build installDist -x test

此时在核心组件的build/distributions目录下,可以看到打包好的安装包:

[root@azkaban01 /usr/local/src/azkaban]# ls azkaban-exec-server/build/distributions/ azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz azkaban-exec-server-0.1.0-SNAPSHOT.zip [root@azkaban01 /usr/local/src/azkaban]# ls azkaban-web-server/build/distributions/ azkaban-web-server-0.1.0-SNAPSHOT.tar.gz azkaban-web-server-0.1.0-SNAPSHOT.zip [root@azkaban01 /usr/local/src/azkaban]# ls azkaban-db/build/distributions/ azkaban-db-0.1.0-SNAPSHOT.tar.gz azkaban-db-0.1.0-SNAPSHOT.zip [root@azkaban01 /usr/local/src/azkaban]#

安装部署Azkaban

解压安装包:

[root@azkaban01 /usr/local/src/azkaban]# mkdir /usr/local/azkaban [root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-db/build/distributions/azkaban-db-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban [root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-exec-server/build/distributions/azkaban-exec-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban [root@azkaban01 /usr/local/src/azkaban]# tar -zxvf azkaban-web-server/build/distributions/azkaban-web-server-0.1.0-SNAPSHOT.tar.gz -C /usr/local/azkaban

为了查看方便,将解压后的目录重命名:

[root@azkaban01 /usr/local/src/azkaban]# cd /usr/local/azkaban/ [root@azkaban01 /usr/local/azkaban]# mv azkaban-db-0.1.0-SNAPSHOT/ azkaban-db [root@azkaban01 /usr/local/azkaban]# mv azkaban-exec-server-0.1.0-SNAPSHOT/ azkaban-exec-server [root@azkaban01 /usr/local/azkaban]# mv azkaban-web-server-0.1.0-SNAPSHOT/ azkaban-web-server

首先,到MySQL中创建azkaban数据库,然后将azkaban-db目录下的create-all-sql-0.1.0-SNAPSHOT.sql文件给导入到MySQL中:

create database azkaban; use azkaban; source /usr/local/azkaban/azkaban-db/create-all-sql-0.1.0-SNAPSHOT.sql

然后配置azkaban-exec-server:

[root@azkaban01 /usr/local/azkaban]# cd azkaban-exec-server/ [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim conf/azkaban.properties # webserver的连接地址 azkaban.webserver.url=http://localhost:8081 database.type=mysql mysql.port=3306 mysql.host=192.168.1.11 # MySQL8.x需要加时区参数,5.x则不需要 mysql.database=azkaban?serverTimezone=Asia/Shanghai mysql.user=root mysql.password=123456a. mysql.numconnections=100

由于azkaban-exec-server默认使用的是5.x版本的MySQL驱动,而我这部署的MySQL是8.x版本的,所以还得替换一下MySQL驱动包:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/ [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# rm -rf lib/mysql-connector-java-5.1.28.jar

启动azkaban-exec-server:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh

检查azkaban-exec-server进程是否正常运行:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# jps 2005 Jps 1982 AzkabanExecutorServer [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# netstat -lntp |grep 1982 tcp6 0 0 :::35195 :::* LISTEN 1982/java tcp6 0 0 :::36304 :::* LISTEN 1982/java [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]#

通过API手动激活Executor Server:

$ curl /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/ [root@azkaban01 /usr/local/azkaban/azkaban-web-server]# vim conf/azkaban.properties database.type=mysql mysql.port=3306 mysql.host=192.168.1.11 # MySQL8.x需要加时区参数,5.x则不需要 mysql.database=azkaban?serverTimezone=Asia/Shanghai mysql.user=root mysql.password=123456a. mysql.numconnections=100

替换MySQL驱动包:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# cp /usr/local/src/mysql-connector-java-8.0.21.jar lib/ [root@azkaban01 /usr/local/azkaban/azkaban-web-server]# rm -rf lib/mysql-connector-java-5.1.28.jar

启动azkaban-webserver:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

检查azkaban-webserver进程是否正常运行:

[root@azkaban01 /usr/local/azkaban/azkaban-web-server]# jps 2201 Jps 2172 AzkabanWebServer 1982 AzkabanExecutorServer [root@azkaban01 /usr/local/azkaban/azkaban-web-server]# netstat -lntp |grep 2172 tcp6 0 0 :::46136 :::* LISTEN 2172/java tcp6 0 0 :::8081 :::* LISTEN 2172/java [root@azkaban01 /usr/local/azkaban/azkaban-web-server]#

webserver的用户相关配置可以在conf/azkaban-users.xml文件中修改

提交Azkaban任务

关于Job的官方文档:

https://azkaban.readthedocs.io/en/latest/createFlows.html#job-config

Azkaban工作流:

Project:Azkaban的抽象概念,项目。一个Project包括多个Flow Flow:流程,一个Flow包含多个Job及Job的依赖关系 Job:具体的任务,有command、java、hive、hadoopJava等 类型

Azkaban任务类型:

Azkaban拥有独立的plugins仓库,需对其进行编译 不同的Job plugin是建立在command的基础之.上 Command类型是万能的Azkaban任务类型,因为通过command调用shell脚本,就可以在shell脚本里实现任意操作

单个任务

我们来通过WebServer的可视化界面提交一个最简单的command任务,首先创建任务定义文件:

$ vim cmd_test.job type=command command=sh job1.sh

编写一个简单的shell脚本:

$ vim job1.sh #!/bin/sh echo "hello azkaban"

多个任务

以上演示了单个任务的定义、提交和调度,接下来演示下多个任务的定义、提交和调度,并且这多个任务之间还存在依赖关系,也就是任务之间的调度存在先后顺序。首先,创建任务文件:

$ vim job1.job type=command command=sh job1.sh ---------- $ vim job2.job type=command command=sh job2.sh # 依赖job1,当job1调度执行完才会执行job2 dependencies=job1 ---------- $ vim job3.job type=command command=sh job3.sh dependencies=job1 ---------- $ vim job4.job type=command command=sh job4.sh dependencies=job2,job3

编写与任务对应的shell脚本:

$ vim job1.sh #!/bin/sh echo "job1 exec over" ---------- $ vim job2.sh #!/bin/sh echo "job2 exec over" ---------- $ vim job3.sh #!/bin/sh echo "job3 exec over" ---------- $ vim job4.sh #!/bin/sh echo "job4 exec over"

Azkaban用户代理

Azkaban代理用户:

Azkaban可以代理其他linux用户执行命令 通过代理用户模式可以实现Hadoop的权限控制

编译用户代理模块:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# mkdir extlib [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# gcc /usr/local/src/azkaban/az-exec-util/src/main/c/execute-as-user.c -o extlib/execute-as-user [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# chmod 6050 extlib/execute-as-user

创建配置文件:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# vim plugins/jobtypes/commonprivate.properties execute.as.user=true azkaban.native.lib=/usr/local/azkaban/azkaban-exec-server/extlib/ azkaban.group.name=root

重启ExecuteServer:

[root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/shutdown-exec.sh Killing executor. [pid: 1982], attempt: 1 shutdown succeeded [root@azkaban01 /usr/local/azkaban/azkaban-exec-server]# bin/start-exec.sh

激活ExecutorServer:

$ curl /usr/local/azkaban/azkaban-exec-server]# cd ../azkaban-web-server/ [root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/shutdown-web.sh Killing web-server. [pid: 2172], attempt: 1 shutdown succeeded [root@azkaban01 /usr/local/azkaban/azkaban-web-server]# bin/start-web.sh

接下来提交任务测试一下,创建任务定义文件:

$ vim proxy.job type=command command=sh test.sh

编写对应的shell脚本:

$ vim test.sh #!/bin/sh echo "----------------" whoami echo "----------------"

到操作系统上,新建一个用户:

$ useradd hadoop

Tips:Azkaban默认是禁止代理root用户的

修改任务配置文件,指定代理用户,如下所示:

$ vim proxy.job type=command command=sh test.sh user.to.proxy=hadoop

以上的示例都是简单的执行一个shell脚本,如果想真正调度起一个MR任务其实也很简单,就只需要配置执行相应的命令就可以了。如下示例:

type=command command=yarn jar /soft/home/hadoop-2.8.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.5.jar pi 16 1000 user.to.proxy=hadoop

关于Java操作Azkaban Api

除了可以在可视化的Azkaban WebServer界面上进行项目的创建、任务的上传/提交等操作外,Azkaban还支持通过HTTP API来完成这些操作。因为我们如果要开发自己的大数据平台,可能并不会使用Azkaban WebServer的可视化界面,而是希望在自己的大数据平台界面去与Azkaban进行交互,完成任务的调度管理。所以Azkaban提供了HTTP Api的支持,让我们可以轻松实现与自研平台的整合。

关于Azkaban Api的官方文档地址如下:

https://azkaban.readthedocs.io/en/latest/ajaxApi.html

我这里准备了一个示例代码仓库,可以简单参考下:

https://gitee.com/Zero-One/azkaban-api-demo

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微博Service Mesh高可用架构实战.
下一篇:基于Linux的嵌入式网络存储器设计
相关文章

 发表评论

暂时没有评论,来抢沙发吧~