乐趣区

关于并发编程:并行获取机票信息高并发场景微服务实战七

你好,我是程序员 Alan。

在《需要剖析—高并发场景微服务实战(二)》一文的最初,我提了一个问题“你会用什么形式获取和聚合机票信息?”,明天我会具体地解说解决这类问题的几种罕用办法。

问题回顾

在开始解说问题的解决办法之前,咱们再来看一下问题的具体形容。搭建一个订票零碎常常会有这样的需要,那就是同时获取多家航空公司的航班信息。比方,从深圳到三亚的机票钱是多少?有很多家航空公司都有这样的航班信息,所以应该把所有航空公司的航班、票价等信息都获取到,而后再聚合。因为每个航空公司都有本人的服务器,所以须要别离去申请它们的服务器,如下图所示:

解决办法

1. 串行

咱们想获取所有航空公司某个航班信息,要先去拜访东航,而后再去拜访南航,以此类推。每一个申请收回去之后,等它响应回来当前,咱们能力去申请下一个航空公司,这就是串行的形式。

这样做的效率十分低下,如果航空公司比拟多,假如每个航空公司都须要 1 秒钟的话,那么用户必定等不及,所以这种形式是不可取的。

2. 并行

既然串行的办法很慢,那么咱们能够并行地去获取这些机票信息,而后再把机票信息给聚合起来,这样的话,效率会成倍的进步。

这种并行尽管进步了效率,但也有一个毛病,那就是会“始终等到所有申请都返回”。如果有一个航空公司的响应特地慢,那么咱们的整个服务就会被连累。所以咱们须要再改良一下,减少超时获取的性能。

3. 有超时的并行获取

上图的这种状况,就属于有超时的并行获取,同样也在并行的去申请各个公司的机票信息。然而咱们规定了一个超时工夫,如果没能在指定工夫内响应信息,咱们就把这些申请给疏忽掉,这样用户体验就比拟好了,它最多只须要等固定的工夫就能取得机票信息,尽管拿到的信息可能是不全的,然而总比始终等更好。

实现这个指标有多种实现计划,咱们一个个的来看看。

3.1 线程池的实现

第一个实现计划是用线程池,咱们来看一下代码。

/**
 * @author alan
 * @create 2022 - 10 - 05  15:17
 */
public class ThreadPoolDemo {ExecutorService threadPool = Executors.newFixedThreadPool(3);
    public static void main(String[] args) throws InterruptedException {ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
        System.out.println(threadPoolDemo.getPrices());
    }
    private Set<Integer> getPrices() throws InterruptedException {Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        threadPool.submit(new Task(1, prices));
        threadPool.submit(new Task(2, prices));
        threadPool.submit(new Task(3, prices));
        Thread.sleep(3000);
        return prices;
    }
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;
        public Task(Integer productId, Set<Integer> prices) {
            this.productId = productId;
            this.prices = prices;
        }
        @Override
        public void run() {
            int price=0;
            try {Thread.sleep((long) (Math.random() * 6000));
                price= productId;
            }catch (Exception e){e.printStackTrace();
            }
            prices.add(price);

        }
    }
}

在代码中,新建了一个线程平安的 Set,命名为 Prices 用它来存储价格信息,而后往线程池中去放工作。线程池是在类的最开始时创立的,是一个固定 3 线程的线程池。

在 Task 的 run 办法中,用一个随机的工夫取模仿各个航空公司的响应工夫,而后再返回咱们传入的值作为票价,最初把这个票价放到 Set 中。

getPrices 函数中,咱们新建了三个工作,productId 别离是 1、2、3,为了实现期待固定工夫的性能,在这里调用了 Thread 的 sleep 办法来休眠 3 秒钟,它就会在这里期待 3 秒,之后间接返回 prices。

此时,如果 Math.random() * 6000) 的值很小,工作的响应速度快的话,返回的 prices 外面最多会有三个值,然而如果每一个响应工夫都很慢,那么可能 prices 外面一个值都没有。

这就是用线程池去实现的最根底的计划。

3.2 CountDownLatch

下面的办法有一个优化的空间,比如说网络特地好时,每个航空公司响应速度都特地快,你基本不须要等三秒,有的航空公司可能几百毫秒就返回了,那么咱们也不应该让用户等 3 秒。所以须要进行一下这样的改良,看上面这段代码:

/**
 * @author alan
 * @create 2022 - 10 - 05  15:32
 */
public class CountDownLatchDemo {ExecutorService threadPool = Executors.newFixedThreadPool(3);
    public static void main(String[] args) throws InterruptedException {CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
        System.out.println(countDownLatchDemo.getPrices());
    }
    private Set<Integer> getPrices() throws InterruptedException {Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        CountDownLatch countDownLatch = new CountDownLatch(3);
        threadPool.submit(new Task(1, prices, countDownLatch));
        threadPool.submit(new Task(2, prices, countDownLatch));
        threadPool.submit(new Task(3, prices, countDownLatch));
        countDownLatch.await(3, TimeUnit.SECONDS);
        return prices;
    }
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;
        CountDownLatch countDownLatch;
        public Task(Integer productId, Set<Integer> prices,CountDownLatch countDownLatch) {
            this.productId = productId;
            this.prices = prices;
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            int price = 0;
            try {Thread.sleep((long) (Math.random() * 6000));
                price = productId;
            } catch (InterruptedException e) {e.printStackTrace();
            }
            prices.add(price);
            countDownLatch.countDown();}
    }
}

这段代码应用 CountDownLatch 实现了这个性能,整体思路和之前是统一的,不同点在于咱们新增了一个 CountDownLatch,并且把它传入到了 Task 中。在 Task 中,获取完机票信息并且把它增加到 Set 之后,会调用 countDown 办法,相当于把计数减 1。

这样一来,在执行 countDownLatch.await(3, TimeUnit.SECONDS) 这个函数进行期待时,如果三个工作都十分疾速地执行结束了,那么三个线程都曾经执行了 countDown 办法,那么这个 await 办法就会立即返回,不须要傻等到 3 秒钟。

如果有一个申请特地慢,相当于有一个线程没有执行 countDown 办法,来不及在 3 秒钟之内执行结束,那么这个带超时参数的 await 办法也会在 3 秒钟到了当前,及时地放弃这一次期待,于是就把 prices 给返回了。所以这样一来,咱们就利用 CountDownLatch 实现了这个需要,也就是说咱们最多等 3 秒钟,但如果在 3 秒之内全都返回了,咱们也能够疾速地去返回,不会傻等,进步了效率。

3.3 CompletableFuture

咱们再来看一下用 CompletableFuture 来实现这个性能的用法,代码如下所示:

**
 * @author alan
 * @create 2022 - 10 - 05  15:59
 */
public class CompletableFutureDemo {public static void main(String[] args) throws Exception {CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();
        System.out.println(completableFutureDemo.getPrices());
    }
    private Set<Integer> getPrices() {Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(1, prices));
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(2, prices));
        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(3, prices));
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
        try {allTasks.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {e.printStackTrace();
        }
        return prices;
    }
    private class Task implements Runnable {
        Integer productId;
        Set<Integer> prices;
        public Task(Integer productId, Set<Integer> prices) {
            this.productId = productId;
            this.prices = prices;
        }
        @Override
        public void run() {
            int price = 0;
            try {Thread.sleep((long) (Math.random() * 6000));
                price = productId;
            } catch (InterruptedException e) {e.printStackTrace();
            }
            prices.add(price);
        }
    }
}

getPrices 办法中,咱们用了 CompletableFuture 的 runAsync 办法,这个办法会异步的去执行工作。

咱们有三个工作,并且在执行这个代码之后会别离返回一个 CompletableFuture 对象,咱们把它们命名为 task 1、task 2、task 3,而后执行 CompletableFuture 的 allOf 办法,并且把 task 1、task 2、task 3 传入。这个办法的作用是把多个 task 汇总,而后能够依据须要去获取到传入参数的这些 task 的返回后果,或者期待它们都执行结束等。咱们就把这个返回值叫作 allTasks,并且在上面调用它的带超时工夫的 get 办法,同时传入 3 秒钟的超时参数。

它的成果是,如果在 3 秒钟之内这 3 个工作都能够顺利返回,那么会立刻响应后果

然而如果有某一个工作没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 办法便会抛出 TimeoutException 异样,会被咱们给 catch 住。

这样一来它就实现了这样的成果:会尝试期待所有的工作实现,然而最多只会等 3 秒钟,在此之间,如及时实现则及时返回,如果超时则抛出异样抛弃。

站在伟人的肩膀上

  • 徐隆曦——《Java 并发编程外围 78 讲》
退出移动版