flink搭建

环境:centos7 docker
1.docker pull flink:1.9.2-scala_2.12(与下面 docker-compose.yml 中使用的镜像版本保持一致)
2.创建文件docker-compose.yml,内容如下:

# Minimal Flink cluster for local testing: one JobManager and one TaskManager.
# NOTE(review): the images below are Flink 1.9.2, while the Maven archetype used
# later in this guide is 1.13.1. Confirm that the cluster and job versions are compatible.
version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6123"
    ports:
      # Published to the host; the guide opens the Flink console on this port.
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      # Must resolve to the jobmanager service defined above.
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

3.在文件目录下执行命令 docker-compose up -d
4.在浏览器中访问 http://主机ip:8081 ,查看flink控制台是否正常显示

flink项目搭建

1.使用maven创建flink项目

# Generate a new project from the Flink Java quickstart archetype (version 1.13.1).
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.13.1

2.在pom.xml文件中添加rabbitmq依赖,依赖如下:

<dependencies>
    <!-- Apache Flink core dependencies. -->
    <!-- The quickstart marks these as "provided" so they are not packaged into the JAR file.
         The scope is commented out here, so Maven's default (compile) scope applies.
         NOTE(review): presumably disabled to run the job from the IDE; consider restoring
         the scope when building a JAR for a cluster that already ships Flink. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>

    <!-- RabbitMQ connector added for this guide; stays in the default (compile) scope. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Add further connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Logging framework, to produce console output when running in the IDE. -->
    <!-- The quickstart gives these the "runtime" scope; that scope is commented out here as well. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope> -->
    </dependency>
</dependencies>

3.WordCountJob

public class WordCountJob {    public static void main(String[] args) throws Exception {        //1.获取flink的运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()                .setHost("1.117.78.150")                .setPort(5672)                .setUserName("guest")                .setPassword("guest")                .setVirtualHost("/")                .build();        //2.连贯socket获取输出的数据(数据源Data Source)        //2.1. rabbitmq连贯的配置,2.rabbitmq的队列名,生产的队列名        DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(connectionConfig,                "test.flink",true, new SimpleStringSchema()));        dataStreamSource.print();   //输入Source的信息        //3.数据转换        //MapFunction:第一个参数是你接管的数据的类型        //MapFunction:第二个参数是返回数据的类型        DataStream<String> MessageVo = dataStreamSource.map(                (MapFunction<String, String>) s -> s        );        MessageVo.addSink(new MySink());        //5.这一行代码肯定要实现,否则程序不执行        env.execute("liu er gou");//        SpringApplication.run(PaymentFlinkApplication.class, args);    }}

4.MySink

/**
 * Sink that writes every incoming string record to the log at INFO level.
 */
public class MySink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(MySink.class);

    /**
     * Logs a single record.
     *
     * @param value   the record handed to this sink; passed to the logger as the message
     * @param context sink context supplied by the caller (not used)
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info(value);
    }

    /**
     * Logs a marker message when the sink is closed.
     *
     * @throws Exception declared by the overridden method; nothing is thrown explicitly here
     */
    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}

5.maven install 拿到jar包

6.打开flink控制台,上传jar包,运行WordCountJob类;没有报错表示运行成功,同时控制台会有相应的job


7.向rabbitmq队列发送消息,查看程序是否成功;flink控制台会显示接收到的消息