共计 4667 个字符,预计需要花费 12 分钟才能阅读完成。
1. Flink 根底案例
-
环境搭建配置
FLINK 集成,POM 配置
<dependencies> <!-- Flink 的外围依赖组件 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
-
批处理案例
性能:通过批处理形式,统计日志文件中的异样数量。
代码:
public class BatchProcessorApplication {public static void main(String[] args) throws Exception { // 1. 定义运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 读取数据源(日志文件)DataSource<String> logData = env.readTextFile("./data/order_info.log"); // 3. 荡涤转换数据 logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {// 1) 依据正则,提取每行日志的级别 Pattern pattern = Pattern.compile("\\[main\\](.*?)\\["); Matcher matcher = pattern.matcher(value); if(matcher.find()) {// 2) 如果匹配合乎规定,搁置元组内 collector.collect(new Tuple2<String,Integer>(matcher.group(1).trim(), 1)); } } }).groupBy(0).sum(1).print(); // 4. 依据日志级别,汇总统计,打印后果} }
-
流解决案例
性能:依据 IP 统计拜访次数
代码:
public class StreamProcessorApplication {public static void main(String[] args) throws Exception{ // 1. 创立运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取 Socket 数据源 DataStreamSource<String> socketStr = env.socketTextStream("127.0.0.1", 9911, "\n"); // 3. 转换解决流数据 socketStr.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { // 依据分隔符解析数据 String[] arrValue = value.split("\t"); collector.collect(new Tuple2<String,Integer>(arrValue[0], 1)); } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(2); env.execute("accessLog"); } }
2. Flink 部署配置
-
装置配置 JDK8 环境
[root@localhost ~]# java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
-
下载 Flink 安装包
官网地址
安装包
-
装置配置
1)解压
tar -xvf flink-1.11.2-bin-scala_2.11.tgz
2)运行
bin/start-cluster.sh
主节点拜访端口:
vi conf/masters:
localhost:8081
-
拜访控制台
http://10.10.20.132:8081/#/ov…
Available Task Slots:无效工作槽数量
对应配置文件:vi conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 1
TaskManger 与 JobManager 关系
![file](/img/bVcRAYy)
Client 用来提交工作给 JobManager,JobManager 散发工作给 TaskManager 去执行,TaskManager 会采纳心跳的形式,汇报工作的执行状态。
JobManager 负责整个 Flink 集群工作的 调度以及资源的治理。
TaskManager 负责具体的 工作执行 和对应工作在每个节点上的 资源申请和治理。
3. Flink 工作提交
第一种形式:界面提交
-
批改代码配置
socket 数据源连贯,采纳主机名称配置
DataStreamSource<String> socketStr = env.socketTextStream("flink1", 9911, "\n");
-
工程代码打包
POM 文件减少打包插件
<build> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <!--<encoding>${project.build.sourceEncoding}</encoding>--> </configuration> </plugin> <!-- 打 jar 包插件(会蕴含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- 能够设置 jar 包的入口类(可选) --> <mainClass>com.itcast.flink.usage.stream.StreamProcessorApplication</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
留神,这里不能采纳 spring-boot-maven-plugin 打包插件,否则 flink 不能失常辨认。
-
提交工作
上传 Jar 包
接下来,在 flink1 节点上,开启 Socket 交互端口 9911
[root@flink1 flink-1.11.2]# nc -lk 9911
而后提交并执行工作
![file](/img/bVcRAYA)
savepoint path:容错机制中快照保留的门路。
-
运行验证
nc 发送一些数据,在 TaskManager 当中能够查看输入后果。
第二种形式:命令行提交
在 flink 控制台革除原有的 Job 工作。
-
上传 Jar 包
将 Jar 包上传至 flink 服务器:
[root@flink1 examples]# ll total 81880 drwxr-xr-x. 2 root root 194 Sep 9 23:48 batch -rw-r--r--. 1 root root 83843774 Sep 26 05:57 flink-usage-1.0-SNAPSHOT.jar drwxr-xr-x. 2 root root 50 Sep 9 23:48 gelly drwxr-xr-x. 3 root root 19 Sep 9 23:48 python drwxr-xr-x. 2 root root 241 Sep 9 23:48 streaming drwxr-xr-x. 2 root root 209 Sep 9 23:48 table
-
提交工作
采纳命令行形式提交工作:
[root@flink1 flink-1.11.2]# bin/flink run -c com.itcast.flink.usage.stream.StreamProcessorApplication examples/flink-usage-1.0-SNAPSHOT.jar Job has been submitted with JobID 4c127f68f6683e5a9342410d7b6540db
-
验证后果
发送一些数据并在控制台验证输入后果。
-
- *
本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn
- *