关于java:你真的了解ThreadLocal吗

30次阅读

共计 7870 个字符,预计需要花费 20 分钟才能阅读完成。

一, 是什么? 怎么用?

是什么?

是每个线程的本地变量, 能够存储每个线程独有的变量.

怎么用?

能够为每个线程创立一个独有的变量对象

能够实现线程间的数据隔离

Spring 申明式事务中应用 ThreadLocal 实现数据库隔离


二, 类架构

ThreadLocal 属性

/**
 * 该值用于给 ThreadLocalHashMap 中存入值时线性探测插入的 bucket 地位
 */
private final int threadLocalHashCode = nextHashCode();

/**
 * 下一个要给出的 hashCode, 每次原子性更新, 从 0 开始
 */
private static AtomicInteger nextHashCode = new AtomicInteger();

/**
 * hashCode 增值, 应用这个数字能够使 key 平均的散布在 2 的幂次方的数组上
 * 具体能够参考 https://www.javaspecialists.eu/archive/Issue164-Why-0x61c88647.html
 * 因为比较复杂, 在这里不展开讨论
 */
private static final int HASH_INCREMENT = 0x61c88647;

ThreadLocalMap 属性

/**
 * Map 的初始化容量
 */
private static final int INITIAL_CAPACITY = 16;

/**
 * 哈希表, 长度始终是 2 的幂, 起因在于 2 的幂 - 1 的二进制全副为 1
 * 便于按位与操作
 * 如 16: 10000 - 1 = 1111
 * 按位与后都会在数组中
 */
private Entry[] table;

/**
 * 哈希表的长度
 */
private int size = 0;

/**
 * 扩容阈值, 默认为 0, 扩容大小为哈希表长度的 2 /3
 */
private int threshold;

三, 实现原理

1, 为什么 ThreadLocal 能够实现线程隔离?

/**
 * 创立一个 ThreadLocal, 能够看出在结构器外部并没有解决任何事件
 */
public ThreadLocal() {}
/**
 * 给 ThreadLocal 中设置值会调用该办法
 */
public void set(T value) {
    // 获取以后线程
    Thread t = Thread.currentThread();
    // 获取以后线程中的变量 threadLocals
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        // 以后线程的 threadLocals 变量为空, 创立 map 设置值
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    // 获取 t 的线程变量
    return t.threadLocals;
}

由上能够看出给 ThreadLocal 设置值实质上是给 Thread 的本地变量 threadLocals 变量设置值, 这就是为什么 ThreadLocal 能够实现线程之间数据隔离


2, 增删查操作

/**
 * 给 ThreadLocal 中设置值会调用该办法
 */
public void set(T value) {
    // 获取以后线程
    Thread t = Thread.currentThread();
    // 获取以后线程中的变量 threadLocals
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        // 以后线程的 threadLocals 变量为空, 创立 map 设置值
        createMap(t, value);
}
void createMap(Thread t, T firstValue) {
    // 给线程设置变量值
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}
/**
 * 设置与 key 相干的值,key 为 ThreadLocal
 */
private void set(ThreadLocal<?> key, Object value) {Entry[] tab = table;
    int len = tab.length;
    // 按位与获取插入的地位
    int i = key.threadLocalHashCode & (len - 1);
    /**
     * 应用线性探测法插入值
     * 从获取到的下标向后遍历, 如果以后 key 等于数组中以后下标的 key 则间接批改值
     * 如果数组以后下标地位为空则替换掉以后下标的 entry
     */
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {ThreadLocal<?> k = e.get();
        if (k == key) {
            e.value = value;
            return;
        }
        if (k == null) {
            // 以后地位为空须要替换掉
            replaceStaleEntry(key, value, i);
            return;
        }
    }
    // 如果 i 向后没有与其雷同的 key 或者为空的地位则间接替换掉以后地位的 entry
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 从 i 向后没有空位, 并且数量大于阈值须要 rehash, 删除哈希表中某些为空的 entry 如果 size 大于阈值的 3 / 4 则须要扩容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();}
/**
 * 替换掉有效的 entry
 */
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {Entry[] tab = table;
    int len = tab.length;
    Entry e;

    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
        // 从后向前寻找空位, 寻找到间隔 staleSlot 最远的一个地位
        if (e.get() == null)
            slotToExpunge = i;

    // 向后遍历
    for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {ThreadLocal<?> k = e.get();
        // 找到了与 ThreadLocal 雷同的 key, 替换值
        if (k == key) {
            e.value = value;
            // 替换掉以后地位的 entry
            tab[i] = tab[staleSlot];
            // 替换 entry
            tab[staleSlot] = e;

            // 后面没有空位, 设置删除的地位
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            // 删除前面为空的地位
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
    // 如果没有发现 key, 则间接替换
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
    // staleSlot 前有空位须要删除为空的 entry
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
/**
 * rehash 哈希表
 */
private void rehash() {
    // 删除有效的 entry 并且 rehash 哈希表
    expungeStaleEntries();
    // 如果数量大于等于阈值的 3 /4, 须要扩容
    if (size >= threshold - threshold / 4)
        resize();}
/**
 * 删除有效的 entry 并且 rehash 哈希表
 */
private void expungeStaleEntries() {Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {Entry e = tab[j];
        if (e != null && e.get() == null)
            expungeStaleEntry(j);
    }
}
/**
 * 双倍扩容之前的哈希表
 */
private void resize() {Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;
    for (int j = 0; j < oldLen; ++j) {Entry e = oldTab[j];
        if (e != null) {
            // 以后地位 key 为空, 则间接 GC
            ThreadLocal<?> k = e.get();
            if (k == null) {e.value = null; // Help the GC} else {
                // 获取新地位
                int h = k.threadLocalHashCode & (newLen - 1);
                // 从以后地位向后寻找一个为空的地位
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                // 从新插入 entry
                newTab[h] = e;
                count++;
            }
        }
    }
    // 从新设置阈值和哈希表属性
    setThreshold(newLen);
    size = count;
    table = newTab;
}
/**
 * 删除某些 entry 以减半的形式删除
 */
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {i = nextIndex(i, len);
        Entry e = tab[i];
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
        // 无条件右移 1 位           /2
    } while ((n >>>= 1) != 0);
    return removed;
}
/**
 * 删除具体位置的 entry, 并且 rehash 之后的 entry
 */
private int expungeStaleEntry(int staleSlot) {Entry[] tab = table;
    int len = tab.length;
    // 删除以后下标的 entry
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;
    Entry e;
    int i;
    // 从以后地位向后遍历, 寻找为空的或者 rehash 之后的 entry
    for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {ThreadLocal<?> k = e.get();
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {int h = k.threadLocalHashCode & (len - 1);
            //rehash 之后的地位不是以后的地位须要删除以后的 entry
            if (h != i) {
                //help GC
                tab[i] = null;
                // 从 h 向后遍历, 寻找一个为空的地位
                while (tab[h] != null)
                    h = nextIndex(h, len);
                // 插入 entry
                tab[h] = e;
            }
        }
    }
    return i;
}
/**
 * 获取 ThreadLocal 中寄存的值
 */
public T get() {
    //1, 获取以后线程
    Thread t = Thread.currentThread();
    //2, 获取以后线程中的 threadLocals 变量, 也就是 ThreadLocalMap 对象
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        //3, 获取之前设置的值并返回
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {@SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    //3, 如果以后线程的 threadLocals 变量为空, 则还没有初始化, 须要进行初始化
    return setInitialValue();}
/**
 * 设置初始化值
 */
private T setInitialValue() {T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}
/**
 * 初始化值为 null
 */
protected T initialValue() {return null;}
/**
 * 删除变量
 */
public void remove() {ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);
}
/**
 * 删除某个 entry
 */
private void remove(ThreadLocal<?> key) {Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len - 1);
    // 从 i 向后遍历
    for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {if (e.get() == key) {
            //help GC
            e.clear();
            // 删除以后 entry 并且 rehash 前面的 entry
            expungeStaleEntry(i);
            return;
        }
    }
}

三, 存在问题

1, 内存透露问题

因为应用 ThreadLocal 实质上是应用 ThreadLocalMap, 在应用完 ThreadLocal 后, 无奈手动删除 ThreadLocalMap 中的 key(ThreadLocal 的援用), 所以可能会引起内存透露问题, 然而代码在设计的时候就思考到了这一点, 所以将 ThreadLocalMap 中的 key(ThreadLocal) 设置为了弱援用 (WeakReference), 即很容易被 GC 掉, 但即便如此, 咱们还是要在应用完后调用 ThreadLocal 的 remove 办法, 手动删除 ThreadLocal 援用, 防止内存透露.

2, 子线程不能拜访父线程变量

能够应用 InheritableThreadLocal

原理

ThreadLocal<String> local = new InheritableThreadLocal<>();
local.set("main local variable");
public void set(T value) {
    // 获取以后线程
    Thread t = Thread.currentThread();
    // 因为 InheritableThreadLocal 重写了 ThreadLocal 的 getMap 办法, 所以上面调用的为 InheritableThreadLocal 中的 getMap, 获取的为 Thread 类的 inheritableThreadLocals 变量
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        // 以后线程的 threadLocals 变量为空, 创立 map 设置值
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {return t.inheritableThreadLocals;}
// 创立子线程
Thread thread = new Thread(() -> {local.set("child thread variable");
    System.out.println("child thread get local variable :" + local.get());
});
public Thread(Runnable target) {init(null, target, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name, long stackSize) {init(g, target, name, stackSize, null, true);
}
private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
    // 获取以后线程, 这里为父线程, 子线程还没有被创立进去
    Thread parent = currentThread();
    ...
        // 这里的 parent.inheritableThreadLocals 在父线程 set 的时候曾经初始化了
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            // 所以子类的 ThreadLocal 也是应用的 inheritableThreadLocals, 不是之前的 threadLocals
            this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];
    
    for (int j = 0; j < len; j++) {Entry e = parentTable[j];
        if (e != null) {
            // 向上转型
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                // 调用 InheritableThreadLocal 对象的 childValue 办法, 返回 e.value
                Object value = key.childValue(e.value);
                Entry c = new Entry(key, value);
                // 插入到子线程的 inheritableThreadLocals, 也就是将父线程的 inheritableThreadLocals 数据复制到子线程中
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

正文完
 0