1 Star 10 Fork 16

栾昊/Flink_1.13

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
TransformReduceTest.java 2.82 KB
一键复制 编辑 原始数据 按行查看 历史
dahuanhuan 提交于 2022-04-14 16:26 +08:00 . Reduce 归约聚合
package com.hao.chapter05;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformReduceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3200L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
);
//当前访问量最大,也就是点击次数最多的,最活跃的用户是谁
//1.统计每个用户的访问频次
KeyedStream<Tuple2<String, Long>, String> keyedStream = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).keyBy(data -> data.f0);
SingleOutputStreamOperator<Tuple2<String, Long>> clickByUser = keyedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
//2.选取当前最活跃的用户 ==> (keyBy之后,可能有分区操作,将所有的分区合并成一个分区,再次计算)
clickByUser.keyBy(data -> "key")
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
if (value1.f1 > value2.f1){ //如果老数据大于新数据,则返回老数据,否则返回新数据
return value1;
}else {
return value2;
}
}
}).print();
env.execute();
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/luan_hao/Flink_1.13.git
git@gitee.com:luan_hao/Flink_1.13.git
luan_hao
Flink_1.13
Flink_1.13
master

搜索帮助