seatunnel 原名 waterdrop,是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
本文介绍使用 docker 为 flink 创建 standalone 集群,运行 seatunnel 快速开始任务。
本地机器为 mac,flink 运行在 docker 容器中,部分步骤与效果和官网并不一致。
搭建 flink 集群
docker 的优势在于迁移方便,当创建好 flink 镜像后,编写 docker-compose 配置文件,即可随时随地起一个 flink 的本地集群。
seatunnel 目前版本支持的 flink 引擎版本是 1.9.0。
同时 seatunnel 对项目中 flink 的依赖声明为 provided,flink 版本的向后兼容是存在问题的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
对于较新版本的 flink 缺少依赖支持,无法运行,所以创建的 flink 的集群同样采用 1.9.0 版本。
version: "3.1"
services:
jobmanager:
image: flink:1.9.0-scala_2.11
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: jobmanager
ports:
- 8081:8081
command: jobmanager
volumes:
- jobmanager:/flink/jobmanager
networks:
- flink
taskmanager:
image: flink:1.9.0-scala_2.11
depends_on:
- jobmanager
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 8
command: taskmanager
scale: 1
volumes:
- taskmanager:/flink/taskmanager
networks:
- flink
networks:
flink:
driver: bridge
volumes:
jobmanager:
taskmanager:
将上面配置保存为 docker-compose.yml 文件,运行 docker-compose up -d 即可启动 standalone 集群。
打包或下载 seatunnel
可以在 github 下载 seatunnel 的 2.x 版本,或者 clone 源码自行打包。
本文 clone 源码,基于 dev 分支自行打包。
在项目源码目录下运行 mvn clean package 打包项目,打包文件位于 home/seatunnel-dist/target 目录下。
解压安装包:
tar -zxf seatunnel-dist-2.0.4-2.11.8-bin.tar.gz
配置 seatunnel
提交 seatunnel 需要依赖本地 flink 安装路径,需在 seatunnel-env.sh 文件中编辑配置 FLINK_HOME 为 flink 的安装路径。
为了能够提交任务到 flink 集群中,依然从 flink 官网下载了 1.9.0 版本的项目,进行解压。
创建任务配置文件
在 config 目录下,创建 application.conf 文件,内容如下:
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = "fake"
field_name = "info"
host = xxx.xxx.xxx.xxx
port = 19999
}
}
transform {
Split{
separator = "#"
fields = ["name","age"]
}
sql {
sql = "select * from (select info,split(info) as info_row from fake) t1"
}
}
sink {
ConsoleSink {}
}
因为 flink 集群运行在 docker 中,seatunnel 提供的 SourceStream 的 host 配置默认为 localhost,指向 docker 内 taskmanager 所在容器的本地地址,如果不修改的话需要登陆容器启动 nc 服务,这里将其修改为自己本地机器的 ip 地址。
port 配置默认为 9999,这里改为 19999。
启动 nc
nc -l 19999
启动 seatunnel
在 seatunnel 解压目录运行命令,启动任务
./bin/start-seatunnel-flink.sh --config ./config/application.conf
等待片刻任务启动后,即可在flink web-ui 中看到任务:
测试
在 nc 中输入 xg#1995。
任务中配置基于 # 的字符串分割为 name 和 age 字段。
在 taskmanager 所在容器的 std 输出中即可看到 xg#1995,xg,1995 输出。
docker 启动的 flink 集群并不能很好地采集日志和 std 输出,因此在 flink 的 web-ui 中 Logs 和 Stdout 是没有任何输出的,查看任务的输出需要借助 docker-compose 命令:
docker-compose logs -f taskmanager