共计 18247 个字符,预计需要花费 46 分钟才能阅读完成。
一、背景概述
交易反欺诈是 VoltDB 实用场景之一,是典型的事件驱动的业务,外围是摄取高频的交易数据,并逐条对交易进行一系列简单的反欺诈规定校验,最终生成评判交易可疑度的分值,发送给上游业务零碎,触发交易拦挡动作。
反欺诈规定中波及大量的通过剖析历史交易生成的指标项,在 VoltDB 中进行流式计算,可基于本地保留的丰盛的上下文数据对事件进行剖析决策,使实时计算凑近上下文数据,取得性能劣势。
二、实例回现
上面咱们通过一个刷卡的利用,展现 VoltDB 是如何实现一个简略的反欺诈用例的。为了让示例代码更加简洁,又能突出 VoltDB 的性能,这里应用一个地铁刷卡的场景代替金融交易(如信用卡刷卡),以防止引入过多业余的金融业务知识。同时一个忙碌地铁零碎产生的交易吞吐量不可小觑,定义的反欺诈规定也更容易了解。
能够通过这个链接来拜访具体的代码 https://github.com/ssomagani/…
在这个利用中,模仿如下几个场景:
- 多辆列车在地铁站点之间运行,生成列车进站事件。通过这个场景能够理解,如何将数据公布到 VoltDB Topic 中,以及如何生产 Topic 中的数据。
- 公交卡充值操作。通过这个场景,能够理解,如何应用一个蕴含自定义业务规定的 procedure 来解决 Topic 中的数据,同时应用 Stream 对象将数据导出到 Topic 中,并通过视图对 Stream 中的数据流进行统计,生成实时的统计报表。视图会逐条统计 Stream 中的流数据,将处理结果保留到视图中,是 VoltDB 实现流式计算的形式之一。
- 乘客刷卡乘车,生成高频交易数据。通过这个场景,能够理解,如何应用 VoltDB 数据库客户端 api 间接操作数据表(区别与将数据发送到 Topic 中),保留交易数据。如何通过 VoltDB 的 java procedure 定制反欺诈校验规定,并调用 java procedure 进行交易校验和反欺诈行为。
让咱们来具体理解一下,在 VoltDB 中运行这个用例的过程。
2.1 筹备工作
1. 启用 VoltDB Topic 性能
VoltDB 提供一个对立的配置文件,次要的个性都能够在其中进行定义,如:长久化、高可用、安全性等等,这里次要介绍与案例相干的 VoltDB Topic 性能。如下配置开启了 Topic 服务,并在服务器上开启端口 9999,用于承受客户端发来的音讯。
<Topics enabled="true">
<properties>
<property name="port">9999</property>
<property name="group.initial.rebalance.delay.ms">0</property>
<property name="retention.policy.threads">1</property>
</properties>
<profiles>
<profile name="retain_compact">
<retention policy="compact" limit="2048" />
</profile>
</profiles>
</Topics>
2. 依据特定配置文件启动 VoltDB
3. 创立 Topic,Topic 的用处前面的代码剖析中提到
CREATE Topic TRAINTOPIC execute procedure train_events.insert;
CREATE TOPIC RECHARGE execute procedure RechargeCard;
CREATE TOPIC using stream CARD_ALERT_EXPORT properties(topic.format=avro);
create topic using stream FRAUD properties(topic.format=avro,consumer.keys=TRANS_ID);
4. 创立数据表
在解决实时事件流时,能够充分利用底层的数据库引擎,充分利用本地关系型数据进行数据分析,失去反欺诈业务指标。在本例中将创立如下数据表和视图(省略具体 DDL)
5. 初始化数据
通过 VoltDB 的数据导入性能,从 csv 文件中初始化站点和列车
csvloader --file $PROJ_HOME/data/redline.csv --reportdir log stations
csvloader --file $PROJ_HOME/data/trains.csv --reportdir log trains
2.2 代码剖析 - 列车运行
在这个场景中,客户端模仿 8 辆列车在 17 个站点之间运行,产生进站事件并发送到 Topic。因为设定的列车进出站工夫比拟短(微秒为单位),所以会产生高频事件流。
在服务端,VoltDB 实现:
1. 音讯接管
2. 生产音讯
3. 将列车进站事件记录到数据库中
在客户端,通过 java 类 TrainProducer 生成多辆列车进站事件,并将事件发送到 VoltDB Topic 中。TrainProducer 的执行命令如下:
java metro.pub.TrainProducer localhost:9999 TRAINTOPIC 8
TrainProducer 类接管四个参数:
- . 指定接管列车进站和离站事件的 VoltDB 服务器端口。这里假如在同一台机器上运行 client 代码和 VoltDB,而后面在 VoltDB 配置文件中咱们曾经指定 Topic 的监听端口是 9999。
- 指定 VoltDB broker
- 指定数据发送的 Topic 名称。
- 指定要模仿的列车数量。
剖析一下 TrainProducer 的次要办法,main 办法生成 10 个线程, 每 50 毫秒执行一次 publish()办法,将列车进出站工夫发送到 Topic“TRAINTOPIC”中。
public static void main(String[] args) {ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);
TrainProducer producer = new TrainProducer(args[0], args[1], Integer.parseInt(args[2]));
System.out.println("Scheduling trains");
EXECUTOR.scheduleAtFixedRate (() -> {producer.publish(producer.getNewEvents());
}, 1, 50, MILLISECONDS);
}
跟踪代码找到 producer 的定义,它其实就是原生的 KafkaProducer,所以能够看到 VoltDB Topic 齐全兼容 kafka api。而 brokers 即是 main 办法中的传参 localhost:9999,因而下面 producer.getNewEvents()办法生成的数据将被发送到 VoltDB Topic 中。
private Producer<String, TrainEvent> createProducer() {Properties props = new Properties();
props.put("bootstrap.servers", brokers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"metro.serde.TrainEventSer");
Producer<String, TrainEvent> producer = new KafkaProducer
<String, TrainEvent>(props);
return producer;
}
Publish 办法所发送的音讯由 producer.getNewEvents()办法生成。有必要提前看一下 Stations 类,其中定义了 17 个火车站点,包含每个站点的到下一个站点的运行工夫(Station.nextStnDuration)和本站点停车工夫(Station.stnWaitDuration),工夫以微秒为单位。所有列车将顺次在这些站点中运行。
static HashMap<Integer, Station> idToStationMap = new HashMap<>();
static {idToStationMap.put(1, new Station(1, 1200000, 450000));
idToStationMap.put(2, new Station(2, 1050000, 250000));
idToStationMap.put(3, new Station(3, 850000, 300000));
idToStationMap.put(4, new Station(4, 900000, 350000));
idToStationMap.put(5, new Station(5, 500000, 260000));
idToStationMap.put(6, new Station(6, 950000, 190000));
idToStationMap.put(7, new Station(7, 450000, 130000));
idToStationMap.put(8, new Station(8, 200000, 280000));
idToStationMap.put(9, new Station(9, 200000, 110000));
idToStationMap.put(10, new Station(10, 450000, 300000));
idToStationMap.put(11, new Station(11, 550000, 200000));
idToStationMap.put(12, new Station(12, 550000, 200000));
idToStationMap.put(13, new Station(13, 800000, 150000));
idToStationMap.put(14, new Station(14, 950000, 100000));
idToStationMap.put(15, new Station(15, 1000000, 130000));
idToStationMap.put(16, new Station(16, 1200000, 220000));
idToStationMap.put(17, new Station(17, 1500000, 500000));
}
public static class Station {
public final int stationId;
public final int nextStnDuration;
public final int stnWaitDuration;
public Station(int stationId, int nextStnDuration, int stnWaitDuration) {
this.stationId = stationId;
this.nextStnDuration = nextStnDuration;
this.stnWaitDuration = stnWaitDuration;
}
}
所以 getNewEvents 次要的逻辑是首先随机设定列车从任意站点登程,而后调用 next()依据零碎以后工夫和站点的 Station.nextStnDuration、Station.stnWaitDuration 来判断每辆列车目前运行到哪个站点,如果 next 返回的 LastKnownLocation 对象有变动,则判断列车已进入下一站,将列车进站事件 trainEvent 放到 records 中,用于发送给 Topic。(注:列车调度不是本样例的重点,因而 next 办法不会思考列车的抵触问题,它假如站点之间由足够多的轨道,能够供多个列车并行)。
public List<TrainEvent> getNewEvents() {ArrayList<TrainEvent> records = new ArrayList<>();
for(TrainEvent trainEvent : idToTrainMap.values()) {
LastKnownLocation prevLoc = trainEvent.location;
LastKnownLocation curLoc = next(prevLoc, LocalDateTime.now());
if(!prevLoc.equals(curLoc)) {trainEvent = new TrainEvent(trainEvent.trainId, curLoc);
idToTrainMap.put(trainEvent.trainId, trainEvent);
records.add(trainEvent);
}
}
return records;
}
Topic TRAINTOPIC 定义如下,train_events.insert 是 VoltDB 为表创立的默认存储过程,命名规定为[tablename].insert。Topic 与存储过程连用,示意存储过程 train_events.insert 生产该 Topic TRAINTOPIC 中的 trainEvent 数据,并写入 train_events 表中。
CREATE Topic TRAINTOPIC execute procedure train_events.insert;
2.23 代码剖析 - 公交卡充值
在这个场景中,客户端将实现充值音讯发送。
在服务端,VoltDB 实现:
- 音讯接管
- 生产音讯
- 应用自定义逻辑解决音讯 将充值数据更新到数据库中
- 生成充值音讯,并将数据写入 stream 对象中
- 基于 stream 对象创立视图,来生成实时的充值统计报表
- 将 stream 中的充值音讯公布到 Topic 中,供后续(VoltDB 之外的)数据处理逻辑进行生产。例如被 spark 生产,因为进行后续的批处理逻辑。
在客户端通过执行 java 类 CardsProducer,首先初始化公交卡记录,并将记录写入数据库表中。而后随机生成卡片充值事件,发送事件到 Topic RECHARGE 中。CardsProducer 的执行命令如下:
java metro.pub.CardsProducer --mode=recharge --servers=localhost:9999 --Topic=RECHARGE
CardsProducer 类接管三个参数:
- 执行模式,用于指定是初始化公交卡记录还是生成充值事件。
- 指定 VoltDB broker
- 指定数据发送的 Topic 名称
剖析一下 CardsProducer 的次要办法,main 办法生成 10 个线程, 每 5 毫秒执行一次 publish()办法,将列车进出站工夫发送到 Topic“RECHARGE”中。
public static void main(String[] args) throws IOException {CONFIG.parse("CardsProducer", args);
if(CONFIG.mode.equals("new")) {genCards(CONFIG);
return;
}
ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);
CardsProducer producer = new CardsProducer(CONFIG.servers, CONFIG.Topic);
System.out.println("Recharging Cards");
EXECUTOR.scheduleAtFixedRate (() -> {producer.publish(producer.getRechargeActivityRecords(1));
}, 1, 5, MILLISECONDS);
}
和后面 TrainProducer 一样,CardsProducer 中的 producer 也是 KafkaProducer,不多介绍。getRechargeActivityRecords 办法用来生成一条随机的充值事件,包含卡号、充值金额和充值站点。每 5 毫秒执行一次。
public List<CardEvent> getRechargeActivityRecords(int count) {final ArrayList<CardEvent> records = new ArrayList<>();
int amt = (ThreadLocalRandom.current().nextInt(18)+2)*1000;
int stationId = ThreadLocalRandom.current().nextInt(1, 18);
ThreadLocalRandom.current().ints(count, 0, CONFIG.cardcount).forEach((cardId)
-> {records.add(new CardEvent(cardId, amt, stationId));
}
);
return records;
}
这个场景中,Client 端的代码非常简单,到此为止。更多的逻辑在服务端定义,请看以下。
Topic 用于接管充值事件,它的定义如下:
CREATE TOPIC RECHARGE execute procedure RechargeCard;
其中 RechargeCard 用于生产 Topic 中的数据,而 RechargeCard 是一个 java procedure,它通过 java+sql 的形式,自定义了业务逻辑。java procedure 是 VoltDB 在解决流数据时常常用到的对象,它是一个运行在 VoltDB 服务端的 java 类,而非 client 端代码。它须要提前编译成 jar 包(如下 procs.jar),并加载到 VoltDB java 运行时环境中。之后应用如下 DDL 定义。定义了 RechargeCard 后,在下面的 CREATE TOPIC 中能力被援用。
sqlcmd --query="load classes $PROJ_HOME/dist/procs.jar"
CREATE PROCEDURE PARTITION ON TABLE cards COLUMN card_id PARAMETER 0 FROM CLASS metro.cards.RechargeCard;
让咱们看一下 RechargeCard 中的逻辑,重点关注如何将 java 业务逻辑与 SQL 进行联合。其中定义 run()办法和四个 sql 语句。RechargeCard 从 Topic RECHARGE 中生产数据,进行反序列化之后,逐条将数据(即充值事件)作为传参交给 run()办法,run()是 procedure 的入口办法。
voltQueueSQL 是 VoltDB 的 server 端 api,用来执行 sql 并返回后果。Sql getCard 和 getStationName 首先依据从 Topic 中获取的数据进行充值事件合法性校验,如果数据库中没有对应的充值站点或公交卡记录,则执行 sql exportNotif 写入一条错误信息。否则,update VoltDB 数据库中对应公交卡,减少余额,并执行 sql exportNotif 写入一条胜利信息。
public class RechargeCard extends VoltProcedure {public final SQLStmt updateBalance = new SQLStmt("UPDATE cards SET balance = balance + ? WHERE card_id = ? AND card_type = 0");
public final SQLStmt getCard = new SQLStmt("SELECT * from cards WHERE card_id = ?");
public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
public final SQLStmt getStationName = new SQLStmt("SELECT name FROM stations WHERE station_id = ?");
public long run(int cardId, int amt, int stationId) {voltQueueSQL(getStationName, stationId);
voltQueueSQL(getCard, cardId);
String station = "UNKNOWN";
final VoltTable[] results = voltExecuteSQL();
if(results.length == 0)
exportError(cardId, station);
VoltTable stationResult = results[0];
if(stationResult.advanceRow())
station = stationResult.getString(0);
VoltTable card = results[1];
if(card.advanceRow()) {voltQueueSQL(updateBalance, amt, cardId);
String name = card.getString(5);
String phone = card.getString(6);
String email = card.getString(7);
int notify = (int) card.getLong(8);
voltQueueSQL(updateBalance, amt, cardId);
voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, "Card recharged successfully");
voltExecuteSQL(true);
} else {exportError(cardId, station);
}
return 0;
}
private void exportError(int cardId, String station) {exportError(cardId, station, "","", "", 0,"Could not locate details of card for recharge");
}
private void exportError(int cardId, String station, String name, String phone, String email, int notify, String msg) {voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, msg);
voltExecuteSQL(true);
}
}
exportNotif 的定义如下,其中 CARD_ALERT_EXPORT 是 VoltDB 的 stream 数据库对象,一种数据管道,insert 进去的数据逐个流过。
public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
能够在 CARD_ALERT_EXPORT 上增加数据处理逻辑,实现流计算成果。这个场景中,简略的在 Stream 上创立了一个视图,用于生成实时统计报表。视图的定义如下:
CREATE VIEW card_export_stats(card_id, station_name, rechargeCount) AS
SELECT card_id, station_name, count(*) from CARD_ALERT_EXPORT
GROUP BY card_id, station_name;
最初,咱们定义 Stream 中的数据最终流向另外的 Topic,该 Topic 能够让 VoltDB 之外的大数据产品进行生产,实现上游数据处理逻辑。
CREATE TOPIC using stream CARD_ALERT_EXPORT properties(Topic.format=avro);
2.4 代码剖析 - 乘客刷卡乘车
这个场景中,客户端随机生成大量乘客刷卡进站记录,并发送给数据库解决。
服务端实现如下操作:
1. 首先进行一系列校验,如验证卡信息,卡余额,是否盗刷等反欺诈操作。
2. 将所有刷卡行为都记录到数据表中。并将余额有余和复合欺诈逻辑的刷卡事件别离公布到不同的 Topic 中,供其余上游零碎订阅。
在客户端通过执行 java 类 RidersProducer,与后面两个场景不同,RidersProducer 类间接连贯 VoltDB 数据库将数据写入数据表中,而不是将数据发送到 VoltDB Topic 中。用来展现 VoltDB 的多种应用形式。
connectToOneServerWithRetry 应用 VoltDB client api 连贯指定 ip 的 VoltDB 数据库。
void connectToOneServerWithRetry(String server, Client client) {
int sleep = 1000;
while (true) {
try {client.createConnection(server);
break;
}
catch (Exception e) {System.err.printf("Connection failed - retrying in %d second(s).\n", sleep / 1000);
try {Thread.sleep(sleep); } catch (Exception interruted) {}
if (sleep < 8000) sleep += sleep;
}
}
System.out.printf("Connected to VoltDB node at: %s.\n", server);
}
RidersProducer 类创立 100 个线程,runBenchmark 办法中每 200 毫秒这些线程执行一次 getEntryActivityRecords。getEntryActivityRecords 随机生成一条乘客进站乘车记录,记录内容包含卡号、以后工夫、进站站点 id 等
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(100);
public void runBenchmark() throws Exception {
int microsPerTrans = 1000000/RidersProducer.config.rate;
EXECUTOR.scheduleAtFixedRate (() -> {List<Object[]> entryRecords = getEntryActivityRecords(config.cardcount);// 生成随机的进站记录
call(config.cardEntry, entryRecords);// 将数据发送到 VoltDB 数据库
}, 10000, microsPerTrans, MICROSECONDS);
}
public static List<Object[]> getEntryActivityRecords(int count) {final ArrayList<Object[]> records = new ArrayList<>();
long curTime = System.currentTimeMillis();
ThreadLocalRandom.current().ints(1, 0, count).forEach((cardId)
-> {records.add(new Object[] {cardId, curTime, Stations.getRandomStation().stationId, ENTER.value, 0});
}
);
return records;
}
接着调用 call 办法,将数据 records 发送到数据库进行解决。Call 办法定义如下,callProcedure 是 VoltDB 的 client 端 api,用于将数据发送给指定名称的 procedure 进行解决,能够通过同步和异步 IO 两种形式进行调用,异步调用时须要指定回调函数对数据库调用的返回后果进行解决,即本例中的自定义了 BenchmarkCallback。
protected static void call(String proc, Object[] args) {
try {client.callProcedure(new BenchmarkCallback(proc, args), procName, args);
} catch (IOException e) {e.printStackTrace();
}
}
Call 办法将数据发送给 procedure,procedure 名称由如下代码指定。一起看看 procedure 中的具体逻辑。
@Option(desc = "Proc for card entry swipes")
String cardEntry = "ValidateEntry";
Procedure ValidateEntry 的局部定义,首先定义了 6 个 SQL。
// 查问公交卡是否存在
public final SQLStmt checkCard = new SQLStmt("SELECT enabled, card_type, balance, expires, name, phone, email, notify FROM cards WHERE card_id = ?;");
// 卡充值
public final SQLStmt chargeCard = new SQLStmt("UPDATE cards SET balance = ? WHERE card_id = ?;");
// 查问指定站点的入站费用
public final SQLStmt checkStationFare = new SQLStmt("SELECT fare, name FROM stations WHERE station_id = ?;");
// 记录进站事件
public final SQLStmt insertActivity = new SQLStmt("INSERT INTO card_events (card_id, date_time, station_id, activity_code, amount, accept) VALUES (?,?,?,?,?,?);");
// 再次用到 card_alert_export 这个 stream 对象,用于发送公交卡欠费音讯
public final SQLStmt exportActivity = new SQLStmt("INSERT INTO card_alert_export (card_id, export_time, station_name, name, phone, email, notify, alert_message) VALUES (?,?,?,?,?,?,?,?);");
// 将刷卡欺诈行为写入 stream 对象 fraud 中
public final SQLStmt publishFraud = new SQLStmt("INSERT INTO fraud (trans_id, card_id, date_time, station, activity_type, amt) values (?, ?, ?, ?, ?, ?)"
);
值得阐明的,下面最初一个 sql 中用到的 fraud 是另外一个 stream 对象,用于插入刷卡欺诈事件,通过 DDL 定义其中的刷卡欺诈行为最终会公布到 VoltDB Topic 中,用于上游解决产品生产。
CREATE STREAM FRAUD partition on column CARD_ID (
TRANS_ID varchar not null,
CARD_ID integer not null,
DATE_TIME timestamp not null,
STATION integer not null,
ACTIVITY_TYPE TINYINT not null,
AMT integer not null
);
create Topic using stream FRAUD properties(Topic.format=avro,consumer.keys=TRANS_ID);
后面曾经提到 run 办法是 procedure 的入口办法,VoltDB 运行 procedure 时,主动调用该办法。后面客户端传进的 records 记录,被逐个传递到 run 办法到参数中进行解决。run 办法定义如下
public VoltTable run(int cardId, long tsl, int stationId, byte activity_code, int amt) throws VoltAbortException {
// 查问公交卡是否存在
voltQueueSQL(checkCard, EXPECT_ZERO_OR_ONE_ROW, cardId);
// 查问指定站点的交通费用
voltQueueSQL(checkStationFare, EXPECT_ONE_ROW, stationId);
VoltTable[] checks = voltExecuteSQL();
VoltTable cardInfo = checks[0];
VoltTable stationInfo = checks[1];
byte accepted = 0;
// 如果公交卡记录等于 0,阐明卡不存在
if (cardInfo.getRowCount() == 0) {
// 记录刷卡行为到数据库表中,将 accept 字段置为回绝“REJECTED”voltQueueSQL(insertActivity, cardId, tsl, stationId, ACTIVITY_ENTER, amt, ACTIVITY_REJECTED);
voltExecuteSQL(true);
// 返回“被回绝”音讯给客户端。return buildResult(accepted,"Card Invalid");
}
// 如果卡存在,则取出卡信息。cardInfo.advanceRow();
// 卡状态,0 不可用,1 可用
int enabled = (int)cardInfo.getLong(0);
int cardType = (int)cardInfo.getLong(1);
// 卡余额
int balance = (int)cardInfo.getLong(2);
TimestampType expires = cardInfo.getTimestampAsTimestamp(3);
String owner = cardInfo.getString(4);
String phone = cardInfo.getString(5);
String email = cardInfo.getString(6);
int notify = (int)cardInfo.getLong(7);
// 查问指定站点的进站费用
stationInfo.advanceRow();
// 指定站点的进站费用
int fare = (int)stationInfo.getLong(0);
String stationName = stationInfo.getString(1);
// 刷卡工夫
TimestampType ts = new TimestampType(tsl);
// 如果卡状态为不可用
if (enabled == 0) {
// 向客户端返回“此卡不可用”return buildResult(accepted,"Card Disabled");
}
// 如果卡类型为“非月卡”if (cardType == 0) { // 如果卡内余额短缺
if (balance > fare) {
//isFrand 为反欺诈策略,前面介绍
if (isFraud(cardId, ts, stationId)) {
// 如果认定为欺诈,记录刷卡记录,记录类型为“欺诈刷卡”voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_FRAUD);
// 并且把欺诈事件写入 stream,并最终被公布到 VoltDB Topic 中。见后面 STREAM FRAUD 到 ddl 定义
voltQueueSQL(publishFraud, generateId(cardId, tsl), cardId, ts, stationId, ACTIVITY_ENTER, amt);
voltExecuteSQL(true);
// 向客户端返回“欺诈交易”音讯
return buildResult(0, "Fraudulent transaction");
} else {
// 如果不是欺诈行为,则缩小卡内余额,实现失常生产
voltQueueSQL(chargeCard, balance - fare, cardId);
// 记录失常的刷卡事件
voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_ACCEPTED);
voltExecuteSQL(true);
// 向客户端返回卡内余额
return buildResult(1, "Remaining Balance:" + intToCurrency(balance - fare));
}
} else {
// 如果卡内余额有余,记录刷卡失败事件。voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, 0, ACTIVITY_REJECTED);
if (notify != 0) {
// 再次用到 card_alert_export 这个 stream 对象,用于发送公交卡欠费音讯
voltQueueSQL(exportActivity, cardId, getTransactionTime().getTime(), stationName, owner, phone, email, notify, "Insufficient Balance");
}
voltExecuteSQL(true);
// 向客户端返回“余额有余“音讯
return buildResult(0,"Card has insufficient balance:"+intToCurrency(balance));
}
}
}
以上代码中有一个 isFraud 办法,用于断定是否为欺诈性刷卡。这里定义了一些简略反欺诈规定
- 如果一秒钟内雷同的卡片有 1 次以上的刷卡记录,认定为欺诈。因为不可能存在工夫距离如此短的刷卡行为,可能是因为有多张伪造卡片在同时刷卡。
- 同一张卡在过来一小时内,在 5 个或 5 个以上站点刷卡进站。假如这同样被认为是因为有多张伪造卡片在同时刷卡。
-
同一张卡在过来一小时内,有过 10 次以上刷卡进站记录。进出站次数太多,暂停应用一段时间。
isFraud 办法依据以后刷卡记录中的数据,联合数据库中的历史记录实现以上反欺诈规定。历史刷卡记录被保留在 card_events 表中,另外基于这张表创立了视图,统计每张卡在一秒钟内是否有过刷卡记录。
CREATE VIEW CARD_HISTORY_SECOND as select card_id, TRUNCATE(SECOND, date_time) scnd from card_events group by card_id, scnd;
isFraud 办法的定义
public final SQLStmt cardHistoryAtStations = new SQLStmt("SELECT activity_code, COUNT(DISTINCT station_id) AS stations" +
"FROM card_events" +
"WHERE card_id = ? AND date_time >= DATEADD(HOUR, -1, ?)" +
"GROUP BY activity_code;"
);
public final SQLStmt cardEntries = new SQLStmt(
"SELECT activity_code" +
"FROM card_events" +
"WHERE card_id = ? AND station_id = ? AND date_time >= DATEADD(HOUR, -1, ?)" +
"ORDER BY date_time;"
);
public final SQLStmt instantaneousCardActivity = new SQLStmt("SELECT count(*) as activity_count"
+ "FROM CARD_HISTORY_SECOND"
+ "WHERE card_id = ?"
+ "AND scnd = TRUNCATE(SECOND, ?)"
+ "GROUP BY scnd;"
);
public boolean isFraud(int cardId, TimestampType ts, int stationId) {voltQueueSQL(instantaneousCardActivity, cardId, ts);
voltQueueSQL(cardHistoryAtStations, cardId, ts);
voltQueueSQL(cardEntries, cardId, stationId, ts);
final VoltTable[] results = voltExecuteSQL();
final VoltTable cardInstantaneousActivity = results[0];
final VoltTable cardHistoryAtStationisTable = results[1];
final VoltTable cardEntriesTable = results[2];
// 一秒钟之内曾经有一次刷卡记录的话,返回 true
while (cardInstantaneousActivity.advanceRow()) {if(cardInstantaneousActivity.getLong("activity_count") > 0) {return true;}
}
while (cardHistoryAtStationisTable.advanceRow()) {final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code");
final long stations = cardHistoryAtStationisTable.getLong("stations");
if (activity_code == ACTIVITY_ENTER) {
// 过来 1 小时之内在五个站点刷卡进站,返回 true
if (stations >= 5) {return true;}
}
}
byte prevActivity = ACTIVITY_INVALID;
int entranceCount = 0;
while (cardEntriesTable.advanceRow()) {final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code");
if (prevActivity == ACTIVITY_INVALID || prevActivity == activity_code) {if (activity_code == ACTIVITY_ENTER) {
prevActivity = activity_code;
entranceCount++;
} else {prevActivity = ACTIVITY_INVALID;}
}
}
// 如果在过来 1 小时内有 10 次连续的刷卡记录,返回 true。if (entranceCount >= 10) {return true;}
return false;
}
您看好 VoltDB 吗?马上口头吧!
欢送私信,与更多小伙伴一起探讨。
对于 VoltDB
VoltDB 反对强 ACID 和实时智能决策的应用程序,以实现互联世界。没有其它数据库产品能够像 VoltDB 这样,能够同时须要低延时、大规模、高并发数和准确性相结合的应用程序加油。
VoltDB 由 2014 年图灵奖获得者 Mike Stonebraker 博士创立,他对关系数据库进行了从新设计,以应答当今一直增长的实时操作和机器学习挑战。Stonebraker 博士对数据库技术钻研已有 40 多年,在疾速数据,流数据和内存数据库方面带来了泛滥翻新理念。
在 VoltDB 的研发过程中,他意识到了利用内存事务数据库技术开掘流数据的全副后劲,岂但能够满足解决数据的提早和并发需要,还能提供实时剖析和决策。VoltDB 是业界可信赖的名称,在诺基亚、金融时报、三菱电机、HPE、巴克莱、华为等当先组织单干有理论场景落地案例。