
Writing SqlMapReduceResult to incrementally update Hive data

For the steps to create the project, refer to the guide 编写MapReduce统计代码 (Writing the MapReduce statistics code).

Assume a Maven project has already been created.
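
If the project does not exist yet, it can be generated with the standard Maven quickstart archetype (a sketch; the groupId and artifactId match the pom.xml below):

mvn archetype:generate -DgroupId=org.example -DartifactId=SqlMapReduceResult -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false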

Edit pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>SqlMapReduceResult</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>Archetype - SqlMapReduceResult</name>
  <url>http://maven.apache.org</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>3.1.2</version>
      <exclusions>
        <exclusion>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty</groupId>
          <artifactId>jetty-runner</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.example.sqlmapreduceresult.Main</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

Write the main class

Create the package com.example.sqlmapreduceresult and, under it, the main class Main.

package com.example.sqlmapreduceresult;

import java.io.*;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) {
        try {
            // Path of the input file (the MapReduce output)
            String filePath = args[0];

            // Map holding the incremental data read from the file
            HashMap<String, Integer> dataMap = new HashMap<String, Integer>();

            // Read the incremental data file
            BufferedReader reader = new BufferedReader(new FileReader(filePath));
            String line;
            // Read each line and store it in the map
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
                String[] split = line.split("\t");
                // Skip blank or malformed lines
                if (split.length < 2 || split[0].trim().equals("")) continue;
                System.out.println(split[0] + "+" + split[1]);
                dataMap.put(split[0], Integer.parseInt(split[1]));
            }
            reader.close();

            // Prepare to connect to the database
            String url = "jdbc:hive2://master:10001/hive;transportMode=http;httpPath=cliservice";
            String username = "";
            String password = "";

            // Load the driver and open the connection
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Connection connection = DriverManager.getConnection(url, username, password);
            Statement statement = connection.createStatement();

            // Run the SQL query
            String sql = "SELECT * FROM hive.result";
            ResultSet resultSet = statement.executeQuery(sql);

            StringBuilder result = new StringBuilder();

            // Process the query result
            while (resultSet.next()) {
                // Read one row from the result set
                String name = resultSet.getString("name");
                int count = resultSet.getInt("count");

                System.out.println("name: " + name + ", count: " + count);

                // Build this row's line of output
                if (dataMap.containsKey(name)) {
                    // Incremental update: add the new count to the existing one
                    int newCount = dataMap.get(name) + count;
                    result.append(name).append("\t").append(newCount).append("\n");
                    dataMap.remove(name);
                } else {
                    // Keep the existing row unchanged
                    result.append(name).append("\t").append(count).append("\n");
                }
            }

            // Append entries that only exist in the new data
            for (Map.Entry<String, Integer> entry : dataMap.entrySet()) {
                result.append(entry.getKey()).append("\t").append(entry.getValue()).append("\n");
            }

            // Close the connection and resources
            resultSet.close();
            statement.close();
            connection.close();

            System.out.println(result);

            // Write the merged result back to the source file
            FileWriter writer = new FileWriter(filePath);
            writer.write(result.toString());
            writer.close();
            System.out.println("File written successfully!");
        } catch (SQLException | IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
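
For reference, the input file passed in args[0] (the MapReduce output part-r-00000) is expected to hold one name and count per line, separated by a tab; the sample values below are illustrative only:

hadoop	3
hive	5
spark	2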

Packaging

Run the mvn clean command and then the mvn package command.

After the build completes, the jar package is generated under the target directory.

Upload the jar package

Upload the jar package named SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar to the ~/jars directory on master.
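
One way to do the upload from the development machine (a sketch; assumes SSH access to the master host and that ~/jars already exists there):

scp target/SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar master:~/jars/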

Modify the Azkaban workflow

nodes:
  - name: cleanPart
    type: command
    config:
      command: rm -f /home/master/part-r-00000
  - name: clean
    type: command
    dependsOn:
      - cleanPart
    config:
      command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm -f -r /output
  - name: mapreduce
    type: command
    dependsOn:
      - clean
    config:
      command: hadoop jar /home/master/jars/MyMapReduce-1.0-SNAPSHOT.jar com.example.mapreduce.WordCountDriverServer /listendDir /output
  - name: get
    type: command
    dependsOn:
      - mapreduce
    config:
      command: /home/master/hadoop-3.2.1/bin/hdfs dfs -get /output/part-r-00000 /home/master
  - name: increase
    type: command
    dependsOn:
      - get
    config:
      command: /home/master/jdk1.8.0_371/bin/java -jar /home/master/jars/SqlMapReduceResult-1.0-SNAPSHOT-jar-with-dependencies.jar /home/master/part-r-00000
  - name: beforePut
    type: command
    dependsOn:
      - increase
    config:
      command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm /output/part-r-00000
  - name: put
    type: command
    dependsOn:
      - beforePut
    config:
      command: /home/master/hadoop-3.2.1/bin/hdfs dfs -put /home/master/part-r-00000 /output
  - name: mysql
    type: command
    dependsOn:
      - put
    config:
      command: /home/master/hive-3.1.2/bin/hive -f 'load.sql'
  - name: lastClean
    type: command
    dependsOn:
      - mysql
    config:
      command: /home/master/hadoop-3.2.1/bin/hdfs dfs -rm /listendDir/*

This workflow does the following:

  1. Delete part-r-00000 from the home directory
  2. Delete /output from HDFS
  3. Run the MapReduce job to process the files under /listendDir on HDFS
  4. Fetch the MapReduce result into the home directory
  5. Query the current data in the Hive database and merge it incrementally into part-r-00000 in the home directory
  6. Put the updated result back into HDFS
  7. Run the HiveQL script load.sql to load the file data into the hive.result table (a sketch of a possible load.sql follows this list)
  8. Finally, clear the data under /listendDir on HDFS
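
The load.sql script itself is not included in this guide. A minimal sketch of what it might contain, assuming hive.result is a tab-delimited text table with the name and count columns that the Java code reads:

-- Hypothetical table definition; on a real cluster the table likely already exists
CREATE TABLE IF NOT EXISTS hive.result (
  name STRING,
  count INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

-- Replace the table contents with the merged file that was put back into HDFS
LOAD DATA INPATH '/output/part-r-00000' OVERWRITE INTO TABLE hive.result;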