## Preface

This article takes a look at Flink's BlobService.

## BlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java

```java
/**
 * A simple store and retrieve binary large objects (BLOBs).
 */
public interface BlobService extends Closeable {

    /**
     * Returns a BLOB service for accessing permanent BLOBs.
     *
     * @return BLOB service
     */
    PermanentBlobService getPermanentBlobService();

    /**
     * Returns a BLOB service for accessing transient BLOBs.
     *
     * @return BLOB service
     */
    TransientBlobService getTransientBlobService();

    /**
     * Returns the port of the BLOB server that this BLOB service is working with.
     *
     * @return the port the blob server.
     */
    int getPort();
}
```

- BlobService defines getPermanentBlobService, which returns the PermanentBlobService, and getTransientBlobService, which returns the TransientBlobService. getPort returns the port of the BLOB server the service works with.

## PermanentBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java

```java
/**
 * A service to retrieve permanent binary large objects (BLOBs).
 *
 * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's
 * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor}
 * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}.
 */
public interface PermanentBlobService extends Closeable {

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         BLOB key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         if the BLOB does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
}
```

- PermanentBlobService provides a single getFile method, which returns the local File for a given JobID and PermanentBlobKey.

## TransientBlobService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java

```java
/**
 * A service to retrieve transient binary large objects (BLOBs) which are deleted on the
 * {@link BlobServer} when they are retrieved.
 *
 * <p>These may include per-job BLOBs like files in the {@link
 * org.apache.flink.api.common.cache.DistributedCache}, for example.
 *
 * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the
 * {@link PermanentBlobService}.
 *
 * <p>TODO: change API to not rely on local files but return {@link InputStream} objects
 */
public interface TransientBlobService extends Closeable {

    // --------------------------------------------------------------------------------------------
    //  GET
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the path to a local copy of the (job-unrelated) file associated with the provided
     * blob key.
     *
     * @param key
     *         blob key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         when the path does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(TransientBlobKey key) throws IOException;

    /**
     * Returns the path to a local copy of the file associated with the provided job ID and blob
     * key.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         blob key associated with the requested file
     *
     * @return The path to the file.
     *
     * @throws java.io.FileNotFoundException
     *         when the path does not exist;
     * @throws IOException
     *         if any other error occurs when retrieving the file
     */
    File getFile(JobID jobId, TransientBlobKey key) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  PUT
    // --------------------------------------------------------------------------------------------

    /**
     * Uploads the (job-unrelated) data of the given byte array to the BLOB server.
     *
     * @param value
     *         the buffer to upload
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(byte[] value) throws IOException;

    /**
     * Uploads the data of the given byte array for the given job to the BLOB server.
     *
     * @param jobId
     *         the ID of the job the BLOB belongs to
     * @param value
     *         the buffer to upload
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while uploading the data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException;

    /**
     * Uploads the (job-unrelated) data from the given input stream to the BLOB server.
     *
     * @param inputStream
     *         the input stream to read the data from
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream or uploading the
     *         data to the BLOB server
     */
    TransientBlobKey putTransient(InputStream inputStream) throws IOException;

    /**
     * Uploads the data from the given input stream for the given job to the BLOB server.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param inputStream
     *         the input stream to read the data from
     *
     * @return the computed BLOB key identifying the BLOB on the server
     *
     * @throws IOException
     *         thrown if an I/O error occurs while reading the data from the input stream or uploading the
     *         data to the BLOB server
     */
    TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException;

    // --------------------------------------------------------------------------------------------
    //  DELETE
    // --------------------------------------------------------------------------------------------

    /**
     * Deletes the (job-unrelated) file associated with the provided blob key from the local cache.
     *
     * @param key
     *         associated with the file to be deleted
     *
     * @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
     *         <tt>false</tt> otherwise
     */
    boolean deleteFromCache(TransientBlobKey key);

    /**
     * Deletes the file associated with the provided job ID and blob key from the local cache.
     *
     * @param jobId
     *         ID of the job this blob belongs to
     * @param key
     *         associated with the file to be deleted
     *
     * @return <tt>true</tt> if the given blob is successfully deleted or non-existing;
     *         <tt>false</tt> otherwise
     */
    boolean deleteFromCache(JobID jobId, TransientBlobKey key);
}
```

- TransientBlobService retrieves transient binary large objects (BLOBs), which are deleted on the BlobServer as soon as they are retrieved. It provides getFile, putTransient, and deleteFromCache, each in a job-unrelated variant and a variant scoped to a JobID.

## BlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java

```java
/**
 * A BLOB key uniquely identifies a BLOB.
 */
public abstract class BlobKey implements Serializable, Comparable<BlobKey> {

    private static final long serialVersionUID = 3847117712521785209L;

    /** Size of the internal BLOB key in bytes. */
    public static final int SIZE = 20;

    /** The byte buffer storing the actual key data. */
    private final byte[] key;

    /**
     * (Internal) BLOB type - to be reflected by the inheriting sub-class.
     */
    private final BlobType type;

    /**
     * BLOB type, i.e. permanent or transient.
     */
    enum BlobType {
        /**
         * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly
         * available.
         */
        PERMANENT_BLOB,
        /**
         * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made
         * highly available.
         */
        TRANSIENT_BLOB
    }

    /**
     * Random component of the key.
     */
    private final AbstractID random;

    /**
     * Constructs a new BLOB key.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     */
    protected BlobKey(BlobType type) {
        this.type = checkNotNull(type);
        this.key = new byte[SIZE];
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *         the actual key data
     */
    protected BlobKey(BlobType type, byte[] key) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }

        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID();
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *         the actual key data
     * @param random
     *         the random component of the key
     */
    protected BlobKey(BlobType type, byte[] key, byte[] random) {
        if (key == null || key.length != SIZE) {
            throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes");
        }

        this.type = checkNotNull(type);
        this.key = key;
        this.random = new AbstractID(random);
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     *
     * @return BlobKey subclass
     */
    @VisibleForTesting
    static BlobKey createKey(BlobType type) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey();
        } else {
            return new TransientBlobKey();
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *         the actual key data
     *
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key);
        } else {
            return new TransientBlobKey(key);
        }
    }

    /**
     * Returns the right {@link BlobKey} subclass for the given parameters.
     *
     * @param type
     *         whether the referenced BLOB is permanent or transient
     * @param key
     *         the actual key data
     * @param random
     *         the random component of the key
     *
     * @return BlobKey subclass
     */
    static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
        if (type == PERMANENT_BLOB) {
            return new PermanentBlobKey(key, random);
        } else {
            return new TransientBlobKey(key, random);
        }
    }

    /**
     * Returns the hash component of this key.
     *
     * @return a 20 bit hash of the contents the key refers to
     */
    @VisibleForTesting
    public byte[] getHash() {
        return key;
    }

    /**
     * Returns the (internal) BLOB type which is reflected by the inheriting sub-class.
     *
     * @return BLOB type, i.e. permanent or transient
     */
    BlobType getType() {
        return type;
    }

    /**
     * Adds the BLOB key to the given {@link MessageDigest}.
     *
     * @param md
     *         the message digest to add the BLOB key to
     */
    public void addToMessageDigest(MessageDigest md) {
        md.update(this.key);
    }

    @Override
    public boolean equals(final Object obj) {

        if (!(obj instanceof BlobKey)) {
            return false;
        }

        final BlobKey bk = (BlobKey) obj;

        return Arrays.equals(this.key, bk.key) &&
            this.type == bk.type &&
            this.random.equals(bk.random);
    }

    @Override
    public int hashCode() {
        int result = Arrays.hashCode(this.key);
        result = 37 * result + this.type.hashCode();
        result = 37 * result + this.random.hashCode();
        return result;
    }

    @Override
    public String toString() {
        final String typeString;
        switch (this.type) {
            case TRANSIENT_BLOB:
                typeString = "t-";
                break;
            case PERMANENT_BLOB:
                typeString = "p-";
                break;
            default:
                // this actually never happens!
                throw new IllegalStateException("Invalid BLOB type");
        }
        return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString();
    }

    @Override
    public int compareTo(BlobKey o) {
        // compare the hashes first
        final byte[] aarr = this.key;
        final byte[] barr = o.key;
        final int len = Math.min(aarr.length, barr.length);

        for (int i = 0; i < len; ++i) {
            final int a = (aarr[i] & 0xff);
            final int b = (barr[i] & 0xff);
            if (a != b) {
                return a - b;
            }
        }

        if (aarr.length == barr.length) {
            // same hash contents - compare the BLOB types
            int typeCompare = this.type.compareTo(o.type);
            if (typeCompare == 0) {
                // same type - compare random components
                return this.random.compareTo(o.random);
            } else {
                return typeCompare;
            }
        } else {
            return aarr.length - barr.length;
        }
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Auxiliary method to read a BLOB key from an input stream.
     *
     * @param inputStream
     *         the input stream to read the BLOB key from
     * @return the read BLOB key
     * @throws IOException
     *         throw if an I/O error occurs while reading from the input stream
     */
    static BlobKey readFromInputStream(InputStream inputStream) throws IOException {

        final byte[] key = new byte[BlobKey.SIZE];
        final byte[] random = new byte[AbstractID.SIZE];

        int bytesRead = 0;
        // read key
        while (bytesRead < key.length) {
            final int read = inputStream.read(key, bytesRead, key.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        // read BLOB type
        final BlobType blobType;
        {
            final int read = inputStream.read();
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB type");
            } else if (read == TRANSIENT_BLOB.ordinal()) {
                blobType = TRANSIENT_BLOB;
            } else if (read == PERMANENT_BLOB.ordinal()) {
                blobType = PERMANENT_BLOB;
            } else {
                throw new IOException("Invalid data received for the BLOB type: " + read);
            }
        }

        // read random component
        bytesRead = 0;
        while (bytesRead < AbstractID.SIZE) {
            final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }

        return createKey(blobType, key, random);
    }

    /**
     * Auxiliary method to write this BLOB key to an output stream.
     *
     * @param outputStream
     *         the output stream to write the BLOB key to
     * @throws IOException
     *         thrown if an I/O error occurs while writing the BLOB key
     */
    void writeToOutputStream(final OutputStream outputStream) throws IOException {
        outputStream.write(this.key);
        outputStream.write(this.type.ordinal());
        outputStream.write(this.random.getBytes());
    }
}
```

- BlobKey is an abstract class with three fields: key (a byte array of SIZE = 20 bytes holding the hash; the getHash Javadoc says "20 bit", but SIZE is documented in bytes), type (a BlobType, either PERMANENT_BLOB or TRANSIENT_BLOB), and random (an AbstractID).
- The static createKey methods return the subclass that matches the given BlobType. readFromInputStream deserializes a BlobKey from an InputStream, and writeToOutputStream serializes it to an OutputStream as the key bytes, then one byte for the type ordinal, then the random bytes.
- compareTo orders keys by the hash bytes (compared as unsigned values), then by type, then by the random component.
- It has two subclasses, PermanentBlobKey and TransientBlobKey.

## PermanentBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java

```java
/**
 * BLOB key referencing permanent BLOB files.
 */
public final class PermanentBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public PermanentBlobKey() {
        super(BlobType.PERMANENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *         the actual key data
     */
    PermanentBlobKey(byte[] key) {
        super(BlobType.PERMANENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *         the actual key data
     * @param random
     *         the random component of the key
     */
    PermanentBlobKey(byte[] key, byte[] random) {
        super(BlobType.PERMANENT_BLOB, key, random);
    }
}
```

- PermanentBlobKey extends BlobKey; its BlobType is BlobType.PERMANENT_BLOB.

## TransientBlobKey

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java

```java
/**
 * BLOB key referencing transient BLOB files.
 */
public final class TransientBlobKey extends BlobKey {

    /**
     * Constructs a new BLOB key.
     */
    @VisibleForTesting
    public TransientBlobKey() {
        super(BlobType.TRANSIENT_BLOB);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *         the actual key data
     */
    TransientBlobKey(byte[] key) {
        super(BlobType.TRANSIENT_BLOB, key);
    }

    /**
     * Constructs a new BLOB key from the given byte array.
     *
     * @param key
     *         the actual key data
     * @param random
     *         the random component of the key
     */
    TransientBlobKey(byte[] key, byte[] random) {
        super(BlobType.TRANSIENT_BLOB, key, random);
    }
}
```

- TransientBlobKey extends BlobKey; its BlobType is BlobType.TRANSIENT_BLOB.

## Summary

- BlobService is the entry point: getPermanentBlobService returns the PermanentBlobService and getTransientBlobService returns the TransientBlobService.
- PermanentBlobService only reads: getFile resolves a JobID and PermanentBlobKey to a local File. TransientBlobService handles BLOBs that are deleted on the BlobServer once retrieved, and offers getFile, putTransient, and deleteFromCache.
- BlobKey is the abstract key type (key, BlobType, AbstractID) with createKey factories plus readFromInputStream and writeToOutputStream for (de)serialization. Its two final subclasses, PermanentBlobKey and TransientBlobKey, only fix the BlobType to PERMANENT_BLOB and TRANSIENT_BLOB respectively.
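The byte layout that writeToOutputStream and readFromInputStream agree on, and the unsigned comparison at the start of compareTo, are easier to follow in isolation. The following is a minimal sketch that does not use any Flink classes: `BlobKeyWireSketch`, `Decoded`, `decode`, and `compareHashes` are hypothetical names introduced here for illustration, and `RANDOM_SIZE = 16` is an assumption about the value of `AbstractID.SIZE`. The type byte values follow from the declaration order of the BlobType enum in the listing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical stand-in for the BlobKey wire layout; not part of Flink. */
final class BlobKeyWireSketch {
    static final int HASH_SIZE = 20;   // BlobKey.SIZE in the listing
    static final int RANDOM_SIZE = 16; // assumption: value of AbstractID.SIZE
    static final int PERMANENT = 0;    // ordinal of PERMANENT_BLOB (declared first)
    static final int TRANSIENT = 1;    // ordinal of TRANSIENT_BLOB (declared second)

    /** Decoded parts of a key (hypothetical holder type). */
    static final class Decoded {
        final byte[] hash;
        final int type;
        final byte[] random;

        Decoded(byte[] hash, int type, byte[] random) {
            this.hash = hash;
            this.type = type;
            this.random = random;
        }
    }

    /** Encodes the hash, one type byte, then the random part, in that order. */
    static byte[] write(byte[] hash, int type, byte[] random) {
        if (hash.length != HASH_SIZE || random.length != RANDOM_SIZE) {
            throw new IllegalArgumentException("unexpected component size");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(HASH_SIZE + 1 + RANDOM_SIZE);
        out.write(hash, 0, hash.length);
        out.write(type);
        out.write(random, 0, random.length);
        return out.toByteArray();
    }

    /** Fills buf completely; a single read call may return fewer bytes than requested. */
    static void readFully(InputStream in, byte[] buf) throws IOException {
        int bytesRead = 0;
        while (bytesRead < buf.length) {
            int read = in.read(buf, bytesRead, buf.length - bytesRead);
            if (read < 0) {
                throw new EOFException("Read an incomplete BLOB key");
            }
            bytesRead += read;
        }
    }

    /** Decodes the three components in the order they were written. */
    static Decoded read(InputStream in) throws IOException {
        byte[] hash = new byte[HASH_SIZE];
        readFully(in, hash);
        int type = in.read();
        if (type < 0) {
            throw new EOFException("Read an incomplete BLOB type");
        } else if (type != PERMANENT && type != TRANSIENT) {
            throw new IOException("Invalid data received for the BLOB type: " + type);
        }
        byte[] random = new byte[RANDOM_SIZE];
        readFully(in, random);
        return new Decoded(hash, type, random);
    }

    /** Convenience wrapper for in-memory input; rethrows IOException unchecked. */
    static Decoded decode(byte[] wire) {
        try {
            return read(new ByteArrayInputStream(wire));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Unsigned lexicographic comparison, like the hash part of compareTo. */
    static int compareHashes(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int x = a[i] & 0xff; // mask so that 0xff compares as 255, not -1
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] hash = new byte[HASH_SIZE];
        hash[0] = (byte) 0xff;
        byte[] other = new byte[HASH_SIZE];
        other[0] = 0x01;
        byte[] random = new byte[RANDOM_SIZE];
        random[0] = 42;

        byte[] wire = write(hash, TRANSIENT, random);
        Decoded d = decode(wire);
        System.out.println("wire length: " + wire.length);
        System.out.println("round trip ok: " + (Arrays.equals(hash, d.hash)
                && d.type == TRANSIENT && Arrays.equals(random, d.random)));
        System.out.println("0xff sorts after 0x01: " + (compareHashes(hash, other) > 0));
    }
}
```

Under these assumptions a serialized key occupies 20 + 1 + 16 = 37 bytes. The `& 0xff` mask matters because Java bytes are signed: without it, a hash starting with 0xff would sort before one starting with 0x01.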