seatunnel 原名 waterdrop,是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

本文介绍使用 docker 为 flink 创建 standalone 集群,运行 seatunnel 快速开始任务。

本地机器为 mac,flink 运行在 docker 容器中,部分步骤与效果和官网并不一致。

搭建 flink 集群

docker 的优势在于迁移方便,当创建好 flink 镜像后,编写 docker-compose 配置文件,即可随时随地起一个 flink 的本地集群。

seatunnel 目前版本支持的 flink 引擎版本是 1.9.0。

同时 seatunnel 对项目中 flink 的依赖声明为 provided,flink 版本的向后兼容是存在问题的。

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>

对于较新版本的 flink 缺少依赖支持,无法运行,所以创建的 flink 的集群同样采用 1.9.0 版本。

version: "3.1" services: jobmanager: image: flink:1.9.0-scala_2.11 environment: FLINK_PROPERTIES: | jobmanager.rpc.address: jobmanager ports: - 8081:8081 command: jobmanager volumes: - jobmanager:/flink/jobmanager networks: - flink taskmanager: image: flink:1.9.0-scala_2.11 depends_on: - jobmanager environment: FLINK_PROPERTIES: | jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 8 command: taskmanager scale: 1 volumes: - taskmanager:/flink/taskmanager networks: - flink networks: flink: driver: bridge volumes: jobmanager: taskmanager:

将上面配置保存为 docker-compose.yml 文件,运行 docker-compose up -d 即可启动 standalone 集群。

打包或下载 seatunnel

可以在 github 下载 seatunnel 的 2.x 版本,或者 clone 源码自行打包。

本文 clone 源码,基于 dev 分支自行打包。

在项目源码目录下运行 mvn clean package 打包项目,打包文件位于 home/seatunnel-dist/target 目录下。

解压安装包:

tar -zxf seatunnel-dist-2.0.4-2.11.8-bin.tar.gz

配置 seatunnel

提交 seatunnel 需要依赖本地 flink 安装路径,需在 seatunnel-env.sh 文件中编辑配置 FLINK_HOME 为 flink 的安装路径。

为了能够提交任务到 flink 集群中,依然从 flink 官网下载了 1.9.0 版本的项目,进行解压。

创建任务配置文件

在 config 目录下,创建 application.conf 文件,内容如下:

env { execution.parallelism = 1 } source { SocketStream{ result_table_name = "fake" field_name = "info" host = xxx.xxx.xxx.xxx port = 19999 } } transform { Split{ separator = "#" fields = ["name","age"] } sql { sql = "select * from (select info,split(info) as info_row from fake) t1" } } sink { ConsoleSink {} }

因为 flink 集群运行在 docker 中,seatunnel 提供的 SourceStream 的 host 配置默认为 localhost,指向 docker 内 taskmanager 所在容器的本地地址,如果不修改的话需要登陆容器启动 nc 服务,这里将其修改为自己本地机器的 ip 地址。

port 配置默认为 9999,这里改为 19999。

启动 nc

nc -l 19999

启动 seatunnel

在 seatunnel 解压目录运行命令,启动任务

./bin/start-seatunnel-flink.sh --config ./config/application.conf

等待片刻任务启动后,即可在flink web-ui 中看到任务:

docker使用超详细入门级(安装体验-基于)(1)

测试

在 nc 中输入 xg#1995。

任务中配置基于 # 的字符串分割为 name 和 age 字段。

在 taskmanager 所在容器的 std 输出中即可看到 xg#1995,xg,1995 输出。

docker 启动的 flink 集群并不能很好地采集日志和 std 输出,因此在 flink 的 web-ui 中 Logs 和 Stdout 是没有任何输出的,查看任务的输出需要借助 docker-compose 命令:

docker-compose logs -f taskmanager

,