Spark离线开发框架设计与实现

2022年05月15日 阅读数:7
这篇文章主要向大家介绍Spark离线开发框架设计与实现,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

图片

导读:本文介绍了开发框架的总体设计,随后对各模块进行了拆解,重点介绍了如何快速实现应用程序的开发,并从设计思路、实现方式、功能介绍及建立方式等角度对通用的数据回溯应用进行了全面介绍,实现了一次环境准备,多数据回溯任务的启动方案。总之,框架对开发效率、回溯任务的效率与维护成本及代码管理便捷性都会有显著的效果。前端

全文3308字,预计阅读时间10分钟。java

1、背景

随着 Spark 以及其社区的不断发展,Spark自己技术也在不断成熟,Spark在技术架构和性能上的优点愈来愈明显,目前大多数公司在大数据处理中都倾向使用Spark。Spark支持多种语言的开发,如Scala、Java、Sql、Python等。sql

Spark SQL使用标准的数据链接,与Hive兼容,易与其它语言API整合,表达清晰、简单易上手、学习成本低,是开发者开发简单数据处理的首选语言,但对于复杂的数据处理、数据分析的开发,使用SQL开发显得力不从心,维护成本也很是高,使用高级语言处理会更高效。数据库

在平常的数据仓库开发工做中,咱们除了开发工做外,也涉及大量的数据回溯任务。对于创新型业务来讲,口径变化频繁、业务迅速迭代,数据仓库的回溯很是常见,经过回溯几个月甚至一年是很是广泛的,但传统的回溯任务方式效率极低,并且须要人力密切关注各任务状态。apache

针对目前现状,咱们开发了一套Spark离线开发框架,以下表所示,咱们例举了目前存在的问题及解决方案。框架的实现不只让开发变得简单高效,并且对于数据的回溯工做在不须要任何开发的状况下,快速高效地完成大量的回溯工做。小程序

图片

2、框架设计

框架旨在封装重复的工做,让开发变得简单。框架如图2-1所示,主要分为三个部分,基础框架、可扩展工具及应用程序,开发者只需关注应用程序便可简单快速实现代码开发。前端工程化

图片


2.1 基础框架

基础框架中,咱们对于全部类型的应用实现代码与配置分离机制,资源配置统一以XML文件形式保存并由框架解析处理。框架会根据开发者配置的任务使用资源大小,完成了SparkSession、SparkContext、SparkConf的建立,同时加载了经常使用环境变量,开发了通用的UDF函数(如经常使用的url参数解析等)。其中Application为全部应用的父类,处理流程如图所示,开发者只需编写关注绿色部分便可。api

图片

目前,离线框架所支持的经常使用环境变量以下表所示。网络

图片


2.2 可扩展工具

可扩展工具中包含了大量的工具类,服务于应用程序及基础框架,经常使用有,配置文件解析类,如解析任务资源参数等;数据库工具类,用于读写数据库;日期工具类,用于日期加减、转换、识别并解析环境变量等。服务于应用程序的通用工具模块可统称为可扩展工具,这里再也不赘述。session

2.3 应用程序

2.3.1 SQL应用

对于SQL应用,只须要建立SQL代码及资源配置便可,应用类为惟一类(已实现),有且只有一个,供全部SQL应用使用,开发者无需关心。以下配置所示,class为全部应用的惟一类名,开发者要关心的是path中的sql代码及conf中该sql所使用的资源大小。

<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
    <class>com.way.app.instance.SqlExecutor</class>
    <path>sql文件路径</path>
  <!--    sparksession conf   -->
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>1G</spark.driver.memory>
        <spark.executor.instances>20</spark.executor.instances>
    </conf>
</project>

2.3.2 Java应用

对于复杂的数据处理,SQL代码不能知足需求时,咱们也支持Java程序的编写,与SQL不一样的是,开发者须要建立新的应用类,继承Application父类并实现run方法便可,run方法中开发者只须要关注数据的处理逻辑,对于通用的SparkSession、SparkContext等建立及关闭无需关注,框架还帮助开发者封装了代码的输入、输出逻辑,对于输入类型,框架支持HDFS文件输入、SQL输入等多种输入类型,开发者只需调用相关处理函数便可。

以下为一个简单的Java数据处理应用,从配置文件能够看出,仍需配置资源大小,但与SQL不一样的是,开发者须要定制化编写对应的Java类(class参数),以及应用的输入(input参数)和输出参数(output参数),此应用中输入为SQL代码,输出为HDFS文件。从Test类实现能够看出,开发者只需三步走:获取输入数据、逻辑处理、结果输出,便可完成代码编写。

<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
    <class>com.way.app.instance.ecommerce.Test</class>
    <input>
        <type>table</type>
        <sql>select
            clk_url,
            clk_num
            from test_table
            where event_day='{DATE}'
            and click_pv > 0
            and is_ubs_spam=0
        </sql>
    </input>
    <output>
        <type>afs_kp</type>
        <path>test/event_day={DATE}</path>
    </output>
    <conf>
        <spark.executor.memory>2G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>2G</spark.driver.memory>
        <spark.executor.instances>10</spark.executor.instances>
    </conf>
</project>
package com.way.app.instance.ecommerce;import com.way.app.Application;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Row;import java.util.Map;import org.apache.spark.api.java.function.FilterFunction;import org.apache.spark.sql.Dataset;public class Test extends Application {    @Override    public void run() {        // 输入        Map<String, String> input = (Map<String, String>) property.get("input");        Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");        // 逻辑处理(简单的筛选出url带有部分站点的日志)        JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {            String url = row.getAs("url").toString();            return url.contains(".jd.com")                    || url.contains(".suning.com")                    || url.contains("pin.suning.com")                      || url.contains(".taobao.com")                    || url.contains("detail.tmall.hk")                    || url.contains(".amazon.cn")                    || url.contains(".kongfz.com")                    || url.contains(".gome.com.cn")                    || url.contains(".kaola.com")                    || url.contains(".dangdang.com")                    || url.contains("aisite.wejianzhan.com")                    || url.contains("w.weipaitang.com");        })                .toJavaRDD()                .map(row -> row.mkString("\001"));        // 输出        Map<String, String> output = (Map<String, String>) property.get("output");        outRdd.saveAsTextFile(getOutPut(output));    }}

2.3.3 数据回溯应用

数据回溯应用是为解决快速回溯、释放人力而研发的,使用很是便捷,开发者无需重构任务代码,与SQL应用相同,回溯应用类为惟一类(已实现),有且只有一个,供全部回溯任务使用,且支持多种回溯方案。

2.3.3.1 方案设计

在平常回溯过程当中发现,一次回溯任务存在严重的时间浪费,不管以何种方式提交任务,都须要经历如下执行环境申请及准备的过程:

  1. 在client提交application,首先client向RS申请启动ApplicationMaster

  2. RS先随机找到一台NodeManager启动ApplicationMaster

  3. ApplicationMaster向RS申请启动Executor的资源

  4. RS返回一批资源给ApplicationMaster

  5. ApplicationMaster链接Executor

  6. 各个Executor反向注册给ApplicationMaster

  7. ApplicationMaster发送task、监控task执行,回收结果

这个过程占用的时间咱们统称为执行环境准备,咱们提交任务后,经历以下三个过程:

  1. 执行环境准备

  2. 开始执行代码

  3. 释放资源

执行环境准备一般会有5-20分钟的等待时间,以队列当时的资源状况上下波动,失败率为10%左右,失败缘由因为队列、网络、资源不足等形成的不可抗力因素;代码执行过程一般失败率在5%左右,一般因为节点不稳定、网络等因素致使。离线开发框架回溯应用从节省时间和人力两个方面考虑,设计方案图2-3所示。

从回溯时间方面来看:将全部回溯子任务的第1、第三步的时间压缩为一次,即环境准备及释放各一次,执行屡次回溯代码。若开发者回溯任务为30个子任务,则节省的时间为5-20分钟乘29,可见,回溯子任务越多,回溯提效越明显。

从人工介入方面来看,第一,开发者无需额外开发、添加回溯配置便可。第二,离线框架回溯应用启动的任务数量远远小于传统回溯方案,以图2-3为例,该回溯任务为串行回溯方式,使用框架后只需关注一个任务的执行状态,而传统方式则需人工维护N个任务的执行状态。

最后,咱们在使用离线开发框架回溯一个一年的串行任务中,代码的执行只须要5分钟左右,咱们发现,不使用离线开发框架回溯的任务在最理想的状况下(即最短期分配到资源、全部子任务均无失败状况、一次能够串行启动365天),须要的时间为2.5天,但使用离线开发框架回溯的任务,在最坏的状况下(即最长时间分配到资源,任务失败状况出现10%),只须要6个小时就可完成,提效90%以上,且基本无需人力关注。

图片

2.3.3.2 功能介绍

断点续回

使用Spark计算,咱们在享受其计算带来的飞快速度时,不免会遭遇其中的不稳定性,节点宕机、网络链接失败、资源问题带来的任务失败家常便饭,回溯任务动辄几个月、甚至一年,任务量巨大,失败后能够继续从断点处回溯显得尤其重要。在离线框架设计中,记录了任务回溯过程当中已成功的部分,任务失败重启后会进行断点续回。

回溯顺序

在回溯任务中,一般咱们会根据业务须要肯定回溯顺序,如对于有新老用户的增量数据,因为当前的日期数据依赖历史数据,因此咱们一般会从历史到如今开始回溯。但没有这种须要时,通常来讲,先回溯如今能够快速知足业务方对如今数据指标的了解,咱们一般会从如今到历史回溯。在离线框架设计中,开发者可根据业务须要选择回溯顺序。

并行回溯

一般,回溯任务优先级低于例行任务,在资源有限的状况下,回溯过程当中不能一次性所有开启,以避免占用大量资源影响例行任务,因此离线框架默认为串行回溯。固然在资源充分的时间段,咱们能够选择适当的并行回溯。离线开发框架支持必定的并发度,开发者在回溯任务时游刃有余。

2.3.3.3 建立一个回溯任务

回溯应用的使用很是方便,开发者无需新开发代码,使用例行的代码,配置回溯方案便可,以下代码所示,

  • class参数为回溯应用的惟一类,必填参数,全部回溯任务无需变化。

  • type参数为回溯应用类型,默认为sql,若应用类型为java,则type值应为java类名。

  • path参数为回溯代码路径,必填参数,无默认值,一般与例行任务代码相同,无需修改。

  • limitdate参数为回溯的截止日期,必填参数,无默认值。

  • startdate参数为回溯开始日期,必填参数,无默认值,若任务进入断点续回或开启并行回溯时,则该参数无效。

  • order参数为回溯顺序,默认为倒序。当值为1时为正序,为值为-1时为倒序。

  • distance参数为回溯步长,框架默认为串行回溯,但也支持并行回溯,该参数主要用于支持并行回溯,当该参数存在且值不为-1时,回溯开始日期取值为基准日期。如启动两个并行任务,任务的执行范围为基准日期至基准日期加步长或limitdate,若基准日期加步长后日期大于limitdate,则是取limitdate,不然反之。

  • file参数为回溯日志文件,必填参数,无默认值,用于记录已回溯成功的日期,当失败再次重启任务时,startdate会以日志文件中日期的下一个日期为准。

  • conf参数与其余应用相同,为本次回溯任务的资源占用配置。

<?xml version="1.0" encoding="UTF-8"?><project name="ecommerce_ads_others_order_retain_incr_day">    <class>com.way.app.instance.ecommerce.Huisu</class>    <type>sql</type>    <path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>    <limitdate>20220404</limitdate>    <startdate>20210101</startdate>    <order>1</order>    <distance>-1</distance>    <file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>    <conf>        <spark.executor.memory>1G</spark.executor.memory>        <spark.executor.cores>2</spark.executor.cores>        <spark.executor.instances>30</spark.executor.instances>        <spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>    </conf></project>
‍

3、使用方式

3.1 使用介绍

使用离线框架方式开发时,开发者只需重点关注数据逻辑处理部分,开发完成打包后,提交执行,对于每个应用主类相同,如前文所述为Application父类,不随应用变化,惟一变化的是父类须要接收的参数,该参数为应用的配置文件的相对路径。

3.2 使用对比

使用离线框架先后对比图以下所示。

图片


4、展望

目前,离线开发框架仅支持SQL、Java语言代码的开发,但Spark支持的语言远不止这两种,咱们须要继续对框架升级支持多语言开发等,让开发者更方便、快速地进行大数据开发。

----------  END  ----------

推荐阅读

云原生时代的搜索服务算力管理

浅谈小程序开源业务架构建设之路

百度小程序包流式下载安装优化

前端工程化之FaaS SSR方案

日志中台不重不丢实现浅谈

百度ToB垂类帐号权限平台的设计与实践