Processes, Threads and Sockets in Python

This tutorial will walk you through small examples using threads and sockets in Python. You will also learn command line tools for interacting with processes, threads and sockets.

Processes

A process is an executing program with its own dedicated memory space. Many processes run at the same time on a computer. When you execute a script (like python3 test.py), your code runs in a new process. The most important thing to note is that processes have isolated memory spaces, so one process cannot directly access the data of another process.
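Here is a minimal sketch of that isolation. It assumes Python's standard multiprocessing module, which the rest of this tutorial does not use, and the names counter and increment are hypothetical. A child process increments a global variable, and the parent's copy stays unchanged.

```python
"""Sketch: a child process cannot modify the parent's memory."""
import multiprocessing

counter = 0  # Every process has its own, separate copy of this global


def increment():
    """Run in a child process; changes only the child's copy of counter."""
    global counter
    counter += 1
    print("child:  counter =", counter)


def main():
    """Start one child process, wait for it, then read our own counter."""
    child = multiprocessing.Process(target=increment)
    child.start()
    child.join()  # Wait for the child process to exit
    print("parent: counter =", counter)
    return counter


if __name__ == "__main__":
    main()
```

Running it should print a counter of 1 from the child and 0 from the parent.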

See all the running processes on a machine.

$ ps -ax

Search for running Python processes.

$ pgrep -af python  # Linux or WSL
$ pgrep -lf python  # macOS

Threads

Threads are similar to processes in that they allow multiple pieces of work to run concurrently. Unlike processes, threads in the same process share one memory space, so they can access each other’s data. Each thread is created by and belongs to a single process, and it lives only as long as that process does. When a process starts, all work runs in a main thread created by default, but the program can add new threads at runtime.
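By contrast with processes, threads in the same process see the same objects. In this sketch (worker and main are illustrative names), five threads append to one list. A threading.Lock guards the shared list, which is a good habit for any data that several threads modify.

```python
"""Sketch: threads in one process share the same memory."""
import threading


def worker(worker_id, results, lock):
    """Append this worker's ID to a list that every thread can see."""
    with lock:  # Only one thread at a time may modify the shared list
        results.append(worker_id)


def main():
    """Start five threads, wait for them, then read the shared list."""
    results = []  # A single list object shared by all five threads
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(i, results, lock))
        for i in range(5)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # Wait until the thread has finished
    print(sorted(results))
    return sorted(results)


if __name__ == "__main__":
    main()
```

It should print [0, 1, 2, 3, 4]. The order in which the threads append can vary from run to run, which is why the result is sorted.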

See all running threads and processes on a machine.

$ ps -eLf  # Linux or WSL
$ ps -axM  # macOS

The Python threading library makes it easy to run functions in separate threads.

Save this Python program to example_thread.py.

"""Example threading."""
import os
import threading


def main():
    """Test Threading."""
    threads = []
    for i in range(5):
        thread = threading.Thread(target=worker, args=(i,))
        threads.append(thread)
        thread.start()


def worker(worker_id):
    """Worker thread."""
    pid = os.getpid()
    print("Worker {}  pid={}".format(worker_id, pid))
    while True:
        # spin
        pass


if __name__ == "__main__":
    main()

Run the example. Your PIDs may be different.

$ python3 example_thread.py 
Worker 0  pid=89660
Worker 1  pid=89660
Worker 2  pid=89660
Worker 3  pid=89660
Worker 4  pid=89660

In a second terminal window, find the process ID of the running python3 example_thread.py. Then, use the process ID to view its threads. Notice that there is one process with six threads: the main thread plus the five worker threads.

Linux

$ pgrep -af example_thread
4457 python3 example_thread.py
$ ps -m 4457
  PID TTY      STAT   TIME COMMAND
 4457 pts/2    -      4:13 python3 example_thread.py
    - -        SNl+   0:00 -
    - -        SNl+   0:49 -
    - -        RNl+   0:51 -
    - -        SNl+   0:51 -
    - -        SNl+   0:50 -
    - -        SNl+   0:50 -

macOS

$ pgrep -lf example_thread
89660 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python example_thread.py
$ ps -M 89660
USER       PID   TT   %CPU STAT PRI     STIME     UTIME COMMAND
awdeorio 89660 s003    0.0 S    31T   0:00.01   0:00.03 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Ve
         89660        21.9 S    31T   0:01.41   1:22.50 
         89660        21.6 S    29T   0:01.41   1:22.93 
         89660        18.7 S    30T   0:01.41   1:22.81 
         89660        18.9 R    29T   0:01.42   1:23.33 
         89660        20.7 S    30T   0:01.41   1:22.01 

Stop the example running in the first terminal (python3 example_thread.py) by pressing Control-C.
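You can also count a process's threads from inside Python with threading.active_count(). In this sketch (hypothetical names), the five workers block on a threading.Event instead of spinning, so they stay alive without using CPU until main() releases them.

```python
"""Sketch: count this process's threads from inside Python."""
import threading


def worker(stop_event):
    """Block until stop_event is set, without spinning."""
    stop_event.wait()


def main():
    """Start five workers and return how many threads are alive."""
    stop_event = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(stop_event,)) for _ in range(5)
    ]
    for thread in threads:
        thread.start()
    # All five workers are blocked in wait(), so they are still alive here
    count = threading.active_count()  # Includes the main thread
    print("threads alive:", count)
    stop_event.set()  # Release the workers so they can return
    for thread in threads:
        thread.join()
    return count


if __name__ == "__main__":
    main()
```

It should report 6 threads: the main thread plus five workers.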

Busy waiting

Busy-waiting (or spinning) is a parallel programming pitfall where a thread consumes CPU resources by repeatedly checking a condition in a loop that never pauses. For example, in EECS 485 Project 4, a thread in the MapReduce master might repeatedly check if it should shut down.

In EECS 485, you can avoid busy-waiting with Python’s time.sleep(). The sleep() function gives up execution for a period of time, avoiding high CPU consumption.

If you have prior experience with multi-threaded programming, then feel free to look into Python Event Objects for a more advanced way to synchronize threads and avoid busy-waiting.
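As an optional illustration, here is a sketch of the wait.py shutdown pattern below rewritten with threading.Event. It assumes you want the waiting thread to react as soon as shutdown is requested. Event.wait(timeout) blocks without consuming CPU and returns True once the event is set, or False when the timeout expires.

```python
"""Sketch: wait for a shutdown signal with threading.Event."""
import threading
import time


def wait(shutdown_event, iterations):
    """Do periodic work until shutdown_event is set."""
    print("wait() starting")
    # Event.wait(timeout) blocks without using CPU.  It returns True as soon
    # as the event is set, or False after `timeout` seconds.
    while not shutdown_event.wait(timeout=0.1):
        print("working")
        iterations.append(1)
    print("wait() shutting down")


def main():
    """Let wait() run for about 1 second, then signal it to shut down."""
    print("main() starting")
    shutdown_event = threading.Event()
    iterations = []  # One entry per loop iteration in wait()
    thread = threading.Thread(target=wait, args=(shutdown_event, iterations))
    thread.start()
    time.sleep(1)
    shutdown_event.set()  # wait() wakes up right away, not after a sleep
    thread.join()
    print("main() shutting down")
    return len(iterations)


if __name__ == "__main__":
    main()
```

Like a loop with time.sleep(0.1), it prints "working" about ten times per second, but set() wakes the waiting thread immediately instead of after its current sleep finishes.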

Bad example

The following example is a small, multi-threaded Python program doing busy waiting. Notice that there is no time.sleep() inside the while loop.

"""wait.py - an example of busy waiting."""
import threading
import time


def main():
    """Main thread, which spawns a second wait() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=wait, args=(signals,))
    thread.start()
    time.sleep(1)  # Give up execution so the wait() thread can run
    # About 1 second has passed; tell the wait() thread to shut down
    signals["shutdown"] = True
    print("main() shutting down")


def wait(signals):
    """Wait for a shutdown signal, checking it over and over in a loop."""
    print("wait() starting")
    while not signals["shutdown"]:
        print("working")
        # time.sleep(0.1)  # Uncomment to avoid busy waiting
    print("wait() shutting down")


if __name__ == "__main__":
    main()

Run the example. Notice that it prints "working" many, many times because the while loop runs as fast as it can. Your count will likely differ.

$ python3 wait.py
main() starting
wait() starting
working
working
working
...
main() shutting down
wait() shutting down
$ python3 wait.py | grep -c working
2579689

Run the example again, keeping track of consumed CPU time. Notice that the user time (CPU time spent running the program's own code) is a large fraction of the real time (elapsed wall-clock time, stop_time - start_time).

$ time python3 wait.py
main() starting
wait() starting
working
working
working
...
main() shutting down
wait() shutting down

real	0m1.061s
user	0m0.711s
sys	0m0.189s
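The same comparison can be made inside a Python program. In this sketch, time.process_time() reports the CPU time (user plus system) consumed by the current process, and time.perf_counter() measures elapsed wall-clock time, roughly like the user/sys and real lines printed by time. The helper names busy_wait, sleep_wait and measure are hypothetical.

```python
"""Sketch: compare CPU time and wall-clock time for two ways of waiting."""
import time


def busy_wait(seconds):
    """Spin until `seconds` of wall-clock time have passed (uses CPU)."""
    deadline = time.perf_counter() + seconds
    while time.perf_counter() < deadline:
        pass


def sleep_wait(seconds):
    """Give up the CPU for `seconds` (uses almost no CPU)."""
    time.sleep(seconds)


def measure(wait_function, seconds=0.5):
    """Return (cpu_seconds, wall_seconds) spent in wait_function."""
    cpu_start = time.process_time()    # User + system CPU time of this process
    wall_start = time.perf_counter()   # Elapsed wall-clock time
    wait_function(seconds)
    return time.process_time() - cpu_start, time.perf_counter() - wall_start


if __name__ == "__main__":
    for function in (busy_wait, sleep_wait):
        cpu, wall = measure(function)
        print(f"{function.__name__}: cpu={cpu:.3f}s wall={wall:.3f}s")
```

Typically busy_wait reports CPU time close to its wall-clock time, while sleep_wait reports CPU time near zero. Exact numbers depend on your machine.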

Good example

Modify the example above by uncommenting time.sleep(0.1) in the while loop. Your loop should look like this:

while not signals["shutdown"]:
    print("working")
    time.sleep(0.1)  # Give up the CPU for 0.1 seconds between checks

Run the example again, measuring CPU consumption. Notice that the user time is small compared to the last example. Problem solved!

$ time python3 wait.py
main() starting
wait() starting
working
working
working
working
working
working
working
working
working
working
main() shutting down
wait() shutting down

real	0m1.077s
user	0m0.044s
sys	0m0.008s

Further reading

If you’re curious about how this works, Stack Overflow has a more detailed explanation, and you can also read the Python source code for the time module.

Sockets

Most communication on the internet uses one of two transport protocols: TCP (Transmission Control Protocol) or UDP (User Datagram Protocol). A socket is a program’s endpoint for network communication. A TCP socket manages a connection, and each network socket is associated with an address and a port number. A program can use a socket to send data to a specific host and port, or to listen for incoming data on a port of its own.
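For contrast with the TCP examples in this tutorial, here is a minimal UDP sketch. It assumes both sockets run on the same machine. Binding to port 0 asks the operating system to pick any free port, and getsockname() reports which one it chose. UDP is connectionless, so each sendto() names its destination host and port.

```python
"""Sketch: send one UDP datagram between two sockets on this machine."""
import socket


def udp_roundtrip(payload):
    """Send payload from one UDP socket to another and return what arrives."""
    # SOCK_DGRAM selects UDP; the TCP examples use SOCK_STREAM instead
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as receiver:
        receiver.bind(("localhost", 0))  # Port 0: the OS picks a free port
        receiver.settimeout(2)  # Don't wait forever if the datagram is lost
        address = receiver.getsockname()  # (host, port) actually bound
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
            # No connection is set up: each sendto() names its destination
            sender.sendto(payload, address)
        data, _ = receiver.recvfrom(4096)
        return data


if __name__ == "__main__":
    print(udp_roundtrip(b'{"hello": "world"}'))
```

The rest of this tutorial uses TCP (SOCK_STREAM), where a connection is established first and data arrives as an ordered stream of bytes.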

The Python socket library facilitates sending and receiving messages over a network. We’ll create a TCP server that receives messages and a client that sends messages.

Socket server

Save this server code to example_socket_server.py. It starts a TCP server and waits for a connection. Upon connection, it reads a message in chunks, until the client disconnects.

We make a simplifying assumption that the client will always cleanly close the connection. See this article for a more advanced discussion of when to stop receiving data from a socket.

"""Example socket server."""
import socket
import json


def main():
    """Test Socket Server."""
    # Create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

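    # Allow the port to be reused right after a restart, then bind the
    # socket to localhost port 8000 and start listening for connections
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8000))
    sock.listen(5)

    # Wait for a client to connect
    clientsocket, address = sock.accept()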
    print("Connection from", address[0])

    # Receive data, one chunk at a time.  When the client closes the
    # connection, recv() returns empty data, which breaks out of the loop.  We
    # make a simplifying assumption that the client will always cleanly close
    # the connection.
    message_chunks = []
    while True:
        data = clientsocket.recv(4096)
        if not data:
            break
        message_chunks.append(data)
    clientsocket.close()

    # Decode UTF8 and parse JSON data
    message_bytes = b''.join(message_chunks)
    message_str = message_bytes.decode("utf-8")
    message_dict = json.loads(message_str)
    print(message_dict)


if __name__ == "__main__":
    main()

Run the server. There is no output yet, but it is listening for a message.

$ python3 example_socket_server.py 

Send a message to the server in a second terminal window.

$ echo '{"hello": "world"}' | nc -c localhost 8000  # macOS/WSL/Linux (older)
$ echo '{"hello": "world"}' | nc -N localhost 8000  # Linux/WSL (newer)
$ echo '{"hello": "world"}' | nc -C localhost 8000  # Linux (less common)

In the first terminal (the one running the socket server), you’ll see the message appear and the server shut down.

$ python3 example_socket_server.py 
Connection from 127.0.0.1
{'hello': 'world'}
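The server above relies on the simplifying assumption that the client cleanly closes the connection to mark the end of a message. One common alternative is to prefix each message with its length. This sketch assumes a 4-byte, big-endian length header followed by UTF-8 JSON; the framing format and the helper names send_message, recv_exactly and recv_message are illustrative assumptions, not the format used by example_socket_server.py.

```python
"""Sketch: length-prefixed JSON messages over a stream socket."""
import json
import socket
import struct

HEADER = struct.Struct("!I")  # 4-byte unsigned length, network byte order


def send_message(sock, message):
    """Send a dict as a length header followed by UTF-8 JSON bytes."""
    body = json.dumps(message).encode("utf-8")
    sock.sendall(HEADER.pack(len(body)) + body)


def recv_exactly(sock, size):
    """Read exactly `size` bytes, or raise if the peer closes early."""
    chunks = []
    remaining = size
    while remaining > 0:
        data = sock.recv(min(remaining, 4096))
        if not data:
            raise ConnectionError("connection closed mid-message")
        chunks.append(data)
        remaining -= len(data)
    return b"".join(chunks)


def recv_message(sock):
    """Read one length-prefixed JSON message and return it as a dict."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return json.loads(recv_exactly(sock, length).decode("utf-8"))


if __name__ == "__main__":
    left, right = socket.socketpair()  # Two already-connected sockets
    with left, right:
        send_message(left, {"hello": "world"})
        send_message(left, {"count": 2})
        print(recv_message(right))
        print(recv_message(right))
```

With framing, one connection can carry several messages, because the receiver knows exactly how many bytes belong to each one.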

Socket client

Save this client code to example_socket_client.py.

"""Example socket client."""
import json
import socket


def main():
    """Test Socket Client."""
    # create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # connect to the server
    sock.connect(("localhost", 8000))

    # send a message
    message = json.dumps({"hello": "world"})
    sock.sendall(message.encode('utf-8'))
    sock.close()


if __name__ == "__main__":
    main()

Start a server on localhost port 8000 using netcat.

$ nc -l -p 8000         # WSL/Linux
$ nc -l localhost 8000  # macOS, newer
$ nc -l 8000            # macOS, newer

In a second terminal window, run the example client.

$ python3 example_socket_client.py 

In the first terminal (the one running the netcat test server), you’ll see the message appear and the server shut down.

{"hello": "world"}

Socket client and server

In the previous examples, we used a Python program for one side of the connection and netcat (nc) for the other. In this example, we’ll put our two Python programs together.

Start your Python socket server.

$ python3 example_socket_server.py 

In a second terminal window, run your Python socket client.

$ python3 example_socket_client.py 

In the first terminal (the one running the Python test server), you’ll see the message appear and the server shut down.

$ python3 example_socket_server.py 
Connection from 127.0.0.1
{'hello': 'world'}
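For a quick self-test, both halves can also run in one Python process, with the server in a separate thread. This sketch is a condensed variant of the two programs above, not a replacement for them. It binds to port 0 so the operating system picks a free port, which avoids colliding with a server already listening on 8000; serve_once is a hypothetical helper name.

```python
"""Sketch: a one-shot JSON server and a client in one process."""
import json
import socket
import threading


def serve_once(sock, received):
    """Accept one connection, read until the client closes, parse JSON."""
    clientsocket, _ = sock.accept()
    with clientsocket:
        chunks = []
        while True:
            data = clientsocket.recv(4096)
            if not data:
                break
            chunks.append(data)
    received.append(json.loads(b"".join(chunks).decode("utf-8")))


def main():
    """Start the server thread, send one message, return what it parsed."""
    received = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("localhost", 0))  # Port 0: the OS picks a free port
        server.listen()
        thread = threading.Thread(target=serve_once, args=(server, received))
        thread.start()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.connect(server.getsockname())
            client.sendall(json.dumps({"hello": "world"}).encode("utf-8"))
        # Closing the client makes the server's recv() return b""
        thread.join()
    print(received[0])
    return received[0]


if __name__ == "__main__":
    main()
```

It should print {'hello': 'world'}, the same dictionary the standalone server prints.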

Sockets and waiting

This example shows how to wait for two things at the same time: a message from a socket or a shutdown signal. It will be helpful when implementing graceful shutdown in EECS 485 Project 4.

Earlier, in the Busy waiting section, we saw how to avoid busy waiting while waiting on a shutdown signal.

while not signals["shutdown"]:
    print("working")
    time.sleep(1)

We also saw how to wait for a message from a socket (Socket server).

message_chunks = []
while True:
    data = clientsocket.recv(4096)
    if not data:
        break
    message_chunks.append(data)

Bad example

If we naively combine these two examples, we have a bug. The sock.accept() method blocks until a client connects. That means the loop never checks for shutdown, because it’s stuck waiting for a connection.

while not signals["shutdown"]:
    clientsocket, address = sock.accept()
    # ...
    data = clientsocket.recv(4096)
    # ...

Good example

Configure the socket to block for at most 1 second instead of indefinitely.

sock.settimeout(1)

In the case of a timeout, catch the exception and go back to the top of the loop, which will check the shutdown signal.

while not signals["shutdown"]:
    try:
        clientsocket, address = sock.accept()
    except socket.timeout:
        continue
    # ...

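We use the same technique for the recv() function, because it could also time out in the middle of receiving a long message. The socket returned by accept() does not inherit the listening socket’s timeout, so set a timeout on clientsocket as well.

clientsocket.settimeout(1)
message_chunks = []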
while True:
    try:
        data = clientsocket.recv(4096)
    except socket.timeout:
        continue
    if not data:
        break
    message_chunks.append(data)

Here is the complete example, save it to example_shutdown.py.

"""Example of waiting on a socket or a shutdown signal."""
import threading
import time
import socket
import json


def main():
    """Main thread, which spawns a second listen() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=listen, args=(signals,))
    thread.start()
    time.sleep(10)  # This gives up execution to the 'listen' thread
    signals["shutdown"] = True  # Tell listen thread to shut down
    thread.join()  # Wait for listen thread to shut down
    print("main() shutting down")


def listen(signals):
    """Wait on a message from a socket OR a shutdown signal."""
    print("listen() starting")

    # Create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the server
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8000))
    sock.listen(5)

    # Socket accept() will block for a maximum of 1 second.  If you omit
    # this, it blocks indefinitely, waiting for a connection.
    sock.settimeout(1)

    while not signals["shutdown"]:
        print("listening")

        # Listen for a connection for 1s.  The socket library avoids consuming
        # CPU while waiting for a connection.
        try:
            clientsocket, address = sock.accept()
        except socket.timeout:
            continue
        print("Connection from", address[0])

        # The accepted socket does not inherit the listening socket's
        # timeout, so set it here to make recv() block for at most 1 second.
        clientsocket.settimeout(1)

        # Receive data, one chunk at a time.  If recv() times out before we can
        # read a chunk, then go back to the top of the loop and try again.
        # When the client closes the connection, recv() returns empty data,
        # which breaks out of the loop.  We make a simplifying assumption that
        # the client will always cleanly close the connection.
        message_chunks = []
        while True:
            try:
                data = clientsocket.recv(4096)
            except socket.timeout:
                continue
            if not data:
                break
            message_chunks.append(data)
        clientsocket.close()

        # Join the chunks, decode the UTF-8 bytes, and parse the JSON data
        message_bytes = b''.join(message_chunks)
        message_str = message_bytes.decode("utf-8")
        message_dict = json.loads(message_str)
        print(message_dict)

    sock.close()
    print("listen() shutting down")


if __name__ == "__main__":
    main()

Run the server. See that it listens for messages for 10 seconds, then shuts down. Also notice that it does not consume a lot of CPU while waiting (low user time).

$ time python3 example_shutdown.py
main() starting
listen() starting
listening
listening
listening
listening
listening
listening
listening
listening
listening
listening
main() shutting down
listen() shutting down

real	0m10.104s
user	0m0.053s
sys	0m0.011s

Run the server again.

$ python3 example_shutdown.py
main() starting
listen() starting

Send a message to the server in a second terminal window. Do this several times.

$ echo '{"hello": "world"}' | nc -c localhost 8000  # macOS/WSL/Linux (older)
$ echo '{"hello": "world"}' | nc -c localhost 8000  # macOS/WSL/Linux (older)
$ echo '{"hello": "world"}' | nc -N localhost 8000  # WSL/Linux (newer)
$ echo '{"hello": "world"}' | nc -N localhost 8000  # WSL/Linux (newer)
$ echo '{"hello": "world"}' | nc -C localhost 8000  # Linux (less common)
$ echo '{"hello": "world"}' | nc -C localhost 8000  # Linux (less common)

In the first terminal (the one running the socket server), you’ll see the message appear and the server will continue listening. After 10 seconds, the server shuts down.

$ python3 example_shutdown.py
main() starting
listen() starting
listening
listening
listening
listening
Connection from 127.0.0.1
{'hello': 'world'}
listening
listening
listening
Connection from 127.0.0.1
{'hello': 'world'}
listening
listening
listening
listening
listening
main() shutting down
listen() shutting down
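settimeout() is not the only way to avoid blocking forever. A different technique, swapped in here for comparison, is select.select(), which waits until a socket is ready or a timeout expires. This sketch assumes shutdown is signaled with a threading.Event rather than the signals dictionary used above; accept_until_shutdown and demo are hypothetical names.

```python
"""Sketch: wait for a connection OR a shutdown signal using select()."""
import select
import socket
import threading


def accept_until_shutdown(sock, shutdown_event, timeout=1.0):
    """Return (clientsocket, address), or None once shutdown is requested."""
    while not shutdown_event.is_set():
        # select() returns as soon as sock has a pending connection, or
        # returns three empty lists after `timeout` seconds of inactivity
        readable, _, _ = select.select([sock], [], [], timeout)
        if readable:
            return sock.accept()
    return None


def demo():
    """Accept one local connection, then show the shutdown path."""
    shutdown_event = threading.Event()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("localhost", 0))  # Port 0: the OS picks a free port
        server.listen()
        # The OS queues this connection until accept() picks it up
        with socket.create_connection(server.getsockname()):
            clientsocket, address = accept_until_shutdown(server, shutdown_event)
            clientsocket.close()
        shutdown_event.set()  # Now the helper returns None without waiting
        return address[0], accept_until_shutdown(server, shutdown_event)


if __name__ == "__main__":
    print(demo())
```

In a loop like the one in example_shutdown.py, a helper like this could take the place of the try/except around sock.accept().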