Threads and Sockets in Python

This tutorial will walk you through small examples using threads and sockets in Python. You will also learn command line tools for interacting with processes, threads and sockets.

Processes

A process is an executing program with its own dedicated memory space. Many processes run at the same time on a computer. When you execute a script (like python test.py), your code runs in a new process. The most important thing to note is that processes have isolated memory spaces, so one process cannot access the data of another process.

See all the running processes on a machine.

$ ps -ax

Search for running Python processes.

$ pgrep -af python  # Linux or WSL
$ pgrep -lf python  # macOS
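Python can report process IDs itself: os.getpid() returns the PID of the current process. The minimal sketch below starts a second Python interpreter with the standard subprocess module and compares the two PIDs, illustrating that each run of python is a separate process. The helper name child_pid is hypothetical and not part of the course code.

```python
"""Sketch: every new Python interpreter runs in its own process."""
import os
import subprocess
import sys


def child_pid():
    """Run a fresh Python interpreter and return the PID it reports."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getpid())"],
        capture_output=True,  # collect the child's stdout instead of printing
        text=True,
        check=True,
    )
    return int(result.stdout.strip())


if __name__ == "__main__":
    print("parent pid:", os.getpid())
    print("child pid: ", child_pid())
```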

Threads

Threads are similar to processes in that they let a program make progress on several tasks concurrently. However, each thread is created and owned by a single process, and it stays alive only as long as that process is alive. Unlike separate processes, the threads of one process share the same memory space, so they can access each other’s data. When a process starts, all work is done in a main thread that is created by default, but you can add new threads at runtime. In the standard CPython interpreter, the global interpreter lock (GIL) lets only one thread execute Python code at a time, so threads help most when they spend time waiting, for example on sockets or sleep().
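Because threads share their process’s memory, several threads can update the same Python object directly. A minimal sketch, with hypothetical names collect and worker: each thread appends its ID to one shared list, a threading.Lock serializes the updates as a precaution, and join() waits for every thread before the result is read. Separate processes could not share a plain list this way.

```python
"""Sketch: threads in one process share memory."""
import threading


def collect(num_threads=5):
    """Have each thread append its ID to a single shared list."""
    results = []             # one list object, visible to every thread
    lock = threading.Lock()  # serializes updates to the shared list

    def worker(worker_id):
        with lock:
            results.append(worker_id)

    threads = [threading.Thread(target=worker, args=(i,))
               for i in range(num_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()        # wait until every worker has finished
    return sorted(results)


if __name__ == "__main__":
    print(collect())  # [0, 1, 2, 3, 4]
```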

See all running threads and processes on a machine.

$ ps -axM  # macOS
$ ps -eLf  # Linux or WSL

The Python threading module makes it easy to run functions in separate threads.

Save this Python program to example_thread.py.

"""Example threading."""
import os
import threading


def main():
    """Test Threading."""
    threads = []
    for i in range(5):
        thread = threading.Thread(target=worker, args=(i,))
        threads.append(thread)
        thread.start()


def worker(worker_id):
    """Worker thread."""
    pid = os.getpid()
    print("Worker {}  pid={}".format(worker_id, pid))
    while True:
        # spin
        pass


if __name__ == "__main__":
    main()

Run the example. Your PIDs may be different.

$ python3 example_thread.py
Worker 0  pid=89660
Worker 1  pid=89660
Worker 2  pid=89660
Worker 3  pid=89660
Worker 4  pid=89660

In a second terminal window, find the process ID (PID) of the running python3 example_thread.py. Then use the PID to view its threads. Notice that there is one process with six threads: the main thread plus the five worker threads.

Linux

$ pgrep -af example_thread
4457 python3 example_thread.py
$ ps -m 4457
  PID TTY      STAT   TIME COMMAND
 4457 pts/2    -      4:13 python3 example_thread.py
    - -        SNl+   0:00 -
    - -        SNl+   0:49 -
    - -        RNl+   0:51 -
    - -        SNl+   0:51 -
    - -        SNl+   0:50 -
    - -        SNl+   0:50 -

macOS

$ pgrep -lf example_thread
89660 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Versions/3.7/Resources/Python.app/Contents/MacOS/Python example_thread.py
$ ps -M <your PID>
USER       PID   TT   %CPU STAT PRI     STIME     UTIME COMMAND
awdeorio 89660 s003    0.0 S    31T   0:00.01   0:00.03 /usr/local/Cellar/python/3.7.2_1/Frameworks/Python.framework/Ve
         89660        21.9 S    31T   0:01.41   1:22.50
         89660        21.6 S    29T   0:01.41   1:22.93
         89660        18.7 S    30T   0:01.41   1:22.81
         89660        18.9 R    29T   0:01.42   1:23.33
         89660        20.7 S    30T   0:01.41   1:22.01

Stop the example running in the first terminal (python3 example_thread.py) by pressing Control-C.
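You can also count threads from inside Python with threading.active_count() and threading.enumerate(). In this sketch (count_threads is a hypothetical name), the workers block on a threading.Event instead of spinning. That way they use no CPU and can be released and joined cleanly, with no need for Control-C.

```python
"""Sketch: count a process's threads from inside Python."""
import threading


def count_threads(num_workers=5):
    """Start workers that block on an Event, then report the live threads."""
    stop = threading.Event()
    # Each worker blocks in stop.wait() instead of spinning, so it uses no CPU.
    workers = [threading.Thread(target=stop.wait) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    # active_count() counts the main thread plus every live worker.
    count = threading.active_count()
    names = [thread.name for thread in threading.enumerate()]
    stop.set()  # release the workers so they can exit
    for worker in workers:
        worker.join()
    return count, names


if __name__ == "__main__":
    count, names = count_threads()
    print(count)  # 6 if no other threads are running
    print(names)
```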

Busy waiting

Busy-waiting (or spinning) is a parallel programming pitfall where a thread consumes CPU resources by repeatedly checking something. For example, in EECS 485 Project 4, a thread in the MapReduce master might repeatedly check if it should shut down.

In EECS 485, you can avoid busy-waiting with Python’s time.sleep(). The sleep() function suspends the calling thread for the given number of seconds, so the thread uses no CPU while it waits.

If you have prior experience with multi-threaded programming, feel free to look into Python Event objects (threading.Event) for a more advanced way to synchronize threads and avoid busy-waiting.
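Here is a minimal sketch of that idea. It rewrites the wait.py pattern from this section with a threading.Event in place of the signals dictionary. This design is an assumption, not the course’s required approach. Event.wait(timeout) sleeps until the event is set or the timeout expires, so the loop neither spins nor waits a full interval after shutdown is requested. The duration and interval parameters are illustrative.

```python
"""Sketch: the wait.py pattern using threading.Event instead of a dict."""
import threading
import time


def wait(shutdown, interval=0.1):
    """Print "working" until the shutdown Event is set; return the count."""
    iterations = 0
    # Event.wait(timeout) blocks until the event is set or the timeout
    # expires; it returns True only if the event has been set.
    while not shutdown.wait(interval):
        print("working")
        iterations += 1
    return iterations


def main(duration=1.0):
    """Let the wait() thread run for `duration` seconds, then stop it."""
    shutdown = threading.Event()
    results = []
    thread = threading.Thread(
        target=lambda: results.append(wait(shutdown)))
    thread.start()
    time.sleep(duration)
    shutdown.set()  # wakes the waiting thread right away
    thread.join()
    return results[0]


if __name__ == "__main__":
    print("iterations:", main())
```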

Bad example

The following example is a small, multi-threaded Python program doing busy waiting. Notice that there is no time.sleep() inside the while loop.

"""wait.py - an example of busy waiting."""
import threading
import time


def main():
    """Main thread, which spawns a second wait() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=wait, args=(signals,))
    thread.start()
    time.sleep(1)  # This gives up execution to the 'wait' thread
    # About 1 second has passed; tell the wait thread to shut down
    signals["shutdown"] = True
    print("main() shutting down")


def wait(signals):
    """Wait for shutdown signal with sleep in between."""
    print("wait() starting")
    while not signals["shutdown"]:
        print("working")
        # time.sleep(0.1)  # Uncomment to avoid busy waiting
    print("wait() shutting down")


if __name__ == "__main__":
    main()

Run the example. Notice that it prints “working” many, many times because the while loop executes very frequently. Your count may be different.

$ python3 wait.py
main() starting
wait() starting
working
working
working
...
main() shutting down
wait() shutting down
$ python3 wait.py | grep -c working
2579689

Run the example again, keeping track of consumed CPU time. Notice that the user CPU time (time spent running the program’s own code on the CPU) is a large fraction of the real time (elapsed wall-clock time, stop_time - start_time).

$ time python3 wait.py
main() starting
wait() starting
working
working
working
...
main() shutting down
wait() shutting down

real	0m1.061s
user	0m0.711s
sys	0m0.189s

Good example

Modify the above example by uncommenting the time.sleep(0.1) line inside the while loop. Your loop should look like this:

while not signals["shutdown"]:
    print("working")
    time.sleep(0.1)  # Give up the CPU for 0.1 seconds

Run the example again, measuring CPU consumption. Notice that the user time is small compared to the last example. Problem solved!

$ time python3 wait.py
main() starting
wait() starting
working
working
working
working
working
working
working
working
working
working
main() shutting down
wait() shutting down

real	0m1.077s
user	0m0.044s
sys	0m0.008s
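The same comparison can be made from inside Python. time.process_time() returns the CPU time (user plus system) used by the current process, and time.monotonic() measures elapsed wall-clock time. This is a sketch with a hypothetical helper, cpu_seconds. It loops for a fixed wall-clock duration, either spinning or sleeping on each pass, and reports how much CPU the loop consumed.

```python
"""Sketch: compare CPU time of spinning vs. sleeping with time.process_time()."""
import time


def cpu_seconds(duration, nap):
    """Loop for `duration` wall-clock seconds, sleeping `nap` seconds per pass.

    With nap=0 the loop spins (busy-waits). Returns the CPU time
    (user + system) this process used while looping.
    """
    cpu_start = time.process_time()
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        if nap:
            time.sleep(nap)  # give up the CPU, as in the good example
    return time.process_time() - cpu_start


if __name__ == "__main__":
    print("busy :", cpu_seconds(0.5, nap=0))
    print("sleep:", cpu_seconds(0.5, nap=0.1))
```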

Further reading

If you’re curious about how this works, Stack Overflow has a more detailed explanation, and you can also read the Python source code for the time module.

Sockets

Most communication on the Internet, including the web, happens via TCP (Transmission Control Protocol) or UDP (User Datagram Protocol). A socket is an endpoint for sending and receiving data, and each socket communicates through a specific port. Use a socket to send data to a port or to listen for data on a port.

The Python socket library facilitates sending and receiving messages over a network. First, we’ll create a TCP server that receives messages and a client that sends messages. Then, we’ll do an example with UDP.
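The socket type decides the protocol: SOCK_STREAM gives TCP and SOCK_DGRAM gives UDP. The sketch below binds one socket of each type to a port on localhost. It assumes port 0, which asks the operating system to pick any free port, and reads the chosen port back with getsockname(). The name bound_ports is hypothetical. The tutorial’s own examples use fixed ports 8000 and 8001 instead.

```python
"""Sketch: TCP and UDP sockets, each bound to an address and port."""
import socket


def bound_ports():
    """Bind one TCP and one UDP socket to OS-chosen ports on localhost."""
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # TCP
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   # UDP
    try:
        # Port 0 asks the operating system to pick any free port.
        tcp.bind(("localhost", 0))
        udp.bind(("localhost", 0))
        # getsockname() reports the (host, port) each socket is bound to.
        return tcp.getsockname()[1], udp.getsockname()[1]
    finally:
        tcp.close()
        udp.close()


if __name__ == "__main__":
    print(bound_ports())
```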

TCP Socket server

Save this server code to example_tcp_server.py. It starts a TCP server and waits for a connection. Upon connection, it reads a message in chunks until the client disconnects.

We make a simplifying assumption that the client will always cleanly close the connection. See this article for a more advanced discussion of when to stop receiving data from a socket.

"""Example TCP socket server."""
import socket
import json


def main():
    """Test TCP Socket Server."""
    # Create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the server
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8000))
    sock.listen()

    # Socket accept() and recv() will block for a maximum of 1 second.  If you
    # omit this, it blocks indefinitely, waiting for a connection.
    sock.settimeout(1)

    while True:
        # Wait for a connection for 1s.  The socket library avoids consuming
        # CPU while waiting for a connection.
        try:
            clientsocket, address = sock.accept()
        except socket.timeout:
            continue
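        print("Connection from", address[0])

        # In CPython, the socket returned by accept() starts in blocking mode
        # (it does not inherit the listening socket's timeout), so set one.
        clientsocket.settimeout(1)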

        # Receive data, one chunk at a time.  If recv() times out before we can
        # read a chunk, then go back to the top of the loop and try again.
        # When the client closes the connection, recv() returns empty data,
        # which breaks out of the loop.  We make a simplifying assumption that
        # the client will always cleanly close the connection.
        message_chunks = []
        while True:
            try:
                data = clientsocket.recv(4096)
            except socket.timeout:
                continue
            if not data:
                break
            message_chunks.append(data)
        clientsocket.close()

        # Decode list-of-byte-strings to UTF8 and parse JSON data
        message_bytes = b''.join(message_chunks)
        message_str = message_bytes.decode("utf-8")

        try:
            message_dict = json.loads(message_str)
        except json.JSONDecodeError:
            continue
        print(message_dict)


if __name__ == "__main__":
    main()

Run the server. There is no output yet, but it is listening for a message.

$ python3 example_tcp_server.py

Send a message to the server in a second terminal window.

$ echo '{"hello": "world"}' | nc -c localhost 8000  # macOS/WSL/Linux (older)
$ echo '{"hello": "world"}' | nc -N localhost 8000  # Linux/WSL (newer)
$ echo '{"hello": "world"}' | nc -C localhost 8000  # Linux (less common)

In the first terminal (the one running the TCP server), you’ll see the message appear. Kill the server with Control-C.

$ python3 example_tcp_server.py
Connection from 127.0.0.1
{'hello': 'world'}
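The server above detects the end of a message by waiting for the client to close the connection. One common alternative, which is not what this tutorial or the course code uses, is to prefix each message with its length. The receiver then knows exactly how many bytes to read and can handle several messages on one connection. A minimal sketch, assuming a 4-byte big-endian length header. The functions send_msg, recv_exactly, and recv_msg are hypothetical, and socket.socketpair() supplies two already-connected sockets for the demo.

```python
"""Sketch: length-prefixed messages, an alternative to waiting for close()."""
import json
import socket
import struct


def send_msg(sock, obj):
    """Send a JSON object preceded by its 4-byte big-endian length."""
    payload = json.dumps(obj).encode("utf-8")
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_exactly(sock, size):
    """Read exactly `size` bytes, looping because recv() may return less."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed before message finished")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock):
    """Receive one length-prefixed JSON object."""
    (length,) = struct.unpack("!I", recv_exactly(sock, 4))
    return json.loads(recv_exactly(sock, length).decode("utf-8"))


if __name__ == "__main__":
    left, right = socket.socketpair()  # two connected sockets
    send_msg(left, {"hello": "world"})
    send_msg(left, {"n": 2})
    print(recv_msg(right))  # {'hello': 'world'}
    print(recv_msg(right))  # {'n': 2}
    left.close()
    right.close()
```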

TCP Socket client

Save this client code to example_tcp_client.py.

"""Example TCP socket client."""
import socket
import json


def main():
    """Test TCP Socket Client."""
    # create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # connect to the server
    sock.connect(("localhost", 8000))

    # send a message
    message = json.dumps({"hello": "world"})
    sock.sendall(message.encode('utf-8'))
    sock.close()


if __name__ == "__main__":
    main()

Start a server on localhost port 8000 using netcat.

$ nc -l -p 8000         # WSL/Linux
$ nc -l localhost 8000  # macOS, newer
$ nc -l 8000            # macOS, newer

In a second terminal window, run the example client.

$ python3 example_tcp_client.py

In the first terminal (the one running the netcat test server), you’ll see the message appear. Kill the server with Control-C.

{"hello": "world"}

TCP Socket client and server

In the previous examples, we used a Python program for one side of the connection, and netcat (nc) for the other side. In this example, we’ll put our two Python programs together.

Start your Python TCP socket server.

$ python3 example_tcp_server.py

In a second terminal window, run your Python TCP socket client.

$ python3 example_tcp_client.py

In the first terminal (the one running the Python test server), you’ll see the message appear. Kill the server with Control-C.

$ python3 example_tcp_server.py
Connection from 127.0.0.1
{'hello': 'world'}
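To exercise both sides without two terminals, a script can run the server side in a thread and the client in the main thread. Here is a minimal sketch, assuming an OS-chosen port (port 0) instead of 8000 so it does not collide with a running server. The names serve_once and round_trip are hypothetical. Closing the client socket, done here by leaving its with-block, is what makes the server’s recv() return empty data.

```python
"""Sketch: run a TCP server thread and a client in one script."""
import json
import socket
import threading


def serve_once(listener, received):
    """Accept one connection and read until the client closes it."""
    clientsocket, _ = listener.accept()
    with clientsocket:
        chunks = []
        while True:
            data = clientsocket.recv(4096)
            if not data:  # client closed its end of the connection
                break
            chunks.append(data)
    received.append(json.loads(b"".join(chunks).decode("utf-8")))


def round_trip(message):
    """Send `message` to an in-process server and return what it received."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("localhost", 0))  # OS picks a free port
        listener.listen()
        port = listener.getsockname()[1]
        received = []
        thread = threading.Thread(target=serve_once, args=(listener, received))
        thread.start()
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(("localhost", port))
            sock.sendall(json.dumps(message).encode("utf-8"))
        # Leaving the with-block closed the client socket, ending the message.
        thread.join()
    return received[0]


if __name__ == "__main__":
    print(round_trip({"hello": "world"}))  # {'hello': 'world'}
```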

UDP Socket server

In this example, we’ll create a UDP server and UDP client just like the TCP server and TCP client above. What’s the difference? A TCP socket will resend a message if it doesn’t receive an acknowledgment from the recipient. A UDP socket doesn’t check if the message was received. Two examples where UDP is useful are video streaming and heartbeat messages.

Save this code to example_udp_server.py. This example listens for incoming UDP messages in a loop. Because UDP is connectionless, we do not call sock.listen() or sock.accept(), which is different from TCP. Also, each recv() call on a UDP socket returns exactly one datagram, so a whole message must arrive in a single datagram that fits in the recv() buffer (4096 bytes here); a larger datagram may be truncated. TCP, by contrast, is a byte stream that we read in chunks. This means we do not need to maintain a message_chunks list.

"""Example UDP socket server."""
import socket
import json


def main():
    """Test UDP Socket Server."""
    # Create an INET, DGRAM socket, this is UDP
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # Bind the UDP socket to the server
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8001))
    sock.settimeout(1)

    # No sock.listen() since UDP doesn't establish connections like TCP

    # Receive incoming UDP messages
    while True:
        try:
            message_bytes = sock.recv(4096)
        except socket.timeout:
            continue
        message_str = message_bytes.decode("utf-8")

        # Skip malformed messages instead of crashing the server
        try:
            message_dict = json.loads(message_str)
        except json.JSONDecodeError:
            continue
        print(message_dict)


if __name__ == "__main__":
    main()

UDP Socket client

Save this code to example_udp_client.py. It sends a message to a UDP socket.

"""Example UDP socket client."""
import socket
import json


def main():
    """Test UDP Socket Client."""
    # Create an INET, DGRAM socket, this is UDP
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # UDP has no handshake: connect() only records the server address as the
    # default destination for send() and sendall()
    sock.connect(("localhost", 8001))

    # Send a message
    message = json.dumps({"hello": "world"})
    sock.sendall(message.encode('utf-8'))
    sock.close()


if __name__ == "__main__":
    main()

UDP Socket client and server

Let’s put our UDP client and UDP server together.

Start your Python UDP socket server.

$ python3 example_udp_server.py

Start your Python UDP socket client in a second terminal.

$ python3 example_udp_client.py

In the first terminal (the one running the Python test server), you’ll see the message appear. Kill the server with Control-C.

{'hello': 'world'}
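To see UDP’s message boundaries directly, the sketch below sends two JSON messages with sendto(), which names the destination on every call and needs no connect(). It then reads them with recvfrom(), which returns one whole datagram plus the sender’s address. It assumes an OS-chosen port instead of 8001, and exchange is a hypothetical name. On the loopback interface, datagrams normally arrive intact and in order, but UDP itself guarantees neither, so the receiver sets a timeout rather than waiting forever.

```python
"""Sketch: each UDP recvfrom() returns exactly one datagram."""
import json
import socket


def exchange(messages):
    """Send each dict as its own datagram; return what the receiver got."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server:
        server.bind(("localhost", 0))  # no listen() or accept() for UDP
        server.settimeout(1)           # don't hang if a datagram is lost
        address = server.getsockname()
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
            for message in messages:
                # sendto() needs no connection; it names the destination.
                client.sendto(json.dumps(message).encode("utf-8"), address)
        received = []
        for _ in messages:
            # recvfrom() returns one whole datagram and the sender's address.
            data, _sender = server.recvfrom(4096)
            received.append(json.loads(data.decode("utf-8")))
    return received


if __name__ == "__main__":
    print(exchange([{"hello": "world"}, {"n": 2}]))
```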

Sockets and waiting

This example shows how to wait for two things at the same time: a message from a TCP socket or a shutdown signal. It will be helpful when implementing graceful shutdown in EECS 485 Project 4. Be sure to read the explanation and pitfalls below.

Save this example to example_shutdown.py.

"""Example of waiting on a socket or a shutdown signal."""
import threading
import time
import socket
import json


def main():
    """Main thread, which spawns a second server() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=server, args=(signals,))
    thread.start()
    time.sleep(10)  # Give up execution to the 'server' thread (see Pitfall 1)
    signals["shutdown"] = True  # Tell server thread to shut down
    thread.join()  # Wait for server thread to shut down
    print("main() shutting down")


def server(signals):
    """Wait on a message from a socket OR a shutdown signal."""
    print("server() starting")

    # Create an INET, STREAMing socket, this is TCP
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Bind the socket to the server
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(("localhost", 8000))
    sock.listen()

    # Socket accept() and recv() will block for a maximum of 1 second.  If you
    # omit this, it blocks indefinitely, waiting for a connection.
    sock.settimeout(1)

    while not signals["shutdown"]:
        print("waiting ...")

        # Wait for a connection for 1s.  The socket library avoids consuming
        # CPU while waiting for a connection.
        try:
            clientsocket, address = sock.accept()
        except socket.timeout:
            continue
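        print("Connection from", address[0])

        # In CPython, the socket returned by accept() starts in blocking mode
        # (it does not inherit the listening socket's timeout), so set one.
        clientsocket.settimeout(1)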

        # Receive data, one chunk at a time.  If recv() times out before we can
        # read a chunk, then go back to the top of the loop and try again.
        # When the client closes the connection, recv() returns empty data,
        # which breaks out of the loop.  We make a simplifying assumption that
        # the client will always cleanly close the connection.
        message_chunks = []
        while True:
            try:
                data = clientsocket.recv(4096)
            except socket.timeout:
                continue
            if not data:
                break
            message_chunks.append(data)
        clientsocket.close()

        # Decode list-of-byte-strings to UTF8 and parse JSON data
        message_bytes = b''.join(message_chunks)
        message_str = message_bytes.decode("utf-8")

        try:
            message_dict = json.loads(message_str)
        except json.JSONDecodeError:
            continue
        print(message_dict)

    print("server() shutting down")


if __name__ == "__main__":
    main()

Run the server. See that it waits for messages for 10 seconds, then shuts down. Also notice that it does not consume a lot of CPU while waiting (low user time). See the explanation below for more detail.

$ time python3 example_shutdown.py
main() starting
server() starting
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
server() shutting down
main() shutting down

real	0m10.081s
user	0m0.033s
sys	0m0.014s

Run the server again.

$ python3 example_shutdown.py
main() starting
server() starting

Send a message to the server from a second terminal window, using the nc command for your platform. Do this several times; the output below shows two messages.

$ echo '{"hello": "world"}' | nc -c localhost 8000  # macOS/WSL/Linux (older)
$ echo '{"hello": "world"}' | nc -N localhost 8000  # WSL/Linux (newer)
$ echo '{"hello": "world"}' | nc -C localhost 8000  # Linux (less common)

In the first terminal (the one running the socket server), you’ll see the message appear and the server will continue listening. After 10 seconds, the server shuts down.

$ python3 example_shutdown.py
main() starting
server() starting
waiting ...
waiting ...
waiting ...
waiting ...
Connection from 127.0.0.1
{'hello': 'world'}
waiting ...
waiting ...
waiting ...
Connection from 127.0.0.1
{'hello': 'world'}
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
server() shutting down
main() shutting down

Explanation

We configured the socket to block for 1 second instead of indefinitely.

sock.settimeout(1)

In the case of a timeout, catch the exception and go back to the top of the loop, which will check the shutdown signal.

while not signals["shutdown"]:
    try:
        clientsocket, address = sock.accept()
    except socket.timeout:
        continue
    # ...

We use the same technique for the recv() function, because it could also time out in the middle of receiving a long message.

message_chunks = []
while True:
    try:
        data = clientsocket.recv(4096)
    except socket.timeout:
        continue
    if not data:
        break
    message_chunks.append(data)
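A quick check of which sockets actually carry the timeout is worthwhile. This sketch assumes CPython with no global default timeout, meaning socket.setdefaulttimeout() was never called. Under that assumption, the socket returned by accept() is put in blocking mode rather than inheriting the listener’s timeout. To make recv() time out as well, call settimeout() on the accepted socket too. The helper names timeouts and accept_timed_out are hypothetical.

```python
"""Sketch: which sockets actually carry the 1-second timeout?"""
import socket


def timeouts():
    """Return (listening socket timeout, accepted socket timeout)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("localhost", 0))  # OS picks a free port
        listener.listen()
        listener.settimeout(1)
        # Connect a client so that accept() has something to return.
        with socket.create_connection(listener.getsockname()):
            clientsocket, _ = listener.accept()
            with clientsocket:
                return listener.gettimeout(), clientsocket.gettimeout()


def accept_timed_out(timeout=0.2):
    """Return True if an idle accept() gives up with socket.timeout."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("localhost", 0))
        listener.listen()
        listener.settimeout(timeout)
        try:
            listener.accept()
        except socket.timeout:
            return True
        return False


if __name__ == "__main__":
    print(timeouts())          # (1.0, None) with no global default timeout
    print(accept_timed_out())  # True
```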

Pitfalls

Pitfall 1: Make sure to remove the time.sleep(10) in your Master once you implement shutdown message handling. Instead, you’ll listen for a shutdown message and won’t need to call time.sleep(10).

Pitfall 2: As long as you set sock.settimeout(1), you do not need a time.sleep() in loops containing sock.accept(). With a timeout of 1 second, sock.accept() plays the role of time.sleep(1).