MapReduce

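Overview

MapReduce is a programming model that breaks a large data-processing job into many independent tasks that can run in parallel across a cluster of servers; the results of those tasks are then combined to produce the final result.

For example, MapReduce can count how many times each word occurs across a large collection of documents.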
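To make the word-count example concrete, here is a minimal sequential sketch in Go, the language the lab is written in. It only illustrates the idea behind a word-count plugin such as wc.so: the KeyValue type, the single-argument Map, and the WordCount driver are hypothetical simplifications for this sketch, not the lab's actual API. A real framework would run the Map and Reduce calls as separate tasks on many workers and pass the intermediate pairs between them through files.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is an intermediate pair emitted by Map (hypothetical type for this sketch).
type KeyValue struct {
	Key   string
	Value string
}

// Map splits a document into words (runs of letters) and emits ("word", "1")
// for every occurrence.
func Map(contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// Reduce receives every value emitted for one key and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// WordCount runs Map over every document, groups the pairs by key (the
// "shuffle" step), then calls Reduce once per distinct key. Everything runs
// sequentially here; a MapReduce framework would spread these calls over workers.
func WordCount(docs []string) map[string]string {
	grouped := make(map[string][]string)
	for _, doc := range docs {
		for _, kv := range Map(doc) {
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
	}
	out := make(map[string]string, len(grouped))
	for k, vs := range grouped {
		out[k] = Reduce(k, vs)
	}
	return out
}

func main() {
	counts := WordCount([]string{"the cat sat", "the cat ran"})
	// Print in sorted key order, the same order `sort mr-out*` produces in the test script.
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, counts[k])
	}
}
```

Running it prints `cat 2`, `ran 1`, `sat 1`, `the 2`, one per line. Because each Map call only looks at its own document and each Reduce call only at one key, both phases can be split into independent parallel tasks, which is exactly what the distributed version exploits.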

Workflow

Program execution

Starting the lab

First, look at the test script testmr.sh:

#!/usr/bin/env bash

#
# map-reduce tests
#

# un-comment this to run the tests with the Go race detector.
# RACE=-race

if [[ "$OSTYPE" = "darwin"* ]]
then
  if go version | grep 'go1.17.[012345]'
  then
    # -race with plug-ins on x86 MacOS 12 with
    # go1.17 before 1.17.6 sometimes crash.
    RACE=
    echo '*** Turning off -race since it may not work on a Mac'
    echo ' with ' `go version`
  fi
fi

ISQUIET=$1
maybe_quiet() {
  if [ "$ISQUIET" == "quiet" ]; then
    "$@" > /dev/null 2>&1
  else
    "$@"
  fi
}


TIMEOUT=timeout
TIMEOUT2=""
if timeout 2s sleep 1 > /dev/null 2>&1
then
  :
else
  if gtimeout 2s sleep 1 > /dev/null 2>&1
  then
    TIMEOUT=gtimeout
  else
    # no timeout command
    TIMEOUT=
    echo '*** Cannot find timeout command; proceeding without timeouts.'
  fi
fi
if [ "$TIMEOUT" != "" ]
then
  TIMEOUT2=$TIMEOUT
  TIMEOUT2+=" -k 2s 120s "
  TIMEOUT+=" -k 2s 45s "
fi

# run the test in a fresh sub-directory.
rm -rf mr-tmp
mkdir mr-tmp || exit 1
cd mr-tmp || exit 1
rm -f mr-*

# make sure software is freshly built.
(cd ../../mrapps && go clean)
(cd .. && go clean)
(cd ../../mrapps && go build $RACE -buildmode=plugin wc.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin indexer.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin mtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin rtiming.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin jobcount.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin early_exit.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin crash.go) || exit 1
(cd ../../mrapps && go build $RACE -buildmode=plugin nocrash.go) || exit 1
(cd .. && go build $RACE mrcoordinator.go) || exit 1
(cd .. && go build $RACE mrworker.go) || exit 1
(cd .. && go build $RACE mrsequential.go) || exit 1

failed_any=0

#########################################################
# first word-count

# generate the correct output
../mrsequential ../../mrapps/wc.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-wc.txt
rm -f mr-out*

echo '***' Starting wc test.

maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt &
pid=$!

# give the coordinator time to create the sockets.
sleep 1

# start multiple workers.
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/wc.so) &
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/wc.so) &
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/wc.so) &

# wait for the coordinator to exit.
wait $pid

# since workers are required to exit when a job is completely finished,
# and not before, that means the job has finished.
sort mr-out* | grep . > mr-wc-all
if cmp mr-wc-all mr-correct-wc.txt
then
  echo '---' wc test: PASS
else
  echo '---' wc output is not the same as mr-correct-wc.txt
  echo '---' wc test: FAIL
  failed_any=1
fi

# wait for remaining workers and coordinator to exit.
wait
# exit 1
#########################################################
# now indexer
rm -f mr-*

# generate the correct output
../mrsequential ../../mrapps/indexer.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-indexer.txt
rm -f mr-out*

echo '***' Starting indexer test.

maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt &
sleep 1

# start multiple workers
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/indexer.so &
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/indexer.so

sort mr-out* | grep . > mr-indexer-all
if cmp mr-indexer-all mr-correct-indexer.txt
then
  echo '---' indexer test: PASS
else
  echo '---' indexer output is not the same as mr-correct-indexer.txt
  echo '---' indexer test: FAIL
  failed_any=1
fi

wait
# exit 0
#########################################################
echo '***' Starting map parallelism test.

rm -f mr-*

maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt &
sleep 1

maybe_quiet $TIMEOUT ../mrworker ../../mrapps/mtiming.so &
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/mtiming.so

NT=`cat mr-out* | grep '^times-' | wc -l | sed 's/ //g'`
if [ "$NT" != "2" ]
then
  echo '---' saw "$NT" workers rather than 2
  echo '---' map parallelism test: FAIL
  failed_any=1
fi

if cat mr-out* | grep '^parallel.* 2' > /dev/null
then
  echo '---' map parallelism test: PASS
else
  echo '---' map workers did not run in parallel
  echo '---' map parallelism test: FAIL
  failed_any=1
fi

wait

# exit 0
#########################################################
echo '***' Starting reduce parallelism test.

rm -f mr-*

maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt &
sleep 1

maybe_quiet $TIMEOUT ../mrworker ../../mrapps/rtiming.so &
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/rtiming.so

NT=`cat mr-out* | grep '^[a-z] 2' | wc -l | sed 's/ //g'`
if [ "$NT" -lt "2" ]
then
  echo '---' too few parallel reduces.
  echo '---' reduce parallelism test: FAIL
  failed_any=1
else
  echo '---' reduce parallelism test: PASS
fi

wait
# exit 0
#########################################################
echo '***' Starting job count test.

rm -f mr-*

maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt &
sleep 1

maybe_quiet $TIMEOUT ../mrworker ../../mrapps/jobcount.so &
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/jobcount.so
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/jobcount.so &
maybe_quiet $TIMEOUT ../mrworker ../../mrapps/jobcount.so

NT=`cat mr-out* | awk '{print $2}'`
if [ "$NT" -eq "8" ]
then
  echo '---' job count test: PASS
else
  echo '---' map jobs ran incorrect number of times "($NT != 8)"
  echo '---' job count test: FAIL
  failed_any=1
fi

wait
# exit 0
#########################################################
# test whether any worker or coordinator exits before the
# task has completed (i.e., all output files have been finalized)
rm -f mr-*

echo '***' Starting early exit test.

DF=anydone$$
rm -f $DF

(maybe_quiet $TIMEOUT ../mrcoordinator ../pg*txt; touch $DF) &

# give the coordinator time to create the sockets.
sleep 1

# start multiple workers.
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/early_exit.so; touch $DF) &
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/early_exit.so; touch $DF) &
(maybe_quiet $TIMEOUT ../mrworker ../../mrapps/early_exit.so; touch $DF) &

# wait for any of the coord or workers to exit.
# `jobs` ensures that any completed old processes from other tests
# are not waited upon.
jobs &> /dev/null
if [[ "$OSTYPE" = "darwin"* ]]
then
  # bash on the Mac doesn't have wait -n
  while [ ! -e $DF ]
  do
    sleep 0.2
  done
else
  # the -n causes wait to wait for just one child process,
  # rather than waiting for all to finish.
  wait -n
fi

rm -f $DF

# a process has exited. this means that the output should be finalized
# otherwise, either a worker or the coordinator exited early
sort mr-out* | grep . > mr-wc-all-initial

# wait for remaining workers and coordinator to exit.
wait

# compare initial and final outputs
sort mr-out* | grep . > mr-wc-all-final
if cmp mr-wc-all-final mr-wc-all-initial
then
  echo '---' early exit test: PASS
else
  echo '---' output changed after first worker exited
  echo '---' early exit test: FAIL
  failed_any=1
fi
# exit 0
rm -f mr-*

#########################################################
echo '***' Starting crash test.

# generate the correct output
../mrsequential ../../mrapps/nocrash.so ../pg*txt || exit 1
sort mr-out-0 > mr-correct-crash.txt
rm -f mr-out*

rm -f mr-done
((maybe_quiet $TIMEOUT2 ../mrcoordinator ../pg*txt); touch mr-done ) &
sleep 1

# start multiple workers
maybe_quiet $TIMEOUT2 ../mrworker ../../mrapps/crash.so &

# mimic rpc.go's coordinatorSock()
SOCKNAME=/var/tmp/5840-mr-`id -u`

( while [ -e $SOCKNAME -a ! -f mr-done ]
  do
    maybe_quiet $TIMEOUT2 ../mrworker ../../mrapps/crash.so
    sleep 1
  done ) &

( while [ -e $SOCKNAME -a ! -f mr-done ]
  do
    maybe_quiet $TIMEOUT2 ../mrworker ../../mrapps/crash.so
    sleep 1
  done ) &

while [ -e $SOCKNAME -a ! -f mr-done ]
do
  maybe_quiet $TIMEOUT2 ../mrworker ../../mrapps/crash.so
  sleep 1
done

wait

rm $SOCKNAME
sort mr-out* | grep . > mr-crash-all
if cmp mr-crash-all mr-correct-crash.txt
then
  echo '---' crash test: PASS
else
  echo '---' crash output is not the same as mr-correct-crash.txt
  echo '---' crash test: FAIL
  failed_any=1
fi

#########################################################
if [ $failed_any -eq 0 ]; then
  echo '***' PASSED ALL TESTS
else
  echo '***' FAILED SOME TESTS
  exit 1
fi

Explanation of the tests

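This is an automated test script for a MapReduce framework. It runs a series of tests that check both the correctness of the framework and its robustness to worker failures. Its main parts are as follows: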

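1. Environment checks and setup
  • RACE: enables Go's race detector when the line RACE=-race is uncommented. On macOS with go1.17.0 through go1.17.5 the script clears it again, because -race with plugins can crash there.
  • TIMEOUT: puts a time limit on each command. The script uses timeout if it exists, otherwise gtimeout, and otherwise runs without limits. Most tests get 45 s (TIMEOUT), the crash test gets 120 s (TIMEOUT2), and -k 2s kills a process that is still running 2 s after its limit.
2. Cleanup and preparation
  • Deletes the old test directory mr-tmp, recreates it, and runs every test inside it.
  • Cleans and rebuilds all plugins (wc, indexer, mtiming, rtiming, jobcount, early_exit, crash, nocrash) and the mrcoordinator, mrworker, and mrsequential programs.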
3. Test cases

The script contains several test cases, each checking a different aspect of the framework:

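3.1 Word count test (wc test)
  • Uses mrsequential to generate the reference output.
  • Starts mrcoordinator and three mrworker processes to run the word-count job.
  • Sorts and merges all mr-out* files and compares them with the reference output; the test passes only if they are identical.
3.2 Indexer test (indexer test)
  • Same procedure as the word count test, but with the indexer plugin and two workers.
3.3 Map parallelism test (map parallelism test)
  • Starts two mrworker processes with the mtiming plugin and checks that both workers ran map tasks and that two map tasks were running at the same time.
3.4 Reduce parallelism test (reduce parallelism test)
  • Starts two workers with the rtiming plugin and checks that at least two reduce tasks ran at the same time.
3.5 Job count test (job count test)
  • Runs four workers with the jobcount plugin and checks that map tasks ran exactly 8 times in total, so tasks are neither skipped nor needlessly re-executed.
3.6 Early exit test (early exit test)
  • Checks that no process exits before the job is finished. As soon as the coordinator or any worker exits, the script takes a snapshot of the output, then waits for all processes and compares the final output with the snapshot. If the output changed, some process exited before all output files were finalized.
3.7 Crash test (crash test)
  • Uses the crash plugin to make workers crash randomly, and keeps starting new workers as long as the coordinator's socket exists and the job has not finished. The test checks that the system recovers from the crashes and still produces the same output as mrsequential with the nocrash plugin.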
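4. Results
  • After each test case the script prints its result (PASS or FAIL).
  • If every test passes, it prints "PASSED ALL TESTS"; otherwise it prints "FAILED SOME TESTS" and exits with status 1.
5. Notes
  • The maybe_quiet function decides whether a command runs silently: if the script's first argument (ISQUIET) is quiet, the command's standard output and standard error are redirected to /dev/null.
  • The script uses wait to block until background child processes have exited, so one test's processes do not overlap with the next test.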

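This is a typical automated test script for validating a distributed system. Running it lets developers check that their MapReduce implementation produces the same output as the sequential version, actually runs tasks in parallel, does not exit early, and tolerates worker crashes.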