主頁 >  其他 > ??電商用戶行為分析-Flink【Java重寫版本】,內附具體代碼??,可以直接學習使用??【建議收藏】!

??電商用戶行為分析-Flink【Java重寫版本】,內附具體代碼??,可以直接學習使用??【建議收藏】!

2021-08-17 07:20:11 其他

文章目錄

  • 前言
  • 專案整體介紹
    • 專案主要模塊
    • 資料源決議
    • 熱門時事商品統計
      • 基本需求
      • 解決思路
      • 按照商品id進行磁區
      • 設定時間視窗
      • 視窗聚合
    • 實時流量統計–熱門頁面
      • 基本需求
      • 解決思路
    • 市場營銷分析–APP市場推廣統計
      • 基本需求
      • 解決思路
    • 市場營銷分析–頁面廣告統計
      • 基本需求
      • 解決思路
  • 專案撰寫
    • 熱門商品統計
      • 資料分析
      • 定義資料輸入輸出的結構
      • WATERMARKS
      • AGGREGATEFUNCTION自定義聚合規則
      • WINDOWFUNCTION自定義視窗處理元素的規則
      • 處理函式(PROCESSFUNCTIONS)
      • 完整代碼如下
      • 資料源改為KAFKA
      • 自定義KAFKA生產者
    • 實時流量統計
      • 頁面瀏覽量統計
      • 網站瀏覽總量(PV)
      • 獨立訪客數統計(UV)
        • 方法一
        • 方法二
        • 方法三 布隆過濾
    • 市場營銷商業指標
      • APP市場推廣統計
      • 頁面廣告分析
      • 過濾黑名單
      • 惡意登錄監控
        • 方法一
    • FLINKCEP - COMPLEX EVENT PROCESSING FOR FLINK
        • 方法二 CEP
      • 訂單支付實時監控
        • 方法一 CEP
        • 方法二 直接使用狀態編程
      • 來自兩條流的訂單交易匹配
        • 方法一:CONNECT + COPROCESS
        • 方法二:JOIN

前言

近些年,隨著對實時資料需求越來越高,掀起了一波學習Flink的熱潮,本文借鑒于尚硅谷大資料實戰_電商用戶行為分析(專案開發實戰)學習,原始專案使用Scala,本文嘗試用Java對專案進行重寫,也會結合官方檔案,介紹一些api的用處,話不多說,直接開始我們今天的正題:

專案整體介紹

專案主要模塊

基于對電商用戶行為資料的基本分類,我們可以發現主要有以下三個分析方向:

1.熱門統計
利用用戶的點擊瀏覽行為,進行流量統計、近期熱門商品統計等,

2.偏好統計
利用用戶的偏好行為,比如收藏、喜歡、評分等,進行用戶畫像分析,給出個性化的商品推薦串列,

3.風險控制
利用用戶的常規業務行為,比如登錄、下單、支付等,分析資料,對例外情況進行報警提示,

本專案限于資料,我們只實作熱門統計和風險控制中的部分內容,將包括以下五大模塊:實時熱門商品統計、實時流量統計、市場營銷商業指標統計、惡意登錄監控和訂單支付失效監控,其中細分為以下9個具體指標:
在這里插入圖片描述由于對實時性要求較高,我們會用flink作為資料處理的框架,在專案中,我們將綜合運用flink的各種API,基于EventTime去處理基本的業務需求,并且靈活地使用底層的processFunction,基于狀態編程和CEP去處理更加復雜的情形,

資料源決議

行為資料UserBehavior

欄位名資料型別說明
userIdLong加密后的用戶ID
itemIdLong加密后的商品ID
categoryIdInt加密后的商品所屬類別ID
behaviorString用戶行為型別,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’)
timestampLong行為發生的時間戳,單位秒

web日志資料

欄位名資料型別說明
ipString訪問的 IP
userIdLong訪問的 user ID
eventTimeLong訪問時間
methodString訪問方法 GET/POST/PUT/DELETE
urlString訪問的 url

熱門時事商品統計

基本需求

統計近一小時熱門商品,每五秒鐘更新一次
熱門數用瀏覽度pv來衡量

解決思路

過濾出用戶行為中的pv
構建滑動視窗

按照商品id進行磁區

.keyBy(“itemid”)

設定時間視窗

.timeWindow(Time.minutes(60),Time.minutes(5)) 滑動視窗
時間視窗左閉右開,同一份資料可以發送給滿足條件的多份視窗

視窗聚合

.aggregate(new CountAgg(),new WindowResultFunction())
new CountAgg():定義聚合規則
new WindowResultFunction():定義輸出的資料結構

實時流量統計–熱門頁面

基本需求

從web服務器日志中,統計實時熱門訪問頁面
統計每分鐘ip訪問量,取出訪問量最大的五個地址,每五秒更新一次

解決思路

將日志中的時間轉換為時間戳
構建滑動視窗

市場營銷分析–APP市場推廣統計

基本需求

統計APP市場推廣的資料指標
按照不同的推廣渠道,分別統計資料

解決思路

通過濾過,按照不同渠道進行統計
自定義processFunction

市場營銷分析–頁面廣告統計

基本需求

按照不同省份,統計每小時頁面訪問量,五秒鐘統計一次
對于頻繁的點擊行為進行過濾,放入黑名單

解決思路

滑動視窗
利用processFunction進行黑名單過濾
其實需求的具體細節還有很多,代碼實作中再展開

專案撰寫

熱門商品統計

資料分析

// userID,itemId,categoryId,mode,timeStamp
543462,1715,1464116,pv,1511658000

定義資料輸入輸出的結構

// input structure
class UserBehavior {
    public long userId;
    public long itemId;
    public int categoryId;
    public String behavior;
    public long timeStamp;

    public UserBehavior() {
    }

    public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timeStamp=" + timeStamp +
                '}';
    }
}

// output structure
class ItemViewCount{
    public long itemID;
    public long windowEnd;
    public long count;

    public ItemViewCount() {
    }

    public ItemViewCount(long itemID, long windowEnd, long count) {
        this.itemID = itemID;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemID=" + itemID +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }

WATERMARKS

為了使用事件時間語意,Flink 應用程式需要知道事件時間戳對應的欄位,意味著資料流中的每個元素都需要擁有可分配的事件時間戳,其通常通過使用 TimestampAssigner API 從元素中的某個欄位去訪問/提取時間戳,

時間戳的分配與 watermark 的生成是齊頭并進的,其可以告訴 Flink 應用程式事件時間的進度,其可以通過指定 WatermarkGenerator 來配置 watermark 的生成方式,

使用 Flink API 時需要設定一個同時包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy,WatermarkStrategy 工具類中也提供了許多常用的 watermark 策略,并且用戶也可以在某些必要場景下構建自己的 watermark 策略,

// 設定水印,處理亂序資料
// 水印策略,有界無序,定義一個固定延遲事件
// 同時時間的語意,由我們物件中的timeStamp指定
SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));

AGGREGATEFUNCTION自定義聚合規則

AggregateFunction比ReduceFunction更加通用,它有三個引數:輸入型別(IN),累加器型別(ACC)和輸出型別(OUT)

class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {

    // 定義初始值
    @Override
    public Long createAccumulator() {
        return null;
    }

    // 組內規則
    @Override
    public Long add(UserBehavior userBehavior, Long aLong) {
        return null;
    }

    // 回傳值
    @Override
    public Long getResult(Long aLong) {
        return null;
    }

    // 組間規則
    @Override
    public Long merge(Long aLong, Long acc1) {
        return null;
    }
}

WINDOWFUNCTION自定義視窗處理元素的規則

@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    void apply(KEY var1, W var2, Iterable<IN> var3, Collector<OUT> var4) throws Exception;
}

處理函式(PROCESSFUNCTIONS)

ProcessFunction 將事件處理與 Timer,State 結合在一起,使其成為流處理應用的強大構建模塊, 這是使用 Flink 創建事件驅動應用程式的基礎,它和 RichFlatMapFunction 十分相似, 但是增加了 Timer,

這里展示了其中一種ProcessFunction,

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    public KeyedProcessFunction() {
    }

    public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;

    public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception {
    }
  ......
class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {

    public int n;
    // 定義一個狀態變數 list state,用來保存所有的 ItemViewCont
    public ListState<ItemViewCount> itemState;

    public TopNHotItems(int n) {
        this.n = n;
    }

    // // 在執行processElement方法之前,會最先執行并且只執行一次 open 方法
    @Override
    public void open(Configuration parameters) throws Exception {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
    }

    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        itemState.add(itemViewCount);
        // 注冊 windowEnd+1 的 EventTime Timer, 延遲觸發,當觸發時,說明收齊了屬于windowEnd視窗的所有商品資料,統一排序處理
        context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
    }

    // 定時器觸發時,會執行這個方法
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 已經收集到所有的資料,首先把所有的資料放到一個 List 中
        List<ItemViewCount> allItems = new ArrayList<>();
        Iterable<ItemViewCount> itemViewCounts = itemState.get();
        Iterator<ItemViewCount> iterator = itemViewCounts.iterator();
        int cnt=0;
        while (iterator.hasNext()) {
            if(cnt>=3) break;
            allItems.add(iterator.next());
            cnt++;
        }
        // 清除狀態
        itemState.clear();
        // 按照 count 大小  倒序排序
        Collections.sort(allItems, new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                if(o1.count>o2.count) return -1;
                else if(o1.count==o2.count) return 0;
                else return 1;
            }
        });
        StringBuilder result = new StringBuilder();
        result.append("======================================================\n");
        // 觸發定時器時,我們多設定了1秒的延遲,這里我們將時間減去0.1獲取到最精確的時間
        result.append("時間:").append(new Timestamp(timestamp - 1)).append("\n");
        for(ItemViewCount elem:allItems) result.append(elem.toString());
        result.append("\n");
        result.append("======================================================\n");
        out.collect(result.toString());
    }
}

完整代碼如下

注釋還是寫得很詳細的,層層遞進

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;

/**
 * @Description: 熱門時事商品統計
 */

// input structure
class UserBehavior {
    public long userId;
    public long itemId;
    public int categoryId;
    public String behavior;
    public long timeStamp;

    public UserBehavior() {
    }

    public UserBehavior(long userId, long itemId, int categoryId, String behavior, long timeStamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timeStamp = timeStamp;
    }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timeStamp=" + timeStamp +
                '}';
    }
}

// output structure
class ItemViewCount{
    public long itemID;
    public long windowEnd;
    public long count;

    public ItemViewCount() {
    }

    public ItemViewCount(long itemID, long windowEnd, long count) {
        this.itemID = itemID;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    @Override
    public String toString() {
        return "ItemViewCount{" +
                "itemID=" + itemID +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}


public class HotItems {

    public static void main(String[] args) throws Exception {

        // 定義流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 設定并行度
        env.setParallelism(1);
        // 設定時間特征為事件事件
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // source
        // input data
        DataStreamSource<String> stringDataStreamSource = env.readTextFile("/UserBehavior.csv");

        // transform
        // 將原始資料變成UserBehavior型別
        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
            }
        });
        // 設定水印,處理亂序資料
        // 水印策略,有界無序,定義一個固定延遲事件
        // 同時時間的語意,由我們物件中的timeStamp指定
        SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));
        // 過濾出pv資料
        userBehaviorWatermark.filter(new FilterFunction<UserBehavior>() {
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return userBehavior.behavior.equals("pv");
            }
        })
                // 按照itemId聚合
        .keyBy(value -> value.itemId)
        // Windows can be defined on already partitioned KeyedStreams
        // 定義滑動視窗
        .timeWindow(Time.hours(1), Time.minutes(5))
        // 統計出每種商品的個數,自定義聚合規則,和輸出結構
        .aggregate(new CountAgg(), new WindowResult())

        // 按照每次視窗結束時間聚合
        .keyBy(value->value.windowEnd)
        // 輸出每個視窗中點擊量前N名的商品
        .process(new TopNHotItems(3))
        .print("HotItems");

        env.execute();
    }
}

class CountAgg implements AggregateFunction<UserBehavior,Long,Long> {

    // 定義初始值
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    // 組內規則
    @Override
    public Long add(UserBehavior userBehavior, Long aLong) {
        return aLong+1;
    }

    // 回傳值
    @Override
    public Long getResult(Long aLong) {
        return aLong;
    }

    // 組間規則
    @Override
    public Long merge(Long aLong, Long acc1) {
        return aLong+acc1;
    }
}

class WindowResult implements WindowFunction<Long,ItemViewCount,Long, TimeWindow> {

    @Override
    public void apply(Long aLong, TimeWindow timeWindow, java.lang.Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
        collector.collect(new ItemViewCount(aLong,timeWindow.getEnd(),iterable.iterator().next()));
    }
}

class TopNHotItems extends KeyedProcessFunction<Long,ItemViewCount,String> {

    public int n;
    // 定義一個狀態變數 list state,用來保存所有的 ItemViewCont
    public ListState<ItemViewCount> itemState;

    public TopNHotItems(int n) {
        this.n = n;
    }

    // // 在執行processElement方法之前,會最先執行并且只執行一次 open 方法
    @Override
    public void open(Configuration parameters) throws Exception {
        itemState = getRuntimeContext().getListState(new ListStateDescriptor<ItemViewCount>("itemState",ItemViewCount.class));
    }

    @Override
    public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
        itemState.add(itemViewCount);
        // 注冊 windowEnd+1 的 EventTime Timer, 延遲觸發,當觸發時,說明收齊了屬于windowEnd視窗的所有商品資料,統一排序處理
        context.timerService().registerEventTimeTimer(itemViewCount.windowEnd+1);
    }

    // 定時器觸發時,會執行這個方法
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // 已經收集到所有的資料,首先把所有的資料放到一個 List 中
        List<ItemViewCount> allItems = new ArrayList<>();
        Iterable<ItemViewCount> itemViewCounts = itemState.get();
        Iterator<ItemViewCount> iterator = itemViewCounts.iterator();
        int cnt=0;
        while (iterator.hasNext()) {
            if(cnt>=3) break;
            allItems.add(iterator.next());
            cnt++;
        }
        // 清除狀態
        itemState.clear();
        // 按照 count 大小  倒序排序
        Collections.sort(allItems, new Comparator<ItemViewCount>() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                if(o1.count>o2.count) return -1;
                else if(o1.count==o2.count) return 0;
                else return 1;
            }
        });
        StringBuilder result = new StringBuilder();
        result.append("======================================================\n");
        // 觸發定時器時,我們多設定了1秒的延遲,這里我們將時間減去0.1獲取到最精確的時間
        result.append("時間:").append(new Timestamp(timestamp - 1)).append("\n");
        for(ItemViewCount elem:allItems) result.append(elem.toString());
        result.append("\n");
        result.append("======================================================\n");
        out.collect(result.toString());
    }
}

第一個模塊的代碼,寫得比較詳細,之后的模塊在文中就只寫核心部分了,

資料源改為KAFKA

Properties properties = new Properties();
properties.setProperty(“bootstrap.servers”,“localhost:9092”);
properties.setProperty(“group.id”, “consumer-group”);
properties.setProperty(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
properties.setProperty(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
properties.setProperty(“auto.offset.reset”, “latest”);
DataStreamSource stringDataStreamSource = env.addSource(new FlinkKafkaConsumer<>(“hotItem”, new SimpleStringSchema(), properties));

自定義KAFKA生產者

可以從檔案讀取資訊并不斷發送,便于測驗

public class myKafkaProducer {

    public static void main(String[] args) throws IOException {
        write2kafka("hotItem");
    }
    public static void write2kafka(String topic) throws IOException {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.166.200:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String,String>(properties);
        InputStreamReader read = new InputStreamReader(new FileInputStream("data/UserBehavior.csv"));
        BufferedReader bufferedReader = new BufferedReader(read);
        while (true) {
            String s = bufferedReader.readLine();
            if(s!=null) {
                producer.send(new ProducerRecord<String, String>(topic,s));
            }
        }
    }
}

實時流量統計

頁面瀏覽量統計

每隔5秒,輸出最近10分鐘內訪問量最多的前N個URL,
套路其實是一樣的,同上

網站瀏覽總量(PV)

統計每小時pv
其實就是一個word count

獨立訪客數統計(UV)

這里涉及到一個去重的操作,flink本身沒有distinct算子,這比較出乎意料,當前場景,有如下幾種去重的方式,
在展示去重方法之前,需要先指出一個api的要點,就是WindowedStream/AllWindowedStream,這兩者最后輸出的都是DataStream,可以apply在有磁區和沒磁區的視窗中,效果雖然沒有process這么強力,但還是不錯的,

方法一

利用set進行去重

public class UniqueVisitor {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
            }
        });

        SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));

        userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
                .timeWindowAll(Time.minutes(60))
                .apply(new UvCountByWindow())
                .print();

        env.execute();
    }
}

class UvCountByWindow implements AllWindowFunction<UserBehavior,UVCount, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<UserBehavior> input, Collector<UVCount> out) {
        Set<Long> longs = new HashSet<>();
        for (UserBehavior next : input) {
            longs.add(next.userId);
        }
        out.collect(new UVCount(window.getEnd(),longs.size()));
    }
}

方法二

利用mapState,思路和set差不多
可以看到這個processFunction并沒有ontimer方法,因為keyedProcessFunction是ProcessFunction的擴展,可以在onTimer獲取timer的key (通過context.getCurrentKey方法),而這個方法并不是

public class UniqueVisitor {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
            }
        });

        SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));

        userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
                .timeWindowAll(Time.minutes(60))
//                .apply(new UvCountByWindow())
                .process(new UvCountByProcess())
                .print();

        env.execute();
    }
}

class UvCountByProcess extends ProcessAllWindowFunction<UserBehavior, UVCount, TimeWindow> {

    public MapState<Long, Long> mapState;
    public int cnt = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("mapState",Long.class , Long.class));
    }

    @Override
    public void process(Context context, Iterable<UserBehavior> iterable, Collector<UVCount> collector) throws Exception {
        for(UserBehavior elem:iterable) {
            if(!mapState.contains(elem.userId)) {
                mapState.put(elem.userId,1L);
                cnt++;
            }
        }
        collector.collect(new UVCount(context.window().getEnd(),cnt));
        mapState.clear();
    }


}

方法三 布隆過濾

上兩種方法都需要用到記憶體在存盤元素,要是資料量很大,會遇到資源不夠的情況,這里采用布隆過濾器,

public class UvWithBloomFilter {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/UserBehavior.csv");

        SingleOutputStreamOperator<UserBehavior> userBehaviorStream = stringDataStreamSource.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] split = s.split(",");
                return new UserBehavior(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2]), split[3], Long.parseLong(split[4]));
            }
        });

        SingleOutputStreamOperator<UserBehavior> userBehaviorWatermark = userBehaviorStream.assignTimestampsAndWatermarks(WatermarkStrategy
                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timeStamp) -> event.timeStamp * 1000));

        userBehaviorWatermark.filter(value -> value.behavior.equals("pv"))
                .map(new MapFunction<UserBehavior, Tuple2<String,Long>>() {
                    @Override
                    public Tuple2<String, Long> map(UserBehavior userBehavior) throws Exception {
                        return new Tuple2<>("dummyKey",userBehavior.userId);
                    }
                })
                .keyBy(value->value.f0)
                .timeWindow(Time.minutes(60))
                // 我們不應該等待視窗關閉才去做 Redis 的連接 -》 資料量可能很大,視窗的記憶體放不下
                // 所以這里使用了 觸發視窗操作的API -- 觸發器 trigger
                .trigger(new MyTrigger())
                .process(new UvCountWithBloom())
                .print();

        env.execute();
    }
}

class MyTrigger extends Trigger<Tuple2<String, Long>, TimeWindow> {

    @Override
    public TriggerResult onElement(Tuple2<String, Long> stringLongTuple2, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
    }
}

// 定義一個布隆過濾器
class Bloom {
    public Long cap;

    public Bloom() {
    }

    public Bloom(Long cap) {
        this.cap = cap;
    }

    public Long hash(String value,int seed) {
        long result = 0;
        for(int i=0;i<value.length();i++) {
            result = result * seed + value.charAt(i);
        }
        return result & (cap-1);
    }

    @Override
    public String toString() {
        return "Bloom{" +
                "cap=" + cap +
                '}';
    }
}

class UvCountWithBloom extends ProcessWindowFunction<Tuple2<String, Long>, UVCount, String, TimeWindow> {

    // 可以在這里提前定義和redis的鏈接,這里我就不定義了
    // 這里我們就定義一個map來表示,其實應該是利用redis的位圖,本部分建議還是看原始倉庫scala的代碼
    public Map<Long,Integer> map = new HashMap<>();

    // 定義bloom過濾器
    public Bloom bloom;

    @Override
    public void open(Configuration parameters) throws Exception {
        bloom = new Bloom(100L);
    }

    @Override
    public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<UVCount> out) {

        // 因為是每來一條資料就判斷一次,所以我們就可以直接用last獲取到這條資料
        String userId = elements.iterator().next().f1.toString();
        // 計算哈希
        long hash = bloom.hash(userId, 61);
        // 定義一個標志位,判斷 redis 位圖中有沒有這一位
        if (!map.containsKey(hash)) {
            map.put(hash, map.getOrDefault(hash,1));
        }
        map.put(hash, map.get(hash)+1);
        out.collect(new UVCount(Long.parseLong(userId),map.get(hash)));
    }
}

市場營銷商業指標

APP市場推廣統計

主要有兩個知識點,一個是自定義資料源,這對測驗來說,是一個很好的方式

class SimulateEventSource extends RichParallelSourceFunction<MarketingUserBehavior> {

    // 定義是否運行的識別符號
    Boolean running = true;
    // 定義渠道的集合
    String[] channelSet = {"AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba"};
    // 定義用戶行為的集合
    String[] behaviorTypes = {"BROWSE", "CLICK", "PURCHASE", "UNINSTALL"};
    // 定義亂數發生器
    Random rand = new Random();

    @Override
    public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {

        long count = 0;
        long max = Long.MAX_VALUE;

        while (count<max && running) {
            String userId = String.valueOf(rand.nextLong());
            String behaviorType = behaviorTypes[rand.nextInt(behaviorTypes.length)];
            String channel = channelSet[rand.nextInt(channelSet.length)];
            long timestamp = System.currentTimeMillis();
            ctx.collect(new MarketingUserBehavior(userId,behaviorType,channel,timestamp));
            count++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

另一個,在java中,keyby設計到的回傳值要是過于復雜,如果不想定義pojo的話,還是要使用keyselector,否則可能會遇到錯誤,
完整代碼如下

// 定義一個輸入資料的樣例類  保存電商用戶行為的樣例類
// case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)
class MarketingUserBehavior {
    public String userId;
    public String behavior;
    public String channel;
    public Long timestamp;

    public MarketingUserBehavior(String userId, String behavior, String channel, Long timestamp) {
        this.userId = userId;
        this.behavior = behavior;
        this.channel = channel;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MarketingUserBehavior{" +
                "userId='" + userId + '\'' +
                ", behavior='" + behavior + '\'' +
                ", channel='" + channel + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
// 定義一個輸出結果的樣例類   保存 市場用戶點擊次數
// case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)
class MarketingViewCount {
    public String windowStart;
    public String windowEnd;
    public String channel;
    public String behavior;
    public Long count;

    public MarketingViewCount(String windowStart, String windowEnd, String channel, String behavior, Long count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.channel = channel;
        this.behavior = behavior;
        this.count = count;
    }

    @Override
    public String toString() {
        return "MarketingViewCount{" +
                "windowStart='" + windowStart + '\'' +
                ", windowEnd='" + windowEnd + '\'' +
                ", channel='" + channel + '\'' +
                ", behavior='" + behavior + '\'' +
                ", count=" + count +
                '}';
    }
}

public class AppMarketingByChannel {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 自定義資料源
        DataStreamSource<MarketingUserBehavior> marketingUserBehaviorDataStreamSource = env.addSource(new SimulateEventSource());

        marketingUserBehaviorDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<MarketingUserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event,timeStamp)->event.timestamp))
        .filter(value-> !value.behavior.equals("UNINSTALL"))
        .map(new MapFunction<MarketingUserBehavior, Tuple2<Tuple2<String,String>,Long>>() {
            @Override
            public Tuple2<Tuple2<String,String>, Long> map(MarketingUserBehavior marketingUserBehavior) throws Exception {
                return new Tuple2<>(new Tuple2<>(marketingUserBehavior.channel, marketingUserBehavior.behavior), 1L);
            }
        })
        .keyBy(new KeySelector<Tuple2<Tuple2<String, String>, Long>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(Tuple2<Tuple2<String, String>, Long> tuple2LongTuple2) throws Exception {
                return tuple2LongTuple2.f0;
            }
        })
        .timeWindow(Time.minutes(60),Time.seconds(10))
        .process(new MarketingCountByChannel())
        .print();

        env.execute();
    }
}

class SimulateEventSource extends RichParallelSourceFunction<MarketingUserBehavior> {

    // 定義是否運行的識別符號
    Boolean running = true;
    // 定義渠道的集合
    String[] channelSet = {"AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba"};
    // 定義用戶行為的集合
    String[] behaviorTypes = {"BROWSE", "CLICK", "PURCHASE", "UNINSTALL"};
    // 定義亂數發生器
    Random rand = new Random();

    @Override
    public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {

        long count = 0;
        long max = Long.MAX_VALUE;

        while (count<max && running) {
            String userId = String.valueOf(rand.nextLong());
            String behaviorType = behaviorTypes[rand.nextInt(behaviorTypes.length)];
            String channel = channelSet[rand.nextInt(channelSet.length)];
            long timestamp = System.currentTimeMillis();
            ctx.collect(new MarketingUserBehavior(userId,behaviorType,channel,timestamp));
            count++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

class MarketingCountByChannel extends ProcessWindowFunction<Tuple2<Tuple2<String,String>,Long>,MarketingViewCount,Tuple2<String,String>, TimeWindow> {

    @Override
    public void process(Tuple2<String, String> stringStringTuple2, Context context, Iterable<Tuple2<Tuple2<String, String>, Long>> elements, Collector<MarketingViewCount> out) throws Exception {

        // 根據 context 物件分別獲取到 Long 型別的 視窗的開始和結束時間
        //context.window.getStart是長整形   所以new 一個 變成String型別
        String startTs = String.valueOf(context.window().getStart());
        String endTs = String.valueOf(context.window().getEnd());

        // 獲取到 渠道
        String channel = stringStringTuple2.f0;
        // 獲取到 行為
        String behaviorType = stringStringTuple2.f1;
        // 獲取到 次數
        long count = 0;
        for (Tuple2<Tuple2<String, String>, Long> element : elements) {
            count++;
        }

        // 輸出結果
        out.collect(new MarketingViewCount(startTs, endTs, channel, behaviorType, count));
    }
}

頁面廣告分析

按照省份劃分點擊量

過濾黑名單

相比上一個功能多了一個過濾動作,具體過濾規則由需求決定
這里使用了旁路輸出getSideOutput,這次接觸到一個實際案例,識訓還是很大的

//定義側輸出流報警資訊樣例類
// case class BlackListWarning(userId:Long,adId:Long,msg:String)
class BlackListWarning {
    public long userId;
    public long adId;
    public String msg;

    public BlackListWarning(long userId, long adId, String msg) {
        this.userId = userId;
        this.adId = adId;
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "BlackListWarning{" +
                "userId=" + userId +
                ", adId=" + adId +
                ", msg='" + msg + '\'' +
                '}';
    }
}

public class AdAnalysisByProvinceBlack {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/AdClickLog.csv");

        SingleOutputStreamOperator<AdClickEvent> adLogStream = stringDataStreamSource.map(new MapFunction<String, AdClickEvent>() {
            @Override
            public AdClickEvent map(String s) throws Exception {
                String[] s1 = s.split(",");
                return new AdClickEvent(Long.parseLong(s1[0]), Long.parseLong(s1[1]), s1[2], s1[3], Long.parseLong(s1[4]));
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy.<AdClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.timestamp * 1000));

        SingleOutputStreamOperator<AdClickEvent> filterBlackListStream = adLogStream.keyBy(new KeySelector<AdClickEvent, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> getKey(AdClickEvent adClickEvent) throws Exception {
                return new Tuple2<>(adClickEvent.userId, adClickEvent.adId);
            }
        })
        .process(new FilterBlackList(100L));

        SingleOutputStreamOperator<String> process = filterBlackListStream
                .keyBy(value -> value.province)
                .timeWindow(Time.minutes(60), Time.seconds(5))
                .process(new AdCount());

//        process.print();

        filterBlackListStream.getSideOutput(new OutputTag<BlackListWarning>("BlackListOutputTag"){}).print();

        env.execute();
    }
}

class FilterBlackList extends KeyedProcessFunction<Tuple2<Long, Long>, AdClickEvent, AdClickEvent> {

    public long cnt;

    public FilterBlackList(long cnt) {
        this.cnt = cnt;
    }

    ValueState<Long> count;
    ValueState<Boolean> state;

    // 定義一個狀態,需要保存當前用戶對當前廣告的點擊量 count
    // lazy val countState:ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count",classOf[Long]))
    // 定義一個標識位,用來表示用戶是否已經在黑名單中
    // lazy val isSendState:ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-sent",classOf[Boolean]))

    @Override
    public void open(Configuration parameters) throws Exception {
        count = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
        state = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-sent", Boolean.class));
    }

    @Override
    public void processElement(AdClickEvent value, Context ctx, Collector<AdClickEvent> out) throws Exception {
        // 取出狀態資料
        Long value1 = count.value();
        // 如果是第一個資料,那么注冊第二天0點的定時器,用于清空狀態
        if(value1==null || value1==0L) {
            count.update(0L);
            state.update(false);
            long ts = (ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24);
            ctx.timerService().registerProcessingTimeTimer(ts);
        }
        // 判斷 count 值是否達到上限,如果達到,并且之前沒有輸出過報警資訊,那么則報警
        if(count.value()>cnt) {
            if(!state.value()) {
                // 旁路輸出資料
                ctx.output(new OutputTag<BlackListWarning>("BlackListOutputTag"){},new BlackListWarning(value.userId,value.adId,"click over "+cnt+" times today"));
                // 更新黑名單
                state.update(true);
            }
            return;
        }
        count.update(count.value()+1);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdClickEvent> out) throws Exception {
        count.clear();
        state.clear();
    }
}

惡意登錄監控

方法一

最樸素的方法

// 輸入的登錄事件樣例類
// case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
class LoginEvent {
    public long userId;
    public String ip;
    public String eventType;
    public long eventTime;

    public LoginEvent(long userId, String ip, String eventType, long eventTime) {
        this.userId = userId;
        this.ip = ip;
        this.eventType = eventType;
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId=" + userId +
                ", ip='" + ip + '\'' +
                ", eventType='" + eventType + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}

// 輸出的報警資訊樣例類
// case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
class Warning {
    public long userId;
    public long firstFailTime;
    public long lastFailTime;
    public String warningMsg;

    public Warning(long userId, long firstFailTime, long lastFailTime, String warningMsg) {
        this.userId = userId;
        this.firstFailTime = firstFailTime;
        this.lastFailTime = lastFailTime;
        this.warningMsg = warningMsg;
    }

    @Override
    public String toString() {
        return "Warning{" +
                "userId=" + userId +
                ", firstFailTime=" + firstFailTime +
                ", lastFailTime=" + lastFailTime +
                ", warningMsg='" + warningMsg + '\'' +
                '}';
    }
}

public class LoginFailTwo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/LoginLog.csv");
        stringDataStreamSource.map(new MapFunction<String, LoginEvent>() {
            @Override
            public LoginEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.eventTime * 1000))
                .keyBy(value -> value.userId)
                .process(new LoginWarning())
                .print();

        env.execute();
    }
}

class LoginWarning extends KeyedProcessFunction<Long, LoginEvent, Warning> {

    ListState<LoginEvent> log;

    @Override
    public void open(Configuration parameters) throws Exception {
        log = getRuntimeContext().getListState(new ListStateDescriptor<LoginEvent>("log", LoginEvent.class));
    }

    @Override
    public void processElement(LoginEvent value, Context ctx, Collector<Warning> out) throws Exception {
        if(value.eventType.equals("fail")) {
            // 先獲取之前失敗的事件
            Iterator<LoginEvent> iterator = log.get().iterator();
            if(iterator.hasNext()) {
                // 如果之前已經有失敗的事件,就做判斷,如果沒有就把當前失敗事件保存進state
                LoginEvent next = iterator.next();
                if (value.eventTime < next.eventTime + 2){
                    out.collect(new Warning( value.userId,next.eventTime,value.eventTime,"在2秒內連續兩次登錄失敗,"));
                }

                // 更新最近一次的登錄失敗事件,保存在狀態里
                log.clear();
            }
            // 如果是第一次登錄失敗,之前把當前記錄 保存至 state
            log.add(value);
        } else {
            // 當前登錄狀態 不為 fail,則直接清除狀態
            log.clear();
        }
    }
}

這有兩個很大的問題,計算的是最近兩秒內的情況,是寫死的,不能改,同時沒有考慮到亂序的資訊流,這個時候我們就需要使用到flink的cep了

FLINKCEP - COMPLEX EVENT PROCESSING FOR FLINK

FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.

It allows you to detect event patterns in an endless stream of events, giving you the opportunity to get hold of what’s important in your data.

一個或多個由簡單事件構成的事件流通過簡單的規則匹配,然后輸出用戶想得到的資料–滿足規則的復雜事件,

方法二 CEP

public class LoginFailWithCep {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/LoginLog.csv");
        KeyedStream<LoginEvent, Long> loginEventStream = stringDataStreamSource.map(new MapFunction<String, LoginEvent>() {
            @Override
            public LoginEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new LoginEvent(Long.parseLong(split[0]), split[1], split[2], Long.parseLong(split[3]));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.eventTime * 1000))
                .keyBy(value -> value.userId);

        // 定義匹配的模式
        Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("begin").where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })
                .next("next")
                .where(new SimpleCondition<LoginEvent>() {
                    @Override
                    public boolean filter(LoginEvent loginEvent) throws Exception {
                        return loginEvent.eventType.equals("fail");
                    }
                })
                .within(Time.seconds(2));

        // 將 pattern 應用到 輸入流 上,得到一個 pattern stream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream, pattern);

        // 用 select 方法檢出 符合模式的事件序列
        SingleOutputStreamOperator<Warning> select = patternStream.select(new LoginFailMatch());
        select.print();

        env.execute();
    }
}

class LoginFailMatch implements PatternSelectFunction<LoginEvent,Warning> {

    @Override
    public Warning select(Map<String, List<LoginEvent>> map) throws Exception {
        LoginEvent begin = map.get("begin").iterator().next();
        LoginEvent next = map.get("next").iterator().next();
        return new Warning(begin.userId,begin.eventTime,next.eventTime,"在2秒內連續2次登錄失敗,");
    }
}

這算一個很簡單的cep,做了個簡單的入門,其實需要注意的點還很多

訂單支付實時監控

在電商網站中,訂單的支付作為直接與營銷收入掛鉤的一環,在業務流程中非常重要,對于訂單而言,為了正確控制業務流程,也為了增加用戶的支付意愿,網站一般會設定一個支付失效時間,超過一段時間不支付的訂單就會被取消,另外,對于訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平臺的交易資料來做一個實時對賬,在接下來的內容中,我們將實作這兩個需求,

在電商平臺中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候,用戶下單的行為可以表明用戶對商品的需求,但在現實中,并不是每次下單都會被用戶立刻支付,當拖延一段時間后,用戶支付的意愿會降低,所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防范訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設定一個失效時間(比如15分鐘),如果下單后一段時間仍未支付,訂單就會被取消,

方法一 CEP

這里的重點我們是要學一下cep中select函式的方法,原始碼如下

/**
	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
	 * exactly one resulting element.
	 *
	 * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
	 * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
	 * timeout function can produce exactly one resulting element.
	 *
	 * <p>You can get the stream of timed-out data resulting from the
	 * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
	 * {@link SingleOutputStreamOperator} resulting from the select operation
	 * with the same {@link OutputTag}.
	 *
	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
	 *                               pattern sequence which has timed out.
	 * @param patternSelectFunction The pattern select function which is called for each detected
	 *                              pattern sequence.
	 * @param <L> Type of the resulting timeout elements
	 * @param <R> Type of the resulting elements
	 * @return {@link DataStream} which contains the resulting elements with the resulting timeout
	 * elements in a side output.
	 */
	public <L, R> SingleOutputStreamOperator<R> select(
    // 重點是以下幾個引數
			final OutputTag<L> timedOutPartialMatchesTag,
			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
			final PatternSelectFunction<T, R> patternSelectFunction) {

		final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
			patternSelectFunction,
			PatternSelectFunction.class,
			0,
			1,
			TypeExtractor.NO_INDEX,
			builder.getInputType(),
			null,
			false);

		return select(
			timedOutPartialMatchesTag,
			patternTimeoutFunction,
			rightTypeInfo,
			patternSelectFunction);
	}

可以發現select函式還可以回傳超時的時間流,比我們想象的強大得多

// 定義輸入的訂單事件樣例類
// case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)

// 定義輸出的訂單檢測結果樣例類
// case class OrderResult(orderId: Long, resultMsg: String)
class OrderResult {
    public Long orderId;
    public String resultMsg;

    public OrderResult(Long orderId, String resultMsg) {
        this.orderId = orderId;
        this.resultMsg = resultMsg;
    }

    public OrderResult() {
    }

    @Override
    public String toString() {
        return "OrderResult{" +
                "orderId=" + orderId +
                ", resultMsg='" + resultMsg + '\'' +
                '}';
    }
}

class OrderEvent {
    public Long orderId;
    public String eventType;
    public Long eventTime;

    public OrderEvent() {
    }

    public OrderEvent(Long orderId, String eventType, Long eventTime) {
        this.orderId = orderId;
        this.eventType = eventType;
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return "OrderEvent{" +
                "orderId=" + orderId +
                ", eventType='" + eventType + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}

public class OrderTimeout {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
        SingleOutputStreamOperator<OrderEvent> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent>() {
            @Override
            public OrderEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new OrderEvent(Long.parseLong(split[0].trim()), split[1].trim(), Long.parseLong(split[3].trim()));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.eventTime * 1000));

        KeyedStream<OrderEvent, Long> keyedStream = orderEventStream.keyBy(value -> value.orderId);

        // 定義匹配模式
        Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("begin").where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent orderEvent) throws Exception {
                return orderEvent.eventType.equals("create");
            }
        }).followedBy("follow").where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent orderEvent) throws Exception {
                return orderEvent.eventType.equals("pay");
            }
        }).within(Time.minutes(15));

        // 應用pattern到stream上
        PatternStream<OrderEvent> pattern = CEP.pattern(keyedStream, orderPayPattern);

        // select,提取事件序列,超時的事件要做報警提示
        OutputTag<OrderResult> orderTimeout = new OutputTag<OrderResult>("orderTimeout"){};
        SingleOutputStreamOperator<OrderResult> select = pattern.select(orderTimeout, new OrderTimeoutSelect(), new OrderPaySelect());

        select.print();
        select.getSideOutput(orderTimeout).print();

        env.execute();
    }
}

class OrderTimeoutSelect implements PatternTimeoutFunction<OrderEvent,OrderResult> {

    @Override
    public OrderResult timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
        Long begin = pattern.get("begin").iterator().next().orderId;
        return new OrderResult(begin,"timeout");
    }
}

class OrderPaySelect implements PatternSelectFunction<OrderEvent, OrderResult> {

    @Override
    public OrderResult select(Map<String, List<OrderEvent>> pattern) throws Exception {
        Long follow = pattern.get("follow").iterator().next().orderId;
        return new OrderResult(follow,"payed successfully");
    }
}

方法二 直接使用狀態編程

遇到這種問題,直接理清思路就好,就先不寫了

來自兩條流的訂單交易匹配

對于訂單支付事件,用戶支付完成其實并不算完,我們還得確認平臺賬戶上是否到賬了,而往往這會來自不同的日志資訊,所以我們要同時讀入兩條流的資料來做合并處理,這里我們利用connect將兩條流進行連接,然后用自定義的CoProcessFunction進行處理

方法一:CONNECT + COPROCESS

這里使用了流之間的connect


// 輸入輸出的樣例類
//  case class ReceiptEvent(txId:String, payChannel:String, timestamp:Long)
//  case class OrderEvent(orderId:Long, eventType:String, txId:String, eventTime:Long)

class ReceiptEvent {
    public String txId;
    public String payChannel;
    public Long timestamp;

    public ReceiptEvent(String txId, String payChannel, Long timestamp) {
        this.txId = txId;
        this.payChannel = payChannel;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "ReceiptEvent{" +
                "txId='" + txId + '\'' +
                ", payChannel='" + payChannel + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

class OrderEvent2 {
    public Long orderId;
    public String eventType;
    public String txId;
    public Long eventTime;

    public OrderEvent2(Long orderId, String eventType, String txId, Long eventTime) {
        this.orderId = orderId;
        this.eventType = eventType;
        this.txId = txId;
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return "OrderEvent2{" +
                "orderId=" + orderId +
                ", eventType='" + eventType + '\'' +
                ", txId='" + txId + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}

public class OrderPayTxMatch {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
        SingleOutputStreamOperator<OrderEvent2> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent2>() {
            @Override
            public OrderEvent2 map(String s) throws Exception {
                String[] split = s.split(",");
                return new OrderEvent2(Long.parseLong(split[0].trim()), split[1].trim(), split[2].trim(), Long.parseLong(split[3].trim()));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent2>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.eventTime * 1000));

        KeyedStream<OrderEvent2, String> keyedStream = orderEventStream.keyBy(value -> value.txId);

        DataStreamSource<String> stringDataStreamSource2 = env.readTextFile("data/ReceiptLog.csv");
        SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = stringDataStreamSource2.map(new MapFunction<String, ReceiptEvent>() {
            @Override
            public ReceiptEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new ReceiptEvent(split[0].trim(), split[1].trim(), Long.parseLong(split[2].trim()));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ReceiptEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.timestamp * 1000));

        KeyedStream<ReceiptEvent, String> keyedStream2 = receiptEventStream.keyBy(value -> value.txId);

        // 定義測輸出流
        // OutputTag<OrderEvent2> outputTag = new OutputTag<OrderEvent2>("unmatchedPay") {};
        // OutputTag<ReceiptEvent> outputTag2 = new OutputTag<ReceiptEvent>("unmatchedRec") {};

        // connect 連接兩條流,匹配事件進行處理
        SingleOutputStreamOperator<Tuple2<OrderEvent2, ReceiptEvent>> process = keyedStream.connect(keyedStream2).process(new OrderPayTxDetect());

        // 列印輸出
        process.print();
        process.getSideOutput(new OutputTag<OrderEvent2>("unmatchedPay") {}).print();
        process.getSideOutput(new OutputTag<ReceiptEvent>("unmatchedRec") {}).print();

        env.execute();
    }
}

class OrderPayTxDetect extends CoProcessFunction<OrderEvent2, ReceiptEvent, Tuple2<OrderEvent2, ReceiptEvent>> {

    public ValueState<OrderEvent2> pay;
    public ValueState<ReceiptEvent> receipt;

    @Override
    public void open(Configuration parameters) throws Exception {
        pay = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent2>("pay", OrderEvent2.class));
        receipt = getRuntimeContext().getState(new ValueStateDescriptor<ReceiptEvent>("receipt", ReceiptEvent.class));

    }

    @Override
    public void processElement1(OrderEvent2 value, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
        // pay 來了,考察有沒有對應的 receipt 來過
        ReceiptEvent value1 = receipt.value();
        if(value1!=null) {
            // 如果已經有 receipt,正常輸出到主流
            out.collect(new Tuple2<>(value,value1));
            receipt.clear();
        } else {
            // 如果 receipt 還沒來,那么把 pay 存入狀態,注冊一個定時器等待 5 秒
            pay.update(value);
            ctx.timerService().registerProcessingTimeTimer(value.eventTime*1000L + 5000L);
        }

    }

    @Override
    public void processElement2(ReceiptEvent value, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
        //receipt來了,判斷有沒有對應的pay來過
        OrderEvent2 value1 = pay.value();
        if(value1!=null) {
            // 如果已經有 pay,正常輸出到主流
            out.collect(new Tuple2<>(value1,value));
            pay.clear();
        } else {
            // 如果 pay 還沒來,那么把 receipt 存入狀態,注冊一個定時器等待 3 秒
            receipt.update(value);
            ctx.timerService().registerProcessingTimeTimer(value.timestamp*1000L + 3000L);
        }
    }

    // 定時觸發, 有兩種情況,所以要判斷當前有沒有pay和receipt
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
        //如果 pay 不為空,說明receipt沒來,輸出unmatchedPays
        if (pay.value() != null){
            ctx.output(new OutputTag<OrderEvent2>("unmatchedPay") {},pay.value());
        }

        if (receipt.value() != null){
            ctx.output(new OutputTag<ReceiptEvent>("unmatchedRec") {},receipt.value());
        }

        // 清除狀態
        pay.clear();
        receipt.clear();
    }
}

方法二:JOIN

public class OrderPayTxMatchWithJoin {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stringDataStreamSource = env.readTextFile("data/OrderLog.csv");
        SingleOutputStreamOperator<OrderEvent2> orderEventStream = stringDataStreamSource.map(new MapFunction<String, OrderEvent2>() {
            @Override
            public OrderEvent2 map(String s) throws Exception {
                String[] split = s.split(",");
                return new OrderEvent2(Long.parseLong(split[0].trim()), split[1].trim(), split[2].trim(), Long.parseLong(split[3].trim()));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent2>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.eventTime * 1000));

        KeyedStream<OrderEvent2, String> keyedStream = orderEventStream.keyBy(value -> value.txId);

        DataStreamSource<String> stringDataStreamSource2 = env.readTextFile("data/ReceiptLog.csv");
        SingleOutputStreamOperator<ReceiptEvent> receiptEventStream = stringDataStreamSource2.map(new MapFunction<String, ReceiptEvent>() {
            @Override
            public ReceiptEvent map(String s) throws Exception {
                String[] split = s.split(",");
                return new ReceiptEvent(split[0].trim(), split[1].trim(), Long.parseLong(split[2].trim()));
            }
        })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<ReceiptEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamr) -> event.timestamp * 1000));

        KeyedStream<ReceiptEvent, String> keyedStream2 = receiptEventStream.keyBy(value -> value.txId);

        keyedStream.intervalJoin(keyedStream2)
                .between(Time.seconds(-5),Time.seconds(3))
                .process(new OrderPayTxDetectWithJoin()).print();

        env.execute();
    }
}

class OrderPayTxDetectWithJoin extends ProcessJoinFunction<OrderEvent2, ReceiptEvent, Tuple2<OrderEvent2, ReceiptEvent>> {

    @Override
    public void processElement(OrderEvent2 left, ReceiptEvent right, Context ctx, Collector<Tuple2<OrderEvent2, ReceiptEvent>> out) throws Exception {
        out.collect(new Tuple2<>(left,right));
    }
}

電商用戶行為分析-Flink【Java重寫版本】到這里就介紹完了,小伙伴們點贊、收藏、評論,一鍵三連走起呀,下期見~~
最后再次感謝尚硅谷和Xiaoyuyu的指導和分享,本文僅用于交流學習,不用于商業性使用

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294166.html

標籤:其他

上一篇:Flink 內核原理與實作-應用

下一篇:97年雙非本科畢業2年取得20k offer

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more