Semaphores
Scenario
Imagine 3 workers trying to access a shared resource, but only 2 workers can access it at a time. A semaphore ensures that no more than 2 workers use the resource simultaneously.
Multithrading semaphore
import threading
import time
# Create a semaphore with a max of 2 concurrent accesses
semaphore = threading.Semaphore(2)
def worker(worker_id):
print(f"Worker {worker_id} is waiting to access the resource...")
with semaphore: # Acquire the semaphore (decrements count)
print(f"Worker {worker_id} acquired access! 🚀")
time.sleep(2) # Simulating work
print(f"Worker {worker_id} released access! ✅")
# Create 3 worker threads
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# Wait for all threads to finish
for t in threads:
t.join()
print("All workers finished execution!")
How It Works
- Semaphore(2) → At most 2 workers can enter the critical section at a time.
- Each worker waits until the semaphore allows it to proceed.
- Once inside, the worker "locks" the resource, works for 2 seconds, then releases it.
- New workers enter when a previous worker exits.
Expected Output
Worker 0 is waiting to access the resource...
Worker 1 is waiting to access the resource...
Worker 0 acquired access! 🚀
Worker 1 acquired access! 🚀
Worker 2 is waiting to access the resource...
Worker 0 released access! ✅
Worker 2 acquired access! 🚀
Worker 1 released access! ✅
Worker 2 released access! ✅
All workers finished execution!
- Workers 0 and 1 get access first.
- Worker 2 waits until one of them finishes.
- When Worker 0 or 1 releases, Worker 2 gets access.
Why Use This?
- Prevents too many threads from overwhelming a resource.
- Useful for limiting concurrent database access, API calls, GPU processing, etc.
- Essential in multiprocessing (like PyTorch
DataLoader
) to prevent semaphore leaks.
Info
Just like threading.Semaphore, we have semaphores for both multiprocessing and async (asyncio).
1️⃣ Multiprocessing Semaphore
- Used to control processes accessing a shared resource.
- Works like
threading.Semaphore
, but for multiple processes instead of threads.
Example multiprocessing-semaphore
import multiprocessing
import time
# Create a multiprocessing semaphore (max 2 processes can access at a time)
semaphore = multiprocessing.Semaphore(2)
def worker(worker_id):
print(f"Process {worker_id} waiting to access the resource...")
with semaphore: # Acquire the semaphore
print(f"Process {worker_id} acquired access! 🚀")
time.sleep(2) # Simulate work
print(f"Process {worker_id} released access! ✅")
# Create 3 processes
processes = []
for i in range(3):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
# Wait for all processes to finish
for p in processes:
p.join()
print("All processes finished execution!")
Expected Behavior
- At most 2 processes can access the critical section at the same time.
- Other processes wait until a slot is freed.
2️⃣ Asyncio Semaphore
- Used to limit concurrency in async tasks.
- Unlike threading/multiprocessing, it does not block but awaits when access is unavailable.
Example asyncio-semaphore
import asyncio
# Create an asyncio semaphore (max 2 tasks can access at a time)
semaphore = asyncio.Semaphore(2)
async def worker(worker_id):
print(f"Task {worker_id} waiting to access the resource...")
async with semaphore: # Acquire semaphore asynchronously
print(f"Task {worker_id} acquired access! 🚀")
await asyncio.sleep(2) # Simulating work
print(f"Task {worker_id} released access! ✅")
async def main():
tasks = [worker(i) for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
Expected
- Two tasks run at the same time.
- The third task waits until one of them releases the semaphore.
🚀 When to Use What?
Scenario | Use |
---|---|
Multi-threading | threading.Semaphore() |
Multi-processing | multiprocessing.Semaphore() |
Async I/O operations | asyncio.Semaphore() |