EECS 485 Project 4: Map Reduce
Due: 8pm EST November 16, 2020. This is a group project to be completed in groups of two to three.
Changelog
Initial Release for F20: Version 4.1
- 10/25/2020: Correct `tests` directory structure when running the `tree` command.
- 10/26/2020: Correct `submit.py` with proper test directories.
- 11/5/2020: Add `--log-cli-level=INFO` flag to the `pytest` command. This will ensure that logging output is visible when running the test suite.
Introduction
In this project, you will implement a MapReduce server in Python. This will be a single machine, multi-process, multi-threaded server that will execute user-submitted MapReduce jobs. It will run each job to completion, handling failures along the way, and write the output of the job to a given directory. Once you have completed this project, you will be able to run any MapReduce job on your machine, using a MapReduce implementation you wrote!
There are two primary modules in this project: the Master and the Worker. The Master listens for MapReduce jobs, manages the jobs, distributes work amongst the Workers, and handles faults. Workers register themselves with the Master, and then await commands, performing map, reduce, or sorting (grouping) tasks based on instructions given by the Master.
You will not write map reduce programs, but rather the MapReduce server. We have provided several sample map/reduce programs that you can use to test your MapReduce server.
Refer to the Python processes, threads and sockets tutorial for background and examples.
Table of Contents
- Setup
- Run the MapReduce Server
- MapReduce Server Specification
- Walk-through example
- Testing
- Submitting and grading
- Further Reading
Setup
Group registration
Register your group on the Autograder.
Project folder
Create a folder for this project (instructions). Your folder location might be different.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
Version control
Set up version control using the Version control tutorial. You might also take a second look at the Version control for a team tutorial.
After you’re done, you should have a local repository with a “clean” status and your local repository should be connected to a remote GitLab repository.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ git status
On branch master
Your branch is up-to-date with 'origin/master'.
nothing to commit, working tree clean
$ git remote -v
origin https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (fetch)
origin https://gitlab.eecs.umich.edu/awdeorio/p4-mapreduce.git (push)
You should have a `.gitignore` file (instructions).
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ head .gitignore
# This is a sample .gitignore file that's useful for EECS 485 projects.
...
Python virtual environment
Create a Python virtual environment using the Project 1 Python Virtual Environment Tutorial.
Check that you have a Python virtual environment, and that it’s activated (remember `source env/bin/activate`).
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ ls -d env
env
$ echo $VIRTUAL_ENV
/Users/awdeorio/src/eecs485/p4-mapreduce/env
Starter files
Download and unpack the starter files.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ wget https://eecs485staff.github.io/p4-mapreduce/starter_files.tar.gz
$ tar -xvzf starter_files.tar.gz
Move the starter files to your project directory and remove the original starter_files/
directory and tarball.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ mv starter_files/* .
$ rm -rf starter_files starter_files.tar.gz
You should see these files.
$ tree
.
├── mapreduce
│   ├── __init__.py
│   ├── master
│   │   ├── __init__.py
│   │   └── __main__.py
│   ├── submit.py
│   ├── utils.py
│   └── worker
│       ├── __init__.py
│       └── __main__.py
├── requirements.txt
├── setup.py
├── tests
│   ├── testdata
│   │   ├── correct
│   │   │   ├── grep_correct.txt
│   │   │   └── word_count_correct.txt
│   │   ├── exec
│   │   │   ├── grep_map.py
│   │   │   ├── grep_reduce.py
│   │   │   ├── wc_map.sh
│   │   │   ├── wc_map_slow.sh
│   │   │   ├── wc_reduce.sh
│   │   │   └── wc_reduce_slow.sh
│   │   ├── input
│   │   │   ├── file01
...
│   │   │   └── file08
│   │   ├── input_small
│   │   │   ├── file01
│   │   │   └── file02
│   │   ├── input_large
│   │   │   ├── file01
│   │   │   ├── file02
│   │   │   ├── file03
│   │   │   └── file04
...
│   ├── test_worker_08.py
│   └── utils.py
Activate the virtual environment and install packages.
$ source env/bin/activate
$ pip install -r requirements.txt
$ pip install -e .
Here’s a brief description of each of the starter files.
| File | Description |
|------|-------------|
| mapreduce/ | mapreduce Python package skeleton files |
| mapreduce/master/ | mapreduce master skeleton module, implement this |
| mapreduce/worker/ | mapreduce worker skeleton module, implement this |
| mapreduce/submit.py | Provided code to submit a new MapReduce job |
| mapreduce/utils.py | Code shared between master and worker |
| requirements.txt | Python package dependencies matching autograder |
| setup.py | mapreduce Python package configuration |
| tests/ | Public unit tests |
| tests/testdata/ | Files used by our public tests |
| tests/testdata/exec/ | Sample mapreduce programs, all use stdin and stdout |
| tests/testdata/correct/ | Sample mapreduce program correct output |
| tests/testdata/input/ | Sample mapreduce program input |
| tests/testdata/input_small/ | Sample mapreduce program input for fast testing |
| tests/testdata/input_large/ | Sample mapreduce program input for testing on large input |
Before making any changes to the clean starter files, it’s a good idea to make a commit to your Git repository.
Libraries
Complete the Python processes, threads and sockets tutorial.
Here are some quick links to the libraries we used in our instructor implementation.
- Python Subprocess
- Python Multithreading
- Python Sockets
- Python JSON Library
- Python Pathlib Library (Object-oriented filesystem paths)
- Python Logging facility
- We’ve also provided sample logging code
Run the MapReduce server
You will write a `mapreduce` Python package that includes `master` and `worker` modules. Launch a master with the command line entry point `mapreduce-master` and a worker with `mapreduce-worker`. We’ve also provided `mapreduce-submit` to send a new job to the master.
Start a master and workers
The starter code will run out of the box; it just won’t do anything. The master and the worker run as separate processes, so you will have to start them up separately. The first command below starts a master which will listen on port 6000 using TCP. Then, we start up two workers, telling each one to communicate with the master on port 6000 and which port to listen on itself. The ampersand (`&`) means start the process in the background.
$ mapreduce-master 6000 &
$ mapreduce-worker 6000 6001 &
$ mapreduce-worker 6000 6002 &
See your processes running in the background. Note: use `pgrep -lf` on macOS and `pgrep -af` on GNU/Linux systems.
$ pgrep -lf mapreduce-worker
15364 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6001
15365 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
$ pgrep -lf mapreduce-master
15353 /usr/local/Cellar/python3/3.6.3/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-master 6000
Stop your processes.
$ pkill -f mapreduce-master
$ pkill -f mapreduce-worker
$ pgrep -lf mapreduce-worker # no output, because no processes
$ pgrep -lf mapreduce-master # no output, because no processes
Submit a MapReduce job
Lastly, we have also provided `mapreduce/submit.py`. It sends a job to the Master’s main TCP socket. You can specify the job using command line arguments.
$ mapreduce-submit --help
Usage: mapreduce-submit [OPTIONS]
Top level command line interface.
Options:
-p, --port INTEGER Master port number, default = 6000
-i, --input DIRECTORY Input directory, default=tests/testdata/input
-o, --output DIRECTORY Output directory, default=output
-m, --mapper FILE Mapper executable, default=tests/testdata/exec/wc_map.sh
-r, --reducer FILE Reducer executable,
default=tests/testdata/exec/wc_reduce.sh
--nmappers INTEGER Number of mappers, default=4
--nreducers INTEGER Number of reducers, default=1
--help Show this message and exit.
Here’s how to run a job. Later, we’ll simplify starting the server using a shell script. Right now we expect the job to fail because Master and Worker are not implemented.
$ pgrep -f mapreduce-master # check if you already started it
$ pgrep -f mapreduce-worker # check if you already started it
$ mapreduce-master 6000 &
$ mapreduce-worker 6000 6001 &
$ mapreduce-worker 6000 6002 &
$ mapreduce-submit --mapper tests/testdata/exec/wc_map.sh --reducer tests/testdata/exec/wc_reduce.sh
Init script
The MapReduce server is an example of a service (or daemon), a program that runs in the background. We’ll write an init script to start, stop and check on the MapReduce master and worker processes. It should be a shell script named `bin/mapreduce`. Print the messages in the following examples.
Be sure to follow the shell script best practices (Tutorial).
Start server
Exit 1 if a master or worker is already running. Otherwise, execute the following commands.
mapreduce-master 6000 &
sleep 2
mapreduce-worker 6000 6001 &
mapreduce-worker 6000 6002 &
Example
$ ./bin/mapreduce start
starting mapreduce ...
Example: accidentally start server when it’s already running.
$ ./bin/mapreduce start
Error: mapreduce-master is already running
Stop server
Execute the following commands. Notice that `|| true` will prevent a failed “nice” shutdown message from causing the script to exit early. Also notice that we automatically figure out the correct option for Netcat (`nc`).
# Detect GNU vs BSD netcat. We need netcat to close the connection after
# sending a message, which requires different options.
set +o pipefail # Avoid erroneous failures due to grep returning non-zero
if nc -h 2>&1 | grep -q "\-c"; then
NC="nc -c"
elif nc -h 2>&1 | grep -q "\-N"; then
NC="nc -N"
elif nc -h 2>&1 | grep -q "\-C"; then
NC="nc -C"
else
echo "Error detecting netcat version."
exit 1
fi
set -o pipefail
echo '{"message_type": "shutdown"}' | $NC localhost 6000 || true
sleep 2 # give the master time to receive signal and send to workers
Then, kill the master and worker processes. The following example is for the master.
echo "killing mapreduce master ..."
pkill -f mapreduce-master || true
Example 1, server responds to shutdown message.
$ ./bin/mapreduce stop
stopping mapreduce ...
Example 2, server doesn’t respond to shutdown message and process is killed.
$ ./bin/mapreduce stop
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...
Server status
Example
$ ./bin/mapreduce start
starting mapreduce ...
$ ./bin/mapreduce status
master running
workers running
$ ./bin/mapreduce stop
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...
$ ./bin/mapreduce status
master not running
workers not running
Restart server
Example
$ ./bin/mapreduce restart
stopping mapreduce ...
killing mapreduce master ...
killing mapreduce worker ...
starting mapreduce ...
MapReduce server specification
Here, we describe the functionality of the MapReduce server. The fun part is that we are only defining the functionality and the communication spec; the implementation is entirely up to you. You must follow our exact specifications below, and the Master and the Worker should work independently (i.e. do not add any more data or dependencies between the two classes). Remember that the Master/Workers are listening on TCP/UDP sockets for all incoming messages. Note: to test your server, we will only be checking for the messages listed below. You should not rely on any communication other than the messages listed below.
As soon as the Master/Worker receives a message on its main TCP socket, it should handle that message to completion before continuing to listen on the TCP socket. In this spec, let’s say every message is handled in a function called `handle_msg`. When `handle_msg` returns, the Master continues listening in an infinite while loop for new messages. Each TCP message should be communicated using a new TCP connection. Note: all communication in this project will be strings formatted using JSON; sockets receive strings, but your thread must parse them into JSON.
We put [Master/Worker] before the subsections below to identify which class should handle the given functionality.
Code organization
Your code will go inside the `mapreduce/master` and `mapreduce/worker` packages, where you will define the two classes (we got you started in `mapreduce/master/__main__.py` and `mapreduce/worker/__main__.py`). Since we are using Python packages, you may create new files as you see fit inside each package. We have also provided a `utils.py` inside `mapreduce/` which you can use to house code common to both Worker and Master. We will only define the communication specs for the Master and the Worker; the actual implementation of the classes is entirely up to you.
Master overview
The Master should accept only one command line argument.
- `port_number`: The primary TCP port that the Master should listen on.
On startup, the Master should do the following:
- Create a new folder `tmp`. This is where we will store all intermediate files used by the MapReduce server. If `tmp` already exists, keep it. Hint: Pathlib mkdir. Hint: use the Pathlib slash operator to “glue together” different parts of a file path.
- Delete any old MapReduce job folders in `tmp`. Hint: see Pathlib glob and use `"job-*"`.
- Create a new thread which will listen for UDP heartbeat messages from the workers. This should listen on (`port_number - 1`).
- Create any additional threads or setup you think you may need. Another thread for fault tolerance could be helpful.
- Create a new TCP socket on the given `port_number` and call the `listen()` function. Note: only one `listen()` thread should remain open for the whole lifetime of the master.
- Wait for incoming messages! Ignore invalid messages, including those that fail JSON decoding. To ignore these messages, use a try/except when you try to load the message:

  try:
      msg = json.loads(msg)
  except json.JSONDecodeError:
      continue

- Wait to return from the master constructor until all master threads have exited.
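The message-handling loop described above can be sketched as follows. This is a minimal illustration, not the required implementation; the names `decode_message`, `listen_loop`, and the `signals` shutdown-flag dict are our own conventions, following the Processes, Threads and Sockets tutorial.

```python
import json
import socket


def decode_message(data):
    """Parse a raw string into a JSON dict, or return None if invalid."""
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return None


def listen_loop(sock, handle_msg, signals):
    """Accept TCP connections on sock, handling one message per connection."""
    sock.settimeout(1)  # wake periodically so we can check the shutdown flag
    while not signals["shutdown"]:
        try:
            clientsocket, _ = sock.accept()
        except socket.timeout:
            continue
        with clientsocket:
            chunks = []
            while True:
                data = clientsocket.recv(4096)
                if not data:
                    break
                chunks.append(data)
        message = decode_message(b"".join(chunks).decode("utf-8"))
        if message is None:
            continue  # ignore invalid messages
        handle_msg(message)
```

Note that each connection carries exactly one message, matching the one-connection-per-message rule in the spec.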
Worker overview
The Worker should accept two command line arguments.
- `master_port`: The TCP port that the Master is actively listening on (same as the `port_number` in the Master constructor)
- `worker_port`: The TCP port that this Worker should listen on to receive instructions from the master
On initialization, each Worker should do a similar sequence of actions as the Master:

- Get the process ID of the Worker. This will be the Worker’s unique ID, which it should then use to register with the master.
- Create a new TCP socket on the given `worker_port` and call the `listen()` function. Note: only one `listen()` thread should remain open for the whole lifetime of the worker. Ignore invalid messages, including those that fail JSON decoding.
- Send the `register` message to the Master.
- Upon receiving the `register_ack` message, create a new thread which will be responsible for sending heartbeat messages to the Master.
NOTE: The Master should safely ignore any heartbeat messages from a Worker before that Worker successfully registers with the Master.
Shutdown [master + worker]
Because all of our tests require shutdown to function properly, it should be implemented first. The Master can receive a special message to initiate server shutdown. The shutdown message will be of the following form and will be received on the main TCP socket:
{
"message_type": "shutdown"
}
The Master should forward this message to all of the living Workers that have registered with it. The Workers, upon receiving the shutdown message, should terminate as soon as possible. If the Worker is already in the middle of executing a task (as described below), it is okay for it to complete that task before being able to handle the shutdown message as both these happen inside a single thread.
After forwarding the message to all Workers, the Master should terminate itself.
At this point, you should be able to pass the first part of `test_master_00`, and `test_worker_00` completely. Another shutdown test is `test_integration_00`, but you’ll need to implement worker registration first.
Worker registration [master + worker]
The Master should keep track of all Workers at any given time so that the work is only distributed among the ready Workers. Workers can be in the following states:
- `ready`: Worker is ready to accept work
- `busy`: Worker is performing a task
- `dead`: Worker has failed to ping for some amount of time
The Master must listen for registration messages from Workers. Once a Worker is ready to listen for instructions, it should send a message like this to the Master
{
"message_type" : "register",
"worker_host" : string,
"worker_port" : int,
"worker_pid" : int
}
The Master will then respond with a message acknowledging the Worker has registered, formatted like this. After this message has been received, the Worker should start sending heartbeats. More on this later.
{
"message_type": "register_ack",
"worker_host": string,
"worker_port": int,
"worker_pid" : int
}
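Since each TCP message uses a fresh connection, sending these one-shot JSON messages can be factored into a small helper. A minimal sketch (the helper name `send_tcp_message` is our own; the commented-out call shows intended usage rather than required behavior):

```python
import json
import os
import socket


def send_tcp_message(host, port, message):
    """Open a new TCP connection, send one JSON message, and close."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(json.dumps(message).encode("utf-8"))


# Example: a Worker registering itself with a Master listening on port 6000.
register = {
    "message_type": "register",
    "worker_host": "localhost",
    "worker_port": 6001,
    "worker_pid": os.getpid(),
}
# send_tcp_message("localhost", 6000, register)
```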
After the first Worker registers with the Master, the Master should check the job queue (described later) to see whether it has any work it can assign to the Worker (because a job could have arrived at the Master before any Workers registered). If the Master is already executing a map/group/reduce, it should assign the Worker the next available task immediately.
At this point, you should be able to pass `test_master_00`, `test_worker_01`, and `test_integration_00`.
New job request [master]
In the event of a new job, the Master will receive the following message on its main TCP socket:
{
"message_type": "new_master_job",
"input_directory": string,
"output_directory": string,
"mapper_executable": string,
"reducer_executable": string,
"num_mappers" : int,
"num_reducers" : int
}
In response to a job request, the Master will create a set of new directories where all of the temporary files for the job will go, of the form `tmp/job-{id}`, where id is the current job counter (starting at 0, just like all counters). The directory structure will resemble this example (you should create 4 new folders for each job):
tmp
  job-0/
    mapper-output/
    grouper-output/
    reducer-output/
  job-1/
    mapper-output/
    grouper-output/
    reducer-output/
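Creating this layout with Pathlib, as the hints in the Master overview suggest, might look like this sketch (the function name is our own):

```python
from pathlib import Path


def create_job_dirs(tmp, job_id):
    """Create tmp/job-{id} and its three output subdirectories."""
    job_dir = Path(tmp) / f"job-{job_id}"
    for sub in ("mapper-output", "grouper-output", "reducer-output"):
        # parents=True also creates tmp/ and tmp/job-{id} if they are missing
        (job_dir / sub).mkdir(parents=True, exist_ok=True)
    return job_dir
```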
Remember, each MapReduce job occurs in 3 stages: Mapping, Grouping, Reducing. Workers will do the mapping and reducing using the given executable files independently, but the Master and Workers will have to cooperate to do the Grouping Stage. After the directories are set up, the Master should check if there are any Workers ready to work and whether the MapReduce server is currently executing a job. If the server is busy, or there are no available Workers, the job should be added to an internal queue (described next) and the function should return. If there are ready Workers and the server is not busy, then the Master can begin job execution.
At this point, you should be able to pass `test_master_01`.
Job queue [master]
If a Master receives a new job while it is already executing one or when there were no ready workers, it should accept the job, create the directories, and store the job in an internal queue until the current one has finished. Note that this means that the current job’s map, group, and reduce tasks must be complete before the next job’s Mapping Stage can begin. As soon as a job finishes, the Master should process the next pending job if there is one (and if there are ready Workers) by starting its Mapping Stage. For simplicity, in this project, your MapReduce server will only execute one MapReduce task at any time.
As noted earlier, when you see the first Worker register to work, you should check the job queue for pending jobs.
Input partitioning [master]
To start off the Mapping Stage, the Master scans the input directory and divides the input files into several partitions. Each partition (a group of files) will be a `new_worker_task`.
First, the master sorts the input files by name and divides them into `num_mappers` partitions using round robin. Here’s an example with `num_mappers=4`.
# Before partitioning
["file03", "file02", "file01", "file05", "file04"]
# After partitioning
[
["file01","file05"],
["file02"],
["file03"],
["file04"],
]
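The round-robin dealing above can be implemented in a few lines; here is a sketch (the function name is our own):

```python
def partition_input(files, num_mappers):
    """Sort file names, then deal them round-robin into num_mappers partitions."""
    partitions = [[] for _ in range(num_mappers)]
    for i, name in enumerate(sorted(files)):
        partitions[i % num_mappers].append(name)
    return partitions
```

With the example input and `num_mappers=4`, this reproduces the partitioning shown above.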
After partitioning the input, the Master allocates tasks to Workers by sending JSON messages. Each JSON message looks like this:
{
"message_type": "new_worker_task",
"input_files": [list of strings],
"executable": string,
"output_directory": string,
"worker_pid": int
}
Mapping [master]
The Master must log a message at the beginning and end of each stage. To configure the logging module, see our example. The autograder test cases use these messages to know when your solution is ready to be checked. After adding the “begin map stage” message, you should be able to pass `test_master_02`.
logging.info("Master:%s begin map stage", port_num)
logging.info("Master:%s end map stage", port_num)
The Master assigns one task at a time to a Worker. If all the Workers are busy, the Master will need to wait. Because we don’t know ahead of time which worker will become available first, which worker performs which task could change with different executions of the same program (the program is non-deterministic). The autograder test cases accept all valid scenarios.
Here’s an example. Consider a MapReduce job with 2 workers, 5 input files `["file01", "file02", "file03", "file04", "file05"]`, and 4 map tasks specified.
Continuing our previous example, we’ll assume that there are two workers and neither is busy. Worker 0 registered first. Steps 3-6 would be slightly different if Worker 0 finished first.
- Master sends Worker 0 a task with input files `["file01", "file05"]`
- Master sends Worker 1 a task with input files `["file02"]`
- Master waits for a worker to finish. In this example, it’s Worker 1.
- Master sends Worker 1 a task with input files `["file03"]`
- Master waits for a worker to finish. In this example, it’s Worker 0.
- Master sends Worker 0 a task with input files `["file04"]`
At this point, you should be able to pass `test_master_02`.
Mapping [workers]
When a worker receives this new task message, its handle_msg
will start execution of the given executable over the specified input file, while directing the output to the given output_directory
(one output file per input file and you should run the executable on each input file). The input is passed to the executable through standard in and is outputted to a specific file. The output file names should be the same as the input file (overwrite file if it already exists). The output_directory
in the Mapping Stage will always be the mapper-output folder (i.e. tmp/job-{id}/mapper-output
).
For example, the Master might specify the input file `data/input/file01.txt` and the output file `tmp/job-0/mapper-output/file01.txt`.
Hint: See the Python standard library subprocess.run() function.
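Running one executable over one input file with `subprocess.run()` might look like this sketch (the function name is ours, and error handling is omitted):

```python
import subprocess
from pathlib import Path


def run_executable(executable, input_path, output_dir):
    """Run executable with input_path on stdin, writing stdout to
    output_dir/<input file name>."""
    output_path = Path(output_dir) / Path(input_path).name
    with open(input_path) as infile, open(output_path, "w") as outfile:
        # stdin comes from the input file; stdout is redirected to the output file
        subprocess.run([executable], stdin=infile, stdout=outfile, check=True)
    return output_path
```

Because the executable is invoked as an opaque command, this works for map and reduce programs written in any language, as the spec requires.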
The Worker should be agnostic to map or reduce tasks. Regardless of the type of operation, the Worker is responsible for running the specified executable over the input files one by one, and piping to the output directory for each input file. Once a Worker has finished its task, it should send a TCP message to the Master’s main socket of the form:
{
"message_type": "status",
"output_files" : [list of strings],
"status": "finished",
"worker_pid": int
}
At this point, you should be able to pass `test_worker_03`, `test_worker_04`, and `test_worker_05`.
Grouping [master + workers]
After all mappers finish, the Master will start the Grouping Stage. The Grouping Stage has two steps:
- Workers sort each mapper output file
- Master rearranges sorted mapper output files, grouping like keys together
The Master must log a message at the beginning and end of each stage.
logging.info("Master:%s begin group stage", port_num)
logging.info("Master:%s end group stage", port_num)
To start the Grouping Stage, the Master looks at all of the files created by the mappers, and assigns Workers to sort and merge the files. Sorting in the Grouping Stage should happen by line, not by key. If there are more files than Workers, the Master should apportion files to Workers using round robin, allocating the files in sorted order to Workers in order of registration. If there are fewer files than Workers, it is okay if some Workers sit idle during this stage.
After partitioning is complete, the Master sends a `new_sort_task` to each worker in order of registration. For simplicity, you may assume there will be at least one live worker. The input for one sort task is a list of files. The output is one larger file, using the naming convention `sorted01`, `sorted02`, etc. The messages should be sent to the Workers in registration order and look like this:
{
"message_type": "new_sort_task",
"input_files": [list of strings],
"output_file": string,
"worker_pid": int
}
When a worker receives a `new_sort_task`, it should sort by line, not by key. Once the Worker has finished, it sends a message to the Master like this:
{
"message_type": "status",
"output_file": string,
"status": "finished",
"worker_pid": int
}
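A worker-side sort task might be sketched like this (the function name is ours; we assume one task's input files fit in memory, since the streaming restriction in this spec applies to the Master's merge step):

```python
def sort_files(input_paths, output_path):
    """Concatenate the input files' lines, sort by whole line, and write
    one combined output file (e.g. sorted01)."""
    lines = []
    for path in input_paths:
        with open(path) as infile:
            lines.extend(infile)
    lines.sort()  # sort by line, not by key
    with open(output_path, "w") as outfile:
        outfile.writelines(lines)
```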
After sorting is complete, the Master reads the sorted files and apportions each line into `num_reducers` files. Specifically, the Master allocates lines in sorted order by key, using round robin to divvy the keyspace into `num_reducers` files. Hint: understanding the Merging K Sorted Lists problem will help with sorted iteration through the keyspace; the `heapq.merge()` function will be helpful. Do not assume the entire data set can fit into memory at once. Do not create a single large sorted file.
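A sketch of that streaming merge follows, assuming keys are the tab-separated first field of each line (as in the word count example); the function name is our own:

```python
import heapq
from contextlib import ExitStack
from pathlib import Path


def group_keys(sorted_paths, grouper_dir, num_reducers):
    """Stream lines from the sorted files in merged order, assigning each
    new key round-robin to one of num_reducers reduce files."""
    grouper_dir = Path(grouper_dir)
    outfiles = [open(grouper_dir / f"reduce{i + 1:02d}", "w")
                for i in range(num_reducers)]
    try:
        with ExitStack() as stack:
            infiles = [stack.enter_context(open(p)) for p in sorted_paths]
            prev_key = None
            index = -1
            # heapq.merge iterates the already-sorted files lazily, so the
            # whole data set is never held in memory at once.
            for line in heapq.merge(*infiles):
                key = line.split("\t")[0]
                if key != prev_key:  # new key: advance the round robin
                    prev_key = key
                    index = (index + 1) % num_reducers
                outfiles[index].write(line)
    finally:
        for outfile in outfiles:
            outfile.close()
```

All lines sharing a key land in the same reduce file, which is what the Reducing Stage requires.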
Use the following naming convention for the output of the Grouping Stage, which will be the input for the Reducing Stage: files should be placed in the grouper output directory and named `reduce01`, `reduce02`, and so on.
At this point, you should pass `test_master_08`, which verifies efficient round robin grouping. You should also be able to pass `test_worker_06`, `test_worker_07`, and `test_worker_08`.
Reducing [master + workers]
The Master must log a message at the beginning and end of each stage.
logging.info("Master:%s begin reduce stage", port_num)
logging.info("Master:%s end reduce stage", port_num)
To the Worker, this is the same as the Mapping Stage; it doesn’t need to know whether it is running a map or reduce task. The Worker just runs the executable it is told to run; the Master is responsible for telling the Worker to run the correct map or reduce executable. The `output_directory` in the Reducing Stage will always be the `reducer-output` folder. Again, use the same output file name as the input file. Use round-robin file allocation to apportion sorted reducer files to Workers ordered by registration time. Be sure to send reducing messages to Workers in the order in which the Workers registered.
Once a Worker has finished its task, it should send a TCP message to the Master’s main socket of the form:
{
"message_type": "status",
"output_files": [list of strings],
"status": "finished",
"worker_pid": int
}
Wrapping up [master]
As soon as the master has received the last “finished” message for the reduce tasks of a given job, the Master should move the output files from the `reducer-output` directory to the final output directory specified by the original job creation message (the value of the `output_directory` key). In the final output directory, the files should be renamed `outputfilex`, where `x` is the final output file number. If there are 4 final output files, the master should rename them `outputfile01`, `outputfile02`, `outputfile03`, `outputfile04`. Create the output directory if it doesn’t already exist. Then check the job queue for the next available job, or go back to listening for jobs if there isn’t one currently in the queue.
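The renaming step might be sketched with Pathlib and `shutil.move()` (the function name is ours):

```python
import shutil
from pathlib import Path


def finish_job(reducer_output_dir, output_directory):
    """Move reducer outputs to the final output directory, renaming them
    outputfile01, outputfile02, ... in sorted order."""
    outdir = Path(output_directory)
    outdir.mkdir(parents=True, exist_ok=True)  # create output dir if missing
    for i, path in enumerate(sorted(Path(reducer_output_dir).iterdir()), 1):
        shutil.move(str(path), str(outdir / f"outputfile{i:02d}"))
```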
At this point, `test_master_03`, `test_master_04`, and `test_master_09` should pass.
Fault tolerance and heartbeats [master + workers]
Workers can die at any time and may not finish tasks that you send them. Your Master must accommodate this. If a Worker misses more than 5 pings in a row, you should assume that it has died, and assign whatever work it was responsible for to another Worker.
Each Worker will have a heartbeat thread to send updates to Master via UDP. The messages should look like this, and should be sent every 2 seconds:
{
"message_type": "heartbeat",
"worker_pid": int
}
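A worker's heartbeat thread might look like this sketch (the `signals` shutdown-flag dict and the `interval` parameter are our own conventions; the spec's interval is 2 seconds):

```python
import json
import os
import socket
import time


def send_heartbeats(master_host, master_port, signals, interval=2):
    """Send a UDP heartbeat every `interval` seconds until shutdown is flagged."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.connect((master_host, master_port))  # UDP: sets default destination
        message = json.dumps({
            "message_type": "heartbeat",
            "worker_pid": os.getpid(),
        }).encode("utf-8")
        while not signals["shutdown"]:
            sock.sendall(message)
            time.sleep(interval)
```

The Master's heartbeat listener would bind a matching `SOCK_DGRAM` socket on `port_number - 1` and record the last time each registered Worker pinged.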
If a Worker dies after the Master assigns it a task but before it completes the task and notifies the Master, then the Master should reassign the task to a free Worker when one becomes available. For simplicity, the Master should first complete all tasks in the current stage before reassigning tasks from dead Workers.
When a Worker dies, mark the failed Worker as dead, but do not remove it from the Master’s internal data structures. This is due to constraints on the Python dictionary data structure. It can result in an error when keys are modified while iterating over the dictionary. For more info on this, please refer to this link.
Your Master should attempt to maximize concurrency, but avoid duplication. In other words, don’t send the same task to different Workers until you know that the Worker who was previously assigned that task has died. Remember that a worker can die during any stage. If all workers die, the Master should wait until a new worker registers, then resume.
At this point all the tests should pass.
Walk-through example
See a complete example here.
Testing
We have provided a decently comprehensive public test suite for Project 4. However, you should also test some features of your MapReduce solution on your own. The walk-through example above should help you get started on testing your solution’s functionality.
We have provided a simple word count map and reduce example. Run these executables at the command line without your MapReduce code to generate the correct answers in `correct.txt`. Then, concatenate your MapReduce server’s output into one file, `output.txt`. The default output directory provided by `submit.py` is `output`, but it is configurable.
$ cat tests/testdata/input/* | tests/testdata/exec/wc_map.sh | sort | \
tests/testdata/exec/wc_reduce.sh > correct.txt
$ cat output/* | sort > output.txt
$ diff output.txt correct.txt
Note that the map and reduce executables can be in any language - your server should not limit us to running map and reduce jobs written in Python! To help you test this, we have also provided you with a word count solution written in a shell script (see section below).
Note that the autograder will watch your worker and master only for the messages we specified above. Your code should have no other dependency besides the communication spec, and the messages sent in your system must match those listed in this spec exactly.
Run the public unit tests. Add the `-vvs` flag to pytest to show output and the `--log-cli-level=INFO` flag to show logging messages.
$ pwd
/Users/awdeorio/src/eecs485/p4-mapreduce
$ pytest -vvs --log-cli-level=INFO
Test for busy waiting
A solution that busy-waits may pass on your development machine and fail on the autograder due to a timeout. Your laptop is probably much more powerful than the restricted autograder environment, so you might not notice the performance problem locally. See the Processes, Threads and Sockets in Python Tutorial for an explanation of busy-waiting.
To detect busy waiting, time a master without any workers. After a few seconds, kill it by pressing `Control-C` several times. Ignore any errors or exceptions. We can tell that this solution busy-waits because the user time is similar to the real time.
$ time mapreduce-master 6000
INFO:root:Starting master:6000
...
real 0m4.475s
user 0m4.429s
sys 0m0.039s
This example does not busy wait. Notice that the user time is small compared to the real time.
$ time mapreduce-master 6000
INFO:root:Starting master:6000
...
real 0m3.530s
user 0m0.275s
sys 0m0.036s
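One way to keep user time low is to block on a `threading.Event` (or a socket with a timeout) instead of spinning on a flag. The sketch below is our own illustration, not required code:

```python
import threading

shutdown = threading.Event()

def event_driven_wait():
    """Good: sleeps inside wait(), so user time stays near zero."""
    while not shutdown.is_set():
        shutdown.wait(timeout=0.1)  # blocks up to 0.1 s instead of spinning

def busy_wait():
    """Anti-pattern: burns CPU re-checking the flag, so user time ~ real time."""
    while not shutdown.is_set():
        pass

# Run the event-driven version, then signal shutdown from the main thread.
waiter = threading.Thread(target=event_driven_wait)
waiter.start()
shutdown.set()
waiter.join()
```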
Testing fault tolerance
This section will help you verify fault tolerance. The general idea is to kill a worker while it’s running an intentionally slow MapReduce job.
We have provided an intentionally slow MapReduce job in `tests/testdata/exec/wc_map_slow.sh` and `tests/testdata/exec/wc_reduce_slow.sh`. These executables use `sleep` statements to simulate a slow-running task. You may want to increase the sleep time.
$ grep -B1 sleep tests/testdata/exec/wc_*slow.sh
tests/testdata/exec/wc_map_slow.sh-# Simulate a long running job
tests/testdata/exec/wc_map_slow.sh:sleep 3
--
tests/testdata/exec/wc_reduce_slow.sh-# Simulate a long running job
tests/testdata/exec/wc_reduce_slow.sh:sleep 3
First, start one MapReduce Master and two Workers. Wait for them to start up.
$ pkill -f mapreduce- # Kill any stale Master or Worker
$ mapreduce-master 6000 & # Start Master
$ mapreduce-worker 6000 6001 & # Start Worker 0
$ mapreduce-worker 6000 6002 & # Start Worker 1
Submit an intentionally slow MapReduce job and wait for the mappers to begin executing.
$ mapreduce-submit \
--input tests/testdata/input_small \
--output output \
--mapper tests/testdata/exec/wc_map_slow.sh \
--reducer tests/testdata/exec/wc_reduce_slow.sh \
--nmappers 2 \
--nreducers 2
Kill one of the Workers while it is executing a map or reduce job. Quickly use `pgrep` to find the PID of a Worker, and then use `kill` on its PID. You can use `pgrep` again to check that you actually killed the Worker.
$ pgrep -af mapreduce-worker # Linux
$ pgrep -lf mapreduce-worker # macOS
77811 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6001
77893 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
$ kill 77811
$ pgrep -lf mapreduce-worker
77893 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python /Users/awdeorio/src/eecs485/p4-mapreduce/env/bin/mapreduce-worker 6000 6002
Here’s a way to kill one worker with one line.
$ pgrep -af mapreduce-worker | head -n1 | awk '{print $1}' | xargs kill # Linux
$ pgrep -lf mapreduce-worker | head -n1 | awk '{print $1}' | xargs kill # macOS
[2]- Terminated: 15 mapreduce-worker 6000 6001
Finally, verify the correct behavior. Read the logging messages from your Master and Workers and consider:
- How many mapping tasks should the first Worker have received?
- How many mapping tasks should the second Worker have received?
- How many sorting and reducing tasks should the first and the second Worker receive?
Pro-tip: Script this test, adding `sleep` commands in between commands to give them time for startup and TCP communication.
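For example, here is a sketch of such a script in Python. It assumes the `mapreduce-*` executables from `setup.py` are installed on your PATH, and the sleep durations are guesses you may need to tune:

```python
import shutil
import subprocess
import time

def run_fault_tolerance_test():
    """Start a Master and two Workers, submit a slow job, kill one Worker."""
    procs = [subprocess.Popen(["mapreduce-master", "6000"])]
    time.sleep(2)  # give the Master time to start listening
    for port in ("6001", "6002"):
        procs.append(subprocess.Popen(["mapreduce-worker", "6000", port]))
    time.sleep(2)  # give the Workers time to register
    subprocess.run([
        "mapreduce-submit",
        "--input", "tests/testdata/input_small",
        "--output", "output",
        "--mapper", "tests/testdata/exec/wc_map_slow.sh",
        "--reducer", "tests/testdata/exec/wc_reduce_slow.sh",
        "--nmappers", "2",
        "--nreducers", "2",
    ], check=True)
    time.sleep(1)       # wait for the mappers to begin executing
    procs[1].kill()     # kill Worker 0 mid-task; the Master should reassign
    procs[1].wait()     # reap the killed Worker
    time.sleep(10)      # give the Master time to finish the job
    for proc in procs:  # clean up
        if proc.poll() is None:
            proc.terminate()

# Only run when the executables are actually installed.
if shutil.which("mapreduce-master"):
    run_fault_tolerance_test()
```

After the script finishes, inspect the logging output and the `output` directory to answer the questions above.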
Code style
As in previous projects, all Python code should contain no errors or warnings from `pycodestyle`, `pydocstyle`, and `pylint`. Run `pylint` like this:
$ pylint --disable=no-value-for-parameter --disable=method-hidden mapreduce
You may not use any external dependencies aside from what is provided in `setup.py`.
Submitting and grading
One team member should register your group on the autograder using the create new invitation feature.
Submit a tarball to the autograder, which is linked from https://eecs485.org. Include the `--disable-copyfile` flag only on macOS.
$ tar \
--disable-copyfile \
--exclude '*__pycache__*' \
--exclude '*tmp*' \
-czvf submit.tar.gz \
setup.py \
bin \
mapreduce
Rubric
This is an approximate rubric.
Deliverable | Value |
---|---|
Public unit tests | 60% |
Hidden unit tests run after the deadline | 40% |
Further reading
Google’s original MapReduce paper
FAQ
Q: I can start my Master/Worker correctly from the command line, but when I run the tests, my code doesn't seem to do anything.
A: Make sure you start up your Master/Worker instances in the constructor.
Q: What is a `StopIteration` error?
A: Repeated calls to the mock test functions (fake Master or fake Worker) return values from a hardcoded sequence (an iterable). When the iterable is exhausted, a `StopIteration` exception is raised. See https://docs.python.org/3/library/exceptions.html#StopIteration
In the tests, we use a `Mock` class to fake the socket functions. Whenever `socket.recv()` is called, the tests send a pre-defined fake message (see `worker_message_generator` and `master_message_generator` in each test for details). When `socket.recv()` is called but no message is left (the message generators have run out of pre-defined fake messages), a `StopIteration` exception is raised.
Make sure `socket.recv()` is not called after the last fake message, which is the shutdown message.
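You can reproduce this behavior directly with `unittest.mock`: a `Mock` whose `side_effect` is a list returns one item per call and raises `StopIteration` once the list is exhausted. The message contents below are illustrative, not the exact spec messages:

```python
from unittest import mock

# Fake socket whose recv() yields two pre-defined messages, like the
# tests' message generators.
fake_sock = mock.Mock()
fake_sock.recv.side_effect = [
    b'{"message_type": "register"}',   # illustrative first message
    b'{"message_type": "shutdown"}',   # last fake message: shutdown
]

first = fake_sock.recv(4096)
last = fake_sock.recv(4096)   # stop calling recv() after this one

try:
    fake_sock.recv(4096)      # one call too many
    raised = False
except StopIteration:
    raised = True             # the generator is exhausted
```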
Q: Why am I getting a `Failed to close threads` or `Connection Refused` error?
A: In the tests, we call the `Master`/`Worker` class constructors to run your program. Make sure the constructors start your program and only exit when all threads are closed.
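In other words, the constructor should behave roughly like the sketch below. This is our own illustration; the thread layout and names are up to you, and here setting the shutdown event in the constructor stands in for receiving a real shutdown message.

```python
import threading

class Master:
    """Sketch: the constructor returns only after every thread has exited."""

    def __init__(self, port):
        self.port = port
        self.shutdown_event = threading.Event()
        threads = [threading.Thread(target=self.listen)]
        for thread in threads:
            thread.start()
        # Stand-in for receiving a shutdown message over TCP.
        self.shutdown_event.set()
        for thread in threads:
            thread.join()  # block until all threads are closed

    def listen(self):
        """Placeholder for the TCP listen loop; exits once shutdown is set."""
        self.shutdown_event.wait()

master = Master(6000)  # returns only after the listen thread has exited
```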
Q: Why do I get `TypeError: Object of type PosixPath is not JSON serializable`?
A: By default, the Python `json` library does not know how to convert a `pathlib.Path` object to a string. Here's how to do it:
import json
import pathlib

class PathJSONEncoder(json.JSONEncoder):
    """
    Extend the Python JSON encoder to encode Pathlib objects.

    Docs: https://docs.python.org/3/library/json.html

    Usage:
    >>> json.dumps({
            "executable": TESTDATA_DIR/"exec/wc_map.sh",
        }, cls=PathJSONEncoder)
    """

    def default(self, o):
        """Override base class method to include Path object serialization."""
        if isinstance(o, pathlib.Path):
            return str(o)
        return super().default(o)