前言
由于公司项目主要模块为Python开发,最好项目都能跑在Python环境下。公司虽然有Flink平台,但如果需要公司平台来支持PyFlink,不一定有能力和资源去推动做这件事。我们需要自己来维护一个小集群Flink环境,满足项目需求即可。最好的方案是部署在Docker环境中,管理方便,资源最小化。
PyFlink环境准备
推荐版本
- JDK 11+ (推荐JDK 11,Flink 1.15+不再支持JDK 8)
- Python 3.7+ (推荐Python 3.8或3.9)
- Flink 1.17.0+ (推荐使用最新稳定版本)
本地环境安装
1. 安装Flink
下载并解压最新稳定版本的Flink:
1 2 3 4 5 6
| wget https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
tar -xzf flink-1.17.0-bin-scala_2.12.tgz cd flink-1.17.0
|
2. 安装PyFlink
使用pip安装最新版本的PyFlink:
1 2 3 4 5 6 7 8
| python -m pip install --upgrade pip
pip install apache-flink
pip list | grep apache-flink
|
3. 配置国内pip源
安装模块需要比较好的网络,可以使用国内pip源:
Linux/macOS用户:创建或编辑~/.pip/pip.conf文件
1 2 3 4
| [global] index-url = https://mirrors.aliyun.com/pypi/simple/ [install] trusted-host = mirrors.aliyun.com
|
Windows用户:创建或编辑%APPDATA%\pip\pip.ini文件
1 2 3 4
| [global] index-url = https://mirrors.aliyun.com/pypi/simple/ [install] trusted-host = mirrors.aliyun.com
|
Docker集群环境
1. 构建PyFlink环境Docker镜像
官方没有提供预构建的Python环境Flink镜像,需要自己构建。以下是现代的Dockerfile配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| FROM flink:1.17.0-scala_2.12
RUN set -ex; \ apt-get update; \ apt-get -y install --no-install-recommends python3 python3-pip python3-dev; \ ln -s /usr/bin/python3 /usr/bin/python; \ ln -s /usr/bin/pip3 /usr/bin/pip; \ apt-get clean; \ rm -rf /var/lib/apt/lists/*
RUN set -ex; \ python -m pip install --upgrade pip; \ pip install --no-cache-dir apache-flink==1.17.0; \ pip install --no-cache-dir pandas pyarrow
ENV PYTHONPATH=$PYTHONPATH:/opt/flink/lib
|
2. 构建镜像
1 2 3 4
| docker build -t pyflink:1.17.0 .
docker images
|
3. 使用docker-compose编排
创建docker-compose.yml文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| version: "3.8" services: jobmanager: image: pyflink:1.17.0 ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager - FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
taskmanager: image: pyflink:1.17.0 depends_on: - jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager - FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" deploy: replicas: 2
|
4. 启动集群
1 2 3 4
| docker-compose up -d
docker-compose ps
|
访问 http://localhost:8081 验证集群是否成功启动。
Kubernetes部署
1. 使用Flink Kubernetes Operator
Flink提供了官方的Kubernetes Operator,这是部署和管理Flink集群的推荐方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| kubectl apply -f https://github.com/apache/flink-kubernetes-operator/releases/download/v1.4.0/flink-kubernetes-operator.yaml
kubectl apply -f - << EOF apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: pyflink-cluster namespace: default spec: image: pyflink:1.17.0 flinkVersion: v1.17.0 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" serviceAccount: flink jobManager: replicas: 1 resources: memory: "2048m" cpu: 1 taskManager: replicas: 2 resources: memory: "2048m" cpu: 1 EOF
|
2. 提交PyFlink作业到K8s
1 2 3 4 5
| ./bin/flink run -t kubernetes-session \ -Dkubernetes.cluster-id=pyflink-cluster \ -Dkubernetes.namespace=default \ -py /path/to/your/pyflink_job.py
|
部署PyFlink作业
1. 本地开发模式
1 2 3 4 5
| python your_pyflink_job.py
./bin/flink run -py your_pyflink_job.py
|
2. 集群模式
1 2 3 4 5 6 7 8 9 10 11 12 13
| ./bin/flink run -m jobmanager:8081 -py your_pyflink_job.py
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1)
env.execute_remote('Your Job Name', 'jobmanager:8081')
|
生产环境最佳实践
1. 依赖管理
对于生产环境,建议使用requirements.txt文件管理依赖:
1 2 3 4
| apache-flink==1.17.0 pandas==1.5.3 pyarrow==12.0.0 # 其他依赖...
|
2. 资源配置
根据作业需求配置合适的资源:
1 2 3 4 5
| jobmanager.memory.process.size: 2048m taskmanager.memory.process.size: 4096m taskmanager.numberOfTaskSlots: 2 parallelism.default: 2
|
3. 状态管理
对于有状态的作业,配置状态后端:
1 2 3 4
| state.backend: rocksdb state.checkpoints.dir: hdfs:///flink/checkpoints state.savepoints.dir: hdfs:///flink/savepoints
|
4. 监控与日志
集成Prometheus和Grafana进行监控:
1 2 3
| metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter metrics.reporter.prom.port: 9250-9260
|
常见问题与解决方案
1. 依赖冲突
问题:ModuleNotFoundError: No module named 'grpc'
解决方案:重新安装依赖,确保版本兼容
1
| pip install --force-reinstall grpcio==1.48.2
|
2. Protobuf冲突
问题:google/protobuf/descriptor_database.cc:58] file already exists in database
解决方案:确保protobuf版本兼容
1
| pip install --force-reinstall protobuf==3.20.3
|
3. Python版本兼容性
问题:Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam
解决方案:使用推荐的Python版本(3.8或3.9),避免使用过新的Python语法特性
4. Docker镜像大小优化
问题:构建的Docker镜像过大
解决方案:使用多阶段构建和 Alpine 基础镜像
1 2 3 4 5 6 7
| FROM python:3.9-slim as builder RUN pip install --user apache-flink==1.17.0
FROM flink:1.17.0-scala_2.12-alpine COPY --from=builder /root/.local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages RUN apk add --no-cache python3 py3-pip
|
总结
PyFlink提供了强大的流处理能力,结合Python的易用性,是处理大数据流的理想选择。通过Docker和Kubernetes部署,可以快速搭建和管理PyFlink集群,满足不同规模的业务需求。
随着Flink版本的迭代,PyFlink的功能和性能也在不断提升。建议定期关注官方文档和社区动态,及时更新版本以获得最佳体验。
参考资料