// [Hosting-page notice, not part of the program] Code pull complete; the page will refresh automatically.
package com.hao.chapter09;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
public class BehaviorPatternDetectExample {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//用户行为数据流
DataStreamSource<Action> actionStream = env.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "order")
);
//行为模式流,基于它构建广播流
DataStreamSource<Pattern> patternStream = env.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "order")
);
//定义广播状态描述器
MapStateDescriptor<Void,Pattern> descriptor = new MapStateDescriptor<Void,Pattern>("pattern", Types.VOID,Types.POJO(Pattern.class));
BroadcastStream<Pattern> broadcastStream = patternStream.broadcast(descriptor);
//连接两条流进行处理
SingleOutputStreamOperator<Tuple2<String,Pattern>> matches = actionStream.keyBy(data -> data.userId)
.connect(broadcastStream)
.process(new PatterDetector());
matches.print();
env.execute();
}
//实现自定义的KeyedBroadcastProcessFunction
public static class PatterDetector extends KeyedBroadcastProcessFunction<String,Action,Pattern,Tuple2<String,Pattern>>{
//定义一个KeyedState 保存上一次用户行为
public ValueState<String> prevActionState;
@Override
public void open(Configuration parameters) throws Exception {
prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-action",String.class));
}
@Override
public void processElement(Action value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
// 从广播状态中匹配模式 (只读状态)
ReadOnlyBroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<Void, Pattern>("pattern", Types.VOID, Types.POJO(Pattern.class)));
Pattern pattern = patternState.get(null);
// 获取用户上一次的行为
String prevAction = prevActionState.value();
//判断是否匹配
if (pattern != null && prevAction != null){
if (pattern.action1.equals(prevAction) && pattern.action2.equals(value.action)){
//匹配后输出key和它对应的pattern里的模型
out.collect(new Tuple2<>(ctx.getCurrentKey(),pattern));
}
}
//更新状态
prevActionState.update(value.action);
}
@Override
public void processBroadcastElement(Pattern value, KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>>.Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
//从上下文中获取广播状态,并用当前数据更新状态
BroadcastState<Void, Pattern> patternState = ctx.getBroadcastState(new MapStateDescriptor<Void, Pattern>("pattern", Types.VOID, Types.POJO(Pattern.class)));
//更新当前的广播状态
patternState.put(null,value);
}
}
//定义用户行为事件和模式的POJO类
public static class Action{
public String userId; //id
public String action; //行为
public Action() {}
public Action(String userId, String action) {
this.userId = userId;
this.action = action;
}
@Override
public String toString() {
return "Action[" +
"userId='" + userId + '\'' +
", action='" + action + '\'' +
']';
}
}
public static class Pattern{
public String action1; //行为 1
public String action2; //行为 2
public Pattern() {}
public Pattern(String action1, String action2) {
this.action1 = action1;
this.action2 = action2;
}
@Override
public String toString() {
return "Pattern[" +
"action1='" + action1 + '\'' +
", action2='" + action2 + '\'' +
']';
}
}
}
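// [Hosting-page notice, not part of the program] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions.
// [Hosting-page notice, not part of the program] If you confirm the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, and nothing that violates applicable laws and regulations, you may click submit to appeal and we will handle it as soon as possible.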