news 2026/5/22 20:58:30

Flink SQL Materialized Table 语句CREATE / ALTER / DROP介绍

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink SQL Materialized Table 语句CREATE / ALTER / DROP介绍

1. Flink 目前支持的 Materialized Table 语句

1.1CREATE MATERIALIZED TABLE:创建物化表(定义查询 + 刷新策略)
1.2ALTER MATERIALIZED TABLE:管理物化表(暂停/恢复/手动刷新/改查询)
1.3DROP MATERIALIZED TABLE:删除物化表(先删刷新管道再删元数据)

2. CREATE MATERIALIZED TABLE:创建物化表

2.1 语法总览

2.1.1 完整语法:

CREATEMATERIALIZEDTABLE[catalog_name.][db_name.]table_name[([<table_constraint>])][COMMENTtable_comment][PARTITIONEDBY(partition_column_name1,partition_column_name2,...)][WITH(key1=val1,key2=val2,...)][FRESHNESS=INTERVAL'<num>'{SECOND[S]|MINUTE[S]|HOUR[S]|DAY[S]}][REFRESH_MODE={ CONTINUOUS|FULL}]AS<select_statement>

2.1.2 核心理解:

  • AS <select_statement>:决定“算什么”(物化结果来自这条查询)
  • FRESHNESS / REFRESH_MODE:决定“怎么刷、多频繁”(自动生成刷新 Pipeline)
  • Schema:由查询自动推导(你不能显式写列定义)

2.2 PRIMARY KEY:可选主键约束

2.2.1 语法:

<table_constraint>:[CONSTRAINTconstraint_name]PRIMARYKEY(column_name,...)NOTENFORCED

2.2.2 关键点:

  • PRIMARY KEY 用于“逻辑唯一标识每行”
  • 主键列必须非 NULL
  • NOT ENFORCED表示 Flink 不强制校验唯一性(通常依赖外部存储/语义保证)

2.3 PARTITIONED BY:可选分区键(强约束)

2.3.1 语法:

PARTITIONEDBY(partition_column_name1,partition_column_name2,...)

2.3.2 强约束(非常重要):

  • 分区列必须包含在物化表的查询输出中(即必须出现在AS SELECT的 select 列里)

2.3.3 示例:按ds分区创建物化表

CREATEMATERIALIZEDTABLEmy_materialized_table PARTITIONEDBY(ds)FRESHNESS=INTERVAL'1'HOURASSELECTdsFROM...;

2.3.4 直观收益:

  • 若物化表 sink 是 filesystem,会按分区创建目录结构
  • FULL 模式下结合date-formatter可以做到“只刷最新分区”,成本更低

2.4 WITH Options:表属性 / Connector 参数 / 分区时间格式映射

2.4.1 用途:

  • 指定物化表属性(含 connector options)
  • 指定分区字段的时间格式选项:partition.fields.<field>.date-formatter

2.4.2 示例:按ds分区,并指定ds的时间格式为yyyy-MM-dd

CREATEMATERIALIZEDTABLEmy_materialized_table PARTITIONEDBY(ds)WITH('format'='json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'1'HOURASSELECTds,...FROM...;

2.4.3 机制说明:

  • 在 FULL 模式下,每次调度触发都会把“调度时间”转换成ds分区值
  • 例如调度时间2024-01-01 00:00:00→ 刷新分区ds = '2024-01-01'

2.4.4 注意事项(官方限制)

  • partition.fields.#.date-formatter只在 FULL 模式生效
  • 该配置里的字段必须是string 类型分区字段

2.5 FRESHNESS:数据新鲜度(可选,但极关键)

2.5.1 定义与语法

2.5.1.1 定义:

  • FRESHNESS 定义物化表允许落后基础表更新的最大时间(目标值,非强保证)

2.5.1.2 语法:

FRESHNESS=INTERVAL'<num>'{SECOND|MINUTE|HOUR|DAY}

2.5.2 参数规则与合法性

2.5.2.1 规则:

  • <num>必须是正整数
  • 不支持MONTH / YEAR
  • FULL 模式下<num>还要满足“可转 cron/公约数”要求(下文有支持列表)

2.5.2.2 典型非法例子:

FRESHNESS=INTERVAL'-1'SECOND-- 负数FRESHNESS=INTERVAL'0'SECOND-- 0FRESHNESS=INTERVAL'1'MONTH-- 不支持FRESHNESS=INTERVAL'1'YEAR-- 不支持

2.5.3 FRESHNESS 与刷新模式/刷新频率的关系

2.5.3.1 关系总结:

  • Freshness 会参与推断 Refresh Mode(CONTINUOUS 或 FULL)

  • Freshness 会决定刷新频率

    • CONTINUOUS:Freshness → streaming job 的 checkpoint interval
    • FULL:Freshness → workflow 的调度周期(cron)

2.5.3.2 示例(假设materialized-table.refresh-mode.freshness-threshold = 30 minutes

  • FRESHNESS = INTERVAL '1' SECOND→ Streaming Job,checkpoint=1s
  • FRESHNESS = INTERVAL '1' MINUTE→ Streaming Job,checkpoint=1m
  • FRESHNESS = INTERVAL '1' HOUR→ Scheduled Workflow,周期=1h
  • FRESHNESS = INTERVAL '1' DAY→ Scheduled Workflow,周期=1d

2.5.4 默认 FRESHNESS(当你省略时)

2.5.4.1 省略FRESHNESS会使用系统默认值:

  • CONTINUOUS:materialized-table.default-freshness.continuous(默认 3 分钟)
  • FULL:materialized-table.default-freshness.full(默认 1 小时)

2.5.4.2 示例:省略 freshness(默认 CONTINUOUS 3 分钟)

CREATEMATERIALIZEDTABLEmy_materialized_tableASSELECT*FROMsource_table;

2.5.4.3 示例:省略 freshness,但显式指定 FULL(默认 1 小时)

CREATEMATERIALIZEDTABLEmy_materialized_table_full REFRESH_MODE=FULLASSELECT*FROMsource_table;

2.5.5 FULL 模式 freshness 的 cron 支持范围(必须牢记)

由于 FULL 模式需要把 freshness 翻译成 cron,当前仅支持以下间隔:

2.5.5.1 Second 支持:1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30
2.5.5.2 Minute 支持:1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30
2.5.5.3 Hour 支持:1, 2, 3, 4, 6, 8, 12
2.5.5.4 Day 支持:1

2.5.5.5 FULL 模式常见非法例子(不在支持列表或不满足约束):

FRESHNESS=INTERVAL'60'SECONDFRESHNESS=INTERVAL'5'HOUR

2.6 REFRESH_MODE:显式指定刷新模式(优先级最高)

2.6.1 语法:

REFRESH_MODE={ CONTINUOUS|FULL}

2.6.2 行为:

  • 一旦显式指定,优先于系统基于 freshness 的推断

2.6.3 示例:freshness 1 小时,但强制跑 CONTINUOUS(checkpoint=1h)

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'1'HOURREFRESH_MODE=CONTINUOUSASSELECT...;

2.6.4 示例:freshness 10 分钟,但强制跑 FULL(调度周期=10m)

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'10'MINUTEREFRESH_MODE=FULLASSELECT...;

2.7 AS <select_statement>:定义物化查询

2.7.1 说明:

  • 物化表的数据来自AS <select_statement>
  • 上游可以是:物化表 / 普通表 / 视图
  • select_statement支持所有 Flink SQL Queries

2.7.2 示例:

CREATEMATERIALIZEDTABLEmy_materialized_table FRESHNESS=INTERVAL'10'SECONDASSELECT*FROMkafka_catalog.db1.kafka_table;

2.8 CREATE 的限制

2.8.1 不支持显式指定列定义(列名/类型从查询自动推导)
2.8.2 不支持在查询中引用临时表、临时视图、临时函数

3. ALTER MATERIALIZED TABLE:管理与演进物化表

3.1 语法总览

ALTERMATERIALIZEDTABLE[catalog_name.][db_name.]table_name SUSPEND|RESUME[WITH(key1=val1,key2=val2,...)]|REFRESH[PARTITIONpartition_spec]|AS<select_statement>

3.2 SUSPEND:暂停后台刷新管道

3.2.1 语法:

ALTERMATERIALIZEDTABLEmy_materialized_table SUSPEND;

3.2.2 关键注意:

  • 如果物化表是 CONTINUOUS 模式,默认使用STOP WITH SAVEPOINT暂停作业
  • 因此你需要先设置 savepoint 保存路径:
SET'execution.checkpointing.savepoint-dir'='hdfs://savepoint_path';ALTERMATERIALIZEDTABLEmy_materialized_table SUSPEND;

3.3 RESUME:恢复刷新(支持动态选项,但不持久化)

3.3.1 基本恢复:

ALTERMATERIALIZEDTABLEmy_materialized_table RESUME;

3.3.2 带动态参数(仅对当前刷新 pipeline 生效,不会写回元数据):

ALTERMATERIALIZEDTABLEmy_materialized_table RESUMEWITH('sink.parallelism'='10');

3.4 REFRESH:手动触发刷新(会启动 Batch Job)

3.4.1 刷新整表:

ALTERMATERIALIZEDTABLEmy_materialized_table REFRESH;

3.4.2 刷新指定分区:

ALTERMATERIALIZEDTABLEmy_materialized_table REFRESHPARTITION(ds='2024-06-28');

3.4.3 注意:

  • REFRESH 会启动一个Flink Batch Job来刷新数据

3.5 ALTER … AS:修改查询定义(并触发 schema 演进)

3.5.1 用途:

  • 修改物化表的 query definition
  • 系统会先基于新 query 推导 schema 并进行 schema evolution,然后用新 query 刷新数据
  • 默认不影响历史数据(尤其是 FULL 模式的历史分区)

3.5.2 FULL 模式下的行为

3.5.2.1 流程:

  • 更新 schema + query definition

  • 下次刷新触发时:

    • 若分区表且正确设置partition.fields.#.date-formatter→ 只刷新最新分区
    • 否则 → 整表覆盖刷新

3.5.3 CONTINUOUS 模式下的行为(风险点)

3.5.3.1 流程:

  • 暂停当前运行的刷新 job
  • 更新 schema + query definition
  • 启动新的刷新 job

3.5.3.2 风险:

  • 新 job不会恢复旧 job 的 state
  • 可能造成短暂的数据重复或数据丢失
  • 数据源起始 offset 由 connector 默认实现或 query 中的动态 hint 决定

3.5.4 Schema 演进限制(当前唯一支持的方式)

  • 仅支持:在原 schema 末尾新增“可为 NULL”的列

3.5.5 示例:在末尾新增一个可空列avg_amount

ALTERMATERIALIZEDTABLEmy_materialized_tableASSELECTuser_id,COUNT(*)ASevent_count,SUM(amount)AStotal_amount,AVG(amount)ASavg_amountFROMkafka_catalog.db1.eventsWHEREevent_type='purchase'GROUPBYuser_id;

4. DROP MATERIALIZED TABLE:删除物化表

4.1 语法

DROPMATERIALIZEDTABLE[IFEXISTS][catalog_name.][database_name.]table_name;

4.2 行为说明

4.2.1 先删除后台刷新 pipeline
4.2.2 再从 Catalog 删除物化表元数据

4.3 示例

DROPMATERIALIZEDTABLEIFEXISTSmy_materialized_table;

5. 典型创建示例(连续刷新 vs 定时刷新)

5.1 示例一:FRESHNESS=10 秒 → 推断 CONTINUOUS(Streaming 增量刷新)

CREATEMATERIALIZEDTABLEmy_materialized_table_continuous PARTITIONEDBY(ds)WITH('format'='debezium-json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'10'SECONDASSELECTk.ds,k.user_id,COUNT(*)ASevent_count,SUM(k.amount)AStotal_amount,MAX(u.age)ASmax_ageFROMkafka_catalog.db1.kafka_table kJOINuser_catalog.db1.user_table uONk.user_id=u.user_idWHEREk.event_type='purchase'GROUPBYk.ds,k.user_id;

注意:date-formatter 仅 FULL 生效;这里写了也不会影响 CONTINUOUS 的分区覆盖逻辑,但不影响语法演示。

5.2 示例二:FRESHNESS=1 小时 → 推断 FULL(定时批刷新,覆盖写入)

CREATEMATERIALIZEDTABLEmy_materialized_table_full PARTITIONEDBY(ds)WITH('format'='json','partition.fields.ds.date-formatter'='yyyy-MM-dd')FRESHNESS=INTERVAL'1'HOURASSELECTp.ds,p.product_id,p.product_name,AVG(s.sale_price)ASavg_sale_price,SUM(s.quantity)AStotal_quantityFROMpaimon_catalog.db1.product_table pLEFTJOINpaimon_catalog.db1.sales_table sONp.product_id=s.product_idWHEREp.category='electronics'GROUPBYp.ds,p.product_id,p.product_name;

6. 生产落地避坑清单(强烈建议收藏)

6.1 分区字段一定要在AS SELECT输出中,否则建表会失败或逻辑不成立
6.2 FULL 模式想“只刷最新分区”,必须同时满足:

  • PARTITIONED BY (ds)
  • WITH ('partition.fields.ds.date-formatter'='yyyy-MM-dd')
  • ds为 string 类型分区字段
    6.3 FULL 模式 freshness 不是随便写:只支持指定的秒/分/时/天间隔(否则无法转 cron)
    6.4 CONTINUOUS 模式 freshness 越小 checkpoint 越频繁,可能明显影响性能
    6.5ALTER ... AS在 CONTINUOUS 模式会重启作业且不继承 state:
  • 可能短暂重复/丢失
  • 上线修改建议配合灰度、或先切 FULL 过渡
    6.6 schema evolution 目前只支持“末尾追加可空列”,别指望随意改列类型/顺序/删除列
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/19 17:48:29

《经济研究》LaTeX模板终极指南:5步搞定专业学术排版

《经济研究》LaTeX模板终极指南&#xff1a;5步搞定专业学术排版 【免费下载链接】Chinese-ERJ 《经济研究》杂志 LaTeX 论文模板 - LaTeX Template for Economic Research Journal 项目地址: https://gitcode.com/gh_mirrors/ch/Chinese-ERJ 还在为《经济研究》投稿格式…

作者头像 李华
网站建设 2026/5/14 15:03:00

BetterNCM插件管理器:3分钟学会免费安装与使用技巧

BetterNCM插件管理器&#xff1a;3分钟学会免费安装与使用技巧 【免费下载链接】BetterNCM-Installer 一键安装 Better 系软件 项目地址: https://gitcode.com/gh_mirrors/be/BetterNCM-Installer 还在为网易云音乐功能单一而烦恼吗&#xff1f;BetterNCM插件管理器将彻…

作者头像 李华
网站建设 2026/5/22 20:44:34

BOSS AI push algorithm 2025.12.22

BOSS AI push algorithm 感觉这个BOSS推送算法&#xff0c;年龄上35都是直接推网约车&#xff0c;销售类的&#xff0c;苦笑不得啊&#xff0c;同时真残酷啊。哎哟喂。 2024年8月推送更加多&#xff0c;300 20251222直聘BOSS的算法是年龄超过35岁一直推送网约车和各种销售类&a…

作者头像 李华
网站建设 2026/5/21 8:48:35

DesktopNaotu:跨平台思维导图工具的完整使用指南

DesktopNaotu&#xff1a;跨平台思维导图工具的完整使用指南 【免费下载链接】DesktopNaotu 桌面版脑图 (百度脑图离线版&#xff0c;思维导图) 跨平台支持 Windows/Linux/Mac OS. (A cross-platform multilingual Mind Map Tool) 项目地址: https://gitcode.com/gh_mirrors/…

作者头像 李华
网站建设 2026/5/22 12:13:20

DesktopNaotu完整教程:10分钟掌握百度脑图本地备份终极技巧

DesktopNaotu完整教程&#xff1a;10分钟掌握百度脑图本地备份终极技巧 【免费下载链接】DesktopNaotu 桌面版脑图 (百度脑图离线版&#xff0c;思维导图) 跨平台支持 Windows/Linux/Mac OS. (A cross-platform multilingual Mind Map Tool) 项目地址: https://gitcode.com/g…

作者头像 李华
网站建设 2026/5/20 22:05:24

ppInk屏幕标注工具终极指南:从零开始掌握专业标注技巧

ppInk屏幕标注工具终极指南&#xff1a;从零开始掌握专业标注技巧 【免费下载链接】ppInk Fork from Gink 项目地址: https://gitcode.com/gh_mirrors/pp/ppInk 在数字化教学和远程协作日益普及的今天&#xff0c;如何让屏幕标注变得更加高效直观&#xff1f;ppInk作为一…

作者头像 李华