Flink-sql平台化配置之flink-streaming-platform-web

2022年01月14日 阅读数:3
这篇文章主要向大家介绍Flink-sql平台化配置之flink-streaming-platform-web,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

        记录使用flink-streaming-platform-web平台化提交Flink任务。主要是记录经过这个工具提交flink任务的原理。前端

        相关参考:java

        gitee地址:flink-streaming-platform-web: 基于flink-sql的实时流计算web平台mysql

        github地址:https://github.com/zhp8341/flink-streaming-platform-webgit

一、idea本地开发

        能够查看:docs/idea-run.md · 无情(朱慧培)/flink-streaming-platform-web - Gitee.comgithub

        真正用来提交任务的是flink-streamin-core下面的com.flink.streaming.core.JobApplication类。经过这个来建立flink任务所须要的执行环境以及解析咱们须要提交的sql脚本等参数。web

        例如:这里经过args来获取sql脚本所在的位置。sql

        注意:本地调试须要修改pom.xml中的scope。数据库

二、经过前端页面来提交任务的流程

        因为还没有配置flink on yarn的环境,这里以Standalone模式为例:bash

2.一、新增系统设置

        经过前端页面来配置Flink环境的相关信息,这些信息都被记录在数据库当中。app

2.二、经过前端页面编写提交参数以及sql语句

        当点击保存页面的时候,这些参数会保存在job_config表中,以下,包括咱们所写的SQL也会是mediumtext的格式保存下来。

3.三、任务提交

3.3.一、点击提交任务

         当点击提交任务,会进入到JobStandaloneServerAOImpl.star(...)方法。在这里作了以下事情:

  1. 经过任务id获取这个任务的相关配置信息JobConfig,包括savepoint相关信息SavePointBackup。
  2. 将该任务的sql语句下载到本地,并返回该任务提交的一些参数jobRunParamDTO。
  3. 给状态表的该任务加上乐观锁,修改成提交中。
  4. 异步提交任务。
 public void start(Long id, Long savepointId, String userName) {

        JobConfigDTO jobConfigDTO = jobConfigService.getJobConfigById(id);

        //一、检查jobConfigDTO 状态等参数
        jobBaseServiceAO.checkStart(jobConfigDTO);

        // TODO 要不要检查集群上任务是否存在

        //二、将配置的sql 写入本地文件而且返回运行所需参数
        JobRunParamDTO jobRunParamDTO = jobBaseServiceAO.writeSqlToFile(jobConfigDTO);

        //三、插一条运行日志数据
        Long jobRunLogId = jobBaseServiceAO.insertJobRunLog(jobConfigDTO, userName);

        //四、变动任务状态(变动为:启动中) 有乐观锁 防止重复提交
        jobConfigService.updateStatusByStart(jobConfigDTO.getId(), userName, jobRunLogId, jobConfigDTO.getVersion());

        String savepointPath = savepointBackupService.getSavepointPathById(id, savepointId);

        //异步提交任务
        jobBaseServiceAO.aSyncExecJob(jobRunParamDTO, jobConfigDTO, jobRunLogId, savepointPath);

    }

3.3.二、构建执行命令并提交

public void aSyncExecJob(JobRunParamDTO jobRunParamDTO, JobConfigDTO jobConfigDTO,
                             Long jobRunLogId, String savepointPath){
    //一、构建执行命令
    command = CommandUtil.buildRunCommandForCluster(jobRunParamDTO, jobConfigDTO, savepointPath);
    //二、提交任务
    appId = this.submitJobForStandalone(command, jobConfigDTO, localLog);
    //三、修改任务状态
    this.updateStatusAndLog(jobConfigDTO, jobRunLogId, jobStatus, localLog.toString(), appId);
}

        上面异步提交任务的四个参数:

  1.  jobRunParamDTO:任务提交的主要参数,包括flink bin目录地址,flink运行参数,sql语句存放目录等信息
  2. jobConfigDTO:这里是该任务配置相关信息,如任务名、提交模式等
  3. jobRunLogId:这里主要是向mysql数据库的日志表中插入数据,表示该任务的提交
  4. savepointPath:经过该任务ip获取其savepoint路径

        在jobBaseServiceAO.aSyncExecJob(...)方法中最主要的是以上三行,构建执行命令,提交任务,修改任务状态。

        一、构建执行命令具体能够查看CommandUtil.buildRunCommandForCluster(...)方法。以下,能够构成一条完整的执行命令并返回。

/opt/module/flink-1.13.2/bin/flink run -d -c  com.flink.streaming.core.JobApplication /opt/module/flink-streaming-platform-web/lib/flink-streaming-core-1.3.0.RELEASE.jar -sql /opt/module/flink-streaming-platform-web/sql/job_sql_2.sql  -type 0

        二、命令提交,主要下面的commandRpcClinetAdapter.submitJob(...)方法。以下,是经过Runtime.getRuntime().exec(command)方法来提交

appId = this.submitJobForStandalone(command, jobConfigDTO, localLog);
    String appId = commandRpcClinetAdapter.submitJob(command, localLog, jobRunLogId, jobConfig.getDeployModeEnum());
    JobStandaloneInfo jobStandaloneInfo = flinkRestRpcAdapter.getJobInfoForStandaloneByAppId(appId,
                        jobConfig.getDeployModeEnum());

         三、由上所述,最终是执行了条以下命令:

/opt/module/flink-1.13.2/bin/flink run -d -c  com.flink.streaming.core.JobApplication /opt/module/flink-streaming-platform-web/lib/flink-streaming-core-1.3.0.RELEASE.jar -sql /opt/module/flink-streaming-platform-web/sql/job_sql_2.sql  -type 0

        这里最终是经过命令行的方式提交了JobApplication这个class。最后能够从一、idea本地开发查看其执行流程。