PyFlink 1.10+ 支持Python Lambda UDF的开发,也可以调用Java UDF和自定义ScalarFunction。本文将详细介绍PyFlink中UDF的使用方法和最佳实践。

环境准备

在使用PyFlink UDF之前,需要确保已正确安装PyFlink:

1
pip install apache-flink

Python UDF

基本用法

Python UDF可以通过装饰器方式定义,需要指定输入类型和输出类型:

1
2
3
4
5
6
7
from pyflink.table import DataTypes, StreamTableEnvironment
from pyflink.table.functions import udf

@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING(), name="pr")
def pr(str):
print(str)
return str

注册和调用

定义UDF后,需要在TableEnvironment中注册,然后在SQL或Table API中使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 创建StreamTableEnvironment
st_env = StreamTableEnvironment.create(env)

# 注册UDF
st_env.register_function('pr', pr)

# 在Table API中使用
st_env.from_path('source') \
.window(Session.with_gap('20.minutes').on('rowtime').alias('w')) \
.group_by('w, carnum') \
.select('carnum, cast(count(time) as INT) as count, pr(carnum) as t') \
.insert_into('mySink')

# 或在SQL中使用
st_env.execute_sql("""
SELECT carnum, COUNT(*) as count, pr(carnum) as t
FROM source
GROUP BY carnum
""")

Lambda UDF

对于简单的函数,可以使用Lambda表达式定义:

1
2
3
4
5
6
7
from pyflink.table.functions import udf

# 定义Lambda UDF
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

# 注册和使用
st_env.register_function("add", add)

Java UDF

定义Java UDF

Java UDF必须继承ScalarFunction,有无参构造器,且方法名为eval

1
2
3
4
5
6
7
8
9
10
11
12
package my.java.function;

import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {

private int factor = 12;

public int eval(String s) {
return s.hashCode() * factor;
}
}

编译和部署

  1. 编译Java代码为JAR文件
  2. 将JAR文件放入PyFlink环境中,通常是$PYFLINK_HOME/lib目录

注册和使用

1
2
3
4
5
6
7
8
# 注册Java UDF
table_env.register_java_function("hashCode", "my.java.function.HashCode")

# 在SQL中使用
result = table_env.execute_sql("""
SELECT hashCode(name) as hash_name
FROM source
""")

Python ScalarFunction

对于复杂的UDF,可以继承ScalarFunction类:

1
2
3
4
5
6
7
8
9
10
11
12
from pyflink.table.functions import ScalarFunction
from pyflink.table import DataTypes

class Add(ScalarFunction):
def eval(self, i, j):
return i + j

# 创建UDF实例
add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

# 注册和使用
table_env.register_function("add", add)

表值函数 (UDTF)

除了标量函数,PyFlink还支持表值函数(UDTF),可以返回多行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyflink.table.functions import TableFunction, udtf
from pyflink.table import DataTypes

class Split(TableFunction):
def eval(self, string):
for s in string.split(" "):
yield s

# 创建UDTF
split = udtf(Split(), [DataTypes.STRING()], [DataTypes.STRING()])

# 注册和使用
table_env.register_function("split", split)

# 在SQL中使用 LATERAL TABLE
table_env.execute_sql("""
SELECT *
FROM source, LATERAL TABLE(split(content)) AS T(word)
""")

最佳实践

  1. 性能优化

    • 对于简单函数,使用Lambda UDF
    • 对于复杂逻辑,使用ScalarFunction
    • 避免在UDF中进行耗时操作,如网络请求
  2. 类型处理

    • 明确指定输入和输出类型
    • 注意Python和Flink类型的对应关系
  3. 错误处理

    • 在UDF中添加异常处理
    • 使用try-except捕获并处理异常
  4. 状态管理

    • 对于有状态的UDF,考虑使用AggregateFunction

版本兼容性

  • PyFlink 1.10+ 支持Python UDF
  • 较新版本(如1.13+)提供了更多UDF类型和优化

参考资料