On arch: Talking About the REP, CCP, and CRP Principles in Software Development

Preface: this post mainly looks at the REP, CCP, and CRP principles of software development.

The Reuse/Release Equivalence Principle (REP): "The granule of reuse is the granule of release." The smallest granule of reuse should be the same as the smallest granule of release, and a reusable component needs its own release version number.

The Common Closure Principle (CCP), i.e. the Single Responsibility Principle at the component level: "The classes in a component should be closed together against the same kinds of changes. A change that affects a component affects all the classes in that component and no other components." We should gather into the same component the classes that change at the same time and for the same reasons, and put into different components the classes that change at different times and for different reasons. For most applications, maintainability matters far more than reusability. If some code in a program has to change, it is better for all of those changes to land in a single component rather than be scattered across many: when the changes are concentrated in one component, only that component needs to be redeployed, and the other components need no revalidation or redeployment.

The Common Reuse Principle (CRP): "The classes in a component are reused together. If you reuse one of the classes in a component, you reuse them all." Don't force the users of a component to depend on things they don't need; put the classes and modules that are usually reused together into the same component.

Summary: REP and CCP are inclusive principles, tending to make components larger, while CRP is an exclusive principle, tending to make components smaller. ...
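As an illustration only (the component and class names below are hypothetical, not taken from the article), the grouping these principles suggest can be sketched as two separately released Java components: classes that change together for ordering-rule reasons stay in one component with its own version (REP and CCP), while reporting code that changes for different reasons, and that most clients never reuse together with the order classes, lives in a separate component so that nobody is forced to depend on it (CRP).

```java
// Hypothetical component "orders-core", released with its own version number (REP),
// e.g. orders-core-1.4.0. CCP: these classes change together, for the same reason
// (a change in ordering rules), so they live in the same component.

// File: com/example/orders/Order.java
package com.example.orders;

public class Order {
    private final String id;

    public Order(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

// File: com/example/orders/OrderValidator.java
package com.example.orders;

public class OrderValidator {
    // changes whenever the Order rules change, so it sits next to Order (CCP)
    public boolean isValid(Order order) {
        return order.getId() != null && !order.getId().isEmpty();
    }
}

// Hypothetical component "orders-reporting", with its own release cycle.
// CRP: clients that only place orders never need the report classes,
// so they are not forced to depend on this component.

// File: com/example/orders/reporting/OrderReport.java
package com.example.orders.reporting;

public class OrderReport {
    // changes for reporting and layout reasons, not for ordering-rule reasons
}
```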

April 6, 2022 · 1 min · jiezi

On arch: Talking About the SLAP Principle in Software Development

Preface: this post mainly looks at the SLAP (Single Level of Abstraction Principle) in software development.

SLAP is short for Single Level of Abstraction Principle. The Functions chapter of Robert C. Martin's Clean Code makes the point: make sure the function does only one thing, and that the statements within the function are all at the same level of abstraction. Mixing levels of abstraction within a function is always confusing; readers may not be able to tell whether an expression is an essential concept or a detail. Worse, like broken windows, once details are mixed with essential concepts, more and more details tend to accrete within the function. This is much in the same spirit as Don't Make Me Think: code that follows SLAP is usually not hard to read, while code that does not follow the principle usually ends up as a leaky abstraction. Two handy techniques for following the principle are Extract Method and Extract Class.

Example 1

```java
public List<ResultDto> buildResult(Set<ResultEntity> resultSet) {
    List<ResultDto> result = new ArrayList<>();
    for (ResultEntity entity : resultSet) {
        ResultDto dto = new ResultDto();
        dto.setShoeSize(entity.getShoeSize());
        dto.setNumberOfEarthWorms(entity.getNumberOfEarthWorms());
        dto.setAge(computeAge(entity.getBirthday()));
        result.add(dto);
    }
    return result;
}
```

This code contains two levels of abstraction: one is the loop that turns the resultSet into a List<ResultDto>, the other is the conversion of a ResultEntity into a ResultDto. The conversion logic can be extracted into a method of its own:

```java
public List<ResultDto> buildResult(Set<ResultEntity> resultSet) {
    List<ResultDto> result = new ArrayList<>();
    for (ResultEntity entity : resultSet) {
        result.add(toDto(entity));
    }
    return result;
}

private ResultDto toDto(ResultEntity entity) {
    ResultDto dto = new ResultDto();
    dto.setShoeSize(entity.getShoeSize());
    dto.setNumberOfEarthWorms(entity.getNumberOfEarthWorms());
    dto.setAge(computeAge(entity.getBirthday()));
    return dto;
}
```

After this refactoring, buildResult reads clearly.

Example 2

```java
public MarkdownPost(Resource resource) {
    try {
        this.parsedResource = parse(resource);
        this.metadata = extractMetadata(parsedResource);
        this.url = "/" + resource.getFilename().replace(EXTENSION, "");
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
```

The URL-building logic here is not at the same level as the other method calls; refactored:

```java
public MarkdownPost(Resource resource) {
    try {
        this.parsedResource = parse(resource);
        this.metadata = extractMetadata(parsedResource);
        this.url = urlFor(resource);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private String urlFor(Resource resource) {
    return "/" + resource.getFilename().replace(EXTENSION, "");
}
```

Example 3

```java
public class UglyMoneyTransferService {

    public void transferFunds(Account source, Account target, BigDecimal amount, boolean allowDuplicateTxn)
            throws IllegalArgumentException, RuntimeException {
        Connection conn = null;
        try {
            conn = DBUtils.getConnection();
            PreparedStatement pstmt = conn.prepareStatement("Select * from accounts where acno = ?");
            pstmt.setString(1, source.getAcno());
            ResultSet rs = pstmt.executeQuery();
            Account sourceAccount = null;
            if (rs.next()) {
                sourceAccount = new Account();
                //populate account properties from ResultSet
            }
            if (sourceAccount == null) {
                throw new IllegalArgumentException("Invalid Source ACNO");
            }
            Account targetAccount = null;
            pstmt.setString(1, target.getAcno());
            rs = pstmt.executeQuery();
            if (rs.next()) {
                targetAccount = new Account();
                //populate account properties from ResultSet
            }
            if (targetAccount == null) {
                throw new IllegalArgumentException("Invalid Target ACNO");
            }
            if (!sourceAccount.isOverdraftAllowed()) {
                if ((sourceAccount.getBalance() - amount) < 0) {
                    throw new RuntimeException("Insufficient Balance");
                }
            } else {
                if (((sourceAccount.getBalance() + sourceAccount.getOverdraftLimit()) - amount) < 0) {
                    throw new RuntimeException("Insufficient Balance, Exceeding Overdraft Limit");
                }
            }
            AccountTransaction lastTxn = ..; //JDBC code to obtain last transaction of sourceAccount
            if (lastTxn != null) {
                if (lastTxn.getTargetAcno().equals(targetAccount.getAcno())
                        && lastTxn.getAmount() == amount && !allowDuplicateTxn) {
                    throw new RuntimeException("Duplicate transaction exception"); //ask for confirmation and proceed
                }
            }
            sourceAccount.debit(amount);
            targetAccount.credit(amount);
            TransactionService.saveTransaction(source, target, amount);
        } catch (Exception e) {
            logger.error("", e);
        } finally {
            try {
                conn.close();
            } catch (Exception e) {
                //Not everything is in your control..sometimes we have to believe in GOD/JamesGosling and proceed
            }
        }
    }
}
```

This code leaks DAO logic into the service, and the validation logic is tangled up with the core business logic, which makes it tiring to read. Refactored along SLAP:

```java
class FundTransferTxn {
    private Account sourceAccount;
    private Account targetAccount;
    private BigDecimal amount;
    private boolean allowDuplicateTxn;
    //setters & getters
}

public class CleanMoneyTransferService {

    public void transferFunds(FundTransferTxn txn) {
        Account sourceAccount = validateAndGetAccount(txn.getSourceAccount().getAcno());
        Account targetAccount = validateAndGetAccount(txn.getTargetAccount().getAcno());
        checkForOverdraft(sourceAccount, txn.getAmount());
        checkForDuplicateTransaction(txn);
        makeTransfer(sourceAccount, targetAccount, txn.getAmount());
    }

    private Account validateAndGetAccount(String acno) {
        Account account = AccountDAO.getAccount(acno);
        if (account == null) {
            throw new InvalidAccountException("Invalid ACNO :" + acno);
        }
        return account;
    }

    private void checkForOverdraft(Account account, BigDecimal amount) {
        if (!account.isOverdraftAllowed()) {
            if ((account.getBalance() - amount) < 0) {
                throw new InsufficientBalanceException("Insufficient Balance");
            }
        } else {
            if (((account.getBalance() + account.getOverdraftLimit()) - amount) < 0) {
                throw new ExceedingOverdraftLimitException("Insufficient Balance, Exceeding Overdraft Limit");
            }
        }
    }

    private void checkForDuplicateTransaction(FundTransferTxn txn) {
        AccountTransaction lastTxn = TransactionDAO.getLastTransaction(txn.getSourceAccount().getAcno());
        if (lastTxn != null) {
            if (lastTxn.getTargetAcno().equals(txn.getTargetAccount().getAcno())
                    && lastTxn.getAmount() == txn.getAmount() && !txn.isAllowDuplicateTxn()) {
                throw new DuplicateTransactionException("Duplicate transaction exception");
            }
        }
    }

    private void makeTransfer(Account source, Account target, BigDecimal amount) {
        source.debit(amount);
        target.credit(amount);
        TransactionService.saveTransaction(source, target, amount);
    }
}
```

After the refactoring the logic of transferFunds is clear: first validate the accounts, then check the overdraft limit, then check for a duplicate transfer, and finally run the core makeTransfer logic.

Summary: SLAP is much in the same spirit as Don't Make Me Think; code that follows SLAP is usually not hard to read, while code that does not usually ends up as a leaky abstraction. ...
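The examples above all apply Extract Method. The other technique mentioned, Extract Class, can be sketched roughly as follows, building on Example 2 (PostUrl and the ".md" extension are assumptions introduced here for illustration, not code from the article): once the URL-formatting details grow, they can be moved out of the owning class into a small dedicated class, so the constructor keeps reading at a single level of abstraction.

```java
// A minimal Extract Class sketch, assuming a MarkdownPost like the one in Example 2.
// PostUrl is hypothetical; it owns all of the URL-formatting details.
class PostUrl {
    private static final String EXTENSION = ".md"; // assumed extension, for illustration
    private final String value;

    PostUrl(String filename) {
        // the low-level string manipulation now lives at this (lower) level of abstraction
        this.value = "/" + filename.replace(EXTENSION, "");
    }

    String value() {
        return value;
    }
}

// The owning constructor then stays at one level of abstraction:
//   this.url = new PostUrl(resource.getFilename()).value();
```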

April 5, 2022 · 3 min · jiezi

questioning-the-lambda-architecture

Original: https://www.oreilly.com/ideas... Nathan Marz, the author of Storm, proposed the lambda architecture, which builds stream processing on top of MapReduce and Storm. The lambda architecture captures an immutable sequence of data and feeds it, in parallel, to a batch-processing system and a stream-processing system. The price is that you have to implement the processing logic twice, once in the batch system and once in the stream system, and at query time the results computed by the two systems have to be merged to answer the request.

You are free to choose the implementation of either system, for example Kafka + Storm for the streaming side and Hadoop for the batch side; the output usually lands in two separate databases, one optimized for streaming and one optimized for batch updates.

Jay Kreps [the author of the original article], who has spent years building real-time data pipelines, some of them in the lambda style, prefers a different alternative.

[Figure: Lambda architecture diagram]

Why was the lambda architecture proposed? Because people trying to build stream-processing systems did not give much thought to reprocessing data, and ended up with systems that have no convenient way to do it. The lambda architecture insists on keeping the raw input data immutable and makes the reprocessing problem explicit, and it handles the combination of streaming and historical computation reasonably well.

Why might the data need to be reprocessed? Because code changes over time. The change might be a new field you want in the output, or a bug that needs fixing. Whatever the reason, to bring the historical results up to the new expectation, the data has to be recomputed.

Jay Kreps's rebuttal: one of the views behind the lambda architecture is that stream processing is approximate and inaccurate, inherently less accurate than batch processing. Jay Kreps disagrees: today's stream-processing frameworks are less mature than MapReduce, but that does not mean a streaming system cannot provide semantic guarantees as strong as a batch system's. And although the lambda architecture was pitched under the headline of "beats the CAP theorem", in practice there is still a trade-off between latency and availability in stream processing; since this is an asynchronous processing architecture, the asynchronously computed results cannot be immediately consistent with the incoming data, so the CAP theorem is not actually broken.

What is wrong with the lambda architecture? It requires maintaining code that must produce the same result in two complex distributed systems, which is exactly as painful as it sounds, and Jay Kreps does not believe the problem is fixable: Storm and Hadoop are complex distributed frameworks, so the code inevitably ends up designed around the framework it runs on.

Why, then, is lambda so exciting? Jay Kreps's advice is to use just a batch system such as MapReduce if you are not latency-sensitive, and just a stream-processing framework if you are, combining the two only when you truly must. But requirements come in every shape, and people need to build complex, low-latency processing systems (all the more so when product managers want big, all-in-one feature sets). The two things they have do not, individually, solve their problem: a scalable, high-latency batch system that can process historical data, and a low-latency stream system that cannot reprocess results. By wiring the two together, they actually get a workable solution, and that is the lambda architecture. Painful as it is, it does solve the reprocessing problem that is usually ignored. Still, Jay Kreps sees the lambda architecture only as a stopgap, not a new programming paradigm and not the future of big data. ...
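As a rough illustration of the "merge at query time" step described above (the class and method names are invented for this sketch and do not come from any particular framework), a serving layer might combine a batch-computed view with a realtime view like this:

```java
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of the lambda-architecture serving layer, assuming the batch layer
// and the speed layer each produce a key -> count view. Illustrative names only.
public class LambdaServingLayer {

    private final Map<String, Long> batchView = new HashMap<>();    // recomputed from all history
    private final Map<String, Long> realtimeView = new HashMap<>(); // covers data since the last batch run

    public void loadBatchView(Map<String, Long> fromBatchJob) {
        batchView.clear();
        batchView.putAll(fromBatchJob);
        // once a batch run has covered a time range, the speed-layer counts for that
        // range would normally be discarded; omitted here for brevity
    }

    public void updateRealtime(String key, long delta) {
        realtimeView.merge(key, delta, Long::sum);
    }

    // The query must combine both views, because each layer alone is incomplete.
    public long query(String key) {
        return batchView.getOrDefault(key, 0L) + realtimeView.getOrDefault(key, 0L);
    }
}
```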

June 25, 2019 · 1 min · jiezi

Talking About Guava's RateLimiter

Preface: this post mainly looks at Guava's RateLimiter.

RateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java

```java
@Beta
@GwtIncompatible
public abstract class RateLimiter {
  //......

  /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
   * can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type)
   */
  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  private static void checkPermits(int permits) {
    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

  /**
   * Returns the earliest time that permits are available (with one caveat).
   *
   * @return the time that permits are available, or, if permits are available immediately, an
   *     arbitrary past or present time
   */
  abstract long queryEarliestAvailable(long nowMicros);

  /**
   * Reserves the requested number of permits and returns the time that those permits can be used
   * (with one caveat).
   *
   * @return the time that the permits may be used, or, if the permits may be used immediately, an
   *     arbitrary past or present time
   */
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

  //......
}
```

The methods of interest here are acquire and tryAcquire. acquire relies on reserve, which first calls reserveAndGetWaitLength and ultimately reserveEarliestAvailable; tryAcquire also goes through reserveAndGetWaitLength and likewise ends up in reserveEarliestAvailable. reserveEarliestAvailable is abstract and implemented by subclasses.

SmoothRateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java

```java
@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //......

  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

  /**
   * Translates a specified portion of our currently stored permits which we want to spend/acquire,
   * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
   * use, for the range of [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

  /**
   * Returns the number of microseconds during cool down that we have to wait to get a new permit.
   */
  abstract double coolDownIntervalMicros();

  //......
}
```

SmoothRateLimiter is an abstract subclass of RateLimiter and the abstract parent of the smooth rate-limiting implementations. reserveEarliestAvailable first calls resync (which handles adding permits according to the configured rate), then computes how many stored permits to spend and how long to wait; the calculation relies on two abstract methods, coolDownIntervalMicros and storedPermitsToWaitTime.

The two subclasses of SmoothRateLimiter: SmoothRateLimiter has two static nested subclasses, SmoothBursty and SmoothWarmingUp.

SmoothBursty

```java
  /**
   * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
   * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
   * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
   * seconds, we can save up to 2 * 10 = 20 permits.
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }
```

SmoothBursty is a zero-throttling "bursty" RateLimiter: its coolDownIntervalMicros returns stableIntervalMicros, and its storedPermitsToWaitTime returns 0.

SmoothWarmingUp

```java
  static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }
```

Here coolDownIntervalMicros returns warmupPeriodMicros / maxPermits, and storedPermitsToWaitTime is somewhat more involved. SmoothBursty is based on the token bucket algorithm and allows a certain amount of burst traffic; some scenarios, however, need the bursts smoothed out, which is what SmoothWarmingUp is for. SmoothWarmingUp has a warmup period, which is the range from thresholdPermits to maxPermits:

```
          ^ throttling
          |
    cold  +                  /
 interval |                 /.
          |                / .
          |               /  .   <- "warmup period" is the area of the trapezoid between
          |              /   .      thresholdPermits and maxPermits
          |             /    .
          |            /     .
   stable +----------/  WARM .
 interval |          .   UP  .
          |          . PERIOD.
          |          .       .
        0 +----------+-------+--------------> storedPermits
          0   thresholdPermits  maxPermits
```

The main formulas involved are:

coldInterval = coldFactor * stableInterval
thresholdPermits = 0.5 * warmupPeriod / stableInterval
maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)

coldFactor defaults to 3, and stableInterval is measured in microseconds in the code, i.e. stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond.

Summary: Guava's RateLimiter (SmoothRateLimiter) is built on the token bucket algorithm and has two concrete implementations, SmoothBursty and SmoothWarmingUp. SmoothBursty initializes storedPermits to 0 and can burst up to maxPermits; SmoothWarmingUp initializes storedPermits to maxPermits (thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros)) and also allows bursts, but is smoother overall.

doc: RateLimiter.java, RateLimiter.html, Rate-Limiter ...
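For context, a short usage sketch (the rate and warmup period below are arbitrary example values): RateLimiter.create(permitsPerSecond) returns a SmoothBursty limiter, while the overload that also takes a warmup period returns a SmoothWarmingUp limiter.

```java
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;

public class RateLimiterDemo {
    public static void main(String[] args) {
        // SmoothBursty: 5 permits per second; unused permits accumulate and allow a burst
        RateLimiter bursty = RateLimiter.create(5.0);

        // SmoothWarmingUp: 5 permits per second, ramping up over a 3-second warmup period
        RateLimiter warmingUp = RateLimiter.create(5.0, 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++) {
            double waited = bursty.acquire();                                  // blocks if needed, returns seconds slept
            boolean ok = warmingUp.tryAcquire(1, 100, TimeUnit.MILLISECONDS);  // waits at most 100 ms
            System.out.printf("request %d: bursty waited %.3fs, warmingUp acquired=%b%n", i, waited, ok);
        }
    }
}
```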

September 1, 2018 · 6 min · jiezi

Talking About an Implementation of the Leaky Bucket Algorithm

Preface: this post mainly looks at an implementation of the leaky bucket algorithm.

The leaky bucket algorithm: the bucket leaks at a fixed rate, which effectively frees up bucket capacity; the bucket has a capacity limit, and if the bucket is already full when a request arrives, the request is simply dropped; if the bucket is not full when a request arrives, the request is put into the bucket, i.e. it is let through.

A simple implementation:

```java
public class LeakyBucket {
    private final long capacity;
    private final long leaksIntervalInMillis;
    private double used;
    private long lastLeakTimestamp;

    public LeakyBucket(long capacity, long leaksIntervalInMillis) {
        this.capacity = capacity;
        this.leaksIntervalInMillis = leaksIntervalInMillis;
        this.used = 0;
        this.lastLeakTimestamp = System.currentTimeMillis();
    }

    synchronized public boolean tryConsume(int drop) {
        leak();
        if (used + drop > capacity) {
            return false;
        }
        used = used + drop;
        return true;
    }

    private void leak() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis > lastLeakTimestamp) {
            long millisSinceLastLeak = currentTimeMillis - lastLeakTimestamp;
            long leaks = millisSinceLastLeak / leaksIntervalInMillis;
            if (leaks > 0) {
                if (used <= leaks) {
                    used = 0;
                } else {
                    used -= (int) leaks;
                }
                this.lastLeakTimestamp = currentTimeMillis;
            }
        }
    }
}
```

The implementation keeps a lastLeakTimestamp field, used to compute the elapsed time and how much should have leaked out during that period. Each call to tryConsume first invokes leak, which uses the configured rate and the elapsed time to work out how much has leaked in the meantime, and updates the bucket's current usage and lastLeakTimestamp. The rate-limiting decision is then simply whether used plus the requested drop would exceed the bucket's capacity: if it would, the request is throttled; otherwise the drop is put into the bucket and the usage is updated.

Summary: the leaky bucket is the mirror image of the token bucket, the former leaking water while the latter adds tokens. Because it leaks rather than accumulates, a leaky bucket cannot bank capacity the way a token bucket accumulates tokens, so it cannot absorb bursty traffic.

doc: Leaky Bucket Algorithm, Leaky bucket algorithm for flow control, Computer Network | Leaky bucket algorithm ...
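A quick usage sketch of the LeakyBucket class above (the capacity and leak interval are arbitrary example values): a bucket holding 10 drops that leaks one drop every 100 ms will reject part of an offered load of roughly 50 requests per second.

```java
public class LeakyBucketDemo {
    public static void main(String[] args) throws InterruptedException {
        // capacity of 10 drops, leaking one drop every 100 ms
        LeakyBucket bucket = new LeakyBucket(10, 100);

        for (int i = 0; i < 20; i++) {
            boolean allowed = bucket.tryConsume(1);
            System.out.println("request " + i + (allowed ? " allowed" : " rejected"));
            Thread.sleep(20); // offer ~50 requests/second, so some will be rejected
        }
    }
}
```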

September 1, 2018 · 1 min · jiezi