1 Star 10 Fork 16

栾昊/Flink_1.13

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
BillCheckExample.java 5.93 KB
一键复制 编辑 原始数据 按行查看 历史
dahuanhuan 提交于 2022-04-19 17:28 . CoProcessFunction合流操作案例
package com.hao.chapter08;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.checkerframework.checker.units.qual.A;
import java.time.Duration;
public class BillCheckExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//来自app的日志
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L),
Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
}));
//来自第三方支付平台的支付日志
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L),
Tuple4.of("order-4", "third-party", "success", 5000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
@Override
public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
return element.f3;
}
}));
//检测同一支付订单在两条流中的编号是否匹配,不匹配则报警
appStream.connect(thirdpartStream)
.keyBy(data -> data.f0, data -> data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>,Tuple4<String, String, String, Long>,String>{
//定义状态变量,用来保存已经到达的事件
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String,String, Long>> thirdEventState;
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG)));
thirdEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String,String, Long>>("third-event", Types.TUPLE(Types.STRING,Types.STRING,Types.STRING,Types.LONG)));
}
@Override
public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
//来的是app event , 看另一条流是否来过
if (thirdEventState.value() != null){
out.collect("对账成功:" + value + " " + thirdEventState.value());
//清空状态
thirdEventState.clear();
} else {
//更新状态
appEventState.update(value);
//注册一个5s后的定时器,开始等待另一条流的事件
ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
if (appEventState.value() != null){
out.collect("对账成功:" + value + " " + appEventState.value());
//清空状态
appEventState.clear();
} else {
//更新状态
thirdEventState.update(value);
//注册一个5s后的定时器,开始等待另一条流的事件
ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
}
}
@Override
public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
//定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
if (appEventState.value() != null){
out.collect("对账失败:"+ appEventState.value() + " " + "第三方支付平台未到账");
}
if(thirdEventState.value() != null){
out.collect("对账失败:"+ thirdEventState.value() + " " + "app平台未到账");
}
appEventState.clear();
thirdEventState.clear();
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/luan_hao/Flink_1.13.git
git@gitee.com:luan_hao/Flink_1.13.git
luan_hao
Flink_1.13
Flink_1.13
master

搜索帮助