flink搭建
环境:centos7 docker
1.docker pull flink
2.创建文件docker-compose.yml,内容如下:
# Minimal Flink cluster: one JobManager and one TaskManager container.
# NOTE(review): the image is pinned to Flink 1.9.2 - confirm it matches the
# Flink version the job JAR is built against.
version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.12
    # Reachable by other containers only, not published on the host.
    expose:
      - "6123"
    # Publishes the Flink web console on the host.
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    # Points the TaskManager at the jobmanager service by its hostname.
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
3.在文件目录下执行命令 docker-compose up -d
4.在浏览器中访问 http://主机ip:8081 ,查看flink控制台是否正常显示
flink项目搭建
1.使用maven创建flink项目
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.13.1
2.在pom.xml文件中添加rabbitmq依赖,依赖如下:
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <!-- The cluster already ships them; to run inside an IDE, enable the
         "include dependencies with provided scope" option of the run configuration. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- RabbitMQ connector: used by the job, so it has to be packaged into the JAR. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
3.WordCountJob
public class WordCountJob { public static void main(String[] args) throws Exception { //1.获取flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("1.117.78.150") .setPort(5672) .setUserName("guest") .setPassword("guest") .setVirtualHost("/") .build(); //2.连贯socket获取输出的数据(数据源Data Source) //2.1. rabbitmq连贯的配置,2.rabbitmq的队列名,生产的队列名 DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(connectionConfig, "test.flink",true, new SimpleStringSchema())); dataStreamSource.print(); //输入Source的信息 //3.数据转换 //MapFunction:第一个参数是你接管的数据的类型 //MapFunction:第二个参数是返回数据的类型 DataStream<String> MessageVo = dataStreamSource.map( (MapFunction<String, String>) s -> s ); MessageVo.addSink(new MySink()); //5.这一行代码肯定要实现,否则程序不执行 env.execute("liu er gou");// SpringApplication.run(PaymentFlinkApplication.class, args); }}
4.MySink
/**
 * Sink that writes every record it receives to the log at INFO level.
 */
public class MySink extends RichSinkFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    /**
     * Logs the incoming record.
     *
     * @param value   the record to log
     * @param context sink context; not used
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        LOG.info(value);
    }

    /**
     * Logs a fixed message when the sink is closed.
     */
    @Override
    public void close() throws Exception {
        LOG.info("MySink close");
    }
}
5.maven install 拿到jar包
6.打开flink控制台,上传jar包,运行WordCountJob类;没有报错表示运行成功,同时控制台会有相应的job
7.向rabbitmq队列发送消息,查看程序是否成功;flink控制台会显示相应的输出