乐趣区

关于pulsar:基于-Pulsar-Functions-的事件处理设计模式

原作者:David Kjerrumgaard
翻译:StreamNative——Sijia

本文将介绍一些常见的实时流式传输模式及其实现。

模式 1:动静路由

首先回顾一下如何应用 Apache Pulsar Functions 实现基于内容的路由。基于内容的路由是一种集成模式。该模式曾经存在多年,通常用于事件核心和音讯框架中。基本思路是查看每条音讯的内容,依据音讯内容将音讯路由到不同目的地。

上面的例子应用了 Apache Pulsar SDK,SDK 容许用户配置三个不同的值:

  • 用于在音讯中查找匹配的正则表达式
  • 音讯匹配表达式模式时被发送到的 topic
  • 音讯不匹配表达式模式时被发送到的 topic

这个例子证实了 Pulsar Functions 功能强大,能够基于性能逻辑动静决定将事件发送到哪里。

 import java.util.regex.*;
  import org.apache.pulsar.functions.api.Context;
  import org.apache.pulsar.functions.api.Function;

  public ContentBasedRoutingFunction implements Function<String, String> {String process(String input, Context context) throws Exception {
         String regex = context
             .getUserConfigValue(“regex”).toString();
         String matchedTopic = context
             .getUserConfigValue(“matched-topic”).toString();
         String unmatchedTopic = context
             .getUserConfigValue(“unmatched-topic”).toString();

         Pattern p = Pattern.compile(regex);
         Matcher m = p.matcher(input);
         if (m.matches()) {context.publish(matchedTopic, input);
         } else {context.publish(unmatchedTopic, input);
         }
     }
  }

模式 2:过滤

如果想通过仅保留满足给定条件的事件来排除 topic 上的大多数事件时,利用抉择过滤模式。过滤模式对仅查找感兴趣的事件特地无效,如信用卡付款超过了肯定金额;日志文件中的 ERROR 音讯;传感器读数超过特定阈值等等(见模式 4)。

如果用户在监督信用卡交易的事件流,并尝试检测欺诈或可疑行为。因为交易量很大,抉择“批准 / 不批准”的工夫无限,用户必须先过滤掉有“危险”特色的交易,如预付现金、大额付款等。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;

public class FraudFilter implements Function<Purchase, Purchase> {Purchase process(Purchase p, Context context) throws Exception {if (p.getTransactionType() ==‘CASH ADVANCE’) ||
             p.getAmount > 500.00) {return p;}
        return null;
    }
}

能够应用过滤器来过滤有“危险”特色的交易。过滤器能够辨认这些“危险”特色,并只将这些交易路由到一个独自的 topic 上以进行进一步评估。通过过滤器过滤后,所有信用卡领取都能够被路由到一个“潜在欺诈行为”的 topic 上进行进一步评估,而其余事件则会被过滤掉,过滤器也不会对过滤掉的事件执行任何操作。

图 2 是基于三个独立领取对象的 FraudFilter function。第一次领取合乎给定规范,被路由到“潜在欺诈行为”topic 上进行进一步评估;而第二次和第三次领取不合乎欺诈规范,间接被过滤掉(没有被路由到“潜在欺诈行为”过滤器上)。

模式 3:转换

转换模式用于将事件从一种类型转换为另一种类型,或用于增加、删除或批改输出事件的值。

投影

投影模式相似于关系代数中的投影算子,抉择输出事件的属性子集,并创立仅蕴含这些属性的输入事件。投影模式可用于删除事件中的敏感字段,或者只保留事件中的必要属性。图 3 为投影模式的一种利用,在将记录公布到上游 topic 前,“屏蔽”传入的社平安号码。

富集模式

富集模式用于将数据增加到输出属性中不存在的输入事件中。典型的富集模式蕴含基于输出事件中的某个键值对援用数据进行某种查找。以下示例展现了如何依据输出事件中蕴含的 IP 地址将地理位置增加到输入事件。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.company.creditcard.Purchase;
import com.company.services.GeoService;

public class IPLookup implements Function<Purchase, Purchase> {Purchase process(Purchase p) throws Exception {Geo g = GeoService.getByIp(p.getIPAddress());
      // By default, these fields are blank, so we just modify the object
      p.setLongitude(g.getLon());
      p.setLatitiude(g.getLat());
      return p;
    }
}

拆散模式

在拆散模式下,事件处理器接管单个输出事件,并将其分为多个输入事件。当输出事件是一个蕴含多个独自事件(如日志文件中的 entry)的批处理,并且想要独自解决每个事件时,拆散模式非常实用。下图展现了拆散模式的处理过程:先依据换行符分隔输出,再逐行公布到配置的输入 topic。

此 function 的实现过程如下:

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class Splitter implements Function<String, String> {String process(String s, Context context) throws Exception {Arrays.asLists(s.split(“\\R”).forEach(line ->
            context.publish(context.getOutputTopic(), line));
       return null;
    }
}

模式 4:警报和阈值

警报和阈值模式可进行检测,并依据检测条件生成警报(如低温警报)。能够基于简略的值,也能够基于较简单的条件(如增长率、数量的继续变动等)生成警报。

上面的示例为基于用户配置的阈值参数(如 100.00,38.7 等)和接管警报告诉的邮箱地址生成警报。当此 function 接管到超过配置阈值的传感器事件时,将发送电子邮件。

import javax.mail.*;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public SimpleAlertFunction implements Function<Sensor, Void> {Void process(Sensor sensor, Context context) throws Exception {
       Double threshold = context
           .getUserConfigValue(“threshold”).toString();
       String alertEmail = context
           .getUserConfigValue(“alert-email”).toString();

      if (sensor.getReading() >= threshold) {Session s = Session.getDefaultInstance();
         MimeMessage msg = new MineMessage(s);
         msg.setText(“Alert for Sensor:”+ sensor.getId());
         Transport.send(msg);
      }
      return null;
  }
}

上面是一个有状态 function 示例,该 function 依据特定传感器读数的增长率生成警报。在决定是否生成警报时,须要拜访以前的传感器读数。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public ComplexAlertFunction implements Function<Sensor, Void> {Void process(Sensor sensor, Context context) throws Exception {
      Double threshold = context
           .getUserConfigValue(“threshold”).toString();
      String alertTopic = context
           .getUserConfigValue(“alert-topic”).toString();

      // Get previous & current metric values
      Float previous = context.getState(sensor.getId() +“-metric”);
      Long previous_time = context.getState(sensor.getId() +“-metric-time”);
      Float current = sensor.getMetric();
      Long current_time = sensor.getMetricTime();

      // Calculate Rate of change & compare to threshold.
      Double rateOfChange = (current-previous) /
                           (current_time-previous_time);
      if (abs(rateOfChange) >= threshold) {
         // Publish the sensor ID to the alert topic for handling
         context.publish(alertTopic, sensor.getId());
      }

      // Update metric values
      context.putState(sensor.getId() +“-metric”, current);
      context.putState(sensor.getId() +“-metric-time”, current_time);
  }
}

通过 Apache Pulsar Functions 状态治理个性仅保留先前的度量读数和工夫,并将传感器 ID 增加到这些值中(因为将会解决来自多个传感器的度量,所以须要传感器 ID)。为了简略起见,假如事件以正确的程序达到,即始终是最新读数,没有乱序读数。

另外,这一次咱们将传感器 ID 转发到一个专门的警报 topic 以进行进一步解决,而不是仅发送电子邮件。通过这种形式,咱们能够对事件进行额定的富集解决(通过 Pulsar Functions)。例如,查找获取传感器的地理位置,而后告诉相干人员。

模式 5:简略计数和窗口计数

简略计数和窗口计数模式应用了聚合函数,聚合函数将事件的汇合作为输出,并通过对输出事件利用一个 function 生成一个所需的输入事件。聚合函数包含:求和、平均值、最大值、最小值、百分位数等。

以下为应用 Pulsar Functions 实现“字数统计”的示例,计算每个单词在给定 topic 中呈现次数的总和。

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public WordCountFunction implements Function<String, Void> {Void process(String s, Context context) throws Exception {Arrays.asLists(s.split(“\\.”).forEach(word -> context.incrCounter(word, 1));
      return null;
   }
}

思考到流数据 source 无休止的个性,无限期聚合用途不大,因为通常是在数据窗口上进行这些计算(如前一小时内的故障次数)。

数据窗口代表事件流的无限子集,如图 7 所示。然而,应该如何定义数据窗口的边界?有两个用于定义窗口的罕用属性:

  • 触发策略:管制执行或触发 function 代码的工夫。Apache Pulsar Function 框架通过这些规定来告诉代码解决窗口中收集的全副数据。
  • 革除策略:管制保留在窗口中的数据量。这些规定用于决定是否从窗口中革除数据元素。

这两个策略都是由工夫或窗口中的数据量驱动的。二者之间的区别是什么?又是怎么协同工作的?在多种窗口技术中,最罕用的是滚动窗口和滑动窗口。

滚动窗口

窗口已满是滚动窗口革除策略的 惟一 条件,因而,只须要指定想要应用触发策略(基于计数或基于工夫)即可。基于计数的滚动窗口是怎么工作的?

在图 8 的第一个示例中,触发策略设置为 2,也就是说,在窗口中有两个我的项目时,触发器将会触发,开始执行 Pulsar Function 代码。这一系列行为与工夫无关,窗口计数达到 2 用了 5 秒还是 5 个小时并不重要,重要的是窗口计数达到 2。

将上述基于计数的滚动窗口与基于工夫的滚动窗口(工夫设置为 10 秒)进行比照。通过 10 秒的距离后,无论窗口中有多少事件,function 代码都会被触发。在下图中,第一个窗口中有 7 个事件,而第二个窗口中只有 3 个事件。

滑动窗口

滑动窗口计数定义了窗口的长度,窗口长度设置了革除策略以限度保留待处理的数据量;滑动距离定义了触发策略。滚动窗口策略和滑动窗口策略都能够依据工夫(时间段)或长度(数据元素的数量)来定义。

在下图中,窗口长度为 2 秒,也就是说,2 秒以前的数据会被革除,并且不会用于计算。滑动距离为 1 秒,即每 1 秒钟执行一次 Pulsar function 代码。这样,能够在整个窗口长度内解决数据。

后面的示例都是基于工夫来定义革除策略和触发策略,也能够依据长度来定义革除策略或触发策略,或者同时定义这两种策略。

在 Pulsar Functions 中实现这两种类型的窗口 function 都很容易,只须要指定一个 java.util.Collection 作为输出类型,如下所示,并在创立 function 时在 -userConfig 标记中指定适当的窗口配置属性。

用于实现后面提到的工夫窗口四种情景的配置参数如下:

  • “–windowLengthCount”:每个窗口的音讯数量
  • “–windowLengthDurationMs”:窗口工夫(以毫秒为单位)
  • “–slidingIntervalCount”:窗口滑动后的音讯数量
  • “–slidingIntervalDurationMs”:窗口滑动后的工夫

正确的组合形式如下表:

工夫,滑动窗口 -windowLengthDurationMs = XXXX
-slidingIntervalDurationMs = XXXX
工夫,Batch Window(即滚动窗口) -windowLengthDurationMs = XXXX
长度,滑动窗口 -windowLengthCount = XXXX
-slidingIntervalCount = XXXX
长度,Batch Window(即滚动窗口) -windowLengthCount = XXXX

总结

本文介绍了几种应用 Apache Pulsar Functions 实现通用流解决模式的形式。这些解决模式包含基于内容的路由、过滤、转换、警报、简略计数应用程序等,还介绍了根本的窗口概念,以及 Apache Pulsar Functions 提供的窗口性能等。

退出移动版