本文将详细介绍PyFlink项目的部署方法,包括依赖管理、部署方式和最佳实践。
部署准备
环境要求
- Java 8 或 Java 11
- Python 3.6+
- Flink 1.10+(推荐使用较新版本)
- 集群环境(单机或分布式)
项目结构
1 2 3 4
| pyflink-project/ ├── requirements.txt # 第三方依赖 ├── deploy_kfk_demo.py # 主入口文件 └── thr_fun.py # 自定义依赖文件
|
依赖管理
第三方依赖
PyFlink 1.10+ 提供了 set_python_requirements 方法来管理第三方依赖:
1 2 3 4 5 6 7 8 9 10
| from pyflink.table import StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)
t_env.set_python_requirements( requirements_file_path='requirements.txt', requirements_cache_dir='/tmp/requirements_cache' )
|
requirements.txt示例:
1 2 3
| numpy==1.16.5 pandas==1.0.3 kafka-python==2.0.2
|
注意:
- 执行环境需要有网络连接以上载依赖
requirements_cache_dir 用于缓存依赖,可加速后续部署
自定义文件
对于自定义的Python模块,需要使用 -pyfs 参数指定:
1 2 3 4 5
| bin/flink run -m localhost:8081 -py deploy_kfk_demo.py -pyfs thr_fun.py
bin/flink run -m localhost:8081 -py deploy_kfk_demo.py -pyfs ./my_modules/
|
部署方式
1. 命令行部署
单机模式
1 2 3 4 5
| bin/flink run -py deploy_kfk_demo.py
bin/flink run -p 4 -py deploy_kfk_demo.py
|
集群模式
1 2 3 4 5
| bin/flink run -m jobmanager-host:8081 -py deploy_kfk_demo.py
bin/flink run -m jobmanager-host:8081 -py deploy_kfk_demo.py -pyfs ./dependencies/
|
2. Python模块部署
对于复杂项目,可以将代码组织为Python包:
1 2 3 4 5
| bin/flink run -m localhost:8081 -pym my_project.main
bin/flink run -m localhost:8081 -pym my_project.main -pyfs ./my_project/
|
3. Docker部署
使用Docker可以简化环境配置:
Dockerfile示例:
1 2 3 4 5 6 7 8 9 10 11
| FROM flink:1.13.0-scala_2.12-java8
RUN apt-get update && apt-get install -y python3 python3-pip RUN pip3 install --no-cache-dir apache-flink numpy pandas
COPY . /opt/flink/usrlib/
WORKDIR /opt/flink
|
构建和运行:
1 2 3 4 5 6 7 8 9 10
| docker build -t pyflink-app .
docker run -d --name pyflink-jobmanager pyflink-app jobmanager
docker run -d --name pyflink-taskmanager --link pyflink-jobmanager:jobmanager pyflink-app taskmanager
docker exec -it pyflink-jobmanager ./bin/flink run -m localhost:8081 -py /opt/flink/usrlib/deploy_kfk_demo.py
|
4. Kubernetes部署
对于生产环境,推荐使用Kubernetes部署:
flink-configuration-configmap.yaml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| apiVersion: v1 kind: ConfigMap metadata: name: flink-config namespace: default data: flink-conf.yaml: |+ jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 2 blob.server.port: 6124 jobmanager.rpc.port: 6123 taskmanager.rpc.port: 6122 queryable-state.proxy.ports: 6125 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m log4j-console.properties: |+ log4j.rootLogger=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
|
提交作业:
1 2 3
| kubectl run flink-submit --image=flink:1.13.0-scala_2.12-java8 --restart=Never -- \ /opt/flink/bin/flink run -m flink-jobmanager:8081 -py /opt/flink/usrlib/deploy_kfk_demo.py
|
高级配置
Python执行环境配置
1 2 3 4 5 6 7 8
| t_env.set_python_executable('/usr/bin/python3')
t_env.set_python_requirements( 'requirements.txt', '/tmp/pyflink_cache' )
|
资源配置
1 2 3 4 5
| bin/flink run -m localhost:8081 \ -yjm 1024m \ -ytm 2048m \ -py deploy_kfk_demo.py
|
最佳实践
依赖管理:
- 使用固定版本的依赖
- 避免使用过大的依赖包
- 利用缓存加速部署
代码组织:
- 将代码模块化,便于维护
- 使用相对导入避免路径问题
- 保持入口文件简洁
性能优化:
- 合理设置并行度
- 避免在UDF中进行耗时操作
- 使用批处理减少网络传输
监控和日志:
- 配置合适的日志级别
- 使用Flink Web UI监控作业
- 实现自定义指标收集
常见问题
1. 依赖安装失败
解决方案:
- 确保执行环境有网络连接
- 检查requirements.txt格式
- 使用国内镜像源加速下载
2. 自定义模块导入失败
解决方案:
- 使用
-pyfs 参数正确指定依赖文件
- 确保模块路径正确
- 检查Python版本兼容性
3. 内存不足
解决方案:
- 增加TaskManager内存
- 调整并行度
- 优化数据处理逻辑
版本兼容性
| PyFlink版本 |
Python版本 |
推荐使用 |
| 1.10.x |
3.6-3.7 |
基础功能 |
| 1.11.x |
3.6-3.8 |
增强功能 |
| 1.12.x |
3.6-3.8 |
稳定性提升 |
| 1.13.x+ |
3.6-3.9 |
推荐版本 |
参考资料