一、AtomicInteger
/**
* @author Java 和算法学习:周一
*/
public class AtomicInteger {
// 不必加 volatile
public AtomicInteger count = new AtomicInteger(0);
// 不必加 synchronized
public void m() {for (int i = 0; i < 1000; i++) {
//count++
count.incrementAndGet();}
}
public static void main(String[] args) {T01_AtomicInteger t = new T01_AtomicInteger();
List<Thread> threadList = new ArrayList<>();
for (int i = 0; i < 10; i++) {threadList.add(new Thread(t::m, "t" + i));
}
threadList.forEach((o)->{o.start();
});
threadList.forEach((o)->{
try {o.join();
} catch (InterruptedException e) {e.printStackTrace();
}
});
System.out.println(t.count);
}
}
1、底层实现
CAS(Compare And Swap/Set)无锁优化、乐观锁
cas(V, Expected, NewValue)
if V == E
V = New
otherwise try again or fail
V:要改的值
Expected:冀望以后这个值是多少
NewValue:要设置的新值
比方要改的值是 3(即最开始拿到的这个值是 3),当执行 CAS 操作时,我冀望(Expected)这个值是 3,是 3 我才批改这个值;如果当执行 CAS 时,不等于我冀望的值 3,阐明这个值被其余线程改了(如果改为了 4),那我就再试一次(try again)。此时我冀望这个值是 4,如果执行 CAS 操作时,没有其余的线程再次批改这个值,即和我冀望的值 4 相等,那我再执行 CAS 操作,把值批改为新值。
(1)如果在执行 CAS 操作的时候,在判断值是否为我冀望的值后,马上有其余线程把这个值改为了其余的值,那这种状况不是仍然有问题吗?
CAS 操作是 CPU 原语反对的,也就是说 CAS 的操作是 CPU 指令级别的操作,不容许被扭转。
(2)ABA 问题
(就好比 你的女朋友跟你复合时,两头又交了别的男朋友,她可能就曾经变了,不再是你原来的女朋友)
就是在我执行 CAS 操作的时候,这个值被其余的线程批改为了 2,而后又改为了 3,也就是两头通过了更改。如果是根底类型,能够不必管;如果要解决,须要加版本号,也就是这个值作任何一次的批改,版本号都加 1,前面查看的时候和版本号一起查看。
(3)CAS 底层是怎么做到的?
外面是通过 sun.misc.Unsafe 类来做的(大多数办法都是 native 的)。该类次要作用:间接操作 JVM 内存(native allocateMemory)、间接生成类实例(native allocateInstance)、间接操作变量(native getInt、native getObject)、以及 CAS 相干操作(native compareAndSwapObject)。该类只能通过反射或者 getUnsafe 失去该类的对象来应用(单例),不能间接应用。
2、SyncLong VS AtomicLong VS LongAdder
/**
* @author Java 和算法学习:周一
*/
public class AtomicSynclongLongAdder {private static AtomicLong count1 = new AtomicLong(0L);
private static long count2 = 0L;
private static LongAdder count3 = new LongAdder();
public static void main(String[] args) throws Exception {Thread[] threads = new Thread[1000];
//AtomicLong
for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 100000; j++) {count1.incrementAndGet();
}
});
}
long start = System.currentTimeMillis();
for (Thread thread : threads) {thread.start();
}
for (Thread thread : threads) {thread.join();
}
long end = System.currentTimeMillis();
System.out.println("AtomicLong:" + count1.get() + "time:" + (end - start));
//long
Object o = new Object();
for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 100000; j++) {synchronized (o) {count2++;}
}
});
}
start = System.currentTimeMillis();
for (Thread thread : threads) {thread.start();
}
for (Thread thread : threads) {thread.join();
}
end = System.currentTimeMillis();
System.out.println("Long:" + count2 + "time:" + (end - start));
//LongAdder
for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 100000; j++) {count3.increment();
}
});
}
start = System.currentTimeMillis();
for (Thread thread : threads) {thread.start();
}
for (Thread thread : threads) {thread.join();
}
end = System.currentTimeMillis();
System.out.println("LongAdder:" + count3.longValue() + "time:" + (end - start));
}
}
LongAdder 外部用的是 分段锁(外部也是 CAS 实现)。它会把值放到一个数组里,比方数组大小为 4,则每一个数组里锁 250 个线程,每一个数组都做运算,最初再把所有的数组后果求和。所以 LongAdder 在超高并发时,劣势特地显著。
二、ReentrantLock
1、reentrantlock 能够代替 synchronized,应用 reentrantlock 能够实现和 synchronized 同样的性能,然而 必须得手动开释锁。应用 synchronized 锁定如果遇到异样,jvm 会主动开释锁,然而 ReentrantLock 必须手动开释锁,因而常常在 finally 中开释锁。
2、应用 reentrantlock 能够进行尝试锁定(tryLock),这样无奈锁定或者在指定工夫内无奈锁定,线程能够决定是否持续期待。
3、reentrantlock 能够指定为偏心锁。ReentrantLock lock=new ReentrantLock(true); 示意 new 一个偏心锁。默认是非偏心锁。
1、偏心锁与非偏心锁
偏心锁:如果一个新来的线程,首先去判断锁的期待队列有无正在期待的线程,有则进入期待队列,等后面的先运行,无则间接去抢锁,这样的锁是偏心锁。先来后到。
非偏心锁:如果一个新来的线程,间接就去抢锁,而不去判断期待队列是否有线程在期待,这就是非偏心锁。synchronized 都是非偏心锁。
新来的线程检不查看队列是偏心锁与非偏心锁的要害。
2、reentrantlock VS synchronized
1、reentrantlock 能够代替 synchronized
2、reentrantlock 必须手动敞开锁,synchronized 执行完结或异样时 JVM 会主动开释锁
3、reentrantlock 是通过 CAS 实现的,synchronized 实质是锁降级
4、reentrantlock 能够通过 tryLock 来进行尝试锁定
5、reentrantlock 能够在偏心锁与非偏心锁之间切换,synchronized 都是非偏心锁
三、CountDownLatch(倒数 门闩)
/**
* @author Java 和算法学习:周一
*/
public class TestCountDownLatch {public static void usingCountDownLatch() {Thread[] threads = new Thread[10];
CountDownLatch countDownLatch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {
int count = 0;
for (int j = 0; j < 10000; j++) {count++;}
System.out.println(count);
countDownLatch.countDown();});
}
for (Thread thread : threads) {thread.start();
}
try {countDownLatch.await();
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("CountDownLatch end...");
}
public static void main(String[] args) {usingCountDownLatch();
}
}
countDownLatch 初始大小定义为 10,每减少一个线程门闩值减 1(countDownLatch.countDown()),始终期待着(countDownLatch.await()),直到门闩值为 0 才执行。
四、CyclicBarrier(循环 栅栏)
/**
* @author Java 和算法学习:周一
*/
public class TestCyclicBarrier {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(20, ()->{System.out.println("满人,发车");
});
for (int i = 0; i < 100; i++) {new Thread(()->{
try {barrier.await();
} catch (InterruptedException e) {e.printStackTrace();
} catch (BrokenBarrierException e) {e.printStackTrace();
}
}).start();}
}
}
CyclicBarrier 大小定义为 20,达到 20 个线程时才执行一次,否则始终期待着(barrier.await())。即以上程序会输入 5 次 满人,发车。
五、Phaser
/**
* @author Java 和算法学习:周一
*/
public class TestPhaser {private static MarriagePhaser marriagePhaser = new MarriagePhaser();
public static void sleep() {
try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {e.printStackTrace();
}
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {switch (phase) {
case 0:
System.out.println("所有人到齐" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("所有人吃完" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("所有人来到" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("新郎新娘抱抱" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {this.name = name;}
private void arrive() {sleep();
System.out.println(name + "达到现场");
marriagePhaser.arriveAndAwaitAdvance();}
private void eat() {sleep();
System.out.println(name + "吃");
marriagePhaser.arriveAndAwaitAdvance();}
private void leave() {sleep();
System.out.println(name + "来到");
marriagePhaser.arriveAndAwaitAdvance();}
private void hug() {if ("新郎".equals(name) || "新娘".equals(name)) {sleep();
System.out.println(name + "拥抱");
marriagePhaser.arriveAndAwaitAdvance();} else {marriagePhaser.arriveAndDeregister();
}
}
@Override
public void run() {arrive();
eat();
leave();
hug();}
}
public static void main(String[] args) {marriagePhaser.bulkRegister(7);
for (int i = 0; i < 5; i++) {new Thread(new Person("person" + i)).start();}
new Thread(new Person("新郎")).start();
new Thread(new Person("新娘")).start();}
}
相似于栅栏组,分阶段的栅栏。所有的线程都到了某个栅栏的时候,才会执行下一步的操作。
六、ReadWriteLock
Read 的时候是共享锁
Write 的时候是排它锁(互斥锁)
最开始如果是读线程拿到了锁,当第二个来的线程是读线程时,能够一起读;当第二个来的是写线程时则阻塞,不容许你写,等我读完再写。
最开始如果是写线程拿到了锁,不论第二个线程是读还是写,均阻塞,必须等我改完了其余线程能力读、其余线程能力写。
/**
* @author Java 和算法学习:周一
*/
public class TestReadWriteLock {
private static int value;
private static ReentrantLock lock = new ReentrantLock();
private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {lock.lock();
Thread.sleep(1000);
System.out.println("read...");
} catch (Exception e) {e.printStackTrace();
} finally {lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {lock.lock();
value = v;
Thread.sleep(1000);
System.out.println("write..." + value);
} catch (Exception e) {e.printStackTrace();
} finally {lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
// 读线程 8 个
Thread[] tRead = new Thread[8];
for (int i = 0; i < 8; i++) {tRead[i] = new Thread(() -> {read(readLock);
});
}
long start = System.currentTimeMillis();
for (Thread t : tRead) {t.start();
}
for (Thread t : tRead) {t.join();
}
// 写线程 2 个
Thread[] tWrite = new Thread[2];
for (int i = 0; i < 2; i++) {tWrite[i] = new Thread(() -> {write(writeLock, new Random().nextInt(10));
});
}
for (Thread t : tWrite) {t.start();
}
for (Thread t : tWrite) {t.join();
}
long end = System.currentTimeMillis();
System.out.println("total time:" + (end - start));
}
}
如果应用 Reentrantlock 加锁,程序运行 10s,也就是说 Reentrantlock 读写均是排它锁;
如果应用 ReadWriteLock 加锁,程序运行 3s,也就是说 ReadWriteLock 的读是共享锁,写是排它锁。
七、Semaphore(信号灯)
初始时,给 Semaphore 赋默认值,该值示意最多能够容许多少个线程同时运行。即 限流 的意思。
Semaphore 默认是非偏心锁,new Semaphore(1, true) new 对象时设置 true 即示意是偏心锁。
/**
* @author Java 和算法学习:周一
*/
public class TestSemaphore {public static void main(String[] args) {Semaphore s = new Semaphore(1);
new Thread(()->{
try {s.acquire();
System.out.println("t1...");
TimeUnit.SECONDS.sleep(1);
System.out.println("t1...");
} catch (InterruptedException e) {e.printStackTrace();
} finally {s.release();
}
}, "t1").start();
new Thread(()->{
try {s.acquire();
System.out.println("t2...");
TimeUnit.SECONDS.sleep(1);
System.out.println("t2...");
} catch (InterruptedException e) {e.printStackTrace();
} finally {s.release();
}
}, "t2").start();}
}
八、Exchanger
/**
* @author Java 和算法学习:周一
*/
public class TestExchanger {public static void main(String[] args) {Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
String s = "T1";
try {s = exchanger.exchange(s);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(() -> {
String s = "T2";
try {s = exchanger.exchange(s);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();}
}
exchange()办法是阻塞的,有一个线程调用了 exchange()办法,它会始终阻塞直到第二个线程调用了 exchange()办法;而后第一个线程才会往下执行。