Pulsar Functions编程模型(Programming model)
开启Functions
conf/bookkeeper.conf
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
conf/broker.conf
functionsWorkerEnabled=true
conf/functions_worker.yml
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2
窗口(window)
windowLengthCount 每个窗口的消息数量
slidingIntervalCount 窗口滑动后的消息数量
windowLengthDurationMs 窗口时间
slidingIntervalDurationMs 窗口滑动后的时间
开窗函数
public class WordCountWindowFunction implements org.apache.pulsar.functions.api.WindowFunction<String, Void> { @Override public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception { for (Record<String> input : inputs) { } return null; }}
运行函数
时间,滑动窗口
--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'
时间,滚动窗口
--user-config '{"windowLengthDurationMs":"60000"}'
数量,滑动窗口
--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'
数量,滚动窗口
--user-config '{"windowLengthCount":"100"}'
Java编程
pom.xml
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version></dependency><dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-functions-api</artifactId> <version>${pulsar.version}</version></dependency><dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-functions-local-runner</artifactId> <version>${pulsar.version}</version></dependency>
WordCount
public class WordCountFunction implements org.apache.pulsar.functions.api.Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { Arrays.asList(input.split(" ")).forEach(word -> { String counterKey = word.toLowerCase(); if (context.getCounter(counterKey) == 0) { context.putState(counterKey, ByteBuffer.wrap(ByteUtils.from(100))); } context.incrCounter(counterKey, 1); }); return null; }}
$ $PULSAR_HOME/bin/pulsar-admin functions create \--broker-service-url pulsar://server-101:6650 \--jar target/cloudwise-pulsar-functions-with-dependencies.jar \--classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \--tenant public \--namespace default \--name word-count-function \--inputs persistent://public/default/sentences \--output persistent://public/default/wordcount
动态路由
/** * 基本思路是查看每条消息的内容,依据消息内容将消息路由到不同目的地。 */public class RoutingFunction implements org.apache.pulsar.functions.api.Function<String, String> { @Override public String process(String input, Context context) throws Exception { String regex = context.getUserConfigValue("regex").toString(); String matchedTopic = context.getUserConfigValue("matched-topic").toString(); String unmatchedTopic = context.getUserConfigValue("unmatched-topic").toString(); Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(input); if (matcher.matches()) { context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send(); } else { context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send(); } 
return null; }}
log-topic
public class LoggingFunction implements org.apache.pulsar.functions.api.Function<String, Void> { @Override public Void process(String s, Context context) throws Exception { Logger LOG = context.getLogger(); String messageId = context.getFunctionId(); if (s.contains("danger")) { LOG.warn("A warning was received in message {}", messageId); } else { LOG.info("Message {} received\nContent: {}", messageId, s); } return null; }}
$ $PULSAR_HOME/bin/pulsar-admin functions create \--jar cloudwise-pulsar-functions-1.0.0.jar \--classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \--log-topic persistent://public/default/logging-function-logs
user-config
public class UserConfigFunction implements org.apache.pulsar.functions.api.Function<String, Void> { @Override public Void process(String s, Context context) throws Exception { Logger log = context.getLogger(); Optional<Object> value = context.getUserConfigValue("word-of-the-day"); if (value.isPresent()) { log.info("The word of the day is {}", value); } else { log.warn("No word of the day provided"); } return null; }}
$ $PULSAR_HOME/bin/pulsar-admin functions create \--broker-service-url pulsar://server-101:6650 \--jar target/cloudwise-pulsar-functions-with-dependencies.jar \--classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \--tenant public \--namespace default \--name word-count-function \--inputs persistent://public/default/userconfig \--user-config '{"word-of-the-day":"verdure"}'
更多福利
云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给OMP点赞送star,了解更多相关内容~
...