p4-mapreduce

Walk-through example

This document walks step-by-step through an example MapReduce job. Our staff master and worker solutions produce logging output so you can see what’s going on. Replicating this logging is optional; if you’d like to, see the Logging section later in this document.

Files

Start in your project root directory. Take a look at a few of the input files.

$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ ls tests/input_small
file01  file02 
$ cat tests/input_small/file01
hello world
hello eecs485
$ cat tests/input_small/file02
goodbye autograder
hello autograder

We’re going to run a word count job. Start by simulating the MapReduce job at the command line to see the expected output, then save that output to a file.

$ cat tests/input_small/* | tests/exec/wc_map.sh | sort | tests/exec/wc_reduce.sh
autograder	2
eecs485	1
goodbye	1
hello	3
world	1
$ cat tests/input_small/* | tests/exec/wc_map.sh | sort | tests/exec/wc_reduce.sh > correct.txt
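
For reference, assuming the usual streaming word count convention, the mapper emits one "word<TAB>1" line per word and the reducer sums the counts for each word in sorted input. Here is a rough Python equivalent of the pipeline above; it is purely illustrative, and the real wc_map.sh and wc_reduce.sh are shell scripts in tests/exec/.

import itertools
import sys


def wc_map(lines):
    """Emit 'word<TAB>1' for every word on every input line."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def wc_reduce(sorted_lines):
    """Emit 'word<TAB>count' given lines sorted by word."""
    pairs = (line.split("\t") for line in sorted_lines)
    for word, group in itertools.groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


if __name__ == "__main__":
    # Equivalent of: cat input | wc_map.sh | sort | wc_reduce.sh
    for output_line in wc_reduce(sorted(wc_map(sys.stdin))):
        print(output_line)

Saved to a file and fed the same input, this prints the same five lines as the shell pipeline above.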

These are the files we have before starting the master.

$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ ls
bin  mapreduce  mapreduce.egg-info  setup.py  tests

Start master

Verify that no mapreduce processes are running. Kill them if needed.

$ pgrep -af mapreduce-  # Linux
$ pgrep -lf mapreduce-  # macOS
77760 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-master 6000
77811 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6001
77893 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
$ pkill -f mapreduce-
kill -15 77760
kill -15 77811
kill -15 77893
$ pgrep -lf mapreduce-

Start the master in the background.

  1. Run the mapreduce-master command.
  2. Press Control-Z to suspend the process.
  3. Enter the bg command to resume the suspended command in the background.

Pro-tip: Start a background command in one step with the ampersand (&), for example mapreduce-master 6000 &.

The job number assigned by your shell ([2] in this sample output) may be different.

$ mapreduce-master 6000
INFO:root:Starting master:6000
INFO:root:Master:6000 PWD /Users/awdeorio/src/eecs485/p4-mapreduce
^Z
[2]+  Stopped                 mapreduce-master 6000
$ bg
[2]+ mapreduce-master 6000 &

Notice that the master created a tmp/ directory, which is currently empty.

$ ls
bin  mapreduce  mapreduce.egg-info  setup.py  tests  tmp
$ tree tmp/
tmp/
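
Creating this directory is a small step; here is a minimal sketch of how a master might do it at startup (pathlib is just one option, and the exact approach is up to you).

from pathlib import Path

# Create tmp/ relative to the master's working directory, leaving any
# existing contents alone.
Path("tmp").mkdir(parents=True, exist_ok=True)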

Start workers

Start two workers in the background. Notice that the background processes send output to the terminal even though they are running in the background. You can press Enter at any time to get your terminal prompt back.

Pro-tip: To produce logging output similar to the instructor solution, see the Logging section of the spec.

Note: Your PIDs may be different, e.g. [3] 26970 and "worker_pid": 77811.

$ mapreduce-worker 6000 6001 &
[3] 26970
INFO:root:Starting worker:6001
INFO:root:Worker:6001 PWD /Users/awdeorio/src/eecs485/p4-mapreduce
DEBUG:root:Master:6000 received
{
  "message_type": "register",
  "worker_host": "localhost",
  "worker_port": 6001,
  "worker_pid": 77811
}
INFO:root:Master:6000 registered worker localhost:6001 PID 77811
DEBUG:root:Worker:6001 received
{
  "message_type": "register_ack",
  "worker_host": "localhost",
  "worker_port": 6001,
  "worker_pid": 77811
}
$ mapreduce-worker 6000 6002 &
[4] 27024
INFO:root:Starting worker:6002
INFO:root:Worker:6002 PWD /Users/awdeorio/src/eecs485/p4-mapreduce
DEBUG:root:Master:6000 received
{
  "message_type": "register",
  "worker_host": "localhost",
  "worker_port": 6002,
  "worker_pid": 77893
}
INFO:root:Master:6000 registered worker localhost:6002 PID 77893
DEBUG:root:Worker:6002 received
{
  "message_type": "register_ack",
  "worker_host": "localhost",
  "worker_port": 6002,
  "worker_pid": 77893
}
$
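
The register/register_ack exchange above is just JSON sent over a TCP socket. Here is a minimal sketch of how a worker might send its registration message; the literal host and port values are only examples, and in your worker they come from the command-line arguments.

import json
import os
import socket

# Example values; a real worker reads these from its command-line arguments.
master_host, master_port = "localhost", 6000
worker_port = 6001

message = {
    "message_type": "register",
    "worker_host": "localhost",
    "worker_port": worker_port,
    "worker_pid": os.getpid(),
}

# Open a TCP connection to the master, send the JSON message, and close.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((master_host, master_port))
    sock.sendall(json.dumps(message).encode("utf-8"))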

Submit a MapReduce job

Submit a word count job.

$ mapreduce-submit \
    --input tests/input_small \
    --output output \
    --mapper tests/exec/wc_map.sh \
    --reducer tests/exec/wc_reduce.sh \
    --nmappers 2 \
    --nreducers 2
Submitted job to master localhost:6000
input directory      tests/input_small
output directory     output
mapper executable    tests/exec/wc_map.sh
reducer executable   tests/exec/wc_reduce.sh
num mappers          2
num reducers         2

The master receives the job.

DEBUG:root:Master:6000 received
{
  "message_type": "new_master_job",
  "input_directory": "tests/input_small",
  "output_directory": "output",
  "mapper_executable": "tests/exec/wc_map.sh",
  "reducer_executable": "tests/exec/wc_reduce.sh",
  "num_mappers": 2,
  "num_reducers": 2
}
INFO:root:Master:6000 new job number 0
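
When it accepts a job, the master assigns the next job number and creates the job’s intermediate directories inside tmp/ (you can see them in the tree output later in this walkthrough). A sketch of that bookkeeping, with illustrative variable names:

from pathlib import Path

# Illustrative: job numbers count up from 0, one per submitted job.
job_id = 0

# Create the directories the map, group, and reduce phases write into.
job_root = Path("tmp") / f"job-{job_id}"
for subdir in ("mapper-output", "grouper-output", "reducer-output"):
    (job_root / subdir).mkdir(parents=True, exist_ok=True)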

Map phase

The map phase runs.

INFO:root:Master:6000 starting map phase
DEBUG:root:Worker:6001 received
{
  "message_type": "new_worker_job",
  "input_files": [
    "tests/input_small/file02"
  ],
  "executable": "tests/exec/wc_map.sh",
  "output_directory": "tmp/job-0/mapper-output/",
  "worker_pid": 77811
}
DEBUG:root:Worker:6002 received
{
  "message_type": "new_worker_job",
  "input_files": [
    "tests/input_small/file01"
  ],
  "executable": "tests/exec/wc_map.sh",
  "output_directory": "tmp/job-0/mapper-output/",
  "worker_pid": 77893
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_files": [
    "tmp/job-0/mapper-output/file02"
  ],
  "status": "finished",
  "worker_pid": 77811
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_files": [
    "tmp/job-0/mapper-output/file01"
  ],
  "status": "finished",
  "worker_pid": 77893
}
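
For each assigned input file, the worker runs the executable with that file on stdin and writes stdout to the given output directory; in this walkthrough the output file keeps the input file’s name, which is why tmp/job-0/mapper-output/ ends up containing file01 and file02. A rough sketch using subprocess, with literal values taken from the new_worker_job message above:

import pathlib
import subprocess

# Values from the new_worker_job message above.
executable = "tests/exec/wc_map.sh"
input_files = ["tests/input_small/file01"]
output_directory = pathlib.Path("tmp/job-0/mapper-output/")

output_files = []
for input_path in map(pathlib.Path, input_files):
    output_path = output_directory / input_path.name
    with open(input_path) as infile, open(output_path, "w") as outfile:
        # Run the executable with the input file on stdin, capturing
        # stdout into the output file.
        subprocess.run([executable], stdin=infile, stdout=outfile, check=True)
    output_files.append(str(output_path))
# output_files is what the worker reports back in its "status" message.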

Group phase

The group phase runs.

INFO:root:Master:6000 starting group phase
DEBUG:root:Worker:6001 received
{
  "message_type": "new_sort_job",
  "input_files": [
    "tmp/job-0/mapper-output/file02"
  ],
  "output_file": "tmp/job-0/grouper-output/sorted01",
  "worker_pid": 77811
}
DEBUG:root:Worker:6002 received
{
  "message_type": "new_sort_job",
  "input_files": [
    "tmp/job-0/mapper-output/file01"
  ],
  "output_file": "tmp/job-0/grouper-output/sorted02",
  "worker_pid": 77893
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_file": "tmp/job-0/grouper-output/sorted01",
  "status": "finished",
  "worker_pid": 77811
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_file": "tmp/job-0/grouper-output/sorted02",
  "status": "finished",
  "worker_pid": 77893
}
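
In the group phase, each worker sorts its assigned mapper output line by line and writes the result to the output_file it was given. Once all sort jobs finish, the master merges the sorted files and splits the lines into one file per reducer, keeping every occurrence of a key together; those are the reduce01 and reduce02 files the next phase reads. The sketch below shows both steps; the merge-and-partition strategy (round-robin by key) is one possible approach, not the only one.

import heapq
import itertools


def run_sort_job(input_files, output_file):
    """Worker side: sort the lines of the assigned mapper output."""
    lines = []
    for path in input_files:
        with open(path) as infile:
            lines.extend(infile)
    lines.sort()
    with open(output_file, "w") as outfile:
        outfile.writelines(lines)


def partition(sorted_files, reduce_files):
    """Master side: merge sorted files and partition lines by key."""
    infiles = [open(path) for path in sorted_files]
    outfiles = [open(path, "w") for path in reduce_files]
    try:
        merged = heapq.merge(*infiles)  # already-sorted inputs stay sorted
        keyed = itertools.groupby(merged, key=lambda line: line.split("\t")[0])
        for index, (_, group) in enumerate(keyed):
            # All lines for one key land in the same reduceXX file.
            outfiles[index % len(outfiles)].writelines(group)
    finally:
        for handle in infiles + outfiles:
            handle.close()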

Reduce phase

Finally, the reduce phase runs.

INFO:root:Master:6000 starting reduce phase
DEBUG:root:Worker:6001 received
{
  "message_type": "new_worker_job",
  "input_files": [
    "tmp/job-0/grouper-output/reduce01"
  ],
  "executable": "tests/exec/wc_reduce.sh",
  "output_directory": "tmp/job-0/reducer-output/",
  "worker_pid": 77811
}
DEBUG:root:Worker:6002 received
{
  "message_type": "new_worker_job",
  "input_files": [
    "tmp/job-0/grouper-output/reduce02"
  ],
  "executable": "tests/exec/wc_reduce.sh",
  "output_directory": "tmp/job-0/reducer-output/",
  "worker_pid": 77893
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_files": [
    "tmp/job-0/reducer-output/reduce01"
  ],
  "status": "finished",
  "worker_pid": 77811
}
DEBUG:root:Master:6000 received
{
  "message_type": "status",
  "output_files": [
    "tmp/job-0/reducer-output/reduce02"
  ],
  "status": "finished",
  "worker_pid": 77893
}
INFO:root:Master:6000 finished job.  Output directory: output
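
Behind that last log line, the master copies or moves the reducer output files into the requested output directory; in this walkthrough reduce01 and reduce02 become output/outputfile01 and output/outputfile02. A minimal sketch of that final step:

import pathlib
import shutil

reducer_output = pathlib.Path("tmp/job-0/reducer-output")
output_directory = pathlib.Path("output")
output_directory.mkdir(parents=True, exist_ok=True)

# Move reduce01 -> outputfile01, reduce02 -> outputfile02, and so on.
for index, path in enumerate(sorted(reducer_output.glob("reduce*")), start=1):
    shutil.move(str(path), str(output_directory / f"outputfile{index:02d}"))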

Output

After the job finishes, we can see the temporary files inside job-0. Note: the names of the intermediate files produced in the group stage (the merged files each Worker creates) are up to you. If you choose to use the worker PIDs in these names, they may be different from ours.

$ tree tmp/
tmp/
└── job-0
    ├── grouper-output
    │   ├── sorted01
    │   ├── sorted02
    │   ├── reduce01
    │   └── reduce02
    ├── mapper-output
    │   ├── file01
    │   └── file02
    └── reducer-output
$ tree output/
output/
├── outputfile01
└── outputfile02

See the output. Sort it so that we can easily compare with the correct answer.

$ cat output/* | sort
autograder	2
eecs485	1
goodbye	1
hello	3
world	1
$ cat output/* | sort > actual.txt
$ diff correct.txt actual.txt

Shutdown

Finally, send a shutdown message to the master.

$ echo '{"message_type": "shutdown"}' | nc -c localhost 6000  # macOS/WSL/Linux (older)
$ echo '{"message_type": "shutdown"}' | nc -N localhost 6000  # WSL/Linux (newer)
$ echo '{"message_type": "shutdown"}' | nc -C localhost 6000  # Linux (less common)
DEBUG:root:Master:6000 received
{
  "message_type": "shutdown"
}
INFO:root:Master:6000 shutting down
DEBUG:root:Worker:6001 received
{
  "message_type": "shutdown"
}
INFO:root:Worker:6001 shutting down
DEBUG:root:Worker:6002 received
{
  "message_type": "shutdown"
}
INFO:root:Worker:6002 shutting down

After you run the shutdown command and get the above output, hit Enter. You should see something similar to the following:

[2] Done mapreduce-master 6000
[3]- Done mapreduce-worker 6000 6001
[4]+ Done mapreduce-worker 6000 6002
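
As the log above shows, the master forwards the shutdown message to every registered worker before it exits. A minimal sketch of that forwarding, assuming the master keeps a dict of registered workers (the structure and names here are illustrative):

import json
import socket

# Illustrative registry built up from the earlier register messages.
workers = {
    77811: {"worker_host": "localhost", "worker_port": 6001},
    77893: {"worker_host": "localhost", "worker_port": 6002},
}

shutdown_message = json.dumps({"message_type": "shutdown"}).encode("utf-8")
for worker in workers.values():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((worker["worker_host"], worker["worker_port"]))
        sock.sendall(shutdown_message)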

Verify that no mapreduce processes are running.

$ pgrep -lf mapreduce-  # macOS
$ pgrep -af mapreduce-  # Linux

Logging

The Python logging facility is helpful for monitoring multiple processes. For logging output similar to the instructor solution, copy this code.

import json
import logging
import os

logging.basicConfig(level=logging.DEBUG)  # Don't hide debug messages
...
logging.info("Starting worker:%s", worker_port)
logging.info("Worker:%s PWD %s", worker_port, os.getcwd())
...
logging.debug(
    "Worker:%s received\n%s",
    worker_port,
    json.dumps(message_dict, indent=2),
)