本文首发于泊浮目标简书:https://www.jianshu.com/u/204...
版本日期备注
1.02020.9.13文章首发

前言

Clean Architecture是Bob大叔在2012年提出的一个架构模型。其依据过来几十年中的一系列架构提炼而成:

  • Hexagonal Architecture:由 Alistair Cockburn 首先提出
  • DCI:由 James Coplien 和Trygve Reenskaug 首先提出
  • BCE:由 Ivar Jacobson 在他的 Obect Oriented Software Engineering: A Use-Case Driven Approach 一书中首先提出

依据这些架构设计进去的零碎,往往具备以下特点:

  • 独立于框架:这些零碎的架构并不依赖某个功能丰富的框架之中的某个函数。框架能够被当成工具来应用, 但不须要让零碎来适应框架 。
  • 可被测试 :这些零碎的业务逻辑可 以脱离 UI、 数据库、Web 服务以及其余的内部元素来进行测试 。
  • 独立于 UI :这些零碎的 UI 变更起来很容易, 不须要批改其余的零碎局部 。 例如, 咱们能够在不批改业务逻辑的前提下将一个零碎的 UI 由 Web 界面替 换成命令行界面 。
  • 独立于数据库:咱们能够轻易将这些零碎应用的 Oracle 、SQL Server 替换成 Mongo、BigTable、 CouchDB 之类的数据库。因为业务逻辑与数据库之间曾经实现了结耦 。独立于任何内部机构:这些零碎的业务逻辑并不需要晓得 任何其余内部接口的存在 。

对于Clean Architecture的介绍到此为止,有趣味的同学能够自行查阅google。

背景

最近写了很多业务代码,因为每个组件都是分布式部署的,导致手动测试时十分的苦楚,耗时耗力。于是我便开始思考自动化测试的计划。

目前业务中一部分的代码应用了Storm这个框架,咱们挑一个不便了解的用例,这里大略波及三个组件:

  • ReadSpout:从kafka、database读取音讯,并将其下发
  • DispatcherBolt:读取上游下发的音讯,并依据肯定的规定散发——比方主键(最近又减少了自定义键和组合键),而后将关键字雷同的数据放在一起,而后下发
  • KafkaWriteBolt:读取上游下发的音讯,将关键字一样的数据写入kafka

DispatcherBolt的外围代码大抵如下:

    @Override    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {        super.prepare(conf, context, collector);        try {            init();        } catch (Exception e) {            collector.reportError(e);            throw new RuntimeException(e);        }    }    @Override    public void execute(Tuple dataTuple) {        this.input = dataTuple;        try {            Object obj = dataTuple.getValueByField(EmitFields.MESSAGE);            String key = (String) dataTuple.getValueByField(EmitFields.GROUP_FIELD);            List<MessageEntry> messageEntries = (List<MessageEntry>) obj;            emitMessageEntry(key, messageEntries);            this.collector.ack(dataTuple);        } catch (Exception e) {            logger.info("Dispatcher Execute error: ", e);            this.collector.reportError(e);            this.collector.fail(dataTuple);        }    }    private void emitMessageEntry(String key, List<MessageEntry> messageEntries) throws Exception {        long lastPos = 0L, uniquePos = 0L, payloadSize = 0L;        UmsMessageBuilder builder = null;        // 解决任一schema 分组的表数据        String tableName = messageEntries.get(0).getEntryHeader().getTableName();        for (MessageEntry msgEntry : messageEntries) {            EntryHeader header = msgEntry.getEntryHeader();            header.setLastPosition(lastPos);            // 若应用schema 进行分组,则同一组数据中可能会呈现多张表的情景,须要处理表名呈现切换的状况            if (StringUtils.isEmpty(tableName) || (getExtractorConfig().getGroupType() == GroupType.SCHEMA && !StringUtils.equalsIgnoreCase(tableName, header.getTableName()))) {                emitBuilderMessage(builder, key);                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));                payloadSize = 0;            }            // DDL解决            if (msgEntry.isDdl()) {                emitBuilderMessage(builder, key);                executeDdlEvent(msgEntry);                emitDDLMessage(key, msgEntry);                builder = null;                continue;            }            if (builder != null && msgEntry.getEntryHeader().getHeader().getSourceType().equalsIgnoreCase(MediaType.DataSourceType.ORACLE.getName())) {                emitBoltMessage(key, builder.getMessage());                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));                payloadSize = 0;            }            // DML解决            if (builder == null) {                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));                payloadSize = 0;            }            for (CanalEntry.RowData rowData : msgEntry.getRowDataLst()) {                lastPos = Long.parseLong(header.getPosition()) + (++uniquePos);                // 对UPDATE类型的增量数据非凡解决                if (header.isUpdate()) {                    if (getExtractorConfig().getOutputBeforeUpdateFlg()) {                        payloadSize += appendUpdateBefore2Builder(builder, header, rowData, EventType.BEFORE.getValue().toLowerCase());                    }                    if (ExtractorHelper.isPkUpdate(rowData.getAfterColumnsList())) {                        payloadSize += appendUpdateBefore2Builder(builder, header, rowData, getEventTypeForUMS(CanalEntry.EventType.DELETE));                    }                }                List<Object> payloads = new ArrayList<>();                payloadSize += appendRowData2Builder(payloads, builder, header, rowData);                builder.appendPayload(payloads.toArray());                }            }        }        // 最初一批数据发送        emitBuilderMessage(builder, key);    }

留神,这里的两个办法prepareexecute都是框架裸露进去的接口。如果开发者使用不当,则会导致业务代码和框架耦合。

计划1:利用IOC

这个计划在晚期的时候做过尝试,简略的来说就是将两头那段emitMessageEntry相干的代码形象成一个对象,并用接口示意。然而通过spring这种IOC框架注入进来,相似于:

    override fun prepare(topoConf: MutableMap<String, Any>, context: TopologyContext, collector: OutputCollector) {        super.prepare(topoConf, context, collector)        try {            init()            this.dispatcherServer = IOCUtil.getBean(DispatcherServer::class.java).init(collector)        } catch (e: Exception) {            collector.reportError(e)            throw RuntimeException(e)        }    }    override fun execute(input: Tuple) {            val obj = dataTuple.getValueByField(EmitFields.MESSAGE)            val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String            val messageEntries = obj as List<MessageEntry>            dispatcherService.dispatcherLogical(messageEntries,key)    }

这样咱们在单元测试里能够间接将dispatcherService类注入进来,并本人实现一个OutputCollector用于收集散发的数据。而后将mock的参数填入,并断言后果是否合乎咱们的期待。

但因为storm会波及到散发相干事宜(如序列化),这会让业务代码有点变扭:

  1. 将这个dispatcherService成员在Bolt里申明为Transient
  2. 须要在初始化时初始化IOC容器
  3. 在初始化IOC容器后注入dispatcherService

能够看到,咱们为了测试,居然不得不批改业务代码来退出无关紧要的逻辑,这显然不是一个好的计划。

计划2:Mockito

Mockito实现的计划对业务没有任何入侵性,间接写测试代码即可,写进去的代码相似于:

@RunWith(PowerMockRunner::class)@PowerMockIgnore("javax.management.*")class DispatcherBoltTest {    private lateinit var config: AbstractSinkConfig    private lateinit var outputCollector: OutputCollector    private lateinit var tuple: Tuple    @Before    fun atBefore() {        config = PowerMockito.mock(AbstractSinkConfig::class.java)        outputCollector = PowerMockito.mock(OutputCollector::class.java)        tuple = PowerMockito.mock(Tuple::class.java)    }    private fun init(dispatcherBoltImpl: DispatcherBoltImpl) {        reset(config)        reset(outputCollector)        reset(tuple)        dispatcherBoltImpl.prepare(mutableMapOf(), PowerMockito.mock(TopologyContext::class.java), outputCollector)    }    @Test    fun testSingleUms() {        //定义mock对象的一些行为        `when`(config.configProps).thenReturn(Properties())        //将须要测试的类实例化        val dispatcherBoltImpl = DispatcherBoltImpl(config)        init(dispatcherBoltImpl)        val umsMap = generateSingleUmsBo()        val boMap = getBoMap(intArrayOf(1))        //定义mock对象的一些行为        `when`(tuple.getValueByField(EmitFields.MESSAGE)).thenReturn(umsMap.messages)        `when`(tuple.getValueByField(EmitFields.GROUP_FIELD)).thenReturn(umsMap.dispatchKey)        `when`(tuple.getValueByField(EmitFields.EX_BO)).thenReturn(boMap)        dispatcherBoltImpl.handleDataTuple(tuple)        // 后果验证        Mockito.verify(outputCollector, Mockito.times(1))                .emit(EmitFields.DATA_MSG, tuple, Values(umsMap.dispatchKey, umsMap.messages,                        boMap,                        EmitFields.EMIT_TO_BOLT))    }}

逻辑很清晰易懂:先抉择须要mock的对象,并定义其被mock的行为,而后把数据填装进去即可,最初依据后果校验。

但如果把视线放高点看,有两个潜在的问题须要思考:

  1. 目前该类的业务逻辑比较简单,所以咱们须要关注的链路也较少——这体现在咱们对于mock对象的mock行为编写上。换句话说,该类越简单,咱们就须要编写越多的mock代码。
  2. 目前咱们的业务和框架是紧耦合的,那么咱们测试时须要将框架的行为一起思考进去。同时也意味着框架行为变动时(如降级),测试用例须要大量变更。亦或是更换框架时,测试用例会变得简直不可用。**这曾经违反整洁架构的准则了——业务须要独立于框架,而不是严密耦合。

计划3:Clean Architecture

依据后面提到的,咱们要做的第一件事就是剥离业务和框架的耦合。那么该如何剥离呢?咱们间接拿出答案:

/** * 剥离与任何流解决框架的耦合,仅关注UMS散发的服务 * */interface DispatcherServer {    fun dispatcherMessageEntry(key: String, messageEntries: List<MessageEntry>, destination: String,                               tableToDispatchColumn: HashMap<String, Set<String>>,                               resultConsumer: (group: MutableMap<Int, UmsMessageBuilder>, key: String) -> Unit,                               executeDdlEventBlock: (messageEntry: MessageEntry) -> Unit,                               ddlMessageConsumer: (key: String, messageEntry: MessageEntry) -> Unit)}

咱们定义了三个函数型参数。利用这种形式,咱们能够轻易的将业务和框架隔离开来。于是代码调用起来就像这样:

    override fun execute(dataTuple: Tuple) {        input = dataTuple        try {            val obj = dataTuple.getValueByField(EmitFields.MESSAGE)            val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String            val messageEntries = obj as List<MessageEntry>            dispatcherServer.dispatcherMessageEntry(key, messageEntries, destination, tableToDispatchColumn,                    resultConsumer = { builder, innerKey -> emitBuilderMessage(builder, innerKey) },                    executeDdlEventBlock = { entry -> executeDdlEvent(entry) },                    ddlMessageConsumer = { innerKey, msgEntry -> emitDDLMessage(innerKey, msgEntry) }            )            collector.ack(dataTuple)        } catch (e: Exception) {            logger.info("Dispatcher Execute error: ", e)            collector.reportError(e)            collector.fail(dataTuple)        }    }

emitBuilderMessageexecuteDdlEventemitDDLMessage只是DispatcherBolt中的一个公有办法,外面会将传入的数据通过collector依照肯定规定下发上来。这样,咱们就将框架相干的代码放在了DispatcherBolt里。

而和框架无关的业务代码,咱们则能够将它放到DispatcherServer的实现中去。

测试的代码也能够专一在测试业务逻辑上:

    @Test    fun testUpdateRecords() {        val originNamespace = "my_schema.my_table"        val mockData = listOf(getUpdate1Data())        val config = getMockConfig(extractorConfigJsonFile)        config.outputBeforeUpdateFlg = false        config.outputExtraValueFlg = false        config.payloadType = PayloadType.SIZE        config.maxPayloadSize = 10240        val dispatcherServer = DispatcherServerImpl(config)        val resultMap = mutableMapOf<Int, UmsMessageBuilder>()        dispatcherServer.dispatcherMessageEntry(originNamespace, mockData, "M26", hashMapOf(),                resultConsumer = { builder, innerKey ->                    resultMap.putAll(builder)                    Assert.assertEquals(innerKey, originNamespace)                },                executeDdlEventBlock = { throw  RuntimeException("这堆数据中不应该呈现DDL事件") },                ddlMessageConsumer = { _, _ -> throw  RuntimeException("这堆数据中不应该呈现DDL相干的后果") })        assertEquals(1, resultMap.keys.toSet().size, "以后数据中,应该被分为3组——依据主键散发准则,他们来自于不同的主键")        assertEquals(1, resultMap.size, "以后数据中,应该被分为3组——依据主键散发准则,他们来自于不同的主键")        val umsList = resultMap.values.map { it.message }        umsList.forEach {            Assert.assertEquals("m.M26.my_schema.my_table", it.schema.namespace)            Assert.assertEquals(1, it.payloads.size)            assertEquals(9, it.schema.fields.size, "5个扩大字段+4个schema字段应该为9")            Assert.assertEquals("inc", it.protocol.type)            Assert.assertEquals("2", it.protocol.version)            assertEquals(MediaType.DataSourceType.MYSQL, KafkaKeyUtils.getDataSourceType(it))        }    }

看完了成果,咱们再来谈谈下面所用到技巧。其实这很像面向对象中的Strategy模式——定义一个算法接口,并将每一种算法都在这个接口下实现其逻辑,令同一个类型的算法可能调换应用。这样做的益处是算法的变动不影响应用方,也不受应用方的影响。而如果函数是一等公民的话,则会让建设和操纵各种策略的工作变得非常简略。

那么怎么是不简略的呢?如果用java的话,咱们得先定义一个专门的接口,申明一个办法,在应用时用匿名外部实现将它传入,但这其实没什么必要,因为咱们仅仅想传一个函数进去,而不是对象。典型的代码能够见:

ZStack源码分析之设计模式鉴赏——策略模式:https://segmentfault.com/a/11...

设计模式要做的事不外乎缩小代码冗余度,进步代码复用性。而在函数式语言中,复用次要体现为通过参数来传递作为第一等语言成分的函数,各种函数式编程库都频繁地使用了这种手法。与面向对象语言相比(以类型为单位),函数式语言的重用产生于较粗的粒度级别上(以行为为单位),着眼于提取一些共通的运作机制,并参数化地调整其行为。