> -seedDir: the directory containing the seed URLs to crawl
>
> -crawlID: a name for this crawl job; it determines the HBase table name, which becomes `<crawlID>_webpage`
>
> -numberOfRounds: how many rounds of linked pages to crawl
1. Create a directory for the seed URLs
```sh
mkdir urls
```
2. Create seed.txt in the urls directory and list the URLs to crawl, one per line
```sh
touch urls/seed.txt
```
3. Enter the URLs to crawl
```sh
vi urls/seed.txt
```
```
https://developer.huawei.com/consumer/cn/forum/
https://developer.aliyun.com/
```
4. Start the crawl (a quick check of the result follows)
```sh
bin/crawl urls/ briup 3
```
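When the crawl finishes, the data lands in the HBase table named after the crawlID, here `briup_webpage`. A quick sanity check in the HBase shell:
```sql
list
scan 'briup_webpage', {LIMIT => 1}
```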
### Table structure created by Nutch
```xml
```
### Meaning of the webpage table fields
**rowkey**
Primary key, generated from the page URL (format: reversed domain name:protocol:port and path). Because of this, Nutch 2 can only keep the current state of a page, not its history.
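For example, the mapping used by the reference code later in this document:
```
http://www.briup.com/index  ->  com.briup.www:http/index
```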
**headers**
Standard HTTP headers, which may contain non-printable characters. Fields such as Last-Modified can be used to decide whether a page needs to be refetched (by sending only a HEAD request instead of downloading the whole page).
**text**
All parsed text fields merged together (UTF-8), intended for plain full-text retrieval. Since retrieval is now usually handled by Solr, this field is of limited use.
**status**
Fetch status; possible values:
1 unfetched (links not yet fetched due to limits set in regex-urlfilter.txt, -TopN crawl parameters, etc.)
2 fetched (page was successfully fetched)
3 gone (that page no longer exists)
4 redir_temp (temporary redirection — see reprUrl below for more details)
5 redir_perm (permanent redirection — see reprUrl below for more details)
6 retry
7 not modified
**markers**
Markers set by the individual jobs (e.g. dist, _injmrk_, _updmrk_, _ftcmrk_, _gnmrk_, _prsmrk_)
**parseStatus**
Parse status; NULL until the parse job has been run. See ParseStatusCodes.html
**modifiedTime**
Last modification time
**score**
Page importance (PageRank-like score); Nutch 2.2.1 uses the OPIC algorithm
**typ**
Content type (e.g. application/xhtml+xml)
**batchId**
Batch ID, generated by the generate job ((curTime/1000) + "-" + randomSeed); the fetch job can be restricted to a specific batchId
**baseUrl**
Used to resolve relative links in the page source to absolute URLs. Usually the current page's URL; if there was a redirect, it is the final destination URL
**content**
The complete raw page source, without any processing (not even charset conversion).
**title**
Content of the title tag (converted to UTF-8)
**reprUrl**
Redirect URL; it is fetched in the next round rather than followed immediately
**fetchInterval**
Fetch interval, default 2592000 seconds (30 days)
**prevFetchTime**
Time of the previous fetch
**inlinks**
Inlinks (url + link text)
**prevSignature**
Page signature from the previous update
**outlinks**
Outlinks (url + link text)
**fetchTime**
Time of the next fetch, usually one interval (a month) later
**retriesSinceFetch**
Number of retries since the last successful fetch
**protocolStatus**
Protocol-level status of the last fetch; possible values:
ACCESS_DENIED 17
BLOCKED 23
EXCEPTION 16
FAILED 2
GONE 11
MOVED 12
NOTFETCHING 20
NOTFOUND 14
NOTMODIFIED 21
PROTO_NOT_FOUND 10
REDIR_EXCEEDED 19
RETRY 15
ROBOTS_DENIED 18
SUCCESS 1
TEMP_MOVED 13
WOULDBLOCK 22
**signature**
Page signature, used to detect whether a page has changed. The default implementation is org.apache.nutch.crawl.MD5Signature, which takes the MD5 of the content. An alternative is org.apache.nutch.crawl.TextProfileSignature, which extracts the text from the content, tokenizes and sorts it, and then computes the MD5. See TextProfileSignature
**metadata**
Custom metadata; it can be added in the seed file, e.g. "http://xxxx/xxx.html \t type=news"
### Data cleaning
The cleaning logic is implemented as a MapReduce job:
1. Decide whether the fetch succeeded: an `f:st` value of 2 means success
2. Keep the number of columns under family `il`, the number of columns under family `ol`, plus `s:s`, `p:t` and `p:c`; also keep the whole `il` and `ol` families
3. Assemble the result and write it to a new table, `clean_webpage`
The clean_webpage table has three column families: page, ol and il.

Table creation statement
```
create 'clean_webpage','page','ol','il'
```
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
/**
* @author adam
* @date 2022/6/10
* Data cleaning.
* The cleaning logic is implemented as a MapReduce job:
*
* 1. Decide whether the fetch succeeded: an `f:st` value of 2 means success
*
* 2. Keep the number of columns under family `il`, the number of columns under family `ol`, plus `s:s`, `p:t` and `p:c`; also keep the whole `il` and `ol` families
* 3. Assemble the result and write it to the new table `clean_webpage`
*
* map: read the raw crawled rows and clean them
* reducer: store the cleaned rows
*/
public class Step1_CleanData extends Configured implements Tool {
public static class CleanDataMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
MapWritable map = null;
// get the latest cell of the fetch status column f:st
Cell cell = value.getColumnLatestCell("f".getBytes(), "st".getBytes());
//Cell pCell = value.getColumnLatestCell("p".getBytes(), "st".getBytes());
//int pFlag = Bytes.toInt(pCell.getValueArray(), pCell.getValueOffset(), pCell.getValueLength());
// convert the byte array to an int: the fetch status
int fFlag = Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
// drop rows whose fetch failed
if (fFlag == 2 ) {
map = new MapWritable();
// collect the fields to keep
// number of columns in the il family
int ilNum = countByFamily(value, "il");
// number of columns in the ol family
int olNum = countByFamily(value, "ol");
// page content
byte[] pageContext = getValueByFamilyAndCol(value, "p", "c");
// page title
byte[] title = getValueByFamilyAndCol(value, "p", "t");
// Nutch's score for this url
byte[] score = getValueByFamilyAndCol(value, "s", "s");
// columns and values of the ol family
MapWritable olMap = getMapByFamily(value, "ol");
// columns and values of the il family
MapWritable ilMap = getMapByFamily(value, "il");
map.put(new BytesWritable("ilNum".getBytes()), new BytesWritable(Bytes.toBytes(ilNum)));
map.put(new BytesWritable("olNum".getBytes()), new BytesWritable(Bytes.toBytes(olNum)));
map.put(new BytesWritable("pageContext".getBytes()), new BytesWritable(pageContext));
map.put(new BytesWritable("title".getBytes()), new BytesWritable(title));
map.put(new BytesWritable("score".getBytes()), new BytesWritable(score));
map.put(new BytesWritable("olMap".getBytes()), olMap);
map.put(new BytesWritable("ilMap".getBytes()), ilMap);
context.write(key,map);
}
}
// get all columns and values of a column family
private MapWritable getMapByFamily(Result value, String family) {
// build a map of qualifier -> value
MapWritable map = new MapWritable();
NavigableMap<byte[], byte[]> navigableMaps = value.getFamilyMap(family.getBytes());
for (Map.Entry<byte[], byte[]> entry : navigableMaps.entrySet()) {
byte[] key = entry.getKey();
byte[] value1 = entry.getValue();
map.put(new BytesWritable(key), new BytesWritable(value1));
}
return map;
}
// get a single value by family and qualifier
private byte[] getValueByFamilyAndCol(Result value, String family, String col) {
Cell cell = value.getColumnLatestCell(family.getBytes(), col.getBytes());
return Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
}
// count the columns in a family
private int countByFamily(Result value, String family) {
NavigableMap<byte[], byte[]> map = value.getFamilyMap(family.getBytes());
return map.size();
}
}
public static class CleanDataReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
// take the cleaned data for this row
MapWritable map = values.iterator().next();
Put put = new Put(key.get());
put.addColumn("page".getBytes(),"oln".getBytes(),((BytesWritable)(map.get(new BytesWritable("olNum".getBytes())))).getBytes());
put.addColumn("page".getBytes(),"iln".getBytes(),((BytesWritable)(map.get(new BytesWritable("ilNum".getBytes())))).getBytes());
put.addColumn("page".getBytes(),"t".getBytes(),((BytesWritable)(map.get(new BytesWritable("title".getBytes())))).getBytes());
put.addColumn("page".getBytes(),"s".getBytes(),((BytesWritable)(map.get(new BytesWritable("score".getBytes())))).getBytes());
put.addColumn("page".getBytes(),"cnt".getBytes(),((BytesWritable)(map.get(new BytesWritable("pageContext".getBytes())))).getBytes());
MapWritable olMap = (MapWritable) map.get(new BytesWritable("olMap".getBytes()));
for (Map.Entry<Writable, Writable> entry : olMap.entrySet()) {
put.addColumn("ol".getBytes(),((BytesWritable)(entry.getKey())).getBytes(),((BytesWritable)entry.getValue()).getBytes());
}
MapWritable ilMap = (MapWritable) map.get(new BytesWritable("ilMap".getBytes()));
for (Map.Entry<Writable, Writable> entry : ilMap.entrySet()) {
put.addColumn("il".getBytes(),((BytesWritable)(entry.getKey())).getBytes(),((BytesWritable)entry.getValue()).getBytes());
}
context.write(NullWritable.get(),put);
}
}
@Override
public int run(String[] strings) throws Exception {
Configuration conf = getConf();
// table names can be overridden with -D in=... -D out=...
String inTable = conf.get("in", "briup_webpage");
String outTable = conf.get("out", "clean_webpage");
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "cleanData");
job.setJarByClass(this.getClass());
// map: read the crawled rows, clean them, and emit the kept fields as a MapWritable
TableMapReduceUtil.initTableMapperJob(inTable, new Scan(), CleanDataMapper.class, ImmutableBytesWritable.class, MapWritable.class, job);
// reduce: write the cleaned rows to HBase
TableMapReduceUtil.initTableReducerJob(outTable,CleanDataReducer.class,job);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new Step1_CleanData(),args));
}
}
```
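A sketch of how the cleaning job might be launched; the jar name is a placeholder, and the `-D in=... -D out=...` generic options (parsed by ToolRunner) simply override the defaults above. The later steps can be launched the same way with their own main classes:
```sh
# make the HBase classes visible to the hadoop client (adjust to your environment)
export HADOOP_CLASSPATH=$(hbase classpath)
# run the cleaning job; search-demo.jar is an assumed jar name
hadoop jar search-demo.jar com.briup.Step1_CleanData -D in=briup_webpage -D out=clean_webpage
```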
### Computing page weights
1. Flatten out all of the page's outlinks
2. Give the page a default weight
3. Distribute the page's weight evenly across its outlinks
4. A page with no outlinks gets a weight contribution of 0
5. Write the page weights to the table rank_result
6. Run the job several times so that the weights converge (see the worked example below)
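A worked example of a single distribution step, matching the formula `rank = d * sum + (1 - d)` used in the reducer below, with damping factor d = 0.85:
```
page A has rank 10 and outlinks B, C, D, E  ->  each outlink receives 10 / 4 = 2.5
if B receives contributions only from A:
    new rank(B) = 0.85 * 2.5 + (1 - 0.85) = 2.275
```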
Table creation statement
```
create 'rank_result','page','ol'
```
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.*;
/**
* @author adam
* @date 2022/6/10
* Weight calculation with the PageRank algorithm
*/
public class Step2_PageRank extends Configured implements Tool {
/**
* 1. Flatten out all of the page's outlinks
* 2. Give the page a default weight
* 3. Distribute the page's weight evenly across its outlinks
* 4. A page with no outlinks gets a weight contribution of 0
*/
public static class PageRankMapper extends TableMapper<Text, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String s = new String(key.get());
// convert the row key back into a normal uri
String uri = showUri(s);
if (uri != null) {
uri = uri.trim();
// read page:oln, i.e. the number of outlinks of this row
byte[] olnBytes = value.getValue("page".getBytes(), "oln".getBytes());
int i = 0;
if (olnBytes != null) {
i = Bytes.toInt(olnBytes);
}
byte[] rankBytes = value.getValue("page".getBytes(), "rank".getBytes());
// default page weight is 10
double rank = 10;
// after the first iteration the page already has a weight
if (rankBytes != null) {
rank = Bytes.toDouble(rankBytes);
}
// string used to collect all outlinks of this page
String outLinks = "";
// if there are outlinks, distribute the weight
if (i > 0) {
// all columns of the ol family
NavigableMap<byte[], byte[]> ols = value.getFamilyMap("ol".getBytes());
// set of outlink urls
Set<byte[]> keySet = ols.keySet();
double score = rank / keySet.size();
// emit each outlink together with its share of the weight
for (byte[] bytes : keySet) {
String oLink = new String(bytes);
outLinks += oLink +",";
context.write(new Text(oLink), new Text(score + ""));
}
// strip the trailing comma
if (outLinks.endsWith(",")) {
outLinks = outLinks.substring(0, outLinks.length() - 1);
}
context.write(new Text(uri), new Text(outLinks));
} else {
// pages without outlinks contribute a weight of 0
context.write(new Text(uri), new Text(0 + ""));
}
}
}
public String showUri(String s) {
String[] split = s.split(":");
if (split.length == 2) {
String[] domain = split[0].split("[.]");
List<String> list = Arrays.asList(domain);
Collections.reverse(list);
String rs = "";
for (String s1 : list) {
rs += s1 + ".";
}
rs = rs.substring(0, rs.length() - 1);
String proto = split[1].substring(0, split[1].indexOf("/"));
String res = split[1].substring(split[1].indexOf("/"), split[1].length());
return proto + "://" + rs.trim() + res.trim();
}
return null;
}
}
public static class PageRankReducer extends TableReducer<Text, Text, NullWritable> {
@Override
//http://www.briup.com/index [http://www.briup.com/index,http://www.briup.com/index,http://www.briup.com/index,http://www.briup.com/index,2,23,23,2]
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// damping factor
double d = 0.85;
// sum of the weight contributions received by this page
double sum = 0;
// an empty value is ignored;
// a value containing "http" records the page's outlinks, otherwise it is a weight contribution
String k = key.toString();
System.out.println("k = " + k);
// turn the key back into the Nutch row-key format
String uri = returnNutchKey(k);
Put put = new Put(uri.getBytes());
for (Text value : values) {
String ctn = value.toString().trim();
if (!ctn.equals("")){
if (ctn.contains("http")){
String[] split = ctn.split(",");
// number of outlinks
put.addColumn("page".getBytes(),"oln".getBytes(),Bytes.toBytes(split.length));
for (String s : split) {
put.addColumn("ol".getBytes(),s.getBytes(),"1".getBytes());
}
}else {
double v = Double.parseDouble(ctn);
sum+=v;
}
}
}
// PageRank formula: rank = d * sum + (1 - d)
double rank=sum*d+(1-d);
put.addColumn("page".getBytes(),"rank".getBytes(),Bytes.toBytes(rank));
context.write(NullWritable.get(),put);
}
// convert a normal uri back into the Nutch row-key format
//in:  http://www.briup.com/index
//out: com.briup.www:http/index
public String returnNutchKey(String uri) {
String[] split = uri.split("://");
String protocol = split[0];
String domain = split[1].substring(0, split[1].indexOf("/"));
String res=split[1].substring(split[1].indexOf("/"),split[1].length());
List<String> list = Arrays.asList(domain.split("[.]"));
Collections.reverse(list);
String rs = "";
for (String s : list) {
rs += s + ".";
}
rs = rs.substring(0, rs.length() - 1);
return rs.trim() + ":" + protocol.trim() + res.trim();
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "pageRank");
job.setJarByClass(this.getClass());
TableMapReduceUtil.initTableMapperJob("clean_webpage",new Scan(),PageRankMapper.class,Text.class,Text.class
,job);
TableMapReduceUtil.initTableReducerJob("rank_result",PageRankReducer.class,job);
job.waitForCompletion(true);
for (int i=0;i<20;i++){
Configuration conf1 = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job1 = Job.getInstance(conf1, "pageRank-item"+i);
job1.setJarByClass(this.getClass());
TableMapReduceUtil.initTableMapperJob("rank_result",new Scan(),PageRankMapper.class,Text.class,Text.class
,job1);
TableMapReduceUtil.initTableReducerJob("rank_result",PageRankReducer.class,job1);
job1.waitForCompletion(true);
}
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Step2_PageRank(),args);
}
}
```
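Once the iterations have finished, the result can be spot-checked in the HBase shell. Note that `page:rank` is written with `Bytes.toBytes(double)`, so the shell displays it as 8 raw bytes rather than a decimal number:
```sql
scan 'rank_result', {COLUMNS => ['page:rank', 'page:oln'], LIMIT => 5}
```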
### Extracting page keywords
To keep the experiment simple, this project does not run a content-analysis algorithm to extract keywords.
Instead, each page's keyword is built from the page title together with the anchor text of one of its inlinks.
The first inlink whose anchor text is not empty is used (a small illustration follows).
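A hypothetical illustration (the values are made up); note that the mapper below concatenates the title and the anchor text without a separator:
```
page:t (title)             = "Example Forum"
first non-empty il anchor  = "Developer Community"
resulting page:key         = "Example ForumDeveloper Community"
```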
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
/**
* @author adam
* @date 2022/6/10
* Find the keyword of each page
*/
public class Step3_FindKeyWord extends Configured implements Tool {
public static class FindKeyWordMapper extends TableMapper<Text, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String k = new String(key.get());
// all urls and anchor texts under the il family
NavigableMap<byte[], byte[]> ilMap = value.getFamilyMap("il".getBytes());
// title of the current page
byte[] titleBytes = value.getValue("page".getBytes(), "t".getBytes());
byte[] value1=null;
if (ilMap.size()>0){
Set<Map.Entry<byte[], byte[]>> set = ilMap.entrySet();
// value of the first entry
value1= ilMap.firstEntry().getValue();
if (value1==null||value1.length<1){
// otherwise take the first non-empty anchor text
for (Map.Entry<byte[], byte[]> entry : set) {
byte[] value2 = entry.getValue();
if (value2!=null&&value2.length>0){
value1=value2;
break;
}
}
}
if (value1!=null){
context.write(new Text(k),new Text(new String(titleBytes)+new String(value1)));
}else {
context.write(new Text(k),new Text(new String(titleBytes)));
}
}
}
}
public static class FindKeyWordReducer extends TableReducer<Text, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Put put = new Put(key.toString().getBytes());
for (Text value : values) {
System.out.println("value = " + value.toString());
put.addColumn("page".getBytes(),"key".getBytes(),value.toString().getBytes());
}
context.write(NullWritable.get(),put);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "findKeyWord");
job.setJarByClass(this.getClass());
// mapper: read the rows and extract the title plus the first non-empty inlink anchor text
TableMapReduceUtil.initTableMapperJob("clean_webpage",new Scan(),FindKeyWordMapper.class,Text.class,Text.class,job);
// reducer: insert the new column key into family page of clean_webpage
TableMapReduceUtil.initTableReducerJob("clean_webpage", FindKeyWordReducer.class,job);
return job.waitForCompletion(true)?0:-1;
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new Step3_FindKeyWord(),args));
}
}
```
### Tagging the tables
Later a single MapReduce job will join `clean_webpage` and `rank_result`, but inside that job there is no way to tell which table a row came from. So we first add a marker column to the `page` family of each table: the value is `a` for `clean_webpage` and `b` for `rank_result`.
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.ArrayList;
/**
* @author adam
* @date 2022/6/13
* So that the later join can tell which table each row came from, mark the tables first:
* add a column i to the page family; the value is a for clean_webpage and b for rank_result
*
*/
public class Step4_AddIdentity {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("clean_webpage"));
//Table table = connection.getTable(TableName.valueOf("rank_result"));
// scan all rows, then add a column i to every row
ResultScanner resultScanner = table.getScanner(new Scan());
ArrayList<Put> list = new ArrayList<>();
for (Result result : resultScanner) {
Put put = new Put(result.getRow());
// use the value "b" when running against rank_result
put.addColumn("page".getBytes(),"i".getBytes(),"a".getBytes());
list.add(put);
}
table.put(list);
}
}
```
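A quick way to confirm the marker column in the HBase shell (run the class once per table, switching the table name and the marker value as the comments indicate):
```sql
scan 'clean_webpage', {COLUMNS => 'page:i', LIMIT => 3}
scan 'rank_result', {COLUMNS => 'page:i', LIMIT => 3}
```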
### Joining the tables
Join `clean_webpage` and `rank_result` on the row key and store the joined result in the table `last_result`.
Table creation statement
```sql
create 'last_result','page'
```
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.*;
/**
* @author adam
* @date 2022/6/13
* Join the two tables clean_webpage and rank_result
*/
public class Step5_JoinCleanAndRank extends Configured implements Tool {
public static class JoinCleanAndRankMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
// read the marker column to tell which table the row came from
String s = new String(getValueByFamilyAndCol(value, "page", "i"));
MapWritable map = new MapWritable();
// "a" means the row comes from clean_webpage
if ("a".equals(s)) {
byte[] keyBytes = getValueByFamilyAndCol(value, "page", "key");
map.put(new Text("key"), new BytesWritable(keyBytes));
context.write(key, map);
} else {
byte[] rankBytes = getValueByFamilyAndCol(value, "page", "rank");
map.put(new Text("rank"), new BytesWritable(rankBytes));
context.write(key, map);
}
}
public byte[] getValueByFamilyAndCol(Result value, String family, String col) {
Cell cell = value.getColumnLatestCell(family.getBytes(), col.getBytes());
if (cell == null) {
return new byte[0];
}
byte[] copy = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength());
return copy;
}
}
public static class JoinCleanAndPankReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
String s = showUri(new String(key.get()));
if (s != null) {
Put put = new Put(s.getBytes());
for (MapWritable map : values) {
Set<Map.Entry<Writable, Writable>> entrySet = map.entrySet();
for (Map.Entry<Writable, Writable> entry : entrySet) {
Text k = (Text) entry.getKey();
System.out.println("k.toString() = " + k.toString());
BytesWritable v = (BytesWritable) entry.getValue();
System.out.println("entry = " + Arrays.toString(v.getBytes()));
put.addColumn("page".getBytes(), k.toString().getBytes(), v.getBytes());
}
}
context.write(NullWritable.get(), put);
}
}
// convert a Nutch row key back into a normal url
public String showUri(String s) {
String[] split = s.split(":");
if (split.length == 2) {
String[] domain = split[0].split("[.]");
List<String> list = Arrays.asList(domain);
Collections.reverse(list);
String rs = "";
for (String s1 : list) {
rs += s1 + ".";
}
rs = rs.substring(0, rs.length() - 1);
String proto = split[1].substring(0, split[1].indexOf("/"));
String res = split[1].substring(split[1].indexOf("/"), split[1].length());
return proto + "://" + rs.trim() + res.trim();
}
return null;
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "JoinCleanAndRank");
job.setJarByClass(this.getClass());
// a list holding one Scan per source table
ArrayList<Scan> list = new ArrayList<>();
Scan scan1 = new Scan();
// tell the scan which table it belongs to
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "clean_webpage".getBytes());
Scan scan2 = new Scan();
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "rank_result".getBytes());
list.add(scan1);
list.add(scan2);
// set up the mapper over both scans
TableMapReduceUtil.initTableMapperJob(list, JoinCleanAndRankMapper.class, ImmutableBytesWritable.class, MapWritable.class, job);
TableMapReduceUtil.initTableReducerJob("last_result", JoinCleanAndPankReducer.class, job);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Step5_JoinCleanAndRank(), args);
}
}
```
### Removing invalid rows after the join
`rank_result` holds weights computed over the crawled pages, while `clean_webpage` only holds the rows that survived cleaning, so the two tables do not necessarily contain the same rows. We therefore drop rows that have only the `rank` column or only the keyword column, keep the rows that have both, and store them in a new table `join_result`.
Table creation statement
```sql
create 'join_result','page'
```
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
/**
* @author adam
* @date 2022/6/13
*/
public class Step6_HoldJoinResult extends Configured implements Tool {
public static class HoldJoinResultMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
NavigableMap<byte[], byte[]> nvm = value.getFamilyMap("page".getBytes());
MapWritable map = new MapWritable();
System.out.println(nvm.size());
if (nvm.size() == 2) {
byte[] rank = value.getValue("page".getBytes(), "rank".getBytes());
byte[] keyword = value.getValue("page".getBytes(), "key".getBytes());
if (rank != null && keyword != null) {
map.put(new Text("rank"), new BytesWritable(rank));
map.put(new Text("key"), new BytesWritable(keyword));
context.write(key, map);
}
}
}
}
public static class HoldJoinResultReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException {
Put put = new Put(key.get());
for (MapWritable value : values) {
Set<Map.Entry<Writable, Writable>> entrySet = value.entrySet();
for (Map.Entry<Writable, Writable> entry : entrySet) {
BytesWritable v = (BytesWritable) entry.getValue();
put.addColumn("page".getBytes(), entry.getKey().toString().getBytes(), v.getBytes());
}
}
context.write(NullWritable.get(), put);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "HoldJoinResult");
job.setJarByClass(this.getClass());
TableMapReduceUtil.initTableMapperJob("last_result", new Scan(), HoldJoinResultMapper.class, ImmutableBytesWritable.class, MapWritable.class, job);
TableMapReduceUtil.initTableReducerJob("join_result", HoldJoinResultReducer.class, job);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Step6_HoldJoinResult(),args);
}
}
```
### Building the inverted index
Invert the merged and cleaned data and store the result in the table `invertindex_result`.
Table creation statement
```sql
create 'invertindex_result','page'
```
Reference code:
```java
package com.briup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.TreeSet;
/**
* @author adam
* @date 2022/6/13
* Build the inverted index with MapReduce
*/
public class Step7_InvertIndex extends Configured implements Tool {
public static class InvertIndexMapper extends TableMapper<ImmutableBytesWritable, Text> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
String s = new String(key.get());
byte[] keyBytes = value.getValue("page".getBytes(), "key".getBytes());
byte[] rankBytes = value.getValue("page".getBytes(), "rank".getBytes());
double rank = Bytes.toDouble(rankBytes);
System.out.println("key = " + new String(keyBytes));
// the keyword becomes the row key; the value is "rank,url"
context.write(new ImmutableBytesWritable(keyBytes), new Text(rank + "," + s));
}
}
public static class InvertIndexReducer extends TableReducer<ImmutableBytesWritable, Text, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if (key.get().length >= 1) {
// a TreeSet keeps its elements sorted, either by natural order or by a custom comparator
TreeSet<String> set = new TreeSet<>((a, b) -> {
String[] sa = a.split(",");
String[] sb = b.split(",");
Double da = Double.valueOf(sa[0]);
Double db = Double.valueOf(sb[0]);
return da.compareTo(db);
});
// add every value received by the reducer to the TreeSet so they end up sorted by rank
for (Text value : values) {
set.add(value.toString());
}
System.out.println(set);
Put put = new Put(key.get());
for (String s : set) {
String[] split = s.split(",");
double rank = Double.parseDouble(split[0]);
String url = split[1];
put.addColumn("page".getBytes(), url.getBytes(), Bytes.toBytes(rank));
}
context.write(NullWritable.get(), put);
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
Job job = Job.getInstance(conf, "InvertIndex");
job.setJarByClass(this.getClass());
TableMapReduceUtil.initTableMapperJob("join_result", new Scan(), InvertIndexMapper.class, ImmutableBytesWritable.class, Text.class, job);
TableMapReduceUtil.initTableReducerJob("invertindex_result", InvertIndexReducer.class, job);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Step7_InvertIndex(), args);
}
}
```
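The finished index can be inspected in the HBase shell. Each row key is a keyword; every column under `page` is a URL, and its value is that page's rank stored as an 8-byte double:
```sql
scan 'invertindex_result', {LIMIT => 1}
```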
After importing the web project, create the lookup table
```sql
create 'table_info','info'
put 'table_info','invertindex_result','info:owner','briup'
```
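A quick check that the lookup row exists:
```sql
get 'table_info','invertindex_result'
```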
step1
Clean the crawled data. First discard rows whose fetch failed: the fetch status is `f:st`, and a value of 2 means success. Keep the number of columns under family `il`, the number of columns under family `ol`, plus `s:s`, `p:t` and `p:c`, and keep the whole `il` and `ol` families. Build a Result and save it to the new table `clean_webpage`.
step2
Compute weights with the PageRank algorithm
1. Flatten out all of the page's outlinks
2. Give the page a default weight
3. Distribute the page's weight evenly across its outlinks
4. A page with no outlinks gets a weight contribution of 0
The code is reused, but the first pass differs from the later ones: the first pass reads from 'clean_webpage' and writes to 'rank_result'; each later pass reads 'rank_result', recomputes, and writes back to 'rank_result'.
step3
Find the keywords of each page
From 'clean_webpage', take the title and the first non-empty inlink anchor text; if the first inlink's text is empty, move on to the next one, and if there are no inlinks use the title alone. Store the resulting keyword as a new column 'key' in the 'page' family of 'clean_webpage'.
step4
Tag the tables before the join
So that the join can tell which table each row came from, first mark the tables:
add a column i to the 'page' family of each table; the value is a for 'clean_webpage' and b for 'rank_result'.
step5