p4-mapreduce
Walk-through example
This document walks step-by-step through an example MapReduce job. Our staff Manager and Worker solutions produce logging output so you can see what’s going on.
Files
Start in your project root directory. Take a look at a few of the input files.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ ls tests/testdata/input_small
file01 file02
$ cat tests/testdata/input_small/file01
hello world
hello eecs485
$ cat tests/testdata/input_small/file02
goodbye autograder
hello autograder
We’re going to run a word count job. Start by simulating the MapReduce job at the command line to produce the expected output, then save that output to a file.
$ cat tests/testdata/input_small/* | tests/testdata/exec/wc_map.sh | sort | tests/testdata/exec/wc_reduce.sh
autograder 2
eecs485 1
goodbye 1
hello 3
world 1
$ cat tests/testdata/input_small/* | tests/testdata/exec/wc_map.sh | sort | tests/testdata/exec/wc_reduce.sh > correct.txt
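The contents of wc_map.sh and wc_reduce.sh aren’t reproduced in this walk-through. The word-count logic they implement is roughly the following Python sketch; this is not the staff script, and the tab-separated key/value format is the usual MapReduce convention rather than something shown above.

#!/usr/bin/env python3
"""Roughly the word-count logic behind wc_map.sh and wc_reduce.sh (a sketch)."""
import itertools
import sys

def wc_map(stream):
    """Emit one 'word<TAB>1' line per word in the input."""
    for line in stream:
        for word in line.split():
            print(f"{word}\t1")

def wc_reduce(stream):
    """Sum the counts for each key; assumes the stream is sorted by key."""
    keyfunc = lambda line: line.partition("\t")[0]
    for key, group in itertools.groupby(stream, key=keyfunc):
        total = sum(int(line.partition("\t")[2]) for line in group)
        print(f"{key}\t{total}")

if __name__ == "__main__":
    # Pass "map" as the first argument for map behavior, anything else reduces.
    wc_map(sys.stdin) if sys.argv[1:] == ["map"] else wc_reduce(sys.stdin)

Piping the map output through sort and into the reduce step, as in the commands above, yields one count per word.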
These are the files we have before starting any MapReduce processes.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ ls
bin mapreduce mapreduce.egg-info pyproject.toml requirements.txt tests
Start Manager
Kill any stale MapReduce processes.
$ pkill -lf mapreduce- # macOS
$ pkill -f mapreduce- # Linux/WSL
Start the Manager.
$ mapreduce-manager --loglevel=DEBUG
Manager:6000 [INFO] Starting manager:6000
Manager:6000 [INFO] PWD /Users/awdeorio/src/eecs485/p4-mapreduce
Start Workers
Start two Workers in separate terminals.
$ mapreduce-worker --port 6001 --loglevel=DEBUG # Terminal 2
Worker:6001 [INFO] Starting worker:6001
Worker:6001 [INFO] PWD /Users/awdeorio/src/eecs485/p4-mapreduce
Manager:6000 [DEBUG] received
{
"message_type": "register",
"worker_host": "localhost",
"worker_port": 6001,
}
Manager:6000 [INFO] registered worker localhost:6001
Worker:6001 [DEBUG] received
{
"message_type": "register_ack"
}
$ mapreduce-worker --port 6002 --loglevel=DEBUG # Terminal 3
Worker:6002 [INFO] Starting worker:6002
Worker:6002 [INFO] PWD /Users/awdeorio/src/eecs485/p4-mapreduce
Manager:6000 [DEBUG] received
{
"message_type": "register",
"worker_host": "localhost",
"worker_port": 6002,
}
Manager:6000 [INFO] registered worker localhost:6002
Worker:6002 [DEBUG] received
{
"message_type": "register_ack"
}
$
At this point, you’ll have three terminals open: one for the Manager and one for each of the two Workers.
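The register and register_ack exchange above is just JSON passed between processes. Assuming the messages travel as UTF-8 encoded JSON over a TCP socket (check the spec for the exact transport and framing), the Worker side of registration might look like this sketch; the function name is illustrative, not a required API.

import json
import socket

def send_register(manager_host, manager_port, worker_host, worker_port):
    """Send a 'register' message to the Manager as UTF-8 encoded JSON."""
    message = {
        "message_type": "register",
        "worker_host": worker_host,
        "worker_port": worker_port,
    }
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((manager_host, manager_port))
        sock.sendall(json.dumps(message).encode("utf-8"))

# Matching the walk-through: the first Worker registers itself on port 6001.
# send_register("localhost", 6000, "localhost", 6001)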
Pro-tip: To produce logging output similar to the instructor solution, see the Logging section of the spec.
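For reference, log lines in the style shown in this walk-through (e.g., Manager:6000 [INFO] ...) can be produced with a configuration along these lines; the spec’s Logging section is authoritative, so treat this as a sketch.

import logging

def configure_logging(name, port, loglevel):
    """Produce lines like 'Manager:6000 [INFO] Starting manager:6000'."""
    logging.basicConfig(
        level=loglevel,
        format=f"{name}:{port} [%(levelname)s] %(message)s",
    )

# configure_logging("Manager", 6000, "DEBUG")
# logging.info("Starting manager:%s", 6000)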
Submit a MapReduce job
Submit a word count job in a fourth terminal. We’re taking advantage of the submit defaults; see mapreduce-submit --help for more info.
$ mapreduce-submit -i tests/testdata/input_small
Submitted job to Manager localhost:6000
input directory tests/testdata/input_small
output directory output
mapper executable tests/testdata/exec/wc_map.sh
reducer executable tests/testdata/exec/wc_reduce.sh
num mappers 2
num reducers 2
The Manager receives the job.
Manager:6000 [DEBUG] received
{
"message_type": "new_manager_job",
"input_directory": "tests/testdata/input_small",
"output_directory": "output",
"mapper_executable": "tests/testdata/exec/wc_map.sh",
"reducer_executable": "tests/testdata/exec/wc_reduce.sh",
"num_mappers": 2,
"num_reducers": 2
}
Manager:6000 [INFO] Created output
Manager:6000 [INFO] Created /tmp/mapreduce-shared-job00000-k7kc6ux9
The Manager created a final output directory output and a shared temporary directory for this job, /tmp/mapreduce-shared-job00000-k7kc6ux9. We’re using the Python standard library tempfile.TemporaryDirectory to create and automatically clean up the temporary directory, so your path could be different.
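The prefix visible in the path above suggests a call along these lines; the exact prefix string and how the Manager numbers jobs are assumptions based on the directory name shown.

import tempfile

job_id = 0  # the walk-through's job is the first one, job00000

# Creates a directory like /tmp/mapreduce-shared-job00000-k7kc6ux9 and
# removes it automatically when the with-block exits.
prefix = f"mapreduce-shared-job{job_id:05d}-"
with tempfile.TemporaryDirectory(prefix=prefix) as tmpdir:
    print(f"Created {tmpdir}")  # run the job's map and reduce tasks here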
Map Stage
The Map Stage runs.
Manager:6000 [INFO] Begin Map Stage
Worker:6001 [DEBUG] received
{
"message_type": "new_map_task",
"task_id": 0,
"input_paths": [
"tests/testdata/input_small/file01"
],
"executable": "tests/testdata/exec/wc_map.sh",
"output_directory": "/tmp/mapreduce-shared-job00000-k7kc6ux9",
"num_partitions": 2
}
Worker:6002 [DEBUG] received
{
"message_type": "new_map_task",
"task_id": 1,
"input_paths": [
"tests/testdata/input_small/file02"
],
"executable": "tests/testdata/exec/wc_map.sh",
"output_directory": "/tmp/mapreduce-shared-job00000-k7kc6ux9",
"num_partitions": 2
}
Worker:6001 [INFO] Created /tmp/mapreduce-local-task00000-pe6k0epd
Worker:6001 [INFO] Executed tests/testdata/exec/wc_map.sh input=tests/testdata/input_small/file01
Worker:6001 [INFO] Sorted /tmp/mapreduce-local-task00000-pe6k0epd/maptask00000-part00000
Worker:6001 [INFO] Sorted /tmp/mapreduce-local-task00000-pe6k0epd/maptask00000-part00001
Worker:6001 [INFO] Moved /tmp/mapreduce-local-task00000-pe6k0epd/maptask00000-part00000 -> /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00000
Worker:6001 [INFO] Moved /tmp/mapreduce-local-task00000-pe6k0epd/maptask00000-part00001 -> /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00001
Worker:6001 [INFO] Removed /tmp/mapreduce-local-task00000-pe6k0epd
Manager:6000 [DEBUG] received
{
"message_type": "finished",
"task_id": 0,
"worker_host": "localhost",
"worker_port": 6001
}
Worker:6002 [INFO] Created /tmp/mapreduce-local-task00001-mev01saa
Worker:6002 [INFO] Executed tests/testdata/exec/wc_map.sh input=tests/testdata/input_small/file02
Worker:6002 [INFO] Sorted /tmp/mapreduce-local-task00001-mev01saa/maptask00001-part00000
Worker:6002 [INFO] Sorted /tmp/mapreduce-local-task00001-mev01saa/maptask00001-part00001
Worker:6002 [INFO] Moved /tmp/mapreduce-local-task00001-mev01saa/maptask00001-part00000 -> /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00000
Worker:6002 [INFO] Moved /tmp/mapreduce-local-task00001-mev01saa/maptask00001-part00001 -> /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00001
Worker:6002 [INFO] Removed /tmp/mapreduce-local-task00001-mev01saa
Manager:6000 [DEBUG] received
{
"message_type": "finished",
"task_id": 1,
"worker_host": "localhost",
"worker_port": 6002
}
Manager:6000 [INFO] End Map Stage
Note how the Workers created local temporary directories of their own for each task, and then moved the output files into the Manager’s shared temporary directory when they finished writing and sorting.
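Piecing those log lines together, one plausible shape for a single map task is sketched below. Only the create/execute/partition/sort/move/remove sequence comes from the logs; the helper name, the line-at-a-time piping, and the in-memory sort are assumptions, not requirements.

import hashlib
import shutil
import subprocess
import tempfile
from pathlib import Path

def run_map_task(task_id, input_paths, executable, output_directory, num_partitions):
    """Run one map task: execute the mapper, partition, sort, then move the output."""
    prefix = f"mapreduce-local-task{task_id:05d}-"
    with tempfile.TemporaryDirectory(prefix=prefix) as tmpdir:
        # One partition file per reducer, e.g. maptask00000-part00000.
        part_paths = [Path(tmpdir) / f"maptask{task_id:05d}-part{p:05d}"
                      for p in range(num_partitions)]
        part_files = [open(path, "a", encoding="utf-8") for path in part_paths]
        try:
            for input_path in input_paths:
                with open(input_path, encoding="utf-8") as infile, subprocess.Popen(
                    [executable], stdin=infile, stdout=subprocess.PIPE, text=True,
                ) as proc:
                    for line in proc.stdout:
                        # Partition by MD5 hash of the key (the text before the tab).
                        key = line.partition("\t")[0]
                        keyhash = int(hashlib.md5(key.encode("utf-8")).hexdigest(), base=16)
                        part_files[keyhash % num_partitions].write(line)
        finally:
            for part_file in part_files:
                part_file.close()
        for path in part_paths:
            # Sort each partition file by key, then move it to the shared directory.
            with open(path, encoding="utf-8") as f:
                lines = sorted(f)
            with open(path, "w", encoding="utf-8") as f:
                f.writelines(lines)
            shutil.move(str(path), output_directory)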
At the end of the Map Stage, mapping, partitioning, and sorting are complete. In this example, the Manager’s shared temporary directory (e.g., /tmp/mapreduce-shared-job00000-XYZ123) contains 4 files. To see these files during debugging, set a breakpoint before the Manager’s shared temporary directory is deleted.
$ cat /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00000
hello 1
hello 1
$ cat /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00001
eecs485 1
world 1
$ cat /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00000
hello 1
$ cat /tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00001
autograder 1
autograder 1
goodbye 1
With 2 reducers, we have 2 partitions. For example, when map task 0 computes md5("hello") % 2 == 0, this key goes in partition 0, maptask00000-part00000. Similarly, when map task 1 computes md5("hello") % 2 == 0, this key goes in partition 0, maptask00001-part00000.
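In code, that partition choice can be computed with hashlib; this is a sketch consistent with the arithmetic above, and the function name is illustrative.

import hashlib

def keyhash_partition(key, num_partitions):
    """Return the partition number for a key using the MD5 scheme described above."""
    hexdigest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(hexdigest, base=16) % num_partitions

# Matches the walk-through: with 2 partitions, "hello" lands in partition 0.
assert keyhash_partition("hello", 2) == 0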
Reduce Stage
Finally, the Reduce Stage runs.
Manager:6000 [INFO] begin Reduce Stage
Worker:6001 [DEBUG] received
{
"message_type": "new_reduce_task",
"task_id": 0,
"input_paths": [
"/tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00000",
"/tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00000"
],
"executable": "tests/testdata/exec/wc_reduce.sh",
"output_directory": "output"
}
Worker:6002 [DEBUG] received
{
"message_type": "new_reduce_task",
"task_id": 1,
"input_paths": [
"/tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00000-part00001",
"/tmp/mapreduce-shared-job00000-k7kc6ux9/maptask00001-part00001"
],
"executable": "tests/testdata/exec/wc_reduce.sh",
"output_directory": "output"
}
Worker:6001 [INFO] Created /tmp/mapreduce-local-task00000-9tjc90x8
Worker:6001 [INFO] Executed tests/testdata/exec/wc_reduce.sh
Worker:6001 [INFO] Moved /tmp/mapreduce-local-task00000-9tjc90x8/part-00000 -> output/part-00000
Worker:6001 [INFO] Removed /tmp/mapreduce-local-task00000-9tjc90x8
Manager:6000 [DEBUG] received
{
"message_type": "finished",
"task_id": 0,
"worker_host": "localhost",
"worker_port": 6001
}
Worker:6002 [INFO] Created /tmp/mapreduce-local-task00001-hqummo7a
Worker:6002 [INFO] Executed tests/testdata/exec/wc_reduce.sh
Worker:6002 [INFO] Moved /tmp/mapreduce-local-task00001-hqummo7a/part-00001 -> output/part-00001
Worker:6002 [INFO] Removed /tmp/mapreduce-local-task00001-hqummo7a
Manager:6000 [DEBUG] received
{
"message_type": "finished",
"task_id": 1,
"worker_host": "localhost",
"worker_port": 6002
}
Manager:6000 [INFO] end Reduce Stage
Manager:6000 [INFO] Removed /tmp/mapreduce-shared-job00000-k7kc6ux9
Manager:6000 [INFO] Finished job. Output directory: output
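The reduce side mirrors the map side: each Worker merges its already-sorted partition files, streams the merged lines through the reducer executable, and moves the result into the job’s output directory. Here is a sketch under those assumptions; the heapq-based merge and the helper name are not mandated by anything above.

import contextlib
import heapq
import shutil
import subprocess
import tempfile
from pathlib import Path

def run_reduce_task(task_id, input_paths, executable, output_directory):
    """Run one reduce task: merge sorted inputs, reduce, then move the output."""
    prefix = f"mapreduce-local-task{task_id:05d}-"
    with tempfile.TemporaryDirectory(prefix=prefix) as tmpdir:
        outpath = Path(tmpdir) / f"part-{task_id:05d}"
        with contextlib.ExitStack() as stack:
            infiles = [stack.enter_context(open(p, encoding="utf-8")) for p in input_paths]
            outfile = stack.enter_context(open(outpath, "w", encoding="utf-8"))
            proc = stack.enter_context(subprocess.Popen(
                [executable], stdin=subprocess.PIPE, stdout=outfile, text=True,
            ))
            # heapq.merge keeps the combined stream sorted, so the reducer sees
            # all lines for a given key together.
            for line in heapq.merge(*infiles):
                proc.stdin.write(line)
            proc.stdin.close()
            proc.wait()
        shutil.move(str(outpath), output_directory)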
Output
After the job is finished, we can see the final output inside output/.
$ tree output/
output/
├── part-00000
└── part-00001
$ cat output/part-00000
hello 3
$ cat output/part-00001
autograder 2
eecs485 1
goodbye 1
world 1
Recall that md5("hello") % 2 == 0, so this key is in partition 0 (part-00000). Similarly, md5("autograder") % 2 == 1, so this key is in partition 1 (part-00001).
Sort the output so that we can easily compare it with the correct answer. If diff prints nothing, the output matches.
$ cat output/* | sort
autograder 2
eecs485 1
goodbye 1
hello 3
world 1
$ cat output/* | sort > actual.txt
$ diff correct.txt actual.txt
Shutdown
Finally, send a shutdown message to the Manager, which forwards it to the Workers.
$ mapreduce-submit --shutdown # Terminal 4
Manager:6000 [DEBUG] received
{
"message_type": "shutdown"
}
Manager:6000 [INFO] shutting down
Worker:6001 [DEBUG] received
{
"message_type": "shutdown"
}
Worker:6001 [INFO] shutting down
Worker:6002 [DEBUG] received
{
"message_type": "shutdown"
}
Worker:6002 [INFO] shutting down
If you started the Manager and Workers as background processes in a single terminal instead of in separate terminals, hit Enter after the shutdown output. You should see something similar to the following:
[1] Done mapreduce-manager
[2]- Done mapreduce-worker
[3]+ Done mapreduce-worker --port 6002
Verify that no MapReduce processes are running.
$ pgrep -lf mapreduce- # macOS
$ pgrep -af mapreduce- # Linux
Acknowledgments
This document is licensed under a Creative Commons Attribution-NonCommercial 4.0 License. You’re free to copy and share this document, but not to sell it. You may not share source code provided with this document.