在数据进入到数据平台之后,我们就可以正式开始构建数据应用了。一个常见的数据应用是数据报表和数据指标的开发。如何开发这样的数据应用呢?首先要决定的就是使用什么样的开发语言及如何构建开发环境。本文将结合我们的实践经验一起聊一聊这个话题。

数据计算框架

在基于Hadoop的分布式数据平台的场景下,当前最流行的数据计算框架要算Spark了,相比Hive,它提供了更为可控的也更灵活的编程过程。根据需要,我们可以在Spark程序中随时获取到某一个计算结果,然后根据此结果进行后续计算,而Hive要想要实现这样的灵活性,可能不得不借助临时表或其他语言来充当驱动器程序了。

同时,由于Spark在设计之初就声明了兼容Hive,所以,我们完全可以把Spark和Hive联合起来使用。Hive用于在数据分析时随时执行sql查询,Spark用于将数据计算任务工程化。

我们在后面的《数据仓库建模实践》中会讲到数据仓库DWD层的构建,这一层的构建就可以通过Spark来实现。Spark同时提供了编程的DataFrame接口和sql接口,支持Java、Scala、Python、R等编程语言。

数据开发语言

应该选择什么样的接口进行数据开发呢?我们会建议主要使用sql而不是基于DataFrame的API,主要理由是:

sql的缺点及应对

很多人会吐槽sql开发的缺点。究其原因,是由于sql是声明式的语言,需要先定义再执行,且执行过程不能中断。这样一来,直接使用原生的sql来编写复杂代码可能会十分不便。比如以下这些场景。

考虑需要进行流程控制的场景。比如,在数据量很大的情况下,一般需要调整分区数才可以更高效的并行执行。此时,我们需要在执行sql之前先判断数据量。如果直接用sql,这一过程将不容易实现。如果依赖调度工具进行这种流程判断又显得大材小用。

考虑需要使用变量的场景。比如,我们需要根据指定的日期进行指标计算。如果直接使用sql,我们可能要考虑先将此日期写入到一张临时数据库表中,十分不便。

考虑需要重复多次的代码。比如,在空调销售的场景,我们需要同时按日计算销量指标,也需要按月计算销量指标(考虑到取消订单的情况,两者无累加关系,需要单独实现),这两个指标的大部分逻辑是相同的。使用原生的sql,将难以复用这些相同的计算逻辑。

如何应对上面这些问题呢?相信大家心里已经有答案了,那就是进行sql语言的增强。其实,我们只需要实现一个简单的sql执行器就可以解决上提到的大部分问题。

增强sql语法

针对上面的三个场景,我们可以为sql语言添加这些特性:变量、控制语句、模板。

这样增强后的sql可以编写成什么样子呢?采用TDD的思路,我们可以先写个测试,通过编写测试来辅助设计我们所期望的增强sql的使用方式。考虑支持变量和控制语句,这样的测试sql可以编写如下:

数据开发用什么工具(如何选择数据应用开发语言和环境)(1)

针对上面的增强sql语法实现一个驱动器程序应当不是难事。以下是驱动器实现的大致思路:

模板可以很好的解决代码重复问题,如何支持模板呢?可以编写一个测试sql如下:

数据开发用什么工具(如何选择数据应用开发语言和环境)(2)

支持上述测试的驱动器实现的大致思路是:

(上面的测试用例只是覆盖了很小的一部分正常场景,要实现一个完整的驱动器,还需要定义更多的边界场景)

有了这个驱动器,相信前面提到的sql的缺点就不再是缺点了。这个增强版本的sql可以应对大部分的场景。在我们的实践过程中,还增加了一些其他的有用的特性,比如:

第二语言

在决定使用sql作为第一语言之后,我们还需要考虑数据开发的第二语言。比如上述sql驱动器应该选择什么语言来实现呢?

Spark支持得最好的语言是Scala和Python。Spark本身是使用Scala开发而成,所以二者有着无缝的互操作性。Python拥有庞大的数据分析师用户群体,为支持大规模数据分析而设计的Spark自然要好好支持Python。

在项目中,我们优先选择了Python语言,主要的理由是:

当然是用Python也有其缺点,比如:

相对而言,我们认为Python还是做数据开发的更好的选择。

当然,我们也拥抱Scala语言,一些udf的实现我们会建议使用Scala,主要的考虑是性能更快(无需在两种语言中进行数据的序列化和反序列化)。

通过udf/udaf扩展sql

另一个扩展sql语法的方式是udf,即自定义函数。

要编写一个Spark的自定义函数是非常简单的。如果代码逻辑比较简单,可能只需要一行代码即可。比如,假设我们要实现一个函数将两个数值相乘,只需要编写Scala代码spark.udf.register("multi", udf((a: Double, b: Double) => a * b))。得益于Scala具备强大的类型系统及类型推断能力,这里的代码才可以如此简洁。

除了udf,Spark还支持udaf扩展,即自定义聚合函数(在Group By聚合统计的场景下使用)。实现自定义聚合函数时,需要略微费事一点。大体上,需要实现reduce函数,把多个值聚合为一个值,然后实现merge函数,将多个reduce的结果合并为一个。

由于我们可以轻易的实现自定义函数和自定义聚合函数,因此,我们可以更多的借助udf/udaf的能力,把一些通用的计算通过udf/udaf进行封装和抽象。从而使得我们的sql代码更为简洁清晰。

前面提到了我们的第二开发语言是Python,那么Scala可以作为我们的第三开发语言,专门用来开发udf/udaf。在这种场景下使用Scala可以很好的提升程序性能,我们将从中获得很多益处。但是Scala程序的开发环境和依赖管理问题依然是比较麻烦的,如何缓解这个问题呢?

我们建立了这样几个原则:

有了这样的原则,我们可以很好的享受到Scala代码带来的好处,同时可以避免Scala带来的工程管理复杂度。

实现Scala代码的编译和打包的shell脚本可以参考如下:

数据开发用什么工具(如何选择数据应用开发语言和环境)(3)

构建开发环境

任务调度器

一个大的数据应用常常会被拆分为很多小的独立的数据计算任务,比如一张报表,里面的每个指标的计算可能对应一个的任务。我们需要一个调度器将这些任务按照依赖关系周期性的调度运行。这些调度任务一般可以组成一个有向无环图,即DAG,我们也把这样的图称作数据流水线。

当前可以使用的开源调度器种类很多,比如Airflow,作为Airbnb开源的任务调度系统,由于其可以使用代码来定义数据流水线且与其他开源生态有很好的集成而深受广大开发者喜爱。

使用Makefile实现任务管理

如何将任务集成到调度系统里面呢?一般可以使用Shell命令将任务集成到调度系统。我们会建议使用Makefile先进行一层任务封装,然后再使用Shell命令集成到调度系统。相比直接使用Shell命令进行集成,这一实践有以下几个优势:

除此之外,Makefile作为c/c 的构建工具,使用非常广泛,与Shell一样非常轻量级,容易获取和安装。这使得我们使用Makefile成本很低。

大多数调度系统都提供了除Shell外的其他编程语言的集成,比如Airflow提供了Python代码的集成方式。但我们并不建议使用这一方式进行集成,主要理由是:

构建任务运行环境

Spark程序需要运行在一个支持环境中,随之而来的一个问题就是,我们需要几套环境来支持数据开发、测试及上线运行?

由于分布式环境的构建和管理维护并不是一件容易的事,且常常需要较高的成本投入,我们并不建议去搭建多套集群。当然,如果企业内部有独立的运维团队负责集群的日常管理,那情况会不一样,我们可以根据需要来选择使用几套环境。

在我们看来,一般使用两套集群就可以足够满足数据项目的推进了。一套是用于进行集群配置测试的,可以称为测试环境。另一套用于运行日常的数据任务,同时支持数据开发、测试及生产运行(周期性调度运行),可以称作生产环境。

测试环境可以应对很多集群配置调整的问题,我们可以先在测试环境中从容的进行集群配置的调整和测试,在修改好了之后,再将配置迁移到生产环境。

有人可能会担心一套环境同时用于支持开发、测试及生产运行会带来问题。比如数据的隔离性,对生产运行的任务的稳定性造成影响等。但是,事实上,集群已经提供了很多的功能来帮助我们解决这个问题。比如:

使用一套集群同时支持开发、测试及生产运行还有以下好处:

总结

本文讨论了在数据应用开发开始之前需要进行的工作,包括进行编程语言的选择和开发环境的构建。这两方面的相关决策将在很大程度上影响后续数据开发的组织和管理。如果决策得当,后续数据开发将能够很轻松的开展起来,否则,则可能将团队带入泥潭。

本文分享了我们在实践过程中的一些思考和选择,这些经验对于我们是很受用的。总结起来,就是:


数据开发支持工具

使用SQL来作为主要数据开发语言,并且,通常我们需要对标准的SQL进行增强,以便可以更好的支持复杂的数据开发。一些典型的需要新增的特性可以是变量、控制语句、模板等。

增强SQL固然是可以解决我们的数据开发问题,但是它也会给我们带来一些其他的不便。第一个烦恼可能就是,标准的SQL可以在很多数据工具中运行,比如Superset的SQL查询器、Hive的查询控制台等,而使用增强语法的SQL编写的代码则不行。由于我们将标准的SQL增强了,而SQL周边生态工具却无法感知这样的增强,这时各种不便就随之而来了。

支持数据开发过程

如何解决这个问题呢?想要在周边工具中进行SQL扩展不是一件简单的事情,可能需要花费大量的精力和时间。我们只能另寻他法。

从软件开发的视角来看这个问题,可以发现,我们现在有了编程语言,也有了编程语言的执行环境,基本的开发流程确实是打通了,但是还缺少的是对开发过程的支持。一般而言,开发过程支持完善与否将很大程度上决定团队开发效率的高低。下面我们一起来看看如何完善对于开发过程的支持。

主要的开发过程一般包括代码编辑、调试、测试三个步骤。下面我们来看看如何支持这些数据开发过程。

支持代码编辑

代码编辑在当前还不会成为一个问题,因为:

支持代码调试

命令行调试器

现在我们来看调试过程。事实上,使用周边的SQL执行工具来快速验证SQL这个过程本身就是代码调试的过程。

有了增强SQL的语法,我们要如何做呢?回顾增强SQL的语法,我们在其中支持了多个步骤,每个步骤可以是执行SQL,定义变量或者调用外部函数。如果可以一个步骤一个步骤运行,并且可以在每个步骤之后查看当前的变量或SQL执行结果,那将是一件不错的事。这其实也就是一般的程序调试过程。

事实上,有了增强SQL的执行器(即前文提到的驱动器),要实现一个具备基本功能的增强SQL调试器并不困难。按照上面的描述,我们只需要在某一个步骤执行完成之后,先暂停执行,并提供接口查询当前上下文的数据即可。在程序暂停时,一般还可以允许运行一些代码,这也不难,提供接口执行SQL即可。这就是一个命令行的程序调试器雏形。

对应到一般在IDE里面进行调试的交互流程上,打断点的过程,就是指定需要在哪一个步骤暂停,至于查看断点时的状态和在断点时执行代码就跟上面的过程完全一致了。

根据前面的分析,我们可以设计一个命令行调试器类Debugger,它可以具有这些接口:

class Debugger: # 查询执行状态 def is_started(...): def is_inprogress(...): def is_finished(...): # 查询执行步骤信息 def step(...): def current_step(...): def next_step(...): def last_step(...): def left_step_count(...): def print_steps(...): # 查看或设置执行过程中的变量 def vars(...): def set_vars(...): def templates(...): def tempviews(...): def showdf(...): # 执行某一步骤,实现暂停、继续等流程控制功能 def step_on(...): def step_to(...): def run(...): def run_to(...): def restart(...): # 在断点过程中执行sql def sql(...):

上面这些接口借助增强SQL的执行器不难实现。有了Debugger类,一个典型的调试过程就变成:

打印代码执行报告

为了辅助数据开发人员更清楚的理解增强SQL的执行过程,我们最好能打印每一步骤的执行情况,比如实际执行的SQL、执行开始时间、结束时间、当前步骤在整个执行过程中耗时百分比等信息。

一个简单的报告可以设计如下:

===================== REPORT FOR step-1 ==================config: StepConfig(target=..., condition=None, line_no=1) sql: select 1 as astatus: SUCCEEDEDstart time: 2021-04-10 10:05:30, end time: 2021-04-10 10:05:33, execution time: 2.251653s - 8.14%messages:===================== REPORT FOR step-2 ==================config: StepConfig(target=log.a, condition=None, line_no=4) sql: select '1' as astatus: SUCCEEDEDstart time: 2021-04-10 10:05:33, end time: 2021-04-10 10:05:33, execution time: 0.069165s - 0.25%messages:a='1'...

为此,我们可以定义一个执行报告搜集器(ReportCollector),每当一个步骤开始或结束执行时,SQL执行器应当通知报告搜集器搜集该步骤的执行信息。在整个流程执行完成之后,SQL执行器可以调用报告搜集器打印整个过程中搜集到的执行报告。

有了报告搜集器,我们就可以更清楚的了解增强SQL执行过程中的细节了。由于我们的SQL执行基于Spark实现,有了这个报告搜集器,一些简单的Spark程序优化还可以直接通过查看报告来完成。

报告搜集器是一个十分好用的功能,当然需要集成到调试器中了。通过在Debugger类中加入report()方法,我们在调试过程中可以随时打印程序执行报告。

打印日志与执行检查

打印日志也是我们调试程序的常用手段,如何在增强SQL中支持日志打印呢?可以考虑定义一个任务类型为log,按照如下方式来使用:

-- target=log.some_info_about_this_logselect 1 as var_1, 2 as var_2

日志打印结果可以在上述任务报告中出现,一个比较直观的设计可以是:

===================== REPORT FOR step-1 ==================config: StepConfig(target=log.some_info_about_this_log, condition=None, line_no=1) sql: select 1 as var_1, 2 as var_2status: SUCCEEDEDstart time: 2021-04-10 10:05:33, end time: 2021-04-10 10:05:33, execution time: 0.069165s - 0.25%messages:var_1=1, var_2=2

很多编程语言都提供了assert语法,用以在开发过程中进行及时的假设验证,我们也可以在增强SQL增加这样的支持。可以考虑定义一个任务类型为check,按照如下方式来使用:

-- target=check.actual_should_equal_expectedselect 1 as actual, 2 as expected

如果从结果集中的获取的actual值与expected值不相等,则此任务会失败,并打印错误消息。同时,这样的错误可以在上述任务报告中体现,一个比较直观的设计可以是:

===================== REPORT FOR step-1 ==================config: StepConfig(target=check.actual_should_equal_expected, condition=None, line_no=10) sql: select 1 as actual, 2 as expectedstatus: FAILEDstart time: 2021-04-10 10:25:32, end time: 2021-04-10 10:25:32, execution time: 0.071442s - 0.52%messages:check [actual_should_equal_expected] failed! actual=1, expected=2, check_data=[...]

通过调试模式屏蔽调试过程中的副作用

有了调试器,现在可以愉快的写代码了。但很快我们就会发现另一个需要解决的问题,那就是调试过程可能导致写入某些外部数据库表。这将带来一些风险,因为我们有可能在调试的时候把一些不应该被覆盖的数据库表给覆盖了。

要解决这个问题也很简单,我们可以在SQL执行器中引入一个debug标记来实现。有了debug标记,在执行某一步骤的时候,可以判断是否是向外部数据库表做写操作,如果是且debug为true,则跳过写操作,只是将该数据创建一个TempView而已。

上述写表操作只是一个场景而已,有了debug标记,我们还可以做很多事情,比如打印更多的调试信息等。

Debugger类在调用SQL执行器时,应当将debug标记设置为true,这样我们就不用担心调试的时候产生任何不想发生的副作用了。

Web数据开发环境

在JupyterLab中调试代码

有了上面这些功能,调试器看起来是不错了,但是要与IDE的交互体验比起来,命令行版本的还是过于简单了。能不能想办法增强一下呢?

数据分析师常用的用于运行代码的工具要算JupyterLab了。作为一个打开网页就能用的开发环境,JupyterLab有非常多十分好用的功能,比如,可以一段一段的定义和执行代码,可以支持嵌入Markdown文档,可以支持可视化结果展示,可以编辑多种语言代码等等。

JupyterLab能不能作为我们的代码编辑器使用呢?

查看JupyterLab最新版本,我们会发现JupyterLab提供了Code Console的功能,且可以支持多个编辑器分屏。其操作界面如下:

数据开发用什么工具(如何选择数据应用开发语言和环境)(4)

此时,大家可能已经想到了,可以借助这样的交互来实现我们的代码调试功能。JupyterLab不仅给我们提供了一个不错的编辑代码的界面,利用Code Console还可以实现一边写代码一边调试。

在JupyterLab中配置好调试器后,一个典型的使用过程如下:

数据开发用什么工具(如何选择数据应用开发语言和环境)(5)

使用JupyterLab还有一系列的其他好处,比如:

在容器中启动JupyterLab

在基于Hadoop的大数据集群中进行数据开发时,常常还有一个不够方便的地方,那就是客户端环境的构建。

我们常常需要集成了多种集群组件的客户端,比如Spark, HDFS, Hive, HBase等,这些客户端的配置需要保持和集群同步。如果自己去构建这样的客户端,不仅耗时,而且很容易出错。

Ambari可以帮助我们自动配置集群节点,如软件安装,配置同步等繁琐的工作Ambari都可以帮我们搞定。当需要使用集群的客户端环境时,常常也是通过Ambari配置的集群节点来实现。

使用Ambari配置的集群节点作为客户端却有另一个缺点,那就是这样的节点常常由于数量较少而在团队中间共享(由于资源占用问题,我们一般不会配置过多的客户端节点)。

既然是共享的节点,大家都在节点上面操作,就容易发生冲突。比如,小A用自己的帐号登录了(通过kinit),此时小B想要访问集群,如果不使用其他的操作系统帐号,小B就会直接用到小A的帐号权限来访问系统,这不是期望的行为。还比如,小A需要在客户端中安装某一个版本1依赖库,而小B需要在客户端中安装同一个依赖库的版本2,这就产生了冲突,需要小A和小B相互协调才行。

容器技术是解决此问题的一个很好的方式。容器可以提供必要的环境隔离,使得团队成员可以自由的在自己的环境中进行操作,无需担心对他人造成影响。

如何实现呢?其实我们只需要一个运行了sshd的容器即可。通过暴露特定的端口,我们可以把运行着sshd的容器作为一个节点,注册到Ambari中,然后利用Ambari帮我们安装好相关的依赖软件。

软件安装完成之后,我们可以通过docker save命令将这样容器保存为一个基础容器镜像。然后通过运行多个此容器,我们就拥有了多个此类客户端了。由于容器运行成本非常低,可以为每个需要编写代码的团队成员运行一个容器作为他自己的客户端使用。这样一来,开发人员环境隔离问题就迎刃而解了。

在容器环境中运行一个JupyterLab来支持开发是一个不错的主意。这样一来,每个人都拥有了自己的一套独立的用JupyterLab打造的开发环境了。

总结

前面的文章中我们提到使用增强SQL来进行数据开发,但是这带来了一些额外的使用成本。本文讨论了如何支持增强SQL的代码编辑和调试功能。

通过实现一个增强SQL调试器,并在JupyterLab中运行此调试器,我们可以打造了一个基于Web的轻量级数据开发环境,这能很大程度上提高数据开发的效率。为了更好的支持数据开发,我们还可以考虑在SQL执行器中增加执行报告搜集的功能,在调试器中随时打印执行报告对于数据开发是一件好事。除此之外,还可以在SQL执行器中引入调试标记,这可以用来避免调试过程的可能的副作用。


企业数据开发工作台地址:https://data-workbench.com/。核心模块--ETL开发语言开源,开源项目地址:https://github.com/easysql/easy_sql

数据开发用什么工具(如何选择数据应用开发语言和环境)(6)

,