EECS 485 Project 4: Map Reduce

Due: 8pm EST November 16, 2020. This is a group project to be completed in groups of two to three.


Initial Release for F20: Version 4.1


In this project, you will implement a MapReduce server in Python. This will be a single machine, multi-process, multi-threaded server that will execute user-submitted MapReduce jobs. It will run each job to completion, handling failures along the way, and write the output of the job to a given directory. Once you have completed this project, you will be able to run any MapReduce job on your machine, using a MapReduce implementation you wrote!

There are two primary modules in this project: the Master, which will listen for MapReduce jobs, manage the jobs, distribute work amongst the workers, and handle faults. Worker modules register themselves with the master, and then await commands, performing map, reduce or sorting (grouping) tasks based on instructions given by the master.

You will not write map reduce programs, but rather the MapReduce server. We have provided several sample map/reduce programs that you can use to test your MapReduce server.

Refer to the Python processes, threads and sockets tutorial for background and examples.

Table of Contents


Group registration

Register your group on the Autograder.

Project folder

Create a folder for this project (instructions). Your folder location might be different.

$ pwd

Version control

Set up version control using the Version control tutorial. You might also take a second look at the Version control for a team tutorial.

After you’re done, you should have a local repository with a “clean” status and your local repository should be connected to a remote GitLab repository.

$ pwd
$ git status
On branch master
Your branch is up-to-date with 'origin/master'.

nothing to commit, working tree clean
$ git remote -v
origin	https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (fetch)
origin	https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (push)

You should have a .gitignore file (instructions).

$ pwd
$ head .gitignore
This is a sample .gitignore file that's useful for EECS 485 projects.

Python virtual environment

Create a Python virtual environment using the Project 1 Python Virtual Environment Tutorial.

Check that you have a Python virtual environment, and that it’s activated (remember source env/bin/activate).

$ pwd
$ ls -d env

Starter files

Download and unpack the starter files.

$ pwd
$ wget https://eecs485staff.github.io/p4-mapreduce/starter_files.tar.gz
$ tar -xvzf starter_files.tar.gz

Move the starter files to your project directory and remove the original starter_files/ directory and tarball.

$ pwd
$ mv starter_files/* .
$ rm -rf starter_files starter_files.tar.gz

You should see these files.

$ tree
├── mapreduce
│   ├── __init__.py
│   ├── master
│   │   ├── __init__.py
│   │   └── __main__.py
│   ├── submit.py
│   ├── utils.py
│   └── worker
│       ├── __init__.py
│       └── __main__.py
├── requirements.txt
├── setup.py
├── tests
│   ├── testdata
|   │   ├── correct
|   │   │   ├── grep_correct.txt
|   │   │   └── word_count_correct.txt
|   │   ├── exec
|   │   │   ├── grep_map.py
|   │   │   ├── grep_reduce.py
|   │   │   ├── wc_map.sh
|   │   │   ├── wc_map_slow.sh
|   │   │   ├── wc_reduce.sh
|   │   │   └── wc_reduce_slow.sh
|   │   ├── input
|   │   │   ├── file01
|   │   │   └── file08
|   │   ├── input_small
|   │   │   ├── file01
|   │   │   └── file02
|   |   ├── input_large
|   │   │   ├── file01
|   │   │   ├── file02
|   │   │   ├── file03
|   │   │   └── file04
│   ├── test_worker_08.py
│   └── utils.py

Activate the virtual environment and install packages.

$ source env/bin/activate
$ pip install -r requirements.txt
$ pip install -e .

Here’s a brief description of each of the starter files.

mapreduce mapreduce Python package skeleton files
mapreduce/master/ mapreduce master skeleton module, implement this
mapreduce/worker/ mapreduce worker skeleton module, implement this
mapreduce/submit.py Provided code to submit a new MapReduce job
mapreduce/utils.py Code shared between master and worker
requirements.txt Python package dependencies matching autograder
setup.py mapreduce Python package configuration
tests/ Public unit tests
tests/testdata/exec/ Sample mapreduce programs, all use stdin and stdout
tests/testdata/correct/ Sample mapreduce program correct output
tests/testdata/input/ Sample mapreduce program input
tests/testdata/input_small/ Sample mapreduce program input for fast testing
tests/testdata/input_large/ Sample mapreduce program input for testing on large input
testdata/ Files used by our public tests

Before making any changes to the clean starter files, it’s a good idea to make a commit to your Git repository.


Complete the Python processes, threads and sockets tutorial.

Here are some quick links to the libraries we used in our instructor implementation.

Run the MapReduce server

You will write a mapreduce Python package includes master and worker modules. Launch a master with the command line entry point mapreduce-master and a worker with mapreduce-worker. We’ve also provided mapreduce-submit to send a new job to the master.

Start a master and workers

The starter code will run out of the box, it just won’t do anything. The master and the worker run as seperate processes, so you will have to start them up separately. This will start up a master which will listen on port 6000 using TCP. Then, we start up two workers, and tell them that they should communicate with the master on port 6000, and then tell them which port to listen on. The ampersand (&) means to start the process in the background.

$ mapreduce-master 6000 &
$ mapreduce-worker 6000 6001 &
$ mapreduce-worker 6000 6002 &

See your processes running in the background. Note: use pgrep -lf on OSX and pgrep -af on GNU/Linux systems.

$ pgrep -lf mapreduce-worker
15364 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6001
15365 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
$ pgrep -lf mapreduce-master
15353 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-master 6000

Stop your processes.

$ pkill -f mapreduce-master
$ pkill -f mapreduce-worker
$ pgrep -lf mapreduce-worker  # no output, because no processes
$ pgrep -lf mapreduce-master  # no output, because no processes

Submit a MapReduce job

Lastly, we have also provided mapreduce/submit.py. It sends a job to the Master’s main TCP socket. You can specify the job using command line arguments.

$ mapreduce-submit --help
Usage: mapreduce-submit [OPTIONS]

  Top level command line interface.

  -p, --port INTEGER      Master port number, default = 6000
  -i, --input DIRECTORY   Input directory, default=tests/testdata/input
  -o, --output DIRECTORY  Output directory, default=output
  -m, --mapper FILE       Mapper executable, default=tests/testdata/exec/wc_map.sh
  -r, --reducer FILE      Reducer executable,
  --nmappers INTEGER      Number of mappers, default=4
  --nreducers INTEGER     Number of reducers, default=1
  --help                  Show this message and exit.

Here’s how to run a job. Later, we’ll simplify starting the server using a shell script. Right now we expect the job to fail because Master and Worker are not implemented.

$ pgrep -f mapreduce-master  # check if you already started it
$ pgrep -f mapreduce-worker  # check if you already started it
$ mapreduce-master 6000 &
$ mapreduce-worker 6000 6001 &
$ mapreduce-worker 6000 6002 &
$ mapreduce-submit --mapper tests/testdata/exec/wc_map.sh --reducer tests/testdata/exec/wc_reduce.sh

Init script

The MapReduce server is an example of a service (or daemon), a program that runs in the background. We’ll write an init script to start, stop and check on the map reduce master and worker processes. It should be a shell script named bin/mapreduce. Print the messages in the following examples.

Be sure to follow the shell script best practices (Tutorial).

Start server

Exit 1 if a master or worker is already running. Otherwise, execute the following commands.

mapreduce-master 6000 &
sleep 2
mapreduce-worker 6000 6001 &
mapreduce-worker 6000 6002 &


$ ./bin/mapreduce start
starting mapreduce ...

Example: accidentally start server when it’s already running.

$ ./bin/mapreduce start
Error: mapreduce-master is already running

Stop server

Execute the following commands. Notice that || true will prevent a failed “nice” shutdown message from causing the script to exit early. Also notice that we automatically figure out the correct option for Netcat (nc).

# Detect GNU vs BSD netcat.  We need netcat to close the connection after
# sending a message, which requires different options.
set +o pipefail  # Avoid erroneous failures due to grep returning non-zero
if nc -h 2>&1 | grep -q "\-c"; then
  NC="nc -c"
elif nc -h 2>&1 | grep -q "\-N"; then
  NC="nc -N"
elif nc -h 2>&1 | grep -q "\-C"; then
  NC="nc -C"
  echo "Error detecting netcat version."
  exit 1
set -o pipefail

echo '{"message_type": "shutdown"}' | $NC localhost 6000 || true
sleep 2  # give the master time to receive signal and send to workers

Then, kill the master and worker processes. The following example is for the master.

echo "killing mapreduce master ..."
pkill -f mapreduce-master || true

Example 1, server responds to shutdown message.

$ ./bin/mapreduce stop
stopping mapreduce ...

Example 2, server doesn’t respond to shutdown message and process is killed.

./bin/mapreduce stop
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...

Server status


$ ./bin/mapreduce start
starting mapreduce ...
$ ./bin/mapreduce status
master running
workers running
$ ./bin/mapreduce stop
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...
$ ./bin/mapreduce status
master not running
workers not running

Restart server


$ ./bin/mapreduce restart
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...
starting mapreduce ...

MapReduce server specification

Here, we describe the functionality of the MapReduce server. The fun part is that we are only defining the functionality and the communication spec, the implementation is entirely up to you. You must follow our exact specifications below, and the Master and the Worker should work independently (i.e. do not add any more data or dependencies between the two classes). Remember that the Master/Workers are listening on TCP/UDP sockets for all incoming messages. Note: To test your server, we will will only be checking for the messages we listed below. You should not rely on any communication other than the messages listed below.

As soon as the Master/Worker receives a message on its main TCP socket, it should handle that message to completion before continuing to listen on the TCP socket. In this spec, let’s say every message is handled in a function called handle_msg. When the message returns and ends execution, the Master will continue listening in an infinite while loop for new messages. Each TCP message should be communicated using a new TCP connection. Note: All communication in this project will be strings formatted using JSON; sockets receive strings but your thread must parse it into JSON.

We put [Master/Worker] before the subsections below to identify which class should handle the given functionality.

Code organization

Your code will go inside the mapreduce/master and mapreduce/worker packages, where you will define the two classes (we got you started in mapreduce/master/__main__.py and mapreduce/worker/__main__.py). Since we are using Python packages, you may create new files as you see fit inside each package. We have also provided a utils.py inside mapreduce/ which you can use to house code common to both Worker and Master. We will only define the communication specs for the Master and the Worker, but the actual implementation of the classes is entirely up to you.

Master overview

The Master should accept only one command line argument.

port_number : The primary TCP port that the Master should listen on.

On startup, the Master should do the following:

Worker overview

The Worker should accept two command line arguments.

master_port: The TCP socket that the Master is actively listening on (same as the port_number in the Master constructor)

worker_port: The TCP socket that this worker should listen on to receive instructions from the master

On initialization, each Worker should do a similar sequence of actions as the Master:

NOTE: The Master should safely ignore any heartbeat messages from a Worker before that Worker successfully registers with the Master.

Shutdown [master + worker]

Because all of of our tests require shutdown to function properly, it should be implemented first. The Master can receive a special message to initiate server shutdown. The shutdown message will be of the following form and will be received on the main TCP socket:

  "message_type": "shutdown"

The Master should forward this message to all of the living Workers that have registered with it. The Workers, upon receiving the shutdown message, should terminate as soon as possible. If the Worker is already in the middle of executing a task (as described below), it is okay for it to complete that task before being able to handle the shutdown message as both these happen inside a single thread.

After forwarding the message to all Workers, the Master should terminate itself.

At this point, you should be able to pass test_master_00’s first part, and test_worker_00 completely. Another shutdown test is test_integration_00, but you’ll need to implement worker registration first.

Worker registration [master + worker]

The Master should keep track of all Workers at any given time so that the work is only distributed among the ready Workers. Workers can be in the following states:

The Master must listen for registration messages from Workers. Once a Worker is ready to listen for instructions, it should send a message like this to the Master

  "message_type" : "register",
  "worker_host" : string,
  "worker_port" : int,
  "worker_pid" : int

The Master will then respond with a message acknowledging the Worker has registered, formatted like this. After this message has been received, the Worker should start sending heartbeats. More on this later.

  "message_type": "register_ack",
  "worker_host": string,
  "worker_port": int,
  "worker_pid" : int

After the first Worker registers with the Master, the Master should check the job queue (described later) if it has any work it can assign to the Worker (because a job could have arrived at the Master before any Workers registered). If the Master is already executing a map/group/reduce, it should assign the Worker the next available task immediately.

At this point, you should be able to pass test_master_00 and test_worker_01 and test_integration_00.

New job request [master]

In the event of a new job, the Master will receive the following message on its main TCP socket:

  "message_type": "new_master_job",
  "input_directory": string,
  "output_directory": string,
  "mapper_executable": string,
  "reducer_executable": string,
  "num_mappers" : int,
  "num_reducers" : int

In response to a job request, the Master will create a set of new directories where all of the temporary files for the job will go, of the form tmp/job-{id}, where id is the current job counter (starting at 0 just like all counters). The directory structure will resemble this example (you should create 4 new folders for each job):


Remember, each MapReduce job occurs in 3 stages: Mapping, Grouping, Reducing. Workers will do the mapping and reducing using the given executable files independently, but the Master and Workers will have to cooperate to do the Grouping Stage. After the directories are setup, the Master should check if there are any Workers ready to work and check whether the MapReduce server is currently executing a job. If the server is busy, or there are no available Workers, the job should be added to an internal queue (described next) and end the function execution. If there are workers and the server is not busy, than the Master can begin job execution.

At this point, you should be able to pass test_master_01.

Job queue [master]

If a Master receives a new job while it is already executing one or when there were no ready workers, it should accept the job, create the directories, and store the job in an internal queue until the current one has finished. Note that this means that the current job’s map, group, and reduce tasks must be complete before the next job’s Mapping Stage can begin. As soon as a job finishes, the Master should process the next pending job if there is one (and if there are ready Workers) by starting its Mapping Stage. For simplicity, in this project, your MapReduce server will only execute one MapReduce task at any time.

As noted earlier, when you see the first Worker register to work, you should check the job queue for pending jobs.

Input partitioning [master]

To start off the Mapping Stage, the Master scans the input directory and divides the input files into several partitions. Each partition (a group of files) will be a new_worker_task.

First, the master sorts the input files by name and divides them into num_mappers partitions using round robin. Here’s an example with num_mappers=4.

# Before partitioning
["file03", "file02", "file01", "file05", "file04"]

# After partitioning

After partitioning the input, the Master allocates tasks to Workers by sending JSON messages. Each JSON message looks like this:

  "message_type": "new_worker_task",
  "input_files": [list of strings],
  "executable": string,
  "output_directory": string
  "worker_pid": int

Mapping [master]

The Master must log a message at the beginning and end of each stage. To configure the logging module, see the our example. The autograder test cases use these messages to know when your solution is ready to be checked. After adding the “begin map stage” message, you should be able to pass to pass test_master_02.

logging.info("Master:%s begin map stage", port_num)
logging.info("Master:%s end map stage", port_num)

The Master assigns one task at a time to a Worker. If all the Workers are busy, the Master will need to wait. Because we don’t know ahead of time which worker will become available first, which worker performs which task could change with different executions of the same program (the program is non-deterministic). The autograder test cases accept all valid scenarios.

Here’s an example. Consider a MapReduce job with 2 workers, 5 input files ["file01", "file02", "file03", "file04", "file05"] and 4 map tasks specified.

Continuing our previous example, we’ll assume that that there are two workers and neither is busy. Worker 0 registered first. Steps 3-6 would be slightly different if Worker 0 finished first.

  1. Master sends Worker 0 a task with input files ["file01", "file05"]
  2. Master sends Worker 1 a task with input files ["file02"]
  3. Master waits for a worker to finish. In this example, it’s Worker 1.
  4. Master sends Worker 1 a task with input files ["file03"]
  5. Master waits for a worker to finish. In this example, it’s Worker 0.
  6. Master sends Worker 0 a task with input files ["file04"]

At this point you should be able to pass test_master_02.

Mapping [workers]

When a worker receives this new task message, its handle_msg will start execution of the given executable over the specified input file, while directing the output to the given output_directory (one output file per input file and you should run the executable on each input file). The input is passed to the executable through standard in and is outputted to a specific file. The output file names should be the same as the input file (overwrite file if it already exists). The output_directory in the Mapping Stage will always be the mapper-output folder (i.e. tmp/job-{id}/mapper-output).

For example, the Master should specify the input file is data/input/file01.txt and the output file tmp/job-0/mapper-output/file01.txt

Hint: See the Python standard library subprocess.run() function.

The Worker should be agnostic to map or reduce tasks. Regardless of the type of operation, the Worker is responsible for running the specified executable over the input files one by one, and piping to the output directory for each input file. Once a Worker has finished its task, it should send a TCP message to the Master’s main socket of the form:

  "message_type": "status",
  "output_files" : [list of strings],
  "status": "finished",
  "worker_pid": int

At this point, you should be able to pass test_worker_03, test_worker_04, test_worker_05.

Grouping [master + workers]

After all mappers finish, the Master will start the Grouping Stage. The Grouping Stage has two steps:

  1. Workers sort each mapper output file
  2. Master rearranges sorted mapper output files, grouping like keys together

The Master must log a message at the beginning and end of each stage.

logging.info("Master:%s begin group stage", port_num)
logging.info("Master:%s end group stage", port_num)

To start the Grouping Stage, the Master looks at all of the files created by the mappers, and assigns Workers to sort and merge the files. Sorting in the Grouping Stage should happen by line not by key. If there are more files than Workers, the Master should apportion files to workers using round-robin. Files should be sorted and workers should be sorted in order of registration. If there are fewer files than Workers, it is okay if some Workers sit idle during this stage.

After partitioning is complete, the Master sends a new_sort_task to each worker in order of registration. For simplicity, you may assume there will be at least one live worker. The input for one sort task is a list of files. The output is one larger file using the naming convention: sorted01, sorted02 … etc. The messages should be sent to the Workers in registration order and look like this:

  "message_type": "new_sort_task",
  "input_files": [list of strings],
  "output_file": string,
  "worker_pid": int

When a worker receives a new_sort_task, it should sort by line, not by key. Once the Worker has finished, it sends a message to the Master like this:

  "message_type": "status",
  "output_file" : string,
  "status": "finished"
  "worker_pid": int

After sorting is complete, the Master reads the sorted files and apportions each line in num_reducers files. Specifically, the Master allocates lines in sorted order by key using round robin to divy the keyspace into num_reducers files. Hint: Understanding the Merging K sorted lists problem will help with sorted iteration through the keyspace. It will be helpful to use the heapq.merge() function. Do not assume the entire data set can fit into memory at once. Do not create a single large sorted file.

Use the following naming convention for the output of the Grouping Stage Master, which will be the inputs for the Reducing Stage. File should be placed in the grouper output directory and named reduce01, reduce02, and so on.

At this point, you should pass test_master_08, which verifies efficient round robin grouping. You should also be able to pass test_worker_06, test_worker_07 and test_worker_08.

Reducing [master + workers]

The Master must log a message at the beginning and end of each stage.

logging.info("Master:%s begin reduce stage", port_num)
logging.info("Master:%s end reduce stage", port_num)

To the Worker, this is the same as the Mapping Stage - it doesn’t need to know if it is running a map or reduce task. The Worker just runs the executable it is told to run - the Master is responsible for making sure it tells the Worker to run the correct map or reduce executable. The output_directory in the Reducing Stage will always be the reducer-output folder. Again, use the same output file name as the input file. Use round-robin file allocation to apportion sorted reducer files to workers ordered by registration time. Be sure to send reducing messages to workers in the order in which the workers registered.

Once a Worker has finished its task, it should send a TCP message to the Master’s main socket of the form:

  "message_type": "status",
  "output_files" : [list of strings],
  "status": "finished"
  "worker_pid": int

Wrapping up [master]

As soon as the master has received the last “finished” message for the reduce tasks for a given job, the Master should move the output files from the reducer-output directory to the final output directory specified by the original job creation message (The value specified by the output_directory key). In the final output directory, the files should be renamed outputfilex, where x is the final output file number. If there are 4 final output files, the master should rename them outputfile01, outputfile02, outputfile03, outputfile04. Create the output directory if it doesn’t already exist. Check the job queue for the next available job, or go back to listening for jobs if there isn’t one currently in the job queue.

At this point test_master_03, test_master_04, and test_master_09 should be able to pass.

Fault tolerance and heartbeats [master + workers]

Workers can die at any time and may not finish tasks that you send them. Your Master must accommodate for this. If a Worker misses more than 5 pings in a row, you should assume that it has died, and assign whatever work it was responsible for to another Worker machine.

Each Worker will have a heartbeat thread to send updates to Master via UDP. The messages should look like this, and should be sent every 2 seconds:

  "message_type": "heartbeat",
  "worker_pid": int

If a Worker dies after the Master assigns a task but before completing the task and notifying the Master, then the Master should reassigned the task to a free worker when one becomes available. For simplicity, the Master should first complete all tasks in the current stage before reassigning tasks from dead Workers.

When a Worker dies, mark the failed Worker as dead, but do not remove it from the Master’s internal data structures. This is due to constraints on the Python dictionary data structure. It can result in an error when keys are modified while iterating over the dictionary. For more info on this, please refer to this link.

Your Master should attempt to maximize concurrency, but avoid duplication. In other words, don’t send the same task to different Workers until you know that the Worker who was previously assigned that task has died. Remember that a worker can die during any stage. If all workers die, the Master should wait until a new worker registers, then resume.

At this point all the tests should pass.

Walk-through example

See a complete example here.


We have provided a decently comprehensive public test suite for project 4. However, you should attempt to test some features of your mapreduce solution on your own. The walkthrough example above, should help you get started on testing your solution’s functionality.

We have provided a simple word count map and reduce example. Run these executables at the command line without your MapReduce code to generate the correct answers in correct.txt. Then, concatenate your MapReduce server’s output into one file, output.txt. The default output directory provided by submit.py is output, but it is configurable.

$ cat tests/testdata/input/* | tests/testdata/exec/wc_map.sh | sort | \
    tests/testdata/exec/wc_reduce.sh > correct.txt
$ cat output/* | sort > output.txt
$ diff output.txt correct.txt

Note that the map and reduce executables can be in any language - your server should not limit us to running map and reduce jobs written in Python! To help you test this, we have also provided you with a word count solution written in a shell script (see section below).

Note that the autograder will watch your worker and master only for the messages we specified above. Your code should have no other dependency besides the communication spec, and the messages sent in your system must match those listed in this spec exactly.

Run the public unit tests. Add the -vvs flag to pytest to show output and the--log-cli-level=INFO flag to show logging messages.

$ pwd
$ pytest -vvs --log-cli-level=INFO

Test for busy waiting

A solution that busy-waits may pass on your development machine and fail on the autograder due to a timeout. Your laptop is probably much more powerful than the restricted autograder environment, so you might not notice the performance problem locally. See the Processes, Threads and Sockets in Python Tutorial for an explanation of busy-waiting.

To detect busy waiting, time a master without any workers. After a few seconds, kill it by pressing Control-C several times. Ignore any errors or exceptions. We can tell that this solution busy-waits because the user time is similar to the real time.

$ time mapreduce-master 6000
INFO:root:Starting master:6000

real	0m4.475s
user	0m4.429s
sys	0m0.039s

This example does not busy wait. Notice that the user time is small compared to the real time.

$ time mapreduce-master 6000
INFO:root:Starting master:6000

real	0m3.530s
user	0m0.275s
sys	0m0.036s

Testing fault tolerance

This section will help you verify fault tolerance. The general idea is to kill a worker while it’s running an intentionally slow MapReduce job.

We have provided an intentionally slow MapReduce job in tests/testdata/exec/wc_map_slow.sh and tests/testdata/exec/wc_reduce_slow.sh. These executables use sleep statements to simulate a slow running task. You may want to increase the sleep time.

$ grep -B1 sleep tests/testdata/exec/wc_*slow.sh
tests/testdata/exec/wc_map_slow.sh-# Simulate a long running job
tests/testdata/exec/wc_map_slow.sh:sleep 3
tests/testdata/exec/wc_reduce_slow.sh-# Simulate a long running job
tests/testdata/exec/wc_reduce_slow.sh:sleep 3

First, start one MapReduce Master and two Workers. Wait for them to start up.

$ pkill -f mapreduce-           # Kill any stale Master or Worker
$ mapreduce-master 6000 &       # Start Master
$ mapreduce-worker 6000 6001 &  # Start Worker 0
$ mapreduce-worker 6000 6002 &  # Start Worker 1

Submit an intentionally slow MapReduce job and wait for the mappers to begin executing.

$ mapreduce-submit \
    --input tests/testdata/input_small \
    --output output \
    --mapper tests/testdata/exec/wc_map_slow.sh \
    --reducer tests/testdata/exec/wc_reduce_slow.sh \
    --nmappers 2 \
    --nreducers 2

Kill one of the workers while it is executing a map or reduce job. Quickly use pgrep to find the PID of a worker, and then use kill on its PID. You can use pgrep again to check that you actually killed the worker.

$ pgrep -af mapreduce-worker  # Linux
$ pgrep -lf mapreduce-worker  # macOS
77811 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6001
77893 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
$ kill 77811
$ pgrep -lf mapreduce-worker
77893 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002

Here’s a way to kill one worker with one line.

$ pgrep -af mapreduce-worker | head -n1 | awk '{print $1}' | xargs kill  # Linux
$ pgrep -lf mapreduce-worker | head -n1 | awk '{print $1}' | xargs kill  # macOS
[2]-  Terminated: 15          mapreduce-worker 6000 6001

Finally, verify the correct behavior. Read to logging messages from your Master and Worker to consider:

Pro-tip: Script this test, adding sleep commands in between commands to give them time for startup and TCP communication.

Code style

As in previous projects, all Python code should contain no errors or warnings from pycodestyle, pydocstyle, and pylint. Run pylint like this:

$ pylint --disable=no-value-for-parameter --disable=method-hidden mapreduce

You may not use any external dependencies aside from what is provided in setup.py.

Submitting and grading

One team member should register your group on the autograder using the create new invitation feature.

Submit a tarball to the autograder, which is linked from https://eecs485.org. Include the --disable-copyfile flag only on macOS.

$ tar \
  --disable-copyfile \
  --exclude '*__pycache__*' \
  --exclude '*tmp*' \
  -czvf submit.tar.gz \
  setup.py \
  bin \


This is an approximate rubric.

Deliverable Value
Public unit tests 60%
Hidden unit tests run after the deadline 40%

Further reading

Google’s original MapReduce paper


Q: I can start my Master/Worker with commands correctly, but when I run the tests, my code seems to not doing anything.

A: Make sure you start up your Master/Worker instances in the constructor.

Q: What is StopIteration error?

A: Repeated calls to the mock test functions (fake Master or fake Worker) will return values from a hardcoded sequence (an iterable). When the iterable is exhausted, a StopIteration exception is raised. https://docs.python.org/3/library/exceptions.html#StopIteration

In the tests, we are using a Mock class to fake the socket functions. Whenever socket.recv() is called, the tests will send a pre-defined fake message (see worker_message_generator and master_message_generator in each test for details). When socket.recv() is called but no message is given (the message generators run out of pre-defined fake messages), a StopIteration exception is raised.

Make sure socket.recv() is not called after the last fake message, which is the shutdown message.

Q: Why am I getting a Failed to close threads or Connection Refused error?

A: In the tests, we call the Master / Worker class constructors to run your program. Make sure the constructors start your program and only exit when all threads are closed.

Q: Why do I get TypeError: Object of type PosixPath is not JSON serializable?

A: By default the Python JSON library does not know how to convert a pathlib.Path object to a string. Here’s how to do it:

class PathJSONEncoder(json.JSONEncoder):
    Extended the Python JSON encoder to encode Pathlib objects.

    Docs: https://docs.python.org/3/library/json.html

    >>> json.dumps({
            "executable": TESTDATA_DIR/"exec/wc_map.sh",
        }, cls=PathJSONEncoder)

    def default(self, o):
        """Override base class method to include Path object serialization."""
        if isinstance(o, pathlib.Path):
            return str(o)
        return super().default(o)