本文主要研究一下rocketmq-mysql的ColumnParser

ColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java

/**
 * Base type for per-column value converters, plus the factory that picks a
 * converter from the column's data type.
 */
public abstract class ColumnParser {

    /**
     * Matches a complete {@code enum(...)} or {@code set(...)} column type.
     * Group 2 is the text between the outer parentheses. Compiled once
     * instead of on every {@link #extractEnumValues(String)} call.
     */
    private static final Pattern ENUM_OR_SET_TYPE = Pattern.compile("(enum|set)\\((.*)\\)");

    /**
     * Selects the parser implementation for a column.
     *
     * @param dataType the bare data type name (e.g. {@code "int"}, {@code "varchar"});
     *                 must not be null because it is used as a switch selector
     * @param colType  the full column type; forwarded to the integer, enum and set parsers
     * @param charset  the column character set; forwarded to the string parser
     * @return a parser for the given type; {@link DefaultColumnParser} when the
     *         data type is not one of the listed cases
     */
    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
        switch (dataType) {
            case "tinyint":
            case "smallint":
            case "mediumint":
            case "int":
                return new IntColumnParser(dataType, colType);
            case "bigint":
                return new BigIntColumnParser(colType);
            case "tinytext":
            case "text":
            case "mediumtext":
            case "longtext":
            case "varchar":
            case "char":
                return new StringColumnParser(charset);
            case "date":
            case "datetime":
            case "timestamp":
                return new DateTimeColumnParser();
            case "time":
                return new TimeColumnParser();
            case "year":
                return new YearColumnParser();
            case "enum":
                return new EnumColumnParser(colType);
            case "set":
                return new SetColumnParser(colType);
            default:
                return new DefaultColumnParser();
        }
    }

    /**
     * Extracts the member names from an {@code enum(...)} / {@code set(...)} column type.
     *
     * <p>All single quotes are removed and the remainder is split on commas, so a
     * member whose name itself contains a quote or a comma is not preserved intact.
     *
     * @param colType the full column type, e.g. {@code enum('a','b')}
     * @return the member names in declaration order, or an empty array when
     *         {@code colType} is not an enum/set type
     */
    public static String[] extractEnumValues(String colType) {
        Matcher matcher = ENUM_OR_SET_TYPE.matcher(colType);
        if (!matcher.matches()) {
            return new String[0];
        }
        return matcher.group(2).replace("'", "").split(",");
    }

    /**
     * Converts a column value.
     *
     * @param value the value to convert; may be null
     * @return the converted value
     */
    public abstract Object getValue(Object value);
}
  • ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类

IntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java

/**
 * Parser for {@code tinyint}, {@code smallint}, {@code mediumint} and {@code int}
 * columns. For unsigned columns it maps a negative {@code Integer} back to the
 * non-negative value by adding {@code 2^bits}.
 */
public class IntColumnParser extends ColumnParser {

    /** Width of the column type in bits; stays 0 for an unrecognised data type. */
    private int bits;

    /** False only when the column type ends with {@code " unsigned"}. */
    private boolean signed;

    /**
     * @param dataType the bare data type name, used to choose the bit width
     * @param colType  the full column type, checked for a trailing {@code unsigned}
     */
    public IntColumnParser(String dataType, String colType) {
        switch (dataType) {
            case "tinyint":
                bits = 8;
                break;
            case "smallint":
                bits = 16;
                break;
            case "mediumint":
                bits = 24;
                break;
            case "int":
                bits = 32;
                break;
            default:
                break;
        }
        // NOTE(review): only a type that ends in " unsigned" is recognised; a type with
        // further attributes after "unsigned" would be treated as signed - confirm.
        this.signed = !colType.matches(".* unsigned$");
    }

    /**
     * Converts an integer column value.
     *
     * @param value the value to convert; may be null
     * @return null for null; {@code Long} and non-integer values unchanged; an
     *         {@code Integer} unchanged when the column is signed or the value is
     *         non-negative; otherwise a {@code Long} equal to {@code 2^bits + value}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }

        if (value instanceof Long) {
            return value;
        }

        if (value instanceof Integer) {
            Integer i = (Integer) value;
            // Zero must pass through unchanged: with "i > 0" an unsigned column
            // holding 0 was reported as 2^bits (e.g. 256 for tinyint).
            if (signed || i >= 0) {
                return i;
            }
            return (1L << bits) + i;
        }

        return value;
    }
}
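  • IntColumnParser解析tinyint、smallint、mediumint、int类型;它根据dataType确定位数(8、16、24、32),并根据colType是否以unsigned结尾判断是否有符号;对于unsigned列,若读到的Integer为负数,则加上2^bits还原为无符号数值并以Long返回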

BigIntColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java

/**
 * Parser for {@code bigint} columns. A negative {@code Long} read from an
 * unsigned column is converted to the matching non-negative {@code BigInteger}.
 */
public class BigIntColumnParser extends ColumnParser {

    /** 2^64, added to a negative long to obtain its unsigned value. */
    private static BigInteger max = BigInteger.ONE.shiftLeft(64);

    /** False only when the column type ends with {@code " unsigned"}. */
    private boolean signed;

    /**
     * @param colType the full column type, checked for a trailing {@code unsigned}
     */
    public BigIntColumnParser(String colType) {
        boolean declaredUnsigned = colType.matches(".* unsigned$");
        this.signed = !declaredUnsigned;
    }

    /**
     * Converts a bigint column value.
     *
     * @param value the value to convert; null, a {@code BigInteger} or a {@code Long}
     * @return null for null; a {@code BigInteger} unchanged; otherwise the
     *         {@code Long} itself, or {@code 2^64 + value} as a {@code BigInteger}
     *         when the column is unsigned and the value is negative
     * @throws ClassCastException if the value is neither a {@code BigInteger} nor a {@code Long}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof BigInteger) {
            return value;
        }

        Long raw = (Long) value;
        boolean keepAsLong = signed || raw >= 0;
        if (keepAsLong) {
            return raw;
        }
        return max.add(BigInteger.valueOf(raw));
    }
}
  • BigIntColumnParser解析bigint类型

StringColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java

/**
 * Parser for text columns. Values that arrive as {@code byte[]} are decoded
 * using the column's character set.
 */
public class StringColumnParser extends ColumnParser {

    /** Column character set name, lower-cased so it can be used as a switch selector. */
    private String charset;

    /**
     * @param charset the column character set name; must not be null
     */
    public StringColumnParser(String charset) {
        // Locale.ROOT keeps the result stable under locales with special casing
        // rules (e.g. Turkish, where "LATIN1" would otherwise lower-case to "latın1").
        this.charset = charset.toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Converts a text column value.
     *
     * @param value the value to convert; null, a {@code String} or a {@code byte[]}
     * @return null for null; a {@code String} unchanged; otherwise the bytes decoded
     *         with the charset that corresponds to the column character set
     * @throws ClassCastException if the value is neither a {@code String} nor a {@code byte[]}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }

        if (value instanceof String) {
            return value;
        }

        byte[] bytes = (byte[]) value;

        switch (charset) {
            case "utf8":
            case "utf8mb3": // alias of utf8 reported by newer MySQL servers
            case "utf8mb4":
                return new String(bytes, Charsets.UTF_8);
            case "latin1":
            case "ascii":
                return new String(bytes, Charsets.ISO_8859_1);
            case "ucs2":
                return new String(bytes, Charsets.UTF_16);
            default:
                // Any other name is handed to the charset lookup as-is.
                return new String(bytes, Charsets.toCharset(charset));
        }
    }
}
  • StringColumnParser解析tinytext、text、mediumtext、longtext、varchar、char类型

DateTimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java

/**
 * Parser for {@code date}, {@code datetime} and {@code timestamp} columns.
 * Formats values as {@code yyyy-MM-dd HH:mm:ss} text.
 */
public class DateTimeColumnParser extends ColumnParser {

    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Formatter using the JVM default time zone; used for {@code Timestamp} values. */
    private static final SimpleDateFormat dateTimeFormat = new SimpleDateFormat(PATTERN);

    /** Formatter fixed to UTC; used for epoch-millisecond {@code Long} values. */
    private static final SimpleDateFormat dateTimeUtcFormat = new SimpleDateFormat(PATTERN);

    static {
        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    }

    /**
     * Converts a date/time column value.
     *
     * @param value the value to convert; may be null
     * @return null for null; the formatted text for a {@code Timestamp} (default
     *         time zone) or a {@code Long} (treated as epoch milliseconds, UTC);
     *         any other value unchanged
     */
    @Override
    public Object getValue(Object value) {
        if (value == null) {
            return null;
        }

        if (value instanceof Timestamp) {
            return format(dateTimeFormat, value);
        }

        if (value instanceof Long) {
            return format(dateTimeUtcFormat, new Date((Long) value));
        }

        return value;
    }

    /**
     * Formats with a shared formatter while holding its monitor.
     * SimpleDateFormat keeps mutable state during formatting, and these instances
     * are static, so unsynchronized concurrent calls could corrupt the output.
     */
    private static String format(SimpleDateFormat formatter, Object date) {
        synchronized (formatter) {
            return formatter.format(date);
        }
    }
}
  • DateTimeColumnParser解析date、datetime、timestamp类型

TimeColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java

/**
 * Parser for {@code time} columns. Re-wraps a {@code Timestamp} as a
 * {@code Time} carrying the same millisecond value.
 */
public class TimeColumnParser extends ColumnParser {

    /**
     * Converts a time column value.
     *
     * @param value the value to convert; may be null
     * @return a {@code Time} built from the timestamp's milliseconds when the value
     *         is a {@code Timestamp}; otherwise the value unchanged (null stays null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null falls through with every other type.
        if (!(value instanceof Timestamp)) {
            return value;
        }

        long millis = ((Timestamp) value).getTime();
        return new Time(millis);
    }
}
  • TimeColumnParser解析time类型

YearColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java

/**
 * Parser for {@code year} columns. Reduces a {@code Date} to its calendar year.
 */
public class YearColumnParser extends ColumnParser {

    /**
     * Converts a year column value.
     *
     * @param value the value to convert; may be null
     * @return the year field of the date, read through a default-locale,
     *         default-time-zone {@code Calendar}, when the value is a {@code Date};
     *         otherwise the value unchanged (null stays null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null is returned as-is here too.
        if (!(value instanceof Date)) {
            return value;
        }

        Calendar cal = Calendar.getInstance();
        cal.setTime((Date) value);
        return cal.get(Calendar.YEAR);
    }
}
  • YearColumnParser解析year类型

EnumColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java

/**
 * Parser for {@code enum} columns. Translates a 1-based member index into the
 * member name taken from the column type.
 */
public class EnumColumnParser extends ColumnParser {

    /** Member names in declaration order, extracted from the column type. */
    private String[] enumValues;

    /**
     * @param colType the full column type, e.g. {@code enum('a','b')}
     */
    public EnumColumnParser(String colType) {
        this.enumValues = extractEnumValues(colType);
    }

    /**
     * Converts an enum column value.
     *
     * @param value the value to convert; null, a {@code String} or an {@code Integer} index
     * @return null for null or index 0; a {@code String} unchanged; otherwise the
     *         member name at position {@code index - 1}
     * @throws ClassCastException if the value is neither a {@code String} nor an {@code Integer}
     * @throws ArrayIndexOutOfBoundsException if the index is outside the member list
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof String) {
            return value;
        }

        int index = (Integer) value;
        // Index 0 has no member name and is reported as null.
        return index == 0 ? null : enumValues[index - 1];
    }
}
  • EnumColumnParser解析enum类型

SetColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java

/**
 * Parser for {@code set} columns. Expands a bit mask into the comma-separated
 * names of the members whose bits are set.
 */
public class SetColumnParser extends ColumnParser {

    /** Member names in declaration order; member {@code i} corresponds to bit {@code i}. */
    private String[] enumValues;

    /**
     * @param colType the full column type, e.g. {@code set('a','b')}
     */
    public SetColumnParser(String colType) {
        this.enumValues = extractEnumValues(colType);
    }

    /**
     * Converts a set column value.
     *
     * @param value the value to convert; null, a {@code String} or a {@code Long} bit mask
     * @return null for null; a {@code String} unchanged; otherwise the selected
     *         member names joined with {@code ","} (empty string when no bit is set)
     * @throws ClassCastException if the value is neither a {@code String} nor a {@code Long}
     */
    @Override
    public Object getValue(Object value) {
        if (value == null || value instanceof String) {
            return value;
        }

        long mask = (Long) value;
        StringBuilder joined = new StringBuilder();
        // Empty before the first selected member, "," before every later one.
        String separator = "";
        for (int bit = 0; bit < enumValues.length; bit++) {
            boolean selected = ((mask >> bit) & 1) == 1;
            if (!selected) {
                continue;
            }
            joined.append(separator).append(enumValues[bit]);
            separator = ",";
        }
        return joined.toString();
    }
}
  • SetColumnParser解析set类型

DefaultColumnParser

rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java

/**
 * Fallback parser for column types without a dedicated implementation.
 * Binary values are rendered as Base64 text; everything else is passed through.
 */
public class DefaultColumnParser extends ColumnParser {

    /**
     * Converts a value of an otherwise unhandled column type.
     *
     * @param value the value to convert; may be null
     * @return the Base64 string of the bytes when the value is a {@code byte[]};
     *         otherwise the value unchanged (null stays null)
     */
    @Override
    public Object getValue(Object value) {
        // instanceof is false for null, so null reaches the pass-through return.
        if (value instanceof byte[]) {
            byte[] raw = (byte[]) value;
            return Base64.encodeBase64String(raw);
        }
        return value;
    }
}
  • DefaultColumnParser是未匹配到具体类型时的兜底实现:当value为byte[]时通过Base64编码转为String,其他类型的值原样返回

小结

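ColumnParser定义了getValue抽象方法;它提供了getColumnParser方法用于根据dataType获取对应的ColumnParser实现类,包括IntColumnParser、BigIntColumnParser、StringColumnParser、DateTimeColumnParser、TimeColumnParser、YearColumnParser、EnumColumnParser、SetColumnParser;未匹配到的类型统一使用DefaultColumnParser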

doc

  • ColumnParser