关于java:Java并发编程ThreadLocal

2次阅读

共计 9262 个字符,预计需要花费 24 分钟才能阅读完成。

ThreadLocal

This class provides thread-local variables.  These variables differ from their normal counterparts in that each thread that accesses one (via its {@code get} or {@code set} method) has its own, independently initialized copy of the variable.  {@code ThreadLocal} instances are typically private static fields in classes that wish to associate state with a thread (e.g., a user ID or Transaction ID).

上文节选自 ThreadLocal 的 JavaDoc,从形容的内容上看咱们就能够看出 ThreadLocal 的作用是提供线程局部变量,这些局部变量是原变量的正本;ThreadLocal 是为每个线程提供一份变量的正本,由此不会影响到其余线程的变量正本。

源码浅析

咱们先来看看 ThreadLocal下反对的几个罕用操作。set(T value)get()remove();

set(T value)

public void set(T value) {Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {map.set(this, value);
    } else {createMap(t, value);
    }
}
ThreadLocalMap getMap(Thread t) {return t.threadLocals;}
 void createMap(Thread t, T firstValue) {t.threadLocals = new ThreadLocalMap(this, firstValue);
}

从源代码能够看出,set 操作先获取以后线程,再获取以后线程的 ThreadLocalMap ,如果 map 不为空则把以后 ThreadLocal 对象实例作为 key,传进来的 value 作为值,否则创立一个 map,再依照键值对放进去。从这里能够看出,本质上咱们最初的存储介质就是这个 ThreadLocalMap,那这个ThreadLocalMap 是什么呢?接着往下看。

public class Thread implements Runnable {
    ...
    ThreadLocal.ThreadLocalMap threadLocals = null;
    ...
 }
==================================================================
static class ThreadLocalMap {    
    static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {super(k);
        value = v;
        }
    }
...    
}

每个 Thread 对象都保护一个ThreadLocalMap 类型的变量 threadLocals;这个ThreadLocalMapThreadLocal 的一个外部类,ThreadLocalMap 外部保护了一个Entry(键值对),这里能够看到 Entry 继承了 WeakReference,其对应的构造方法里,key 值调用了父类的办法,那么意味着Entry 所对应的 key(ThreadLocal 对象实例)的援用是一个弱援用。

WeakReference 是 java 四种援用用中的弱援用,当有 gc 产生时就会被回收。
还有其余三种别离是强援用,虚援用,软援用。

那为什么这里设置成弱援用呢?次要是为了避免内存透露,上面咱们来剖析一下。

假如咱们在办法外部 new 了一个 ThreadLocal 对象,并往里面 set 值。此时堆内存中的 ThreadLocal 对象有两块援用指向它,第一个援用是栈内存的强援用;另外一个是当set 值之后Entry 的 key 作为的弱援用。办法完结时,当指向 ThreadLocal 对象的强援用隐没时,对应的弱援用也会主动被回收。
咱们假如 Entry 中的援用是强援用,当指向ThreadLocal 对象的强援用隐没时,ThreadLocal 对象应该被回收但却因为 Entry 中的强援用无奈回收,咱们晓得 ThreadlocalMap 是属于 Thread 的,如果是服务端线程的话,因为 Thread 长期存在,ThreadLocalMap 也必然长期存在,那么对应的这个 Entry 也会长期存在,那这个 ThreadLocal 对象就不会被回收,就造成了内存透露。所以这就是为什么要应用弱援用的起因。
除此之外,JDK 的设计者曾经帮咱们应用弱援用来防止了内存透露,认真想想这样就不会再产生内存透露了吗?当对应的 Entry 中的 key 被回收,那这个 value 是不是就无奈获取到了呢,但因为Thread 长期存在,ThreadLocalMap也长期存在,Entry也会长期存在,value 也会永远都无奈开释了,这样还是会造成内存透露。所以,在每次应用完之后,都须要调用remove 办法进行资源革除。

说到这里,回忆一下如果在拦截器里应用 ThreadLocal,就能了解为什么在 afterCompletion 须要 remove 办法了吧,如果不进行资源革除,就会导致线程在第二次申请中 get 到第一次申请的 set 进去的值。

remove()

public void remove() {ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null) {m.remove(this);
    }
}

remove 操作还是比较简单的,就是通过以后Thread 对象获取 ThreadLocalMap,若不为空则再依据 ThreadLocal 对象作为 key 删除 value。

get()

public T get() {Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {@SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();}
private T setInitialValue() {T value = initialValue();
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {map.set(this, value);
    } else {createMap(t, value);
    }
    if (this instanceof TerminatingThreadLocal) {TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
    }
    return value;
}

接着咱们来看下 get 操作,get 操作同样也是先获取以后ThreadThreadLocalMap,再依据ThreadLocal 对象获取对应的Entry,最初获取值。若 map 为空,则初始化值而后将初始化的值返回。

利用实际

Spring 事务管理

在 Web 我的项目编程中,咱们都会与数据库进行打交道,往往通常的做法是一个 Service 层里蕴含了多个 Dao 层的操作,要保障 Service 层操作的原子性,就要保障这些 Dao 操作是在同一个事务里,在同一个事务里就要确保多个 Dao 层的操作都是同一个 Connection,那如何保障呢?咱们能够确定的是该多个 Dao 层的操作都是由雷同的线程进行解决的,那只有把 Connection 与线程绑定就能够了,所以 Spring 这里就奇妙的应用 ThreadLocal 来解决了这个问题。
Spring 中有一个类 DataSourceUtils 其中有办法是获取数据源的 Connection 的,外面有一个 getConnection 办法,如下所示。

public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    try {return doGetConnection(dataSource);
    }
    catch (SQLException ex) {throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
    }
    catch (IllegalStateException ex) {throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection:" + ex.getMessage());
    }
}

for example when using {@link DataSourceTransactionManager}. Will bind a Connection to the
thread if transaction synchronization is active
这里的正文有一个细节要关注到就是正文中提及到 如果应用数据源事务管理器,当开启事务时,那么就会绑定连贯到以后线程。

/**
* Actually obtain a JDBC Connection from the given DataSource.
* Same as {@link #getConnection}, but throwing the original SQLException.
* <p>Is aware of a corresponding Connection bound to the current thread, for example
* when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread
* if transaction synchronization is active (e.g. if in a JTA transaction).
* <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.
* @param dataSource the DataSource to obtain Connections from
* @return a JDBC Connection from the given DataSource
* @throws SQLException if thrown by JDBC methods
* @see #doReleaseConnection
*/
public static Connection doGetConnection(DataSource dataSource) throws SQLException {Assert.notNull(dataSource, "No DataSource specified");

    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {conHolder.requested();
        if (!conHolder.hasConnection()) {logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(fetchConnection(dataSource));
        }
        return conHolder.getConnection();}
    // Else we either got no holder or an empty thread-bound holder here.

    logger.debug("Fetching JDBC Connection from DataSource");
    Connection con = fetchConnection(dataSource);

    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        try {
            // Use same Connection for further JDBC actions within the transaction.
            // Thread-bound object will get removed by synchronization at transaction completion.
            ConnectionHolder holderToUse = conHolder;
            if (holderToUse == null) {holderToUse = new ConnectionHolder(con);
            }
            else {holderToUse.setConnection(con);
            }
            holderToUse.requested();
            TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource));
            holderToUse.setSynchronizedWithTransaction(true);
            if (holderToUse != conHolder) {TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
            }
        }
        catch (RuntimeException ex) {
            // Unexpected exception from external delegation call -> close Connection and rethrow.
            releaseConnection(con, dataSource);
            throw ex;
        }
    }

    return con;
}

从源代码大抵能够看出首先从 TransactionSynchronizationManager 中获取 ConnectionHolder,若存在则间接返回Connection,若不存在则新生成一个Connection 并装到 ConnectionHolder 中而后注册到 TransactionSynchronizationManager 中,而后再返回Connection。由此,咱们能够看出TransactionSynchronizationManager 在这其中起到了治理Connection 的作用。

接着看下 TransactionSynchronizationManager 类。其中getResource 办法和 bindResource 办法都在下面的 doGetConnection 办法中有过调用,那咱们就重视看下这几个办法。

public abstract class TransactionSynchronizationManager {
private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");
    
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");
...
public static Object getResource(Object key) {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Object value = doGetResource(actualKey);
    if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                     Thread.currentThread().getName() + "]");
    }
    return value;
}
    
private static Object doGetResource(Object actualKey) {Map<Object, Object> map = resources.get();
    if (map == null) {return null;}
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {resources.remove();
        }
        value = null;
    }
    return value;
}
...
public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Assert.notNull(value, "Value must not be null");
    Map<Object, Object> map = resources.get();
    // set ThreadLocal Map if none found
    if (map == null) {map = new HashMap<>();
        resources.set(map);
    }
    Object oldValue = map.put(actualKey, value);
    // Transparently suppress a ResourceHolder that was marked as void...
    if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {oldValue = null;}
    if (oldValue != null) {throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                                        actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
                     Thread.currentThread().getName() + "]");
    }
}

从以上代码能够得出 TransactionSynchronizationManager 类保护了ThreadLocal 对象来进行资源的存储,包含事务资源(Spring 中对 JDBC 的 Connection 或 Hibernate 的 Session 都称之为资源 ),事务隔离级别等。
名为 resources 变量的 ThreadLocal 对象存储的是 DataSource 生成的 actualKey 为 key 值和 ConnectionHolder 作为 value 值封装成的 Map。
再联合 DataSourceUtilsdoGetConnection办法和 TransactionSynchronizationManagerbindResourcegetResource 办法可知:在某个线程第一次调用时候,封装 Map 资源为:key 值为 DataSource 生成 actualKey 和 value 值为 DataSource 取得的 Connection 对象封装后的 ConnectionHolder。等这个线程下一次再次拜访中就能保障应用的是第一次创立的ConnectionHolder 中的 Connection 对象。

参考链接

【死磕 Java 并发】—–深入分析 ThreadLocal
Spring 事务之如何保障同一个 Connection 对象
Spring 是如何保障同一事务获取同一个 Connection 的?应用 Spring 的事务同步机制解决:数据库刚插入的记录却查问不到的问题【享学 Spring】

正文完
 0