代码拉取完成,页面将自动刷新
创建项目的流程可参考《编写 MapReduce 统计代码》一文。假设现在已经创建了一个 maven 项目,其 pom.xml 内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SqlMapReduceResult</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Archetype - SqlMapReduceResult</name>
<url>http://maven.apache.org</url>
<dependencies>
<!-- Hive JDBC driver: provides org.apache.hive.jdbc.HiveDriver used by Main. -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
<!-- Exclude the logging bindings and the jetty-runner that hive-jdbc drags in
transitively; they conflict with each other (multiple SLF4J bindings) and
bloat the fat jar. NOTE(review): exclusion list presumably tuned for this
cluster's classpath — verify if the Hive version changes. -->
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-runner</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Builds the runnable fat jar
SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar during `mvn package`,
with Main as the entry point, so it can be run with plain `java -jar`. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.sqlmapreduceresult.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<!-- Bind assembly:single to the package phase. -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
创建包 com.example.sqlmapreduceresult,并在该包下创建主类 Main:
package com.example.sqlmapreduceresult;
import java.io.*;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
public class Main {

    /**
     * Merges incremental word counts into the counts already stored in Hive.
     *
     * <p>Reads tab-separated {@code name\tcount} lines from the file given as
     * {@code args[0]} (a MapReduce {@code part-r-00000} output), queries the
     * existing totals from the Hive table {@code hive.result}, adds the two
     * together, and overwrites the input file with the merged result.
     *
     * @param args {@code args[0]} is the path of the incremental data file
     */
    public static void main(String[] args) {
        // Guard against a missing argument instead of an opaque
        // ArrayIndexOutOfBoundsException.
        if (args.length < 1) {
            System.out.println("Usage: java -jar SqlMapReduceResult.jar <dataFilePath>");
            return;
        }
        String filePath = args[0];
        try {
            HashMap<String, Integer> dataMap = readIncrementalCounts(filePath);
            String result = mergeWithHive(dataMap);
            System.out.println(result);
            // Write the merged result back over the source file;
            // try-with-resources guarantees the writer is closed.
            try (FileWriter writer = new FileWriter(filePath)) {
                writer.write(result);
            }
            System.out.println("写入文件成功!");
        } catch (SQLException | IOException | ClassNotFoundException e) {
            // Matches the original behavior of printing the exception; the
            // workflow engine sees a normal exit either way.
            System.out.println(e);
        }
    }

    /**
     * Parses the incremental data file into a name→count map.
     * Blank lines and lines with fewer than two tab-separated fields are
     * skipped, so a trailing empty line cannot crash the run.
     */
    private static HashMap<String, Integer> readIncrementalCounts(String filePath)
            throws IOException {
        HashMap<String, Integer> dataMap = new HashMap<String, Integer>();
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
                String[] split = line.split("\t");
                // Tolerate blank or malformed lines.
                if (split.length < 2 || split[0].trim().equals("")) continue;
                System.out.println(split[0] + "+" + split[1]);
                dataMap.put(split[0], Integer.parseInt(split[1]));
            }
        }
        return dataMap;
    }

    /**
     * Reads every row of {@code hive.result} and merges it with the
     * incremental counts. Names present in both get the sum of both counts;
     * names only in Hive keep their count; names only in the incremental map
     * are appended as new rows.
     *
     * @return the merged table as {@code name\tcount\n} lines
     */
    private static String mergeWithHive(HashMap<String, Integer> dataMap)
            throws SQLException, ClassNotFoundException {
        String url = "jdbc:hive2://master:10001/hive;transportMode=http;httpPath=cliservice";
        String username = "";
        String password = "";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        StringBuilder result = new StringBuilder();
        // try-with-resources closes ResultSet/Statement/Connection even when a
        // SQLException is thrown mid-iteration (the original leaked them).
        try (Connection connection = DriverManager.getConnection(url, username, password);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT * FROM hive.result")) {
            while (resultSet.next()) {
                String name = resultSet.getString("name");
                int count = resultSet.getInt("count");
                System.out.println("name: " + name + ", count: " + count);
                // remove() both fetches the delta and marks the name as consumed.
                Integer delta = dataMap.remove(name);
                int merged = (delta == null) ? count : count + delta;
                result.append(name).append('\t').append(merged).append('\n');
            }
        }
        // Whatever is left in dataMap never appeared in Hive: append as new rows.
        for (Map.Entry<String, Integer> entry : dataMap.entrySet()) {
            result.append(entry.getKey()).append('\t').append(entry.getValue()).append('\n');
        }
        return result.toString();
    }
}
先后分别运行 mvn clean 和 mvn package 命令,如图:
运行完成后得到 jar 包,如图:
将名为 SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar 的 jar 包上传到 master 的 ~/jars 目录中。
# Azkaban-style flow: each node runs one shell command; dependsOn chains them
# into a linear pipeline: cleanPart -> clean -> mapreduce -> get -> increase
# -> beforePut -> put -> mysql -> lastClean.
nodes:
# Remove last run's local copy of the MapReduce output.
- name: cleanPart
type: command
config:
command: rm -f /home/master/part-r-00000
# Remove last run's /output directory from HDFS so the MapReduce job can write it.
- name: clean
type: command
dependsOn:
- cleanPart
config:
command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm -f -r /output
# Run the word-count MapReduce job over /listendDir, writing to /output.
- name: mapreduce
type: command
dependsOn:
- clean
config:
command: hadoop jar /home/master/jars/MyMapReduce-1.0-SNAPSHOT.jar com.example.mapreduce.WordCountDriverServer /listendDir /output
# Copy the job's result file from HDFS to the local home directory.
- name: get
type: command
dependsOn:
- mapreduce
config:
command: /home/master/hadoop-3.2.1/bin/hdfs dfs -get /output/part-r-00000 /home/master
# Merge the new counts with the totals in Hive (the Main class above) and
# overwrite the local part-r-00000 with the merged result.
- name: increase
type: command
dependsOn:
- get
config:
command: /home/master/jdk1.8.0_371/bin/java -jar /home/master/jars/SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar /home/master/part-r-00000
# Delete the stale result file from HDFS before uploading the merged one.
- name: beforePut
type: command
dependsOn:
- increase
config:
command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm /output/part-r-00000
# Upload the merged file back into /output on HDFS.
- name: put
type: command
dependsOn:
- beforePut
config:
command: /home/master/hadoop-3.2.1/bin/hdfs dfs -put /home/master/part-r-00000 /output
# Run load.sql to (re)load the merged file into the hive.result table.
# NOTE(review): node is named "mysql" but the command runs Hive — rename
# would be clearer, but the name is referenced by lastClean's dependsOn.
- name: mysql
type: command
dependsOn:
- put
config:
command: /home/master/hive-3.1.2/bin/hive -f 'load.sql'
# Clear the processed input files so the next run only sees new data.
- name: lastClean
type: command
dependsOn:
- mysql
config:
command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm /listendDir/*
本次工作流做了以下事情:
1. 删除 home 下的 part-r-00000;
2. 删除 hdfs 下的 /output;
3. 用 mapreduce 统计 hdfs 下 /listendDir 中的文件;
4. 将 mapreduce 统计结果放到 home 下;
5. 读取 hive 数据库的数据并增量更新到 home 下的 part-r-00000;
6. 将增量更新后的文件上传回 hdfs;
7. 执行 hive 的 sql 命令,将文件数据导入 hive.result 表中;
8. 删除 hdfs 下 /listendDir 中的数据。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。