聊聊flink的MemorySegment

32次阅读

共计 25601 个字符,预计需要花费 65 分钟才能阅读完成。


本文主要研究一下 flink 的 MemorySegment
MemorySegment
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@Internal
public abstract class MemorySegment {

@SuppressWarnings(“restriction”)
protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

@SuppressWarnings(“restriction”)
protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final boolean LITTLE_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);

// ————————————————————————

protected final byte[] heapMemory;

protected long address;

protected final long addressLimit;

protected final int size;

private final Object owner;

MemorySegment(byte[] buffer, Object owner) {
if (buffer == null) {
throw new NullPointerException(“buffer”);
}

this.heapMemory = buffer;
this.address = BYTE_ARRAY_BASE_OFFSET;
this.size = buffer.length;
this.addressLimit = this.address + this.size;
this.owner = owner;
}

MemorySegment(long offHeapAddress, int size, Object owner) {
if (offHeapAddress <= 0) {
throw new IllegalArgumentException(“negative pointer or size”);
}
if (offHeapAddress >= Long.MAX_VALUE – Integer.MAX_VALUE) {
// this is necessary to make sure the collapsed checks are safe against numeric overflows
throw new IllegalArgumentException(“Segment initialized with too large address: ” + offHeapAddress
+ ” ; Max allowed address is ” + (Long.MAX_VALUE – Integer.MAX_VALUE – 1));
}

this.heapMemory = null;
this.address = offHeapAddress;
this.addressLimit = this.address + size;
this.size = size;
this.owner = owner;
}

// ————————————————————————
// Memory Segment Operations
// ————————————————————————

public int size() {
return size;
}

public boolean isFreed() {
return address > addressLimit;
}

public void free() {
// this ensures we can place no more data and trigger
// the checks for the freed segment
address = addressLimit + 1;
}

public boolean isOffHeap() {
return heapMemory == null;
}

public byte[] getArray() {
if (heapMemory != null) {
return heapMemory;
} else {
throw new IllegalStateException(“Memory segment does not represent heap memory”);
}
}

public long getAddress() {
if (heapMemory == null) {
return address;
} else {
throw new IllegalStateException(“Memory segment does not represent off heap memory”);
}
}

public abstract ByteBuffer wrap(int offset, int length);

public Object getOwner() {
return owner;
}

// ————————————————————————
// Random Access get() and put() methods
// ————————————————————————

//————————————————————————
// Notes on the implementation: We try to collapse as many checks as
// possible. We need to obey the following rules to make this safe
// against segfaults:
//
// – Grab mutable fields onto the stack before checking and using. This
// guards us against concurrent modifications which invalidate the
// pointers
// – Use subtractions for range checks, as they are tolerant
//————————————————————————

public abstract byte get(int index);

public abstract void put(int index, byte b);

public abstract void get(int index, byte[] dst);

public abstract void put(int index, byte[] src);

public abstract void get(int index, byte[] dst, int offset, int length);

public abstract void put(int index, byte[] src, int offset, int length);

public abstract boolean getBoolean(int index);

public abstract void putBoolean(int index, boolean value);

@SuppressWarnings(“restriction”)
public final char getChar(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 2) {
return UNSAFE.getChar(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException(“This segment has been freed.”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final char getCharLittleEndian(int index) {
if (LITTLE_ENDIAN) {
return getChar(index);
} else {
return Character.reverseBytes(getChar(index));
}
}

public final char getCharBigEndian(int index) {
if (LITTLE_ENDIAN) {
return Character.reverseBytes(getChar(index));
} else {
return getChar(index);
}
}

@SuppressWarnings(“restriction”)
public final void putChar(int index, char value) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 2) {
UNSAFE.putChar(heapMemory, pos, value);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final void putCharLittleEndian(int index, char value) {
if (LITTLE_ENDIAN) {
putChar(index, value);
} else {
putChar(index, Character.reverseBytes(value));
}
}

public final void putCharBigEndian(int index, char value) {
if (LITTLE_ENDIAN) {
putChar(index, Character.reverseBytes(value));
} else {
putChar(index, value);
}
}

public final short getShort(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 2) {
return UNSAFE.getShort(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final short getShortLittleEndian(int index) {
if (LITTLE_ENDIAN) {
return getShort(index);
} else {
return Short.reverseBytes(getShort(index));
}
}

public final short getShortBigEndian(int index) {
if (LITTLE_ENDIAN) {
return Short.reverseBytes(getShort(index));
} else {
return getShort(index);
}
}

public final void putShort(int index, short value) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 2) {
UNSAFE.putShort(heapMemory, pos, value);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final void putShortLittleEndian(int index, short value) {
if (LITTLE_ENDIAN) {
putShort(index, value);
} else {
putShort(index, Short.reverseBytes(value));
}
}

public final void putShortBigEndian(int index, short value) {
if (LITTLE_ENDIAN) {
putShort(index, Short.reverseBytes(value));
} else {
putShort(index, value);
}
}

public final int getInt(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 4) {
return UNSAFE.getInt(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final int getIntLittleEndian(int index) {
if (LITTLE_ENDIAN) {
return getInt(index);
} else {
return Integer.reverseBytes(getInt(index));
}
}

public final int getIntBigEndian(int index) {
if (LITTLE_ENDIAN) {
return Integer.reverseBytes(getInt(index));
} else {
return getInt(index);
}
}

public final void putInt(int index, int value) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 4) {
UNSAFE.putInt(heapMemory, pos, value);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final void putIntLittleEndian(int index, int value) {
if (LITTLE_ENDIAN) {
putInt(index, value);
} else {
putInt(index, Integer.reverseBytes(value));
}
}

public final void putIntBigEndian(int index, int value) {
if (LITTLE_ENDIAN) {
putInt(index, Integer.reverseBytes(value));
} else {
putInt(index, value);
}
}

public final long getLong(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 8) {
return UNSAFE.getLong(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final long getLongLittleEndian(int index) {
if (LITTLE_ENDIAN) {
return getLong(index);
} else {
return Long.reverseBytes(getLong(index));
}
}

public final long getLongBigEndian(int index) {
if (LITTLE_ENDIAN) {
return Long.reverseBytes(getLong(index));
} else {
return getLong(index);
}
}

public final void putLong(int index, long value) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit – 8) {
UNSAFE.putLong(heapMemory, pos, value);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

public final void putLongLittleEndian(int index, long value) {
if (LITTLE_ENDIAN) {
putLong(index, value);
} else {
putLong(index, Long.reverseBytes(value));
}
}

public final void putLongBigEndian(int index, long value) {
if (LITTLE_ENDIAN) {
putLong(index, Long.reverseBytes(value));
} else {
putLong(index, value);
}
}

public final float getFloat(int index) {
return Float.intBitsToFloat(getInt(index));
}

public final float getFloatLittleEndian(int index) {
return Float.intBitsToFloat(getIntLittleEndian(index));
}

public final float getFloatBigEndian(int index) {
return Float.intBitsToFloat(getIntBigEndian(index));
}

public final void putFloat(int index, float value) {
putInt(index, Float.floatToRawIntBits(value));
}

public final void putFloatLittleEndian(int index, float value) {
putIntLittleEndian(index, Float.floatToRawIntBits(value));
}

public final void putFloatBigEndian(int index, float value) {
putIntBigEndian(index, Float.floatToRawIntBits(value));
}

public final double getDouble(int index) {
return Double.longBitsToDouble(getLong(index));
}

public final double getDoubleLittleEndian(int index) {
return Double.longBitsToDouble(getLongLittleEndian(index));
}

public final double getDoubleBigEndian(int index) {
return Double.longBitsToDouble(getLongBigEndian(index));
}

public final void putDouble(int index, double value) {
putLong(index, Double.doubleToRawLongBits(value));
}

public final void putDoubleLittleEndian(int index, double value) {
putLongLittleEndian(index, Double.doubleToRawLongBits(value));
}

public final void putDoubleBigEndian(int index, double value) {
putLongBigEndian(index, Double.doubleToRawLongBits(value));
}

// ————————————————————————-
// Bulk Read and Write Methods
// ————————————————————————-

public abstract void get(DataOutput out, int offset, int length) throws IOException;

public abstract void put(DataInput in, int offset, int length) throws IOException;

public abstract void get(int offset, ByteBuffer target, int numBytes);

public abstract void put(int offset, ByteBuffer source, int numBytes);

public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes) {
final byte[] thisHeapRef = this.heapMemory;
final byte[] otherHeapRef = target.heapMemory;
final long thisPointer = this.address + offset;
final long otherPointer = target.address + targetOffset;

if ((numBytes | offset | targetOffset) >= 0 &&
thisPointer <= this.addressLimit – numBytes && otherPointer <= target.addressLimit – numBytes) {
UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
}
else if (this.address > this.addressLimit) {
throw new IllegalStateException(“this memory segment has been freed.”);
}
else if (target.address > target.addressLimit) {
throw new IllegalStateException(“target memory segment has been freed.”);
}
else {
throw new IndexOutOfBoundsException(
String.format(“offset=%d, targetOffset=%d, numBytes=%d, address=%d, targetAddress=%d”,
offset, targetOffset, numBytes, this.address, target.address));
}
}

// ————————————————————————-
// Comparisons & Swapping
// ————————————————————————-

public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
while (len >= 8) {
long l1 = this.getLongBigEndian(offset1);
long l2 = seg2.getLongBigEndian(offset2);

if (l1 != l2) {
return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
}

offset1 += 8;
offset2 += 8;
len -= 8;
}
while (len > 0) {
int b1 = this.get(offset1) & 0xff;
int b2 = seg2.get(offset2) & 0xff;
int cmp = b1 – b2;
if (cmp != 0) {
return cmp;
}
offset1++;
offset2++;
len–;
}
return 0;
}

public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len) {
if ((offset1 | offset2 | len | (tempBuffer.length – len)) >= 0) {
final long thisPos = this.address + offset1;
final long otherPos = seg2.address + offset2;

if (thisPos <= this.addressLimit – len && otherPos <= seg2.addressLimit – len) {
// this -> temp buffer
UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, BYTE_ARRAY_BASE_OFFSET, len);

// other -> this
UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);

// temp buffer -> other
UNSAFE.copyMemory(tempBuffer, BYTE_ARRAY_BASE_OFFSET, seg2.heapMemory, otherPos, len);
return;
}
else if (this.address > this.addressLimit) {
throw new IllegalStateException(“this memory segment has been freed.”);
}
else if (seg2.address > seg2.addressLimit) {
throw new IllegalStateException(“other memory segment has been freed.”);
}
}

// index is in fact invalid
throw new IndexOutOfBoundsException(
String.format(“offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d”,
offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
}
}

MemorySegment 有点类似 java.nio.ByteBuffer;它有一个 byte[]类型的 heapMemory 属性;它有两个构造器,带有 byte[]类型参数的构造器会将 byte[]赋给 heapMemory,不带 byte[]类型参数的构造器则 heapMemory 为 null;isOffHeap 方法则用于判断当前的 memory segment 是 heap 还是 off-heap,它根据 heapMemory 是否为 null 来判断,如果为 null 则是 off-heap;另外提供了 compare、swapBytes、copyTo 方法;还显式提供了 BigEndian 及 LittleEndian 的 get、put 方法
BigEndian 的相关方法有:get/putCharBigEndian、get/putShortBigEndian、get/putIntBigEndian、get/putLongBigEndian、get/putFloatBigEndian、get/putDoubleBigEndian;LittleEndian 的相关方法有:get/putCharLittleEndian、get/putShortLittleEndian、get/putIntLittleEndian、get/putLongLittleEndian、get/putFloatLittleEndian、get/putDoubleLittleEndian
MemorySegment 定义了 wrap、get、put、getBoolean、putBoolean 等抽象方法,要求子类去实现(free 方法在 MemorySegment 中已有默认实现,HeapMemorySegment 对其进行了覆盖);MemorySegment 有两个子类,分别是 HeapMemorySegment、HybridMemorySegment

HeapMemorySegment
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@SuppressWarnings(“unused”)
@Internal
public final class HeapMemorySegment extends MemorySegment {

private byte[] memory;

HeapMemorySegment(byte[] memory) {
this(memory, null);
}

HeapMemorySegment(byte[] memory, Object owner) {
super(Objects.requireNonNull(memory), owner);
this.memory = memory;
}

// ————————————————————————-
// MemorySegment operations
// ————————————————————————-

@Override
public void free() {
super.free();
this.memory = null;
}

@Override
public ByteBuffer wrap(int offset, int length) {
try {
return ByteBuffer.wrap(this.memory, offset, length);
}
catch (NullPointerException e) {
throw new IllegalStateException(“segment has been freed”);
}
}

public byte[] getArray() {
return this.heapMemory;
}

// ————————————————————————
// Random Access get() and put() methods
// ————————————————————————

@Override
public final byte get(int index) {
return this.memory[index];
}

@Override
public final void put(int index, byte b) {
this.memory[index] = b;
}

@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}

@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}

@Override
public final void get(int index, byte[] dst, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(this.memory, index, dst, offset, length);
}

@Override
public final void put(int index, byte[] src, int offset, int length) {
// system arraycopy does the boundary checks anyways, no need to check extra
System.arraycopy(src, offset, this.memory, index, length);
}

@Override
public final boolean getBoolean(int index) {
return this.memory[index] != 0;
}

@Override
public final void putBoolean(int index, boolean value) {
this.memory[index] = (byte) (value ? 1 : 0);
}

// ————————————————————————-
// Bulk Read and Write Methods
// ————————————————————————-

@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
out.write(this.memory, offset, length);
}

@Override
public final void put(DataInput in, int offset, int length) throws IOException {
in.readFully(this.memory, offset, length);
}

@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// ByteBuffer performs the boundary checks
target.put(this.memory, offset, numBytes);
}

@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// ByteBuffer performs the boundary checks
source.get(this.memory, offset, numBytes);
}

// ————————————————————————-
// Factoring
// ————————————————————————-

/**
* A memory segment factory that produces heap memory segments. Note that this factory does not
* support to allocate off-heap memory.
*/
public static final class HeapMemorySegmentFactory {

public HeapMemorySegment wrap(byte[] memory) {
return new HeapMemorySegment(memory);
}

public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
return new HeapMemorySegment(new byte[size], owner);
}

public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
return new HeapMemorySegment(memory, owner);
}

/**
* Prevent external instantiation.
*/
HeapMemorySegmentFactory() {}
}

public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
}
HeapMemorySegment 继承了 MemorySegment,它有一个 byte[]的 memory 属性,free 操作会将 memory 设置为 null,wrap 方法使用的是 memory 属性;它的构造器要求传入的 memory 不能为 null,然后赋给父类的 heapMemory 属性及自己定义的 memory 属性(引用);它还定义了 HeapMemorySegmentFactory,提供了 wrap、allocateUnpooledSegment、wrapPooledHeapMemory 方法
HybridMemorySegment
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@Internal
public final class HybridMemorySegment extends MemorySegment {

/**
* The direct byte buffer that allocated the off-heap memory. This memory segment holds a
* reference to that buffer, so as long as this memory segment lives, the memory will not be
* released.
*/
private final ByteBuffer offHeapBuffer;

/**
* Creates a new memory segment that represents the memory backing the given direct byte buffer.
* Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
* otherwise this method with throw an IllegalArgumentException.
*
* <p>The owner referenced by this memory segment is null.
*
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(ByteBuffer buffer) {
this(buffer, null);
}

/**
* Creates a new memory segment that represents the memory backing the given direct byte buffer.
* Note that the given ByteBuffer must be direct {@link java.nio.ByteBuffer#allocateDirect(int)},
* otherwise this method with throw an IllegalArgumentException.
*
* <p>The memory segment references the given owner.
*
* @param buffer The byte buffer whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
* @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
*/
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}

/**
* Creates a new memory segment that represents the memory of the byte array.
*
* <p>The owner referenced by this memory segment is null.
*
* @param buffer The byte array whose memory is represented by this memory segment.
*/
HybridMemorySegment(byte[] buffer) {
this(buffer, null);
}

/**
* Creates a new memory segment that represents the memory of the byte array.
*
* <p>The memory segment references the given owner.
*
* @param buffer The byte array whose memory is represented by this memory segment.
* @param owner The owner references by this memory segment.
*/
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}

// ————————————————————————-
// MemorySegment operations
// ————————————————————————-

/**
* Gets the buffer that owns the memory of this memory segment.
*
* @return The byte buffer that owns the memory of this memory segment.
*/
public ByteBuffer getOffHeapBuffer() {
if (offHeapBuffer != null) {
return offHeapBuffer;
} else {
throw new IllegalStateException(“Memory segment does not represent off heap memory”);
}
}

@Override
public ByteBuffer wrap(int offset, int length) {
if (address <= addressLimit) {
if (heapMemory != null) {
return ByteBuffer.wrap(heapMemory, offset, length);
}
else {
try {
ByteBuffer wrapper = offHeapBuffer.duplicate();
wrapper.limit(offset + length);
wrapper.position(offset);
return wrapper;
}
catch (IllegalArgumentException e) {
throw new IndexOutOfBoundsException();
}
}
}
else {
throw new IllegalStateException(“segment has been freed”);
}
}

// ————————————————————————
// Random Access get() and put() methods
// ————————————————————————

@Override
public final byte get(int index) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
return UNSAFE.getByte(heapMemory, pos);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

@Override
public final void put(int index, byte b) {
final long pos = address + index;
if (index >= 0 && pos < addressLimit) {
UNSAFE.putByte(heapMemory, pos, b);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

@Override
public final void get(int index, byte[] dst) {
get(index, dst, 0, dst.length);
}

@Override
public final void put(int index, byte[] src) {
put(index, src, 0, src.length);
}

@Override
public final void get(int index, byte[] dst, int offset, int length) {
// check the byte array offset and length and the status
if ((offset | length | (offset + length) | (dst.length – (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
}

final long pos = address + index;
if (index >= 0 && pos <= addressLimit – length) {
final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
UNSAFE.copyMemory(heapMemory, pos, dst, arrayAddress, length);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

@Override
public final void put(int index, byte[] src, int offset, int length) {
// check the byte array offset and length
if ((offset | length | (offset + length) | (src.length – (offset + length))) < 0) {
throw new IndexOutOfBoundsException();
}

final long pos = address + index;

if (index >= 0 && pos <= addressLimit – length) {
final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset;
UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}

@Override
public final boolean getBoolean(int index) {
return get(index) != 0;
}

@Override
public final void putBoolean(int index, boolean value) {
put(index, (byte) (value ? 1 : 0));
}

// ————————————————————————-
// Bulk Read and Write Methods
// ————————————————————————-

@Override
public final void get(DataOutput out, int offset, int length) throws IOException {
if (address <= addressLimit) {
if (heapMemory != null) {
out.write(heapMemory, offset, length);
}
else {
while (length >= 8) {
out.writeLong(getLongBigEndian(offset));
offset += 8;
length -= 8;
}

while (length > 0) {
out.writeByte(get(offset));
offset++;
length–;
}
}
}
else {
throw new IllegalStateException(“segment has been freed”);
}
}

@Override
public final void put(DataInput in, int offset, int length) throws IOException {
if (address <= addressLimit) {
if (heapMemory != null) {
in.readFully(heapMemory, offset, length);
}
else {
while (length >= 8) {
putLongBigEndian(offset, in.readLong());
offset += 8;
length -= 8;
}
while (length > 0) {
put(offset, in.readByte());
offset++;
length–;
}
}
}
else {
throw new IllegalStateException(“segment has been freed”);
}
}

@Override
public final void get(int offset, ByteBuffer target, int numBytes) {
// check the byte array offset and length
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}

final int targetOffset = target.position();
final int remaining = target.remaining();

if (remaining < numBytes) {
throw new BufferOverflowException();
}

if (target.isDirect()) {
if (target.isReadOnly()) {
throw new ReadOnlyBufferException();
}

// copy to the target memory directly
final long targetPointer = getAddress(target) + targetOffset;
final long sourcePointer = address + offset;

if (sourcePointer <= addressLimit – numBytes) {
UNSAFE.copyMemory(heapMemory, sourcePointer, null, targetPointer, numBytes);
target.position(targetOffset + numBytes);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
throw new IndexOutOfBoundsException();
}
}
else if (target.hasArray()) {
// move directly into the byte array
get(offset, target.array(), targetOffset + target.arrayOffset(), numBytes);

// this must be after the get() call to ensue that the byte buffer is not
// modified in case the call fails
target.position(targetOffset + numBytes);
}
else {
// neither heap buffer nor direct buffer
while (target.hasRemaining()) {
target.put(get(offset++));
}
}
}

@Override
public final void put(int offset, ByteBuffer source, int numBytes) {
// check the byte array offset and length
if ((offset | numBytes | (offset + numBytes)) < 0) {
throw new IndexOutOfBoundsException();
}

final int sourceOffset = source.position();
final int remaining = source.remaining();

if (remaining < numBytes) {
throw new BufferUnderflowException();
}

if (source.isDirect()) {
// copy to the target memory directly
final long sourcePointer = getAddress(source) + sourceOffset;
final long targetPointer = address + offset;

if (targetPointer <= addressLimit – numBytes) {
UNSAFE.copyMemory(null, sourcePointer, heapMemory, targetPointer, numBytes);
source.position(sourceOffset + numBytes);
}
else if (address > addressLimit) {
throw new IllegalStateException(“segment has been freed”);
}
else {
throw new IndexOutOfBoundsException();
}
}
else if (source.hasArray()) {
// move directly into the byte array
put(offset, source.array(), sourceOffset + source.arrayOffset(), numBytes);

// this must be after the get() call to ensue that the byte buffer is not
// modified in case the call fails
source.position(sourceOffset + numBytes);
}
else {
// neither heap buffer nor direct buffer
while (source.hasRemaining()) {
put(offset++, source.get());
}
}
}

//……
}
HybridMemorySegment 继承了 MemorySegment,它有一个 ByteBuffer 类型的 offHeapBuffer 属性,由于父类本身已经有一个 byte[]类型的 heapMemory 属性了,因而 HybridMemorySegment 管理的 memory 可以是 on-heap 的 (使用带有 byte[] 类型参数的构造器)也可以是 off-heap 的(使用带有 ByteBuffer 类型参数的构造器);wrap 方法会判断,如果 heapMemory 不为 null,则使用 heapMemory,否则使用 offHeapBuffer
小结

MemorySegment 有点类似 java.nio.ByteBuffer;它有一个 byte[]类型的 heapMemory 属性;它有两个构造器,带有 byte[]类型参数的构造器会将 byte[]赋给 heapMemory,不带 byte[]类型参数的构造器则 heapMemory 为 null;isOffHeap 方法则用于判断当前的 memory segment 是 heap 还是 off-heap,它根据 heapMemory 是否为 null 来判断,如果为 null 则是 off-heap;另外提供了 compare、swapBytes、copyTo 方法;还显式提供了 BigEndian 及 LittleEndian 的 get、put 方法;MemorySegment 定义了 wrap、get、put、getBoolean、putBoolean 等抽象方法,要求子类去实现(free 方法在 MemorySegment 中已有默认实现,HeapMemorySegment 对其进行了覆盖);MemorySegment 有两个子类,分别是 HeapMemorySegment、HybridMemorySegment
HeapMemorySegment 继承了 MemorySegment,它有一个 byte[]的 memory 属性,free 操作会将 memory 设置为 null,wrap 方法使用的是 memory 属性;它的构造器要求传入的 memory 不能为 null,然后赋给父类的 heapMemory 属性及自己定义的 memory 属性(引用);它还定义了 HeapMemorySegmentFactory,提供了 wrap、allocateUnpooledSegment、wrapPooledHeapMemory 方法
HybridMemorySegment 继承了 MemorySegment,它有一个 ByteBuffer 类型的 offHeapBuffer 属性,由于父类本身已经有一个 byte[]类型的 heapMemory 属性了,因而 HybridMemorySegment 管理的 memory 可以是 on-heap 的 (使用带有 byte[] 类型参数的构造器)也可以是 off-heap 的(使用带有 ByteBuffer 类型参数的构造器);wrap 方法会判断,如果 heapMemory 不为 null,则使用 heapMemory,否则使用 offHeapBuffer

doc

MemorySegment
HeapMemorySegment
HybridMemorySegment

正文完
 0