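Flink setup
Environment: CentOS 7, Docker
1. Pull the image: docker pull flink:1.9.2-scala_2.12 (the tag used in the compose file below)
2. Create a file named docker-compose.yml with the following content: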
version: "2.1"
services:
  jobmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.9.2-scala_2.12
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
3. In the directory that contains the file, run docker-compose up -d
4. Open http://<host-ip>:8081 in a browser and check that the Flink console is displayed
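The same check can be done without a browser. The following is a minimal sketch, assuming Java 11 or later and that port 8081 is reachable from where it runs. The class name FlinkOverviewCheck and the helper overviewUri are hypothetical names for illustration; /overview is the cluster overview endpoint of Flink's REST API, which is served on the same port as the console.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper class, not part of the original project.
class FlinkOverviewCheck {

    // Builds the URI of the REST "overview" endpoint on the JobManager.
    static URI overviewUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/overview");
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Docker host's IP is passed as the first argument;
        // without an argument the check targets localhost.
        String host = args.length > 0 ? args[0] : "localhost";

        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(overviewUri(host, 8081))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

        // Print the status code and the JSON body returned by the JobManager.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```

A 200 response with a JSON body suggests the JobManager is up; the body should also report how many TaskManagers have registered, which is a quick way to see whether the taskmanager container connected.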
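Flink project setup
1. Create the Flink project with Maven. Note that the archetype below targets Flink 1.13.1 while the Docker image above runs 1.9.2; the project's flink.version should generally match the version of the cluster the job is submitted to.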
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.13.1
2. Add the RabbitMQ connector dependency to pom.xml. The dependencies section then looks like this:
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <!-- <scope>runtime</scope>-->
    </dependency>
</dependencies>
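3. WordCountJob
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("1.117.78.150")
                .setPort(5672)
                .setUserName("guest")
                .setPassword("guest")
                .setVirtualHost("/")
                .build();

        // 2. Read the input data from RabbitMQ (the data source)
        // RMQSource arguments: the connection config, the name of the queue to consume,
        // usesCorrelationId (true means every message must carry a correlation ID),
        // and the deserialization schema
        DataStream<String> dataStreamSource = env.addSource(new RMQSource<String>(
                connectionConfig, "test.flink", true, new SimpleStringSchema()));
        dataStreamSource.print(); // print what the source receives

        // 3. Transform the data
        // MapFunction: the first type parameter is the type of the incoming data,
        // the second is the type of the returned data
        DataStream<String> messages = dataStreamSource.map((MapFunction<String, String>) s -> s);

        // 4. Send the result to the sink
        messages.addSink(new MySink());

        // 5. This line is required; without it the job is never executed
        env.execute("liu er gou");
    }
}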
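4. MySink
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(MySink.class);

    // Called once for every record that reaches the sink
    @Override
    public void invoke(String value, Context context) throws Exception {
        log.info(value);
    }

    // Called when the sink is shut down
    @Override
    public void close() throws Exception {
        log.info("MySink close");
    }
}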
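5. Run mvn install to build the jar.
6. Open the Flink console, upload the jar, and run the WordCountJob class. If no error is reported, the job started successfully and appears in the console's job list.
7. Send a message to the RabbitMQ queue to check that the program works. The message shows up in the Flink console, in the TaskManager's stdout and logs.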
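One way to send such a test message from code is sketched below. It assumes Java 11 or later and that the RabbitMQ management plugin is enabled and reachable on its default port 15672, neither of which is stated in the original setup. It posts to the management HTTP API's publish endpoint for the default exchange, using the queue name as the routing key. The class RabbitTestPublisher and its helpers are hypothetical names. A correlation ID is included because the source above is created with usesCorrelationId set to true.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

// Hypothetical helper class, not part of the original project.
class RabbitTestPublisher {

    // Minimal JSON string escaping: backslashes and double quotes only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the JSON body expected by the management API's publish endpoint.
    static String publishBody(String queue, String payload, String correlationId) {
        return "{\"properties\":{\"correlation_id\":\"" + escape(correlationId) + "\"},"
                + "\"routing_key\":\"" + escape(queue) + "\","
                + "\"payload\":\"" + escape(payload) + "\","
                + "\"payload_encoding\":\"string\"}";
    }

    // Builds an HTTP Basic Authorization header value.
    static String basicAuth(String user, String password) {
        byte[] raw = (user + ":" + password).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the RabbitMQ host is passed as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        // %2F is the URL-encoded default virtual host "/".
        URI uri = URI.create("http://" + host + ":15672/api/exchanges/%2F/amq.default/publish");

        String body = publishBody("test.flink", "hello flink", UUID.randomUUID().toString());
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Authorization", basicAuth("guest", "guest"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP " + response.statusCode() + " " + response.body());
    }
}
```

Two things to keep in mind when trying this: by default RabbitMQ only accepts the guest user from localhost, so a remote broker may need a different account; and the queue must already exist, which is normally the case once the Flink job is running because the source declares it.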