流程简述

coordinator 与 worker 之间通过 RPC(基于 UNIX 域套接字)建立连接

每当一个空闲的 worker 连接到 coordinator 时,coordinator 会向它分配任务,让该 worker 去读取某一类文件

当 worker 执行任务时,会时不时向 coordinator 发出请求,证明自己还活着

当 worker 完成任务时,会向 coordinator 发送已完成任务的编号

当 coordinator 发现某个 worker 超过 10 秒没有响应时,会把它的任务重新分配给其他 worker

rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

// The two types below form the request/reply pair of the sample RPC.
type (
	// ExampleArgs is the request of the sample RPC; X is its only argument.
	ExampleArgs struct {
		X int
	}

	// ExampleReply is the response of the sample RPC; Y is its only result.
	ExampleReply struct {
		Y int
	}
)

// Add your RPC definitions here.
// Request/reply pairs for the three coordinator RPCs
// (AllotFile, CheckOK and CheckOver).
type (
	// CallFileArgs is the AllotFile request. X is the calling worker's id.
	CallFileArgs struct {
		X int
	}

	// CallFileReply is the AllotFile response.
	// X is the task kind: 1 for a map task, 2 for a reduce task, 0 when no
	// task was handed out. File names the task: the input file name for a
	// map task, the bucket digit for a reduce task.
	CallFileReply struct {
		File string
		X    int
	}

	// CallOKArgs is the CheckOK request. X is the calling worker's id.
	CallOKArgs struct {
		X int
	}

	// CallOKReply is the CheckOK response. The worker exits when X is 2.
	CallOKReply struct {
		X int
	}

	// CallOverArgs is the CheckOver request. File is the name under which
	// the finished task is reported and X is the reporting worker's id.
	CallOverArgs struct {
		File string
		X    int
	}

	// CallOverReply is the CheckOver response. The handler leaves X unset.
	CallOverReply struct {
		X int
	}
)

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {
	// The numeric user id is appended so that different users on the same
	// machine get different socket paths.
	return "/var/tmp/5840-mr-" + strconv.Itoa(os.Getuid())
}

coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package mr

import "log"
import "net"
import "os"
import "io/ioutil"
import "net/rpc"
import "net/http"
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)

// Coordinator holds the task-tracking state served to workers over RPC.
type Coordinator struct {
	// m maps a task name (an input file name, a reduce bucket digit, or
	// whatever name a worker passes to CheckOver) to the id of the worker it
	// was handed to. 0 means never handed out; 100001 means reported done.
	m map[string]int

	// file is the input file list given to MakeCoordinator. AllotFile
	// overwrites element 0 with the input directory "../".
	file []string

	// nReduce is the requested number of reduce tasks. It is stored but not
	// read by the code visible here, which uses ten buckets throughout.
	nReduce int

	// mrFile is not referenced by the code visible here.
	mrFile string

	// ss is set by AllotFile when it hands out a reduce task; Done only
	// reports completion once it equals 10.
	ss int

	// mi is the number of "pg-" input files, i.e. the number of map tasks.
	// It stays 0 until AllotFile has counted them.
	mi int

	// cnt counts the completions recorded by CheckOver.
	cnt int

	// over is allocated by MakeCoordinator and not otherwise referenced by
	// the code visible here.
	over map[string]int

	// ff maps a worker id to the Unix time (seconds) at which it was last
	// handed a task. Done replaces entries older than ten seconds with -1.
	ff map[int]int64

	// ffs maps a worker id to the name of the task it was last handed.
	ffs map[int]string
}

// m guards the Coordinator's task-tracking fields inside the RPC handlers.
var m sync.Mutex

// mw serializes the CheckOK and CheckOver RPCs issued by one worker process.
var mw sync.Mutex

// cmm and wg are declared for the package but are not referenced by the
// code visible here.
var (
	cmm sync.Mutex
	wg  sync.WaitGroup
)

// Your code here -- RPC handlers for the worker to call.
// AllotFile is the RPC handler a worker calls to request a task.
//
// args.X is the caller's worker id. The reply tells the worker what to do:
//
//	reply.X == 2  reduce task; reply.File is the bucket digit "0".."9"
//	reply.X == 1  map task; reply.File is the name of a "pg-" file in "../"
//	reply.X == 0  nothing can be handed out right now
//
// Reduce work is offered first. A bucket is ready once "mr1/" contains as
// many "mr-" files for it as there are map inputs (c.mi). A task is handed
// out when it was never assigned, or when Done has marked its owner as timed
// out and the task is still that owner's most recent assignment.
//
// An error is returned only when one of the two directories cannot be listed.
func (c *Coordinator) AllotFile(args *CallFileArgs, reply *CallFileReply) error {
	const (
		intermediateDir = "mr1/"
		inputDir        = "../"
		finished        = 100001 // value CheckOver stores in c.m for a completed task
		buckets         = 10
	)

	// net/rpc runs handlers concurrently and Done is called from another
	// goroutine, so every access to the shared fields (including c.mi) has
	// to happen under m. Holding it for the whole call also makes "pick a
	// task and record its owner" a single atomic step.
	m.Lock()
	defer m.Unlock()

	// claimable reports whether task may be given to the caller.
	claimable := func(task string) bool {
		owner := c.m[task]
		if owner == finished {
			return false
		}
		return owner == 0 || (c.ff[owner] == -1 && c.ffs[owner] == task)
	}
	// assign fills in the reply and records the caller as the task's owner.
	assign := func(task string, kind int) {
		reply.File = task
		reply.X = kind
		c.m[task] = args.X
	}
	// stamp records when, and with what, the caller was last given work.
	stamp := func() {
		c.ff[args.X] = time.Now().Unix()
		c.ffs[args.X] = reply.File
	}

	// ---- reduce phase -------------------------------------------------
	produced, err := ioutil.ReadDir(intermediateDir)
	if err != nil {
		return fmt.Errorf("listing %s: %w", intermediateDir, err)
	}
	perBucket := make(map[string]int)
	for _, entry := range produced {
		name := entry.Name()
		// Intermediate files end in "<digit>.json"; anything too short to
		// carry that suffix cannot be one of them.
		if len(name) < 6 || !strings.HasPrefix(name, "mr-") {
			continue
		}
		perBucket[name[len(name)-6:len(name)-5]]++
	}

	ready := 0
	for i := 0; i < buckets; i++ {
		if reply.X > 0 {
			break
		}
		bucket := strconv.Itoa(i)
		if c.mi == 0 || perBucket[bucket] != c.mi {
			continue
		}
		if claimable(bucket) {
			assign(bucket, 2)
		}
		ready++
	}
	// ready counts the ready buckets up to and including the one just
	// assigned. Only ever raise c.ss: re-assigning a low-numbered bucket
	// after a timeout must not undo the "all ten buckets seen" state that
	// Done waits for.
	if reply.X > 0 && ready > c.ss {
		c.ss = ready
	}
	if reply.X > 0 {
		stamp()
		return nil
	}

	// ---- map phase ----------------------------------------------------
	if len(c.file) > 0 {
		c.file[0] = inputDir
	}
	inputs, err := ioutil.ReadDir(inputDir)
	if err != nil {
		return fmt.Errorf("listing %s: %w", inputDir, err)
	}
	if c.mi == 0 {
		total := 0
		for _, entry := range inputs {
			if strings.HasPrefix(entry.Name(), "pg-") {
				total++
			}
		}
		c.mi = total
	}
	for _, entry := range inputs {
		if reply.X > 0 {
			break
		}
		name := entry.Name()
		if strings.HasPrefix(name, "pg-") && claimable(name) {
			assign(name, 1)
		}
	}
	if reply.X > 0 {
		stamp()
	}
	return nil
}
// CheckOK is the RPC handler a worker polls to learn whether it may exit.
//
// args.X is the caller's worker id; requests with id 0 are ignored. reply.X
// is set to 2 once the number of recorded completions equals the number of
// map tasks plus ten reduce tasks, and is left untouched otherwise. The
// handler never returns an error.
func (c *Coordinator) CheckOK(args *CallOKArgs, reply *CallOKReply) error {
	if args.X == 0 {
		return nil
	}

	// c.mi and c.cnt are written by other handlers while holding m, so they
	// must be read under the same lock.
	m.Lock()
	defer m.Unlock()

	if c.mi != 0 && c.cnt == c.mi+10 {
		reply.X = 2
	}
	return nil
}
// CheckOver is the RPC handler a worker calls after finishing a task.
//
// args.File is the name the task is reported under and args.X is the
// reporting worker's id; reports with a non-positive id are ignored. The
// task is marked finished and the completion counter is incremented. The
// reply is not used and the handler never returns an error.
func (c *Coordinator) CheckOver(args *CallOverArgs, reply *CallOverReply) error {
	const finished = 100001 // marker stored in c.m for a completed task

	if args.X <= 0 {
		return nil
	}

	m.Lock()
	defer m.Unlock()

	// Count each task once. CheckOK and Done compare c.cnt for equality, so
	// a repeated report for the same task would push the counter past the
	// target and the job would never be seen as finished.
	if c.m[args.File] == finished {
		return nil
	}
	c.m[args.File] = finished
	c.cnt++
	return nil
}

// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
	// Expose the coordinator's methods through net/rpc's HTTP handler.
	rpc.Register(c)
	rpc.HandleHTTP()

	// Listen on the per-user UNIX-domain socket, removing any stale socket
	// file left behind by a previous run first.
	path := coordinatorSock()
	os.Remove(path)
	listener, listenErr := net.Listen("unix", path)
	if listenErr != nil {
		log.Fatal("listen error:", listenErr)
	}

	// Serve in the background so the caller is not blocked.
	go http.Serve(listener, nil)
}

// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
// Done reports whether the whole job has finished: the map tasks have been
// counted, all ten reduce buckets have been handed out, and the number of
// recorded completions equals map tasks plus reduce tasks.
//
// Each call also acts as the timeout sweep: a worker whose most recent
// assignment is more than ten seconds old is marked with -1 in c.ff, which
// lets AllotFile hand its task to someone else.
func (c *Coordinator) Done() bool {
	const timeoutSeconds = 10

	// Done runs on the caller's goroutine while RPC handlers mutate the same
	// fields. Ranging over c.ff outside the lock would be a concurrent map
	// iteration and write, so the whole body runs under m.
	m.Lock()
	defer m.Unlock()

	finished := c.ss == 10 && c.mi != 0 && c.cnt == c.mi+c.ss

	now := time.Now().Unix()
	for worker, assignedAt := range c.ff {
		if assignedAt != 0 && now-assignedAt > timeoutSeconds {
			c.ff[worker] = -1
		}
	}
	return finished
}

// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
// MakeCoordinator builds a Coordinator, resets the intermediate directory
// "mr1/" and starts the RPC server.
//
// files is the list of input files and nReduce the requested number of
// reduce tasks; both are stored on the returned Coordinator.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	const dirPath = "mr1/"

	c := Coordinator{
		m:    make(map[string]int),
		over: make(map[string]int),
		ff:   make(map[int]int64),
		ffs:  make(map[int]string),
		// Keep a private copy: AllotFile writes into c.file, which must not
		// modify the caller's slice.
		file:    append([]string(nil), files...),
		nReduce: nReduce,
	}

	// Start from an empty intermediate directory. RemoveAll already treats
	// a missing path as success, so no existence check is needed first.
	if err := os.RemoveAll(dirPath); err != nil {
		fmt.Printf("删除文件夹时出错: %v\n", err)
	}
	// MkdirAll succeeds when the directory is already there (for example
	// because the removal above failed), so any error it reports is real.
	if err := os.MkdirAll(dirPath, 0755); err != nil {
		fmt.Printf("cannot create %v: %v\n", dirPath, err)
	}

	c.server()
	return &c
}

worker.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package mr

import (
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"os"
"sort"
"strconv"
"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface over a slice of KeyValue, ordering the
// elements by Key in ascending byte-wise order.
type ByKey []KeyValue

// Len returns the number of pairs.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	tmp := kvs[i]
	kvs[i] = kvs[j]
	kvs[j] = tmp
}

// Less reports whether the key at position i sorts before the key at j.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {
	// Clearing the top bit of the 32-bit FNV-1a sum keeps the result
	// non-negative.
	const lower31Bits = 0x7fffffff

	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	return int(sum & lower31Bits)
}

// main/mrworker.go calls this function.
// Worker picks a random id, starts the CallOK poller in the background and
// then keeps asking the coordinator for tasks, pausing 5ms between requests.
//
// mapf and reducef are the application's map and reduce functions. The
// process normally ends inside CallOK, which calls os.Exit once the
// coordinator reports that the job is finished.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// The coordinator gives two values a special meaning in its task table:
	// 0 is "not assigned" (and requests from id 0 are ignored by CheckOK and
	// CheckOver), 100001 is "task finished". A worker must use neither.
	id := rand.Intn(10000000)
	for id == 0 || id == 100001 {
		id = rand.Intn(10000000)
	}

	// a is cleared by CallOK just before it exits the process; b is the
	// busy flag observed by the jstime watchdog.
	// NOTE(review): both flags are shared with other goroutines without
	// synchronization.
	a := 1
	b := 1

	go CallOK(id, &a)

	for a == 1 {
		CallFile(id, mapf, reducef, &b)
		time.Sleep(5 * time.Millisecond)
	}
}
// CallFile asks the coordinator for one task and runs it.
//
// id is this worker's id, mapf and reducef are the application functions,
// and b is the busy flag: it is set to 1 while the task runs (with a jstime
// watchdog started on it) and back to 0 afterwards. A reply of kind 1 is run
// as a map task, kind 2 as a reduce task; any other kind does no work.
func CallFile(id int, mapf func(string, string) []KeyValue,
	reducef func(string, []string) string, b *int) {
	request := CallFileArgs{X: id}
	var response CallFileReply

	if !call("Coordinator.AllotFile", &request, &response) {
		fmt.Printf("call failed!\n")
		return
	}

	*b = 1
	go jstime(b)
	switch response.X {
	case 1:
		mapFile(&response, mapf, id)
	case 2:
		reduceFile(&response, reducef, id)
	}
	*b = 0
}
// jstime is a watchdog for the busy flag b. While *b stays 1 it sleeps in
// 1ms steps; after 100000 steps it terminates the process with exit code 1.
// It returns as soon as it observes *b different from 1.
// NOTE(review): *b is read without synchronization.
func jstime(b *int) {
	const maxTicks = 100000

	for ticks := 0; *b == 1; {
		time.Sleep(time.Millisecond)
		ticks++
		if ticks == maxTicks {
			os.Exit(1)
		}
	}
}
// mapFile runs one map task.
//
// It reads "../"+reply.File, applies mapf, splits the resulting pairs into
// ten buckets by ihash(key)%10 and writes bucket i to
// "mr1/mr-<first 6 chars of reply.File><i>.json" as a stream of JSON
// objects. When all ten files are written it reports the task to the
// coordinator through CheckOver, using id as the worker id.
//
// If a bucket file already exists the task is taken to belong to another
// worker and mapFile returns without reporting. On any I/O failure the
// process exits: the coordinator only re-assigns a task whose worker stops
// taking work, so exiting is what lets the task be retried.
//
// NOTE(review): bucket files are written in place, so a reader can observe
// one while it is still being written; inputs sharing their first six
// characters would also map to the same file names.
func mapFile(reply *CallFileReply, mapf func(string, string) []KeyValue, id int) {
	const buckets = 10

	filename := "../" + reply.File
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v: %v", filename, err)
	}
	content, err := ioutil.ReadAll(file)
	file.Close()
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}
	kva := mapf(filename, string(content))

	// Partition once instead of rescanning every pair for each bucket; the
	// relative order of pairs inside a bucket is unchanged.
	parts := make([][]KeyValue, buckets)
	for _, kv := range kva {
		b := ihash(kv.Key) % buckets
		parts[b] = append(parts[b], kv)
	}

	// Names shorter than six characters are used whole instead of slicing
	// past their end.
	prefix := reply.File
	if len(prefix) > 6 {
		prefix = prefix[:6]
	}

	for i := 0; i < buckets; i++ {
		name := "mr1/mr-" + prefix + strconv.Itoa(i) + ".json"
		if _, err := os.Stat(name); !os.IsNotExist(err) {
			return
		}
		if err := writeBucket(name, parts[i]); err != nil {
			log.Fatalf("cannot write %v: %v", name, err)
		}
	}

	args := CallOverArgs{File: reply.File, X: id}
	rep := CallOverReply{}
	mw.Lock()
	// call prints the error itself when the RPC fails.
	_ = call("Coordinator.CheckOver", &args, &rep)
	mw.Unlock()
}

// writeBucket creates path and writes kvs to it, one JSON object per pair.
func writeBucket(path string, kvs []KeyValue) error {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	enc := json.NewEncoder(file)
	for i := range kvs {
		if err := enc.Encode(&kvs[i]); err != nil {
			file.Close()
			return err
		}
	}
	return file.Close()
}
// reduceFile runs one reduce task.
//
// reply.File is the bucket digit. Every "mr-" file in "mr1/" whose name ends
// in "<digit>.json" is decoded, the pairs are sorted by key, reducef is
// applied to each group of equal keys, and the "key value" lines are written
// to "mr-out-<digit>". The task is then reported to the coordinator through
// CheckOver, using id as the worker id.
//
// If the output file already exists the task is taken to belong to another
// worker and reduceFile returns without reporting. On any I/O failure the
// process exits: the coordinator only re-assigns a task whose worker stops
// taking work, so exiting is what lets the task be retried.
func reduceFile(reply *CallFileReply, reducef func(string, []string) string, id int) {
	const dir = "mr1/"
	bucket := reply.File

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		log.Fatalf("cannot open %v: %v", dir, err)
	}

	var kva []KeyValue
	for _, entry := range entries {
		name := entry.Name()
		// Intermediate files are named "mr-...<digit>.json"; skip anything
		// that is too short to carry that suffix or lacks the prefix.
		if len(name) < 6 || name[:3] != "mr-" {
			continue
		}
		if name[len(name)-6:len(name)-5] != bucket {
			continue
		}

		path := dir + name
		file, err := os.Open(path)
		if err != nil {
			log.Fatalf("cannot open %v: %v", path, err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			// Decoding stops at the first error, which includes the
			// normal end of the file.
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}

	sort.Sort(ByKey(kva))

	// Build the whole output in memory first, so nothing is created on disk
	// if reducef does not return.
	var out []byte
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		// this is the correct format for each line of Reduce output.
		out = append(out, fmt.Sprintf("%v %v\n", kva[i].Key, reducef(kva[i].Key, values))...)
		i = j
	}
	// The file has always ended with one extra newline; keep it so the
	// bytes on disk do not change.
	out = append(out, '\n')

	oname := "mr-out-" + bucket
	if _, err := os.Stat(oname); !os.IsNotExist(err) {
		return
	}
	if err := ioutil.WriteFile(oname, out, 0666); err != nil {
		// Best effort: do not leave a partial output behind, because an
		// existing file makes later attempts skip this task.
		os.Remove(oname)
		log.Fatalf("cannot write %v: %v", oname, err)
	}

	// Report under the bucket digit: that is the name AllotFile uses when
	// it checks whether a reduce task is already finished.
	args := CallOverArgs{File: bucket, X: id}
	rep := CallOverReply{}
	mw.Lock()
	// call prints the error itself when the RPC fails.
	_ = call("Coordinator.CheckOver", &args, &rep)
	mw.Unlock()
}
// CallOK polls the coordinator's CheckOK handler every 10ms on behalf of
// worker id. When the reply's X is 2 it stores 0 through a and terminates
// the process with exit code 0. The loop does not end on its own.
func CallOK(id int, a *int) {
	request := CallOKArgs{X: id}
	var response CallOKReply

	for request.X == id {
		mw.Lock()
		_ = call("Coordinator.CheckOK", &request, &response)
		mw.Unlock()

		if response.X == 2 {
			*a = 0
			os.Exit(0)
		}

		// Clear the answer so the next round starts from the zero value.
		response.X = 0
		time.Sleep(10 * time.Millisecond)
	}
}

// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// A new connection to the coordinator's socket is opened for each call
	// and closed on return. Failing to connect ends the process.
	client, dialErr := rpc.DialHTTP("unix", coordinatorSock())
	if dialErr != nil {
		log.Fatal("dialing:", dialErr)
	}
	defer client.Close()

	if callErr := client.Call(rpcname, args, reply); callErr != nil {
		fmt.Println(callErr)
		return false
	}
	return true
}

尾声

该代码未通过竞态检测(race detector),将来有时间或许会用 channel 重构