Preface
This article takes a look at Hibernate's session-level repeatable reads.
Example
doInTransaction(session -> {
    Product product = new Product();
    product.setId(1L);
    product.setQuantity(7L);
    session.persist(product);
});
doInTransaction(session -> {
    final Product product = (Product) session.get(Product.class, 1L);
    try {
        executeSync(() -> doInTransaction(_session -> {
            Product otherThreadProduct = (Product) _session.get(Product.class, 1L);
            assertNotSame(product, otherThreadProduct);
            otherThreadProduct.setQuantity(6L);
        }));
        Product reloadedProduct = (Product) session.createQuery("from Product").uniqueResult();
        assertEquals(7L, reloadedProduct.getQuantity());
        assertEquals(6L,
            ((Number) session
                .createSQLQuery("select quantity from product where id = :id")
                .setParameter("id", product.getId())
                .uniqueResult())
            .longValue());
    } catch (Exception e) {
        fail(e.getMessage());
    }
});
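- This code demonstrates Hibernate's session-level repeatable reads. The entity query that produces reloadedProduct returns the instance with id 1 that is already cached in the session (Hibernate still sends the SQL statement to the database, but it ignores the values in the returned ResultSet), whereas the projection query returns whatever the JDBC query reads from the database.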
The example code comes from How does Hibernate guarantee application-level repeatable reads.
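The behavior in the test above can be imitated without Hibernate. The following is a minimal sketch, assuming a toy session that keeps a map of loaded instances; ProductRow, CachedProduct and ToySession are hypothetical names invented for illustration and are not Hibernate classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; none of these are Hibernate APIs.
final class ProductRow {                  // the row as currently stored in the "database"
    long quantity;
    ProductRow(long quantity) { this.quantity = quantity; }
}

final class CachedProduct {               // the entity instance handed to the application
    final long id;
    long quantity;
    CachedProduct(long id, long quantity) { this.id = id; this.quantity = quantity; }
}

final class ToySession {
    private final Map<Long, ProductRow> database;                     // shared by all sessions
    private final Map<Long, CachedProduct> firstLevelCache = new HashMap<>();

    ToySession(Map<Long, ProductRow> database) { this.database = database; }

    // Entity query: the row is always read, but an already cached instance wins.
    CachedProduct loadEntity(long id) {
        ProductRow row = database.get(id);            // the "SQL" is still executed
        CachedProduct cached = firstLevelCache.get(id);
        if (cached != null) {
            return cached;                            // the row values are ignored
        }
        if (row == null) {
            return null;
        }
        CachedProduct fresh = new CachedProduct(id, row.quantity);
        firstLevelCache.put(id, fresh);
        return fresh;
    }

    // Projection query: returns the scalar straight from the row, bypassing the cache.
    long selectQuantity(long id) {
        return database.get(id).quantity;
    }
}

final class RepeatableReadDemo {
    public static void main(String[] args) {
        Map<Long, ProductRow> db = new HashMap<>();
        db.put(1L, new ProductRow(7L));
        ToySession session = new ToySession(db);

        CachedProduct first = session.loadEntity(1L);
        db.get(1L).quantity = 6L;                     // another transaction commits an update
        CachedProduct again = session.loadEntity(1L);

        System.out.println(first == again);               // true
        System.out.println(again.quantity);               // 7
        System.out.println(session.selectQuantity(1L));   // 6
    }
}
```

The entity lookup keeps returning the same instance with the old quantity, while the scalar lookup reflects the concurrent change, which is the same split the two assertions in the test check.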
write-behind
write-behind cache
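- A write-behind cache is one kind of caching strategy. Updates are applied to the cache first, and the cache later persists them to the backing store in batches, for example as batched updates to the database. The benefit is that multiple operations on the same data can be merged, which reduces I/O.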
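As a rough illustration of the idea, here is a minimal sketch of a write-behind cache, assuming a single-threaded caller and a backing store that is simulated by a list of written batches. WriteBehindCache and its members are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical write-behind cache; the "store" is just the list of batches that were written.
final class WriteBehindCache {
    private final Map<String, Long> cache = new LinkedHashMap<>();    // current values
    private final Map<String, Long> dirty = new LinkedHashMap<>();    // pending writes, one per key
    final List<Map<String, Long>> storeBatches = new ArrayList<>();   // what reached the store

    void put(String key, long value) {
        cache.put(key, value);
        dirty.put(key, value);        // a later write to the same key replaces the pending one
    }

    Long get(String key) {
        return cache.get(key);        // reads are served from memory
    }

    // Persist all pending writes as one batch and return how many entries were written.
    int flush() {
        if (dirty.isEmpty()) {
            return 0;
        }
        storeBatches.add(new LinkedHashMap<>(dirty));
        int written = dirty.size();
        dirty.clear();
        return written;
    }
}

final class WriteBehindDemo {
    public static void main(String[] args) {
        WriteBehindCache cache = new WriteBehindCache();
        cache.put("p1", 7L);
        cache.put("p1", 6L);          // merged with the previous write to p1
        cache.put("p2", 3L);
        System.out.println(cache.flush());               // 2
        System.out.println(cache.storeBatches.size());   // 1
    }
}
```

Three writes reach the store as one batch with two entries, which is the merging effect described above.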
transactional write-behind cache
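- To shorten the time that locks are held in the database, Hibernate adopts a transactional write-behind strategy: its persistence context acts as a transactional write-behind cache, so changes to entities are first applied in memory and are flushed to the database at a later point. This is embodied in the Session class.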
To reduce lock contention in the database, the physical database transaction needs to be as short as possible.
Long-running database transactions prevent your application from scaling to a highly-concurrent load. Do not hold a database transaction open during end-user-level work, but open it after the end-user-level work is finished.
This concept is referred to as transactional write-behind. The persistence context acts as a transactional write-behind cache, queuing any entity state change. Like any write-behind cache, changes are first applied in-memory and synchronized with the database during the flush time.
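One way to picture "changes are first applied in-memory and synchronized during the flush" is a context that remembers the state each entity had when it was loaded and compares it with the current state at flush time. The sketch below assumes exactly that and nothing more; Item and ToyFlushContext are hypothetical names, and the list executedSql stands in for the JDBC connection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class Item {                        // hypothetical entity
    final long id;
    long quantity;
    Item(long id, long quantity) { this.id = id; this.quantity = quantity; }
}

final class ToyFlushContext {
    private final List<Item> managed = new ArrayList<>();
    private final Map<Long, Long> loadedQuantity = new HashMap<>();   // snapshot taken at load time
    final List<String> executedSql = new ArrayList<>();               // stand-in for JDBC

    void manage(Item item) {
        managed.add(item);
        loadedQuantity.put(item.id, item.quantity);
    }

    // Compare each entity with its snapshot and emit one UPDATE per changed entity.
    int flush() {
        int statements = 0;
        for (Item item : managed) {
            long snapshot = loadedQuantity.get(item.id);
            if (snapshot != item.quantity) {
                executedSql.add("update item set quantity = " + item.quantity
                        + " where id = " + item.id);
                loadedQuantity.put(item.id, item.quantity);   // new baseline after the flush
                statements++;
            }
        }
        return statements;
    }
}

final class FlushDemo {
    public static void main(String[] args) {
        ToyFlushContext context = new ToyFlushContext();
        Item item = new Item(1L, 7L);
        context.manage(item);
        item.quantity = 5L;               // in-memory only
        item.quantity = 6L;               // still in-memory only
        System.out.println(context.executedSql.isEmpty());   // true
        System.out.println(context.flush());                  // 1
    }
}
```

Nothing touches the simulated database until flush is called, and two in-memory changes collapse into a single statement, so the window in which the database would hold a lock stays short.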
Session
Hibernate's Session wraps a JDBC Connection and mainly maintains the first-level cache, that is, the "repeatable read" persistence context. This is implemented in the getRow method of Loader.
Behind the scenes, the Hibernate Session wraps a JDBC java.sql.Connection and acts as a factory for org.hibernate.Transaction instances. It maintains a generally “repeatable read” persistence context (first level cache) of the application domain model.
The Hibernate Session acts as a transaction-scoped cache providing repeatable reads for lookup by identifier and queries that result in loading entities.
Loader.getRow
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java
/**
* Resolve any IDs for currently loaded objects, duplications within the
* <tt>ResultSet</tt>, etc. Instantiate empty objects to be initialized from the
* <tt>ResultSet</tt>. Return an array of objects (a row of results) and an
* array of booleans (by side-effect) that determine whether the corresponding
* object should be initialized.
*/
private Object[] getRow(
final ResultSet rs,
final Loadable[] persisters,
final EntityKey[] keys,
final Object optionalObject,
final EntityKey optionalObjectKey,
final LockMode[] lockModes,
final List hydratedObjects,
final SharedSessionContractImplementor session) throws HibernateException, SQLException {
final int cols = persisters.length;
final EntityAliases[] descriptors = getEntityAliases();
if (LOG.isDebugEnabled() ) {
LOG.debugf( "Result row: %s", StringHelper.toString( keys) );
}
final Object[] rowResults = new Object[cols];
for (int i = 0; i < cols; i++) {
Object object = null;
EntityKey key = keys[i];
if (keys[i] == null ) {
//do nothing
}
else {
//If the object is already loaded, return the loaded one
object = session.getEntityUsingInterceptor(key);
if (object != null) {
//its already loaded so don't need to hydrate it
instanceAlreadyLoaded(
rs,
i,
persisters[i],
key,
object,
lockModes[i],
session
);
}
else {
object = instanceNotYetLoaded(
rs,
i,
persisters[i],
descriptors[i].getRowIdAlias(),
key,
lockModes[i],
optionalObjectKey,
optionalObject,
hydratedObjects,
session
);
}
}
rowResults[i] = object;
}
return rowResults;
}
- When the key is not null, the method sets the value of object. It first calls session.getEntityUsingInterceptor to look up the entity in the session by key; if the result is not null it calls instanceAlreadyLoaded, otherwise it calls instanceNotYetLoaded to create object.
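The control flow of getRow can be reduced to a few lines. This is a simplified sketch, assuming string keys and a plain map in place of the session; LoadedEntity and RowResolver are hypothetical names, and locking, persisters and the actual ResultSet are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class LoadedEntity {                // hypothetical entity placeholder
    final String key;
    LoadedEntity(String key) { this.key = key; }
}

final class RowResolver {
    // One slot per entity column of the row; a null key means the row has no entity there.
    static Object[] resolveRow(String[] keys,
                               Map<String, LoadedEntity> sessionCache,
                               List<LoadedEntity> hydratedObjects) {
        Object[] rowResults = new Object[keys.length];
        for (int i = 0; i < keys.length; i++) {
            String key = keys[i];
            if (key == null) {
                continue;                             // slot stays null
            }
            LoadedEntity object = sessionCache.get(key);
            if (object == null) {
                // "not yet loaded": create the instance, register it, remember to fill it in
                object = new LoadedEntity(key);
                sessionCache.put(key, object);
                hydratedObjects.add(object);
            }
            // "already loaded": the cached instance is returned untouched
            rowResults[i] = object;
        }
        return rowResults;
    }
}

final class RowResolverDemo {
    public static void main(String[] args) {
        Map<String, LoadedEntity> cache = new HashMap<>();
        LoadedEntity existing = new LoadedEntity("Product#1");
        cache.put("Product#1", existing);
        List<LoadedEntity> hydrated = new ArrayList<>();

        Object[] row = RowResolver.resolveRow(
                new String[] { "Product#1", null, "Product#2" }, cache, hydrated);

        System.out.println(row[0] == existing);   // true
        System.out.println(hydrated.size());      // 1
    }
}
```

Only the entity that was not yet in the cache ends up in hydratedObjects, so only that one gets its state filled in from the row later.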
SessionImpl.getEntityUsingInterceptor
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/internal/SessionImpl.java
public final class SessionImpl
extends AbstractSessionImpl
implements EventSource, SessionImplementor, HibernateEntityManagerImplementor {
//......
@Override
public Object getEntityUsingInterceptor(EntityKey key) throws HibernateException {
checkOpenOrWaitingForAutoClose();
// todo : should this get moved to PersistentContext?
// logically, is PersistentContext the "thing" to which an interceptor gets attached?
final Object result = persistenceContext.getEntity(key);
if (result == null) {
final Object newObject = getInterceptor().getEntity(key.getEntityName(), key.getIdentifier());
if (newObject != null) {
lock( newObject, LockMode.NONE);
}
return newObject;
}
else {
return result;
}
}
//......
}
- getEntityUsingInterceptor first tries to get the entity from persistenceContext; if nothing is found, it falls back to getInterceptor().getEntity. Unless configured otherwise, the interceptor is EmptyInterceptor, whose getEntity method returns null.
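The fallback order can be sketched as follows, assuming the interceptor is modeled as a plain function from entity name and id to an object. ToyLookup is a hypothetical name, and the step where the real method associates an interceptor-supplied object with the session through lock(newObject, LockMode.NONE) is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class ToyLookup {
    // Stand-in for the default interceptor: it never supplies an entity.
    static final BiFunction<String, Object, Object> EMPTY = (entityName, id) -> null;

    private final Map<String, Object> persistenceContext = new HashMap<>();
    private final BiFunction<String, Object, Object> interceptor;

    ToyLookup(BiFunction<String, Object, Object> interceptor) {
        this.interceptor = interceptor;
    }

    void addEntity(String entityName, Object id, Object entity) {
        persistenceContext.put(entityName + "#" + id, entity);
    }

    // Persistence context first, interceptor second; the result may still be null.
    Object getEntityUsingInterceptor(String entityName, Object id) {
        Object result = persistenceContext.get(entityName + "#" + id);
        if (result != null) {
            return result;
        }
        return interceptor.apply(entityName, id);
    }
}

final class ToyLookupDemo {
    public static void main(String[] args) {
        ToyLookup lookup = new ToyLookup(ToyLookup.EMPTY);
        System.out.println(lookup.getEntityUsingInterceptor("Product", 1L));   // null
        lookup.addEntity("Product", 1L, "cached product");
        System.out.println(lookup.getEntityUsingInterceptor("Product", 1L));   // cached product
    }
}
```

With the empty interceptor a miss in the persistence context yields null, which is what sends getRow down the instanceNotYetLoaded branch.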
StatefulPersistenceContext
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/engine/internal/StatefulPersistenceContext.java
public class StatefulPersistenceContext implements PersistenceContext {
//......
// Loaded entity instances, by EntityKey
private Map<EntityKey, Object> entitiesByKey;
@Override
public Object getEntity(EntityKey key) {
return entitiesByKey.get( key);
}
@Override
public void addEntity(EntityKey key, Object entity) {
entitiesByKey.put( key, entity);
if(batchFetchQueue != null) {
getBatchFetchQueue().removeBatchLoadableEntityKey(key);
}
}
@Override
public Object removeEntity(EntityKey key) {
final Object entity = entitiesByKey.remove( key);
final Iterator itr = entitiesByUniqueKey.values().iterator();
while (itr.hasNext() ) {
if ( itr.next() == entity ) {
itr.remove();
}
}
// Clear all parent cache
parentsByChild.clear();
entitySnapshotsByKey.remove(key);
nullifiableEntityKeys.remove(key);
if(batchFetchQueue != null) {
getBatchFetchQueue().removeBatchLoadableEntityKey(key);
getBatchFetchQueue().removeSubselect(key);
}
return entity;
}
@Override
public void replaceDelayedEntityIdentityInsertKeys(EntityKey oldKey, Serializable generatedId) {
final Object entity = entitiesByKey.remove( oldKey);
final EntityEntry oldEntry = entityEntryContext.removeEntityEntry(entity);
parentsByChild.clear();
final EntityKey newKey = session.generateEntityKey(generatedId, oldEntry.getPersister() );
addEntity(newKey, entity);
addEntry(
entity,
oldEntry.getStatus(),
oldEntry.getLoadedState(),
oldEntry.getRowId(),
generatedId,
oldEntry.getVersion(),
oldEntry.getLockMode(),
oldEntry.isExistsInDatabase(),
oldEntry.getPersister(),
oldEntry.isBeingReplicated());
}
//......
}
- StatefulPersistenceContext maintains an entitiesByKey map, and getEntity simply looks up the entity in that map by EntityKey. It also provides methods such as addEntity, removeEntity and replaceDelayedEntityIdentityInsertKeys to modify the map.
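Because the lookup is an ordinary map access, it only works if two keys built for the same entity compare as equal. Below is a minimal sketch of such a key, assuming it consists of just an entity name and an identifier; ToyEntityKey and ToyContext are hypothetical names, and the real EntityKey carries more information than this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value-based key: equal when entity name and identifier are equal.
final class ToyEntityKey {
    private final String entityName;
    private final Object identifier;

    ToyEntityKey(String entityName, Object identifier) {
        this.entityName = entityName;
        this.identifier = identifier;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ToyEntityKey)) {
            return false;
        }
        ToyEntityKey that = (ToyEntityKey) other;
        return entityName.equals(that.entityName) && Objects.equals(identifier, that.identifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(entityName, identifier);
    }
}

final class ToyContext {
    private final Map<ToyEntityKey, Object> entitiesByKey = new HashMap<>();

    Object getEntity(ToyEntityKey key) { return entitiesByKey.get(key); }

    void addEntity(ToyEntityKey key, Object entity) { entitiesByKey.put(key, entity); }

    Object removeEntity(ToyEntityKey key) { return entitiesByKey.remove(key); }
}

final class ToyContextDemo {
    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        Object product = new Object();
        context.addEntity(new ToyEntityKey("Product", 1L), product);
        // A second key instance with the same name and id finds the same object.
        System.out.println(context.getEntity(new ToyEntityKey("Product", 1L)) == product);   // true
    }
}
```

Every query that produces a row for the same entity builds a fresh key, and value-based equality is what lets all of them resolve to the one cached instance.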
instanceAlreadyLoaded
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java
public abstract class Loader {
//......
/**
* The entity instance is already in the session cache
*/
private void instanceAlreadyLoaded(
final ResultSet rs,
final int i,
final Loadable persister,
final EntityKey key,
final Object object,
final LockMode requestedLockMode,
final SharedSessionContractImplementor session)
throws HibernateException, SQLException {
if ( !persister.isInstance( object) ) {
throw new WrongClassException(
"loaded object was of wrong class " + object.getClass(),
key.getIdentifier(),
persister.getEntityName()
);
}
if (LockMode.NONE != requestedLockMode && upgradeLocks() ) { //no point doing this if NONE was requested
final EntityEntry entry = session.getPersistenceContext().getEntry( object);
if (entry.getLockMode().lessThan(requestedLockMode) ) {
//we only check the version when _upgrading_ lock modes
if (persister.isVersioned() ) {
checkVersion( i, persister, key.getIdentifier(), object, rs, session );
}
//we need to upgrade the lock mode to the mode requested
entry.setLockMode(requestedLockMode);
}
}
}
//......
}
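- instanceAlreadyLoaded mainly verifies that the cached object is of the expected type and, based on the requested lock mode, decides whether the entry's lock mode needs to be upgraded. It does not copy any property values from the ResultSet into the entity, which is why the values already held in the session survive.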
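The upgrade-only rule can be sketched with a small enum. This is a simplified illustration, assuming three lock levels ordered by their declaration and an integer version; ToyLockMode, ToyEntry and LockUpgradeSketch are hypothetical names, the upgradeLocks() and isVersioned() conditions are left out, and IllegalStateException stands in for whatever the real checkVersion raises.

```java
// Hypothetical lock levels; Hibernate's LockMode has more values and its own ordering.
enum ToyLockMode {
    NONE, READ, WRITE;

    boolean lessThan(ToyLockMode other) {
        return ordinal() < other.ordinal();
    }
}

final class ToyEntry {
    ToyLockMode lockMode;
    final int version;

    ToyEntry(ToyLockMode lockMode, int version) {
        this.lockMode = lockMode;
        this.version = version;
    }
}

final class LockUpgradeSketch {
    // Only an upgrade triggers the version check; the entity state itself is never touched.
    static void alreadyLoaded(ToyEntry entry, ToyLockMode requested, int versionInRow) {
        if (requested == ToyLockMode.NONE) {
            return;                                   // no point doing anything
        }
        if (entry.lockMode.lessThan(requested)) {
            if (entry.version != versionInRow) {
                throw new IllegalStateException("row was changed by another transaction");
            }
            entry.lockMode = requested;               // upgrade, never downgrade
        }
    }

    public static void main(String[] args) {
        ToyEntry entry = new ToyEntry(ToyLockMode.READ, 1);
        alreadyLoaded(entry, ToyLockMode.WRITE, 1);
        System.out.println(entry.lockMode);           // WRITE
        alreadyLoaded(entry, ToyLockMode.READ, 99);   // not an upgrade, version is not checked
        System.out.println(entry.lockMode);           // WRITE
    }
}
```

A plain re-read with the same or a weaker lock mode never compares versions, so a concurrent change goes unnoticed by the cached entity; only a request for a stronger lock looks at the version in the row.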
instanceNotYetLoaded
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java
public abstract class Loader {
//......
/**
* The entity instance is not in the session cache
*/
private Object instanceNotYetLoaded(
final ResultSet rs,
final int i,
final Loadable persister,
final String rowIdAlias,
final EntityKey key,
final LockMode lockMode,
final EntityKey optionalObjectKey,
final Object optionalObject,
final List hydratedObjects,
final SharedSessionContractImplementor session)
throws HibernateException, SQLException {
final String instanceClass = getInstanceClass(
rs,
i,
persister,
key.getIdentifier(),
session
);
// see if the entity defines reference caching, and if so use the cached reference (if one).
if (session.getCacheMode().isGetEnabled() && persister.canUseReferenceCacheEntries() ) {
final EntityDataAccess cache = persister.getCacheAccessStrategy();
final Object ck = cache.generateCacheKey(
key.getIdentifier(),
persister,
session.getFactory(),
session.getTenantIdentifier()
);
final Object cachedEntry = CacheHelper.fromSharedCache(session, ck, cache);
if (cachedEntry != null) {
CacheEntry entry = (CacheEntry) persister.getCacheEntryStructure().destructure( cachedEntry, factory);
return ((ReferenceCacheEntryImpl) entry ).getReference();
}
}
final Object object;
if (optionalObjectKey != null && key.equals( optionalObjectKey) ) {
//its the given optional object
object = optionalObject;
}
else {
// instantiate a new instance
object = session.instantiate(instanceClass, key.getIdentifier() );
}
//need to hydrate it.
// grab its state from the ResultSet and keep it in the Session
// (but don't yet initialize the object itself)
// note that we acquire LockMode.READ even if it was not requested
LockMode acquiredLockMode = lockMode == LockMode.NONE ? LockMode.READ : lockMode;
loadFromResultSet(
rs,
i,
object,
instanceClass,
key,
rowIdAlias,
acquiredLockMode,
persister,
session
);
//materialize associations (and initialize the object) later
hydratedObjects.add(object);
return object;
}
/**
* Hydrate the state an object from the SQL <tt>ResultSet</tt>, into
* an array or "hydrated" values (do not resolve associations yet),
* and pass the hydrates state to the session.
*/
private void loadFromResultSet(
final ResultSet rs,
final int i,
final Object object,
final String instanceEntityName,
final EntityKey key,
final String rowIdAlias,
final LockMode lockMode,
final Loadable rootPersister,
final SharedSessionContractImplementor session) throws SQLException, HibernateException {
final Serializable id = key.getIdentifier();
// Get the persister for the _subclass_
final Loadable persister = (Loadable) getFactory().getEntityPersister( instanceEntityName);
if (LOG.isTraceEnabled() ) {
LOG.tracef(
"Initializing object from ResultSet: %s",
MessageHelper.infoString(
persister,
id,
getFactory())
);
}
boolean fetchAllPropertiesRequested = isEagerPropertyFetchEnabled(i);
// add temp entry so that the next step is circular-reference
// safe - only needed because some types don't take proper
// advantage of two-phase-load (esp. components)
TwoPhaseLoad.addUninitializedEntity(
key,
object,
persister,
lockMode,
session
);
//This is not very nice (and quite slow):
final String[][] cols = persister == rootPersister ?
getEntityAliases()[i].getSuffixedPropertyAliases() :
getEntityAliases()[i].getSuffixedPropertyAliases(persister);
final Object[] values = persister.hydrate(
rs,
id,
object,
rootPersister,
cols,
fetchAllPropertiesRequested,
session
);
final Object rowId = persister.hasRowId() ? rs.getObject( rowIdAlias) : null;
final AssociationType[] ownerAssociationTypes = getOwnerAssociationTypes();
if (ownerAssociationTypes != null && ownerAssociationTypes[i] != null ) {
String ukName = ownerAssociationTypes[i].getRHSUniqueKeyPropertyName();
if (ukName != null) {
final int index = ( (UniqueKeyLoadable) persister ).getPropertyIndex(ukName);
final Type type = persister.getPropertyTypes()[index];
// polymorphism not really handled completely correctly,
// perhaps...well, actually its ok, assuming that the
// entity name used in the lookup is the same as the
// the one used here, which it will be
EntityUniqueKey euk = new EntityUniqueKey(rootPersister.getEntityName(), //polymorphism comment above
ukName,
type.semiResolve(values[index], session, object ),
type,
persister.getEntityMode(),
session.getFactory());
session.getPersistenceContext().addEntity( euk, object);
}
}
TwoPhaseLoad.postHydrate(
persister,
id,
values,
rowId,
object,
lockMode,
session
);
}
//......
}
- instanceNotYetLoaded mainly hydrates the object. It calls loadFromResultSet to extract the object's state from the ResultSet, adds the object to hydratedObjects and returns it. loadFromResultSet first calls TwoPhaseLoad.addUninitializedEntity, which in turn calls session.getPersistenceContext().addEntity to register the object in StatefulPersistenceContext; it then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managedEntity and associate it with the object.
initializeEntitiesAndCollections
hibernate-core-5.3.9.Final-sources.jar!/org/hibernate/loader/Loader.java
public abstract class Loader {
//......
private void initializeEntitiesAndCollections(
final List hydratedObjects,
final Object resultSetId,
final SharedSessionContractImplementor session,
final boolean readOnly,
List<AfterLoadAction> afterLoadActions) throws HibernateException {
final CollectionPersister[] collectionPersisters = getCollectionPersisters();
if (collectionPersisters != null) {
for ( CollectionPersister collectionPersister : collectionPersisters) {
if ( collectionPersister.isArray() ) {
//for arrays, we should end the collection load before resolving
//the entities, since the actual array instances are not instantiated
//during loading
//TODO: or we could do this polymorphically, and have two
// different operations implemented differently for arrays
endCollectionLoad(resultSetId, session, collectionPersister);
}
}
}
//important: reuse the same event instances for performance!
final PreLoadEvent pre;
final PostLoadEvent post;
if (session.isEventSource() ) {
pre = new PreLoadEvent( (EventSource) session );
post = new PostLoadEvent((EventSource) session );
}
else {
pre = null;
post = null;
}
if (hydratedObjects != null) {
int hydratedObjectsSize = hydratedObjects.size();
LOG.tracev("Total objects hydrated: {0}", hydratedObjectsSize );
for (Object hydratedObject : hydratedObjects) {
TwoPhaseLoad.initializeEntity( hydratedObject, readOnly, session, pre);
}
}
if (collectionPersisters != null) {
for ( CollectionPersister collectionPersister : collectionPersisters) {
if ( !collectionPersister.isArray() ) {
//for sets, we should end the collection load after resolving
//the entities, since we might call hashCode() on the elements
//TODO: or we could do this polymorphically, and have two
// different operations implemented differently for arrays
endCollectionLoad(resultSetId, session, collectionPersister);
}
}
}
// Until this entire method is refactored w/ polymorphism, postLoad was
// split off from initializeEntity. It *must* occur after
// endCollectionLoad to ensure the collection is in the
// persistence context.
if (hydratedObjects != null) {
for ( Object hydratedObject : hydratedObjects) {
TwoPhaseLoad.postLoad( hydratedObject, session, post);
if (afterLoadActions != null) {
for ( AfterLoadAction afterLoadAction : afterLoadActions) {
final EntityEntry entityEntry = session.getPersistenceContext().getEntry(hydratedObject);
if (entityEntry == null) {
// big problem
throw new HibernateException("Could not locate EntityEntry immediately after two-phase load");
}
afterLoadAction.afterLoad(session, hydratedObject, (Loadable) entityEntry.getPersister());
}
}
}
}
}
//......
}
- initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues(entity, hydratedState) to copy the hydratedState values into the entity.
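The split between hydrating and initializing can be illustrated with a toy two-phase loader. This is a sketch under simplifying assumptions: rows are passed in as plain arguments, the only association is a reference to another entity of the same type, and PairedNode and TwoPhaseLoader are hypothetical names unrelated to Hibernate's TwoPhaseLoad class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class PairedNode {                  // hypothetical entity with a self-referencing association
    long id;
    String name;
    PairedNode partner;
}

final class TwoPhaseLoader {
    private final Map<Long, PairedNode> context = new HashMap<>();                    // by id
    private final Map<PairedNode, Object[]> hydratedState = new IdentityHashMap<>();  // raw row values
    private final List<PairedNode> hydratedObjects = new ArrayList<>();

    // Phase 1: create an empty instance, register it, and keep its raw state aside.
    PairedNode hydrate(long id, String name, Long partnerId) {
        PairedNode existing = context.get(id);
        if (existing != null) {
            return existing;                          // already known, row values are ignored
        }
        PairedNode node = new PairedNode();
        node.id = id;
        context.put(id, node);                        // registered before it is initialized
        hydratedState.put(node, new Object[] { name, partnerId });
        hydratedObjects.add(node);
        return node;
    }

    // Phase 2: copy the hydrated values into the instances and resolve associations.
    void initializeAll() {
        for (PairedNode node : hydratedObjects) {
            Object[] state = hydratedState.remove(node);
            node.name = (String) state[0];
            Long partnerId = (Long) state[1];
            node.partner = partnerId == null ? null : context.get(partnerId);
        }
        hydratedObjects.clear();
    }
}

final class TwoPhaseLoadDemo {
    public static void main(String[] args) {
        TwoPhaseLoader loader = new TwoPhaseLoader();
        PairedNode a = loader.hydrate(1L, "a", 2L);
        PairedNode b = loader.hydrate(2L, "b", 1L);   // a and b reference each other
        loader.initializeAll();
        System.out.println(a.partner == b && b.partner == a);   // true
    }
}
```

Registering every instance before any of them is populated is what makes the circular reference between the two nodes resolvable in the second phase.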
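Summary
- A write-behind cache is one kind of caching strategy. Updates are applied to the cache first, and the cache later persists them to the backing store in batches, for example as batched updates to the database. The benefit is that multiple operations on the same data can be merged, which reduces I/O.
- To shorten the time that locks are held in the database, Hibernate adopts a transactional write-behind strategy: its persistence context acts as a transactional write-behind cache, so changes to entities are first applied in memory and are flushed to the database at a later point. This is embodied in the Session class.
- Hibernate's Session wraps a JDBC Connection and mainly maintains the first-level cache, that is, the "repeatable read" persistence context. This is implemented in the getRow method of Loader.
- When the key is not null, getRow sets the value of object. It first calls session.getEntityUsingInterceptor to look up the entity in the session by key; if the result is not null it calls instanceAlreadyLoaded, otherwise it calls instanceNotYetLoaded to create object.
- getEntityUsingInterceptor first tries to get the entity from persistenceContext; if nothing is found, it falls back to getInterceptor().getEntity. Unless configured otherwise, the interceptor is EmptyInterceptor, whose getEntity method returns null.
- StatefulPersistenceContext maintains an entitiesByKey map, and getEntity simply looks up the entity in that map by EntityKey. It also provides methods such as addEntity, removeEntity and replaceDelayedEntityIdentityInsertKeys to modify the map.
- instanceAlreadyLoaded mainly verifies that the cached object is of the expected type and, based on the requested lock mode, decides whether the entry's lock mode needs to be upgraded. instanceNotYetLoaded mainly hydrates the object: it calls loadFromResultSet to extract the object's state from the ResultSet, adds the object to hydratedObjects and returns it. loadFromResultSet first calls TwoPhaseLoad.addUninitializedEntity, which in turn calls session.getPersistenceContext().addEntity to register the object in StatefulPersistenceContext; it then uses persister.hydrate to extract the values from the ResultSet, and finally calls TwoPhaseLoad.postHydrate to create the managedEntity and associate it with the object.
- initializeEntitiesAndCollections calls TwoPhaseLoad.initializeEntity to initialize each hydratedObject; that method calls persister.setPropertyValues(entity, hydratedState) to copy the hydratedState values into the entity.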
doc
- Cache update patterns (缓存更新的套路)
- Extreme transaction processing patterns: Write-behind caching (极端事务处理模式:Write-behind 缓存)
- Write-behind caching
- 6. Flushing
- 8.1. Physical Transactions
- A beginner’s guide to flush strategies in JPA and Hibernate
- How does Hibernate guarantee application-level repeatable reads