Flink Big Data Stream Processing - Part 1 (Local Mode Deployment)
Flink has three common deployment modes: Local mode, Standalone mode, and Flink on YARN mode. Next part: installing and deploying Flink in Standalone mode.
Preface
Flink's common deployment modes are Local mode, Standalone mode, and YARN mode.
1. Concepts
Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It provides core capabilities such as data distribution, fault tolerance, and resource management, and it offers several high-level APIs for writing distributed jobs: the DataSet API, the DataStream API, the Table API, and more.
2. Local Mode Deployment
Local mode is the simplest deployment mode Flink provides; it is typically used for local testing and demos.
2.1 Download the Flink package
For this demo we use Apache Flink 1.10.0 for Scala 2.11. Download it from: https://www.apache.org/dyn/closer.lua/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
2.2 Upload to Linux
After uploading the archive to the Linux server, extract it:
tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
Note: Flink's runtime depends on the JDK, so a JDK must be installed in advance.
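Before starting Flink you can verify the JDK is in place (a quick sanity check; Flink 1.10 runs on Java 8):
# confirm a JDK is on the PATH
java -version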
2.3 Enable the firewall
# check the firewall status
systemctl status firewalld
# start the firewall
systemctl start firewalld
# stop the firewall
# systemctl stop firewalld
2.4 Open the required port
Flink listens on port 8081 by default; we need to allow this port through the firewall, otherwise the Web UI cannot be reached.
# open the required port:
firewall-cmd --zone=public --add-port=8081/tcp --permanent
# reload to apply the newly added port:
firewall-cmd --reload
# check whether the port was opened successfully:
firewall-cmd --query-port=8081/tcp
# list all open ports and rules:
firewall-cmd --list-all
2.5 Start Flink
./bin/start-cluster.sh
Check the startup log:
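The startup log can also be followed from the shell. It lives under Flink's log/ directory; the exact file name includes your user name and host, so the wildcards below are placeholders:
# follow the JobManager (standalonesession) startup log
tail -f log/flink-*-standalonesession-*.log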
2.6 Access test
Open the Web UI at: http://IP:8081
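As a quick check without a browser, you can probe the REST API that backs the Web UI with curl (replace IP with your server's address; /overview is a standard Flink REST endpoint that returns cluster counts):
# probe the Web UI's REST backend on the same port
curl http://IP:8081/overview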
3. Running a Job
Create a Maven project in IDEA and write the Job.
3.1 pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dt</groupId>
<artifactId>flink-java</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink-java</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Flink version -->
<flink.version>1.10.0</flink.version>
<!-- JDK version -->
<java.version>1.8</java.version>
<!-- Scala 2.11 binary version -->
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Use the maven-shade plugin to create a fat JAR containing all necessary dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF directory, otherwise SecurityExceptions will be thrown -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Note: be sure to replace this with the main class of your own Job -->
<mainClass>com.dt.BoundedStreamWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
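With this pom in place, a standard Maven build produces the fat JAR that we will submit later (it lands in target/flink-java-1.0-SNAPSHOT.jar):
# build the fat JAR
mvn clean package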
3.2 BoundedStreamWordCount.java
package com.dt;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @description: stream processing with the DataStream API
* @author: DT辰白 Created by 2023/5/9 9:56
* @version: v1.0
*/
public class BoundedStreamWordCount
{
public static void main(String[] args) throws Exception {
// create the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.fromElements(WORDS)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] splits = value.toLowerCase().split("\\W+");
for (String split : splits) {
if (split.length() > 0) {
out.collect(new Tuple2<>(split, 1));
}
}
}
})
.keyBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
})
.print();
// a streaming program must call execute() to actually run, otherwise no results are produced
env.execute("## word count streaming demo");
}
private static final String[] WORDS = new String[]{
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer"
};
}
Run and test it locally in IDEA:
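The console output should look roughly like the lines below; the task-slot prefixes and line order vary between runs, so treat this as an illustrative sketch rather than an exact transcript:
1> (to,1)
3> (be,1)
1> (to,2)
3> (be,2)
...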
3.3 Run the Job on the Flink cluster
On the http://IP:8081/#/submit page, upload the locally built flink-java-1.0-SNAPSHOT.jar, then click Submit to run it.
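Alternatively, the Job can be submitted with Flink's command-line client. Since the shade plugin already sets the main class in the JAR manifest, no extra flag is needed; add -c com.dt.BoundedStreamWordCount if your manifest lacks it:
# submit from the Flink distribution directory
./bin/flink run flink-java-1.0-SNAPSHOT.jar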
The Job's results appear under the Task Manager's Stdout tab; check the output there:
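The same stdout is also written to the TaskManager's .out file on disk; as with the startup log, the exact file name depends on your user name and host:
# TaskManager stdout on disk
cat log/flink-*-taskexecutor-*.out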
Summary
Flink is a distributed computing engine and framework that can process both bounded and unbounded data. It is event-driven and stream-oriented: Flink processes each event as it arrives, making it a true streaming engine, and it can also emulate batch processing on top of streams to handle bounded data.