本文主要研究一下rocketmq-mysql的Replicator

Replicator

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java

public class Replicator {    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");    private Config config;    private EventProcessor eventProcessor;    private RocketMQProducer rocketMQProducer;    private Object lock = new Object();    private BinlogPosition nextBinlogPosition;    private long nextQueueOffset;    private long xid;    public static void main(String[] args) {        Replicator replicator = new Replicator();        replicator.start();    }    public void start() {        try {            config = new Config();            config.load();            rocketMQProducer = new RocketMQProducer(config);            rocketMQProducer.start();            BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);            binlogPositionLogThread.start();            eventProcessor = new EventProcessor(this);            eventProcessor.start();        } catch (Exception e) {            LOGGER.error("Start error.", e);            System.exit(1);        }    }    public void commit(Transaction transaction, boolean isComplete) {        String json = transaction.toJson();        for (int i = 0; i < 3; i++) {            try {                if (isComplete) {                    long offset = rocketMQProducer.push(json);                    synchronized (lock) {                        xid = transaction.getXid();                        nextBinlogPosition = transaction.getNextBinlogPosition();                        nextQueueOffset = offset;                    }                } else {                    rocketMQProducer.push(json);                }                break;            } catch (Exception e) {                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);            }        }    }    public void logPosition() {        String binlogFilename = null;        long xid = 0L;        long nextPosition = 0L;        long nextOffset = 0L;        synchronized (lock) {            if (nextBinlogPosition != null) {                xid = this.xid;                binlogFilename = nextBinlogPosition.getBinlogFilename();                nextPosition = nextBinlogPosition.getPosition();                nextOffset = nextQueueOffset;            }        }        if (binlogFilename != null) {            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",                xid, binlogFilename, nextPosition, nextOffset);        }    }    public Config getConfig() {        return config;    }    public BinlogPosition getNextBinlogPosition() {        return nextBinlogPosition;    }}
  • Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

RocketMQProducer

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java

public class RocketMQProducer {    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);    private DefaultMQProducer producer;    private Config config;    public RocketMQProducer(Config config) {        this.config = config;    }    public void start() throws MQClientException {        producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");        producer.setNamesrvAddr(config.mqNamesrvAddr);        producer.start();    }    public long push(String json) throws Exception {        LOGGER.debug(json);        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));        SendResult sendResult = producer.send(message);        return sendResult.getQueueOffset();    }}
  • RocketMQProducer的start方法创建DefaultMQProducer并执行其start方法;其push方法则通过producer.send(message)发送消息,并返回sendResult.getQueueOffset()

BinlogPositionLogThread

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java

public class BinlogPositionLogThread extends Thread {    private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);    private Replicator replicator;    public BinlogPositionLogThread(Replicator replicator) {        this.replicator = replicator;        setDaemon(true);    }    @Override    public void run() {        while (true) {            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                logger.error("Offset thread interrupted.", e);            }            replicator.logPosition();        }    }}
  • BinlogPositionLogThread会定时执行replicator.logPosition()来打印position信息

小结

Replicator提供了start、commit、logPosition方法;start方法会创建RocketMQProducer、BinlogPositionLogThread及EventProcessor,然后执行其start方法;commit方法会通过rocketMQProducer将transaction.toJson()发送出去,对于isComplete为true的会更新xid、nextBinlogPosition、nextQueueOffset;logPosition方法会打印binlogFilename、nextPosition、nextOffset

doc

  • Replicator