PyFlink 1.10+ 支持Python Lambda UDF的开发,也可以调用Java UDF和自定义ScalarFunction。本文将详细介绍PyFlink中UDF的使用方法和最佳实践。
环境准备
在使用PyFlink UDF之前,需要确保已正确安装PyFlink:
1
| pip install apache-flink
|
Python UDF
基本用法
Python UDF可以通过装饰器方式定义,需要指定输入类型和输出类型:
1 2 3 4 5 6 7
| from pyflink.table import DataTypes, StreamTableEnvironment from pyflink.table.functions import udf
@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING(), name="pr") def pr(str): print(str) return str
|
注册和调用
定义UDF后,需要在TableEnvironment中注册,然后在SQL或Table API中使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| st_env = StreamTableEnvironment.create(env)
st_env.register_function('pr', pr)
st_env.from_path('source') \ .window(Session.with_gap('20.minutes').on('rowtime').alias('w')) \ .group_by('w, carnum') \ .select('carnum, cast(count(time) as INT) as count, pr(carnum) as t') \ .insert_into('mySink')
st_env.execute_sql(""" SELECT carnum, COUNT(*) as count, pr(carnum) as t FROM source GROUP BY carnum """)
|
Lambda UDF
对于简单的函数,可以使用Lambda表达式定义:
1 2 3 4 5 6 7
| from pyflink.table.functions import udf
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
st_env.register_function("add", add)
|
Java UDF
定义Java UDF
Java UDF必须继承ScalarFunction,有无参构造器,且方法名为eval:
1 2 3 4 5 6 7 8 9 10 11 12
| package my.java.function;
import org.apache.flink.table.functions.ScalarFunction;
public class HashCode extends ScalarFunction { private int factor = 12; public int eval(String s) { return s.hashCode() * factor; } }
|
编译和部署
- 编译Java代码为JAR文件
- 将JAR文件放入PyFlink环境中,通常是
$PYFLINK_HOME/lib目录
注册和使用
1 2 3 4 5 6 7 8
| table_env.register_java_function("hashCode", "my.java.function.HashCode")
result = table_env.execute_sql(""" SELECT hashCode(name) as hash_name FROM source """)
|
Python ScalarFunction
对于复杂的UDF,可以继承ScalarFunction类:
1 2 3 4 5 6 7 8 9 10 11 12
| from pyflink.table.functions import ScalarFunction from pyflink.table import DataTypes
class Add(ScalarFunction): def eval(self, i, j): return i + j
add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
|
表值函数 (UDTF)
除了标量函数,PyFlink还支持表值函数(UDTF),可以返回多行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from pyflink.table.functions import TableFunction, udtf from pyflink.table import DataTypes
class Split(TableFunction): def eval(self, string): for s in string.split(" "): yield s
split = udtf(Split(), [DataTypes.STRING()], [DataTypes.STRING()])
table_env.register_function("split", split)
table_env.execute_sql(""" SELECT * FROM source, LATERAL TABLE(split(content)) AS T(word) """)
|
最佳实践
性能优化:
- 对于简单函数,使用Lambda UDF
- 对于复杂逻辑,使用
ScalarFunction类
- 避免在UDF中进行耗时操作,如网络请求
类型处理:
- 明确指定输入和输出类型
- 注意Python和Flink类型的对应关系
错误处理:
- 在UDF中添加异常处理
- 使用
try-except捕获并处理异常
状态管理:
- 对于有状态的UDF,考虑使用
AggregateFunction
版本兼容性
- PyFlink 1.10+ 支持Python UDF
- 较新版本(如1.13+)提供了更多UDF类型和优化
参考资料