Threads, processes, and Async IO
Table of contents
Threads, processes, and Async IO (async
and await
) are all part of the asynchronous programming model. In this model, different functions can execute simultaneously or in a different order than usual. Threads are used for parallel tasks that share data with minimal CPU usage (e.g., handling multiple user requests in a web server). Processes are better suited for maximizing performance on CPU-bound tasks (e.g., mathematical computations or image processing). Async IO is perfect for handling many tasks that involve much waiting (e.g., network requests or file operations). To understand the examples in this lesson, you have to run them all in your editor.
Feature | Threads | Processes | Async IO |
Concurrency | GIL-limited parallelism | True parallelism | Cooperative |
Use Case | I/O-bound tasks | CPU-bound tasks | I/O-bound tasks |
Overhead | Low | High | The lowest |
Shared Data | Shared memory | Inter-process communication | Not shared (message-passing) |
Threads
Threads are a fundamental part of the Python concurrency model. They allow multiple tasks to run simultaneously within a single program. Python provides built-in support for multithreading through the threading
module.
To create a thread, we instantiate a Thread
object and select a function (its target), which contains the code to be executed by the thread. Once the thread is created, it can be started using the start()
method. The join()
method ensures the program waits until the chosen thread completes execution before proceeding further. It is useful because some resources generated by those threads may be necessary for the rest of the program.
import threading
def count_to_10000():
for i in range(0, 10000):
if i == 9999:
print("Task1: Counted to", i)
def sum_to_10000():
total = sum(range(0, 10000))
print("\nTask2: Sum = " + str(total))
# Creating threads
task1 = threading.Thread(target = count_to_10000) # if the function takes arguments: threading.Thread(target = fun, args = [1, "a"])
task2 = threading.Thread(target = sum_to_10000)
# Starting threads
task1.start()
task2.start()
# Waiting for both threads to finish
task1.join()
task2.join()
print("Both tasks completed.")
Of course, using join()
is not necessary. We do not use it if we want to start two separate threads whose results have absolutely nothing to do with each other. However, if we deleted these methods from the example above, the "Both tasks completed." string would display weirdly (check it in your editor) because the interpreter would not wait for both threads to display their results before printing it.
A race condition occurs when multiple threads access shared resources without proper synchronization, leading to unpredictable behavior. A deadlock happens when two or more threads are waiting indefinitely for resources held by each other, preventing further execution. In contrast, a livelock occurs when threads keep changing state in response to each other but make no real progress, as they continuously attempt to resolve the conflict without success. While race conditions lead to incorrect results, deadlocks halt execution, and livelocks keep the system active but unproductive.
Threads can be synchronized to prevent concurrent access to shared resources, ensuring data consistency. Synchronization is applied using the Lock
object. It ensures that only one thread accesses shared resources at a time. Synchronization ensures data consistency but can reduce performance if overused.
In the example below, the Lock
object prevents multiple threads from modifying the shared_resource
variable simultaneously, which could lead to inconsistencies due to race conditions. Without the lock, threads can interfere with each other while accessing the shared resource, causing the final result to be unpredictable. For example, two threads might read the same value of shared_resource
, increment it, and write it back, effectively skipping one increment. This may result in a final value being lower than expected. When a thread enters the with lock
block, it first tries to acquire the lock. If another thread already holds the lock, the current thread will wait until the lock is released. Inside the locked block, the shared resource is modified. Once the other thread finishes modifying shared_resource
, the lock is automatically released.
import threading
shared_resource = 0
# Creating a lock for synchronization
lock = threading.Lock()
def increment():
global shared_resource
for i in range(1000):
# Ensuring only one thread modifies the shared resource at a time (acquiring the lock before accessing the data)
with lock:
shared_resource += 1
# Creating multiple threads
threads = []
for i in range(10):
thread = threading.Thread(target = increment)
threads.append(thread)
thread.start()
# Waiting for all threads to finish
for thread in threads:
thread.join()
print("Final value of shared_resource:", shared_resource)
Thread pools
A thread pool is a collection of worker threads that efficiently execute multiple tasks in parallel, reusing threads to avoid the overhead of thread creation.
from concurrent.futures import ThreadPoolExecutor
import threading
def task(n): # function to execute inside the thread pool
print(f"Task {n} running in {threading.current_thread().name}")
with ThreadPoolExecutor(max_workers=3) as executor: # creating a thread pool with 3 worker threads
# Submitting 5 tasks to the thread pool
for i in range(1, 6):
executor.submit(task, i) # executing the task() function with argument "i"
Processes, GIL (Global Interpreter Lock)
The Global Interpreter Lock (GIL) is a mechanism used in CPython (the most widely used Python interpreter) to ensure that only one thread executes Python bytecode simultaneously in a single process. When using threads, it limits the parallelism of CPU-bound tasks but does not affect I/O-bound tasks like network or file operations. If our program is CPU-bound and we want to take full advantage of multiple CPU cores, we can bypass the GIL by using the multiprocessing
module, which spawns separate processes, each with its own Python interpreter and GIL. Use this module specifically for CPU-bound tasks to utilize multiple cores. Creating too many processes can lead to inefficiency.
import multiprocessing
# A simple CPU-bound task
def cpu_bound_task():
total = 0
for i in range(10 ** 7):
total += i
return total
processes = []
for x in range(4):
process = multiprocessing.Process(target = cpu_bound_task)
processes.append(process)
process.start()
# Waiting for all processes to finish
for process in processes:
process.join()
print("Completed all processes.")
In Java, there is no GIL, so threading works like multiprocessing in Python.
Async IO
Async IO is a single-threaded, single-process concurrency model that excels at handling I/O-bound and high-level structured network code. The Async IO concurrency model comes down to cooperative multitasking, which means pausing the execution of a certain function if it is waiting for, e.g., a response from a server and allowing other functions to run at that time. The event loop is the core of Async IO and is responsible for managing and coordinating the execution of asynchronous tasks. It schedules them, processes their execution, and handles which task is currently being executed.
- A coroutine is a special function defined with
async def
, and a coroutine object is a coroutine that hasn't been scheduled or awaited to run in the event loop. - A coroutine scheduled to run in the event loop is called a task.
- A coroutine can be executed in two ways. It can either be scheduled (become a task) and then awaited or just awaited directly. The difference is that tasks can be executed concurrently. Tasks do not run in parallel in the sense of simultaneous execution, but they are managed in a way that allows them to run concurrently within the event loop by pausing and resuming during waiting periods. They pause at
await
points, allowing other tasks to execute while waiting for results, but they don't just run "without pausing." - Concurrency in Async IO is based on cooperative multitasking, not true parallelism, and tasks interleave at
await
points. Coroutines that are awaited directly can't run concurrently like tasks - they yield control to the event loop atawait
points but run sequentially.
Clarification: Once scheduled, the event loop manages the task's execution right away. Awaiting a task is required to retrieve its result or ensure its completion. If we don’t await a task, it runs in the background, and we risk not handling its results or exceptions.
The main coroutine must be passed as an argument to the asyncio.run()
function to start the event loop.
When a coroutine reaches an await
instruction, its execution is paused, yielding control back to the event loop and allowing it to manage other coroutines. The awaited coroutine is then queued to run in the event loop (it does not necessarily run immediately). The event loop determines when to execute it based on other pending coroutines and their readiness (e.g., I/O completion or delays). Once the awaited coroutine is completed, either by returning a result or raising an exception, the event loop resumes the paused coroutine from the await
point, using the result of the awaited coroutine (result = await coroutine
).
import asyncio
async def fetch(): # defining a coroutine
print("Fetching...")
await asyncio.sleep(2) # simulating work, yielding control to the event loop
print("Data fetched.")
return "data"
async def main():
print("Start of the main coroutine.")
coroutine = fetch() # creating a coroutine object
print("End of the main coroutine.")
result = await coroutine # awaiting a coroutine (calling for its execution)
print("Result:", result)
asyncio.run(main())
Tasks
As I said earlier, a task is a scheduled coroutine run by the event loop. asyncio.create_task()
creates a Task
object. In the example below, using tasks will shorten the execution time from 6 to 3 seconds (longest sleep time).
import asyncio
async def fetch(ID, sleep):
print(f"Coroutine {ID} has started.")
await asyncio.sleep(sleep)
return f"Data from coroutine {ID}"
async def main():
# Method nr. 1
task1 = asyncio.create_task(fetch(1, 1)) # scheduling a coroutine (creating a task)
task2 = asyncio.create_task(fetch(2, 2))
task3 = asyncio.create_task(fetch(3, 3))
result1 = await task1 # awaiting a task (calling for its execution)
result2 = await task2
result3 = await task3
print(result1, result2, result3)
# Method nr. 2
results = await asyncio.gather(fetch(1, 1), fetch(2, 2), fetch(3, 3))
for result in results:
print(result)
asyncio.run(main())
In the example below, we can see a TaskGroup
usage example. It simplifies error handling when working with multiple tasks, ensuring that all tasks are either completed or canceled together.
import asyncio
async def fetch(ID, sleep):
print(f"Coroutine {ID} has started.")
await asyncio.sleep(sleep)
return f"Data from coroutine {ID}"
async def main():
tasks = []
async with asyncio.TaskGroup() as tg:
for i, sleep in enumerate([2, 1, 3], start = 1): # "start = 1" sets the starting index of the iteration to 1 instead of the default 0.
print(i)
task = tg.create_task(fetch(i, sleep))
tasks.append(task)
# The code will get past this point only after completing all the tasks stated above.
results = [task.result() for task in tasks]
for result in results:
print("Received result:", result)
asyncio.run(main())
In summary: A coroutine is a special function defined with async def
. When called, it returns a coroutine object, which must be scheduled or awaited to be executed within the event loop. A task is a coroutine that has been scheduled for execution within the event loop (defined with asyncio.create_task()
). Once scheduled, the event loop manages the task's execution, allowing it to run concurrently with other tasks. Awaiting a task is required to retrieve its result or ensure its completion. Async IO performs concurrent execution, but it does not involve true parallelism since only one task is executing at any given time within the event loop.
Threading should be used to leverage parallelism for I/O-bound tasks and multiprocessing for CPU-bound tasks. Async IO tasks are meant for I/O-bound tasks that involve high concurrency, in which we want to achieve better performance with lower overhead than threading. It is preferable over threading in most cases.
Synchronization
Lock
Async IO locks have the same purpose as thread locks. In the example below, five coroutines increment a shared resource 1,000 times each. The lock ensures synchronized access to avoid data corruption. The execution might take longer due to sequential access, but the shared data remains consistent.
We use locks also for "normal" coroutines (not only tasks), even though they do not run concurrently. This is because a paused coroutine might be in the process of accessing or modifying a shared resource, and if another coroutine accesses that resource during that period, the data could become corrupted. This is also considered concurrent access.
import asyncio
shared_resource = 0
lock = asyncio.Lock()
async def increment():
global shared_resource
for i in range(1000):
async with lock:
print("Before modification:", shared_resource)
shared_resource += 1
await asyncio.sleep(1)
print("After modification:", shared_resource)
async def main():
results = await asyncio.gather(*(increment() for i in range(5)))
asyncio.run(main())
The asterisk (*
) unpacks the generator expression into individual coroutine arguments for gather()
, allowing all the increment()
coroutines to be executed in the event loop. A generator expression creates an iterator that produces items one at a time, which is more memory-efficient than a list comprehension, which generates an entire list in memory.
Semaphore
Unlike a lock, a semaphore allows a specified number of coroutines or tasks to access a resource at the same time. It ensures mutual exclusion based on the defined limit. The example below demonstrates controlled access to the resource. When setting the semaphore limit to 2, exactly two threads can access the resource simultaneously. Threads can also use semaphores.
import asyncio
async def access_resource(semaphore, id):
async with semaphore:
print("Accessing resource:", id)
await asyncio.sleep(1)
print("Releasing resource:", id)
async def main():
semaphore = asyncio.Semaphore(2)
await asyncio.gather(*(access_resource(semaphore, i) for i in range(5)))
asyncio.run(main())
Event
An event provides a way for one coroutine to signal another, allowing them to synchronize actions. The f1
coroutine waits for the event to be set, pausing its execution. The f2
coroutine sets the event after a 2-second delay, allowing f1
to resume. This way the f1
will execute a given fragment of code only after the f2
coroutine has signaled it is ready (e.g., it might have been preparing data).
import asyncio
async def f1(event):
print("Waiting for the event to be set.")
await event.wait()
print("Continuing execution.")
async def f2(event):
await asyncio.sleep(2)
event.set()
print("The event has been set.")
async def main():
event = asyncio.Event()
await asyncio.gather(f1(event), f2(event))
asyncio.run(main())
The asyncio
module also provides something called futures. You will likely never use something like that but if you'd like to know more, check the documentation.