本文将详细介绍PyFlink项目的部署方法,包括依赖管理、部署方式和最佳实践。

部署准备

环境要求

  • Java 8 或 Java 11
  • Python 3.6+
  • Flink 1.10+(推荐使用较新版本)
  • 集群环境(单机或分布式)

项目结构

1
2
3
4
pyflink-project/
├── requirements.txt # 第三方依赖
├── deploy_kfk_demo.py # 主入口文件
└── thr_fun.py # 自定义依赖文件

依赖管理

第三方依赖

PyFlink 1.10+ 提供了 set_python_requirements 方法来管理第三方依赖:

1
2
3
4
5
6
7
8
9
10
from pyflink.table import StreamTableEnvironment

# 创建TableEnvironment
t_env = StreamTableEnvironment.create(env)

# 设置Python依赖
t_env.set_python_requirements(
requirements_file_path='requirements.txt',
requirements_cache_dir='/tmp/requirements_cache'
)

requirements.txt示例:

1
2
3
numpy==1.16.5
pandas==1.0.3
kafka-python==2.0.2

注意:

  • 执行环境需要有网络连接以上载依赖
  • requirements_cache_dir 用于缓存依赖,可加速后续部署

自定义文件

对于自定义的Python模块,需要使用 -pyfs 参数指定:

1
2
3
4
5
# 单个文件
bin/flink run -m localhost:8081 -py deploy_kfk_demo.py -pyfs thr_fun.py

# 整个目录
bin/flink run -m localhost:8081 -py deploy_kfk_demo.py -pyfs ./my_modules/

部署方式

1. 命令行部署

单机模式

1
2
3
4
5
# 本地执行
bin/flink run -py deploy_kfk_demo.py

# 指定并行度
bin/flink run -p 4 -py deploy_kfk_demo.py

集群模式

1
2
3
4
5
# 连接到Flink集群
bin/flink run -m jobmanager-host:8081 -py deploy_kfk_demo.py

# 包含依赖文件
bin/flink run -m jobmanager-host:8081 -py deploy_kfk_demo.py -pyfs ./dependencies/

2. Python模块部署

对于复杂项目,可以将代码组织为Python包:

1
2
3
4
5
# 部署Python模块
bin/flink run -m localhost:8081 -pym my_project.main

# 包含依赖目录
bin/flink run -m localhost:8081 -pym my_project.main -pyfs ./my_project/

3. Docker部署

使用Docker可以简化环境配置:

Dockerfile示例:

1
2
3
4
5
6
7
8
9
10
11
FROM flink:1.13.0-scala_2.12-java8

# 安装Python和依赖
RUN apt-get update && apt-get install -y python3 python3-pip
RUN pip3 install --no-cache-dir apache-flink numpy pandas

# 复制应用代码
COPY . /opt/flink/usrlib/

# 设置工作目录
WORKDIR /opt/flink

构建和运行:

1
2
3
4
5
6
7
8
9
10
# 构建镜像
docker build -t pyflink-app .

# 运行容器
docker run -d --name pyflink-jobmanager pyflink-app jobmanager

docker run -d --name pyflink-taskmanager --link pyflink-jobmanager:jobmanager pyflink-app taskmanager

# 提交作业
docker exec -it pyflink-jobmanager ./bin/flink run -m localhost:8081 -py /opt/flink/usrlib/deploy_kfk_demo.py

4. Kubernetes部署

对于生产环境,推荐使用Kubernetes部署:

flink-configuration-configmap.yaml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
namespace: default
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
log4j-console.properties: |+
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

提交作业:

1
2
3
# 使用kubectl提交作业
kubectl run flink-submit --image=flink:1.13.0-scala_2.12-java8 --restart=Never -- \
/opt/flink/bin/flink run -m flink-jobmanager:8081 -py /opt/flink/usrlib/deploy_kfk_demo.py

高级配置

Python执行环境配置

1
2
3
4
5
6
7
8
# 设置Python可执行文件路径
t_env.set_python_executable('/usr/bin/python3')

# 设置Python依赖目录
t_env.set_python_requirements(
'requirements.txt',
'/tmp/pyflink_cache'
)

资源配置

1
2
3
4
5
# 设置作业资源
bin/flink run -m localhost:8081 \
-yjm 1024m \
-ytm 2048m \
-py deploy_kfk_demo.py

最佳实践

  1. 依赖管理:

    • 使用固定版本的依赖
    • 避免使用过大的依赖包
    • 利用缓存加速部署
  2. 代码组织:

    • 将代码模块化,便于维护
    • 使用相对导入避免路径问题
    • 保持入口文件简洁
  3. 性能优化:

    • 合理设置并行度
    • 避免在UDF中进行耗时操作
    • 使用批处理减少网络传输
  4. 监控和日志:

    • 配置合适的日志级别
    • 使用Flink Web UI监控作业
    • 实现自定义指标收集

常见问题

1. 依赖安装失败

解决方案:

  • 确保执行环境有网络连接
  • 检查requirements.txt格式
  • 使用国内镜像源加速下载

2. 自定义模块导入失败

解决方案:

  • 使用 -pyfs 参数正确指定依赖文件
  • 确保模块路径正确
  • 检查Python版本兼容性

3. 内存不足

解决方案:

  • 增加TaskManager内存
  • 调整并行度
  • 优化数据处理逻辑

版本兼容性

PyFlink版本 Python版本 推荐使用
1.10.x 3.6-3.7 基础功能
1.11.x 3.6-3.8 增强功能
1.12.x 3.6-3.8 稳定性提升
1.13.x+ 3.6-3.9 推荐版本

参考资料