import io.netty.util.Recycler;
import org.junit.Assert;
public class Entry {
String data;private Recycler.Handle<Entry> handle;private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @Override protected Entry newObject(Handle<Entry> handle) { return new Entry(handle); }};public Entry(Recycler.Handle<Entry> handle) { this.handle = handle;}public void recycle() { handle.recycle(this);}public String getData() { return data;}public void setData(String data) { this.data = data;}public static Entry newInstance(String data) {// get入口 Entry entry = RECYCLER.get(); entry.setData(data); return entry;}public static void main(String[] args) { Entry entry =Entry.newInstance("one"); // 回收入口 entry.recycle(); Entry entry1 = Entry.newInstance("two"); Assert.assertSame(entry1, entry);}
}
get函数:
public final T get() {// 如果禁止回收 if (maxCapacityPerThread == 0) { return newObject((Handle<T>) NOOP_HANDLE); } // 获取以后线程的stack。第一次调用会触发初始化函数 Stack<T> stack = threadLocal.get(); // 从stack获取对象 DefaultHandle<T> handle = stack.pop(); if (handle == null) { //如果没有获取到就构建一个 DefaultHandle和对象绑定 handle = stack.newHandle(); // newObject是用户重写的 handle.value = newObject(handle); } return (T) handle.value;}
stack的初始化:
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue() { return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); }};
stack.pop()办法:
DefaultHandle<T> pop() { int size = this.size; if (size == 0) { // 如果stack中没有数据,本人回收的对象会间接放在stack中 // 从其它线程回收对象的queue获取 if (!scavenge()) { return null; } size = this.size; } size --; // stack底层用elements[]这个数组来存储数据 DefaultHandle ret = elements[size]; elements[size] = null; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times"); } // 此对象曾经被应用了 ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; return ret; }
scavenge办法:
boolean scavenge() { // continue an existing scavenge, if any if (scavengeSome()) { return true; } //重置 prev = null; cursor = head; return false; }
scavengeSome办法:每一次转移其实只会针对一个有元素的Link进行操作,这样就不会太影响查找性能。
boolean scavengeSome() { WeakOrderQueue cursor = this.cursor; if (cursor == null) { cursor = head; if (cursor == null) { return false; } } boolean success = false; WeakOrderQueue prev = this.prev; do { if (cursor.transfer(this)) { success = true; break; } WeakOrderQueue next = cursor.next; /** * 如果以后的WeakOrderQueue的线程曾经不可达了,则 * 1、如果该WeakOrderQueue中有数据,则将其中的数据全副转移到以后Stack中 * 2、将以后的WeakOrderQueue的前一个节 * 点prev指向以后的WeakOrderQueue的下一个节点, * 行将以后的WeakOrderQueue从Queue链表中移除。不便后续GC */ if (cursor.owner.get() == null) { if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this)) { success = true; } else { break; } } } if (prev != null) { prev.next = next; } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this.prev = prev; this.cursor = cursor; return success; }
transfer办法:
public <T> boolean transfer(Stack<T> dst) { // 寻找第一个Link(Head不是Link) Link head = this.head.link; // head == null,示意只有Head一个节点,没有存储数据的节点,间接返回 if (head == null) { return false; } // 如果第一个Link节点的readIndex索引曾经达到该Link对象的DefaultHandle[]的尾部, // 则判断以后的Link节点的下一个节点是否为null,如果为null,阐明曾经达到了Link链表尾部,间接返回, // 否则,将以后的Link节点的下一个Link节点赋值给head和this.head.link,进而对下一个Link节点进行操作 if (head.readIndex == LINK_CAPACITY) { if (head.next == null) { return false; } this.head.link = head = head.next; } // 获取Link节点的readIndex,即以后的Link节点的第一个无效元素的地位 int srcStart = head.readIndex; // 获取Link节点的writeIndex,即以后的Link节点的最初一个无效元素的地位 int srcEnd = head.get(); // 计算Link节点中能够被转移的元素个数 int srcSize = srcEnd - srcStart; if (srcSize == 0) { return false; } // 获取转移元素的目的地Stack中以后的元素个数 final int dstSize = dst.size; // 计算期盼的容量 final int expectedCapacity = dstSize + srcSize; /** * 如果expectedCapacity大于目的地Stack的长度 * 1、对目的地Stack进行扩容 * 2、计算Link中最终的可转移的最初一个元素的下标 */ if (expectedCapacity > dst.elements.length) { int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = Math.min(srcEnd, actualCapacity - dstSize + srcStart); } if (srcStart == srcEnd) { // The destination stack is full already. return false; } else { // 获取Link节点的DefaultHandle[] final DefaultHandle[] srcElems = head.elements; // 获取目的地Stack的DefaultHandle[] final DefaultHandle[] dstElems = dst.elements; // dst数组的大小,会随着元素的迁入而减少,如果最初发现没有减少,那么示意没有迁徙胜利任何一个元素 int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { final DefaultHandle element = srcElems[i]; /** * 设置element.recycleId 或者 进行防护性判断 */ if (element.recycledId == 0) { element.recycledId = element.lastRecycledId; } else if (element.recycledId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } // 置空Link节点的DefaultHandle[i] srcElems[i] = null; // 扔掉放弃7/8的元素 if (dst.dropHandle(element)) { continue; } // 将可转移胜利的DefaultHandle元素的stack属性设置为目的地Stack element.stack = dst; // 将DefaultHandle元素转移到目的地Stack的DefaultHandle[newDstSize ++]中 dstElems[newDstSize++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null) { this.head.reclaimSpace(LINK_CAPACITY); // 将Head指向下一个Link,也就是将以后的Link给回收掉了 // 假如之前为Head -> Link1 -> Link2,回收之后为Head -> Link2 this.head.link = head.next; } // 重置readIndex head.readIndex = srcEnd; // 示意没有被回收任何一个对象,间接返回 if (dst.size == newDstSize) { return false; } // 将新的newDstSize赋值给目的地Stack的size dst.size = newDstSize; return true; } }
//回收
recycle办法:最终进入stack.push
void push(DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (thread == currentThread) { //本人线程回收本人的对象 // The current Thread is the thread that belongs to the Stack, we can try to push the object now. pushNow(item); } else { //回收其余线程产生的对象 // The current Thread is not the one that belongs to the Stack, we need to signal that the push // happens later. pushLater(item, currentThread); } }
pushNow办法:
private void pushNow(DefaultHandle<T> item) { // (item.recycleId | item.lastRecycleId) != 0 等价于 item.recycleId!=0 && item.lastRecycleId!=0 // 当item开始创立时item.recycleId==0 && item.lastRecycleId==0 // 当item被recycle时,item.recycleId==x,item.lastRecycleId==y 进行赋值 // 当item被poll之后, item.recycleId = item.lastRecycleId = 0 // 所以当item.recycleId 和 item.lastRecycleId 任何一个不为0,则示意回收过 if ((item.recycledId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } item.recycledId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; if (size >= maxCapacity || dropHandle(item)) { return; } // stack中的elements扩容两倍,复制元素,将新数组赋值给stack.elements if (size == elements.length) { elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity)); } // 搁置元素 elements[size] = item; this.size = size + 1; }
pushLater办法:
private void pushLater(DefaultHandle<T> item, Thread currentThread) { Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this); if (queue == null) { // 如果DELAYED_RECYCLED中的key-value对曾经达到了maxDelayedQueues,则后续的无奈回收 - 内存保护 if (delayedRecycled.size() >= maxDelayedQueues) { delayedRecycled.put(this, WeakOrderQueue.DUMMY); return; } // 如果这个stack的容量还没用完,就调配一个queue,在调配queue的时候会将queue通过头插法插入stack保护的队列 if ((queue = WeakOrderQueue.allocate(this, currentThread)) == null) { // drop object return; } delayedRecycled.put(this, queue); } else if (queue == WeakOrderQueue.DUMMY) { // drop object return; } queue.add(item); } private WeakOrderQueue(Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); synchronized (stack) { next = stack.head; stack.head = this; } // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the // Stack itself GCed. availableSharedCapacity = stack.availableSharedCapacity; }
add:将回收元素退出队列
void add(DefaultHandle<?> handle) { handle.lastRecycledId = id; Link tail = this.tail; int writeIndex; // 判断一个Link对象是否曾经满了: // 如果没满,间接增加; // 如果曾经满了,创立一个新的Link对象,之后重组Link链表 if ((writeIndex = tail.get()) == LINK_CAPACITY) { if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) { // Drop it. return; } // We allocate a Link so reserve the space this.tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; /** * 如果使用者在将DefaultHandle对象压入队列后, * 将Stack设置为null,然而此处的DefaultHandle是持有stack的强援用的,则Stack对象无奈回收; * 而且因为此处DefaultHandle是持有stack的强援用,WeakHashMap中对应stack的WeakOrderQueue也无奈被回收掉了,导致内存透露。 */ handle.stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; // this also means we guarantee visibility of an element in the queue if we see the index updated tail.lazySet(writeIndex + 1); }
每个线程都会有一个FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED。装载着此线程回收其余线程产生的对象。还含有一个FastThreadLocal<Stack<T>> threadLocal,装着这个线程回收的对象和其余线程回收的对象(迁徙过去的)。stack 含有一个weekQueue指针,其余线程回收对象的时候会new一个queue,
而后头插法退出队列。当scavenge的时候,会从head开始遍历,找到一个有数据的queue进行迁徙,将外面的对象挪动到elements[]中去。
参考博客:https://www.jianshu.com/p/854...