news 2026/6/15 13:27:16

PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink Connectors 如何在 Python 作业里正确使用 Kafka/JSON 等连接器(JAR 依赖、DDL 建表、pipeline.jars、内置 Source/Sink、

1. PyFlink 为什么要手动指定 Connector/Format JAR?

因为:

  • Flink 核心运行时在 JVM 上
  • connector(如 kafka)和 format(如 json)都是 JVM 侧实现
  • Python 代码只是驱动 Table/SQL 的规划与提交

所以你需要通过pipeline.jars指定依赖(多个 jar 用;分隔):

table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

实战建议:

  • connector jar 和 format jar 都要带上(例如 Kafka + JSON)
  • 路径用file:///这种绝对 URI,避免分布式环境找不到文件
  • 生产上更推荐把 jar 放到统一位置(Flink lib 或制品仓)并在提交时声明依赖,pipeline.jars适合快速验证与 demo

2. 在 PyFlink Table API 中,推荐用 DDL 定义 Source/Sink

PyFlink 的 Table API 使用 connector 最推荐的方式是:DDL + execute_sql()
理由很简单:DDL 更直观、更可复制、也最接近线上 SQL Gateway/SQL Client 的使用方式。

2.1 Kafka Source/Sink + JSON Format(最小可用示例)

source_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table").wait()

关键点拆解:

  • execute_sql()注册表(source/sink)
  • sql_query()产出一个 Table
  • execute_insert()触发写入(并提交作业)
  • .wait()在本地/mini cluster 场景常用,用于等待作业执行(远程集群通常不建议一直 wait)

3. 完整可运行的 Python 结构(把 jar、DDL、DML 串起来)

你给的完整示例结构非常标准,我建议你在博客里也用这种方式组织代码:

frompyflink.tableimportTableEnvironment,EnvironmentSettingsdeflog_processing():env_settings=EnvironmentSettings.in_streaming_mode()t_env=TableEnvironment.create(env_settings)# 1) 指定 connector & format jarst_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")# 2) DDL: source/sinksource_ddl=""" CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'properties.group.id' = 'test_3', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ) """sink_ddl=""" CREATE TABLE sink_table( a VARCHAR ) WITH ( 'connector' = 'kafka', 'topic' = 'sink_topic', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json' ) """t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)# 3) DML: query + insertt_env.sql_query("SELECT a FROM source_table")\.execute_insert("sink_table")\.wait()if__name__=='__main__':log_processing()

4. PyFlink 里“内置”的 Sources/Sinks:不用额外 jar 也能跑

除了 Kafka 这类外部 connector,Flink 也提供了一些“开箱即用”的数据源/数据汇,特别适合本地调试与单测。

4.1 from/to Pandas(非常适合快速验证)

frompyflink.table.expressionsimportcolimportpandasaspdimportnumpyasnp pdf=pd.DataFrame(np.random.rand(1000,2))table=t_env.from_pandas(pdf,["a","b"]).filter(col('a')>0.5)pdf2=table.to_pandas()

注意:to_pandas()会把结果收集到客户端内存,生产慎用,建议先limit()

4.2 from_elements():用 Python 集合直接造表

frompyflink.tableimportDataTypes# 自动推断table_env.from_elements([(1,'Hi'),(2,'Hello')])# 指定字段名table_env.from_elements([(1,'Hi'),(2,'Hello')],['a','b'])# 指定 schema(更稳)table_env.from_elements([(1,'Hi'),(2,'Hello')],DataTypes.ROW([DataTypes.FIELD("a",DataTypes.INT()),DataTypes.FIELD("b",DataTypes.STRING())]))

这类内置 source 对写教程、做 POC、复现 bug 特别省事。

5. 自定义 Sources & Sinks:Python 不能直接写,需 Java/Scala 实现

文档明确说明了现阶段的边界:

  • 自定义 source/sink 需要 Java/Scala 实现
  • Python 侧可以通过实现 TableFactory(也是 Java/Scala)让它能被 DDL 发现并使用

也就是说:你可以用 PyFlink 写作业逻辑,但 connector 生态仍然是 JVM 的。

如果你后面要写“自定义 connector”系列博客,可以按这个路线写:

  • 先用 Java 写 DynamicTableSourceFactory / DynamicTableSinkFactory(SPI 注册)
  • 再在 PyFlink 里通过 DDL'connector'='xxx'直接使用

6. 常见踩坑清单(PyFlink Connector 场景高频问题)

  • 只加了 connector jar,没加 format jar:DDL 里用了'format'='json',但没带 json format 的 jar,会在运行期报找不到 format factory
  • pipeline.jars 路径不可达:本地 file 路径对集群 TaskManager 不可见,必须用集群可访问路径或随 job 提交
  • 用 DDL 建表但没触发执行:Table/SQL 是惰性执行,必须execute_insert()execute_sql(INSERT ...)才会提交作业
  • wait() 用错场景:本地调试很方便;远程集群提交通常希望异步返回,避免客户端阻塞
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 10:04:00

如何用CRNN OCR实现多列文本正确排序?

如何用CRNN OCR实现多列文本正确排序? 📖 项目简介 在现代文档数字化场景中,OCR(光学字符识别)技术已成为信息提取的核心工具。无论是扫描的纸质文件、电子发票,还是网页截图中的排版内容,OCR都…

作者头像 李华
网站建设 2026/6/15 13:18:48

AppSmith无代码开发平台深度解析:从业务需求到企业级应用构建

AppSmith无代码开发平台深度解析:从业务需求到企业级应用构建 【免费下载链接】appsmith appsmithorg/appsmith: Appsmith 是一个开源的无代码开发平台,允许用户通过拖拽式界面构建企业级Web应用程序,无需编写任何后端代码,简化了…

作者头像 李华
网站建设 2026/6/15 12:41:19

一键部署实战:用Llama Factory预置环境快速搭建智能客服Demo

一键部署实战:用Llama Factory预置环境快速搭建智能客服Demo 对于初创公司CTO来说,在投资人会议前快速搭建一个智能客服原型可能是个挑战,尤其是缺乏专业AI团队的情况下。本文将介绍如何利用Llama Factory预置环境,在30分钟内完成…

作者头像 李华
网站建设 2026/6/15 12:41:47

Llama Factory可视化分析:理解你的微调过程

Llama Factory可视化分析:理解你的微调过程 作为一名AI研究员,你是否遇到过这样的困扰:在微调大语言模型时,只能通过最终的评估指标来判断模型表现,却无法直观地观察训练过程中的动态变化?本文将介绍如何利…

作者头像 李华
网站建设 2026/6/15 12:41:34

零基础入门:10分钟搞定PADDLEOCR-VL部署

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容: 开发一个PADDLEOCR-VL极简部署向导,要求:1.图形化交互界面 2.自动环境检测和修复 3.一键式部署流程 4.内置测试样例 5.实时进度反馈。使用最简化的命令行交…

作者头像 李华
网站建设 2026/6/15 12:40:14

移动端集成:将Llama Factory微调模型部署到App的完整流程

移动端集成:将Llama Factory微调模型部署到App的完整流程 作为一名移动应用开发者,当你成功使用Llama Factory微调了大语言模型后,下一步就是将模型集成到iOS或Android应用中。本文将带你从模型导出到端侧部署,完成整个流程。 这类…

作者头像 李华