A Look at the Replicator in rocketmq-mysql


This article walks through the Replicator class in rocketmq-mysql.

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

public class Replicator {

    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);

    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");

    private Config config;

    private EventProcessor eventProcessor;

    private RocketMQProducer rocketMQProducer;

    private Object lock = new Object();
    private BinlogPosition nextBinlogPosition;
    private long nextQueueOffset;
    private long xid;

    public static void main(String[] args) {
        Replicator replicator = new Replicator();
        replicator.start();
    }

    public void start() {

        try {
            config = new Config();
            config.load();

            rocketMQProducer = new RocketMQProducer(config);
            rocketMQProducer.start();

            BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
            binlogPositionLogThread.start();

            eventProcessor = new EventProcessor(this);
            eventProcessor.start();

        } catch (Exception e) {
            LOGGER.error("Start error.", e);
            System.exit(1);
        }
    }

    public void commit(Transaction transaction, boolean isComplete) {
        String json = transaction.toJson();

        for (int i = 0; i < 3; i++) {
            try {
                if (isComplete) {
                    long offset = rocketMQProducer.push(json);

                    synchronized (lock) {
                        xid = transaction.getXid();
                        nextBinlogPosition = transaction.getNextBinlogPosition();
                        nextQueueOffset = offset;
                    }

                } else {
                    rocketMQProducer.push(json);
                }
                break;

            } catch (Exception e) {
                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
            }
        }
    }

    public void logPosition() {

        String binlogFilename = null;
        long xid = 0L;
        long nextPosition = 0L;
        long nextOffset = 0L;

        synchronized (lock) {
            if (nextBinlogPosition != null) {
                xid = this.xid;
                binlogFilename = nextBinlogPosition.getBinlogFilename();
                nextPosition = nextBinlogPosition.getPosition();
                nextOffset = nextQueueOffset;
            }
        }

        if (binlogFilename != null) {
            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
                xid, binlogFilename, nextPosition, nextOffset);
        }

    }

    public Config getConfig() {
        return config;
    }

    public BinlogPosition getNextBinlogPosition() {
        return nextBinlogPosition;
    }

}
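  • Replicator exposes three methods: start, commit, and logPosition. start loads the Config, creates the RocketMQProducer, BinlogPositionLogThread, and EventProcessor, and starts each in turn; if anything throws, it logs the error and exits with status 1. commit sends transaction.toJson() through rocketMQProducer, trying up to three times; when isComplete is true it also updates xid, nextBinlogPosition, and nextQueueOffset while holding lock. logPosition logs xid, binlogFilename, nextPosition, and nextOffset, but only once nextBinlogPosition has been set.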
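
The retry loop and the lock-guarded position fields in commit can be tried out in isolation. The following is only a minimal sketch under stated assumptions: Pusher and CommitSketch are hypothetical names, not part of rocketmq-mysql, and Pusher stands in for RocketMQProducer.push. Unlike the original commit, which returns void and silently falls through after the third failure, this sketch returns a boolean so that outcome is visible.

```java
/** Hypothetical stand-in for RocketMQProducer.push: returns a queue offset or throws. */
interface Pusher {
    long push(String json) throws Exception;
}

/** Sketch of a bounded-retry commit with position state guarded by a lock. */
class CommitSketch {
    private final Object lock = new Object();
    private final Pusher pusher;
    private long xid = -1L;             // -1 means "nothing committed yet" (assumption of this sketch)
    private long nextQueueOffset = -1L;

    CommitSketch(Pusher pusher) {
        this.pusher = pusher;
    }

    /** Tries the push at most 3 times; returns true as soon as one attempt succeeds. */
    boolean commit(long txXid, String json, boolean isComplete) {
        for (int i = 0; i < 3; i++) {
            try {
                long offset = pusher.push(json);
                if (isComplete) {
                    // Update both fields together so a reader never sees a half-written position.
                    synchronized (lock) {
                        xid = txXid;
                        nextQueueOffset = offset;
                    }
                }
                return true;
            } catch (Exception e) {
                // Fall through to the next attempt; the original logs the error here.
            }
        }
        return false; // all 3 attempts failed; the recorded position is left unchanged
    }

    /** Reads both fields under the same lock, as logPosition does. */
    long[] snapshot() {
        synchronized (lock) {
            return new long[] {xid, nextQueueOffset};
        }
    }
}
```

With a Pusher that fails twice and then returns an offset, commit succeeds on the third attempt and snapshot reflects the new xid and offset. With a Pusher that always throws, commit gives up after three attempts and the recorded position stays where it was, which is the case the original handles only by logging.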

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

public class RocketMQProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);

    private DefaultMQProducer producer;
    private Config config;

    public RocketMQProducer(Config config) {
        this.config = config;
    }

    public void start() throws MQClientException {
        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
        producer.setNamesrvAddr(config.mqNamesrvAddr);
        producer.start();
    }

    public long push(String json) throws Exception {
        LOGGER.debug(json);

        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
        SendResult sendResult = producer.send(message);

        return sendResult.getQueueOffset();
    }
}
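  • RocketMQProducer's start method creates a DefaultMQProducer in the BINLOG_PRODUCER_GROUP group, sets its name server address from config.mqNamesrvAddr, and starts it. Its push method wraps the UTF-8 bytes of the JSON in a Message for config.mqTopic, sends it synchronously with producer.send(message), and returns sendResult.getQueueOffset().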
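
Because push encodes the JSON with json.getBytes("UTF-8"), whatever reads the message body has to decode it with the same charset. Here is a small sketch of that round trip, using only the JDK; PayloadSketch is a hypothetical helper name and does not exist in rocketmq-mysql. It uses StandardCharsets.UTF_8 rather than the string "UTF-8", which yields the same bytes without the checked UnsupportedEncodingException.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of the body encoding used by push() and the matching decode on the reading side. */
class PayloadSketch {

    /** Encodes the transaction JSON to the bytes that would become the message body. */
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back into the JSON string. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Non-ASCII column values are where a charset mismatch would show up: each of the two CJK characters in {"name":"数据"} takes three bytes in UTF-8, so the 13-character string becomes a 17-byte body.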

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

public class BinlogPositionLogThread extends Thread {

    private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);

    private Replicator replicator;

    public BinlogPositionLogThread(Replicator replicator) {
        this.replicator = replicator;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("Offset thread interrupted.", e);
            }

            replicator.logPosition();
        }
    }
}
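  • BinlogPositionLogThread is a daemon thread that sleeps for 1000 ms and then calls replicator.logPosition(), in an endless loop, so the current position is logged roughly once per second.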
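
The same "sleep, then log" loop can also be written with a ScheduledExecutorService. This is not how rocketmq-mysql does it, only an alternative sketch; PeriodicLoggerSketch and startPeriodic are hypothetical names, and the Runnable stands in for replicator::logPosition.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of a daemon-thread periodic task, as an alternative to a hand-written sleep loop. */
class PeriodicLoggerSketch {

    /**
     * Runs the task repeatedly on a single daemon thread, waiting periodMillis
     * before the first run and between the end of one run and the start of the next.
     */
    static ScheduledExecutorService startPeriodic(Runnable task, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "position-logger");
            t.setDaemon(true); // like setDaemon(true) in BinlogPositionLogThread: does not block JVM exit
            return t;
        });
        // Note: if the task throws, the executor suppresses all later runs.
        scheduler.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Calling startPeriodic(replicator::logPosition, 1000) would give roughly the same cadence as the original thread. One difference is that the returned scheduler can be stopped with shutdownNow(), whereas the while (true) loop in the original has no exit and keeps running even after an interrupt.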

Summary

Replicator exposes start, commit, and logPosition. start creates the RocketMQProducer, BinlogPositionLogThread, and EventProcessor and starts each of them. commit sends transaction.toJson() through rocketMQProducer and, when isComplete is true, updates xid, nextBinlogPosition, and nextQueueOffset. logPosition logs xid, binlogFilename, nextPosition, and nextOffset.

doc

  • Replicator
