共计 7998 个字符,预计需要花费 20 分钟才能阅读完成。
import io.netty.util.Recycler;
import org.junit.Assert;
public class Entry {
String data;
private Recycler.Handle<Entry> handle;
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle<Entry> handle) {return new Entry(handle);
}
};
public Entry(Recycler.Handle<Entry> handle) {this.handle = handle;}
public void recycle() {handle.recycle(this);
}
public String getData() {return data;}
public void setData(String data) {this.data = data;}
public static Entry newInstance(String data) {
// get 入口
Entry entry = RECYCLER.get();
entry.setData(data);
return entry;
}
public static void main(String[] args) {Entry entry =Entry.newInstance("one");
// 回收入口
entry.recycle();
Entry entry1 = Entry.newInstance("two");
Assert.assertSame(entry1, entry);
}
}
get 函数:
public final T get() {
// 如果禁止回收
if (maxCapacityPerThread == 0) {return newObject((Handle<T>) NOOP_HANDLE);
}
// 获取以后线程的 stack。第一次调用会触发初始化函数
Stack<T> stack = threadLocal.get();
// 从 stack 获取对象
DefaultHandle<T> handle = stack.pop();
if (handle == null) { // 如果没有获取到就构建一个 DefaultHandle 和对象绑定
handle = stack.newHandle();
// newObject 是用户重写的
handle.value = newObject(handle);
}
return (T) handle.value;
}
stack 的初始化:
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
};
stack.pop() 办法:
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) { // 如果 stack 中没有数据,本人回收的对象会间接放在 stack 中
// 从其它线程回收对象的 queue 获取
if (!scavenge()) {return null;}
size = this.size;
}
size --;
// stack 底层用 elements[] 这个数组来存储数据
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {throw new IllegalStateException("recycled multiple times");
}
// 此对象曾经被应用了
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
scavenge 办法:
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {return true;}
// 重置
prev = null;
cursor = head;
return false;
}
scavengeSome 办法:每一次转移其实只会针对一个有元素的 Link 进行操作,这样就不会太影响查找性能。
boolean scavengeSome() {
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
cursor = head;
if (cursor == null) {return false;}
}
boolean success = false;
WeakOrderQueue prev = this.prev;
do {if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
/**
* 如果以后的 WeakOrderQueue 的线程曾经不可达了,则
* 1、如果该 WeakOrderQueue 中有数据,则将其中的数据全副转移到以后 Stack 中
* 2、将以后的 WeakOrderQueue 的前一个节
* 点 prev 指向以后的 WeakOrderQueue 的下一个节点,* 行将以后的 WeakOrderQueue 从 Queue 链表中移除。不便后续 GC
*/
if (cursor.owner.get() == null) {if (cursor.hasFinalData()) {for (;;) {if (cursor.transfer(this)) {success = true;} else {break;}
}
}
if (prev != null) {prev.next = next;}
} else {prev = cursor;}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
transfer 办法:
public <T> boolean transfer(Stack<T> dst) {
// 寻找第一个 Link(Head 不是 Link)Link head = this.head.link;
// head == null,示意只有 Head 一个节点,没有存储数据的节点,间接返回
if (head == null) {return false;}
// 如果第一个 Link 节点的 readIndex 索引曾经达到该 Link 对象的 DefaultHandle[] 的尾部,// 则判断以后的 Link 节点的下一个节点是否为 null,如果为 null,阐明曾经达到了 Link 链表尾部,间接返回,// 否则,将以后的 Link 节点的下一个 Link 节点赋值给 head 和 this.head.link,进而对下一个 Link 节点进行操作
if (head.readIndex == LINK_CAPACITY) {if (head.next == null) {return false;}
this.head.link = head = head.next;
}
// 获取 Link 节点的 readIndex, 即以后的 Link 节点的第一个无效元素的地位
int srcStart = head.readIndex;
// 获取 Link 节点的 writeIndex,即以后的 Link 节点的最初一个无效元素的地位
int srcEnd = head.get();
// 计算 Link 节点中能够被转移的元素个数
int srcSize = srcEnd - srcStart;
if (srcSize == 0) {return false;}
// 获取转移元素的目的地 Stack 中以后的元素个数
final int dstSize = dst.size;
// 计算期盼的容量
final int expectedCapacity = dstSize + srcSize;
/**
* 如果 expectedCapacity 大于目的地 Stack 的长度
* 1、对目的地 Stack 进行扩容
* 2、计算 Link 中最终的可转移的最初一个元素的下标
*/
if (expectedCapacity > dst.elements.length) {int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = Math.min(srcEnd, actualCapacity - dstSize + srcStart);
}
if (srcStart == srcEnd) {
// The destination stack is full already.
return false;
} else {// 获取 Link 节点的 DefaultHandle[]
final DefaultHandle[] srcElems = head.elements;
// 获取目的地 Stack 的 DefaultHandle[]
final DefaultHandle[] dstElems = dst.elements;
// dst 数组的大小,会随着元素的迁入而减少,如果最初发现没有减少,那么示意没有迁徙胜利任何一个元素
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {final DefaultHandle element = srcElems[i];
/**
* 设置 element.recycleId 或者 进行防护性判断
*/
if (element.recycledId == 0) {element.recycledId = element.lastRecycledId;} else if (element.recycledId != element.lastRecycledId) {throw new IllegalStateException("recycled already");
}
// 置空 Link 节点的 DefaultHandle[i]
srcElems[i] = null;
// 扔掉放弃 7 / 8 的元素
if (dst.dropHandle(element)) {continue;}
// 将可转移胜利的 DefaultHandle 元素的 stack 属性设置为目的地 Stack
element.stack = dst;
// 将 DefaultHandle 元素转移到目的地 Stack 的 DefaultHandle[newDstSize ++] 中
dstElems[newDstSize++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) {this.head.reclaimSpace(LINK_CAPACITY);
// 将 Head 指向下一个 Link,也就是将以后的 Link 给回收掉了
// 假如之前为 Head -> Link1 -> Link2,回收之后为 Head -> Link2
this.head.link = head.next;
}
// 重置 readIndex
head.readIndex = srcEnd;
// 示意没有被回收任何一个对象,间接返回
if (dst.size == newDstSize) {return false;}
// 将新的 newDstSize 赋值给目的地 Stack 的 size
dst.size = newDstSize;
return true;
}
}
// 回收
recycle 办法:最终进入 stack.push
void push(DefaultHandle<?> item) {Thread currentThread = Thread.currentThread();
if (thread == currentThread) { // 本人线程回收本人的对象
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else { // 回收其余线程产生的对象
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
pushNow 办法:
private void pushNow(DefaultHandle<T> item) {// (item.recycleId | item.lastRecycleId) != 0 等价于 item.recycleId!=0 && item.lastRecycleId!=0
// 当 item 开始创立时 item.recycleId==0 && item.lastRecycleId==0
// 当 item 被 recycle 时,item.recycleId==x,item.lastRecycleId==y 进行赋值
// 当 item 被 poll 之后,item.recycleId = item.lastRecycleId = 0
// 所以当 item.recycleId 和 item.lastRecycleId 任何一个不为 0,则示意回收过
if ((item.recycledId | item.lastRecycledId) != 0) {throw new IllegalStateException("recycled already");
}
item.recycledId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {return;}
// stack 中的 elements 扩容两倍,复制元素,将新数组赋值给 stack.elements
if (size == elements.length) {elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
}
// 搁置元素
elements[size] = item;
this.size = size + 1;
}
pushLater 办法:
private void pushLater(DefaultHandle<T> item, Thread currentThread) {Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
// 如果 DELAYED_RECYCLED 中的 key-value 对曾经达到了 maxDelayedQueues,则后续的无奈回收 - 内存保护
if (delayedRecycled.size() >= maxDelayedQueues) {delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// 如果这个 stack 的容量还没用完,就调配一个 queue,在调配 queue 的时候会将 queue 通过头插法插入 stack 保护的队列
if ((queue = WeakOrderQueue.allocate(this, currentThread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
queue.add(item);
}
private WeakOrderQueue(Stack<?> stack, Thread thread) {head = tail = new Link();
owner = new WeakReference<Thread>(thread);
synchronized (stack) {
next = stack.head;
stack.head = this;
}
// Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
// the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
// Stack itself GCed.
availableSharedCapacity = stack.availableSharedCapacity;
}
add:将回收元素退出队列
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
Link tail = this.tail;
int writeIndex;
// 判断一个 Link 对象是否曾经满了:// 如果没满,间接增加;// 如果曾经满了,创立一个新的 Link 对象,之后重组 Link 链表
if ((writeIndex = tail.get()) == LINK_CAPACITY) {if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
// Drop it.
return;
}
// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link();
writeIndex = tail.get();}
tail.elements[writeIndex] = handle;
/**
* 如果使用者在将 DefaultHandle 对象压入队列后,* 将 Stack 设置为 null,然而此处的 DefaultHandle 是持有 stack 的强援用的,则 Stack 对象无奈回收;* 而且因为此处 DefaultHandle 是持有 stack 的强援用,WeakHashMap 中对应 stack 的 WeakOrderQueue 也无奈被回收掉了,导致内存透露。*/
handle.stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
tail.lazySet(writeIndex + 1);
}
每个线程都会有一个 FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED。装载着此线程回收其余线程产生的对象。还含有一个 FastThreadLocal<Stack<T>> threadLocal,装着这个线程回收的对象和其余线程回收的对象(迁徙过去的)。stack 含有一个 weekQueue 指针,其余线程回收对象的时候会 new 一个 queue,
而后头插法退出队列。当 scavenge 的时候,会从 head 开始遍历,找到一个有数据的 queue 进行迁徙,将外面的对象挪动到 elements[] 中去。
参考博客:https://www.jianshu.com/p/854…
正文完