关于ruby-on-rails:玩转JUC工具Java并发编程不再危机四伏

44次阅读

共计 10263 个字符,预计需要花费 26 分钟才能阅读完成。

前言
  当今互联网利用广泛须要反对高并发拜访,而 Java 作为一种宽泛应用的编程语言,其并发编程能力对于实现高性能的利用十分重要。而 Java 的 JUC(java.util.concurrent)并发工具就提供了许多实用的工具类和接口,能够让 Java 利用轻松实现高效的并发编程。
ReetrantLock
  ReentrantLock 是 Java 提供的一个可重入锁,也是 Java 并发编程中最罕用的一种锁。与 synchronized 关键字相比,ReentrantLock 具备更大的灵活性和性能,能够更好地反对并发编程的实现。
特点

可重入性:与 synchronized 一样,ReentrantLock 也反对可重入锁,即同一个线程能够反复取得该锁,而不会呈现死锁。
偏心锁:ReentrantLock 能够创立偏心锁,即依照线程申请的程序调配锁,确保等待时间最长的线程最先取得锁,防止了线程饥饿问题。
中断响应:当线程期待获取锁的过程中,能够通过调用 interrupt()办法中断期待,防止线程有限期待。
条件变量:ReentrantLock 能够创立多个 Condition 对象,用于控制线程期待和唤醒,从而实现更灵便的线程合作。
性能优越:在高度竞争的多线程环境中,ReentrantLock 相比 synchronized 有更好的性能体现,特地是在多处理器零碎中。

简略应用
模仿秒杀商品场景
public class SecKillDemo {

private static int stock = 1;

// 秒杀锁
private static final ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
    // 模仿多个用户抢购
    for (int i = 0; i < 10; i++) {new Thread(new Runnable() {
            @Override
            public void run() {
                // 加锁
                lock.lock();
                try {
                    // 判断库存是否足够
                    if (stock > 0) {
                        // 模仿生成订单
                        System.out.println(Thread.currentThread().getName() + "抢购胜利,生成订单");
                        // 缩小库存
                        stock--;
                    } else {System.out.println(Thread.currentThread().getName() + "抢购失败,库存有余");
                    }
                } finally {
                    // 解锁
                    lock.unlock();}
            }
        }).start();}
}

}
复制代码
加锁后果: 最先进来的线程抢购胜利其余线程抢购失败

不加锁后果:4 个线程抢购胜利呈现超卖景象

可重入示例
  当一个线程取得了 ReentrantLock 锁之后,如果该线程持续申请取得这个锁,那么该线程能够持续取得这个锁,这种状况就是 ReentrantLock 锁的可重入性。上面是一个体现出 ReentrantLock 锁的可重入性的例子:
public class ReentrantLockExample {

private final ReentrantLock lock = new ReentrantLock();

public void outer() {lock.lock(); // 第一次获取锁
    try {inner();
    } finally {lock.unlock(); // 开释锁
    }
}

public void inner() {lock.lock(); // 第二次获取锁
    try {System.out.println("第二次获取锁");
    } finally {lock.unlock(); // 开释锁
    }
}

public static void main(String[] args) {ReentrantLockExample example = new ReentrantLockExample();
    example.outer();}

}
复制代码
  在这个例子中,咱们定义了一个 ReentrantLock 锁,并创立了一个 outer 办法和一个 inner 办法。在 outer 办法中,咱们首先通过调用 lock 办法获取锁,并在 try 块中调用 inner 办法,而后在 finally 块中开释锁。在 inner 办法中,咱们再次获取锁,并输入一条信息。
  能够看到,在 outer 办法中,咱们第一次获取锁,而后调用 inner 办法,inner 办法中又通过 lock 办法获取了锁,但这次获取锁是胜利的,并且可能失常输入信息,阐明 ReentrantLock 锁具备可重入性。
Condition
  当应用 ReentrantLock 时,能够应用 Condition 对象来进行线程的协调和期待。
public class ConditionDemo {

private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int count = 0;

public void increment() {lock.lock();
    try {while (count == 1) {  // 如果计数器曾经达到 1,就期待
            condition.await();}
        count++;  // 计数器加 1
        System.out.println(Thread.currentThread().getName() + ":" + count);
        condition.signalAll();  // 唤醒其余期待的线程} catch (InterruptedException e) {e.printStackTrace();
    } finally {lock.unlock();
    }
}

public void decrement() {lock.lock();
    try {while (count == 0) {  // 如果计数器曾经为 0,就期待
            condition.await();}
        count--;  // 计数器减 1
        System.out.println(Thread.currentThread().getName() + ":" + count);
        condition.signalAll();  // 唤醒其余期待的线程} catch (InterruptedException e) {e.printStackTrace();
    } finally {lock.unlock();
    }
}

public static void main(String[] args) {ConditionDemo demo = new ConditionDemo();

    // 创立两个线程别离进行减少和缩小操作
    new Thread(() -> {for (int i = 0; i < 10; i++) {demo.increment();
        }
    }, "Thread-A").start();

    new Thread(() -> {for (int i = 0; i < 10; i++) {demo.decrement();
        }
    }, "Thread-B").start();}

}
复制代码
  在这个示例中,有一个计数器 count,两个线程别离进行减少和缩小操作。在 increment() 和 decrement()办法中,通过调用 ReentrantLock 的 lock()办法获取锁,并在执行操作之前应用 while 循环判断条件是否满足,如果不满足就通过调用 Condition 的 await()办法期待。
  当条件满足时,线程执行相应的操作,并应用 Condition 的 signalAll()办法唤醒其余期待的线程。在这个示例中,当计数器为 1 时,减少线程就会期待,直到计数器减为 0;当计数器为 0 时,缩小线程就会期待,直到计数器减少为 1。
利用场景

解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,能够应用 ReentrantLock 保障每次只有一个线程可能写入。

实现多线程工作的程序执行,例如在一个线程执行完某个工作后,再让另一个线程执行工作。

实现多线程期待 / 告诉机制,例如在某个线程执行完某个工作后,告诉其余线程继续执行工作

Semaphore
  Semaphore(信号量)是 JUC 并发工具包中的一种同步工具,用于治理一个或多个共享资源的拜访。Semaphore 保护一个计数器,该计数器能够对共享资源的拜访进行管制,相似于停车场的车位治理,当所有的车位已满时,新来的车辆必须期待其余车辆来到能力进入停车场。
Semaphore 实现限流
public class RateLimiter {

private Semaphore semaphore;

public RateLimiter(int permits) {semaphore = new Semaphore(permits);
}

public boolean tryAcquire() {return semaphore.tryAcquire();
}

public void release() {semaphore.release();
}

}
复制代码
  在上述代码中,咱们定义了一个名为 RateLimiter 的限流器类。它有一个构造函数,用于初始化 Semaphore 计数器,其中参数 permits 示意容许同时拜访的线程数。
  在 tryAcquire() 办法中,咱们应用了 Semaphore 的 tryAcquire()办法来尝试获取一个许可,如果获取胜利则返回 true,否则返回 false。
  在 release()办法中,咱们调用 Semaphore 的 release()办法来开释一个许可。
public static void main(String[] args) {

int permits = 5;
RateLimiter rateLimiter = new RateLimiter(permits);

// 模仿 10 个线程并发申请
for (int i = 0; i < 10; i++) {new Thread(() -> {
        try {if (rateLimiter.tryAcquire()) {System.out.println(Thread.currentThread().getName() + "取得拜访权限");
                Thread.sleep(1000);
            } else {System.out.println(Thread.currentThread().getName() + "被限流");
            }
        } catch (InterruptedException e) {e.printStackTrace();
        } finally {rateLimiter.release();
            System.out.println(Thread.currentThread().getName() + "开释拜访权限");
        }
    }, "Thread-" + (i + 1)).start();}

}
复制代码
在上述代码中,咱们创立了一个限流器 RateLimiter,容许同时有 5 个线程拜访。而后,咱们模仿 10 个申请。当有可用许可时,申请被容许拜访;当所有许可都被占用时,申请被回绝拜访。
利用场景

限流:Semaphore 能够用于限度对共享资源的并发拜访数量,以控制系统的流量。

资源池:Semaphore 能够用于实现资源池,以保护一组无限的共享资源。

CountDownLatch
  CountDownLatch 能够帮忙控制线程之间的执行程序。在某些场景下,可能须要期待多个线程执行结束后,再继续执行某些操作,这时候就能够应用 CountDownLatch 来实现线程的期待。
  CountDownLatch 外部保护了一个计数器,该计数器的初始值能够通过构造函数进行指定。在主线程中调用 CountDownLatch 的 await()办法会使以后线程期待,直到计数器的值为 0 时才会继续执行。而在其余线程中调用 CountDownLatch 的 countDown()办法则会将计数器的值减 1。当计数器的值减到 0 时,之前在主线程中调用 await()办法的线程就会继续执行。
CountDownLatch 实现多任务合并
  比如说,有一个零碎须要进行批量数据导入,数据是从多个文件中读取的,每个文件须要一个线程进行解决,等所有文件处理完毕后再进行下一步操作,这时候就能够应用 CountDownLatch 来实现期待多个线程执行结束后再继续执行。。
public class BatchImportDemo {

private CountDownLatch latch;
private String[] filenames = {"file1.txt", "file2.txt", "file3.txt"};

public BatchImportDemo() {this.latch = new CountDownLatch(filenames.length);
}

public void start() {for (String filename : filenames) {new Thread(new ImportTask(filename, latch)).start();}
    try {latch.await(); // 期待所有线程执行结束
        System.out.println("所有文件处理完毕,开始进行下一步操作。");
    } catch (InterruptedException e) {e.printStackTrace();
    }
}

private static class ImportTask implements Runnable {
    private String filename;
    private CountDownLatch latch;

    public ImportTask(String filename, CountDownLatch latch) {
        this.filename = filename;
        this.latch = latch;
    }

    @Override
    public void run() {
        // 解决文件的逻辑
        System.out.println("正在解决文件" + filename);
        try {Thread.sleep(2000); // 模仿文件解决
        } catch (InterruptedException e) {e.printStackTrace();
        }
        System.out.println("文件" + filename + "处理完毕。");
        latch.countDown(); // 计数器减 1}
}

public static void main(String[] args) {BatchImportDemo demo = new BatchImportDemo();
    demo.start();}

}
复制代码
  在这个示例中,咱们定义了一个 BatchImportDemo 类,它蕴含了一个 CountDownLatch 实例和一个字符串数组 filenames,示意须要解决的文件名。在 start() 办法中,咱们启动了多个线程来解决每个文件,并调用了 latch.await()办法来期待所有线程执行结束。而在每个线程中,咱们执行了具体的文件解决逻辑,并在处理完毕后调用了 latch.countDown()办法来将计数器减 1。
  运行上述代码,输入后果为
正在解决文件 file1.txt
正在解决文件 file2.txt
正在解决文件 file3.txt
文件 file1.txt 处理完毕。
文件 file2.txt 处理完毕。
文件 file3.txt 处理完毕。
所有文件处理完毕,开始进行下一步操作。
复制代码
  能够看到,当所有文件处理完毕后,程序输入了“所有文件处理完毕,开始进行下一步操作。”的信息。这阐明咱们胜利地应用 CountDownLatch 来期待多个线程执行结束后再进行后续的操作。
利用场景

主线程期待多个线程执行结束后再继续执行。
多个线程期待某个操作实现后再继续执行。
实现并发工作的协调,例如多个线程同时执行不同的子工作,当所有子工作都执行结束后,再执行后续的操作。

CyclicBarrier
  CyclicBarrier(回环栅栏或循环屏障),是 Java 并发库中的一个同步工具,通过它能够实现让一组线程期待至某个状态(屏障点)
之后再全副同时执行。叫做回环是因为当所有期待线程都被开释当前,CyclicBarrier 能够被重用。
CyclicBarrier 多个线程协同工作
public class CyclicBarrierDemo {

public static void main(String[] args) throws InterruptedException {
    int threadCount = 5;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
        @Override
        public void run() {System.out.println("所有线程执行实现,开始执行主线程...");
        }
    });
    for (int i = 0; i < threadCount; i++) {new Thread(() -> {
            try {System.out.println(Thread.currentThread().getName() + "执行工作...");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行工作实现...");
                cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();
            } catch (BrokenBarrierException e) {e.printStackTrace();
            }
        }).start();}
    System.out.println("主线程执行...");
}

}
复制代码
  上述示例中,定义了 5 个线程,这 5 个线程须要期待所有线程都执行实现后,能力继续执行主线程。在定义 CyclicBarrier 时,将屏障点的数量设置为 5,当所有线程都达到屏障点时,会执行 Runnable 中的工作,输入 “ 所有线程执行实现,开始执行主线程 …”。每个线程执行工作实现后,会调用 CyclicBarrier 的 await() 办法,期待其余线程执行实现。
  执行以上代码,输入如下:
Thread-0 执行工作 …
Thread-1 执行工作 …
Thread-2 执行工作 …
Thread-3 执行工作 …
Thread-4 执行工作 …
主线程执行 …
Thread-1 执行工作实现 …
Thread-0 执行工作实现 …
Thread-3 执行工作实现 …
Thread-2 执行工作实现 …
Thread-4 执行工作实现 …
所有线程执行实现,开始执行主线程 …
复制代码
  从输入后果能够看出,所有线程都先执行各自的工作,而后期待其余线程执行实现,当所有线程都执行实现后,执行 Runnable 中的工作,输入 “ 所有线程执行实现,开始执行主线程 …”,最初主线程继续执行。
利用场景

多线程工作:CyclicBarrier 能够用于将简单的任务分配给多个线程执行,并在所有线程实现工作后触发后续作。

数据处理:CyclicBarrier 能够用于协调多个线程间的数据处理,在所有线程解决完数据后触发后续操作。

Phaser
  Phaser 用于协调多个线程的执行。它提供了一些不便的办法来治理多个阶段的执行,能够让程序员灵便地控制线程的执行程序和阶段性的执行。Phaser 能够被视为 CyclicBarrier 和 CountDownLatch 的进化版,它可能自适应地调整并发线程数,能够动静地减少或缩小参加线程的数量。所以 Phaser 特地适宜应用在反复执行或者重用的状况。
Phaser 阶段工作应用
public class PhaserDemo {

public static void main(String[] args) {
    int numPhases = 3; // 设置阶段数为 3
    int numThreads = 5; // 设置线程数为 5
    Phaser phaser = new Phaser(numThreads); // 创立 Phaser 对象并注册线程数
    for (int i = 0; i < numThreads; i++) {new Thread(new Worker(phaser)).start(); // 创立并启动线程}
    for (int i = 0; i < numPhases; i++) {System.out.println("Phase" + i + "阶段开始");
        phaser.arriveAndAwaitAdvance(); // 期待所有线程达到同步点
        System.out.println("Phase" + i + "阶段实现");
    }
}

static class Worker implements Runnable {
    private Phaser phaser;

    Worker(Phaser phaser) {this.phaser = phaser;}

    @Override
    public void run() {System.out.println(Thread.currentThread().getName() + "开始执行");
        for (int i = 0; i < 3; i++) { // 模仿每个线程须要执行 3 个工作
            phaser.arriveAndAwaitAdvance(); // 达到同步点并期待其余线程
            System.out.println(Thread.currentThread().getName() + "实现第" + i + "个工作");
        }
    }
}

}
复制代码
  上述示例中,咱们创立了一个 Phaser 对象,将线程数和阶段数别离设置为 5 和 3,而后创立 5 个线程并启动它们。每个线程须要实现 3 个工作,在实现每个工作后调用 arriveAndAwaitAdvance() 办法达到同步点并期待其余线程,等到所有线程达到同步点后才会进入下一个阶段。最终,程序输入了每个线程实现工作的信息,以及每个阶段的开始和完结工夫。
利用场景

多线程执行多阶段工作,须要协调各个线程的执行程序。
多线程进行游戏或模仿操作,须要协调各个线程的执行机会。
多个线程须要协同工作来解决一个大型问题,例如搜索算法或者数据分析等。

Exchanger
  Exchanger 是 JUC(java.util.concurrent)并发工具之一,它提供了一个同步点,使得两个线程能够替换对象。Exchanger 中替换对象的过程是一个阻塞办法,只有在两个线程都达到同步点时,才会替换对象,并且在替换实现后,两个线程会继续执行本人的代码。
  Exchanger 通常用于实现数据的同步和线程间的通信,例如在生产者和消费者模式中,能够应用 Exchanger 来实现生产者和消费者之间的数据交换。另外,Exchanger 也能够用于遗传算法、数据加密等利用场景中。
Exchanger 应用
public class ExchangerDemo {

public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();
    Thread thread1 = new Thread(() -> {
        try {
            String data1 = "data1";
            String data2 = exchanger.exchange(data1);
            System.out.println("Thread1 received:" + data2);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    });

    Thread thread2 = new Thread(() -> {
        try {
            String data2 = "data2";
            String data1 = exchanger.exchange(data2);
            System.out.println("Thread2 received:" + data1);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    });

    thread1.start();
    thread2.start();}

}
复制代码
  在上述示例代码中,创立了一个 Exchanger 对象 exchanger,而后启动了两个线程 thread1 和 thread2。线程 thread1 将字符串 ”data1″ 替换给线程 thread2,并且接管到线程 thread2 替换过去的字符串 ”data2″;线程 thread2 将字符串 ”data2″ 替换给线程 thread1,并且接管到线程 thread1 替换过去的字符串 ”data1″。最初,两个线程都输入了接管到的数据。
利用场景

生产者消费者模式:在生产者和消费者模式中,能够应用 Exchanger 来实现生产者和消费者之间的数据交换,从而实现数据的同步和交换。
遗传算法:在遗传算法中,能够应用 Exchanger 来实现父代和子代之间的基因替换,从而实现基因的进化和优化。
数据加密:在数据加密中,能够应用 Exchanger 来实现加密和解密数据之间的替换,从而保证数据的安全性。
线程间合作:在须要多个线程合作实现某项工作的场景中,能够应用 Exchanger 来实现线程间的数据交换和同步,从而协同实现工作。

  须要留神的是,Exchanger 只实用于两个线程之间的数据交换,如果须要多个线程之间的数据交换和同步,能够应用其余的并发工具,例如 CyclicBarrier、CountDownLatch 等。在抉择并发工具时,应依据具体的场景和需要来进行抉择。

正文完
 0