master节点
package mr
import (
"log"
"net"
"net/http"
"net/rpc"
"os"
"sync"
"time"
)
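// The coordinator plays the role of the master node. It:
//  1. handles RPC requests from worker nodes;
//  2. keeps track of the state of every task;
//  3. detects timed-out tasks: a task that has been running for too long is
//     given up on and handed out again.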
// Task is the coordinator's bookkeeping record for one map or reduce task.
type Task struct {
	FileName string // input file of a map task; left empty for reduce tasks
	Status string // one of Idle, InProgress or Completed
	StartTime time.Time // when the task was last handed to a worker; used for timeout detection
	TaskID int // index of the task within its phase
}
// Coordinator holds the state of a whole MapReduce job: the map and reduce
// task tables and the flags that record which phase has finished.
type Coordinator struct {
	// Workers call the RPC handlers concurrently, so the state below is
	// protected by this mutex.
	mu sync.Mutex
	mapTasks []Task // one entry per input file
	reduceTasks []Task // one entry per reduce partition
	nReduce int // number of reduce tasks, as passed to MakeCoordinator
	mapFinished bool // true once every map task is Completed; the reduce phase starts afterwards
	allFinished bool // true once the job is finished
	files []string // input file names passed to MakeCoordinator
	nextTaskID int // initialised to 0 in MakeCoordinator; not otherwise used in the code shown here
}
// Your code here -- RPC handlers for the worker to call.
// Example is a sample RPC handler: it replies with the argument plus one.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	incremented := args.X + 1
	reply.Y = incremented
	return nil
}
// GetTask is the RPC handler a worker calls to ask for work.
// (c is the method receiver, comparable to this or self in other languages.)
//
// The reply's TaskType tells the worker what to do:
//   - ExitTask:   the whole job is finished, the worker should quit;
//   - MapTask:    run the map function on reply.FileName;
//   - ReduceTask: reduce partition reply.ReduceTaskNum;
//   - WaitTask:   nothing is idle in the current phase, ask again later.
//
// It always returns nil.
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Recycle timed-out tasks and refresh the phase flags first.
	c.checkTimeOut()

	// Everything is done: tell the worker to exit.
	if c.allFinished {
		reply.TaskType = ExitTask
		return nil
	}

	// Reduce tasks are only handed out once every map task has completed.
	inMapPhase := !c.mapFinished
	queue := c.reduceTasks
	if inMapPhase {
		queue = c.mapTasks
	}

	for idx := range queue {
		candidate := &queue[idx]
		if candidate.Status != Idle {
			continue
		}

		reply.TaskID = candidate.TaskID
		if inMapPhase {
			reply.TaskType = MapTask
			reply.FileName = candidate.FileName
			reply.NReduce = c.nReduce // map output is split into this many buckets
		} else {
			reply.TaskType = ReduceTask
			reply.ReduceTaskNum = idx               // which reduce partition this is
			reply.MapTaskNum = len(c.mapTasks) // how many map tasks produced input
		}

		// Mark the task as handed out and start its timeout clock.
		candidate.Status = InProgress
		candidate.StartTime = time.Now()
		return nil
	}

	// The current phase is not finished, but no task in it is idle.
	reply.TaskType = WaitTask
	return nil
}
// checkTimeOut puts every task that has been InProgress for more than ten
// seconds back to Idle so it can be reassigned, and recomputes mapFinished
// and allFinished from the task tables.
//
// It does no locking of its own; GetTask calls it with c.mu held.
func (c *Coordinator) checkTimeOut() {
	const limit = 10 * time.Second
	checkedAt := time.Now()

	if !c.mapFinished {
		mapsDone := true
		for idx := range c.mapTasks {
			entry := &c.mapTasks[idx]
			if entry.Status == Completed {
				continue
			}
			mapsDone = false
			if entry.Status == InProgress && checkedAt.Sub(entry.StartTime) > limit {
				// The task ran out of time: make it available again.
				entry.Status = Idle
			}
		}
		c.mapFinished = mapsDone
	}

	reducesDone := true
	for idx := range c.reduceTasks {
		entry := &c.reduceTasks[idx]
		if entry.Status == Completed {
			continue
		}
		reducesDone = false
		if entry.Status == InProgress && checkedAt.Sub(entry.StartTime) > limit {
			entry.Status = Idle
		}
	}
	c.allFinished = reducesDone
}
// ReportTaskDone is the RPC handler a worker calls after finishing a task.
//
// The report is accepted (reply.OK = true) only when a task of the given type
// and ID exists and is currently InProgress. Reports for unknown tasks, or for
// tasks that are Idle (e.g. reset after a timeout) or already Completed, are
// rejected with reply.OK = false.
//
// When the last map task completes, mapFinished is set; when the last reduce
// task completes, allFinished is set. It always returns nil.
func (c *Coordinator) ReportTaskDone(args *ReportTaskArgs, reply *ReportTaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	reply.OK = false

	var tasks []Task
	switch args.TaskType {
	case MapTask:
		tasks = c.mapTasks
	case ReduceTask:
		tasks = c.reduceTasks
	default:
		return nil
	}

	for i := range tasks {
		if tasks[i].TaskID != args.TaskID || tasks[i].Status != InProgress {
			continue
		}
		tasks[i].Status = Completed

		phaseDone := true
		for j := range tasks {
			if tasks[j].Status != Completed {
				phaseDone = false
				break
			}
		}

		// Finishing the map phase must only open the reduce phase. Flagging
		// the whole job as finished here would let Done() return true before
		// any reduce task has run.
		if args.TaskType == MapTask {
			c.mapFinished = phaseDone
		} else {
			c.allFinished = phaseDone
		}

		reply.OK = true
		return nil
	}
	return nil
}
// server registers the coordinator's RPC methods and starts a goroutine that
// serves RPCs from worker.go over HTTP on a UNIX-domain socket.
// The process is terminated if the socket cannot be listened on.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	socketPath := coordinatorSock()
	// Remove any socket file left over from a previous run.
	os.Remove(socketPath)
	listener, listenErr := net.Listen("unix", socketPath)
	if listenErr != nil {
		log.Fatal("listen error:", listenErr)
	}
	go http.Serve(listener, nil)
}
// Done reports whether the entire job has finished.
// main/mrcoordinator.go calls it periodically.
//
// allFinished is written by the RPC handlers while they hold c.mu, so it is
// read under the same lock to avoid a data race.
func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.allFinished
}
// MakeCoordinator creates a Coordinator with one idle map task per input file
// and nReduce idle reduce tasks, starts its RPC server and returns it.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// The mutex, the phase flags and nextTaskID are fine at their zero values.
	c := &Coordinator{
		files:   files,
		nReduce: nReduce,
	}

	// One map task per input file.
	c.mapTasks = make([]Task, 0, len(files))
	for id, name := range files {
		c.mapTasks = append(c.mapTasks, Task{TaskID: id, FileName: name, Status: Idle})
	}

	// Reduce tasks read intermediate files that do not exist yet, so they
	// carry no FileName.
	c.reduceTasks = make([]Task, 0, nReduce)
	for id := 0; id < nReduce; id++ {
		c.reduceTasks = append(c.reduceTasks, Task{TaskID: id, Status: Idle})
	}

	c.server()
	return c
}
worker节点
package mr
import (
"encoding/json"
"fmt"
"hash/fnv"
"io"
"log"
"net/rpc"
"os"
"sort"
"time"
)
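/**
 * Worker responsibilities:
 *   - request tasks from the coordinator
 *   - run map or reduce tasks
 *   - handle file input and output
 *   - report task status back to the coordinator
 */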
// KeyValue is a single key/value pair. Map functions return a slice of them.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface, ordering KeyValue pairs by Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[j], kvs[i] = kvs[i], kvs[j]
}

// Less reports whether the key at position i sorts before the key at j.
func (kvs ByKey) Less(i, j int) bool {
	left, right := kvs[i].Key, kvs[j].Key
	return left < right
}
// ihash returns a non-negative 32-bit FNV-1a hash of key.
// Use ihash(key) % NReduce to choose the reduce task number for each
// KeyValue emitted by Map.
func ihash(key string) int {
	const signBit = 1 << 31

	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	// Clear the top bit so the value is non-negative even where int is 32 bits.
	return int(sum &^ signBit)
}
// Worker is the worker's main loop: it keeps asking the coordinator for a
// task and runs it, until it is told to exit.
// main/mrworker.go calls this function.
//
// The process ID is used as the worker's identifier.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	pid := os.Getpid()
	for {
		assignment := getTask(pid)
		switch kind := assignment.TaskType; kind {
		case ExitTask:
			return
		case WaitTask:
			// Nothing to do right now; back off before asking again.
			time.Sleep(500 * time.Millisecond)
		case ReduceTask:
			doReduce(assignment, reducef, pid)
		case MapTask:
			doMap(assignment, mapf, pid)
		}
	}
}
// doMap runs one map task: it reads task.FileName, applies mapf, splits the
// resulting pairs into task.NReduce buckets by ihash(key) % NReduce, and writes
// bucket i as a stream of JSON-encoded KeyValue objects to "mr-<TaskID>-<i>".
// Finally it reports the task as done to the coordinator.
//
// Each output file is written to a temporary file first and then renamed, so
// a reduce task never sees a partially written file. The temporary file lives
// in the current directory so the rename stays on one filesystem.
//
// Any I/O or encoding failure terminates the process.
func doMap(task GetTaskReply, mapf func(string, string) []KeyValue, workerID int) {
	filename := task.FileName
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := io.ReadAll(file)
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}

	// Run the user-supplied map function to obtain the key/value pairs.
	kva := mapf(filename, string(content))

	// Partition the intermediate pairs into NReduce buckets.
	intermediate := make([][]KeyValue, task.NReduce)
	for _, kv := range kva {
		bucketNum := ihash(kv.Key) % task.NReduce
		intermediate[bucketNum] = append(intermediate[bucketNum], kv)
	}

	// Write every bucket to its own file, including empty buckets, because
	// reduce tasks open one file per map task.
	for i, bucket := range intermediate {
		tempFile, err := os.CreateTemp(".", "mr-tmp-*")
		if err != nil {
			log.Fatalf("cannot create temp file: %v", err)
		}
		enc := json.NewEncoder(tempFile)
		// Only the pairs of bucket i belong in file i.
		for _, kv := range bucket {
			if err := enc.Encode(&kv); err != nil {
				log.Fatalf("cannot encode %v: %v", kv, err)
			}
		}
		if err := tempFile.Close(); err != nil {
			log.Fatalf("cannot close %v: %v", tempFile.Name(), err)
		}
		// Final name: mr-<map task ID>-<reduce bucket number>.
		finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, i)
		if err := os.Rename(tempFile.Name(), finalName); err != nil {
			log.Fatalf("cannot rename %v to %v: %v", tempFile.Name(), finalName, err)
		}
	}

	reportTaskDone(task.TaskType, task.TaskID, workerID)
}
// doReduce runs one reduce task: it reads the intermediate files
// "mr-<m>-<ReduceTaskNum>" for every map task m in [0, MapTaskNum), sorts all
// pairs by key, calls reducef once per distinct key with all of that key's
// values, and writes one "<key> <result>" line per key to
// "mr-out-<ReduceTaskNum>". Finally it reports the task as done.
//
// Output goes to a temporary file that is renamed on completion, so a
// partially written result is never visible under the final name. The
// temporary file lives in the current directory so the rename stays on one
// filesystem, and its name does not start with "mr-out" so a leftover from a
// crashed worker cannot be mistaken for real output.
//
// Any I/O or decoding failure terminates the process.
func doReduce(task GetTaskReply, reducef func(string, []string) string, workerID int) {
	reduceTaskNum := task.ReduceTaskNum
	mapTaskNum := task.MapTaskNum

	// Collect this partition's pairs from every map task's output.
	var intermediate []KeyValue
	for i := 0; i < mapTaskNum; i++ {
		filename := fmt.Sprintf("mr-%d-%d", i, reduceTaskNum)
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			err := dec.Decode(&kv)
			if err == io.EOF {
				break
			}
			if err != nil {
				// A malformed file must not be silently truncated; that
				// would produce wrong results without any sign of trouble.
				log.Fatalf("cannot decode %v: %v", filename, err)
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}

	sort.Sort(ByKey(intermediate))

	tempfile, err := os.CreateTemp(".", "mr-reduce-tmp-*")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}

	// After sorting, equal keys are adjacent: [i, j) is one run of a key.
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		if _, err := fmt.Fprintf(tempfile, "%v %v\n", intermediate[i].Key, output); err != nil {
			log.Fatalf("cannot write %v: %v", tempfile.Name(), err)
		}
		i = j
	}

	if err := tempfile.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tempfile.Name(), err)
	}
	finalName := fmt.Sprintf("mr-out-%d", reduceTaskNum)
	if err := os.Rename(tempfile.Name(), finalName); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tempfile.Name(), finalName, err)
	}

	reportTaskDone(ReduceTask, task.TaskID, workerID)
}
// getTask asks the coordinator for the next task on behalf of workerID and
// returns the reply. The result of the RPC is not checked, so a failed call
// yields a zero-valued reply.
func getTask(workerID int) GetTaskReply {
	var assignment GetTaskReply
	request := GetTaskArgs{WorkerID: workerID}
	call("Coordinator.GetTask", &request, &assignment)
	return assignment
}
func reportTaskDone(taskType string, taskID int, workerID int) {
args := ReportTaskArgs{TaskType: taskType, TaskID: taskID, WorkerID: workerID, Completed: true}
reply := ReportTaskReply{}
call("Coordinator.ReportTaskDone", &args, &reply)
}
// CallExample shows how to make an RPC call to the coordinator: it sends
// X = 99 to Coordinator.Example and prints the reply, or a failure message.
//
// The RPC argument and reply types are defined in rpc.go.
func CallExample() {
	// Build the argument and an empty reply for the server to fill in.
	request := ExampleArgs{X: 99}
	var response ExampleReply

	// "Coordinator.Example" tells the receiving server that we'd like to
	// call the Example() method of struct Coordinator.
	if !call("Coordinator.Example", &request, &response) {
		fmt.Printf("call failed!\n")
		return
	}
	// response.Y should be 100.
	fmt.Printf("reply.Y %v\n", response.Y)
}
// call sends an RPC request to the coordinator and waits for the response.
// It usually returns true, and returns false (after printing the error) if
// the call itself fails. If the coordinator cannot be dialed at all, the
// process is terminated.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, dialErr := rpc.DialHTTP("unix", coordinatorSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}
rpc.go
package mr
//
// RPC definitions.
//
// remember to capitalize all names.
//
import (
"os"
"strconv"
)
// ExampleArgs is the argument type of the sample Coordinator.Example RPC.
// Together with ExampleReply it shows how to declare the arguments and
// reply for an RPC.
type ExampleArgs struct {
	X int // input value; the handler replies with X + 1
}
// ExampleReply is the reply type of the sample Coordinator.Example RPC.
type ExampleReply struct {
	Y int // set by the handler to the argument's X + 1
}
// Add your RPC definitions here.
// Task type constants, carried in GetTaskReply.TaskType and
// ReportTaskArgs.TaskType.
const (
	MapTask = "map" // run a map task
	ReduceTask = "reduce" // run a reduce task
	ExitTask = "exit" // the job is finished; the worker should return
	WaitTask = "wait" // no task is available right now; ask again later
)
// Task status constants, stored in Task.Status.
const (
	Idle = "idle" // not assigned to any worker (initial state, or reset after a timeout)
	InProgress = "in-progress" // handed to a worker and not yet reported as done
	Completed = "completed" // reported as done by a worker
)
// GetTaskArgs is the argument of the Coordinator.GetTask RPC, with which a
// worker requests a task.
type GetTaskArgs struct {
	WorkerID int // identifier of the requesting worker (the worker sends its process ID)
}
// GetTaskReply is the reply of the Coordinator.GetTask RPC. Which fields are
// filled in depends on TaskType.
type GetTaskReply struct {
	TaskID int // ID of the assigned task (map and reduce tasks)
	TaskType string // MapTask, ReduceTask, WaitTask or ExitTask
	FileName string // input file to read (map tasks only)
	MapTaskNum int // total number of map tasks (reduce tasks only)
	ReduceTaskNum int // number of this reduce task, i.e. the partition it is responsible for (reduce tasks only)
	// Map output is split into NReduce buckets; a reduce task reads its own
	// partition from the intermediate files of every map task.
	NReduce int // total number of reduce tasks (map tasks only)
}
// ReportTaskArgs is the argument of the Coordinator.ReportTaskDone RPC, with
// which a worker reports the status of a task.
type ReportTaskArgs struct {
	TaskType string // MapTask or ReduceTask
	WorkerID int // identifier of the reporting worker (the worker sends its process ID)
	TaskID int // ID of the task being reported
	Completed bool // the worker always sends true; the handler shown in this file does not read it
}
// ReportTaskReply is the reply of the Coordinator.ReportTaskDone RPC.
type ReportTaskReply struct {
	OK bool // true if the coordinator accepted the report and marked the task Completed
}
// coordinatorSock cooks up a unique-ish UNIX-domain socket name in /var/tmp
// for the coordinator, made of a fixed prefix followed by the current user ID.
// Can't use the current directory since Athena AFS doesn't support
// UNIX-domain sockets.
func coordinatorSock() string {
	const prefix = "/var/tmp/5840-mr-"
	uid := os.Getuid()
	return prefix + strconv.Itoa(uid)
}