
Threads and Sockets in Python

This tutorial will walk you through small examples using threads and sockets in Python. You will also learn to use command line tools for interacting with processes, threads and sockets.

Processes

A process is an executing program with a dedicated memory space. Many processes run at the same time on a computer. When you execute a script (like python3 test.py), your code runs in a new process. The key point is that each process has an isolated memory space, so one process cannot directly access the data of another process.

See all the running processes on a machine.

$ ps -ax

Search for running Python processes.

$ pgrep -af python  # Linux or WSL
$ pgrep -lf python  # macOS
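
To see from inside Python that each run of a program gets its own process, here is a minimal sketch. It starts a second Python interpreter with the standard subprocess module and compares process IDs. The function name child_pid() is made up for this illustration.

```python
"""Show that a child script runs in its own process."""
import os
import subprocess
import sys


def child_pid():
    """Run a tiny Python program in a new process and return its PID."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getpid())"],
        capture_output=True,
        text=True,
        check=True,
    )
    return int(result.stdout)


if __name__ == "__main__":
    print("parent PID:", os.getpid())
    print("child PID: ", child_pid())
```

The two PIDs differ because the child is a separate process with its own memory space. The exact numbers will vary from run to run.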

Threads

Threads are similar to processes in that they let a program work on several tasks concurrently. Unlike processes, threads are created and owned by a single process, and all threads in that process share its memory space, so they can access each other’s data. A thread is alive only as long as its parent process is alive. When a process starts, all work is done in the main thread, which is created by default, but you can add new threads at runtime.
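
As a small illustration of shared memory, the sketch below starts several threads that all append to the same list. The names record() and run_threads() are invented for this example. It relies on list.append() being safe to call from multiple threads in CPython; more complex shared updates generally need a lock.

```python
"""Threads in one process share memory."""
import threading


def record(results, name):
    """Append to a list object that every thread can see."""
    results.append(name)


def run_threads(count):
    """Start count threads that all write to the same list."""
    results = []  # Lives in the process's memory, visible to every thread
    threads = []
    for i in range(count):
        thread = threading.Thread(
            target=record, args=(results, f"worker-{i}")
        )
        threads.append(thread)
        thread.start()

    # Wait for every thread to finish before reading the results
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    print(sorted(run_threads(3)))
```

The main thread sees every item the worker threads appended, because all of them operate on the same list object. The order of the items may vary, which is why the example sorts before printing.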

See all running threads and processes on a machine.

$ ps -eLf  # Linux or WSL
$ ps -axM  # macOS

The Python threading library facilitates running functions in separate threads.

Save this Python program to example_thread.py

"""Example threading."""
import time
import threading


def main():
    """Test Threading."""
    print("main() starting")
    threads = []
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()
    print("main() can do other work here")
    # ...
    print("main() waiting for worker() to exit")
    thread.join()
    print("main() shutting down")


def worker():
    """Worker thread."""
    print("worker() starting")
    time.sleep(10)
    print("worker() shutting down")


if __name__ == "__main__":
    main()

Take a second look at the code above. After thread.start(), the main() function runs concurrently with the worker() function. When the main thread calls thread.join(), it blocks until the worker thread finishes before running any more code.

Run the example. Two functions run concurrently: main() and worker().

$ python3 example_thread.py
main() starting
worker() starting
main() can do other work here
main() waiting for worker() to exit
worker() shutting down
main() shutting down

Busy-waiting

Busy-waiting (or spinning) is a parallel programming pitfall where a thread consumes CPU resources by repeatedly checking something. For example, in EECS 485 Project 4, a thread in the MapReduce manager might repeatedly check if it should shut down.

In EECS 485, you can avoid busy-waiting with Python’s time.sleep(). The sleep() function gives up execution for a period of time, avoiding high CPU consumption.

If you have prior experience with multi-threaded programming, then feel free to look into Python Event Objects for a more advanced way to synchronize threads and avoid busy-waiting.

Bad example

The following example is a small, multi-threaded Python program that’s busy-waiting. Notice that there is no time.sleep() inside the while loop.

"""wait.py - an example of busy-waiting."""
import threading
import time


def main():
    """Main thread, which spawns a second wait() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=wait, args=(signals,))
    thread.start()
    time.sleep(1)  # This gives up execution to the 'wait' thread
    # The shutdown variable will be set to true in approximately 1 second
    signals["shutdown"] = True
    thread.join()
    print("main() shutting down")


def wait(signals):
    """Wait for shutdown signal with sleep in between."""
    print("wait() starting")
    while not signals["shutdown"]:
        print("working")
        # time.sleep(0.1)  # Uncomment to avoid busy-waiting
    print("wait() shutting down")


if __name__ == "__main__":
    main()

Run the example, and notice that it prints working many times because the while loop executes as fast as it can. Your number may be different.

$ python3 wait.py
main() starting
wait() starting
working
working
working
...
wait() shutting down
main() shutting down
$ python3 wait.py | grep -c working
2579689

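Run the example again, keeping track of consumed CPU time. Notice that the CPU time (user plus sys, the time spent actively using the CPU) is close to the real time (wall-clock time, stop_time - start_time). In other words, the program kept the CPU busy for almost its entire run.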

$ time python3 wait.py
main() starting
wait() starting
working
working
working
...
wait() shutting down
main() shutting down

real	0m1.061s
user	0m0.711s
sys	0m0.189s

Good example

Modify the above example by adding time.sleep(0.1) to the while loop in wait(). Your loop should look like this:

while not signals["shutdown"]:
    print("working")
    time.sleep(0.1)  # Give up the CPU between checks

Run the example again, measuring CPU consumption. Notice that the user time is small compared to the last example. Problem solved!

$ time python3 wait.py
main() starting
wait() starting
working
working
working
working
working
working
working
working
working
working
wait() shutting down
main() shutting down

real	0m1.077s
user	0m0.044s
sys	0m0.008s
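
For readers who want to try the Event objects mentioned earlier, here is a minimal sketch of the same program using threading.Event in place of the signals dictionary. This is one possible approach, not a requirement of the project. The counts dictionary and the run_seconds parameter are additions made only for this illustration.

```python
"""Example of waiting on a threading.Event instead of a shared flag."""
import threading
import time


def wait(shutdown_event, counts):
    """Do periodic work until the shutdown event is set."""
    print("wait() starting")
    # Event.wait() returns False if the timeout expires and True once set()
    # has been called, so the loop exits promptly on shutdown.
    while not shutdown_event.wait(timeout=0.1):
        counts["working"] += 1
    print("wait() shutting down")


def main(run_seconds=0.5):
    """Spawn a wait() thread, then signal it to shut down."""
    print("main() starting")
    shutdown_event = threading.Event()
    counts = {"working": 0}
    thread = threading.Thread(target=wait, args=(shutdown_event, counts))
    thread.start()
    time.sleep(run_seconds)
    shutdown_event.set()  # Wakes the waiting thread right away
    thread.join()
    print("main() shutting down")
    return counts["working"], thread.is_alive()


if __name__ == "__main__":
    main()
```

Compared with time.sleep(0.1), the waiting thread does not have to finish its current sleep before it notices the shutdown request, because set() wakes it immediately.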

Further reading

If you’re curious about how time measurement works, a more detailed explanation is available on Stack Overflow. You can also read the Python source code for the time module.
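
You can also take similar measurements from inside Python. The sketch below compares wall-clock time with CPU time using time.perf_counter() and time.process_time() from the standard library. The helper names measure(), busy_wait() and sleep_wait() are invented for this illustration, and the numbers you see will depend on your machine.

```python
"""Compare wall-clock time with CPU time for spinning and sleeping."""
import time


def measure(func):
    """Return (wall_seconds, cpu_seconds) spent running func()."""
    wall_start = time.perf_counter()  # Wall-clock timer
    cpu_start = time.process_time()   # User + system CPU time of this process
    func()
    cpu_seconds = time.process_time() - cpu_start
    wall_seconds = time.perf_counter() - wall_start
    return wall_seconds, cpu_seconds


def busy_wait(duration=0.2):
    """Spin: check the clock repeatedly until duration has passed."""
    deadline = time.perf_counter() + duration
    while time.perf_counter() < deadline:
        pass


def sleep_wait(duration=0.2):
    """Sleep: give up the CPU until duration has passed."""
    time.sleep(duration)


if __name__ == "__main__":
    for func in (busy_wait, sleep_wait):
        wall, cpu = measure(func)
        print(f"{func.__name__}: wall={wall:.3f}s cpu={cpu:.3f}s")
```

Both functions take about the same wall-clock time, but busy_wait() should report CPU time close to its wall-clock time while sleep_wait() should report CPU time near zero.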

If you would like a more in-depth sockets tutorial after you complete this one, check out this Real Python Tutorial.

Sockets

The vast majority of communication on the web happens via TCP (Transmission Control Protocol) or UDP (User Datagram Protocol). A socket is an endpoint for sending and receiving data over a network, and each socket uses a specific port. Use a socket to send data to a port or listen for data on a port.

The Python socket library facilitates sending and receiving messages over a network. First, we’ll create a TCP server that receives messages and a client that sends messages. Then, we’ll do an example with UDP.

NOTE: TCP and UDP ports are independent, so it’s possible to have a TCP socket and UDP socket that use the same port and receive different data.
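
To check the note above for yourself, here is a minimal sketch that binds a TCP socket and a UDP socket to the same port number. It asks the operating system for a free TCP port by binding to port 0, then reuses that number for UDP. The function name bind_tcp_and_udp() is made up for this illustration.

```python
"""Bind a TCP socket and a UDP socket to the same port number."""
import socket


def bind_tcp_and_udp():
    """Bind both sockets and return (tcp_port, udp_port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_sock, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
        # Port 0 asks the operating system for any free TCP port
        tcp_sock.bind(("localhost", 0))
        tcp_port = tcp_sock.getsockname()[1]

        # Bind UDP to the same number.  This raises OSError in the unlikely
        # case that another program already holds that UDP port.
        udp_sock.bind(("localhost", tcp_port))
        udp_port = udp_sock.getsockname()[1]
        return tcp_port, udp_port


if __name__ == "__main__":
    print(bind_tcp_and_udp())
```

Both binds succeed on the same number because the operating system keeps separate port tables for TCP and UDP.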

TCP client/server

Save this server code to example_tcp_server.py. It starts a TCP server and waits for a connection. Upon connection, it reads a message in chunks until the client disconnects.

We make a simplifying assumption that the client will always cleanly close the connection. See this article for a more advanced discussion of when to stop receiving data from a socket.

"""Example TCP socket server."""
import socket
import json


def main():
    """Test TCP Socket Server."""
    # Create an INET, STREAMing socket, this is TCP
    # Note: context manager syntax allows for sockets to automatically be
    # closed when an exception is raised or control flow returns.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:

        # Bind the socket to the server
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("localhost", 8000))
        sock.listen()

        # Socket accept() will block for a maximum of 1 second.  If you
        # omit this, it blocks indefinitely, waiting for a connection.
        sock.settimeout(1)

        while True:
            # Wait for a connection for 1s.  The socket library avoids consuming
            # CPU while waiting for a connection.
            try:
                clientsocket, address = sock.accept()
            except socket.timeout:
                continue
            print("Connection from", address[0])

            # Socket recv() will block for a maximum of 1 second.  If you omit
            # this, it blocks indefinitely, waiting for packets.
            clientsocket.settimeout(1)

            # Receive data, one chunk at a time.  If recv() times out before we
            # can read a chunk, then go back to the top of the loop and try
            # again.  When the client closes the connection, recv() returns
            # empty data, which breaks out of the loop.  We make a simplifying
            # assumption that the client will always cleanly close the
            # connection.
            with clientsocket:
                message_chunks = []
                while True:
                    try:
                        data = clientsocket.recv(4096)
                    except socket.timeout:
                        continue
                    if not data:
                        break
                    message_chunks.append(data)

            # Decode list-of-byte-strings to UTF8 and parse JSON data
            message_bytes = b''.join(message_chunks)
            message_str = message_bytes.decode("utf-8")

            try:
                message_dict = json.loads(message_str)
            except json.JSONDecodeError:
                continue
            print(message_dict)


if __name__ == "__main__":
    main()

Save this client code to example_tcp_client.py.

"""Example TCP socket client."""
import socket
import json


def main():
    """Test TCP Socket Client."""
    # create an INET, STREAMing socket, this is TCP
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:

        # connect to the server
        sock.connect(("localhost", 8000))

        # send a message
        message = json.dumps({"hello": "world"})
        sock.sendall(message.encode('utf-8'))


if __name__ == "__main__":
    main()

Now we’ll put the two programs above together: the client will send a message to the server, and the server will print it to the screen.

Start your Python TCP socket server.

$ python3 example_tcp_server.py

In a second terminal window, run your Python TCP socket client.

$ python3 example_tcp_client.py

In the first terminal (the one running the Python server), you’ll see the message appear. Kill the server with Control + c.

Connection from 127.0.0.1
{'hello': 'world'}
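
If you would rather experiment in a single terminal, the two programs can be combined using the threading technique from earlier. The following is a simplified sketch, not a replacement for the examples above: the server handles exactly one connection, it binds to port 0 so the operating system picks a free port, and the names serve_once() and round_trip() are invented for this illustration.

```python
"""Run a TCP server thread and a TCP client in one process."""
import json
import socket
import threading


def serve_once(sock, received):
    """Accept one connection and store the decoded JSON message."""
    clientsocket, _ = sock.accept()
    with clientsocket:
        message_chunks = []
        while True:
            data = clientsocket.recv(4096)
            if not data:
                break  # Client closed the connection
            message_chunks.append(data)
    message_str = b"".join(message_chunks).decode("utf-8")
    received.append(json.loads(message_str))


def round_trip(message_dict):
    """Send message_dict through a local TCP connection and return it."""
    received = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        server_sock.bind(("localhost", 0))  # Port 0: OS picks a free port
        server_sock.listen()
        server_sock.settimeout(5)  # Don't wait forever for the client
        port = server_sock.getsockname()[1]

        thread = threading.Thread(
            target=serve_once, args=(server_sock, received)
        )
        thread.start()

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.connect(("localhost", port))
            client.sendall(json.dumps(message_dict).encode("utf-8"))

        thread.join()  # Wait until the server thread has read the message
    return received[0]


if __name__ == "__main__":
    print(round_trip({"hello": "world"}))
```

Leaving the inner with block closes the client socket, which is what causes recv() on the server side to return empty data and end the receive loop.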

UDP client/server

In this example, we’ll create a UDP server and UDP client just like the TCP server and TCP client above. What’s the difference? A TCP socket will resend a message if it doesn’t receive an acknowledgment from the recipient. A UDP socket doesn’t check if the message was received. Two examples where UDP is useful are video streaming and heartbeat messages.

Save this code to example_udp_server.py. This example listens for incoming UDP messages in a loop. Because UDP is connectionless, we do not call sock.listen(), which is different from TCP. Also, with UDP, we must receive the entire message at once, unlike TCP which uses streaming to receive message chunks separately. This means we do not need to maintain a message_chunks list.

"""Example UDP socket server."""
import socket
import json


def main():
    """Test UDP Socket Server."""
    # Create an INET, DGRAM socket, this is UDP
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:

        # Bind the UDP socket to the server
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("localhost", 8001))
        sock.settimeout(1)

        # No sock.listen() since UDP doesn't establish connections like TCP

        # Receive incoming UDP messages
        while True:
            try:
                message_bytes = sock.recv(4096)
            except socket.timeout:
                continue
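            message_str = message_bytes.decode("utf-8")
            # Skip malformed messages instead of crashing the server
            try:
                message_dict = json.loads(message_str)
            except json.JSONDecodeError:
                continue
            print(message_dict)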


if __name__ == "__main__":
    main()

Save this code to example_udp_client.py. It sends a message to a UDP socket.

"""Example UDP socket client."""
import socket
import json


def main():
    """Test UDP Socket Client."""
    # Create an INET, DGRAM socket, this is UDP
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:

        # Connect to the UDP socket on server
        sock.connect(("localhost", 8001))

        # Send a message
        message = json.dumps({"hello": "world"})
        sock.sendall(message.encode('utf-8'))


if __name__ == "__main__":
    main()

Let’s put our UDP client and UDP server together.

Start your Python UDP socket server.

$ python3 example_udp_server.py

Start your Python UDP socket client in a second terminal.

$ python3 example_udp_client.py

In the first terminal (the one running the Python server), you’ll see the message appear. Kill the server with Control + c.

{'hello': 'world'}
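
Because UDP is connectionless, a client can also skip connect() and name the destination on every send. The sketch below is a variation on the example above, not the method it uses: it sends one datagram with sendto() and reads it with recvfrom(), which also reports the sender's address. Both sockets live in one process to keep the demonstration short, the server binds to port 0 so the operating system picks a free port, and the function name udp_round_trip() is made up for this illustration.

```python
"""Send one UDP datagram with sendto() and read it with recvfrom()."""
import json
import socket


def udp_round_trip(message_dict):
    """Return (received_dict, sender_address) for one local datagram."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as server_sock, \
            socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client_sock:
        server_sock.bind(("localhost", 0))  # Port 0: OS picks a free port
        server_sock.settimeout(1)
        server_address = server_sock.getsockname()

        # No connect(): the destination is passed with each datagram
        message_bytes = json.dumps(message_dict).encode("utf-8")
        client_sock.sendto(message_bytes, server_address)

        # recvfrom() returns the whole datagram plus the sender's address
        data, sender_address = server_sock.recvfrom(4096)
        return json.loads(data.decode("utf-8")), sender_address


if __name__ == "__main__":
    print(udp_round_trip({"hello": "world"}))
```

The sender address is a (host, port) tuple, which a server could use to send a reply with sendto().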

Sockets and waiting

This example shows how to wait for two things at the same time: a message from a TCP socket or a shutdown signal. It will be helpful when implementing graceful shutdown in EECS 485 Project 4. Be sure to read the explanation and pitfalls below.

Save this example to example_shutdown.py.

"""Example of waiting on a socket or a shutdown signal."""
import threading
import time
import socket
import json


def main():
    """Main thread, which spawns a second server() thread."""
    print("main() starting")
    signals = {"shutdown": False}
    thread = threading.Thread(target=server, args=(signals,))
    thread.start()
    time.sleep(10)  # Give up execution to the 'server' thread (see Pitfall 1)
    signals["shutdown"] = True  # Tell server thread to shut down
    thread.join()  # Wait for server thread to shut down
    print("main() shutting down")


def server(signals):
    """Wait on a message from a socket OR a shutdown signal."""
    print("server() starting")

    # Create an INET, STREAMing socket, this is TCP
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:

        # Bind the socket to the server
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("localhost", 8000))
        sock.listen()

        # Socket accept() will block for a maximum of 1 second.  If you
        # omit this, it blocks indefinitely, waiting for a connection.
        sock.settimeout(1)

        while not signals["shutdown"]:
            print("waiting ...")

            # Wait for a connection for 1s.  The socket library avoids
            # consuming CPU while waiting for a connection.
            try:
                clientsocket, address = sock.accept()
            except socket.timeout:
                continue
            print("Connection from", address[0])

            # Socket recv() will block for a maximum of 1 second.  If you omit
            # this, it blocks indefinitely, waiting for packets.
            clientsocket.settimeout(1)

            # Receive data, one chunk at a time.  If recv() times out before
            # we can read a chunk, then go back to the top of the loop and try
            # again.  When the client closes the connection, recv() returns
            # empty data, which breaks out of the loop.  We make a simplifying
            # assumption that the client will always cleanly close the
            # connection.
            with clientsocket:
                message_chunks = []
                while True:
                    try:
                        data = clientsocket.recv(4096)
                    except socket.timeout:
                        continue
                    if not data:
                        break
                    message_chunks.append(data)

            # Decode list-of-byte-strings to UTF8 and parse JSON data
            message_bytes = b''.join(message_chunks)
            message_str = message_bytes.decode("utf-8")

            try:
                message_dict = json.loads(message_str)
            except json.JSONDecodeError:
                continue
            print(message_dict)

    print("server() shutting down")


if __name__ == "__main__":
    main()

Run the server. See that it waits for messages for 10 seconds, then shuts down. Also notice that it does not consume a lot of CPU while waiting (low user time). See the explanation below for more detail.

$ time python3 example_shutdown.py
main() starting
server() starting
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
server() shutting down
main() shutting down

real	0m10.081s
user	0m0.033s
sys	0m0.014s

Run the server again.

$ python3 example_shutdown.py
main() starting
server() starting

Send a message to the server in a second terminal window. Do this several times.

$ python3 example_tcp_client.py
$ python3 example_tcp_client.py
$ python3 example_tcp_client.py

In the first terminal (the one running the socket server), you’ll see the message appear and the server will continue listening. After 10 seconds, the server shuts down.

$ python3 example_shutdown.py
main() starting
server() starting
waiting ...
waiting ...
waiting ...
waiting ...
Connection from 127.0.0.1
{'hello': 'world'}
waiting ...
waiting ...
waiting ...
Connection from 127.0.0.1
{'hello': 'world'}
waiting ...
waiting ...
waiting ...
waiting ...
waiting ...
server() shutting down
main() shutting down

Explanation

We configured the socket to block for at most 1 second instead of indefinitely.

sock.settimeout(1)

In the case of a timeout, catch the exception and go back to the top of the loop, which will check the shutdown signal.

while not signals["shutdown"]:
    try:
        clientsocket, address = sock.accept()
    except socket.timeout:
        continue
    # ...

We use the same technique for the recv() function, because it could also time out in the middle of receiving a long message.

clientsocket.settimeout(1)
message_chunks = []
while True:
    try:
        data = clientsocket.recv(4096)
    except socket.timeout:
        continue
    if not data:
        break
    message_chunks.append(data)
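
Note that the receive loop above keeps retrying on a timeout without looking at the shutdown signal, which is fine under the assumption that the client always closes the connection. If you wanted a stalled client not to delay shutdown, one possible variation is to check the signal on every timeout. The sketch below is an optional extension, not part of the example: the helper name receive_message() is invented, and it uses socket.socketpair() to stand in for a real client connection.

```python
"""Receive loop that also checks the shutdown signal on each timeout."""
import socket


def receive_message(clientsocket, signals):
    """Return the full message, or None if shutdown is requested first."""
    clientsocket.settimeout(0.1)
    message_chunks = []
    while not signals["shutdown"]:
        try:
            data = clientsocket.recv(4096)
        except socket.timeout:
            continue  # Nothing arrived yet; re-check the shutdown signal
        if not data:
            return b"".join(message_chunks)  # Peer closed the connection
        message_chunks.append(data)
    return None


def demo():
    """Send a message through a connected socket pair and receive it."""
    sender, receiver = socket.socketpair()
    with sender, receiver:
        sender.sendall(b'{"hello": "world"}')
        sender.shutdown(socket.SHUT_WR)  # Signal that no more data follows
        return receive_message(receiver, {"shutdown": False})


if __name__ == "__main__":
    print(demo())
```

When the signal is already set, the function returns None without reading anything, so the caller can tell an abandoned message apart from a complete one.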

Pitfalls

Pitfall 1: Make sure to remove the time.sleep(10) in your Manager once you implement shutdown message handling. Instead, you’ll listen for a shutdown message and won’t need to call time.sleep(10).

Pitfall 2: As long as you set sock.settimeout(1), you do not need a time.sleep() in loops containing sock.accept(). With a timeout of 1 second, sock.accept() plays the role of time.sleep(1).

Acknowledgments

Original document written by Andrew DeOrio awdeorio@umich.edu.

This document is licensed under a Creative Commons Attribution-NonCommercial 4.0 License. You’re free to copy and share this document, but not to sell it. You may not share source code provided with this document.