- Published on
Concurrency [in Python]
- Authors

- Name
- Kai Kang
- Role
- Staff Software Engineer @ Meta · Solo App Builder
Thread
Threads are a good fit for I/O-bound work. Each Python thread maps to a real OS thread. Example:
from concurrent.futures import ThreadPoolExecutor
import time
def download(url: str, delay: float = 1.0) -> str:
    """Simulate downloading *url* by blocking for *delay* seconds.

    Args:
        url: Address to pretend to download.
        delay: Seconds to sleep in place of real network I/O. Defaults to
            1.0, so existing one-argument calls behave as before.

    Returns:
        The string ``"downloaded <url>"``.
    """
    time.sleep(delay)  # pretend network I/O
    return f"downloaded {url}"
# Four fake downloads, one worker each, so they all overlap.
urls = [
    "a.com",
    "b.com",
    "c.com",
    "d.com",
]

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in the same order as `urls`.
    results = [*executor.map(download, urls)]

print(results)
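Under the GIL, only one thread can execute Python bytecode at a time, so threads do not speed up CPU-bound Python code. Threads might still help when each one spends most of its time inside a native library such as NumPy, but you need to benchmark to be sure.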
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import requests
class ThreadedCrawler:
def __init__(self, max_workers=10, max_pages=50):
self.max_workers = max_workers
self.max_pages = max_pages
self.visited = set()
def fetch(self, url: str) -> str:
print(f"Fetching: {url}")
resp = requests.get(
url,
timeout=10,
headers={"User-Agent": "SimpleThreadedCrawler/1.0"},
)
resp.raise_for_status()
content_type = resp.headers.get("content-type", "")
if "text/html" not in content_type:
return ""
return resp.text
def extract_links(self, base_url: str, html: str) -> list[str]:
soup = BeautifulSoup(html, "html.parser")
links = []
for a in soup.find_all("a", href=True):
absolute_url = urljoin(base_url, a["href"])
absolute_url = self.normalize_url(absolute_url)
if absolute_url:
links.append(absolute_url)
return links
def normalize_url(self, url: str) -> str | None:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return None
# Remove fragment: /page#section -> /page
return parsed._replace(fragment="").geturl()
def crawl(self, start_url: str) -> set[str]:
to_visit = [start_url]
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {}
while (to_visit or futures) and len(self.visited) < self.max_pages:
# Submit new work while we have capacity.
while (
to_visit
and len(futures) < self.max_workers
and len(self.visited) < self.max_pages
):
url = to_visit.pop()
if url in self.visited:
continue
self.visited.add(url)
future = executor.submit(self.fetch, url)
futures[future] = url
if not futures:
break
done, _ = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
url = futures.pop(future)
try:
html = future.result()
except Exception as e:
print(f"Failed {url}: {e}")
continue
for link in self.extract_links(url, html):
if link not in self.visited:
to_visit.append(link)
return self.visited
if __name__ == "__main__":
    # Demo: crawl outward from example.com, capped at 30 submitted pages.
    crawler = ThreadedCrawler(max_pages=30, max_workers=10)
    visited = crawler.crawl("https://example.com")

    print("\nVisited:")
    for page in visited:
        print(page)
AsyncIO
asyncio scales better than threads for large numbers of concurrent I/O tasks, with lower overhead per task.
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin
class AsyncCrawler:
    """Crawl pages by following ``<a href>`` links using asyncio and aiohttp.

    Worker tasks pull URLs from ``queue``; a semaphore caps how many HTTP
    requests are in flight at once.
    """

    # Total per-request time budget, in seconds.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, max_concurrency=20, max_pages=None):
        """Set up crawl state.

        Args:
            max_concurrency: Maximum number of simultaneous HTTP requests.
            max_pages: Optional cap on how many URLs are fetched. ``None``
                (the default) means no cap.
        """
        self.visited = set()
        self.queue = asyncio.Queue()
        self.sem = asyncio.Semaphore(max_concurrency)
        self.max_pages = max_pages

    def _limit_reached(self):
        """Return True once ``max_pages`` URLs have been marked visited."""
        return self.max_pages is not None and len(self.visited) >= self.max_pages

    async def fetch(self, session, url):
        """Return the body of *url*, or ``""`` for any non-200 response."""
        # Explicit ClientTimeout instead of a bare number for the timeout.
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
        async with self.sem:
            async with session.get(url, timeout=timeout) as resp:
                if resp.status != 200:
                    return ""
                return await resp.text()

    def extract_links(self, base_url, html):
        """Return every anchor href in *html*, resolved against *base_url*."""
        soup = BeautifulSoup(html, "html.parser")
        return [urljoin(base_url, a["href"]) for a in soup.find_all("a", href=True)]

    async def worker(self, session):
        """Consume URLs from the queue forever; the caller cancels the task.

        Every dequeued URL is matched by exactly one ``task_done()`` call so
        that ``queue.join()`` in ``crawl`` can complete.
        """
        while True:
            url = await self.queue.get()
            # Skip duplicates, and drain the queue without fetching once the
            # page cap is hit.
            if url in self.visited or self._limit_reached():
                self.queue.task_done()
                continue
            self.visited.add(url)
            try:
                html = await self.fetch(session, url)
                for link in self.extract_links(url, html):
                    if link not in self.visited and not self._limit_reached():
                        await self.queue.put(link)
            except Exception as e:
                print(f"failed {url}: {e}")
            finally:
                self.queue.task_done()

    async def crawl(self, start_url, num_workers=10):
        """Crawl outward from *start_url* and return the set of visited URLs.

        Args:
            start_url: First URL to fetch.
            num_workers: Number of concurrent worker tasks.

        Returns:
            ``self.visited``. URLs are added before they are fetched, so the
            set also contains URLs whose fetch failed.
        """
        await self.queue.put(start_url)
        async with aiohttp.ClientSession() as session:
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(num_workers)
            ]
            try:
                await self.queue.join()
            finally:
                # Cancel the workers and wait for the cancellations to land
                # before the session is closed, even if join() was interrupted.
                for w in workers:
                    w.cancel()
                await asyncio.gather(*workers, return_exceptions=True)
        return self.visited
Process
Processes are good for CPU-heavy work because they can utilize more cores. Threads are scheduled preemptively by the OS and can be interrupted at any point, whereas asyncio tasks switch cooperatively, only at await points.
Enjoyed this post? Subscribe for more.