1. Flink根底案例

  1. 环境搭建配置

    FLINK集成,POM配置

    <dependencies>    <!-- Flink的外围依赖组件 -->    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>        <version>${flink.version}</version>    </dependency>    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>        <version>${flink.version}</version>    </dependency>    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-clients_${scala.binary.version}</artifactId>        <version>${flink.version}</version>    </dependency></dependencies>
  2. 批处理案例

    性能: 通过批处理形式,统计日志文件中的异样数量。

    代码:

    public class BatchProcessorApplication {       public static void main(String[] args) throws Exception {        // 1. 定义运行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();           // 2. 读取数据源(日志文件)        DataSource<String> logData = env.readTextFile("./data/order_info.log");           // 3. 荡涤转换数据        logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {               @Override            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {                // 1) 依据正则, 提取每行日志的级别                Pattern pattern = Pattern.compile("\\[main\\](.*?)\\[");                Matcher matcher = pattern.matcher(value);                if(matcher.find()) {                    // 2) 如果匹配合乎规定, 搁置元组内                    collector.collect(new Tuple2<String,Integer>(matcher.group(1).trim(), 1));                }            }        }).groupBy(0).sum(1).print(); // 4. 依据日志级别, 汇总统计, 打印后果         }   }
  3. 流解决案例

    性能: 依据IP统计拜访次数

    代码:

    public class StreamProcessorApplication {    public static void main(String[] args) throws Exception{        // 1. 创立运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 2. 读取Socket数据源        DataStreamSource<String> socketStr = env.socketTextStream("127.0.0.1", 9911, "\n");        // 3. 转换解决流数据        socketStr.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            @Override            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {                // 依据分隔符解析数据                String[] arrValue = value.split("\t");                collector.collect(new Tuple2<String,Integer>(arrValue[0], 1));            }        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(2);        env.execute("accessLog");    }}

2. Flink部署配置

  1. 装置配置JDK8环境

    [root@localhost ~]# java -versionjava version "1.8.0_181"Java(TM) SE Runtime Environment (build 1.8.0_181-b13)Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
  2. 下载Flink安装包

    官网地址

    安装包

  3. 装置配置

    1) 解压

    tar -xvf flink-1.11.2-bin-scala_2.11.tgz

2)运行

bin/start-cluster.sh

主节点拜访端口:

vi conf/masters:

localhost:8081
  1. 拜访控制台

    http://10.10.20.132:8081/#/ov...

    Available Task Slots: 无效工作槽数量

    对应配置文件: vi conf/flink-conf.yaml

    taskmanager.numberOfTaskSlots: 1

TaskManger与JobManager关系

 ![file](/img/bVcRAYy)

Client 用来提交工作给 JobManager,JobManager 散发工作给 TaskManager 去执行, TaskManager 会采纳心跳的形式, 汇报工作的执行状态。

JobManager 负责整个 Flink 集群工作的调度以及资源的治理

TaskManager 负责具体的工作执行和对应工作在每个节点上的资源申请和治理

3. Flink工作提交

第一种形式: 界面提交

  1. 批改代码配置

    socket数据源连贯,采纳主机名称配置

    DataStreamSource<String> socketStr = env.socketTextStream("flink1", 9911, "\n");
  2. 工程代码打包

    POM文件减少打包插件

    <build>        <plugins>            <!-- 编译插件 -->            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.5.1</version>                <configuration>                    <source>1.8</source>                    <target>1.8</target>                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->                </configuration>            </plugin>            <!-- 打jar包插件(会蕴含所有依赖) -->            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-shade-plugin</artifactId>                <version>2.3</version>                <executions>                    <execution>                        <phase>package</phase>                        <goals>                            <goal>shade</goal>                        </goals>                        <configuration>                            <filters>                                <filter>                                    <artifact>*:*</artifact>                                    <excludes>                                        <!--                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->                                        <exclude>META-INF/*.SF</exclude>                                        <exclude>META-INF/*.DSA</exclude>                                        <exclude>META-INF/*.RSA</exclude>                                    </excludes>                                </filter>                            </filters>                            <transformers>                                <transformer                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">                                    <!-- 能够设置jar包的入口类(可选) -->                                    <mainClass>com.itcast.flink.usage.stream.StreamProcessorApplication</mainClass>                                </transformer>                            </transformers>                        </configuration>                    </execution>                </executions>            </plugin>        </plugins>    </build>

留神,这里不能采纳spring-boot-maven-plugin打包插件, 否则flink不能失常辨认。

  1. 提交工作

    上传Jar包

    接下来,在flink1节点上, 开启Socket交互端口9911

    [root@flink1 flink-1.11.2]# nc -lk 9911

而后提交并执行工作

 ![file](/img/bVcRAYA)

savepoint path: 容错机制中快照保留的门路。

  1. 运行验证

    nc发送一些数据, 在TaskManager当中能够查看输入后果。

第二种形式: 命令行提交

在flink控制台革除原有的Job工作。

  1. 上传Jar包

    将Jar包上传至flink服务器:

    [root@flink1 examples]# lltotal 81880drwxr-xr-x. 2 root root      194 Sep  9 23:48 batch-rw-r--r--. 1 root root 83843774 Sep 26 05:57 flink-usage-1.0-SNAPSHOT.jardrwxr-xr-x. 2 root root       50 Sep  9 23:48 gellydrwxr-xr-x. 3 root root       19 Sep  9 23:48 pythondrwxr-xr-x. 2 root root      241 Sep  9 23:48 streamingdrwxr-xr-x. 2 root root      209 Sep  9 23:48 table
  2. 提交工作

    采纳命令行形式提交工作:

    [root@flink1 flink-1.11.2]# bin/flink run -c com.itcast.flink.usage.stream.StreamProcessorApplication examples/flink-usage-1.0-SNAPSHOT.jarJob has been submitted with JobID 4c127f68f6683e5a9342410d7b6540db
  3. 验证后果

    发送一些数据并在控制台验证输入后果。

    • *
      本文由mirson创作分享,如需进一步交换,请加QQ群:19310171或拜访www.softart.cn