聊聊flink的ParameterTool

38次阅读

共计 11652 个字符,预计需要花费 30 分钟才能阅读完成。


本文主要研究一下 flink 的 ParameterTool
实例
fromPropertiesFile
String propertiesFilePath = “/home/sam/flink/myjob.properties”;
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFilePath);

File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);

InputStream propertiesFileInputStream = new FileInputStream(file);
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
使用 ParameterTool.fromPropertiesFile 从.properties 文件创建 ParameterTool
fromArgs
// Program entry point: builds a ParameterTool directly from the command-line arguments.
public static void main(String[] args) {
// Keys must be prefixed with "--" or "-"; each key may be followed by its value.
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
}
使用 ParameterTool.fromArgs 从命令行创建 ParameterTool(比如 --input hdfs:///mydata --elements 42)
fromSystemProperties
// Builds a ParameterTool from the JVM system properties (System.getProperties()).
ParameterTool parameter = ParameterTool.fromSystemProperties();
使用 ParameterTool.fromSystemProperties 从 system properties 创建 ParameterTool(比如 -Dinput=hdfs:///mydata)
获取参数值
// Any of the factory methods shown above can supply the instance; fromSystemProperties()
// is used here only so that the snippet is self-contained.
ParameterTool parameters = ParameterTool.fromSystemProperties();
// Throws a RuntimeException when no value is stored for the key.
parameters.getRequired("input");
// Falls back to the given default when the key is absent.
parameters.get("output", "myDefaultValue");
// Typed access with a default; the stored String is parsed as a long.
parameters.getLong("expectedCount", -1L);
parameters.getNumberOfParameters();
// .. there are more methods available.
可以使用 ParameterTool 的 get、getRequired、getLong 等方法获取参数值
设置为 global
// Registers the ParameterTool on the execution config as the global job parameters.
env.getConfig().setGlobalJobParameters(parameters);

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired(“input”);
// … do more …
}
使用 env.getConfig().setGlobalJobParameters 将 ParameterTool 的访问范围设置为 global
GlobalJobParameters
flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/ExecutionConfig.java
/**
* Serializable base type for job-wide user parameters. Subclasses (for example
* ParameterTool) override {@link #toMap()} to expose their entries.
*/
public static class GlobalJobParameters implements Serializable {
private static final long serialVersionUID = 1L;

/**
* Convert UserConfig into a {@code Map<String, String>} representation.
* This can be used by the runtime, for example for presenting the user config in the web frontend.
*
* <p>This base implementation exposes no entries and returns an empty map.
*
* @return Key/Value representation of the UserConfig
*/
public Map<String, String> toMap() {
return Collections.emptyMap();
}
}
GlobalJobParameters 里头有一个 toMap 方法,返回 Collections.emptyMap()
ParameterTool
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/utils/ParameterTool.java
@Public
public class ParameterTool extends ExecutionConfig.GlobalJobParameters implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

protected static final String NO_VALUE_KEY = “__NO_VALUE_KEY”;
protected static final String DEFAULT_UNDEFINED = “<undefined>”;

//……

// —————— ParameterUtil ————————
protected final Map<String, String> data;

// data which is only used on the client and does not need to be transmitted
protected transient Map<String, String> defaultData;
protected transient Set<String> unrequestedParameters;

private ParameterTool(Map<String, String> data) {
this.data = Collections.unmodifiableMap(new HashMap<>(data));

this.defaultData = new ConcurrentHashMap<>(data.size());

this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));

unrequestedParameters.addAll(data.keySet());
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ParameterTool that = (ParameterTool) o;
return Objects.equals(data, that.data) &&
Objects.equals(defaultData, that.defaultData) &&
Objects.equals(unrequestedParameters, that.unrequestedParameters);
}

@Override
public int hashCode() {
return Objects.hash(data, defaultData, unrequestedParameters);
}

@Override
public Map<String, String> toMap() {
return data;
}

//……

/**
* Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by values.
* Keys have to start with ‘-‘ or ‘–‘
*
* <p><strong>Example arguments:</strong>
* –key1 value1 –key2 value2 -key3 value3
*
* @param args Input array arguments
* @return A {@link ParameterTool}
*/
public static ParameterTool fromArgs(String[] args) {
final Map<String, String> map = new HashMap<>(args.length / 2);

int i = 0;
while (i < args.length) {
final String key;

if (args[i].startsWith(“–“)) {
key = args[i].substring(2);
} else if (args[i].startsWith(“-“)) {
key = args[i].substring(1);
} else {
throw new IllegalArgumentException(
String.format(“Error parsing arguments ‘%s’ on ‘%s’. Please prefix keys with — or -.”,
Arrays.toString(args), args[i]));
}

if (key.isEmpty()) {
throw new IllegalArgumentException(
“The input ” + Arrays.toString(args) + ” contains an empty argument”);
}

i += 1; // try to find the value

if (i >= args.length) {
map.put(key, NO_VALUE_KEY);
} else if (NumberUtils.isNumber(args[i])) {
map.put(key, args[i]);
i += 1;
} else if (args[i].startsWith(“–“) || args[i].startsWith(“-“)) {
// the argument cannot be a negative number because we checked earlier
// -> the next argument is a parameter name
map.put(key, NO_VALUE_KEY);
} else {
map.put(key, args[i]);
i += 1;
}
}

return fromMap(map);
}

/**
* Returns {@link ParameterTool} for the given {@link Properties} file.
*
* @param path Path to the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(String path) throws IOException {
File propertiesFile = new File(path);
return fromPropertiesFile(propertiesFile);
}

/**
* Returns {@link ParameterTool} for the given {@link Properties} file.
*
* @param file File object to the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(File file) throws IOException {
if (!file.exists()) {
throw new FileNotFoundException(“Properties file ” + file.getAbsolutePath() + ” does not exist”);
}
try (FileInputStream fis = new FileInputStream(file)) {
return fromPropertiesFile(fis);
}
}

/**
* Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
*
* @param inputStream InputStream from the properties file
* @return A {@link ParameterTool}
* @throws IOException If the file does not exist
* @see Properties
*/
public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
Properties props = new Properties();
props.load(inputStream);
return fromMap((Map) props);
}

/**
* Returns {@link ParameterTool} for the given map.
*
* @param map A map of arguments. Both Key and Value have to be Strings
* @return A {@link ParameterTool}
*/
public static ParameterTool fromMap(Map<String, String> map) {
Preconditions.checkNotNull(map, “Unable to initialize from empty map”);
return new ParameterTool(map);
}

/**
* Returns {@link ParameterTool} from the system properties.
* Example on how to pass system properties:
* -Dkey1=value1 -Dkey2=value2
*
* @return A {@link ParameterTool}
*/
public static ParameterTool fromSystemProperties() {
return fromMap((Map) System.getProperties());
}

//……

/**
* Returns the String value for the given key.
* If the key does not exist it will return null.
*/
public String get(String key) {
addToDefaults(key, null);
unrequestedParameters.remove(key);
return data.get(key);
}

/**
* Returns the String value for the given key.
* If the key does not exist it will throw a {@link RuntimeException}.
*/
public String getRequired(String key) {
addToDefaults(key, null);
String value = get(key);
if (value == null) {
throw new RuntimeException(“No data for required key ‘” + key + “‘”);
}
return value;
}

/**
* Returns the String value for the given key.
* If the key does not exist it will return the given default value.
*/
public String get(String key, String defaultValue) {
addToDefaults(key, defaultValue);
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return value;
}
}

/**
* Check if value is set.
*/
public boolean has(String value) {
addToDefaults(value, null);
unrequestedParameters.remove(value);
return data.containsKey(value);
}

// ————– Integer

/**
* Returns the Integer value for the given key.
* The method fails if the key does not exist or the value is not an Integer.
*/
public int getInt(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Integer.parseInt(value);
}

/**
* Returns the Integer value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not an Integer.
*/
public int getInt(String key, int defaultValue) {
addToDefaults(key, Integer.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
}
return Integer.parseInt(value);
}

// ————– LONG

/**
* Returns the Long value for the given key.
* The method fails if the key does not exist.
*/
public long getLong(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Long.parseLong(value);
}

/**
* Returns the Long value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not a Long.
*/
public long getLong(String key, long defaultValue) {
addToDefaults(key, Long.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
}
return Long.parseLong(value);
}

// ————– FLOAT

/**
* Returns the Float value for the given key.
* The method fails if the key does not exist.
*/
public float getFloat(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Float.valueOf(value);
}

/**
* Returns the Float value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not a Float.
*/
public float getFloat(String key, float defaultValue) {
addToDefaults(key, Float.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return Float.valueOf(value);
}
}

// ————– DOUBLE

/**
* Returns the Double value for the given key.
* The method fails if the key does not exist.
*/
public double getDouble(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Double.valueOf(value);
}

/**
* Returns the Double value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not a Double.
*/
public double getDouble(String key, double defaultValue) {
addToDefaults(key, Double.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return Double.valueOf(value);
}
}

// ————– BOOLEAN

/**
* Returns the Boolean value for the given key.
* The method fails if the key does not exist.
*/
public boolean getBoolean(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Boolean.valueOf(value);
}

/**
* Returns the Boolean value for the given key. If the key does not exists it will return the default value given.
* The method returns whether the string of the value is “true” ignoring cases.
*/
public boolean getBoolean(String key, boolean defaultValue) {
addToDefaults(key, Boolean.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return Boolean.valueOf(value);
}
}

// ————– SHORT

/**
* Returns the Short value for the given key.
* The method fails if the key does not exist.
*/
public short getShort(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Short.valueOf(value);
}

/**
* Returns the Short value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not a Short.
*/
public short getShort(String key, short defaultValue) {
addToDefaults(key, Short.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return Short.valueOf(value);
}
}

// ————– BYTE

/**
* Returns the Byte value for the given key.
* The method fails if the key does not exist.
*/
public byte getByte(String key) {
addToDefaults(key, null);
String value = getRequired(key);
return Byte.valueOf(value);
}

/**
* Returns the Byte value for the given key. If the key does not exists it will return the default value given.
* The method fails if the value is not a Byte.
*/
public byte getByte(String key, byte defaultValue) {
addToDefaults(key, Byte.toString(defaultValue));
String value = get(key);
if (value == null) {
return defaultValue;
} else {
return Byte.valueOf(value);
}
}

//……
}

ParameterTool 里头有 data、defaultData、unrequestedParameters 等属性,toMap 方法返回的是 data 属性
ParameterTool 提供了 fromPropertiesFile、fromArgs、fromSystemProperties、fromMap 静态方法用于创建 ParameterTool
ParameterTool 提供了 get、getRequired、getInt、getLong、getFloat、getDouble、getBoolean、getShort、getByte 等方法,每种类型的 get 均提供了一个支持 defaultValue 的方法

小结

ParameterTool 提供了 fromPropertiesFile、fromArgs、fromSystemProperties、fromMap 静态方法用于创建 ParameterTool
ParameterTool 提供了 get、getRequired、getInt、getLong、getFloat、getDouble、getBoolean、getShort、getByte 等方法,每种类型的 get 均提供了一个支持 defaultValue 的方法
ParameterTool 继承了 ExecutionConfig.GlobalJobParameters,其 toMap 方法返回的是 data 属性;使用 env.getConfig().setGlobalJobParameters 可以将 ParameterTool 的访问范围设置为 global

doc
Parsing command line arguments and passing them around in your Flink application

正文完
 0