diff --git a/algo_frequency/Dockerfile b/algo_frequency/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..d003850f10a9b174a39b771ed2917b26098b7758
--- /dev/null
+++ b/algo_frequency/Dockerfile
@@ -0,0 +1,6 @@
+FROM openjdk:8u222-jre
+# ENV must use key=value (no space before '='); the original "APP_HOME =/opt/docker/json"
+# set APP_HOME to the literal "=/opt/docker/json", breaking WORKDIR below.
+ENV APP_HOME=/opt/docker/json
+WORKDIR $APP_HOME
+COPY ./json-log1-1.0-SNAPSHOT.jar ./demo.jar
diff --git a/algo_frequency/README.md b/algo_frequency/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/algo_frequency/docker-compose.yml b/algo_frequency/docker-compose.yml
new file mode 100644
index 0000000000000000000000000000000000000000..a7c7bfe9c46f83309cce5c49f81d93c266b858dd
--- /dev/null
+++ b/algo_frequency/docker-compose.yml
@@ -0,0 +1,23 @@
+version: '3.3'
+services:
+  hello:
+    container_name: json-log-processing
+    build:
+      context: ./
+      dockerfile: ./Dockerfile
+    networks:
+      - fine-aiops
+    # NOTE(review): credentials are hard-coded here; prefer an env_file or secrets.
+    environment:
+      - "kafkaAddr=124.207.24.170:9092"
+      - "log_addr=124.207.24.170:53306"
+      - "log_db=aiops_logs"
+      - "log_username=aiops_logs"
+      - "log_password=aiops2022"
+      - "operation_addr=124.207.24.170:53306"
+      - "operation_db=aiops_demo"
+      - "operation_username=aiops_demo"
+      - "operation_password=aiops2022"
+
+networks:
+  fine-aiops:
diff --git a/algo_frequency/frequency_project/json-log/.idea/.gitignore b/algo_frequency/frequency_project/json-log/.idea/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..0a8642fac084e27fe66d81f111393d179c2d95d6
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/.gitignore
@@ -0,0 +1,10 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+# Zeppelin ignored files
+/ZeppelinRemoteNotebooks/
diff --git a/algo_frequency/frequency_project/json-log/.idea/aws.xml b/algo_frequency/frequency_project/json-log/.idea/aws.xml
new file mode 100644
index 0000000000000000000000000000000000000000..b63b642cfb4254fc0f7058903abc5b481895c4ef
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/aws.xml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/compiler.xml b/algo_frequency/frequency_project/json-log/.idea/compiler.xml
new file mode 100644
index 0000000000000000000000000000000000000000..c3270e91e93ec93aa766ceb4e01fbd02691249b4
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/compiler.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/deployment.xml b/algo_frequency/frequency_project/json-log/.idea/deployment.xml
new file mode 100644
index 0000000000000000000000000000000000000000..fbee94426eabb3ebabc43e9dc2ab6285551caf8e
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/deployment.xml
@@ -0,0 +1,14 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/encodings.xml b/algo_frequency/frequency_project/json-log/.idea/encodings.xml
new file mode 100644
index 0000000000000000000000000000000000000000..60caf800435d1e29ff75121690c2759adfd668a9
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/encodings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/inspectionProfiles/Project_Default.xml b/algo_frequency/frequency_project/json-log/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000000000000000000000000000000000000..6560a98983ec708cf9d8b5c5c3776d7bd39c475b
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/jarRepositories.xml b/algo_frequency/frequency_project/json-log/.idea/jarRepositories.xml
new file mode 100644
index 0000000000000000000000000000000000000000..980588c7f2720e4a734966089f292ec94e53aec6
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/jarRepositories.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/.idea/misc.xml b/algo_frequency/frequency_project/json-log/.idea/misc.xml
new file mode 100644
index 0000000000000000000000000000000000000000..010c4000e6d8e83751448f94d05be543240d82f2
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/.idea/misc.xml
@@ -0,0 +1,17 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/dependency-reduced-pom.xml b/algo_frequency/frequency_project/json-log/dependency-reduced-pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..a8da0b6840deb1a9a91080d86bc822c856417e2b
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/dependency-reduced-pom.xml
@@ -0,0 +1,56 @@
+
+
+ 4.0.0
+ org.example
+ json-log1
+ 1.0-SNAPSHOT
+
+
+
+ maven-compiler-plugin
+ 3.6.0
+
+ 1.8
+ 1.8
+
+
+
+ maven-shade-plugin
+ 3.1.0
+
+
+ package
+
+ shade
+
+
+
+
+ com.terrance.log.window.TestJsonLogWindow
+
+
+ reference.conf
+
+
+
+
+ org.codehaus.plexus.util
+ org.shaded.plexus.util
+
+ org.codehaus.plexus.util.xml.Xpp3Dom
+ org.codehaus.plexus.util.xml.pull.*
+
+
+
+
+
+
+
+
+
+
+ 8
+ 8
+
+
+
diff --git a/algo_frequency/frequency_project/json-log/pom.xml b/algo_frequency/frequency_project/json-log/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..5cfa6ccfa3bb540b3b66bdc3ba67d3b62b675355
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/pom.xml
@@ -0,0 +1,177 @@
+
+
+ 4.0.0
+
+ org.example
+ json-log1
+ 1.0-SNAPSHOT
+
+
+ 8
+ 8
+
+
+
+
+ org.apache.flink
+ flink-java
+ 1.10.1
+
+
+ org.apache.flink
+ flink-streaming-java_2.12
+ 1.10.1
+
+
+
+
+ org.apache.flink
+ flink-connector-kafka-0.11_2.12
+ 1.10.1
+
+
+
+ org.apache.flink
+ flink-streaming-scala_2.12
+ 1.10.1
+
+
+ org.apache.flink
+ flink-clients_2.12
+ 1.10.1
+
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.25
+
+
+
+
+ log4j
+ log4j
+ 1.2.17
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.16
+
+
+
+
+ mysql
+ mysql-connector-java
+ 8.0.28
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ maven-compiler-plugin
+ 3.6.0
+
+ 1.8
+ 1.8
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.1.0
+
+
+ package
+
+ shade
+
+
+
+
+ com.terrance.log.window.TestJsonLogWindow
+
+
+ reference.conf
+
+
+
+
+ org.codehaus.plexus.util
+ org.shaded.plexus.util
+
+ org.codehaus.plexus.util.xml.Xpp3Dom
+ org.codehaus.plexus.util.xml.pull.*
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/META-INF/MANIFEST.MF b/algo_frequency/frequency_project/json-log/src/main/java/META-INF/MANIFEST.MF
new file mode 100644
index 0000000000000000000000000000000000000000..1f97ffb2b1e318cb66dcf99f1b30b134fd713942
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0
+Main-Class: com.terrance.log.window.TestJsonLogWindow
+
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/bean/JsonLog.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/bean/JsonLog.java
new file mode 100644
index 0000000000000000000000000000000000000000..53af7250dd45bc7f707c3571bb557243a3388e25
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/bean/JsonLog.java
@@ -0,0 +1,32 @@
+package com.terrance.log.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Classname JsonLog
+ * @Description
+ * @Date 2022/4/28 14:38
+ * @Author terrance_swn
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class JsonLog {
+ private String serviceName;
+ private String logType;
+ private String loggingLevel;
+ private String operationTime;
+ private Long timestamp;
+ private String operatorName;
+ private String operatorType;
+ private String clientIp;
+ private String operationObject;
+ private String operationObjectType;
+ private String operationAction;
+ private String operationActionType;
+ private String label;
+ private String operationResult;
+ private String logText; //日志原文
+}
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/MyJdbcSink.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/MyJdbcSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..5a0c2bc532f7b2b40ea073a0956a886643809d15
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/MyJdbcSink.java
@@ -0,0 +1,64 @@
+package com.terrance.log.sink;
+
+import com.terrance.log.bean.JsonLog;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+/**
+ * @Classname MyJdbcSink
+ * @Description
+ * @Date 2022/4/28 14:43
+ * @Author terrance_swn
+ */
+public class MyJdbcSink extends RichSinkFunction {
+ Connection connection = null;
+ PreparedStatement insertStat = null;
+
+ public MyJdbcSink(){}
+
+ private String[] args = null;
+
+ public MyJdbcSink(String[] args) {
+ this.args = args;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ //初始化资源
+ //this.connection = DriverManager.getConnection("jdbc:mysql://192.168.9.55:3306/aiops_logs","aiops_logs","aiops2022");
+ this.connection = DriverManager.getConnection("jdbc:mysql://"+ System.getenv("log_addr") +"/"+ System.getenv("log_db"),System.getenv("log_username"),System.getenv("log_password"));
+ this.insertStat = connection.prepareStatement(
+ "insert into tb_json_log values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
+ }
+
+ @Override
+ public void invoke(JsonLog log, Context context) throws Exception {
+ insertStat.setObject(1, null);
+ insertStat.setString(2,log.getServiceName());
+ insertStat.setString(3,log.getLogType());
+ insertStat.setString(4,log.getLoggingLevel());
+ insertStat.setString(5,log.getOperationTime());
+ insertStat.setString(6,log.getOperatorName());
+ insertStat.setString(7,log.getOperatorType());
+ insertStat.setString(8,log.getClientIp());
+ insertStat.setString(9,log.getOperationObject());
+ insertStat.setString(10,log.getOperationObjectType());
+ insertStat.setString(11,log.getOperationAction());
+ insertStat.setString(12,log.getOperationActionType());
+ insertStat.setString(13,log.getLabel());
+ insertStat.setString(14,log.getOperationResult());
+ insertStat.setString(15,log.getLogText());
+
+ insertStat.execute();
+ }
+
+    @Override
+    public void close() throws Exception {
+        // Close the statement before its connection; the original order leaked
+        // the statement whenever connection.close() threw.
+        insertStat.close();
+        connection.close();
+}
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/WindowJdbcSink.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/WindowJdbcSink.java
new file mode 100644
index 0000000000000000000000000000000000000000..ab64b239ffb6b7ed739fc7c8a727d1415b147fce
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/sink/WindowJdbcSink.java
@@ -0,0 +1,49 @@
+package com.terrance.log.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+/**
+ * @Classname WindowJdbcSink
+ * @Description
+ * @Date 2022/4/29 14:30
+ * @Author terrance_swn
+ */
+public class WindowJdbcSink extends RichSinkFunction {
+
+ Connection connection = null;
+ PreparedStatement insertStat = null;
+
+ private WindowJdbcSink(){}
+
+ private String[] args = null;
+
+ public WindowJdbcSink(String[] args) {
+ this.args = args;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ //初始化资源
+ this.connection = DriverManager.getConnection("jdbc:mysql://"+ System.getenv("operation_addr") +"/"+ System.getenv("operation_db"),System.getenv("operation_username"),System.getenv("operation_password"));
+ this.insertStat = connection.prepareStatement(
+ "insert into tb_log_times values (?)");
+ }
+
+ @Override
+ public void invoke(Long value, Context context) throws Exception {
+ insertStat.setLong(1, value);
+
+ insertStat.execute();
+ }
+
+    @Override
+    public void close() throws Exception {
+        // Close the statement before its connection; the original order leaked
+        // the statement whenever connection.close() threw.
+        insertStat.close();
+        connection.close();
+}
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/source/FlinkRabbitMqSource.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/source/FlinkRabbitMqSource.java
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/transformer/String2JsonLogMapper.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/transformer/String2JsonLogMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..f97b5cbce9b1cbc3b6101062a13f1020e0b7b118
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/transformer/String2JsonLogMapper.java
@@ -0,0 +1,83 @@
+package com.terrance.log.transformer;
+
+import com.terrance.log.bean.JsonLog;
+import lombok.SneakyThrows;
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @Classname String2JsonLogMapper
+ * @Description
+ * @Date 2022/4/28 14:53
+ * @Author terrance_swn
+ */
+public class String2JsonLogMapper extends RichMapFunction {
+
+ private static final String logRegex = "^\\{([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\, ([^\\,]+)\\}$";
+
+    @Override
+    public JsonLog map(String log) throws Exception {
+        JsonLog jsonLog = new JsonLog();
+        jsonLog.setLogText(log);
+
+        Pattern pattern = Pattern.compile(logRegex);
+        Matcher matcher = pattern.matcher(log);
+
+        boolean matches = matcher.matches();
+        System.out.println("matches = " + matches);
+        int groupCount = matcher.groupCount();
+        System.out.println("groupCount = " + groupCount);
+
+        // Only touch capture groups after a successful match; the original called
+        // matcher.group(4) unconditionally, throwing IllegalStateException on any
+        // log line that does not fit the 13-field pattern.
+        if (matches && groupCount == 13) {
+            // Parse the time field separately, e.g. "operationTime:2022/2/10 12:30:39".
+            int idx = matcher.group(4).indexOf(":");
+            String curTime = matcher.group(4).substring(idx + 1);
+            Long timestamp = timeParser(curTime);
+
+            jsonLog.setServiceName(matcher.group(1).split(":")[1]);
+            jsonLog.setLogType(matcher.group(2).split(":")[1]);
+            jsonLog.setLoggingLevel(matcher.group(3).split(":")[1]);
+            jsonLog.setOperationTime(curTime);
+            jsonLog.setOperatorName(matcher.group(5).split(":")[1]);
+            jsonLog.setOperatorType(matcher.group(6).split(":")[1]);
+            jsonLog.setClientIp(matcher.group(7).split(":")[1]);
+            jsonLog.setOperationObject(matcher.group(8).split(":")[1]);
+            jsonLog.setOperationObjectType(matcher.group(9).split(":")[1]);
+            jsonLog.setOperationAction(matcher.group(10).split(":")[1]);
+            jsonLog.setOperationActionType(matcher.group(11).split(":")[1]);
+            jsonLog.setLabel(matcher.group(12).split(":")[1]);
+            jsonLog.setOperationResult(matcher.group(13).split(":")[1]);
+            jsonLog.setTimestamp(timestamp);
+            System.out.println("map方法 --- " + jsonLog);
+        }
+
+        return jsonLog;
+    }
+
+ /**
+ * 解析日志时间为时间戳
+ * @param curTime
+ * @return
+ */
+ @SneakyThrows
+ private Long timeParser(String curTime) {
+
+ //万一有前导空格会解析失败
+ int i = 0;
+ while(i < curTime.length() && curTime.charAt(i) == ' ') {
+ i++;
+ }
+ String newTime = curTime.substring(i);
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+
+ Date date = sdf.parse(newTime);
+ return date.getTime();
+ }
+}
diff --git a/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/window/TestJsonLogWindow.java b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/window/TestJsonLogWindow.java
new file mode 100644
index 0000000000000000000000000000000000000000..9bed3d8cab72ba1bbe46da574ae740c4873d074e
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/java/com/terrance/log/window/TestJsonLogWindow.java
@@ -0,0 +1,120 @@
+package com.terrance.log.window;
+
+import com.terrance.log.bean.JsonLog;
+import com.terrance.log.sink.MyJdbcSink;
+import com.terrance.log.sink.WindowJdbcSink;
+import com.terrance.log.transformer.String2JsonLogMapper;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.util.Collector;
+
+import java.util.Properties;
+import java.util.ResourceBundle;
+
+/**
+ * @Classname TestJsonLogWindow
+ * @Description
+ * @Date 2022/4/28 11:30
+ * @Author terrance_swn
+ */
+public class TestJsonLogWindow {
+
+ private static final String PROPERTY_NAME = "file_location";
+ private static String INPUT_PATH = null;
+ private static String OUTPUT_PATH = null;
+
+ static {
+ ResourceBundle bundle = ResourceBundle.getBundle(PROPERTY_NAME);
+ INPUT_PATH = bundle.getString("INPUT_PATH");
+ //OUTPUT_PATH = bundle.getString("OUTPUT_PATH");
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ //1 env
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(8);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setMaxParallelism(16);
+
+ //2 source
+ //读kafka
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", System.getenv("kafkaAddr"));
+ //Zookeeper服务器IP和端口
+ //properties.setProperty("zookeeper.connect", "101.133.143.197:2181");
+ DataStreamSource inputDataStreamFromKafka = env.addSource(new FlinkKafkaConsumer011("log_window2", new SimpleStringSchema(), properties));
+ //读文件
+ //DataStreamSource inputDataStreamFromFile = env.readTextFile(INPUT_PATH);
+// //rabbitmq
+// RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
+// .setHost("124.207.24.170")
+// .setPort(55673)
+// .setUserName("guest")
+// .setPassword("aiops2022")
+// .setVirtualHost("/")
+// .build();
+//
+// DataStreamSource inputDataStreamFromRabbitMq =
+// env.addSource(new FlinkRabbitMqSource(connectionConfig,
+// "sss", "ss"));
+
+
+
+ //3 transform
+ //转换:map
+ DataStream dataStream = inputDataStreamFromKafka.map(new String2JsonLogMapper());
+
+
+ //4.dataStream提取时间戳和叠加窗口运算
+ SingleOutputStreamOperator streamOperator = dataStream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
+ @Override
+ public long extractAscendingTimestamp(JsonLog element) {
+ return element.getTimestamp();
+ }
+ });
+
+ SingleOutputStreamOperator windowOutputStream =
+ streamOperator.
+ keyBy("serviceName").timeWindow(Time.milliseconds(5000))
+ //输入, 输出(serviceId,Ip,otherInfo,窗口结束时间,当前时间窗口内日志数量)
+ .process(new ProcessWindowFunction() {
+
+ @Override //key context in out
+ public void process(Tuple tuple, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {
+ Long x1Count = 0L;
+
+
+ for (JsonLog element : elements) {
+ x1Count++;
+ }
+
+ long windowEndTime = context.window().getEnd();
+ //Tuple6 res = new Tuple6<>(windowEndTime, x1Count, x2Count, x3Count,x4Count,x5Count);
+ out.collect(x1Count);
+ System.out.println("process方法 --- " + x1Count);
+ }
+ });
+
+
+ //5 sink 写入数据库
+ //将解析后的日志写入日志数据库
+ dataStream.addSink(new MyJdbcSink(args)).setParallelism(1);
+
+ //将需要的窗口内频率信息写入运维数据库
+ windowOutputStream.addSink(new WindowJdbcSink(args)).setParallelism(1);
+
+ //6 exe
+ env.execute();
+ }
+}
diff --git a/algo_frequency/frequency_project/json-log/src/main/resources/file_location.properties b/algo_frequency/frequency_project/json-log/src/main/resources/file_location.properties
new file mode 100644
index 0000000000000000000000000000000000000000..e67e5394b4851e2b519e78387e09ecd1a61b5012
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/resources/file_location.properties
@@ -0,0 +1 @@
+INPUT_PATH = D:\\JAVA_software\\Git_new\\json-log1\\src\\main\\resources\\special.log
\ No newline at end of file
diff --git a/algo_frequency/frequency_project/json-log/src/main/resources/special.log b/algo_frequency/frequency_project/json-log/src/main/resources/special.log
new file mode 100644
index 0000000000000000000000000000000000000000..92e67118800b4e0ea4ac1bb6c5b903e8e393d334
--- /dev/null
+++ b/algo_frequency/frequency_project/json-log/src/main/resources/special.log
@@ -0,0 +1,20 @@
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/2/10 12:30:49, operatorname:publisher, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:get, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:info, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.100.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/9/10 12:30:42, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:40, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:log.txt, operationobjecttype:txt, operationaction:create, operationactiontype:get, label:27, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/03/10 12:30:49, operatorname:getIdentifier, operatortype:post, clientip:192.168.10.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:delete, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:46, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:info, operationtime:2022/1/10 12:30:49, operatorname:publisher, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:log.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:delete, label:7, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/07/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/11/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.9.1, operationobject:log.txt, operationobjecttype:txt, operationaction:create, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/10/10 12:30:45, operatorname:getIdentifier, operatortype:post, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:delete, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.12.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:20, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/9/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:log.txt, operationobjecttype:txt, operationaction:create, operationactiontype:delete, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:43, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:info, operationtime:2022/2/10 12:30:49, operatorname:publisher, operatortype:get, clientip:192.168.5.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/12/10 12:30:41, operatorname:getIdentifier, operatortype:post, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:info, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.1.1, operationobject:log.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:debug, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:get, clientip:192.168.61.1, operationobject:test.txt, operationobjecttype:txt, operationaction:create, operationactiontype:delete, label:23, operationresult:fail}
+{Servicename:route, logtype:identifier, logginglevel:error, operationtime:2022/10/10 12:30:40, operatorname:publisher, operatortype:get, clientip:192.168.1.1, operationobject:log.txt, operationobjecttype:txt, operationaction:aggregate, operationactiontype:get, label:23, operationresult:success}
+{Servicename:route, logtype:identifier, logginglevel:info, operationtime:2022/2/10 12:30:49, operatorname:getIdentifier, operatortype:post, clientip:192.168.1.1, operationobject:test.txt, operationobjecttype:txt, operationaction:update, operationactiontype:delete, label:23, operationresult:fail}
\ No newline at end of file
diff --git a/algo_frequency/json-log1-1.0-SNAPSHOT.jar b/algo_frequency/json-log1-1.0-SNAPSHOT.jar
new file mode 100644
index 0000000000000000000000000000000000000000..0d67190a0debd808c4f6b0c28021914a38497d96
Binary files /dev/null and b/algo_frequency/json-log1-1.0-SNAPSHOT.jar differ
diff --git a/algo_frequency/test_start.sh b/algo_frequency/test_start.sh
new file mode 100644
index 0000000000000000000000000000000000000000..1c70960f91de31c7dd6ab2a26a65beb8aaff4bb8
--- /dev/null
+++ b/algo_frequency/test_start.sh
@@ -0,0 +1,6 @@
+#! /bin/sh
+
+
+# 遇到错误就退出
+set -e
+docker-compose -f algo_frequency/docker-compose.yml up -d --build
diff --git a/algo_frequency/test_stop.sh b/algo_frequency/test_stop.sh
new file mode 100644
index 0000000000000000000000000000000000000000..7c6dd9c81c656a0875e2c77b92603f88893c0d68
--- /dev/null
+++ b/algo_frequency/test_stop.sh
@@ -0,0 +1,6 @@
+#! /bin/sh
+
+
+# 遇到错误就退出
+set -e
+docker-compose -f algo_frequency/docker-compose.yml down