代码拉取完成,页面将自动刷新
package com.hao.chapter12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
public class OrderTimeoutDetectExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.获取订单事件流,并提取时间戳、生成水位线
SingleOutputStreamOperator<OrderEvent> orderStream = env.fromElements(
new OrderEvent("user_1", "order_1", "create", 1000L),
new OrderEvent("user_2", "order_2", "create", 2000L),
new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent element, long recordTimestamp) {
return element.timestamp;
}
})
);
//2.定义模式
Pattern<OrderEvent, OrderEvent> pattern = Pattern.<OrderEvent>begin("create")
.where(new SimpleCondition<OrderEvent>() { //首先是下单事件
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.eventType.equals("create");
}
})
.followedBy("pay")
.where(new SimpleCondition<OrderEvent>() { //其次是支付事件,中间可以修改订单,宽松近邻
@Override
public boolean filter(OrderEvent value) throws Exception {
return value.eventType.equals("pay");
}
}).within(Time.minutes(15));// 要求在十五分钟之内完成
//3.将应用模式应用到订单流上,检测匹配的复杂事件
PatternStream<OrderEvent> patternStream = CEP.pattern(orderStream.keyBy(event -> event.orderId), pattern);
//4.定义一个侧输出流标签
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};
//5.将完全匹配和超时部分匹配的复杂事件提取出来,进行处理
SingleOutputStreamOperator<String> result = patternStream.process(new OrderPayMatch());
// 将正常匹配和超时部分匹配的处理结果流打印输出
result.print("正常支付");
result.getSideOutput(timeoutTag).print("timeout");
env.execute();
}
//实现自定义的 PatternProcessFunction,需实现 TimedOutPartialMatchHandler 接口
public static class OrderPayMatch extends PatternProcessFunction<OrderEvent,String> implements TimedOutPartialMatchHandler<OrderEvent>{
// 处理正常匹配事件
@Override
public void processMatch(Map<String, List<OrderEvent>> match, Context context, Collector<String> out) throws Exception {
//获取当前的支付事件
OrderEvent payEvent = match.get("pay").get(0);
out.collect("用户" + payEvent.userId + "的订单:" + payEvent.orderId + " 已支付!");
}
//处理超时未支付事件
@Override
public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context context) throws Exception {
OrderEvent createEvent = match.get("create").get(0);
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};
context.output(timeoutTag,"用户" + createEvent.userId + "的订单 :" + createEvent.orderId + " 超时未支付!");
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。