关于多线程:AtomicStampedReference解决CAS的ABA问题

42次阅读

共计 3565 个字符,预计需要花费 9 分钟才能阅读完成。

AtomicStampedReference

解决 CAS 的 ABA 问题

什么是 ABA

ABA 问题:指在 CAS 操作的时候,某个线程将变量的值由 A 修改为 B,但是又改回了 A,其他线程发现值仍然是 A、似乎并未改变,于是 CAS 将进行值替换操作。实际上该值已经被改变过,这与 CAS 的核心思想是不符合的。

ABA 解决方案

每次变量更新的时候,同时更新变量的版本号。如果某变量被某个线程修改过,那么版本号一定会递增,CAS 在比较值的同时也比较版本号,从而解决 ABA 问题。

AtomicReference 演示 ABA 问题

package com.keytech.task;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Demonstrates the ABA problem with a plain {@link AtomicReference}.
 *
 * <p>The first task changes the shared value 10 -> 12 -> 10. The second task then
 * performs {@code compareAndSet(10, 100)}, which succeeds because the reference
 * holds 10 again; the intermediate change goes undetected.
 *
 * <p>Note: {@code AtomicReference.compareAndSet} compares references with {@code ==}.
 * The demo works with boxed {@code Integer} values only because 10, 12 and 100 are
 * inside the range that autoboxing always caches (-128..127).
 */
public class AtomicIntegerTest {

    /** Shared value; starts at 10 ("A"). */
    private static final AtomicReference<Integer> count = new AtomicReference<>(10);

    /**
     * Runs the A -> B -> A sequence on one pool thread and the unsuspecting CAS on
     * another, then waits for both tasks to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();

        // Opened once the A -> B -> A round trip is over. Without this ordering the
        // second task could win the race and set 100 first, so no ABA would be shown.
        // (Fully qualified so the snippet's import list does not need to change.)
        java.util.concurrent.CountDownLatch roundTripDone =
                new java.util.concurrent.CountDownLatch(1);

        executorService.execute(() -> {
            try {
                boolean b = count.compareAndSet(10, 12);
                if (b) {
                    System.out.println(Thread.currentThread().getName() + "批改胜利 count=" + count.get());
                }
                boolean c = count.compareAndSet(12, 10);
                if (c) {
                    System.out.println(Thread.currentThread().getName() + "批改胜利 count=" + count.get());
                }
            } finally {
                // Always release the waiter, even if something above failed.
                roundTripDone.countDown();
            }
        });

        executorService.execute(() -> {
            try {
                roundTripDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // The value is 10 again, so this CAS cannot tell it was ever 12.
            boolean b = count.compareAndSet(10, 100);
            if (b) {
                System.out.println(Thread.currentThread().getName() + "批改胜利 count=" + count.get());
            }
        });

        executorService.shutdown();
        try {
            // Block until both tasks have printed, so main does not return mid-demo.
            if (!executorService.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
//pool-1-thread-1批改胜利 count=12
//pool-1-thread-1批改胜利 count=10
//pool-1-thread-2批改胜利 count=100

pool-1-thread-1 将 count 由 10 修改成 12,又将 count 从 12 改回 10。pool-1-thread-2 将 count 从 10 成功改成 100,并没有察觉 count 中途被修改过。这就出现了 ABA 的问题。

AtomicStampedReference解决 ABA 的问题

以计数器的实现为例,计数器通常用来统计在线人数,在线 +1,离线 -1,是 ABA 的典型场景。

package com.keytech.task;

import java.util.concurrent.atomic.AtomicStampedReference;


/**
 * Lock-free counter built on {@link AtomicStampedReference}.
 *
 * <p>The value is stored together with an integer stamp. Every successful update
 * writes the new value and the stamp plus one in a single compare-and-set.
 */
public class CounterTest {

    /** Current value paired with its stamp; both start at 0. */
    private final AtomicStampedReference<Integer> count = new AtomicStampedReference<>(0, 0);

    /**
     * @return the value currently stored in the counter
     */
    public int getCount() {
        Integer current = count.getReference();
        return current;
    }

    /**
     * Adds one to the counter.
     *
     * @return the value written by this call
     */
    public int increment() {
        return addDelta(1);
    }

    /**
     * Subtracts one from the counter.
     *
     * @return the value written by this call
     */
    public int decrement() {
        return addDelta(-1);
    }

    /**
     * Reads value and stamp together, then retries the compare-and-set until it succeeds.
     *
     * @param delta amount added to the observed value
     * @return the value written by the successful compare-and-set
     */
    private int addDelta(int delta) {
        final int[] stampHolder = new int[1];
        for (;;) {
            // get() returns the reference and stores the matching stamp in stampHolder[0].
            Integer observed = count.get(stampHolder);
            int observedStamp = stampHolder[0];
            int updated = observed + delta;
            if (count.compareAndSet(observed, updated, observedStamp, observedStamp + 1)) {
                return updated;
            }
        }
    }
}

调用计数器

package com.keytech.task;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Drives {@code CounterTest} from many pool threads: 5000 increments and 5000
 * decrements, with at most 200 tasks inside the counter at once, then prints the
 * resulting count.
 */
public class AtomicIntegerTest {

    /**
     * Submits the increment/decrement tasks, waits for all of them to finish and
     * prints the counter value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(200);
        CounterTest counterTest = new CounterTest();

        for (int i = 0; i < 5000; i++) {
            executorService.execute(() -> runWithPermit(semaphore, counterTest::increment));
            executorService.execute(() -> runWithPermit(semaphore, counterTest::decrement));
        }

        executorService.shutdown();
        try {
            // shutdown() only stops new submissions; without waiting here the count
            // would be read while tasks are still running and might not be final.
            // (Fully qualified so the snippet's import list does not need to change.)
            if (!executorService.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        System.out.println(counterTest.getCount());
    }

    /**
     * Runs {@code action} while holding one permit of {@code semaphore}.
     * The permit is released even if the action throws.
     */
    private static void runWithPermit(Semaphore semaphore, Runnable action) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            action.run();
        } catch (RuntimeException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}

// 输出 0

AtomicBoolean 保证高并发下只执行一次

package com.keytech.task;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;


/**
 * Shows how {@link AtomicBoolean#compareAndSet} lets exactly one of many concurrent
 * callers perform an action: only the caller that flips the flag from
 * {@code false} to {@code true} prints the message.
 */
public class AtomicBooleanTest {

    /** Flipped from false to true by the first caller of {@link #update()}. */
    private static final AtomicBoolean isHappen = new AtomicBoolean(false);

    /** Number of tasks submitted to the pool. */
    public static int clientTotal = 5000;

    /** Number of semaphore permits, i.e. the cap on tasks running update() at once. */
    public static int threadTotal = 200;

    /**
     * Submits {@code clientTotal} tasks that each call {@link #update()} under a
     * semaphore permit, then waits for the pool to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(threadTotal);
        for (int i = 0; i < clientTotal; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    // No permit was obtained, so there is nothing to release.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
                try {
                    update();
                } catch (RuntimeException e) {
                    e.printStackTrace();
                } finally {
                    // Release in finally so a failing task cannot leak its permit.
                    semaphore.release();
                }
            });
        }

        executorService.shutdown();
        try {
            // Wait for the submitted tasks so main returns only after the demo is done.
            // (Fully qualified so the snippet's import list does not need to change.)
            if (!executorService.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Prints the message only for the single caller whose compare-and-set succeeds. */
    private static void update() {
        if (isHappen.compareAndSet(false, true)) {
            System.out.println("只执行一次");
        }
    }
}
// 只执行一次

正文完
 0