
【大数据面试题】34 手写一个 Flink SQL 样例
一步一个脚印,一天一道大数据面试题祝你身体健康,事事顺心!我们来看看Flink SQL。
·
一步一个脚印,一天一道大数据面试题
博主希望能够得到大家的点赞收,藏支持!非常感谢~
点赞,收藏是情分,不点是本分。祝你身体健康,事事顺心!
我们来看看 Flink SQL
大概流程和样例:
流程:
1.创建 流处理环境 StreamExecutionEnvironment env
2.创建 表环境 StreamTableEnvironment.create(env);
3.创建 source
表,sink
表
4.用 table API 编写查询 SQL(返回 Table
对象)
5.执行 sink executeInsert("sink")
代码样例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class SqlDemo2 {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.创建表环境
// 1.1 方法 1
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
// .inStreamingMode()
// .build();
// TableEnvironment tableEnv = TableEnvironment.create(settings);
// 1.2 方法 2
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建表
// 用 datagen 生成随机数据作为 source
tableEnv.executeSql("CREATE TABLE source (\n" +
" id INT\n" +
" ,ts BIGINT\n" +
" ,vc INT\n" +
") WITH (\n" +
" 'connector' = 'datagen'\n" +
" ,'rows-per-second'='1'\n" +
" ,'fields.id.kind'='random'\n" +
" ,'fields.id.min'='1'\n" +
" ,'fields.id.max'='10'\n" +
" ,'fields.ts.kind'='sequence'\n" +
" ,'fields.ts.min'='1'\n" +
" ,'fields.ts.max'='1000000'\n" +
" ,'fields.vc.kind'='random'\n" +
" ,'fields.vc.min'='1'\n" +
" ,'fields.vc.max'='100'\n" +
");\n");
tableEnv.executeSql("CREATE TABLE sink(\n" +
" id INT,\n" +
" sumVC INT,\n" +
") WITH (\n" +
"'connector'='print'\n" +
");\n");
// 执行查询
Table source = tableEnv.from("source");
Table select = source.where($("id").isGreater(5))
.groupBy($("id"))
.aggregate($("vc").sum().as("sumVC"))
.select($("id"), $("sumVC"));
// 执行 sink
select.executeInsert("sink");
}
}
运行截图:
我是近未来,祝你变得更强!
更多推荐
所有评论(0)