关于java:Java-并发编程生产应用场景及实战

5次阅读

共计 7839 个字符,预计需要花费 20 分钟才能阅读完成。

背景介绍

为什么须要学习 Java 并发?

从晋升性能角度来说
  • 晋升了对 CPU 的应用效率:目前生产的服务器大多数都是多核,标配的机器都是 8C/16G。操作系统会将不同的线程调配给不同的外围解决,实践上,有多少外围就有多少个线程并行执行。如果没有并发编程,CPU 的利用率将极大的节约,假如以后正在解决耗时的 I/O 操作,那么整个 CPU 就会处于阻塞闲暇状态,前面的指令必须期待后面的执行完能力继续执行。
  • 升高服务 RT:大型互联网访问量轻松每秒轻松过万,如果没有并发解决,所有的用户申请都会排队期待,那种体验成果你能设想么,这样的服务能力如何能留住客户?有了并发编程,充沛开释 CPU 算力,操作系统让每个客户轮流应用 CPU 计算,每个客户都能失去疾速的响应。
  • 容错率高:线程与线程之间的执行不会互相烦扰,某个线程执行出现异常退出,不会对其它线程造成影响。
从开发者角度来说
  • Java 根底面试必考查技能:Java 并发面试问题根本必呈现,有大型项目研发教训的同学,解决并发问题多的同学,往往会被青眼。因为越是简单的零碎,并发申请就越多,简略的业务 + 并发 = 这个业务不简略。
  • 工作中离不开并发:多线程能充分发挥 CPU 的计算力,这使得咱们不得不理解并发的原理,免得造成线程平安问题,给生产带来损失。罕用的中间件中大量使用了并发常识,如 MQ、RPC 等,如果不相熟原理,如何可能调优中间件的应用。

并发编程业务中的实际

实际一:风控规定引擎——策略执行

互联网企业风控安全部门每时每刻都须要和黑灰产反抗,爱护企业蒙受不必要的经济损失。风控策略团队在反抗的过程中,积淀出一系列危险辨认策略,用以检测以后业务申请中否存在高危操作。

风控平安团队须要评估业务在运行流程中,是否存在黑产可能获取利益点的中央,即“危险卡点”。评估后,须要业务在每次流程通过危险卡点处,需透传业务信息给风控服务,风控服务在很多工夫内进行大量决策计算,并返回业务方决策后果(ACCEPT- 通过 /REVIEW- 人工,需进一步信息确认 /REJECT- 回绝,高危操作)。如图展现的是营销流动——裂变类流动危险卡点。

营销裂变流程危险卡点图

一条业务申请耗时个别在 300 ~ 500 ms 之间,如果超过这个区间,可能就须要定位调优哪个节点耗时高了。大型互联网公司零碎架构比较复杂,残缺的业务可能有几十甚至上百个服务零碎,你触发的一次申请,可能中途会通过多少服务超过你设想。

如上述,业务对风控服务的性能要求很高,个别管制在 100 ms 以内。但风控外部排查业务申请波及大量策略和规定,如何短时间内执行完,且又不阉割策略呢?答案是并发变成。风控外部大量应用并发,以满足海量申请和计算需要,我将以策略规定的执行来举例如何编写编发变成代码。如下是风控服务一次申请的大抵执行流程:

风控流程执行图

能够看到,一次风控申请的断定,波及大量的规定断定,此时如果没有并发,会呈现什么成果?

串行 & 并行执行规定图

全副串行执行策略和规定的话,可能几秒都不定能计算出来,此时咱们须要应用 Java 并发来满足性能需求。
外围代码如下:

public class RuleSessionExecutor {

    // 线程池
    private final static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 8,
            Runtime.getRuntime().availableProcessors() * 8,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new CustomerThreadFactory("rule-executor"),
            new ThreadPoolExecutor.AbortPolicy());

    /**
     * 规定执行
     * @param rules
     */
    public void execute(List<CustomerSession> rules) {final CountDownLatch lock = new CountDownLatch(rules.size());
        for (CustomerSession session : rules) {
            try {session.exec();
            } catch (Throwable e) { } finally {lock.countDown();
            }
        }

        try {lock.await(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {log.error("CountDownLatch error", e);
        }
    }
}

此处用到了 CountDownLatch 并发工具类,下文会应用介绍。

实际二:风控特色平台特色加载

如上述实际一,大量的规定执行前须要大量的特色,如果在每条规定执行内获取特色,可行,然而会造成特色的反复获取问题,节约了性能。举例:如果规定人员做 A/B 测试,两个策略包有交加的特色特地多,此时如果在每个规定内获取,就等于有交加的特色反复拜访两次,这种节约是没必要的。此时咱们在规定执行前先获取以后策略包下所有的去重特色,而后获取所有的特色后,再去执行规定。

那么此时的问题是,如何批量的去获取特色呢?
特色的品种很多:

  • 输出型:不耗时 - 申请上下文携带,如订单金额
  • 衍生型:根本不耗时 - 基于输出型特色衍生,如根据经纬度计算间隔
  • 实时统计特色:根本不耗时 - 感兴趣的能够关注我文章 Flink 在风控场景实时特色落地实战,具体介绍
  • 查问类:耗时- 如根据订单号调业务 RPC 接口获取订单明细信息,通信 + 业务自身耗时
  • 外部类:耗时- 如第三方风控公司产品,同盾、IPIP 等

特色同步获取 & 异步获取比照图

显然,咱们须要并发来撑持性能,外围代码如下:

public class DataSourceExecutor {

    private final static ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 8,
            Runtime.getRuntime().availableProcessors() * 8,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new HamThreadFactory("ds-executor"),
            new ThreadPoolExecutor.AbortPolicy());

    /**
     * 特色获取
     *
     * @param dataSources
     */
    public void execute(List<DataSource> dataSources) {List<CompletableFuture> futures = Lists.newArrayList();
        long timeout = ApolloConfig.getLongProperty(ApolloConfigKey.DS_TIMEOUT_OUTER_KEY, 150L);

        for (DataSource ds : dataSources) {timeout = ds.getExecutionTimeout() > timeout ? ds.getExecutionTimeout() : timeout;

            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {ds.execute();
            }, executor);
            futures.add(future);
        }

        CompletableFuture<Void> summaryFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[]{}));
        try {summaryFuture.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {log.error("DataSource exec error", e);
        }
    }
}

和规定的批量执行大同小异,单此处用到了 Java 8 CompletableFuture 并发工具类,性能上有所加强,下文会应用介绍。

实际三:分布式工作跑批

定时工作应该是工作中很常见的需要了,如订单状态流转检测、对账等。工作个别都是跑批的,即蕴含多个子工作,该场景很适宜线程池工作队列并发执行。

工作队列线程池图

外围代码如下:

public void execute(List<Task> tasks) {
    tasks.forEach(t -> {executor.execute(() -> {
            try {log.info("task id: {} begin exec", t.getId());
                t.execute();} catch (Throwable e) {log.error(String.format("task execute error, uid: %s", t.getId()), e);
            } finally {log.info("task id: {} end exec", t.getId());
            }
        });

    });
}

并发编程常用工具类

线程池

线程池(英语:thread pool):一种线程应用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池保护着多个线程,期待着监督管理者调配可并发执行的工作。这防止了在解决短时间工作时创立与销毁线程的代价。线程池不仅可能保障内核的充分利用,还能避免过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量【1】。

J.U.C 提供的线程池:ThreadPoolExecutor 类,帮忙开发人员治理线程并不便地执行并行任务。理解并正当应用线程池,是一个开发人员必修的基本功。

任务调度
当用户提交了工作,工作的生命周期将有线程池管控。线程池外部实际上构建了一个生产者 / 消费者模式,线程与工作是解耦的,没有强关联性,这有利于工作的缓冲 & 复用。理解线程池的第一步必须晓得工作的运行机制。

工作执行图

工作队列
线程池的实质是对工作和线程的治理,而做到这一点的要害是解耦工作和线程,不让两者间接关联,能力做到后续的正当调配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存工作,工作线程从阻塞队列中获取工作。

阻塞队列(BlockingQueue)在队列的根底上新增两个个性。

  • 队列为空时,获取元素的线程会期待队列变为非空
  • 队列满时,存储元素的线程会期待队列可用

阻塞队列罕用于生产者和消费者的场景,生产者是往队列里增加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者寄存元素的容器,而消费者也只从容器里拿元素。

阻塞队列示意图

阻塞队列如下表可抉择:

ArrayBlockingQueue 有界;数组实现;FIFO;
LinkedBlockingQueue 有界(默认长度 Integer.MAX_VALUE,不小心就会内存溢出);链表实现;FIFO;
PriorityBlockingQueue 无界;均衡二叉树实现;排序
DelayQueue 同 PriorityBlockingQueue;对象只能在其到期时能力从队列中取走
SynchronousQueue 不存储元素;没一个 put 操作需期待 take 操作
LinkedTransferQueue 有界;劣势在于绝对 LinkedBlockingQueue 升高了锁的粒度,性能更高
LinkedBlockingDeque 绝对 LinkedBlockingQueue 实现双端阻塞;锁粒度升高,性能较好

工作回绝
线程池自我爱护熔断局部,当工作有界缓存队列已满,证实线程池曾经超负荷运行,解决不过去了。此时须要回绝新进工作,采纳设置的拒接策略,以爱护线程池。

用户能够抉择 JDK 提供的四种回绝策略,或者自定义实现 RejectedExecutionHandler 接口即可

ThreadPoolExecutor.AbortPolicy() 抛弃工作并抛出 RejectedExecutionException 异样;线程池默认回绝策略;要害业务应应用此异样策略,以理解线程池的健康状况
ThreadPoolExecutor.CallerRunsPolicy() 由主线程去执行当前任务
ThreadPoolExecutor.DiscardOldestPolicy() 抛弃老工作,从新提交工作;生产不倡议应用,有危险
ThreadPoolExecutor.DiscardPolicy() 抛弃工作 & 不抛出异样;生产不倡议应用,不易发现问题

CountDownLatch——同步计数器

CountDownLatch 外部应用计数器实现,初始化时计数器数量等于须要解决的期待线程数量,当每个线程执行结束后须要将计数器减一,当计数器到 0 后,代表须要期待执行的线程已全副执行结束,此时会唤醒主线程继续执行主线工作。

CountDownLatch 流程图

罕用场景:

  • 1 等 N:联合线程池开释 CPU 算力,如首页简单信息流成精,能够散布加载各模块信息,计数完结后在交由主线程结构化数据返回
  • 最坏匹配:哪一个线程执行结束,能够立刻开释 cnt 数量至 0,告诉主线程执行

外围代码:

public void demo() {
    List<Task> tasks = ...
    final CountDownLatch countDownLatch = new CountDownLatch(10);
    tasks.forEach(task -> {

        try {// 本人的子线程逻辑} catch (Throwable e) {// 有时 Exception 接不到的异样,倡议用 Throwable} finally {countDownLatch.countDown();
        }

    });

    try {countDownLatch.await(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {log.error("countDownLatch InterruptedException", e);
    }

}

CompletableFuture

Java 在 1.8 版本提供了 CompletableFuture 来反对异步编程,CompletableFuture 的性能着实让人感到震撼。他的复杂度应该也是我见过的最简单之一了。

咱们看个例子来直观感触下 CompletableFurure 的威力

CompletableFuture 之西红柿炒蛋

// 步骤一:筹备西红柿
CompletableFuture<String> f1 =
  CompletableFuture.supplyAsync(() -> {System.out.println("洗西红柿");

    System.out.println("切它");

    return "切好的西红柿";
  });

// 步骤二:筹备鸡蛋
CompletableFuture<String> f2 =
  CompletableFuture.supplyAsync(() -> {System.out.println("洗鸡蛋");

    System.out.println("煎鸡蛋");

    return "煎好的鸡蛋";
  });
// 步骤三:炒鸡蛋
CompletableFuture<String> f3 =
  f1.thenCombine(f2, (__, tf) -> {System.out.println("爆炒");
    return "西红柿炒鸡蛋";
  });

// 期待工作 3 执行后果
System.out.println(f3.join());

CompletableFuture 旨在解决多线程之间的简单实现逻辑,如上所示,其实都是只蕴含了业务实现的逻辑,并发编程的逻辑曾经被 Lamda 编程奇妙的躲避了。即用起码的代码干最硬的事,很完满。

此处不对 CompletableFuture 作具体的形容,如果感兴趣能够关注我,因为 CompletableFuture 实现即应用一篇文章都不肯定能说完。

常见并发问题及解决

死锁 & 定位

死锁(deadlock),当两个以上的运算单元,单方都在期待对方进行执行,以获取系统资源,然而没有一方提前退出时,就称为死锁。【1】

死锁的四个条件是:

  • 禁止抢占(no preemption):系统资源不能被强制从一个过程中退出。
  • 持有和期待(hold and wait):一个过程能够在期待时持有系统资源。
  • 互斥(mutual exclusion):资源只能同时调配给一个行程,无奈多个行程共享。
  • 循环期待(circular waiting):一系列过程相互持有其余过程所须要的资源。

    定位

    jps
    jstack pid
    
    
    // 下面的信息截取
    
    Found one Java-level deadlock:
    =============================
    "Thread-1":
    waiting to lock monitor 0x00007fcc68023f58 (object 0x0000000795ea0c00, a java.lang.Object),
    which is held by "Thread-0"
    "Thread-0":
    waiting to lock monitor 0x00007fcc68022ab8 (object 0x0000000795ea0c10, a java.lang.Object),
    which is held by "Thread-1"
    

    jps 定位运行的 java 程序,而后利用 jstack pid 打印线程信息,拉倒最上面很显著发现有提醒 deadlock,再根据线程号 0x00007fcc68023f58 寻找到对应的线程即可剖析是哪一段代码引发的问题。

性能调优

Java 并发多线程编程,咱们首选的工具肯定是线程池。线程池应用面临的外围的问题在于:线程池的参数并不好配置!

你是否会也遇到过依照教训去预估线上某个场景线程池最小沉闷线程和最大沉闷线程数不准或失误。事实上并无线程池通用的计算公式,因为一台机器上并不是只有你的一个服务,且一个服务内并不是只有一个线程池,如果依照 I/ O 密集 或者 CPU 密集 预估,还是免不了重复调试的苦。

那么咱们是否能够将批改线程池参数的老本降下来,这样至多能够产生故障的时候能够疾速调整从而缩短故障复原的工夫呢?

本篇不会多讲动静线程池的架构设计,感兴趣的能够关注我,后续会发文,此处只给出一个大略的思路:

  • 动静调参:反对线程池外围参数动静调整,最小 / 最大外围线程数
  • 工作监控:次要监控阻塞队列沉积 和 单次线程执行耗时 95 99 线
  • 告警:有潜在压力及时预警修改
  • 操作告诉 & 鉴权:生产操作很危险,需谨慎

欠缺监控

任何零碎的运行都离不开监控,只是颗粒度粗细的问题,并发场景,咱们尤其须要关注服务线程的监控情况,尤其是沉闷线程数、队列沉积长度、均匀耗时、吞吐量等重要指标。产生预警时能及时告诉相应的开发人员降级解决,亦可主动熔断,爱护主服务。

往期精彩

集体技术博客:https://jifuwei.github.io/
公众号:是咕咕鸡

  • 性能调优——小小的 log 大大的坑
  • 性能优化必备——火焰图
  • Flink 在风控场景实时特色落地实战

本文参加了思否技术征文,欢送正在浏览的你也退出。

正文完
 0