乐趣区

关于flink:flink学习

flink 搭建

环境:centos7 docker
1.docker pull flink
2. 创建文件 docker-compose.yml,内容如下:

version: "2.1"
services:
  # JobManager: coordinates job scheduling and serves the web UI on 8081.
  jobmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6123"  # JobManager RPC port, visible to linked containers only
    ports:
      - "8081:8081"  # web UI, published to the host
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  # TaskManager: executes the actual tasks; registers with the JobManager.
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"  # TaskManager data port
      - "6122"  # TaskManager RPC port
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

3. 在文件目录下执行命令 docker-compose up -d
4. 访问 主机IP:8081,查看 flink 控制台是否正常显示

flink 项目搭建

1. 使用 maven 创建 flink 项目

# Generate a Flink quickstart project from the official archetype.
# NOTE: trailing backslashes are required — without them the shell treats
# each -D line as a separate command and the generation fails.
# NOTE(review): archetype version 1.13.1 does not match the flink:1.9.2
# docker image above — confirm the versions are meant to differ.
$ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.13.1

2. 在 pom.xml 文件中增加 rabbitmq 依赖,依赖如下:

<dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <!-- NOTE(review): the provided scopes below are commented out so the
             Flink runtime classes ARE bundled into the fat JAR; this works for
             local runs but bloats the JAR deployed to a cluster — confirm
             this is intentional. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
<!--            <scope>provided</scope>-->
        </dependency>
        <!-- RabbitMQ connector, used by WordCountJob's RMQSource.
             NOTE(review): ${flink.version} from the 1.13.1 archetype does not
             match the 1.9.2 cluster image above — confirm they agree. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
<!--            <scope>runtime</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
<!--            <scope>runtime</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
<!--            <scope>runtime</scope>-->
        </dependency>
    </dependencies>

3.WordCountJob

/**
 * Streaming job that consumes string messages from a RabbitMQ queue,
 * passes them through an identity map, and hands them to {@code MySink}
 * (which logs each record).
 */
public class WordCountJob {

    public static void main(String[] args) throws Exception {
        // 1. Obtain the Flink streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RabbitMQ connection settings.
        // NOTE(review): host and guest/guest credentials are hard-coded —
        // consider externalizing them (args/config) before real deployment.
        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("1.117.78.150")
                .setPort(5672)
                .setUserName("guest")
                .setPassword("guest")
                .setVirtualHost("/")
                .build();

        // 2. Data source: consume the "test.flink" queue.
        // NOTE(review): the boolean is presumably RMQSource's usesCorrelationId
        // flag — confirm against the connector javadoc for this Flink version.
        DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(connectionConfig,
                "test.flink", true, new SimpleStringSchema()));
        dataStreamSource.print(); // echo source records for debugging

        // 3. Transformation: identity map, a placeholder for real processing.
        //    MapFunction<IN, OUT>: first type is the input, second the output.
        DataStream<String> messages = dataStreamSource.map((MapFunction<String, String>) s -> s);

        // 4. Sink: log every record via MySink.
        messages.addSink(new MySink());

        // 5. Mandatory: without execute() the pipeline is only declared, never run.
        env.execute("liu er gou");
    }
}

4.MySink

/**
 * Minimal sink that logs every incoming record at INFO level.
 * Useful for verifying end-to-end message flow from RabbitMQ.
 */
public class MySink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(MySink.class);

    /**
     * Logs each record. The value is passed as a parameter rather than as
     * the format string, so a payload containing "{}" is logged verbatim
     * instead of being interpreted by SLF4J.
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info("{}", value);
    }

    /** Logs when the sink is shut down. */
    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}

5. 执行 maven install 得到 jar 包

6. 打开 flink 控制台,上传 jar 包,运行 WordCountJob 类;没有报错表示运行成功,同时控制台会显示相应的 job


7. 向 rabbitmq 队列发送消息,查看程序是否成功;flink 控制台(TaskManager 日志)会显示收到的消息

退出移动版