关于金融:代码回现-如何实现交易反欺诈

5次阅读

共计 18247 个字符,预计需要花费 46 分钟才能阅读完成。

一、背景概述

交易反欺诈是 VoltDB 实用场景之一,是典型的事件驱动的业务,外围是摄取高频的交易数据,并逐条对交易进行一系列简单的反欺诈规定校验,最终生成评判交易可疑度的分值,发送给上游业务零碎,触发交易拦挡动作。
反欺诈规定中波及大量的通过剖析历史交易生成的指标项,在 VoltDB 中进行流式计算,可基于本地保留的丰盛的上下文数据对事件进行剖析决策,使实时计算凑近上下文数据,取得性能劣势。

二、实例回现

上面咱们通过一个刷卡的利用,展现 VoltDB 是如何实现一个简略的反欺诈用例的。为了让示例代码更加简洁,又能突出 VoltDB 的性能,这里应用一个地铁刷卡的场景代替金融交易(如信用卡刷卡),以防止引入过多业余的金融业务知识。同时一个忙碌地铁零碎产生的交易吞吐量不可小觑,定义的反欺诈规定也更容易了解。
能够通过这个链接来拜访具体的代码 https://github.com/ssomagani/…
在这个利用中,模仿如下几个场景:

  1. 多辆列车在地铁站点之间运行,生成列车进站事件。通过这个场景能够理解,如何将数据公布到 VoltDB Topic 中,以及如何生产 Topic 中的数据。
  2. 公交卡充值操作。通过这个场景,能够理解,如何应用一个蕴含自定义业务规定的 procedure 来解决 Topic 中的数据,同时应用 Stream 对象将数据导出到 Topic 中,并通过视图对 Stream 中的数据流进行统计,生成实时的统计报表。视图会逐条统计 Stream 中的流数据,将处理结果保留到视图中,是 VoltDB 实现流式计算的形式之一。
  3. 乘客刷卡乘车,生成高频交易数据。通过这个场景,能够理解,如何应用 VoltDB 数据库客户端 api 间接操作数据表(区别与将数据发送到 Topic 中),保留交易数据。如何通过 VoltDB 的 java procedure 定制反欺诈校验规定,并调用 java procedure 进行交易校验和反欺诈行为。
    让咱们来具体理解一下,在 VoltDB 中运行这个用例的过程。

2.1 筹备工作

1. 启用 VoltDB Topic 性能
VoltDB 提供一个对立的配置文件,次要的个性都能够在其中进行定义,如:长久化、高可用、安全性等等,这里次要介绍与案例相干的 VoltDB Topic 性能。如下配置开启了 Topic 服务,并在服务器上开启端口 9999,用于承受客户端发来的音讯。

  <Topics enabled="true">
        <properties>
            <property name="port">9999</property>
            <property name="group.initial.rebalance.delay.ms">0</property>
            <property name="retention.policy.threads">1</property>
        </properties>
        <profiles>
            <profile name="retain_compact">
                <retention policy="compact" limit="2048" />
        </profile>
        </profiles>
    </Topics>

2. 依据特定配置文件启动 VoltDB
3. 创立 Topic,Topic 的用处前面的代码剖析中提到

CREATE Topic TRAINTOPIC execute procedure train_events.insert;
CREATE TOPIC RECHARGE execute procedure RechargeCard;
CREATE TOPIC using stream CARD_ALERT_EXPORT properties(topic.format=avro);
create topic using stream FRAUD properties(topic.format=avro,consumer.keys=TRANS_ID);

4. 创立数据表
在解决实时事件流时,能够充分利用底层的数据库引擎,充分利用本地关系型数据进行数据分析,失去反欺诈业务指标。在本例中将创立如下数据表和视图(省略具体 DDL)

5. 初始化数据
通过 VoltDB 的数据导入性能,从 csv 文件中初始化站点和列车

csvloader --file $PROJ_HOME/data/redline.csv --reportdir log stations
csvloader --file $PROJ_HOME/data/trains.csv --reportdir log trains

2.2 代码剖析 - 列车运行

在这个场景中,客户端模仿 8 辆列车在 17 个站点之间运行,产生进站事件并发送到 Topic。因为设定的列车进出站工夫比拟短(微秒为单位),所以会产生高频事件流。
在服务端,VoltDB 实现:
1. 音讯接管
2. 生产音讯
3. 将列车进站事件记录到数据库中
在客户端,通过 java 类 TrainProducer 生成多辆列车进站事件,并将事件发送到 VoltDB Topic 中。TrainProducer 的执行命令如下:

java metro.pub.TrainProducer localhost:9999 TRAINTOPIC 8

TrainProducer 类接管四个参数:

  1. . 指定接管列车进站和离站事件的 VoltDB 服务器端口。这里假如在同一台机器上运行 client 代码和 VoltDB,而后面在 VoltDB 配置文件中咱们曾经指定 Topic 的监听端口是 9999。
  2. 指定 VoltDB broker
  3. 指定数据发送的 Topic 名称。
  4. 指定要模仿的列车数量。

剖析一下 TrainProducer 的次要办法,main 办法生成 10 个线程, 每 50 毫秒执行一次 publish()办法,将列车进出站工夫发送到 Topic“TRAINTOPIC”中。

public static void main(String[] args) {ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);
        TrainProducer producer = new TrainProducer(args[0], args[1], Integer.parseInt(args[2]));
        System.out.println("Scheduling trains");
        EXECUTOR.scheduleAtFixedRate (() -> {producer.publish(producer.getNewEvents());
                }, 1, 50, MILLISECONDS);
    }

跟踪代码找到 producer 的定义,它其实就是原生的 KafkaProducer,所以能够看到 VoltDB Topic 齐全兼容 kafka api。而 brokers 即是 main 办法中的传参 localhost:9999,因而下面 producer.getNewEvents()办法生成的数据将被发送到 VoltDB Topic 中。

private Producer<String, TrainEvent> createProducer() {Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
           "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",
           "metro.serde.TrainEventSer");
        Producer<String, TrainEvent> producer = new KafkaProducer
           <String, TrainEvent>(props);
        return producer;
    }

Publish 办法所发送的音讯由 producer.getNewEvents()办法生成。有必要提前看一下 Stations 类,其中定义了 17 个火车站点,包含每个站点的到下一个站点的运行工夫(Station.nextStnDuration)和本站点停车工夫(Station.stnWaitDuration),工夫以微秒为单位。所有列车将顺次在这些站点中运行。

 static HashMap<Integer, Station> idToStationMap = new HashMap<>();
    static {idToStationMap.put(1, new Station(1, 1200000, 450000));
        idToStationMap.put(2, new Station(2, 1050000, 250000));
        idToStationMap.put(3, new Station(3, 850000, 300000));
        idToStationMap.put(4, new Station(4, 900000, 350000));
        idToStationMap.put(5, new Station(5, 500000, 260000));
        idToStationMap.put(6, new Station(6, 950000, 190000));
        idToStationMap.put(7, new Station(7, 450000, 130000));
        idToStationMap.put(8, new Station(8, 200000, 280000));
        idToStationMap.put(9, new Station(9, 200000, 110000));
        idToStationMap.put(10, new Station(10, 450000, 300000));
        idToStationMap.put(11, new Station(11, 550000, 200000));
        idToStationMap.put(12, new Station(12, 550000, 200000));
        idToStationMap.put(13, new Station(13, 800000, 150000));
        idToStationMap.put(14, new Station(14, 950000, 100000));
        idToStationMap.put(15, new Station(15, 1000000, 130000));
        idToStationMap.put(16, new Station(16, 1200000, 220000));
        idToStationMap.put(17, new Station(17, 1500000, 500000));
}
   public static class Station {
        public final int stationId;
        public final int nextStnDuration;
        public final int stnWaitDuration;
        public Station(int stationId, int nextStnDuration, int stnWaitDuration) {
            this.stationId = stationId;
            this.nextStnDuration = nextStnDuration;
            this.stnWaitDuration = stnWaitDuration;
        }
    }

所以 getNewEvents 次要的逻辑是首先随机设定列车从任意站点登程,而后调用 next()依据零碎以后工夫和站点的 Station.nextStnDuration、Station.stnWaitDuration 来判断每辆列车目前运行到哪个站点,如果 next 返回的 LastKnownLocation 对象有变动,则判断列车已进入下一站,将列车进站事件 trainEvent 放到 records 中,用于发送给 Topic。(注:列车调度不是本样例的重点,因而 next 办法不会思考列车的抵触问题,它假如站点之间由足够多的轨道,能够供多个列车并行)。

public List<TrainEvent> getNewEvents() {ArrayList<TrainEvent> records = new ArrayList<>();
        for(TrainEvent trainEvent : idToTrainMap.values()) {
            LastKnownLocation prevLoc = trainEvent.location;
            LastKnownLocation curLoc = next(prevLoc, LocalDateTime.now());
            if(!prevLoc.equals(curLoc)) {trainEvent = new TrainEvent(trainEvent.trainId, curLoc);
                idToTrainMap.put(trainEvent.trainId, trainEvent);
                records.add(trainEvent);
            }
        }
        return records;
    }

Topic TRAINTOPIC 定义如下,train_events.insert 是 VoltDB 为表创立的默认存储过程,命名规定为[tablename].insert。Topic 与存储过程连用,示意存储过程 train_events.insert 生产该 Topic TRAINTOPIC 中的 trainEvent 数据,并写入 train_events 表中。

CREATE Topic TRAINTOPIC execute procedure train_events.insert;

2.23 代码剖析 - 公交卡充值

在这个场景中,客户端将实现充值音讯发送。
在服务端,VoltDB 实现:

  1. 音讯接管
  2. 生产音讯
  3. 应用自定义逻辑解决音讯 将充值数据更新到数据库中
  4. 生成充值音讯,并将数据写入 stream 对象中
  5. 基于 stream 对象创立视图,来生成实时的充值统计报表
  6. 将 stream 中的充值音讯公布到 Topic 中,供后续(VoltDB 之外的)数据处理逻辑进行生产。例如被 spark 生产,因为进行后续的批处理逻辑。

在客户端通过执行 java 类 CardsProducer,首先初始化公交卡记录,并将记录写入数据库表中。而后随机生成卡片充值事件,发送事件到 Topic RECHARGE 中。CardsProducer 的执行命令如下:

java metro.pub.CardsProducer --mode=recharge --servers=localhost:9999 --Topic=RECHARGE

CardsProducer 类接管三个参数:

  1. 执行模式,用于指定是初始化公交卡记录还是生成充值事件。
  2. 指定 VoltDB broker
  3. 指定数据发送的 Topic 名称

剖析一下 CardsProducer 的次要办法,main 办法生成 10 个线程, 每 5 毫秒执行一次 publish()办法,将列车进出站工夫发送到 Topic“RECHARGE”中。

    public static void main(String[] args) throws IOException {CONFIG.parse("CardsProducer", args);
        if(CONFIG.mode.equals("new")) {genCards(CONFIG);
            return;
        }
        ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(10);
        CardsProducer producer = new CardsProducer(CONFIG.servers, CONFIG.Topic);
        System.out.println("Recharging Cards");
        EXECUTOR.scheduleAtFixedRate (() -> {producer.publish(producer.getRechargeActivityRecords(1));
                }, 1, 5, MILLISECONDS);
    }

和后面 TrainProducer 一样,CardsProducer 中的 producer 也是 KafkaProducer,不多介绍。getRechargeActivityRecords 办法用来生成一条随机的充值事件,包含卡号、充值金额和充值站点。每 5 毫秒执行一次。

  public List<CardEvent> getRechargeActivityRecords(int count) {final ArrayList<CardEvent> records = new ArrayList<>();
        int amt = (ThreadLocalRandom.current().nextInt(18)+2)*1000;
        int stationId = ThreadLocalRandom.current().nextInt(1, 18);
        ThreadLocalRandom.current().ints(count, 0, CONFIG.cardcount).forEach((cardId)
                -> {records.add(new CardEvent(cardId, amt, stationId));
                    }
        );
        return records;
    }

这个场景中,Client 端的代码非常简单,到此为止。更多的逻辑在服务端定义,请看以下。
Topic 用于接管充值事件,它的定义如下:

CREATE TOPIC RECHARGE execute procedure RechargeCard;

其中 RechargeCard 用于生产 Topic 中的数据,而 RechargeCard 是一个 java procedure,它通过 java+sql 的形式,自定义了业务逻辑。java procedure 是 VoltDB 在解决流数据时常常用到的对象,它是一个运行在 VoltDB 服务端的 java 类,而非 client 端代码。它须要提前编译成 jar 包(如下 procs.jar),并加载到 VoltDB java 运行时环境中。之后应用如下 DDL 定义。定义了 RechargeCard 后,在下面的 CREATE TOPIC 中能力被援用。

sqlcmd --query="load classes $PROJ_HOME/dist/procs.jar"
CREATE PROCEDURE PARTITION ON TABLE cards COLUMN card_id PARAMETER 0 FROM CLASS metro.cards.RechargeCard;

让咱们看一下 RechargeCard 中的逻辑,重点关注如何将 java 业务逻辑与 SQL 进行联合。其中定义 run()办法和四个 sql 语句。RechargeCard 从 Topic RECHARGE 中生产数据,进行反序列化之后,逐条将数据(即充值事件)作为传参交给 run()办法,run()是 procedure 的入口办法。
voltQueueSQL 是 VoltDB 的 server 端 api,用来执行 sql 并返回后果。Sql getCard 和 getStationName 首先依据从 Topic 中获取的数据进行充值事件合法性校验,如果数据库中没有对应的充值站点或公交卡记录,则执行 sql exportNotif 写入一条错误信息。否则,update VoltDB 数据库中对应公交卡,减少余额,并执行 sql exportNotif 写入一条胜利信息。

public class RechargeCard extends VoltProcedure {public final SQLStmt updateBalance = new SQLStmt("UPDATE cards SET balance = balance + ? WHERE card_id = ? AND card_type = 0");
    public final SQLStmt getCard = new SQLStmt("SELECT * from cards WHERE card_id = ?");
    public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");
    public final SQLStmt getStationName = new SQLStmt("SELECT name FROM stations WHERE station_id = ?"); 
    public long run(int cardId, int amt, int stationId) {voltQueueSQL(getStationName, stationId);
        voltQueueSQL(getCard, cardId);
        String station = "UNKNOWN";
        
        final VoltTable[] results = voltExecuteSQL();
        if(results.length == 0) 
            exportError(cardId, station);
        
        VoltTable stationResult = results[0];
        if(stationResult.advanceRow()) 
            station = stationResult.getString(0);
        
        VoltTable card = results[1];
        if(card.advanceRow()) {voltQueueSQL(updateBalance, amt, cardId);
            
            String name = card.getString(5);
            String phone = card.getString(6);
            String email = card.getString(7);
            int notify = (int) card.getLong(8);
            
            voltQueueSQL(updateBalance, amt, cardId);
            voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, "Card recharged successfully");
            
            voltExecuteSQL(true);
        } else {exportError(cardId, station);
        }
        return 0;
}
    private void exportError(int cardId, String station) {exportError(cardId, station, "","", "", 0,"Could not locate details of card for recharge");
    }
    
    private void exportError(int cardId, String station, String name, String phone, String email, int notify, String msg) {voltQueueSQL(exportNotif, cardId, station, name, phone, email, notify, msg);
        voltExecuteSQL(true);
    }
}

exportNotif 的定义如下,其中 CARD_ALERT_EXPORT 是 VoltDB 的 stream 数据库对象,一种数据管道,insert 进去的数据逐个流过。

public final SQLStmt exportNotif = new SQLStmt("INSERT INTO CARD_ALERT_EXPORT values (?, NOW, ?, ?, ?, ?, ?, ?)");

能够在 CARD_ALERT_EXPORT 上增加数据处理逻辑,实现流计算成果。这个场景中,简略的在 Stream 上创立了一个视图,用于生成实时统计报表。视图的定义如下:

CREATE VIEW card_export_stats(card_id, station_name, rechargeCount) AS 
    SELECT card_id, station_name, count(*) from CARD_ALERT_EXPORT 
    GROUP BY card_id, station_name;

最初,咱们定义 Stream 中的数据最终流向另外的 Topic,该 Topic 能够让 VoltDB 之外的大数据产品进行生产,实现上游数据处理逻辑。

CREATE TOPIC using stream CARD_ALERT_EXPORT properties(Topic.format=avro);

2.4 代码剖析 - 乘客刷卡乘车

这个场景中,客户端随机生成大量乘客刷卡进站记录,并发送给数据库解决。
服务端实现如下操作:
1. 首先进行一系列校验,如验证卡信息,卡余额,是否盗刷等反欺诈操作。
2. 将所有刷卡行为都记录到数据表中。并将余额有余和复合欺诈逻辑的刷卡事件别离公布到不同的 Topic 中,供其余上游零碎订阅。
在客户端通过执行 java 类 RidersProducer,与后面两个场景不同,RidersProducer 类间接连贯 VoltDB 数据库将数据写入数据表中,而不是将数据发送到 VoltDB Topic 中。用来展现 VoltDB 的多种应用形式。
connectToOneServerWithRetry 应用 VoltDB client api 连贯指定 ip 的 VoltDB 数据库。

  void connectToOneServerWithRetry(String server, Client client) {
        int sleep = 1000;
        while (true) {
            try {client.createConnection(server);
                break;
            }
            catch (Exception e) {System.err.printf("Connection failed - retrying in %d second(s).\n", sleep / 1000);
                try {Thread.sleep(sleep); } catch (Exception interruted) {}
                if (sleep < 8000) sleep += sleep;
            }
        }
        System.out.printf("Connected to VoltDB node at: %s.\n", server);
    }

RidersProducer 类创立 100 个线程,runBenchmark 办法中每 200 毫秒这些线程执行一次 getEntryActivityRecords。getEntryActivityRecords 随机生成一条乘客进站乘车记录,记录内容包含卡号、以后工夫、进站站点 id 等

private static final ScheduledExecutorService EXECUTOR =     Executors.newScheduledThreadPool(100);
public void runBenchmark() throws Exception {
        int microsPerTrans = 1000000/RidersProducer.config.rate;
        EXECUTOR.scheduleAtFixedRate (() -> {List<Object[]> entryRecords = getEntryActivityRecords(config.cardcount);// 生成随机的进站记录
                    call(config.cardEntry, entryRecords);// 将数据发送到 VoltDB 数据库
                }, 10000, microsPerTrans, MICROSECONDS);
    }
    public static List<Object[]> getEntryActivityRecords(int count) {final ArrayList<Object[]> records = new ArrayList<>();
        long curTime = System.currentTimeMillis();
        ThreadLocalRandom.current().ints(1, 0, count).forEach((cardId)
                -> {records.add(new Object[] {cardId, curTime, Stations.getRandomStation().stationId, ENTER.value, 0});
                    }
        );
        return records;
    }

接着调用 call 办法,将数据 records 发送到数据库进行解决。Call 办法定义如下,callProcedure 是 VoltDB 的 client 端 api,用于将数据发送给指定名称的 procedure 进行解决,能够通过同步和异步 IO 两种形式进行调用,异步调用时须要指定回调函数对数据库调用的返回后果进行解决,即本例中的自定义了 BenchmarkCallback。

   protected static void call(String proc, Object[] args) {
        try {client.callProcedure(new BenchmarkCallback(proc, args), procName, args);
        } catch (IOException e) {e.printStackTrace();
        }
    }

Call 办法将数据发送给 procedure,procedure 名称由如下代码指定。一起看看 procedure 中的具体逻辑。

 @Option(desc = "Proc for card entry swipes")
        String cardEntry = "ValidateEntry";

Procedure ValidateEntry 的局部定义,首先定义了 6 个 SQL。

 // 查问公交卡是否存在
    public final SQLStmt checkCard = new SQLStmt("SELECT enabled, card_type, balance, expires, name, phone, email, notify FROM cards WHERE card_id = ?;");
    // 卡充值
    public final SQLStmt chargeCard = new SQLStmt("UPDATE cards SET balance = ? WHERE card_id = ?;");
    // 查问指定站点的入站费用
    public final SQLStmt checkStationFare = new SQLStmt("SELECT fare, name FROM stations WHERE station_id = ?;");
    // 记录进站事件
    public final SQLStmt insertActivity = new SQLStmt("INSERT INTO card_events (card_id, date_time, station_id, activity_code, amount, accept) VALUES (?,?,?,?,?,?);");
    // 再次用到 card_alert_export 这个 stream 对象,用于发送公交卡欠费音讯
    public final SQLStmt exportActivity = new SQLStmt("INSERT INTO card_alert_export (card_id, export_time, station_name, name, phone, email, notify, alert_message) VALUES (?,?,?,?,?,?,?,?);");
    // 将刷卡欺诈行为写入 stream 对象 fraud 中
    public final SQLStmt publishFraud = new SQLStmt("INSERT INTO fraud (trans_id, card_id, date_time, station, activity_type, amt) values (?, ?, ?, ?, ?, ?)"
            );

值得阐明的,下面最初一个 sql 中用到的 fraud 是另外一个 stream 对象,用于插入刷卡欺诈事件,通过 DDL 定义其中的刷卡欺诈行为最终会公布到 VoltDB Topic 中,用于上游解决产品生产。

CREATE STREAM FRAUD partition on column CARD_ID (
  TRANS_ID varchar not null,
  CARD_ID integer not null,
  DATE_TIME timestamp not null,
  STATION integer not null,
  ACTIVITY_TYPE TINYINT not null,
  AMT integer not null
);
create Topic using stream FRAUD properties(Topic.format=avro,consumer.keys=TRANS_ID);

后面曾经提到 run 办法是 procedure 的入口办法,VoltDB 运行 procedure 时,主动调用该办法。后面客户端传进的 records 记录,被逐个传递到 run 办法到参数中进行解决。run 办法定义如下

public VoltTable run(int cardId, long tsl, int stationId, byte activity_code, int amt) throws VoltAbortException {
        // 查问公交卡是否存在
        voltQueueSQL(checkCard, EXPECT_ZERO_OR_ONE_ROW, cardId);
        // 查问指定站点的交通费用
        voltQueueSQL(checkStationFare, EXPECT_ONE_ROW, stationId);
        VoltTable[] checks = voltExecuteSQL();
        VoltTable cardInfo = checks[0];
        VoltTable stationInfo = checks[1];
        byte accepted = 0;

        // 如果公交卡记录等于 0,阐明卡不存在
        if (cardInfo.getRowCount() == 0) {

            // 记录刷卡行为到数据库表中,将 accept 字段置为回绝“REJECTED”voltQueueSQL(insertActivity, cardId, tsl, stationId, ACTIVITY_ENTER, amt, ACTIVITY_REJECTED);
voltExecuteSQL(true);
// 返回“被回绝”音讯给客户端。return buildResult(accepted,"Card Invalid");
        }

        // 如果卡存在,则取出卡信息。cardInfo.advanceRow();
        // 卡状态,0 不可用,1 可用
        int enabled = (int)cardInfo.getLong(0);
        int cardType = (int)cardInfo.getLong(1);
        // 卡余额
        int balance = (int)cardInfo.getLong(2);
        TimestampType expires = cardInfo.getTimestampAsTimestamp(3);
        String owner = cardInfo.getString(4);
        String phone = cardInfo.getString(5);
        String email = cardInfo.getString(6);
        int notify = (int)cardInfo.getLong(7);

        // 查问指定站点的进站费用
        stationInfo.advanceRow();
        // 指定站点的进站费用
        int fare = (int)stationInfo.getLong(0);
        String stationName = stationInfo.getString(1);
        // 刷卡工夫
        TimestampType ts = new TimestampType(tsl);

        // 如果卡状态为不可用
        if (enabled == 0) {
                // 向客户端返回“此卡不可用”return buildResult(accepted,"Card Disabled");
        }

        // 如果卡类型为“非月卡”if (cardType == 0) { // 如果卡内余额短缺
                if (balance > fare) {
                    //isFrand 为反欺诈策略,前面介绍
                    if (isFraud(cardId, ts, stationId)) {
                        // 如果认定为欺诈,记录刷卡记录,记录类型为“欺诈刷卡”voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_FRAUD);
                        // 并且把欺诈事件写入 stream,并最终被公布到 VoltDB Topic 中。见后面 STREAM FRAUD 到 ddl 定义
                        voltQueueSQL(publishFraud, generateId(cardId, tsl), cardId, ts, stationId, ACTIVITY_ENTER, amt);
                        voltExecuteSQL(true);
                        // 向客户端返回“欺诈交易”音讯
                        return buildResult(0, "Fraudulent transaction");
                    } else {
                        // 如果不是欺诈行为,则缩小卡内余额,实现失常生产
                        voltQueueSQL(chargeCard, balance - fare, cardId);
                        // 记录失常的刷卡事件
                        voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, fare, ACTIVITY_ACCEPTED);
                        voltExecuteSQL(true);
                        // 向客户端返回卡内余额
                        return buildResult(1, "Remaining Balance:" + intToCurrency(balance - fare));
                    }
                } else {
                        // 如果卡内余额有余,记录刷卡失败事件。voltQueueSQL(insertActivity, cardId, ts, stationId, ACTIVITY_ENTER, 0, ACTIVITY_REJECTED);
                        if (notify != 0) {  
                            // 再次用到 card_alert_export 这个 stream 对象,用于发送公交卡欠费音讯
                            voltQueueSQL(exportActivity, cardId, getTransactionTime().getTime(), stationName, owner, phone, email, notify, "Insufficient Balance");
                        }
                        voltExecuteSQL(true);
                        // 向客户端返回“余额有余“音讯
                        return buildResult(0,"Card has insufficient balance:"+intToCurrency(balance));
                }
        }
    }

以上代码中有一个 isFraud 办法,用于断定是否为欺诈性刷卡。这里定义了一些简略反欺诈规定

  1. 如果一秒钟内雷同的卡片有 1 次以上的刷卡记录,认定为欺诈。因为不可能存在工夫距离如此短的刷卡行为,可能是因为有多张伪造卡片在同时刷卡。
  2. 同一张卡在过来一小时内,在 5 个或 5 个以上站点刷卡进站。假如这同样被认为是因为有多张伪造卡片在同时刷卡。
  3. 同一张卡在过来一小时内,有过 10 次以上刷卡进站记录。进出站次数太多,暂停应用一段时间。

    isFraud 办法依据以后刷卡记录中的数据,联合数据库中的历史记录实现以上反欺诈规定。历史刷卡记录被保留在 card_events 表中,另外基于这张表创立了视图,统计每张卡在一秒钟内是否有过刷卡记录。

CREATE VIEW CARD_HISTORY_SECOND as select card_id, TRUNCATE(SECOND, date_time) scnd from card_events group by card_id, scnd;
isFraud 办法的定义
    public final SQLStmt cardHistoryAtStations = new SQLStmt("SELECT activity_code, COUNT(DISTINCT station_id) AS stations" +
        "FROM card_events" +
        "WHERE card_id = ? AND date_time >= DATEADD(HOUR, -1, ?)" +
        "GROUP BY activity_code;"
    );

    public final SQLStmt cardEntries = new SQLStmt(
    "SELECT activity_code" +
    "FROM card_events" +
    "WHERE card_id = ? AND station_id = ? AND date_time >= DATEADD(HOUR, -1, ?)" +
    "ORDER BY date_time;"
    );

    public final SQLStmt instantaneousCardActivity = new SQLStmt("SELECT count(*) as activity_count"
            + "FROM CARD_HISTORY_SECOND"
            + "WHERE card_id = ?"
            + "AND scnd = TRUNCATE(SECOND, ?)"
            + "GROUP BY scnd;"
            );
  
 public boolean isFraud(int cardId, TimestampType ts, int stationId) {voltQueueSQL(instantaneousCardActivity, cardId, ts);
        voltQueueSQL(cardHistoryAtStations, cardId, ts);
        voltQueueSQL(cardEntries, cardId, stationId, ts);
        final VoltTable[] results = voltExecuteSQL();
        final VoltTable cardInstantaneousActivity = results[0];
        final VoltTable cardHistoryAtStationisTable = results[1];
        final VoltTable cardEntriesTable = results[2];
        // 一秒钟之内曾经有一次刷卡记录的话,返回 true
        while (cardInstantaneousActivity.advanceRow()) {if(cardInstantaneousActivity.getLong("activity_count") > 0) {return true;}
        }
        
        while (cardHistoryAtStationisTable.advanceRow()) {final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code");
            final long stations = cardHistoryAtStationisTable.getLong("stations");

            if (activity_code == ACTIVITY_ENTER) {
                // 过来 1 小时之内在五个站点刷卡进站,返回 true
                if (stations >= 5) {return true;}
            }
        }

        byte prevActivity = ACTIVITY_INVALID;
        int entranceCount = 0;
        while (cardEntriesTable.advanceRow()) {final byte activity_code = (byte) cardHistoryAtStationisTable.getLong("activity_code");

            if (prevActivity == ACTIVITY_INVALID || prevActivity == activity_code) {if (activity_code == ACTIVITY_ENTER) {
                    prevActivity = activity_code;
                    entranceCount++;
                } else {prevActivity = ACTIVITY_INVALID;}
            }
        }

        // 如果在过来 1 小时内有 10 次连续的刷卡记录,返回 true。if (entranceCount >= 10) {return true;}

        return false;
    }

您看好 VoltDB 吗?马上口头吧!
欢送私信,与更多小伙伴一起探讨。

对于 VoltDB
VoltDB 反对强 ACID 和实时智能决策的应用程序,以实现互联世界。没有其它数据库产品能够像 VoltDB 这样,能够同时须要低延时、大规模、高并发数和准确性相结合的应用程序加油。
VoltDB 由 2014 年图灵奖获得者 Mike Stonebraker 博士创立,他对关系数据库进行了从新设计,以应答当今一直增长的实时操作和机器学习挑战。Stonebraker 博士对数据库技术钻研已有 40 多年,在疾速数据,流数据和内存数据库方面带来了泛滥翻新理念。
在 VoltDB 的研发过程中,他意识到了利用内存事务数据库技术开掘流数据的全副后劲,岂但能够满足解决数据的提早和并发需要,还能提供实时剖析和决策。VoltDB 是业界可信赖的名称,在诺基亚、金融时报、三菱电机、HPE、巴克莱、华为等当先组织单干有理论场景落地案例。

正文完
 0