PageRank 是 Google 创始人拉里·佩奇(Larry Page)和谢尔盖·布林(Sergey Brin)在 1998 年提出的一种网页排名算法,用于衡量网页“重要性”的一种方式。它是搜索引擎中用于排序网页的一种基础算法
一个网页越是被其他重要网页链接,它就越重要
PageRank 的计算流程
初始化:假设总共 N 个网页,每个网页初始 PR 值为 1/N。
迭代计算:通过 MapReduce 不断迭代更新 PR 值,直到值趋于稳定。
结果输出:PR 值越大,说明该网页越重要,排名越靠前
A 0.25 B C D
B 0.25 A D
C 0.25 C
D 0.25 B C
第一列:网页编号(如 A)
第二列:初始 PageRank 值(例如 0.25)
后续列:该网页链接到的其他网页
迭代的计算PageRank
值,每次MapReduce 的输出要和输入的格式是一样的,这样才能使得Mapreduce 的输出用来作为下一轮MapReduce 的输入
Map过程
解析输入行,提取:
当前网页 ID
当前网页的 PR 值
当前网页链接的其他网页列表
计算出要链接到的其他网友的个数,然后求出当前网页对其他网页的贡献值。
第一种输出的< key ,value>
中的key
表示其他网页,value
表示当前网页对其他网页的贡献值
为了区别这两种输出
出链网页贡献值(标记为 @):<出链网页, @贡献值>
第二种输出的< key ,value>
中的key
表示当前网页,value
表示所有其他网页。
网页链接列表(标记为 &):<当前网页, &链接网页列表>
B @0.0833
C @0.0833
D @0.0833
A &B C D
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*map过程*/
public class MyMapper extends Mapper<Object,Text,Text,Text>{
private String id;
private float pr;
private int count;
private float average_pr;
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
StringTokenizer str = new StringTokenizer(value.toString());//对value进行解析
id =str.nextToken();//id为解析的第一个词,代表当前网页
pr = Float.parseFloat(str.nextToken());//pr为解析的第二个词,转换为float类型,代表PageRank值
count = str.countTokens();//count为剩余词的个数,代表当前网页的出链网页个数
average_pr = pr/count;//求出当前网页对出链网页的贡献值
String linkids ="&";//下面是输出的两类,分别有'@'和'&'区分
while(str.hasMoreTokens()){
String linkid = str.nextToken();
context.write(new Text(linkid),new Text("@"+average_pr));//输出的是<出链网页,获得的贡献值>
linkids +=" "+ linkid;
}
context.write(new Text(id), new Text(linkids));//输出的是<当前网页,所有出链网页>
}
}
输入数据格式(value):网页ID PageRank值 出链网页1 出链网页2 ...
输出键值对:
<出链网页ID, "@贡献值">
(表示这个网页从别的网页获得了多少贡献)<当前网页ID, "& 出链网页列表">
(保留网页结构)
String id; // 当前网页ID
float pr; // 当前网页的PageRank值
int count; // 出链网页的数量
float average_pr; // 当前网页对每个出链网页的平均贡献值
StringTokenizer str = new StringTokenizer(value.toString());是把整行字符串(比如 "A 1.0 B C D")按照空格分割成一个个小单元(token)
id = str.nextToken(); // 第一个token是当前网页ID------取出第一个单词(比如 A
),表示当前正在处理的网页 ID,赋值给 id
pr = Float.parseFloat(str.nextToken()); // 第二个token是当前网页的PageRank值
取出第二个单词(比如 "1.0"
),将其转为 float
类型,就是当前网页的 PageRank 值,赋值给 pr
count = str.countTokens();// 剩下的token是出链网页数量----
统计剩余 token 的数量
average_pr = pr / count; //把当前网页的 PageRank 值平均分配给所有它链接的网页
贡献值输出:
while(str.hasMoreTokens()) {
String linkid = str.nextToken(); // B, 然后 C, 然后 D
context.write(new Text(linkid), new Text("@" + average_pr));
linkids += linkid + " "; // 把 B、C、D 加入 linkids 中
}
str.hasMoreTokens() 只要还有未读取的 token(即还有出链网页没处理完),就继续执行循环体
网页结构输出(带 &
开头):
String linkids记录当前网页的所有出链网页 ID
context.write(new Text(id), new Text(linkids));
Shuffle 是指 Map 阶段输出的数据按照 key 进行分组,并将具有相同 key 的数据发送到同一个 Reduce 任务中处理的过程
每个网页 Map 阶段都会:
向它出链的网页发 PageRank 贡献(加@前缀)
自己保留一份出链结构
Shuffle 阶段:按网页ID归并聚合
对 Map 输出的 key(网页 ID)进行排序
将相同 key 的所有 value 合并成一个列表
Reducer 接收到的格式为:<网页ID, [贡献值, 出链结构]>
<网页ID, 列表[@贡献1, @贡献2, ..., &出链结构]>
Reduce过程
求每个网页的新 PageRank 值
保留该网页的出链结构
输出格式为:
网页ID 新的PR值 出链网页列表
shuffule
的输出也即是reduce
的输入。
reduce
输入的key
直接作为输出的key
对reduce
输入的value
进行解析,它是一个列表
a.若列表里的值里包含`@`,就把该值`@`后面的字符串转化成`float`型加起来
b.若列表里的值里包含`&`,就把该值`&`后面的字符串提取出来
c.把所有贡献值的加总,和提取的字符串进行连接,作为`reduce`的输出`value`
public class MyReducer extends Reducer<Text,Text,Text,Text>{
继承 Hadoop 提供的 Reducer
类,泛型参数说明:
Text, Text
:输入的 key 和 value 类型Text, Text
:输出的 key 和 value 类型
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
为每一个网页 key
传入一个 values
列表,里面是 Shuffle 过程收集到的所有值
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reduce过程:计算每个网页的新PageRank值,并保留出链网页结构。
* 输入:<网页ID, [@贡献值, @贡献值, ..., &出链网页列表]>
* 输出:<网页ID, 新PageRank值 + 出链网页列表>
*/
public class MyReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String lianjie = ""; // 用于保存当前网页的出链网页列表(结构信息)
float pr = 0; // 用于累加当前网页从其他网页获得的PageRank贡献值
// 遍历所有传入的值:包含两类信息,分别通过首字符判断
for (Text val : values) {
String strVal = val.toString(); // 当前值转换为字符串
if (strVal.substring(0, 1).equals("@")) {
// 以@开头,表示这是从其他网页传来的PageRank贡献值
// 取出@后面的数值并累加
pr += Float.parseFloat(strVal.substring(1));
} else if (strVal.substring(0, 1).equals("&")) {
// 以&开头,表示这是本网页的出链结构信息
// 将&后面的网页列表保留下来
lianjie += strVal.substring(1); // 注意可能是多个网页用空格分隔
}
}
// 平滑处理(加入跳转因子d = 0.8)
// 假设网页总数为4,(1 - d) / N = 0.2 * 0.25 = 0.05
// 新PageRank = d * 贡献值总和 + (1 - d)/N
pr = 0.8f * pr + 0.2f * 0.25f;
// 构造输出字符串:新PR值 + 出链网页列表
String result = pr + lianjie;
// 输出结果:<当前网页ID, 新的PageRank值 + 出链网页列表>
context.write(key, new Text(result));
}
}
遍历所有值,分类处理
pr += Float.parseFloat(val.toString().substring(1));
如果是 @
开头,就从第 1 个字符开始截取字符串(去掉 @
),再把它转换成浮点数,并累加到 pr
中
lianjie += val.toString().substring(1);
如果是 &
开头,就把 &
后面的出链网页字符串加到变量 lianjie
中
以
@
开头:表示来自其他网页的 PageRank 贡献值,提取并累加。以
&
开头:表示这是该网页自身的 出链网页结构,保留下来。
pr = 0.8f * pr + 0.2f * 0.25f;
PageRank 中的阻尼系数模型:
0.8f
:阻尼系数 d(表示 80% 用户点击链接)0.2f
:1 - d,有 20% 用户会随机跳转0.25f
:假设网页总数是 4 个,随机跳转概率均分为 0.25
PR(A) = d × 所有贡献值之和 + (1 - d) / N
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;
public class MyRunner {
public static void main(String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
// 创建 Hadoop 配置对象
Configuration conf = new Configuration();
// 使用控制台输入获取初始输入路径和输出路径
Scanner sc = new Scanner(System.in);
System.out.print("inputPath:");
String inputPath = sc.next(); // 第一次输入的 HDFS 输入路径,如:/pagerank/input
System.out.print("outputPath:");
String outputPath = sc.next(); // 第一次输出的 HDFS 路径,如:/pagerank/output
// 进行 PageRank 的迭代计算,这里迭代 5 次
for (int i = 1; i <= 5; i++) {
// 创建新的 MapReduce 作业
Job job = Job.getInstance(conf);
// 设置 Job 的主类,用于打包 Jar
job.setJarByClass(MyRunner.class);
// 设置 Map 和 Reduce 的处理类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
// 设置 Map 阶段输出键值对类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置 Reduce 阶段输出键值对类型(最终输出)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入数据路径(每轮迭代输入路径是上一轮的输出)
FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));
// 设置输出数据路径(每轮迭代输出不同路径)
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));
// 更新下一轮迭代的输入输出路径
inputPath = outputPath; // 当前输出变为下一轮的输入
outputPath = outputPath + i; // 每次输出加上数字以区分路径(如 output1, output2,...)
// 提交作业并等待执行完成
job.waitForCompletion(true);
}
// 读取最终输出文件内容并打印到控制台
try {
// 获取 Hadoop 文件系统
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
// 拼接最终输出文件的路径(最后一轮输出的 part-r-00000)
Path srcPath = new Path(outputPath.substring(0, outputPath.length() - 1) + "/part-r-00000");
// 打开输出文件
FSDataInputStream is = fs.open(srcPath);
// 打印最终结果到控制台
System.out.println("Results:");
while (true) {
String line = is.readLine(); // 读取一行结果
if (line == null) break; // 如果到文件末尾,结束循环
System.out.println(line); // 打印当前行
}
is.close(); // 关闭输入流
} catch (Exception e) {
e.printStackTrace(); // 如果读取输出失败,打印错误
}
}
}