Pulsar Connectors

音讯解决(Processing guarantee)

  • at-most-once
  • at-least-once
  • effectively-once

操作流程(JDBC sink)

  1. Add a configuration file.
  2. Create a schema.
  3. Upload a schema to a topic.
  4. Create a JDBC sink
  5. Stop a JDBC sink
  6. Restart a JDBC sink
  7. Update a JDBC sink

内建连接器(Built-in connector)

Source connector

  • Canal
  • File
  • Flume
  • Kafka
  • RabbitMQ

Sink connector

  • ElasticSearch/Solr
  • Flume
  • HBase
  • HDFS2/HDFS3
  • InfluxDB
  • JDBC ClickHouse/MariaDB/PostgreSQL
  • Kafka
  • MongoDB
  • RabbitMQ
  • Redis

ClickHouse Sink

  1. 创立表
CREATE DATABASE IF NOT EXISTS monitor;CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink(    id   UInt32,    name String) ENGINE = TinyLog;INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)VALUES (1, 'tmp');SELECT *FROM monitor.pulsar_clickhouse_jdbc_sink;
  1. 创立配置
$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml{    "userName": "sysop",    "password": "123456",    "jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",    "tableName": "pulsar_clickhouse_jdbc_sink"}
  1. 创立schema
$ vi $PULSAR_HOME/connectors/json-schema.json{  "name": "",  "schema": {    "type": "record",    "name": "SeedEvent",    "namespace": "com.cloudwise.quickstart.model",    "fields": [      {        "name": "id",        "type": [          "null",          "int"        ]      },      {        "name": "name",        "type": [          "null",          "string"        ]      }    ]  },  "type": "JSON",  "properties": {    "__alwaysAllowNull": "true",    "__jsr310ConversionEnabled": "false"  }}
  1. 上传schema
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \pulsar-postgres-jdbc-sink-topic \-f $PULSAR_HOME/connectors/json-schema.json
  1. 运行
$ $PULSAR_HOME/bin/pulsar-admin sinks create \--tenant public \--namespace default \--name pulsar-clickhouse-jdbc-sink \--inputs pulsar-clickhouse-jdbc-sink-topic \--sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \--archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \--processing-guarantees EFFECTIVELY_ONCE \--parallelism 1

更多福利

云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维治理平台OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、复原 等性能,可为用户提供便捷的运维能力和业务管理,在进步运维人员等工作效率的同时,极大晋升了业务的连续性和安全性。点击下方地址链接,欢送大家给OMP点赞送star,理解更多相干内容~

GitHub地址:https://github.com/CloudWise-...

Gitee地址:https://gitee.com/CloudWise/OMP

微信扫描辨认下方二维码,备注【OMP】退出AIOps社区运维治理平台OMP开发者交换群,与更多行业大佬一起交流学习~

系列浏览

深入浅出Apache Pulsar(1):Pulsar vs Kafka

深入浅出Apache Pulsar(2):Pulsar音讯机制

深入浅出 Apache Pulsar(3):Pulsar Schema

深入浅出 Apache Pulsar(4)Pulsar Functions