关于java:Apache-Druid-数据摄取本地数据和kafka流式数据

4次阅读

共计 5531 个字符,预计需要花费 14 分钟才能阅读完成。

Durid 概述

Apache Druid 是一个集工夫序列数据库、数据仓库和全文检索零碎特点于一体的剖析性数据平台。本文将带你简略理解 Druid 的个性,应用场景,技术特点和架构。这将有助于你选型数据存储计划,深刻理解 Druid 存储,深刻理解工夫序列存储等。

Apache Druid 是一个高性能的实时剖析型数据库。

上篇文章,咱们理解了 Druid 的加载形式,

咱么次要说两种,一种是加载本地数据,一种是通过 kafka 加载流式数据。

数据摄取

4.1 加载本地文件

咱们导入演示案例种的演示文件

4.1.1.1 数据抉择

通过 UI 抉择local disk

并抉择Connect data

4.1.1.2 演示数据查看

演示数据在 quickstart/tutorial 目录下的 wikiticker-2015-09-12-sampled.json.gz 文件

4.1.1.3 抉择数据源

因为咱们是通过 imply 装置的,在 Base directory 输出绝对路径 /usr/local/imply/imply-2021.05-1/dist/druid/quickstart/tutorial,File filter 输出 wikiticker-2015-09-12-sampled.json.gz, 并抉择apply 利用配置,咱们数据曾经加载进来了

Base directoryFile filter 离开是因为可能须要同时从多个文件中摄取数据。

4.1.1.4 加载数据

数据定位后,您能够点击 ”Next: Parse data” 来进入下一步。

数据加载器将尝试主动为数据确定正确的解析器。在这种状况下,它将胜利确定json。能够随便应用不同的解析器选项来预览 Druid 如何解析您的数据。

4.1.2 数据源标准配置
4.1.2.1 设置工夫列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主工夫列。

​ Druid 的体系结构须要一个主工夫列(外部存储为名为 _time 的列)。如果您的数据中没有工夫戳,请抉择 固定值(Constant Value)。在咱们的示例中,数据加载器将确定原始数据中的工夫列是惟一可用作主工夫列的候选者。

这里能够抉择工夫列,以及工夫的显示方式

4.1.2.2 设置转换器

在这里能够新增虚构列,将一个列的数据转换成另一个虚构列,这里咱们没有设置,间接跳过

4.1.2.3 设置过滤器

这里能够设置过滤器,对于某些数据能够不进行显示,这里咱们也跳过

4.1.2.4 配置 schema

Configure schema 步骤中,您能够配置将哪些维度和指标摄入到 Druid 中,这些正是数据在被 Druid 中摄取后呈现的样子。因为咱们的数据集十分小,关掉 rollup、确认更改。

4.1.2.5 配置 Partition

一旦对 schema 称心后,点击 Next 后进入 Partition 步骤,该步骤中能够调整数据如何划分为段文件的形式,因为咱们数据量十分小,这里咱们依照 DAY 进行分段

4.1.3 提交工作
4.1.3.1 公布数据

点击实现 Tune 步骤,进入到 Publish 步,在这里咱们能够给咱们的数据源命名,这里咱们就命名为druid-sampled

点击下一步就能够查看咱们的数据标准

​ 这就是您构建的标准,为了查看更改将如何更新标准是能够随便返回之前的步骤中进行更改,同样,您也能够间接编辑标准,并在后面的步骤中看到它。

4.1.3.2 提交工作

对摄取标准感到称心后,请单击 Submit,而后将创立一个数据摄取工作。

您能够进入工作视图,重点关注新创建的工作。工作视图设置为主动刷新,请期待工作胜利。

当一项工作胜利实现时,意味着它建设了一个或多个段,这些段当初将由 Data 服务器接管。

4.1.3.3 查看数据源

从题目导航到 Datasources 视图,一旦看到绿色(齐全可用)圆圈,就能够查问数据源。此时,您能够转到 Query 视图以对数据源运行 SQL 查问。

4.1.3.4 查问数据

能够转到查问页面进行数据查问,这里在 sql 窗口编写 sql 后点击运行就能够查问数据了

4.2 kafka 加载流式数据

4.2.1 装置 Kafka

这里咱们应用 docker-compose 的形式启动 kafka

4.2.1.1 编辑资源清单
vi docker-compose.yml
version: '2'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports: 
      - 2181:2181
  kafka:
    image: wurstmeister/kafka       ## 镜像
    volumes: 
        - /etc/localtime:/etc/localtime ## 挂载地位(kafka 镜像和宿主机器之间工夫放弃始终)ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.64.190   ## 批改: 宿主机 IP
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       ## 卡夫卡运行是基于 zookeeper 的
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
4.2.2.2 启动容器
docker-compose up -d

docker-compose ps

4.2.3 验证 kafka

启动 kafka 后须要验证 kafka 是否可用

4.2.3.1 登录容器

登录容器并进入指定目录

# 进入容器
docker exec -it kafka_kafka_1 bash

#进入 /opt/kafka_2.13-2.7.0/bin/ 目录下
cd /opt/kafka_2.13-2.7.0/bin/

4.2.3.2 发送音讯

运行客户端发送音讯,留神这里的连贯地址须要写咱们配置的宿主机地址

# 运行 kafka 生产者发送音讯
./kafka-console-producer.sh --broker-list 192.168.64.173:9092 --topic test

发送的数据如下

{"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

4.2.3.3 生产音讯

运行消费者生产音讯

./kafka-console-consumer.sh --bootstrap-server 192.168.64.173:9092 --topic test --from-beginning

有数据打印阐明咱们 kafka 装置是没有问题的

4.2.4 发送数据到 kafka
4.2.4.1 编写代码

编写代码发送音讯到 kafka 中

@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送音讯到 kafka
     *
     * @param topic   主题
     * @param message 内容体
     */
    public void sendMsg(String topic, String message) {kafkaTemplate.send(topic, message);
    }
}
@RestController
@RequestMapping("/taxi")
public class KafkaController {
    @Autowired
    private KafkaSender kafkaSender;

    @RequestMapping("/batchTask/{num}")
    public String batchAdd(@PathVariable("num") int num) {for (int i = 0; i < num; i++) {Message message = Utils.getRandomMessage();
            kafkaSender.sendMsg("message", JSON.toJSONString(message));
        }
        return "OK";
    }
}
4.2.4.2 发送音讯

应用 postman 发送音讯到 kafka,音讯地址:http://localhost:8010/taxi/ba…,音讯数据如下

显示 OK 阐明音讯曾经发送到了 kafka 中

4.2.5 数据抉择
4.2.51 kafka 数据查看

在 load 页面抉择 kafka,进行数据摄取模式抉择

4.2.5.2 抉择数据源

在这里输出 ZK 的地址以及须要抉择数据的topic

116.62.213.90:10903,116.62.213.90:10904

4.2.5.3 加载数据

点击 apply 利用配置,设置加载数据源

4.2.6 数据源标准配置
4.2.6.1 设置工夫列

json 选择器被选中后,点击 Next:Parse time 进入下一步来决定您的主工夫列。

​ 因为咱们的工夫列有两个创立工夫以及打车工夫,咱们配置工夫列为trvelDate

4.2.6.2 设置转换器

在这里能够新增虚构列,将一个列的数据转换成另一个虚构列,这里咱们减少一个状态的虚构列,来显示状态的中文名称咱们定义 0:测试数据,1: 发动打车,2:排队中,3:司机接单,4:乘客上车,5:实现打车

咱们应用 case_simple 来实现判断性能,更多判断性能参考

case_simple(status,0,'测试数据',1,'发动打车',2,'排队中',3,'司机接单',4,'实现打车','状态谬误')

在这里咱们新建了一个 status_text 的虚构列来展现须要中文显示的列

配置年龄默认值,如果为空咱们设置为 25

nvl(age,25)

配置性别设置,咱们须要设置为男女,0:男,1:女,如果为 null,咱们设置为男

case_simple(nvl(sex,0),0,'男',1,'女','男')

4.2.6.3 设置过滤器

这里能够设置过滤器,对于某些数据不展现,这里咱们应用 区间过滤器 抉择显示 status>=1 的数据,具体表达式可用参考

 {
  "type" : "bound",
  "dimension" : "status",
  "ordering": "numeric",
  "lower": "1",
 }

因为咱们把数据是 0 的测试数据不显示了,所以只显示了一条数据为 1 的数据

4.2.6.4 配置 schema

Configure schema 步骤中,您能够配置将哪些维度和指标摄入到 Druid 中,这些正是数据在被 Druid 中摄取后呈现的样子。因为咱们的数据集十分小,关掉 rollup、确认更改。

4.2.6.5 配置 Partition

一旦对 schema 称心后,点击 Next 后进入 Partition 步骤,该步骤中能够调整数据如何划分为段文件的形式,因为咱们打车个别依照小时来算的,咱们设置为分区为 “hour

4.2.6.6 配置拉取形式

这里设置 kafka 的拉取形式,次要设置偏移量的一些配置

​ 在 Tune 步骤中,将 Use earliest offset 设置为 True 十分重要,因为咱们须要从流的开始地位生产数据。其余没有任何须要更改的中央,进入到 Publish

4.5.7 提交工作
4.2.7.1 公布数据

点击实现 Tune 步骤,进入到 Publish 步,在这里咱们能够给咱们的数据源命名,这里咱们就命名为taxi-message

点击下一步就能够查看咱们的数据标准

​ 这就是您构建的标准,为了查看更改将如何更新标准是能够随便返回之前的步骤中进行更改,同样,您也能够间接编辑标准,并在后面的步骤中看到它。

4.2.7.2 提交工作

对摄取标准感到称心后,请单击 Submit,而后将创立一个数据摄取工作。

您能够进入工作视图,重点关注新创建的工作。工作视图设置为主动刷新,请期待工作胜利。

当一项工作胜利实现时,意味着它建设了一个或多个段,这些段当初将由 Data 服务器接管。

4.2.7.3 查看数据源

从题目导航到 Datasources 视图,一旦看到绿色(齐全可用)圆圈,就能够查问数据源。此时,您能够转到 Query 视图以对数据源运行 SQL 查问。

4.2.7.4 查问数据

能够转到查问页面进行数据查问,这里在 sql 窗口编写 sql 后点击运行就能够查问数据了

4.2.7.5 动静增加数据

发送一条数据到 kafka

druid 查问数据, 发现新的数据曾经进来了

4.2.8 清理数据
4.2.8.1 敞开集群
# 进入 impl 装置目录
cd /usr/local/imply/imply-2021.05-1
# 敞开集群
./bin/service --down

4.2.8.2 期待敞开服务

通过过程查看,查看服务是否曾经敞开

 ps -ef|grep druid

4.2.8.3 清理数据

通过删除 druid 软件包下的 var 目录的内容来重置集群状态

ll
rm -rf var

4.2.8.4 重新启动集群
 nohup bin/supervise -c conf/supervise/quickstart.conf > logs/quickstart.log 2>&1 &
4.2.8.5 查看数据源

登录后查看数据源,咱们发现曾经被重置了

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

正文完
 0