A Look at Hibernate's Session-Level Repeatable Reads

This article looks at how Hibernate implements session-level repeatable reads.

Example

doInTransaction(session -> {
    Product product = new Product();
    product.setId(1L);
    product.setQuantity(7L);
    session.persist(product);
});
doInTransaction(session -> {
    final Product product = (Product) session.get(Product.class, 1L);
    try {
        executeSync(() -> doInTransaction(_session -> {
            Product otherThreadProduct = (Product) _session.get(Product.class, 1L);
            assertNotSame(product, otherThreadProduct);
            otherThreadProduct.setQuantity(6L);
        }));
        Product reloadedProduct = (Product) session.createQuery("from Product").uniqueResult();
        assertEquals(7L, reloadedProduct.getQuantity());
        assertEquals(6L,
            ((Number) session
            .createSQLQuery("select quantity from product where id = :id")
            .setParameter("id", product.getId())
            .uniqueResult())
            .longValue());
    } catch (Exception e) {
        fail(e.getMessage());
    }
});
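  • This code demonstrates Hibernate's session-level repeatable reads. The entity query that produces reloadedProduct returns the instance with id 1 that is already cached in the session. Hibernate still sends the SQL statement to the database, but it ignores the column values in the returned ResultSet. The projection query (the native SQL select of quantity), by contrast, returns exactly what the JDBC query returns, so it sees the value 6 committed by the other transaction.

The example code comes from "How does Hibernate guarantee application-level repeatable reads".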
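The behaviour in the test can be imitated without a database. The following is a minimal sketch in plain Java, not Hibernate code: `ToySession`, `Product`, `TABLE` and the method names are all hypothetical stand-ins, and the "concurrent transaction" is modelled as a direct update of the shared map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; none of these are Hibernate classes.
class RepeatableReadSketch {

    // A mutable "entity" with an identifier and one column.
    static final class Product {
        final long id;
        long quantity;

        Product(long id, long quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    // The "database": id -> quantity, shared by every session.
    static final Map<Long, Long> TABLE = new HashMap<>();

    // A toy session that keeps a first-level cache keyed by identifier.
    static final class ToySession {
        private final Map<Long, Product> entitiesById = new HashMap<>();

        // Entity load: the row is always read, but an already-managed instance wins.
        Product loadEntity(long id) {
            Long quantityInDb = TABLE.get(id);       // the "SQL" is still executed
            if (quantityInDb == null) {
                return null;
            }
            Product managed = entitiesById.get(id);
            if (managed != null) {
                return managed;                      // row values are ignored
            }
            Product fresh = new Product(id, quantityInDb);
            entitiesById.put(id, fresh);
            return fresh;
        }

        // Projection: the column value comes straight from the "database".
        Long selectQuantity(long id) {
            return TABLE.get(id);
        }
    }

    // Returns {same instance? 1 : 0, quantity seen via entity, quantity seen via projection}.
    static long[] run() {
        TABLE.clear();
        TABLE.put(1L, 7L);
        ToySession session = new ToySession();
        Product product = session.loadEntity(1L);    // cached with quantity 7
        TABLE.put(1L, 6L);                           // update committed elsewhere
        Product reloaded = session.loadEntity(1L);
        long sameInstance = (reloaded == product) ? 1L : 0L;
        return new long[] { sameInstance, reloaded.quantity, session.selectQuantity(1L) };
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("same=" + r[0] + " entity=" + r[1] + " projection=" + r[2]);
    }
}
```

The entity path and the projection path diverge only at the cache lookup, which is the same split the test asserts with 7 and 6.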

write-behind

write-behind cache

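  • A write-behind cache is one kind of caching strategy. An update is applied to the cache first, and the cache later persists the changes to the backing store in batches (for example, as a batched database update). The benefit is that several operations on the same data can be merged, which reduces I/O.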
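As a rough illustration of the idea, here is a small write-behind cache in plain Java. It is only a sketch under simple assumptions: the backing store is an in-memory map, `storeWrites` exists purely to count physical writes, and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical write-behind cache; the slow "store" is simulated with a map.
class WriteBehindCacheSketch {
    private final Map<String, Integer> store = new LinkedHashMap<>();  // backing storage
    private final Map<String, Integer> cache = new LinkedHashMap<>();  // in-memory copy
    private final Map<String, Integer> dirty = new LinkedHashMap<>();  // pending writes, one per key
    final List<String> storeWrites = new ArrayList<>();                // log of physical writes

    // Writes touch only memory; a later put for the same key replaces the pending one.
    void put(String key, int value) {
        cache.put(key, value);
        dirty.put(key, value);
    }

    // Reads prefer the cache and fall back to the store.
    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    // Pushes all pending writes to the store in one batch.
    void flush() {
        for (Map.Entry<String, Integer> e : dirty.entrySet()) {
            store.put(e.getKey(), e.getValue());
            storeWrites.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }

    // Bypasses the cache, to show what the store itself holds.
    Integer readFromStore(String key) {
        return store.get(key);
    }
}
```

Three consecutive `put("a", ...)` calls followed by one `flush()` result in a single physical write carrying the last value, which is the merging effect described above.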

transactional write-behind cache

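  • To shorten the time that locks are held in the database, Hibernate uses a transactional write-behind strategy. The persistence context plays the role of a transactional write-behind cache: changes to entities are first applied in memory and are flushed to the database at a later point. This is implemented in the Session class.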

To reduce lock contention in the database, the physical database transaction needs to be as short as possible.

Long-running database transactions prevent your application from scaling to a highly-concurrent load. Do not hold a database transaction open during end-user-level work, but open it after the end-user-level work is finished.
This concept is referred to as transactional write-behind.

The persistence context acts as a transactional write-behind cache, queuing any entity state change. Like any write-behind cache, changes are first applied in-memory and synchronized with the database during the flush time.

Session

Hibernate's Session wraps a JDBC Connection. Its main job is to maintain the first-level cache, that is, the "repeatable read" persistence context. The behaviour is implemented in the getRow method of Loader.

Behind the scenes, the Hibernate Session wraps a JDBC java.sql.Connection and acts as a factory for org.hibernate.Transaction instances. It maintains a generally “repeatable read” persistence context (first level cache) of the application domain model.

The Hibernate Session acts as a transaction-scoped cache providing repeatable reads for lookup by identifier and queries that result in loading entities.

Loader.getRow

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

    /**
     * Resolve any IDs for currently loaded objects, duplications within the
     * <tt>ResultSet</tt>, etc. Instantiate empty objects to be initialized from the
     * <tt>ResultSet</tt>. Return an array of objects (a row of results) and an
     * array of booleans (by side-effect) that determine whether the corresponding
     * object should be initialized.
     */
    private Object[] getRow(
            final ResultSet rs,
            final Loadable[] persisters,
            final EntityKey[] keys,
            final Object optionalObject,
            final EntityKey optionalObjectKey,
            final LockMode[] lockModes,
            final List hydratedObjects,
            final SharedSessionContractImplementor session) throws HibernateException, SQLException {
        final int cols = persisters.length;
        final EntityAliases[] descriptors = getEntityAliases();

        if (LOG.isDebugEnabled()) {
            LOG.debugf("Result row: %s", StringHelper.toString(keys));
        }

        final Object[] rowResults = new Object[cols];

        for (int i = 0; i < cols; i++) {

            Object object = null;
            EntityKey key = keys[i];

            if (keys[i] == null) {
                //do nothing
            }
            else {
                //If the object is already loaded, return the loaded one
                object = session.getEntityUsingInterceptor(key);
                if (object != null) {
                    //its already loaded so don't need to hydrate it
                    instanceAlreadyLoaded(
                            rs,
                            i,
                            persisters[i],
                            key,
                            object,
                            lockModes[i],
                            session
                    );
                }
                else {
                    object = instanceNotYetLoaded(
                            rs,
                            i,
                            persisters[i],
                            descriptors[i].getRowIdAlias(),
                            key,
                            lockModes[i],
                            optionalObjectKey,
                            optionalObject,
                            hydratedObjects,
                            session
                    );
                }
            }

            rowResults[i] = object;

        }

        return rowResults;
    }
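  • When the key is not null, getRow sets the value of object. It first calls session.getEntityUsingInterceptor to look the entity up in the session by its key. If an instance is found, it calls instanceAlreadyLoaded; otherwise it calls instanceNotYetLoaded to create and assign object.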

SessionImpl.getEntityUsingInterceptor

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/internal/SessionImpl.java

public final class SessionImpl
        extends AbstractSessionImpl
        implements EventSource, SessionImplementor, HibernateEntityManagerImplementor {

        //......

    @Override
    public Object getEntityUsingInterceptor(EntityKey key) throws HibernateException {
        checkOpenOrWaitingForAutoClose();
        // todo : should this get moved to PersistentContext?
        // logically, is PersistentContext the "thing" to which an interceptor gets attached?
        final Object result = persistenceContext.getEntity(key);
        if (result == null) {
            final Object newObject = getInterceptor().getEntity(key.getEntityName(), key.getIdentifier());
            if (newObject != null) {
                lock(newObject, LockMode.NONE);
            }
            return newObject;
        }
        else {
            return result;
        }
    }

        //......
}
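  • getEntityUsingInterceptor first tries to get the entity from the persistenceContext. If that returns nothing, it calls getInterceptor().getEntity. Unless an interceptor has been configured explicitly, the default is EmptyInterceptor, whose getEntity method returns null.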
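The lookup order can be sketched in a few lines of plain Java. This is an illustration only: `EntityLookup` is a hypothetical, reduced stand-in for the interceptor hook, the cache key is a simple string, and the `lock(newObject, LockMode.NONE)` step from the real method is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "first-level cache first, interceptor second".
class InterceptorFallbackSketch {

    // Reduced stand-in for the interceptor's entity lookup hook.
    interface EntityLookup {
        Object getEntity(String entityName, Object id);
    }

    // Mirrors the default described above: it never supplies an entity.
    static final EntityLookup EMPTY = (entityName, id) -> null;

    private final Map<String, Object> firstLevelCache = new HashMap<>();
    private final EntityLookup interceptor;

    InterceptorFallbackSketch(EntityLookup interceptor) {
        this.interceptor = interceptor;
    }

    // Registers an instance as already loaded in this "session".
    void register(String entityName, Object id, Object entity) {
        firstLevelCache.put(entityName + "#" + id, entity);
    }

    // Returns the cached instance if present, otherwise asks the interceptor.
    Object find(String entityName, Object id) {
        Object cached = firstLevelCache.get(entityName + "#" + id);
        if (cached != null) {
            return cached;
        }
        return interceptor.getEntity(entityName, id);
    }
}
```

With `EMPTY` as the interceptor, a cache miss simply yields null, which is what sends getRow down the instanceNotYetLoaded branch.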

StatefulPersistenceContext

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/engine/internal/StatefulPersistenceContext.java

public class StatefulPersistenceContext implements PersistenceContext {
    //......

    // Loaded entity instances, by EntityKey
    private Map<EntityKey, Object> entitiesByKey;

    @Override
    public Object getEntity(EntityKey key) {
        return entitiesByKey.get(key);
    }

    @Override
    public void addEntity(EntityKey key, Object entity) {
        entitiesByKey.put(key, entity);
        if (batchFetchQueue != null) {
            getBatchFetchQueue().removeBatchLoadableEntityKey(key);
        }
    }

    @Override
    public Object removeEntity(EntityKey key) {
        final Object entity = entitiesByKey.remove(key);
        final Iterator itr = entitiesByUniqueKey.values().iterator();
        while (itr.hasNext()) {
            if (itr.next() == entity) {
                itr.remove();
            }
        }
        // Clear all parent cache
        parentsByChild.clear();
        entitySnapshotsByKey.remove(key);
        nullifiableEntityKeys.remove(key);
        if (batchFetchQueue != null) {
            getBatchFetchQueue().removeBatchLoadableEntityKey(key);
            getBatchFetchQueue().removeSubselect(key);
        }
        return entity;
    }

    @Override
    public void replaceDelayedEntityIdentityInsertKeys(EntityKey oldKey, Serializable generatedId) {
        final Object entity = entitiesByKey.remove(oldKey);
        final EntityEntry oldEntry = entityEntryContext.removeEntityEntry(entity);
        parentsByChild.clear();

        final EntityKey newKey = session.generateEntityKey(generatedId, oldEntry.getPersister() );
        addEntity(newKey, entity);
        addEntry(
                entity,
                oldEntry.getStatus(),
                oldEntry.getLoadedState(),
                oldEntry.getRowId(),
                generatedId,
                oldEntry.getVersion(),
                oldEntry.getLockMode(),
                oldEntry.isExistsInDatabase(),
                oldEntry.getPersister(),
                oldEntry.isBeingReplicated());
    }

    //......
}
  • StatefulPersistenceContext maintains a map named entitiesByKey, and getEntity simply looks the entity up in that map by its EntityKey. The class also provides addEntity, removeEntity, replaceDelayedEntityIdentityInsertKeys and similar methods that modify the map.
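A map keyed this way only works if two keys built separately for the same row compare equal. The sketch below shows that requirement with a hypothetical, simplified `Key` made of an entity name and an identifier; the real EntityKey carries more information, and `replaceKey` only imitates the remove-then-add pattern visible in replaceDelayedEntityIdentityInsertKeys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified identity map; not the Hibernate implementation.
class EntityKeyMapSketch {

    // Value-based key: equal name and id mean the same logical row.
    static final class Key {
        final String entityName;
        final Object id;

        Key(String entityName, Object id) {
            this.entityName = entityName;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return entityName.equals(other.entityName) && Objects.equals(id, other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(entityName, id);
        }
    }

    private final Map<Key, Object> entitiesByKey = new HashMap<>();

    void addEntity(Key key, Object entity) {
        entitiesByKey.put(key, entity);
    }

    Object getEntity(Key key) {
        return entitiesByKey.get(key);
    }

    Object removeEntity(Key key) {
        return entitiesByKey.remove(key);
    }

    // Moves an entity to a new key, e.g. once a generated id replaces a temporary one.
    void replaceKey(Key oldKey, Key newKey) {
        Object entity = entitiesByKey.remove(oldKey);
        entitiesByKey.put(newKey, entity);
    }
}
```

Because lookups go through `equals` and `hashCode`, every query in the session that yields the same entity name and id resolves to the same Java object.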

instanceAlreadyLoaded

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {

    //......

    /**
     * The entity instance is already in the session cache
     */
    private void instanceAlreadyLoaded(
            final ResultSet rs,
            final int i,
            final Loadable persister,
            final EntityKey key,
            final Object object,
            final LockMode requestedLockMode,
            final SharedSessionContractImplementor session)
            throws HibernateException, SQLException {
        if (!persister.isInstance(object)) {
            throw new WrongClassException(
                    "loaded object was of wrong class" + object.getClass(),
                    key.getIdentifier(),
                    persister.getEntityName());
        }

        if (LockMode.NONE != requestedLockMode && upgradeLocks() ) { //no point doing this if NONE was requested
            final EntityEntry entry = session.getPersistenceContext().getEntry( object);
            if (entry.getLockMode().lessThan(requestedLockMode) ) {
                //we only check the version when _upgrading_ lock modes
                if (persister.isVersioned()) {
                    checkVersion(i, persister, key.getIdentifier(), object, rs, session);
                }
                //we need to upgrade the lock mode to the mode requested
                entry.setLockMode(requestedLockMode);
            }
        }
    }

    //......
}
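  • instanceAlreadyLoaded mainly checks that the cached instance has the expected type, and then uses the requested lock mode to decide whether the entry's lock mode needs to be upgraded. It does not copy any column values from the ResultSet into the entity, which is why the cached state survives the query.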
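The upgrade rule can be restated as a small standalone sketch. Everything in it is a hypothetical simplification: `ToyLockMode` has only three levels ordered by `ordinal()`, the `upgradeLocks()` guard is omitted, every entity is treated as versioned, and a version mismatch raises a plain IllegalStateException as a stand-in for whatever the real checkVersion does.

```java
// Hypothetical sketch of "only upgrade, and check the version when upgrading".
class LockUpgradeSketch {

    // Reduced set of lock levels, ordered from weakest to strongest.
    enum ToyLockMode {
        NONE, READ, PESSIMISTIC_WRITE;

        boolean lessThan(ToyLockMode other) {
            return ordinal() < other.ordinal();
        }
    }

    // Stand-in for the per-entity bookkeeping kept by the session.
    static final class ToyEntry {
        ToyLockMode lockMode;
        final long version;

        ToyEntry(ToyLockMode lockMode, long version) {
            this.lockMode = lockMode;
            this.version = version;
        }
    }

    // Returns true when the entry's lock mode was raised to the requested one.
    static boolean upgradeIfNeeded(ToyEntry entry, ToyLockMode requested, long versionInRow) {
        if (requested == ToyLockMode.NONE) {
            return false;                        // nothing was asked for
        }
        if (!entry.lockMode.lessThan(requested)) {
            return false;                        // already at least as strong
        }
        if (entry.version != versionInRow) {
            // The row changed since the entity was loaded.
            throw new IllegalStateException("stale entity: row version " + versionInRow);
        }
        entry.lockMode = requested;
        return true;
    }
}
```

Note that the row is consulted only for its version, never for ordinary property values.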

instanceNotYetLoaded

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {
    //......

    /**
     * The entity instance is not in the session cache
     */
    private Object instanceNotYetLoaded(
            final ResultSet rs,
            final int i,
            final Loadable persister,
            final String rowIdAlias,
            final EntityKey key,
            final LockMode lockMode,
            final EntityKey optionalObjectKey,
            final Object optionalObject,
            final List hydratedObjects,
            final SharedSessionContractImplementor session)
            throws HibernateException, SQLException {
        final String instanceClass = getInstanceClass(
                rs,
                i,
                persister,
                key.getIdentifier(),
                session
        );

        // see if the entity defines reference caching, and if so use the cached reference (if one).
        if (session.getCacheMode().isGetEnabled() && persister.canUseReferenceCacheEntries()) {
            final EntityDataAccess cache = persister.getCacheAccessStrategy();
            final Object ck = cache.generateCacheKey(
                    key.getIdentifier(),
                    persister,
                    session.getFactory(),
                    session.getTenantIdentifier()
            );
            final Object cachedEntry = CacheHelper.fromSharedCache(session, ck, cache);
            if (cachedEntry != null) {
                CacheEntry entry = (CacheEntry) persister.getCacheEntryStructure().destructure(cachedEntry, factory);
                return ((ReferenceCacheEntryImpl) entry).getReference();
            }
        }

        final Object object;
        if (optionalObjectKey != null && key.equals( optionalObjectKey) ) {
            //its the given optional object
            object = optionalObject;
        }
        else {
            // instantiate a new instance
            object = session.instantiate(instanceClass, key.getIdentifier() );
        }

        //need to hydrate it.

        // grab its state from the ResultSet and keep it in the Session
        // (but don't yet initialize the object itself)
        // note that we acquire LockMode.READ even if it was not requested
        LockMode acquiredLockMode = lockMode == LockMode.NONE ? LockMode.READ : lockMode;
        loadFromResultSet(
                rs,
                i,
                object,
                instanceClass,
                key,
                rowIdAlias,
                acquiredLockMode,
                persister,
                session
        );

        //materialize associations (and initialize the object) later
        hydratedObjects.add(object);

        return object;
    }

    /**
     * Hydrate the state an object from the SQL <tt>ResultSet</tt>, into
     * an array or "hydrated" values (do not resolve associations yet),
     * and pass the hydrates state to the session.
     */
    private void loadFromResultSet(
            final ResultSet rs,
            final int i,
            final Object object,
            final String instanceEntityName,
            final EntityKey key,
            final String rowIdAlias,
            final LockMode lockMode,
            final Loadable rootPersister,
            final SharedSessionContractImplementor session) throws SQLException, HibernateException {
        final Serializable id = key.getIdentifier();

        // Get the persister for the _subclass_
        final Loadable persister = (Loadable) getFactory().getEntityPersister( instanceEntityName);

        if (LOG.isTraceEnabled() ) {
            LOG.tracef(
                    "Initializing object from ResultSet: %s",
                    MessageHelper.infoString(
                            persister,
                            id,
                            getFactory())
            );
        }

        boolean fetchAllPropertiesRequested = isEagerPropertyFetchEnabled(i);

        // add temp entry so that the next step is circular-reference
        // safe - only needed because some types don't take proper
        // advantage of two-phase-load (esp. components)
        TwoPhaseLoad.addUninitializedEntity(
                key,
                object,
                persister,
                lockMode,
                session
        );

        //This is not very nice (and quite slow):
        final String[][] cols = persister == rootPersister ?
                getEntityAliases()[i].getSuffixedPropertyAliases() :
                getEntityAliases()[i].getSuffixedPropertyAliases(persister);

        final Object[] values = persister.hydrate(
                rs,
                id,
                object,
                rootPersister,
                cols,
                fetchAllPropertiesRequested,
                session
        );

        final Object rowId = persister.hasRowId() ? rs.getObject( rowIdAlias) : null;

        final AssociationType[] ownerAssociationTypes = getOwnerAssociationTypes();
        if (ownerAssociationTypes != null && ownerAssociationTypes[i] != null) {
            String ukName = ownerAssociationTypes[i].getRHSUniqueKeyPropertyName();
            if (ukName != null) {
                final int index = ((UniqueKeyLoadable) persister).getPropertyIndex(ukName);
                final Type type = persister.getPropertyTypes()[index];

                // polymorphism not really handled completely correctly,
                // perhaps...well, actually its ok, assuming that the
                // entity name used in the lookup is the same as the
                // the one used here, which it will be

                EntityUniqueKey euk = new EntityUniqueKey(
                        rootPersister.getEntityName(), //polymorphism comment above
                        ukName,
                        type.semiResolve(values[index], session, object),
                        type,
                        persister.getEntityMode(),
                        session.getFactory()
                );
                session.getPersistenceContext().addEntity( euk, object);
            }
        }

        TwoPhaseLoad.postHydrate(
                persister,
                id,
                values,
                rowId,
                object,
                lockMode,
                session
        );

    }

    //......
}
  • instanceNotYetLoaded mainly hydrates the object. It calls loadFromResultSet to read the entity state from the ResultSet, adds the object to hydratedObjects, and returns it. loadFromResultSet first calls TwoPhaseLoad.addUninitializedEntity, which in turn calls session.getPersistenceContext().addEntity to register the still-empty object in the StatefulPersistenceContext. It then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managed entity entry and associate it with the object.

initializeEntitiesAndCollections

hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java

public abstract class Loader {
    //......

    private void initializeEntitiesAndCollections(
            final List hydratedObjects,
            final Object resultSetId,
            final SharedSessionContractImplementor session,
            final boolean readOnly,
            List<AfterLoadAction> afterLoadActions) throws HibernateException {
        final CollectionPersister[] collectionPersisters = getCollectionPersisters();
        if (collectionPersisters != null) {
            for (CollectionPersister collectionPersister : collectionPersisters) {
                if (collectionPersister.isArray()) {
                    //for arrays, we should end the collection load before resolving
                    //the entities, since the actual array instances are not instantiated
                    //during loading
                    //TODO: or we could do this polymorphically, and have two
                    //      different operations implemented differently for arrays
                    endCollectionLoad(resultSetId, session, collectionPersister);
                }
            }
        }

        //important: reuse the same event instances for performance!
        final PreLoadEvent pre;
        final PostLoadEvent post;
        if (session.isEventSource()) {
            pre = new PreLoadEvent((EventSource) session);
            post = new PostLoadEvent((EventSource) session);
        }
        else {
            pre = null;
            post = null;
        }

        if (hydratedObjects != null) {
            int hydratedObjectsSize = hydratedObjects.size();
            LOG.tracev("Total objects hydrated: {0}", hydratedObjectsSize);
            for (Object hydratedObject : hydratedObjects) {
                TwoPhaseLoad.initializeEntity(hydratedObject, readOnly, session, pre);
            }
        }

        if (collectionPersisters != null) {
            for (CollectionPersister collectionPersister : collectionPersisters) {
                if (!collectionPersister.isArray()) {
                    //for sets, we should end the collection load after resolving
                    //the entities, since we might call hashCode() on the elements
                    //TODO: or we could do this polymorphically, and have two
                    //      different operations implemented differently for arrays
                    endCollectionLoad(resultSetId, session, collectionPersister);
                }
            }
        }

        // Until this entire method is refactored w/ polymorphism, postLoad was
        // split off from initializeEntity.  It *must* occur after
        // endCollectionLoad to ensure the collection is in the
        // persistence context.
        if (hydratedObjects != null) {
            for (Object hydratedObject : hydratedObjects) {
                TwoPhaseLoad.postLoad(hydratedObject, session, post);
                if (afterLoadActions != null) {
                    for (AfterLoadAction afterLoadAction : afterLoadActions) {
                        final EntityEntry entityEntry = session.getPersistenceContext().getEntry(hydratedObject);
                        if (entityEntry == null) {
                            // big problem
                            throw new HibernateException("Could not locate EntityEntry immediately after two-phase load");
                        }
                        afterLoadAction.afterLoad(session, hydratedObject, (Loadable) entityEntry.getPersister());
                    }
                }
            }
        }
    }

    //......
}
  • initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues(entity, hydratedState) to copy the hydratedState values into the entity.
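The split between hydrating and initializing can be illustrated with a toy loader. This is a sketch of the general two-phase idea only, with hypothetical `Node` and `Row` types: phase one registers an empty instance per new key and keeps the raw row, phase two copies the state in and resolves associations through the context. Instances that are already in the context are skipped, so their state is never overwritten.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-phase loader; not the Hibernate implementation.
class TwoPhaseLoadSketch {

    static final class Node {
        long id;
        String name;
        Node partner;   // association, resolved in phase two
    }

    // One "result set" row: id, name, and the partner's id.
    static final class Row {
        final long id;
        final String name;
        final long partnerId;

        Row(long id, String name, long partnerId) {
            this.id = id;
            this.name = name;
            this.partnerId = partnerId;
        }
    }

    // Loads rows into the given context (the persistence-context stand-in).
    static void load(List<Row> rows, Map<Long, Node> context) {
        Map<Long, Row> hydratedState = new HashMap<>();
        List<Node> hydratedObjects = new ArrayList<>();

        // Phase one: register an empty instance per new key and keep its raw state.
        for (Row row : rows) {
            if (context.containsKey(row.id)) {
                continue;                        // already loaded: row is ignored
            }
            Node empty = new Node();
            empty.id = row.id;
            context.put(row.id, empty);
            hydratedState.put(row.id, row);
            hydratedObjects.add(empty);
        }

        // Phase two: copy the state in and resolve associations via the context.
        for (Node node : hydratedObjects) {
            Row state = hydratedState.get(node.id);
            node.name = state.name;
            node.partner = context.get(state.partnerId);
        }
    }
}
```

Registering the empty instances before resolving anything is what makes circular references safe: when row 1 points at row 2 and row 2 points back, both targets already exist in the context by the time phase two runs.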

Summary

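  • A write-behind cache is one kind of caching strategy. An update is applied to the cache first, and the cache later persists the changes to the backing store in batches (for example, as a batched database update). The benefit is that several operations on the same data can be merged, which reduces I/O.
  • To shorten the time that locks are held in the database, Hibernate uses a transactional write-behind strategy. The persistence context plays the role of a transactional write-behind cache: changes to entities are first applied in memory and are flushed to the database at a later point. This is implemented in the Session class.
  • Hibernate's Session wraps a JDBC Connection. Its main job is to maintain the first-level cache, that is, the "repeatable read" persistence context. The behaviour is implemented in the getRow method of Loader.
  • When the key is not null, getRow sets the value of object. It first calls session.getEntityUsingInterceptor to look the entity up in the session by its key. If an instance is found, it calls instanceAlreadyLoaded; otherwise it calls instanceNotYetLoaded to create and assign object.
  • getEntityUsingInterceptor first tries to get the entity from the persistenceContext. If that returns nothing, it calls getInterceptor().getEntity. Unless an interceptor has been configured explicitly, the default is EmptyInterceptor, whose getEntity method returns null.
  • StatefulPersistenceContext maintains a map named entitiesByKey, and getEntity simply looks the entity up in that map by its EntityKey. The class also provides addEntity, removeEntity, replaceDelayedEntityIdentityInsertKeys and similar methods that modify the map.
  • instanceAlreadyLoaded mainly checks that the cached instance has the expected type and decides from the requested lock mode whether to upgrade the entry's lock mode. instanceNotYetLoaded mainly hydrates the object: it calls loadFromResultSet to read the entity state from the ResultSet, adds the object to hydratedObjects, and returns it. loadFromResultSet calls TwoPhaseLoad.addUninitializedEntity, which calls session.getPersistenceContext().addEntity to register the object in the StatefulPersistenceContext; it then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managed entity entry and associate it with the object.
  • initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues(entity, hydratedState) to copy the hydratedState values into the entity.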

doc

  • 缓存更新的套路 (Cache update patterns)
  • 极端事务处理模式:Write-behind 缓存 (Extreme transaction processing patterns: write-behind caching)
  • Write-behind caching
  • 6. Flushing
  • 8.1. Physical Transactions
  • A beginner’s guide to flush strategies in JPA and Hibernate
  • How does Hibernate guarantee application-level repeatable reads
