前言

由于公司项目主要模块为Python开发,最好项目都能跑在Python环境下。公司虽然有Flink平台,但如果需要公司平台来支持PyFlink,不一定有能力和资源去推动做这件事。我们需要自己来维护一个小集群Flink环境,满足项目需求即可。最好的方案是部署在Docker环境中,管理方便,资源最小化。

PyFlink环境准备

推荐版本

  1. JDK 11+ (推荐JDK 11,Flink 1.15+不再支持JDK 8)
  2. Python 3.7+ (推荐Python 3.8或3.9)
  3. Flink 1.17.0+ (推荐使用最新稳定版本)

本地环境安装

下载并解压最新稳定版本的Flink:

1
2
3
4
5
6
# 下载Flink
wget https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

# 解压
tar -xzf flink-1.17.0-bin-scala_2.12.tgz
cd flink-1.17.0

使用pip安装最新版本的PyFlink:

1
2
3
4
5
6
7
8
# 升级pip
python -m pip install --upgrade pip

# 安装PyFlink
pip install apache-flink

# 验证安装
pip list | grep apache-flink

3. 配置国内pip源

安装模块需要比较好的网络,可以使用国内pip源:

Linux/macOS用户:创建或编辑~/.pip/pip.conf文件

1
2
3
4
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
trusted-host = mirrors.aliyun.com

Windows用户:创建或编辑%APPDATA%\pip\pip.ini文件

1
2
3
4
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
trusted-host = mirrors.aliyun.com

Docker集群环境

1. 构建PyFlink环境Docker镜像

官方没有提供预构建的Python环境Flink镜像,需要自己构建。以下是现代的Dockerfile配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
FROM flink:1.17.0-scala_2.12

# 安装Python和依赖
RUN set -ex; \
apt-get update; \
apt-get -y install --no-install-recommends python3 python3-pip python3-dev; \
ln -s /usr/bin/python3 /usr/bin/python; \
ln -s /usr/bin/pip3 /usr/bin/pip; \
apt-get clean; \
rm -rf /var/lib/apt/lists/*

# 安装PyFlink
RUN set -ex; \
python -m pip install --upgrade pip; \
pip install --no-cache-dir apache-flink==1.17.0; \
pip install --no-cache-dir pandas pyarrow

# 设置环境变量
ENV PYTHONPATH=$PYTHONPATH:/opt/flink/lib

# 复制Flink配置文件(如果需要自定义)
# COPY flink-conf.yaml /opt/flink/conf/

2. 构建镜像

1
2
3
4
docker build -t pyflink:1.17.0 .

# 查看构建结果
docker images

3. 使用docker-compose编排

创建docker-compose.yml文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
version: "3.8"
services:
jobmanager:
image: pyflink:1.17.0
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
- FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"

taskmanager:
image: pyflink:1.17.0
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
- FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
deploy:
replicas: 2

4. 启动集群

1
2
3
4
docker-compose up -d

# 查看状态
docker-compose ps

访问 http://localhost:8081 验证集群是否成功启动。

Kubernetes部署

Flink提供了官方的Kubernetes Operator,这是部署和管理Flink集群的推荐方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 安装Flink Kubernetes Operator
kubectl apply -f https://github.com/apache/flink-kubernetes-operator/releases/download/v1.4.0/flink-kubernetes-operator.yaml

# 创建Flink集群
kubectl apply -f - << EOF
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: pyflink-cluster
namespace: default
spec:
image: pyflink:1.17.0
flinkVersion: v1.17.0
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
replicas: 1
resources:
memory: "2048m"
cpu: 1
taskManager:
replicas: 2
resources:
memory: "2048m"
cpu: 1
EOF

2. 提交PyFlink作业到K8s

1
2
3
4
5
# 使用flink CLI提交作业
./bin/flink run -t kubernetes-session \
-Dkubernetes.cluster-id=pyflink-cluster \
-Dkubernetes.namespace=default \
-py /path/to/your/pyflink_job.py

部署PyFlink作业

1. 本地开发模式

1
2
3
4
5
# 本地运行
python your_pyflink_job.py

# 或使用flink CLI
./bin/flink run -py your_pyflink_job.py

2. 集群模式

1
2
3
4
5
6
7
8
9
10
11
12
13
# 提交到远程集群
./bin/flink run -m jobmanager:8081 -py your_pyflink_job.py

# 或使用Python API提交
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# 构建作业...

# 提交到远程集群
env.execute_remote('Your Job Name', 'jobmanager:8081')

生产环境最佳实践

1. 依赖管理

对于生产环境,建议使用requirements.txt文件管理依赖:

1
2
3
4
apache-flink==1.17.0
pandas==1.5.3
pyarrow==12.0.0
# 其他依赖...

2. 资源配置

根据作业需求配置合适的资源:

1
2
3
4
5
# flink-conf.yaml 示例
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2

3. 状态管理

对于有状态的作业,配置状态后端:

1
2
3
4
# flink-conf.yaml 示例
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints

4. 监控与日志

集成Prometheus和Grafana进行监控:

1
2
3
# flink-conf.yaml 示例
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

常见问题与解决方案

1. 依赖冲突

问题ModuleNotFoundError: No module named 'grpc'
解决方案:重新安装依赖,确保版本兼容

1
pip install --force-reinstall grpcio==1.48.2

2. Protobuf冲突

问题google/protobuf/descriptor_database.cc:58] file already exists in database
解决方案:确保protobuf版本兼容

1
pip install --force-reinstall protobuf==3.20.3

3. Python版本兼容性

问题Some syntactic constructs of Python 3 are not yet fully supported by Apache Beam
解决方案:使用推荐的Python版本(3.8或3.9),避免使用过新的Python语法特性

4. Docker镜像大小优化

问题:构建的Docker镜像过大
解决方案:使用多阶段构建和 Alpine 基础镜像

1
2
3
4
5
6
7
# 多阶段构建示例
FROM python:3.9-slim as builder
RUN pip install --user apache-flink==1.17.0

FROM flink:1.17.0-scala_2.12-alpine
COPY --from=builder /root/.local/lib/python3.9/site-packages /usr/local/lib/python3.9/site-packages
RUN apk add --no-cache python3 py3-pip

总结

PyFlink提供了强大的流处理能力,结合Python的易用性,是处理大数据流的理想选择。通过Docker和Kubernetes部署,可以快速搭建和管理PyFlink集群,满足不同规模的业务需求。

随着Flink版本的迭代,PyFlink的功能和性能也在不断提升。建议定期关注官方文档和社区动态,及时更新版本以获得最佳体验。

参考资料