# searchEngine

**Repository Path**: briupAdam/search-engine

## Basic Information

- **Project Name**: searchEngine
- **Description**: A search engine built with Hadoop and HBase
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 2
- **Forks**: 1
- **Created**: 2022-12-16
- **Last Updated**: 2023-04-11

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# A PageRank-Based Search Engine

## Search engine

> A search engine is a system that, following a defined strategy and using specific computer programs, collects information from the Internet, organizes and processes it, provides a retrieval service, and presents the matching information to its users.

## PageRank

PageRank, also known as page rank, page level, Google left-side ranking, or Page rank, is a technique that computes a score from the hyperlinks between web pages and uses it as one element of ranking those pages. It is named after the surname of Google co-founder [Larry Page](https://baike.baidu.com/item/拉里·佩奇). Google uses it to reflect the relevance and importance of a page, and in search engine optimization it is one of the factors frequently used to evaluate how effective page optimization has been.

## Data crawling

The data is crawled from the web with `Nutch`.

### Installing Nutch

1. Upload the installation package to the virtual machine

   You can drag and drop the file with xShell, or use the scp command:

   ```sh
   scp apache-nutch-2.4-bin.tar.gz briup@hadoop01:/opt
   ```

2. Unpack the archive

   ```sh
   tar -zxvf apache-nutch-2.4-bin.tar.gz
   ```

3. Edit the configuration file `conf/nutch-site.xml`

   ```xml
   <property>
     <name>plugin.folders</name>
     <value>/opt/nutch-2.4/plugins</value>
   </property>
   ```

4. Use your own HBase configuration by copying it over the HBase configuration in the Nutch conf directory

   ```sh
   cp /opt/hbase/conf/hbase-site.xml /opt/nutch-2.4/conf/
   ```

### Crawling the data

Use the `crawl` command.

> `crawl <seedDir> <crawlID> <numberOfRounds>`
>
> - seedDir: the directory holding the seed URLs to crawl
> - crawlID: a name for this crawl job; it determines the HBase table name, which becomes `<crawlID>_webpage`
> - numberOfRounds: how many levels of linked pages to follow

1. Create a directory for the seed URLs

   ```sh
   mkdir urls
   ```

2. Create seed.txt in the urls directory; the URLs to crawl go in it, one per line

   ```sh
   touch urls/seed.txt
   ```

3. Enter the URLs to crawl

   ```sh
   vi urls/seed.txt
   ```

   ```
   https://developer.huawei.com/consumer/cn/forum/
   https://developer.aliyun.com/
   ```

4. Start crawling

   ```sh
   bin/crawl urls/ briup 3
   ```
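Once the crawl finishes, the data lands in an HBase table named `briup_webpage` (the crawlID `briup` plus the `_webpage` suffix). The following standalone class is a minimal sketch, not part of the original project, for spot-checking that the table was populated; it assumes the same ZooKeeper quorum (`hadoop01:2181`) used by the MapReduce jobs later in this README.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class CheckCrawlOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumption: the same ZooKeeper quorum used elsewhere in this project
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("briup_webpage"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            Result first = scanner.next();
            if (first == null) {
                System.out.println("briup_webpage is empty - the crawl produced no rows");
            } else {
                // Row keys use Nutch's reversed-domain format, e.g. com.briup.www:http/index
                System.out.println("sample row key: " + new String(first.getRow()));
            }
        }
    }
}
```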
### Table structure created by Nutch

```xml
```

### Meaning of the webpage columns

**rowkey** Primary key, generated from the page URL (format: reversed domain name:protocol:port and path). Because of this, Nutch 2 only stores the current state of a page, not its history.

**headers** The standard HTTP headers, which contain non-printable characters. Fields such as Last-Modified can be used to decide whether a page needs to be re-fetched (a HEAD request is enough instead of downloading the whole page).

**text** All parsed text fields merged into one UTF-8 string, meant for plain retrieval. Since retrieval is normally handled by Solr nowadays, this column is of limited use.

**status** Fetch status:

- 1 unfetched (links not yet fetched due to limits set in regex-urlfilter.txt, -TopN crawl parameters, etc.)
- 2 fetched (page was successfully fetched)
- 3 gone (that page no longer exists)
- 4 redir_temp (temporary redirection; see reprUrl below for more details)
- 5 redir_perm (permanent redirection; see reprUrl below for more details)
- 6 retry
- 7 not modified

**markers** Markers left by the individual jobs (e.g. `dist`, `injmrk_`, `updmrk_`, `ftcmrk_`, `gnmrk_`, `prsmrk_`).

**parseStatus** Parse status, NULL until the parse job has run (see ParseStatusCodes.html).

**modifiedTime** Last modification time.

**score** Page importance (PR); Nutch 2.2.1 uses the OPIC algorithm.

**typ** Content type (e.g. application/xhtml+xml).

**batchId** Batch ID generated by the generate job ((curTime/1000) + "-" + randomSeed); fetching can be limited to a specific batchId.

**baseUrl** Used to resolve relative links in the page source to absolute URLs. It is normally the page's own address, or the final target address if the page was redirected.

**content** The complete raw page source, with no processing at all (not even charset conversion).

**title** Content of the title tag (converted to UTF-8).

**reprUrl** Redirect target URL; it is fetched in the next round rather than followed immediately.

**fetchInterval** Fetch interval, 2592000 seconds (30 days) by default.

**prevFetchTime** Time of the previous fetch.

**inlinks** Inbound links (URL + link text).

**prevSignature** Page signature at the previous update.

**outlinks** Outbound links (URL + link text).

**fetchTime** Time of the next fetch, normally one interval (a month) later.

**retriesSinceFetch** Number of retries since the last fetch.

**protocolStatus** Protocol-level fetch status:

- ACCESS_DENIED 17
- BLOCKED 23
- EXCEPTION 16
- FAILED 2
- GONE 11
- MOVED 12
- NOTFETCHING 20
- NOTFOUND 14
- NOTMODIFIED 21
- PROTO_NOT_FOUND 10
- REDIR_EXCEEDED 19
- RETRY 15
- ROBOTS_DENIED 18
- SUCCESS 1
- TEMP_MOVED 13
- WOULDBLOCK 22

**signature** Page signature used to detect whether a page has changed. The default implementation, org.apache.nutch.crawl.MD5Signature, takes the MD5 of content; an alternative, org.apache.nutch.crawl.TextProfileSignature, extracts the text from content, tokenizes and sorts it, and then computes the MD5.

**metadata** Custom metadata; it can be added in the seed file, e.g. "http://xxxx/xxx.html \t type=news".
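Because the rowkey stores the URL with its domain labels reversed, the MapReduce jobs later in this README repeatedly convert between the two forms (see `showUri` and `returnNutchKey` below). The following standalone sketch, with hypothetical helper names and covering only URLs without an explicit port, illustrates the same conversion:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RowKeyDemo {

    // Convert a plain URL such as http://www.briup.com/index
    // into the Nutch-style rowkey com.briup.www:http/index.
    static String toNutchKey(String url) {
        String[] parts = url.split("://");
        String protocol = parts[0];
        String domain = parts[1].substring(0, parts[1].indexOf("/"));
        String path = parts[1].substring(parts[1].indexOf("/"));
        List<String> labels = Arrays.asList(domain.split("[.]"));
        Collections.reverse(labels);
        return String.join(".", labels) + ":" + protocol + path;
    }

    // Convert the rowkey back into a plain URL.
    static String toUrl(String rowKey) {
        String[] parts = rowKey.split(":");
        List<String> labels = Arrays.asList(parts[0].split("[.]"));
        Collections.reverse(labels);
        String protocol = parts[1].substring(0, parts[1].indexOf("/"));
        String path = parts[1].substring(parts[1].indexOf("/"));
        return protocol + "://" + String.join(".", labels) + path;
    }

    public static void main(String[] args) {
        String key = toNutchKey("http://www.briup.com/index");
        System.out.println(key);          // com.briup.www:http/index
        System.out.println(toUrl(key));   // http://www.briup.com/index
    }
}
```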

### Data cleaning

The cleaning logic is implemented with a MapReduce program:

1. Decide whether the fetch succeeded from `f:st`; a value of 2 means success.
2. Keep the number of columns in family `il`, the number of columns in family `ol`, plus `s:s`, `p:t`, and `p:c`; also keep the `il` and `ol` families as a whole.
3. Assemble the result and write it to a new table, `clean_webpage`.

The structure of clean_webpage is shown in the figure below.

![截屏2022-06-10 11.20.52](assest/截屏2022-06-10 11.20.52.png)

Table creation statement:

```
create 'clean_webpage','page','ol','il'
```

Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

/**
 * @author adam
 * @date 2022/6/10
 * Clean the crawled data with a MapReduce job:
 *
 * 1. Decide whether the fetch succeeded from `f:st`; 2 means success.
 * 2. Keep the number of columns in family `il`, the number of columns in family `ol`,
 *    plus `s:s`, `p:t`, `p:c`, and the `il` and `ol` families as a whole.
 * 3. Assemble the result and write it to the new table `clean_webpage`.
 *
 * map: read the raw data and clean it
 * reducer: store the cleaned data
 */
public class Step1_CleanData extends Configured implements Tool {

    public static class CleanDataMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            MapWritable map = null;
            // Latest value of cell f:st
            Cell cell = value.getColumnLatestCell("f".getBytes(), "st".getBytes());
            //Cell pCell = value.getColumnLatestCell("p".getBytes(), "st".getBytes());
            //int pFlag = Bytes.toInt(pCell.getValueArray(), pCell.getValueOffset(), pCell.getValueLength());
            // Convert the byte array to an int: the fetch status
            int fFlag = Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            // Drop rows whose fetch failed
            if (fFlag == 2) {
                map = new MapWritable();
                // Collect the data to keep
                // number of columns in family il
                int ilNum = countByFamily(value, "il");
                // number of columns in family ol
                int olNum = countByFamily(value, "ol");
                // page content
                byte[] pageContext = getValueByFamilyAndCol(value, "p", "c");
                // page title
                byte[] title = getValueByFamilyAndCol(value, "p", "t");
                // Nutch's score for this URL
                byte[] sorce = getValueByFamilyAndCol(value, "s", "s");
                // columns and values of family ol
                MapWritable olMap = getMapByFamily(value, "ol");
                // columns and values of family il
                MapWritable ilMap = getMapByFamily(value, "il");
                map.put(new BytesWritable("ilNum".getBytes()), new BytesWritable(Bytes.toBytes(ilNum)));
                map.put(new BytesWritable("olNum".getBytes()), new BytesWritable(Bytes.toBytes(olNum)));
                map.put(new BytesWritable("pageContext".getBytes()), new BytesWritable(pageContext));
                map.put(new BytesWritable("title".getBytes()), new BytesWritable(title));
                map.put(new BytesWritable("score".getBytes()), new BytesWritable(sorce));
                map.put(new BytesWritable("olMap".getBytes()), olMap);
                map.put(new BytesWritable("ilMap".getBytes()), ilMap);
                context.write(key, map);
            }
        }

        // All columns and values of one column family
        private MapWritable getMapByFamily(Result value, String family) {
            MapWritable map = new MapWritable();
            NavigableMap<byte[], byte[]> navigableMaps = value.getFamilyMap(family.getBytes());
            for (Map.Entry<byte[], byte[]> entry : navigableMaps.entrySet()) {
                byte[] key = entry.getKey();
                byte[] value1 = entry.getValue();
                map.put(new BytesWritable(key), new BytesWritable(value1));
            }
            return map;
        }

        // Value of one column, identified by family and qualifier
        private byte[] getValueByFamilyAndCol(Result value, String family, String col) {
            Cell cell = value.getColumnLatestCell(family.getBytes(), col.getBytes());
            return Bytes.copy(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
        }

        // Number of columns in one column family
        private int countByFamily(Result value, String family) {
            NavigableMap<byte[], byte[]> map = value.getFamilyMap(family.getBytes());
            return map.size();
        }
    }

    public static class CleanDataReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context)
                throws IOException, InterruptedException {
            // The cleaned data for this row
            MapWritable map = values.iterator().next();
            Put put = new Put(key.get());
            put.addColumn("page".getBytes(), "oln".getBytes(),
                    ((BytesWritable) (map.get(new BytesWritable("olNum".getBytes())))).getBytes());
            put.addColumn("page".getBytes(), "iln".getBytes(),
                    ((BytesWritable) (map.get(new BytesWritable("ilNum".getBytes())))).getBytes());
            put.addColumn("page".getBytes(), "t".getBytes(),
                    ((BytesWritable) (map.get(new BytesWritable("title".getBytes())))).getBytes());
            put.addColumn("page".getBytes(), "s".getBytes(),
                    ((BytesWritable) (map.get(new BytesWritable("score".getBytes())))).getBytes());
            put.addColumn("page".getBytes(), "cnt".getBytes(),
                    ((BytesWritable) (map.get(new BytesWritable("pageContext".getBytes())))).getBytes());
            // One column per outlink
            MapWritable olMap = (MapWritable) map.get(new BytesWritable("olMap".getBytes()));
            for (Map.Entry<Writable, Writable> entry : olMap.entrySet()) {
                put.addColumn("ol".getBytes(),
                        ((BytesWritable) (entry.getKey())).getBytes(),
                        ((BytesWritable) entry.getValue()).getBytes());
            }
            // One column per inlink
            MapWritable ilMap = (MapWritable) map.get(new BytesWritable("ilMap".getBytes()));
            for (Map.Entry<Writable, Writable> entry : ilMap.entrySet()) {
                put.addColumn("il".getBytes(),
                        ((BytesWritable) (entry.getKey())).getBytes(),
                        ((BytesWritable) entry.getValue()).getBytes());
            }
            context.write(NullWritable.get(), put);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        // Table names can be overridden with -Din=... -Dout=...; default to the crawl output and clean table
        String inTable = conf.get("in", "briup_webpage");
        String outTable = conf.get("out", "clean_webpage");
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "cleanData");
        job.setJarByClass(this.getClass());
        // map: read the crawled data, clean it, and collect the kept fields into a MapWritable
        TableMapReduceUtil.initTableMapperJob(inTable, new Scan(), CleanDataMapper.class,
                ImmutableBytesWritable.class, MapWritable.class, job);
        // reduce: write the cleaned data back to HBase
        TableMapReduceUtil.initTableReducerJob(outTable, CleanDataReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Step1_CleanData(), args));
    }
}
```
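To confirm what the job kept for a particular page, you can fetch a single cleaned row by its Nutch-style rowkey. A minimal sketch, not part of the original project; the rowkey shown is a placeholder, so substitute one printed by the crawl check earlier:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckCleanRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        // Placeholder rowkey; use a real key from briup_webpage / clean_webpage
        byte[] rowKey = "com.briup.www:http/index".getBytes();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("clean_webpage"))) {
            Result row = table.get(new Get(rowKey));
            if (row.isEmpty()) {
                System.out.println("row not found (it may have been dropped by the f:st == 2 filter)");
                return;
            }
            // Columns written by Step1_CleanData above
            System.out.println("title : " + Bytes.toString(row.getValue("page".getBytes(), "t".getBytes())));
            System.out.println("iln   : " + Bytes.toInt(row.getValue("page".getBytes(), "iln".getBytes())));
            System.out.println("oln   : " + Bytes.toInt(row.getValue("page".getBytes(), "oln".getBytes())));
        }
    }
}
```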
### Computing the weights

1. Flatten out all of the page's outlinks.
2. Give the page a default weight.
3. Split the page's weight evenly among all of its outlinks.
4. For pages with no outlinks, emit a weight of 0.
5. Write the page weights to the table rank_result.
6. Run the job repeatedly so the weights converge (see the toy illustration below).

Table creation statement:

```
create 'rank_result','page','ol'
```
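To make steps 1-4 concrete before the MapReduce version: each page splits its current rank evenly over its outlinks, and every page then combines what it receives as `rank = d * sum + (1 - d)` with damping factor `d = 0.85`, which is exactly the update applied in `PageRankReducer` below. A toy, in-memory illustration on a made-up three-page graph, not part of the project:

```java
import java.util.*;

public class ToyPageRankIteration {
    public static void main(String[] args) {
        // Outlink graph: A -> B, C;  B -> C;  C -> A  (made-up sample data)
        Map<String, List<String>> outlinks = new LinkedHashMap<>();
        outlinks.put("A", Arrays.asList("B", "C"));
        outlinks.put("B", Arrays.asList("C"));
        outlinks.put("C", Arrays.asList("A"));

        // Every page starts with the same default weight (10) used by PageRankMapper
        Map<String, Double> rank = new LinkedHashMap<>();
        outlinks.keySet().forEach(p -> rank.put(p, 10.0));

        double d = 0.85;
        for (int iter = 0; iter < 3; iter++) {
            // Map phase: each page splits its rank evenly over its outlinks ...
            Map<String, Double> received = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> e : outlinks.entrySet()) {
                double share = rank.get(e.getKey()) / e.getValue().size();
                for (String target : e.getValue()) {
                    received.merge(target, share, Double::sum);
                }
            }
            // ... reduce phase: rank = d * sum + (1 - d)
            for (String page : rank.keySet()) {
                rank.put(page, d * received.getOrDefault(page, 0.0) + (1 - d));
            }
            System.out.println("after iteration " + (iter + 1) + ": " + rank);
        }
    }
}
```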
Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.*;

/**
 * @author adam
 * @date 2022/6/10
 * Compute the page weights with the PageRank algorithm.
 */
public class Step2_PageRank extends Configured implements Tool {

    /**
     * 1. Flatten out all of the page's outlinks.
     * 2. Give the page a default weight.
     * 3. Split the page's weight evenly among all of its outlinks.
     * 4. For pages with no outlinks, emit a weight of 0.
     */
    public static class PageRankMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String s = new String(key.get());
            // Convert the rowkey back to a normal URI
            String uri = showUri(s);
            if (uri != null) {
                uri = uri.trim();
                // page:oln holds the number of outlinks for this row
                byte[] olnBytes = value.getValue("page".getBytes(), "oln".getBytes());
                int i = 0;
                if (olnBytes != null) {
                    i = Bytes.toInt(olnBytes);
                }
                byte[] rankBytes = value.getValue("page".getBytes(), "rank".getBytes());
                // Default page weight: 10
                double rank = 10;
                // After the first iteration a rank value already exists
                if (rankBytes != null) {
                    rank = Bytes.toDouble(rankBytes);
                }
                // Accumulates every outlink so the link structure can be re-emitted
                String outLinks = "";
                if (i > 0) {
                    // All columns of family ol (one column per outlink)
                    NavigableMap<byte[], byte[]> ols = value.getFamilyMap("ol".getBytes());
                    Set<byte[]> keySet = ols.keySet();
                    double score = rank / keySet.size();
                    // Emit each outlink together with its share of the weight
                    for (byte[] bytes : keySet) {
                        String oLink = new String(bytes);
                        outLinks += oLink + ",";
                        context.write(new Text(oLink), new Text(score + ""));
                    }
                    // Remove the trailing comma
                    if (outLinks.endsWith(",")) {
                        outLinks = outLinks.substring(0, outLinks.length() - 1);
                    }
                    context.write(new Text(uri), new Text(outLinks));
                } else {
                    // Pages without outlinks contribute a weight of 0
                    context.write(new Text(uri), new Text(0 + ""));
                }
            }
        }

        // Convert a Nutch rowkey into a normal URI
        public String showUri(String s) {
            String[] split = s.split(":");
            if (split.length == 2) {
                String[] domain = split[0].split("[.]");
                List<String> list = Arrays.asList(domain);
                Collections.reverse(list);
                String rs = "";
                for (String s1 : list) {
                    rs += s1 + ".";
                }
                rs = rs.substring(0, rs.length() - 1);
                String proto = split[1].substring(0, split[1].indexOf("/"));
                String res = split[1].substring(split[1].indexOf("/"), split[1].length());
                return proto + "://" + rs.trim() + res.trim();
            }
            return null;
        }
    }

    public static class PageRankReducer extends TableReducer<Text, Text, NullWritable> {
        // Example input:
        // http://www.briup.com/index  [http://.../a,http://.../b, 2, 23, 23, 2]
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Damping factor
            double d = 0.85;
            // Sum of the weight contributions received from inlinks
            double sum = 0;
            // Empty values are ignored.
            // A value containing "http" describes this page's outlinks;
            // otherwise it is a weight contribution.
            String k = key.toString();
            System.out.println("k = " + k);
            // Convert the key back to the Nutch rowkey format
            String uri = returnNutchKey(k);
            Put put = new Put(uri.getBytes());
            for (Text value : values) {
                String ctn = value.toString().trim();
                if (!ctn.equals("")) {
                    if (ctn.contains("http")) {
                        String[] split = ctn.split(",");
                        // Number of outlinks
                        put.addColumn("page".getBytes(), "oln".getBytes(), Bytes.toBytes(split.length));
                        for (String s : split) {
                            put.addColumn("ol".getBytes(), s.getBytes(), "1".getBytes());
                        }
                    } else {
                        double v = Double.parseDouble(ctn);
                        sum += v;
                    }
                }
            }
            // PageRank update formula
            double rank = sum * d + (1 - d);
            put.addColumn("page".getBytes(), "rank".getBytes(), Bytes.toBytes(rank));
            context.write(NullWritable.get(), put);
        }

        // Convert a normal URI back into the Nutch rowkey format
        // in:  http://www.briup.com/index
        // out: com.briup.www:http/index
        public String returnNutchKey(String uri) {
            String[] split = uri.split("://");
            String protocol = split[0];
            String domain = split[1].substring(0, split[1].indexOf("/"));
            String res = split[1].substring(split[1].indexOf("/"), split[1].length());
            List<String> list = Arrays.asList(domain.split("[.]"));
            Collections.reverse(list);
            String rs = "";
            for (String s : list) {
                rs += s + ".";
            }
            rs = rs.substring(0, rs.length() - 1);
            return rs.trim() + ":" + protocol.trim() + res.trim();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "pageRank");
        job.setJarByClass(this.getClass());
        // First iteration: read clean_webpage, write rank_result
        TableMapReduceUtil.initTableMapperJob("clean_webpage", new Scan(),
                PageRankMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob("rank_result", PageRankReducer.class, job);
        job.waitForCompletion(true);
        // Later iterations: read rank_result and write back to rank_result so the weights converge
        for (int i = 0; i < 20; i++) {
            Configuration conf1 = getConf();
            conf1.set("hbase.zookeeper.quorum", "hadoop01:2181");
            Job job1 = Job.getInstance(conf1, "pageRank-item" + i);
            job1.setJarByClass(this.getClass());
            TableMapReduceUtil.initTableMapperJob("rank_result", new Scan(),
                    PageRankMapper.class, Text.class, Text.class, job1);
            TableMapReduceUtil.initTableReducerJob("rank_result", PageRankReducer.class, job1);
            job1.waitForCompletion(true);
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Step2_PageRank(), args);
    }
}
```
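The driver above runs a fixed 20 extra iterations rather than testing for convergence. One low-tech way to judge whether that is enough is to dump summary statistics of the stored ranks after a run and compare two consecutive runs. A minimal sketch, not part of the project, assuming `page:rank` is the big-endian double written by `PageRankReducer`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class DumpRankStats {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        long count = 0;
        double min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY, sum = 0;
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("rank_result"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result row : scanner) {
                byte[] rankBytes = row.getValue("page".getBytes(), "rank".getBytes());
                if (rankBytes == null) {
                    continue; // defensive: skip rows without a rank
                }
                double rank = Bytes.toDouble(rankBytes);
                count++;
                sum += rank;
                min = Math.min(min, rank);
                max = Math.max(max, rank);
            }
        }
        System.out.println("pages=" + count + " min=" + min + " max=" + max
                + " avg=" + (count == 0 ? 0 : sum / count));
    }
}
```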
### Finding page keywords

To keep the experiment simple, this project does not run any algorithm over the page content to extract keywords. A page's keywords are composed of the page's title plus the anchor text of its inlinks, using the anchor text of the first inlink whose value is not empty.

Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

/**
 * @author adam
 * @date 2022/6/10
 * Find the keywords of each page.
 */
public class Step3_FindKeyWord extends Configured implements Tool {

    public static class FindKeyWordMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String k = new String(key.get());
            // All columns (inlink URLs) and values (anchor texts) of family il
            NavigableMap<byte[], byte[]> ilMap = value.getFamilyMap("il".getBytes());
            // Title of the current page
            byte[] titleBytes = value.getValue("page".getBytes(), "t".getBytes());
            byte[] value1 = null;
            if (ilMap.size() > 0) {
                Set<Map.Entry<byte[], byte[]>> set = ilMap.entrySet();
                // Anchor text of the first inlink
                value1 = ilMap.firstEntry().getValue();
                // If it is empty, fall back to the first non-empty anchor text
                if (value1 == null || value1.length < 1) {
                    for (Map.Entry<byte[], byte[]> entry : set) {
                        byte[] value2 = entry.getValue();
                        if (value2 != null && value2.length > 0) {
                            value1 = value2;
                            break;
                        }
                    }
                }
                if (value1 != null) {
                    context.write(new Text(k), new Text(new String(titleBytes) + new String(value1)));
                } else {
                    context.write(new Text(k), new Text(new String(titleBytes)));
                }
            }
        }
    }

    public static class FindKeyWordReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(key.toString().getBytes());
            for (Text value : values) {
                System.out.println("value = " + value.toString());
                put.addColumn("page".getBytes(), "key".getBytes(), value.toString().getBytes());
            }
            context.write(NullWritable.get(), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "findKeyWord");
        job.setJarByClass(this.getClass());
        // mapper: read the data and extract the title plus the first non-empty inlink anchor text
        TableMapReduceUtil.initTableMapperJob("clean_webpage", new Scan(),
                FindKeyWordMapper.class, Text.class, Text.class, job);
        // reducer: add the new column key to family page of clean_webpage
        TableMapReduceUtil.initTableReducerJob("clean_webpage", FindKeyWordReducer.class, job);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Step3_FindKeyWord(), args));
    }
}
```
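The selection rule implemented above (page title plus the anchor text of the first inlink whose text is non-empty, falling back to the title alone) can be seen in isolation in this small illustration with made-up inlink data, not part of the project:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeywordRuleDemo {
    public static void main(String[] args) {
        String title = "Huawei Developer Forum";
        // Inlink URL -> anchor text (made-up sample data, in rowkey order)
        Map<String, String> inlinks = new LinkedHashMap<>();
        inlinks.put("com.example.a:https/", "");                    // empty anchor text, skipped
        inlinks.put("com.example.b:https/dev", "developer forum");  // first non-empty anchor text
        inlinks.put("com.example.c:https/news", "HarmonyOS news");

        String anchor = "";
        for (String text : inlinks.values()) {
            if (text != null && !text.isEmpty()) {
                anchor = text;
                break;
            }
        }
        // The keyword is simply the title concatenated with that anchor text
        // (or the title alone when no inlink has any text).
        String keyword = title + anchor;
        System.out.println(keyword); // Huawei Developer Forumdeveloper forum
    }
}
```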
### Tagging the tables

Later on, `clean_webpage` and `rank_result` are joined with a single MapReduce job. Inside that job there is no way to tell which table a record came from, so each table first gets an identity column in the `page` family: `clean_webpage` rows get the value `a` and `rank_result` rows get the value `b`.

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.ArrayList;

/**
 * @author adam
 * @date 2022/6/13
 * Tag each table so that the join can tell which table a record came from:
 * add a column i to family page, with value a for clean_webpage and b for rank_result.
 */
public class Step4_AddIdentity {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("clean_webpage"));
        //Table table = connection.getTable(TableName.valueOf("rank_result"));
        // Scan every row, then add the column i to each of them
        ResultScanner resultScanner = table.getScanner(new Scan());
        ArrayList<Put> list = new ArrayList<>();
        for (Result result : resultScanner) {
            Put put = new Put(result.getRow());
            put.addColumn("page".getBytes(), "i".getBytes(), "a".getBytes());
            list.add(put);
        }
        table.put(list);
    }
}
```
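`Step4_AddIdentity` above tags `clean_webpage` with `a`; to tag `rank_result` with `b`, the table name has to be swapped in (the commented-out line) and the class run again. A variant, sketched here and not part of the original code, that takes the table name and tag value as command-line arguments so both tables can be tagged without editing the source:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class AddIdentityFor {
    // Usage: AddIdentityFor clean_webpage a   or   AddIdentityFor rank_result b
    public static void main(String[] args) throws IOException {
        String tableName = args[0];
        String tag = args[1];
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan())) {
            List<Put> puts = new ArrayList<>();
            for (Result result : scanner) {
                Put put = new Put(result.getRow());
                put.addColumn("page".getBytes(), "i".getBytes(), tag.getBytes());
                puts.add(put);
            }
            table.put(puts);
        }
    }
}
```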
### Joining the tables

`clean_webpage` and `rank_result` are joined on their rowkeys, and the joined result is stored in the table `last_result`.

Table creation statement:

```sql
create 'last_result','page'
```

Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.*;

/**
 * @author adam
 * @date 2022/6/13
 * Join the two tables clean_webpage and rank_result.
 */
public class Step5_JoinCleanAndRank extends Configured implements Tool {

    public static class JoinCleanAndRankMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // The identity column tells us which table the record came from
            String s = new String(getValueByFamilyAndCol(value, "page", "i"));
            MapWritable map = new MapWritable();
            // "a" means the record comes from clean_webpage
            if ("a".equals(s)) {
                byte[] keyBytes = getValueByFamilyAndCol(value, "page", "key");
                map.put(new Text("key"), new BytesWritable(keyBytes));
                context.write(key, map);
            } else {
                byte[] rankBytes = getValueByFamilyAndCol(value, "page", "rank");
                map.put(new Text("rank"), new BytesWritable(rankBytes));
                context.write(key, map);
            }
        }

        public byte[] getValueByFamilyAndCol(Result value, String family, String col) {
            Cell cell = value.getColumnLatestCell(family.getBytes(), col.getBytes());
            if (cell == null) {
                return new byte[0];
            }
            // Copy only the value bytes of this cell
            return Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
                    cell.getValueOffset() + cell.getValueLength());
        }
    }

    public static class JoinCleanAndPankReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context)
                throws IOException, InterruptedException {
            // Convert the Nutch rowkey back to a normal URL for the output table
            String s = showUri(new String(key.get()));
            if (s != null) {
                Put put = new Put(s.getBytes());
                for (MapWritable map : values) {
                    Set<Map.Entry<Writable, Writable>> entrySet = map.entrySet();
                    for (Map.Entry<Writable, Writable> entry : entrySet) {
                        Text k = (Text) entry.getKey();
                        System.out.println("k.toString() = " + k.toString());
                        BytesWritable v = (BytesWritable) entry.getValue();
                        System.out.println("entry = " + Arrays.toString(v.getBytes()));
                        put.addColumn("page".getBytes(), k.toString().getBytes(), v.getBytes());
                    }
                }
                context.write(NullWritable.get(), put);
            }
        }

        // Convert a Nutch rowkey into the usual URL format
        public String showUri(String s) {
            String[] split = s.split(":");
            if (split.length == 2) {
                String[] domain = split[0].split("[.]");
                List<String> list = Arrays.asList(domain);
                Collections.reverse(list);
                String rs = "";
                for (String s1 : list) {
                    rs += s1 + ".";
                }
                rs = rs.substring(0, rs.length() - 1);
                String proto = split[1].substring(0, split[1].indexOf("/"));
                String res = split[1].substring(split[1].indexOf("/"), split[1].length());
                return proto + "://" + rs.trim() + res.trim();
            }
            return null;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "JoinCleanAndRank");
        job.setJarByClass(this.getClass());
        // One Scan object per input table
        ArrayList<Scan> list = new ArrayList<>();
        Scan scan1 = new Scan();
        // Tell each scan which table it belongs to
        scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "clean_webpage".getBytes());
        Scan scan2 = new Scan();
        scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, "rank_result".getBytes());
        list.add(scan1);
        list.add(scan2);
        // mapper over both tables
        TableMapReduceUtil.initTableMapperJob(list, JoinCleanAndRankMapper.class,
                ImmutableBytesWritable.class, MapWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("last_result", JoinCleanAndPankReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Step5_JoinCleanAndRank(), args);
    }
}
```
### Dropping invalid rows after the join

`rank_result` holds the weights computed over the crawled data, and some pages never receive a weight; `clean_webpage` holds the cleaned data, from which many rows have been discarded. The two tables therefore do not necessarily contain the same rows, so rows that only have the `rank` column or only the `key` column are dropped. Rows that have both columns are kept and stored in the new table `join_result`.

Table creation statement:

```sql
create 'join_result','page'
```

Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

/**
 * @author adam
 * @date 2022/6/13
 * Keep only the joined rows that have both a rank and a keyword.
 */
public class Step6_HoldJoinResult extends Configured implements Tool {

    public static class HoldJoinResultMapper extends TableMapper<ImmutableBytesWritable, MapWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            NavigableMap<byte[], byte[]> nvm = value.getFamilyMap("page".getBytes());
            MapWritable map = new MapWritable();
            System.out.println(nvm.size());
            // A complete row has exactly the two columns page:rank and page:key
            if (nvm.size() == 2) {
                byte[] rank = value.getValue("page".getBytes(), "rank".getBytes());
                byte[] keyword = value.getValue("page".getBytes(), "key".getBytes());
                if (rank != null && keyword != null) {
                    map.put(new Text("rank"), new BytesWritable(rank));
                    map.put(new Text("key"), new BytesWritable(keyword));
                    context.write(key, map);
                }
            }
        }
    }

    public static class HoldJoinResultReducer extends TableReducer<ImmutableBytesWritable, MapWritable, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<MapWritable> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(key.get());
            for (MapWritable value : values) {
                Set<Map.Entry<Writable, Writable>> entrySet = value.entrySet();
                for (Map.Entry<Writable, Writable> entry : entrySet) {
                    BytesWritable v = (BytesWritable) entry.getValue();
                    put.addColumn("page".getBytes(), entry.getKey().toString().getBytes(), v.getBytes());
                }
            }
            context.write(NullWritable.get(), put);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "HoldJoinResult");
        job.setJarByClass(this.getClass());
        TableMapReduceUtil.initTableMapperJob("last_result", new Scan(), HoldJoinResultMapper.class,
                ImmutableBytesWritable.class, MapWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("join_result", HoldJoinResultReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Step6_HoldJoinResult(), args);
    }
}
```
### Inverted index

Invert the merged and filtered data and store the result in the table `invertindex_result`.

Table creation statement:

```sql
create 'invertindex_result','page'
```

Reference code:

```java
package com.briup;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.TreeSet;

/**
 * @author adam
 * @date 2022/6/13
 * Build the inverted index with MapReduce.
 */
public class Step7_InvertIndex extends Configured implements Tool {

    public static class InvertIndexMapper extends TableMapper<ImmutableBytesWritable, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String s = new String(key.get());
            byte[] keyBytes = value.getValue("page".getBytes(), "key".getBytes());
            byte[] rankBytes = value.getValue("page".getBytes(), "rank".getBytes());
            double rank = Bytes.toDouble(rankBytes);
            System.out.println("key = " + new String(keyBytes));
            // The keyword becomes the row key; the value is "rank,url"
            context.write(new ImmutableBytesWritable(keyBytes), new Text(rank + "," + s));
        }
    }

    public static class InvertIndexReducer extends TableReducer<ImmutableBytesWritable, Text, NullWritable> {
        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (key.get().length >= 1) {
                // A TreeSet keeps its elements sorted; here a custom comparator orders by rank
                TreeSet<String> set = new TreeSet<>((a, b) -> {
                    String[] sa = a.split(",");
                    String[] sb = b.split(",");
                    Double da = new Double(sa[0]);
                    Double db = new Double(sb[0]);
                    return da.compareTo(db);
                });
                // Add every "rank,url" value to the TreeSet so they end up sorted by rank
                for (Text value : values) {
                    set.add(value.toString());
                }
                System.out.println(set);
                Put put = new Put(key.get());
                for (String s : set) {
                    String[] split = s.split(",");
                    double rank = Double.parseDouble(split[0]);
                    String url = split[1];
                    // One column per URL, holding its rank
                    put.addColumn("page".getBytes(), url.getBytes(), Bytes.toBytes(rank));
                }
                context.write(NullWritable.get(), put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        Job job = Job.getInstance(conf, "InvertIndex");
        job.setJarByClass(this.getClass());
        TableMapReduceUtil.initTableMapperJob("join_result", new Scan(), InvertIndexMapper.class,
                ImmutableBytesWritable.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob("invertindex_result", InvertIndexReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Step7_InvertIndex(), args);
    }
}
```

After importing the web project, create the following table:

```sql
create 'table_info','info'
put 'table_info','invertindex_result','info:owner','briup'
```

**step1** Clean the crawled data. First discard the rows whose fetch failed, judging success from the value of `f:st` (2 means success). Keep the number of columns in family `il`, the number of columns in family `ol`, plus `s:s`, `p:t`, and `p:c`, and keep the `il` and `ol` families as a whole. Build a Result and store it in the new table `clean_webpage`.

**step2** Compute the weights with the PageRank algorithm:

1. Flatten out all of the page's outlinks.
2. Give the page a default weight.
3. Split the page's weight evenly among all of its outlinks.
4. For pages with no outlinks, emit a weight of 0.

The code is reused, but the first run differs from every later one: the first run reads from 'clean_webpage' and writes to 'rank_result'; every later run reads from 'rank_result', recomputes, and writes back to 'rank_result'.

**step3** Find the keywords of each page. From 'clean_webpage', take the title plus the anchor text of the first non-empty inlink; if the first inlink's text is empty, move on to the next one, and if there are no inlinks at all the keyword is just the title. Store the keyword in table 'clean_webpage', column family 'page', column 'key'.

**step4** Tag the tables before the join. So that the join can tell which table each record came from, add a column i to family 'page' of both tables: the value is a for 'clean_webpage' and b for 'rank_result'.

**step5**
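The web application itself is not included here, but serving a query against `invertindex_result` amounts to a single `Get` by keyword: in the `page` family the column qualifiers are the URLs and the cell values are their ranks, as written by `Step7_InvertIndex`. A minimal sketch, not part of the original project, assuming the query string matches an index rowkey exactly and using the same ZooKeeper quorum:

```java
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class SearchByKeyword {
    public static void main(String[] args) throws Exception {
        // The keyword must match an invertindex_result rowkey exactly
        String keyword = args.length > 0 ? args[0] : "placeholder keyword";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("invertindex_result"))) {
            Result row = table.get(new Get(keyword.getBytes("UTF-8")));
            if (row.isEmpty()) {
                System.out.println("no pages indexed under: " + keyword);
                return;
            }
            // In the page family the qualifier is the URL and the value is its rank
            List<Map.Entry<String, Double>> hits = new ArrayList<>();
            for (Map.Entry<byte[], byte[]> e : row.getFamilyMap("page".getBytes()).entrySet()) {
                hits.add(new AbstractMap.SimpleEntry<>(new String(e.getKey()), Bytes.toDouble(e.getValue())));
            }
            // Highest rank first
            hits.sort((a, b) -> Double.compare(b.getValue(), a.getValue()));
            for (Map.Entry<String, Double> hit : hits) {
                System.out.println(hit.getValue() + "  " + hit.getKey());
            }
        }
    }
}
```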