Python Concurrency: A Beginner’s Guide to Threading, Multithreading, and Multiprocessing
Concurrency is central to building efficient, responsive, and scalable applications. It lets a program make progress on several tasks at once, which can improve performance when the program spends time waiting on I/O or has heavy computations to spread across CPU cores. Python offers two main tools for this: threading (also called multithreading) and multiprocessing.
This guide covers everything a beginner Python developer needs to know about concurrency, threading, multithreading, and multiprocessing, with explanations, code examples, and real-life applications.
1. Understanding Concurrency, Parallelism, and Processes
Before diving into threads and processes, let’s understand a few fundamental concepts:
- Concurrency: The ability to make progress on multiple tasks during the same period. Concurrency often works by switching between tasks, giving the appearance that they are happening at the same time.
- Parallelism: When tasks literally execute at the same instant on different CPU cores. In Python, parallel execution of Python code is usually achieved through multiprocessing, which runs separate processes, each with its own memory space.
- Processes: Independent execution units with their own memory. In Python, each process runs its own interpreter and does not share variables with other processes by default.
Why Use Concurrency?
Concurrency can improve performance, make applications responsive, and handle tasks (e.g., I/O-bound tasks like network requests) without waiting for one task to finish before starting the next.
2. Threading and Multithreading in Python
Threading is a technique to achieve concurrency by running multiple “threads” (smaller units of a process) in a single process. Python’s threading module enables the creation of threads within a single process.
The Global Interpreter Lock (GIL)
CPython, the standard Python implementation, has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode within a process. This limits threads in CPU-bound tasks, but it matters little for I/O-bound tasks because a thread releases the GIL while it waits on I/O.
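The effect of the GIL can be seen with a rough timing sketch. The function name count_down and the workload size N are illustrative assumptions. On a standard CPython build the threaded run usually takes about as long as the sequential one, though exact timings vary by machine and Python version.

```python
import threading
import time

def count_down(n):
    # Pure-Python loop: CPU-bound, so the running thread needs the GIL
    while n > 0:
        n -= 1

N = 2_000_000  # illustrative workload size (assumption)

# Run the work twice, one call after the other
start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

# Run the same two calls in two threads
start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f"Sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

If the two numbers come out close, the threads were taking turns rather than running in parallel.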
When to Use Threads
- I/O-bound tasks (e.g., network calls, file I/O).
- Tasks where time is spent waiting for an external resource.
Basic Thread Example
Here’s a simple threading example:
import threading
import time

def print_numbers():
    for i in range(5):
        print(f"Number: {i}")
        time.sleep(1)

def print_letters():
    for letter in 'ABCDE':
        print(f"Letter: {letter}")
        time.sleep(1)

# Create threads
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

# Start threads
thread1.start()
thread2.start()

# Wait for both threads to finish
thread1.join()
thread2.join()

print("Finished both tasks!")
In this example, print_numbers and print_letters run in separate threads. While one function waits, the other can proceed, making this an effective solution for I/O-bound tasks.
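To see how much time the overlap saves, here is a minimal timing sketch. The function simulated_io and the delay values are hypothetical stand-ins for real blocking calls such as network requests.

```python
import threading
import time

def simulated_io(delay):
    # time.sleep stands in for a blocking network or disk call
    time.sleep(delay)

delays = [0.2, 0.2, 0.2]  # hypothetical wait times in seconds

start = time.perf_counter()
threads = [threading.Thread(target=simulated_io, args=(d,)) for d in delays]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The waits overlap, so elapsed is close to the longest delay, not the sum
print(f"Total wait if run one by one: {sum(delays):.1f}s")
print(f"Elapsed with threads: {elapsed:.2f}s")
```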
3. Types of Tasks: I/O-bound vs. CPU-bound
I/O-bound Tasks
These tasks spend most of their time waiting for external resources (e.g., file I/O, network requests). Threading is a good fit because other threads can run while one thread is waiting.
CPU-bound Tasks
These tasks require significant processing power and involve heavy computation. Due to the GIL, Python threads are less effective for CPU-bound tasks. In these cases, multiprocessing (discussed later) is more efficient as it allows using multiple cores.
4. Python Memory Manager and Threading
Threads in the same process share the memory of that process, so every thread can read and modify the same objects. This shared memory can lead to issues like race conditions, where threads modify a shared resource concurrently and interfere with one another.
Example: Race Condition
import threading

# Shared resource
counter = 0

def increment():
    global counter
    for _ in range(1000):
        counter += 1

# Create threads
threads = [threading.Thread(target=increment) for _ in range(10)]

# Start and join threads
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter value: {counter}")  # This may not be 10000 due to race conditions.
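In this example, multiple threads increment counter. Because counter += 1 is a read, an add, and a write rather than a single atomic step, two threads can read the same old value and one update can be lost. Without proper synchronization, the final count may not match the expected value.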
Solution: Locks
To prevent race conditions, use a Lock to ensure only one thread can access a shared resource at a time.
lock = threading.Lock()

def safe_increment():
    global counter
    for _ in range(1000):
        with lock:
            counter += 1
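Putting the pieces together, a complete run might look like the sketch below. It follows the same pattern as the race condition example; the times parameter is an addition for illustration.

```python
import threading

counter = 0
lock = threading.Lock()

def safe_increment(times):
    global counter
    for _ in range(times):
        # Only one thread at a time can hold the lock, so the
        # read-modify-write on counter cannot interleave
        with lock:
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(1000,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Counter value: {counter}")  # 10 threads x 1000 increments = 10000
```

The lock adds some overhead on every increment, so it is worth keeping the locked section as small as possible.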
5. Multiprocessing in Python
To bypass the GIL and achieve parallelism in CPU-bound tasks, use the multiprocessing module, which creates separate processes instead of threads. Each process has its own memory space, allowing true parallel execution.
Example: CPU-bound Multiprocessing
from multiprocessing import Process, cpu_count

def cpu_intensive_task(n):
    total = 0
    for i in range(n):
        total += i
    print(f"Task result: {total}")

if __name__ == "__main__":
    num_cores = cpu_count()
    processes = [Process(target=cpu_intensive_task, args=(10000000,)) for _ in range(num_cores)]

    # Start and join processes
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print("All processes completed!")
Each process in this example runs independently, utilizing multiple cores for a CPU-bound task.
6. Python Memory Management with Multiprocessing
In multiprocessing, each process has its own memory space. To share data between processes, you can use:
- Queues: For inter-process communication.
- Shared Memory: Shared values and arrays.
Example: Using a Queue
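from multiprocessing import Process, Queue

def put_numbers(queue):
    for i in range(5):
        queue.put(i)
    queue.put(None)  # Sentinel: signals that no more data is coming

if __name__ == "__main__":
    queue = Queue()
    p = Process(target=put_numbers, args=(queue,))
    p.start()

    # Read until the sentinel arrives. Draining the queue before join()
    # avoids a possible deadlock, and queue.empty() is not reliable
    # across processes.
    for number in iter(queue.get, None):
        print(f"Received number: {number}")

    p.join()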
In this example, data is passed between processes using a queue.
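The shared memory option can be sketched with multiprocessing.Value, which holds a single typed value that every process can see. The function name add_many and the worker and iteration counts are illustrative assumptions.

```python
from multiprocessing import Process, Value

def add_many(shared_total, times):
    for _ in range(times):
        # get_lock() returns the lock that guards this Value, so the
        # increment is safe even when several processes run it at once
        with shared_total.get_lock():
            shared_total.value += 1

if __name__ == "__main__":
    total = Value("i", 0)  # "i" = C signed int stored in shared memory
    workers = [Process(target=add_many, args=(total, 1000)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f"Shared total: {total.value}")  # 4 workers x 1000 increments
```

As with threads, the increment needs a lock. Shared memory removes the isolation between processes, so race conditions become possible again.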
7. Real-Life Examples and Applications
Web Scraping with Threading (I/O-bound)
import threading
import requests

def fetch_url(url):
    # A timeout keeps a slow server from blocking the thread forever
    response = requests.get(url, timeout=10)
    print(f"Fetched {url}: {len(response.content)} bytes")

urls = ["https://example.com", "https://example.org", "https://example.net"]

threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Data Processing with Multiprocessing (CPU-bound)
from multiprocessing import Pool

def square_number(n):
    return n * n

if __name__ == "__main__":
    with Pool(4) as p:
        results = p.map(square_number, range(10))
    print(results)
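In this example, square_number stands in for a CPU-bound task. The Pool distributes the calls across its four worker processes, which can run on multiple cores. For a function this cheap, the cost of sending data between processes outweighs the gain, so the pattern pays off only when each call does substantial work.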
8. Handling More Tasks than CPU Cores
When the number of tasks exceeds the available CPU cores, you can still achieve concurrency. Let’s look at ways to manage these situations.
Task Pooling with Pool
The Pool class in multiprocessing allows you to create a pool of worker processes, typically one per CPU core. When tasks outnumber the workers, the remaining tasks are queued, and each worker picks up the next task as it finishes the previous one.
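from multiprocessing import Pool, cpu_count

def process_task(task):
    return task * task

# The guard is needed wherever child processes re-import this module
# (for example on Windows and macOS)
if __name__ == "__main__":
    tasks = range(20)  # More tasks than CPU cores
    num_cores = cpu_count()
    with Pool(num_cores) as pool:
        results = pool.map(process_task, tasks)
    print(results)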
Task Prioritization and Scheduling
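For finer control over task submission, you can use ProcessPoolExecutor from the concurrent.futures module. Each submitted task returns a Future that you can wait on, cancel before it starts, or collect as soon as it finishes, while the number of simultaneous tasks stays limited to the available CPU cores. Note that the executor has no built-in priority mechanism; tasks start in the order they were submitted.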
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count

def cpu_intensive_task(task):
    return task ** 2

if __name__ == "__main__":
    tasks = range(20)
    num_cores = cpu_count()
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        futures = [executor.submit(cpu_intensive_task, task) for task in tasks]
        # as_completed yields each future as soon as it finishes
        for future in as_completed(futures):
            print(f"Task result: {future.result()}")
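Because as_completed yields futures as they finish, results are handled in completion order, which may differ from submission order. This lets you act on each result as soon as it is ready instead of waiting for the whole set.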
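Since as_completed yields futures in whatever order they finish, it often helps to remember which input produced each future. One common way is a dictionary keyed by future, sketched below. The names square and run_all and the worker count are illustrative assumptions.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def square(n):
    return n ** 2

def run_all(tasks, max_workers=2):
    results = {}
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input each future belongs to
        future_to_task = {executor.submit(square, t): t for t in tasks}
        for future in as_completed(future_to_task):
            task = future_to_task[future]
            results[task] = future.result()
    return results

if __name__ == "__main__":
    results = run_all(range(5))
    # Sort by input so the printout is stable regardless of finish order
    for task in sorted(results):
        print(f"{task} -> {results[task]}")
```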
Batch Processing
When you have significantly more tasks than cores, consider breaking the tasks into smaller batches. Each batch is processed in a loop, reducing resource strain and keeping tasks manageable.
import math
from multiprocessing import Pool, cpu_count

def process_task(task):
    return math.sqrt(task)

if __name__ == "__main__":
    tasks = list(range(100))
    num_cores = cpu_count()
    batch_size = num_cores * 2

    # Divide tasks into batches
    batches = [tasks[i:i + batch_size] for i in range(0, len(tasks), batch_size)]

    # Create the pool once and reuse it for every batch
    with Pool(num_cores) as pool:
        for batch in batches:
            results = pool.map(process_task, batch)
            print(f"Batch results: {results}")
In this example, tasks are divided into batches and processed one batch at a time, so only one batch of results is held in memory at once.
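An alternative to manual batching is the chunksize argument of Pool.map, which groups tasks before sending them to workers. The sketch below uses the same process_task function, and the value 10 is an arbitrary choice for illustration. Unlike the loop above, it still returns all results in a single list.

```python
import math
from multiprocessing import Pool, cpu_count

def process_task(task):
    return math.sqrt(task)

if __name__ == "__main__":
    tasks = range(100)
    with Pool(cpu_count()) as pool:
        # chunksize=10 hands each worker 10 tasks at a time,
        # which cuts down on messaging between processes
        results = pool.map(process_task, tasks, chunksize=10)
    print(results[:5])
```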
Asynchronous Processing with asyncio
If tasks involve both I/O and CPU-bound subtasks, you can leverage asyncio for I/O-bound parts while using a limited process pool for CPU-bound subtasks. This approach allows efficient concurrency for large numbers of tasks.
import asyncio
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def cpu_task(task):
    return task * task

async def async_process_task(task, executor):
    loop = asyncio.get_running_loop()
    # Run the CPU-bound function in a worker process without blocking the event loop
    result = await loop.run_in_executor(executor, cpu_task, task)
    print(f"Processed task {task}: {result}")
    return result

async def main():
    tasks = range(20)
    num_cores = cpu_count()
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        await asyncio.gather(*(async_process_task(task, executor) for task in tasks))

if __name__ == "__main__":
    asyncio.run(main())
This code offloads CPU-intensive work to a ProcessPoolExecutor while the asyncio event loop stays free to handle non-blocking I/O, making the pattern effective for mixed workloads.
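A mixed workload with an explicit I/O step might look like the following sketch. Here asyncio.sleep stands in for a real non-blocking network call, and the names fetch_input and handle, the worker count, and the input sizes are all hypothetical.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    # Stand-in for heavy computation; runs in a worker process
    return sum(i * i for i in range(n))

async def fetch_input(task_id):
    # asyncio.sleep stands in for a non-blocking network call
    await asyncio.sleep(0.1)
    return 1000 * (task_id + 1)

async def handle(task_id, executor):
    n = await fetch_input(task_id)  # I/O-bound step on the event loop
    loop = asyncio.get_running_loop()
    # CPU-bound step in the process pool
    result = await loop.run_in_executor(executor, cpu_task, n)
    return task_id, result

async def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        return await asyncio.gather(*(handle(i, executor) for i in range(4)))

if __name__ == "__main__":
    for task_id, result in asyncio.run(main()):
        print(f"Task {task_id}: {result}")
```

All four simulated fetches wait at the same time on the event loop, while the computations are spread over the two worker processes.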
9. Scaling with Task Queues for Large Workloads
For systems with significant workloads beyond single-machine resources, you can use a task queue like Celery. Celery allows you to distribute tasks across multiple workers or even machines, scaling beyond the physical limits of CPU cores on a single system.
Conclusion
By mastering threading and multiprocessing in Python, you can make your applications more efficient and responsive, whether they’re processing data, handling I/O operations, or performing complex calculations.