关于java:Java-并发异步编程原来十个接口的活现在只需要一个接口就搞定

51次阅读

共计 15957 个字符,预计需要花费 40 分钟才能阅读完成。

作者:锦成同学 \
起源:juejin.im/post/5d3c46d2f265da1b9163dbce

什么? 对你没有听错, 也没有看错 .. 多线程并发执行工作,取后果归集~~ 不再发愁….

引言

先来看一些 APP 的获取数据, 诸如此类, 一个页面获取 N 多个, 多达 10 个左右的一个用户行为数据, 比方: 点赞数, 公布文章数, 点赞数, 音讯数, 关注数, 珍藏数, 粉丝数, 卡券数, 红包数……….. 真的是多~ 咱们看些图:

平时要 10+ 接口的去获取数据(因为当你 10+ 个查问写一起, 那预计到半分钟能力响应了), 一个页面上 N 多接口, 真是累死前端的宝宝了, 前端开启多线程也累啊, 咱们做后端的要体量一下前端的宝宝们, 毕竟有句话叫 ” 程序员何苦尴尬程序员~”

明天咱们也能够一个接口将这些数据返回~ 还贼 TM 快, 解决串行编程, 阻塞编程带来的苦恼~

多线程并发执行工作,取后果归集

明天猪脚就是:Future、FutureTask、ExecutorService…

  • 用上 FutureTask 工作获取后果老少皆宜,就是 CPU 有耗费。FutureTask 也能够做闭锁(实现了 Future 的语义,示意一种形象的可计算的后果)。通过把 Callable(相当于一个可生成后果的 Runnable)作为一个属性,进而把它本人作为一个执行器去继承 Runnable,FutureTask 实际上就是一个反对勾销行为的异步工作执行器。
  • Callable 就是一个回调接口, 能够泛型申明返回类型, 而 Runnable 是线程去执行的办法. 这个很简略~ 大家想深刻理解就进去看源码好了~ 因为真的很简略~
  • FutureTask 实现了 Future,提供了 start, cancel, query 等性能,并且实现了 Runnable 接口,能够提交给线程执行。
  • Java 并发工具类的三板斧 状态,队列,CAS

状态

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:        // 可能产生的状态适度过程
 * NEW -> COMPLETING -> NORMAL        // 创立 --> 实现 --> 失常
 * NEW -> COMPLETING -> EXCEPTIONAL   // 创立 --> 实现 --> 异样
 * NEW -> CANCELLED                   // 创立 --> 勾销
 * NEW -> INTERRUPTING -> INTERRUPTED // 创立 --> 中断中 --> 中断完结
 */

private volatile int state;                  // 执行器状态

private static final int NEW = 0;            // 初始值        由构造函数保障
private static final int COMPLETING = 1;     // 实现进行时    正在设置工作后果
private static final int NORMAL = 2;         // 失常完结      工作失常执行结束
private static final int EXCEPTIONAL = 3;    // 产生异样      工作执行过程中产生异样
private static final int CANCELLED = 4;      // 曾经勾销      工作曾经勾销
private static final int INTERRUPTING = 5;   // 中断进行时    正在中断运行工作的线程
private static final int INTERRUPTED = 6;    // 中断完结      工作被中断

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

还不明确就看图:

public interface Future<T> {
    /**
    * 勾销工作
    *@param mayInterruptIfRunning
    * 是否容许勾销正在执行却没有执行结束的工作,如果设置 true,则示意能够勾销正在执行过程中的工作
    * 如果工作正在执行,则返回 true
    * 如果工作还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,返回 true
    * 如果工作曾经实现,则无论 mayInterruptIfRunning 为 true 还是 false,返回 false
    */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
    * 工作是否被勾销胜利,如果在工作失常实现前被勾销胜利,则返回 true
    */
    boolean isCancelled();
    /**
    * 工作是否实现
    */
    boolean isDone();
    /**
    * 通过阻塞获取执行后果
    */
    T get() throws InterruptedException, ExecutionException;
    /**
    * 通过阻塞获取执行后果。如果在指定的工夫内没有返回,则返回 null
    */
    T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future

  • cancle 能够进行工作的执行 但不肯定胜利 看返回值 true or false
  • get 阻塞获取 callable 的工作后果, 即 get 阻塞住调用线程,直至计算实现返回后果
  • isCancelled 是否勾销胜利
  • isDone 是否实现

重点阐明:

Furture.get()获取执行后果的值,取决于执行的状态,如果工作实现,会立刻返回后果,否则始终阻塞直到工作进入实现状态,而后返回后果或者抛出异样。

“运行实现”示意计算的所有可能完结的状态,蕴含失常完结,因为勾销而完结和因为异样而完结。当进入实现状态,他会进行在这个状态上,只有 state 不处于 NEW 状态,就阐明工作曾经执行结束。

FutureTask 负责将计算结果从执行工作的线程传递到调用这个线程的线程,而且确保了, 传递过程中后果的平安公布

UNSAFE 无锁编程技术, 确保了线程的安全性~ 为了放弃无锁编程 CPU 的耗费, 所以用状态标记, 缩小空转的时候 CPU 的压力

  • 工作本尊:callable
  • 工作的执行者:runner
  • 工作的后果:outcome
  • 获取工作的后果:state + outcome + waiters
  • 中断或者勾销工作:state + runner + waiters

run 办法

1、查看 state,非 NEW,阐明曾经启动,间接返回;否则,设置 runner 为以后线程,胜利则持续,否则,返回。

2、调用 Callable.call()办法执行工作,胜利则调用 set(result)办法,失败则调用 setException(ex)办法,最终都会设置 state,并调用 finishCompletion()办法,唤醒阻塞在 get()办法上的线程们。

3、如正文所示,如果省略 ran 变量,并把 ”set(result);” 语句挪动到 try 代码块 ”ran = true;” 语句处,会怎么呢?首先,从代码逻辑上看,是没有问题的,然而,思考到 ”set(result);” 办法万一抛出异样甚至是谬误了呢?set()办法最终会调用到用户自定义的 done()办法,所以,不可省略。

4、如果 state 为 INTERRUPTING, 则被动让出 CPU,自旋期待别的线程执行完中断流程。见 handlePossibleCancellationInterrupt(int s) 办法。

public void run() {
        // UNSAFE.compareAndSwapObject,CAS 保障 Callable 工作只被执行一次 无锁编程
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable; // 拿到执行工作
            if (c != null && state == NEW) { // 工作不为空,并且执行器状态是初始值,才会执行;如果勾销就不执行了
                V result;
                boolean ran; // 记录是否执行胜利
                try {result = c.call(); // 执行工作
                    ran = true; // 胜利
                } catch (Throwable ex) {
                    result = null; // 异样,清空后果
                    ran = false; // 失败
                    setException(ex); // 记录异样
                }
                if (ran) // 问题:ran 变量能够省略吗,把 set(result); 移到 try 块外面?set(result); // 设置后果
            }
        } finally {runner = null; // 直到 set 状态前,runner 始终都是非空的,为了避免并发调用 run()办法。int s = state;
            if (s >= INTERRUPTING) // 有别的线程要中断以后线程,把 CPU 让进来,自旋等一下
                handlePossibleCancellationInterrupt(s);
        }
    }
private void handlePossibleCancellationInterrupt(int s) {if (s == INTERRUPTING) // 当 state 为 INTERRUPTING 时
         while (state == INTERRUPTING) // 示意有线程正在中断以后线程
             Thread.yield(); // 让出 CPU,自旋期待中断}

再啰嗦下: run 办法重点做了以下几件事:

  • 将 runner 属性设置成以后正在执行 run 办法的线程
  • 调用 callable 成员变量的 call 办法来执行工作
  • 设置执行后果 outcome, 如果执行胜利, 则 outcome 保留的就是执行后果;如果执行过程中产生了异样, 则 outcome 中保留的就是异样,设置后果之前,先将 state 状态设为两头态
  • 对 outcome 的赋值实现后,设置 state 状态为终止态(NORMAL 或者 EXCEPTIONAL)
  • 唤醒 Treiber 栈中所有期待的线程
  • 善后清理(waiters, callable,runner 设为 null)
  • 查看是否有脱漏的中断,如果有,期待中断状态实现。

怎么能少了 get 办法呢, 始终阻塞获取参见:awaitDone

public V get() throws InterruptedException, ExecutionException {
    int s = state; // 执行器状态
     if (s <= COMPLETING) // 如果状态小于等于 COMPLETING,阐明工作正在执行,须要期待
         s = awaitDone(false, 0L); // 期待
     return report(s); // 报告后果
}

顺便偷偷看下 get(long, TimeUnit), 就是 get 的办法扩大, 减少了超时工夫, 超时后我还没拿到就怄气抛异样….

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {if (unit == null) // 参数校验
        throw new NullPointerException();
    int s = state; // 执行器状态
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) // 如果状态小于等于 COMPLETING,阐明工作正在执行,须要期待;期待指定工夫,state 仍然小于等于 COMPLETING
        throw new TimeoutException(); // 抛出超时异样
    return report(s); // 报告后果
}

那么再看 awaitDone, 要晓得会写死循环 while(true)|for (;;)的都是高手~

private int awaitDone(boolean timed, long nanos) throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L; // 计算 deadline
    WaitNode q = null; // 期待结点
    boolean queued = false; // 是否曾经入队
    for (;;) {if (Thread.interrupted()) { // 如果以后线程曾经标记中断,则间接移除此结点,并抛出中断异样
            removeWaiter(q);
            throw new InterruptedException();}

        int s = state; // 执行器状态
        if (s > COMPLETING) { // 如果状态大于 COMPLETING,阐明工作曾经实现,或者曾经勾销,间接返回
            if (q != null)
                q.thread = null; // 复位线程属性
            return s; // 返回
        } else if (s == COMPLETING) // 如果状态等于 COMPLETING,阐明正在整顿后果,自旋期待一会儿
            Thread.yield();
        else if (q == null) // 初始,构建结点
            q = new WaitNode();
        else if (!queued) // 还没入队,则 CAS 入队
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        else if (timed) { // 是否容许超时
            nanos = deadline - System.nanoTime(); // 计算等待时间
            if (nanos <= 0L) { // 超时
                removeWaiter(q); // 移除结点
                return state; // 返回后果
            }
            LockSupport.parkNanos(this, nanos); // 线程阻塞指定工夫
        } else
            LockSupport.park(this); // 阻塞线程
    }
}

至此, 线程安顿工作和获取我就不啰嗦了~~~~ 还要很多摸索的, 毕竟带薪聊天比拟缓和, 我就不多赘述了~

队列

接着咱们来看队列,在 FutureTask 中,队列的实现是一个单向链表,它示意所有期待工作执行结束的线程的汇合。咱们晓得,FutureTask 实现了 Future 接口,能够获取“Task”的执行后果,那么如果获取后果时,工作还没有执行结束怎么办呢?那么获取后果的线程就会在一个期待队列中挂起,直到工作执行结束被唤醒。这一点有点相似于 AQS 中的 sync queue,在下文的剖析中,大家能够本人对照它们的异同点。

咱们后面说过,在并发编程中应用队列通常是将以后线程包装成某种类型的数据结构扔到期待队列中,咱们先来看看队列中的每一个节点是怎么个构造:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

可见,相比于 AQS 的 sync queue 所应用的双向链表中的 Node,这个 WaitNode 要简略多了,它只蕴含了一个记录线程的 thread 属性和指向下一个节点的 next 属性。

值得一提的是,FutureTask 中的这个单向链表是当做栈来应用的,确切来说是当做 Treiber 栈来应用的,不理解 Treiber 栈是个啥的能够简略的把它当做是一个线程平安的栈,它应用 CAS 来实现入栈出栈操作(想进一步理解的话能够看这篇文章)。

为啥要应用一个线程平安的栈呢,因为同一时刻可能有多个线程都在获取工作的执行后果,如果工作还在执行过程中,则这些线程就要被包装成 WaitNode 扔到 Treiber 栈的栈顶,即实现入栈操作,这样就有可能呈现多个线程同时入栈的状况,因而须要应用 CAS 操作保障入栈的线程平安,对于出栈的状况也是同理。

因为 FutureTask 中的队列实质上是一个 Treiber(驱动)栈,那么应用这个队列就只须要一个指向栈顶节点的指针就行了,在 FutureTask 中,就是 waiters 属性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事实上,它就是整个单向链表的头节点。

综上,FutureTask 中所应用的队列的构造如下:

CAS 操作

CAS 操作大多数是用来扭转状态的,在 FutureTask 中也不例外。咱们个别在动态代码块中初始化须要 CAS 操作的属性的偏移量:

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
    } catch (Exception e) {throw new Error(e);
    }
}

从这个动态代码块中咱们也能够看出,CAS 操作次要针对 3 个属性,包含 state、runner 和 waiters,阐明这 3 个属性根本是会被多个线程同时拜访的。其中 state 属性代表了工作的状态,waiters 属性代表了指向栈顶节点的指针,这两个咱们下面曾经剖析过了。

runner 属性代表了执行 FutureTask 中的“Task”的线程。为什么须要一个属性来记录执行工作的线程呢?这是为了中断或者勾销工作做筹备的,只有晓得了执行工作的线程是谁,咱们能力去中断它。

定义完属性的偏移量之后,接下来就是 CAS 操作自身了。在 FutureTask,CAS 操作最终调用的还是 Unsafe 类的 compareAndSwapXXX 办法,对于 Unsafe,因为带薪码文这里不再赘述。

实战演练

所有没有例子的解说都是耍流氓 >>> 葱姜切沫~~ 退出生命的源泉….

实战我的项目以 springboot 为我的项目脚手架,github 地址:

https://github.com/javastacks…

1.MyFutureTask 实现类

外部定义一个线程池进行工作的调度和线程的治理以及线程的复用, 大家能够依据本人的理论我的项目状况进行配置

其中线程调度示例: 外围线程 8 最大线程 20 保活工夫 30s 存储队列 10 有守护线程 回绝策略: 将超负荷工作回退到调用者

阐明 :

默认应用外围线程 (8) 数执行工作, 工作数量超过外围线程数就丢到队列, 队列 (10) 满了就再开启新的线程, 新的线程数最大为 20, 当工作执行完, 新开启的线程将存活 30s, 若没有工作就沦亡, 线程池回到外围线程数量.

import com.boot.lea.mybot.dto.UserBehaviorDataDTO;
import com.boot.lea.mybot.service.UserService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * @author Lijing
 * @date 2019 年 7 月 29 日
 */
@Slf4j
@Component
public class MyFutureTask {

    @Resource
    UserService userService;

    /**
     * 外围线程 8 最大线程 20 保活工夫 30s 存储队列 10 有守护线程 回绝策略: 将超负荷工作回退到调用者
     */
    private static ExecutorService executor = new ThreadPoolExecutor(8, 20,
            30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10),
            new ThreadFactoryBuilder().setNameFormat("User_Async_FutureTask-%d").setDaemon(true).build(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @SuppressWarnings("all")
    public UserBehaviorDataDTO getUserAggregatedResult(final Long userId) {System.out.println("MyFutureTask 的线程:" + Thread.currentThread());

        long fansCount = 0, msgCount = 0, collectCount = 0,
                followCount = 0, redBagCount = 0, couponCount = 0;

//        fansCount = userService.countFansCountByUserId(userId);
//        msgCount = userService.countMsgCountByUserId(userId);
//        collectCount = userService.countCollectCountByUserId(userId);
//        followCount = userService.countFollowCountByUserId(userId);
//        redBagCount = userService.countRedBagCountByUserId(userId);
//        couponCount = userService.countCouponCountByUserId(userId);

        try {Future<Long> fansCountFT = executor.submit(() -> userService.countFansCountByUserId(userId));
            Future<Long> msgCountFT = executor.submit(() -> userService.countMsgCountByUserId(userId));
            Future<Long> collectCountFT = executor.submit(() -> userService.countCollectCountByUserId(userId));
            Future<Long> followCountFT = executor.submit(() -> userService.countFollowCountByUserId(userId));
            Future<Long> redBagCountFT = executor.submit(() -> userService.countRedBagCountByUserId(userId));
            Future<Long> couponCountFT = executor.submit(() -> userService.countCouponCountByUserId(userId));

            //get 阻塞
            fansCount = fansCountFT.get();
            msgCount = msgCountFT.get();
            collectCount = collectCountFT.get();
            followCount = followCountFT.get();
            redBagCount = redBagCountFT.get();
            couponCount = couponCountFT.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();
            log.error(">>>>>> 聚合查问用户聚合信息异样:" + e + "<<<<<<<<<");
        }
        UserBehaviorDataDTO userBehaviorData =
                UserBehaviorDataDTO.builder().fansCount(fansCount).msgCount(msgCount)
                        .collectCount(collectCount).followCount(followCount)
                        .redBagCount(redBagCount).couponCount(couponCount).build();
        return userBehaviorData;
    }

}

2.service 业务办法

惯例业务查询方法, 为了特效, 以及看出理论的成果, 咱们每个办法做了延时

import com.boot.lea.mybot.mapper.UserMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    UserMapper userMapper;

    @Override
    public long countFansCountByUserId(Long userId) {
        try {Thread.sleep(10000);
            System.out.println("获取 FansCount=== 睡眠:" + 10 + "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("UserService 获取 FansCount 的线程" + Thread.currentThread().getName());
        return 520;
    }

    @Override
    public long countMsgCountByUserId(Long userId) {System.out.println("UserService 获取 MsgCount 的线程" + Thread.currentThread().getName());
        try {Thread.sleep(10000);
            System.out.println("获取 MsgCount=== 睡眠:" + 10 + "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 618;
    }

    @Override
    public long countCollectCountByUserId(Long userId) {System.out.println("UserService 获取 CollectCount 的线程" + Thread.currentThread().getName());
        try {Thread.sleep(10000);
            System.out.println("获取 CollectCount== 睡眠:" + 10 + "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 6664;
    }

    @Override
    public long countFollowCountByUserId(Long userId) {System.out.println("UserService 获取 FollowCount 的线程" + Thread.currentThread().getName());
        try {Thread.sleep(10000);
            System.out.println("获取 FollowCount=== 睡眠:" + 10+ "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return userMapper.countFollowCountByUserId(userId);
    }

    @Override
    public long countRedBagCountByUserId(Long userId) {System.out.println("UserService 获取 RedBagCount 的线程" + Thread.currentThread().getName());
        try {TimeUnit.SECONDS.sleep(4);
            System.out.println("获取 RedBagCount=== 睡眠:" + 4 + "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 99;
    }

    @Override
    public long countCouponCountByUserId(Long userId) {System.out.println("UserService 获取 CouponCount 的线程" + Thread.currentThread().getName());
        try {TimeUnit.SECONDS.sleep(8);
            System.out.println("获取 CouponCount=== 睡眠:" + 8+ "s");
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 66;
    }
}

3.controller 调用

/**
 * @author LiJing
 * @ClassName: UserController
 * @Description: 用户控制器
 * @date 2019/7/29 15:16
 */
@RestController
@RequestMapping("user/")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired
    private MyFutureTask myFutureTask;

    @GetMapping("/index")
    @ResponseBody
    public String index() {return "启动用户模块胜利~~~~~~~~";}

    //http://localhost:8080/api/user/get/data?userId=4

    @GetMapping("/get/data")
    @ResponseBody
    public UserBehaviorDataDTO getUserData(Long userId) {System.out.println("UserController 的线程:" + Thread.currentThread());
        long begin = System.currentTimeMillis();
        UserBehaviorDataDTO userAggregatedResult = myFutureTask.getUserAggregatedResult(userId);
        long end = System.currentTimeMillis();
        System.out.println("=============== 总耗时:" + (end - begin) /1000.0000+ "秒");
        return userAggregatedResult;
    }

}

咱们启动我的项目: 开启调用 http://localhost:8080/api/use…

当咱们线程池配置为: 外围线程 8 最大线程 20 保活工夫 30s 存储队列 10 的时候, 咱们测试的后果如下:

后果: 咱们看到每个 server method 的执行线程都是从线程池中发动的线程名:User_Async_FutureTask-%d, 总耗时从累计的 52 秒缩短到 10 秒, 即取决于最耗时的办法查问工夫.

那咱们再将正文代码放开, 进行串行查问进行测试:

后果: 咱们应用串行的形式进行查问, 后果汇总将达到 52 秒, 那太可怕了~~

总结

应用 FutureTask 的时候, 就是将工作 runner 以 caller 的形式进行回调, 阻塞获取, 最初咱们将后果汇总, 即实现了开启多线程异步调用咱们的业务办法.

Future<Long> fansCountFT = executor.submit(new Callable<Long>() {
    @Override
    public Long call() throws Exception {return userService.countFansCountByUserId(userId);
    }
});

这里应用的只是一个简略的例子, 具体我的项目能够定义具体的业务办法进行归并解决, 其实在 JDK1.8 当前, 又有了 ExecutorCompletionService,ForkJoinTask,CompletableFuture 这些都能够实现上述的办法, 咱们后续会做一些这些办法应用的案例, 冀望大家的关注, 文章中有不足之处, 欢送斧正~

小甜点

所以: 咱们要用到敬爱的 Spring 的异步编程, 异步编程有很多种形式: 比方常见的 Future 的 sync,CompletableFuture.supplyAsync,@Async, 哈哈 其实都离不开 Thread.start()…, 等等我说个笑话:

老爸有俩孩子:小红和小明。老爸想喝酒了,他让小红去买酒,小红进来了。而后老爸忽然想吸烟了,于是老爸让小明去买烟。在面对对象的思维中,个别会把买货色,而后买回来这件事作为一个办法,如果依照程序构造或者应用多线程同步的话,小明想去买烟就必须等小红这个买货色的操作进行完。这样无疑减少了工夫的开销(万一老爸尿憋呢?)。异步就是为了解决这样的问题。你能够别离给小红小明下达指令,让他们去买货色,而后你就能够本人做本人的事,等他们买回来的时候接管后果就能够了。

package com.boot.lea.mybot.futrue;

/**
 * @ClassName: TestFuture
 * @Description: 演示异步编程
 * @author LiJing
 * @date 2019/8/5 15:16
 */
@SuppressWarnings("all")
public class TestFuture {static ExecutorService executor = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws InterruptedException {
        // 两个线程的线程池
        // 小红买酒工作,这里的 future2 代表的是小红将来产生的操作,返回小红买货色这个操作的后果
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("爸:小红你去买瓶酒!");
            try {System.out.println("小红进来买酒了,女孩子跑的比较慢,预计 5s 后才会回来...");
                Thread.sleep(5000);
                return "我买回来了!";
            } catch (InterruptedException e) {System.err.println("小红路上遭逢了意外");
                return "来世再见!";
            }
        }, executor);

        // 小明买烟工作,这里的 future1 代表的是小明将来买货色会产生的事,返回值是小明买货色的后果
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {System.out.println("爸:小明你去买包烟!");
            try {System.out.println("小明出去买烟了,可能要 3s 后回来...");
                Thread.sleep(3000);

                throw new InterruptedException();
//                return "我买回来了!";
            } catch (InterruptedException e) {System.out.println("小明路上遭逢了意外!");
                return "这是我托人带来的口信,我曾经不在了。";
            }
        }, executor);

        // 获取小红买酒后果,从小红的操作中获取后果,把后果打印
        future2.thenAccept((e) -> {System.out.println("小红说:" + e);
        });
        // 获取小明买烟的后果
        future1.thenAccept((e) -> {System.out.println("小明说:" + e);
        });

        System.out.println("爸:等啊等 西湖美景三月天嘞......");
        System.out.println("爸: 我感觉无聊甚至去了趟厕所。");
        Thread.currentThread().join(9 * 1000);
        System.out.println("爸:终于给老子买来了......huo 酒");
        // 敞开线程池
        executor.shutdown();}
}

运行后果:

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿(2021 最新版)

2. 别在再满屏的 if/ else 了,试试策略模式,真香!!

3. 卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.6 正式公布,一大波新个性。。

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0