Coding Collie Logo
Coding Collie

Concurrency [in Python]

Authors
  • avatar
    Name
    Kai Kang
    Role
    Staff Software Engineer @ Meta · Solo App Builder
    Twitter

Thread

Threads are a good fit for I/O-bound work. Each Python thread maps to a real OS thread. Example:

from concurrent.futures import ThreadPoolExecutor
import time

def download(url: str, *, delay: float = 1.0) -> str:
    """Pretend to download *url* and return a short status string.

    Args:
        url: Address to "download"; it is only echoed back in the result.
        delay: Seconds to block, standing in for network latency. Defaults
            to one second; pass ``0`` to skip the wait.

    Returns:
        The string ``"downloaded <url>"``.
    """
    time.sleep(delay)  # pretend network I/O
    return f"downloaded {url}"

# Four placeholder addresses to hand to download().
urls = ["a.com", "b.com", "c.com", "d.com"]

# The pool has as many workers as there are URLs (4). Leaving the `with`
# block shuts the executor down.
with ThreadPoolExecutor(max_workers=4) as executor:
    # Run download() once per URL on the pool and collect the results.
    results = list(executor.map(download, urls))

print(results)

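Under the GIL, only one thread can execute Python bytecode at a time, so threads do not speed up CPU-bound pure-Python code. They may still help when each thread spends most of its time inside a native library such as NumPy, but benchmark to confirm.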

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import requests


class ThreadedCrawler:
    def __init__(self, max_workers=10, max_pages=50):
        self.max_workers = max_workers
        self.max_pages = max_pages
        self.visited = set()

    def fetch(self, url: str) -> str:
        print(f"Fetching: {url}")

        resp = requests.get(
            url,
            timeout=10,
            headers={"User-Agent": "SimpleThreadedCrawler/1.0"},
        )
        resp.raise_for_status()

        content_type = resp.headers.get("content-type", "")
        if "text/html" not in content_type:
            return ""

        return resp.text

    def extract_links(self, base_url: str, html: str) -> list[str]:
        soup = BeautifulSoup(html, "html.parser")
        links = []

        for a in soup.find_all("a", href=True):
            absolute_url = urljoin(base_url, a["href"])
            absolute_url = self.normalize_url(absolute_url)

            if absolute_url:
                links.append(absolute_url)

        return links

    def normalize_url(self, url: str) -> str | None:
        parsed = urlparse(url)

        if parsed.scheme not in ("http", "https"):
            return None

        # Remove fragment: /page#section -> /page
        return parsed._replace(fragment="").geturl()

    def crawl(self, start_url: str) -> set[str]:
        to_visit = [start_url]

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {}

            while (to_visit or futures) and len(self.visited) < self.max_pages:
                # Submit new work while we have capacity.
                while (
                    to_visit
                    and len(futures) < self.max_workers
                    and len(self.visited) < self.max_pages
                ):
                    url = to_visit.pop()

                    if url in self.visited:
                        continue

                    self.visited.add(url)
                    future = executor.submit(self.fetch, url)
                    futures[future] = url

                if not futures:
                    break

                done, _ = wait(futures, return_when=FIRST_COMPLETED)

                for future in done:
                    url = futures.pop(future)

                    try:
                        html = future.result()
                    except Exception as e:
                        print(f"Failed {url}: {e}")
                        continue

                    for link in self.extract_links(url, html):
                        if link not in self.visited:
                            to_visit.append(link)

        return self.visited


# Demo entry point: runs only when the file is executed as a script.
if __name__ == "__main__":
    # Up to 10 fetches in flight; stop after 30 pages have been claimed.
    crawler = ThreadedCrawler(max_workers=10, max_pages=30)
    visited = crawler.crawl("https://example.com")

    # crawl() returns a set, so the URLs print in no particular order.
    print("\nVisited:")
    for url in visited:
        print(url)

AsyncIO

For I/O-bound work with many concurrent connections, asyncio scales better than threads and has lower per-task overhead.

import asyncio
from urllib.parse import urljoin, urlparse

import aiohttp
from bs4 import BeautifulSoup

class AsyncCrawler:
    """Crawl pages reachable from a start URL using asyncio worker tasks.

    Workers pull URLs from a shared queue, fetch them through one aiohttp
    session, and push newly discovered links back onto the queue.
    """

    def __init__(self, max_concurrency=20, max_pages=None):
        """Set up crawl state.

        Args:
            max_concurrency: Maximum number of requests in flight at once.
            max_pages: Stop fetching once this many URLs have been claimed.
                ``None`` (the default) means no limit.
        """
        self.visited = set()
        self.queue = asyncio.Queue()
        self.sem = asyncio.Semaphore(max_concurrency)
        self.max_pages = max_pages

    async def fetch(self, session, url):
        """Return the body of *url*, or ``""`` for any non-200 response."""
        async with self.sem:
            # ClientTimeout is the documented way to express a timeout.
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as resp:
                if resp.status != 200:
                    return ""
                return await resp.text()

    def normalize_url(self, url):
        """Strip the fragment from an http(s) URL; return ``None`` otherwise."""
        parsed = urlparse(url)

        if parsed.scheme not in ("http", "https"):
            return None

        # /page#section and /page are the same document; fetch it once.
        return parsed._replace(fragment="").geturl()

    def extract_links(self, base_url, html):
        """Return the normalized http(s) targets of every ``<a href>`` in *html*.

        Relative hrefs are resolved against *base_url*. Non-http(s) links
        such as ``mailto:`` or ``javascript:`` are dropped.
        """
        soup = BeautifulSoup(html, "html.parser")
        candidates = (
            self.normalize_url(urljoin(base_url, a["href"]))
            for a in soup.find_all("a", href=True)
        )
        return [link for link in candidates if link]

    def _budget_spent(self):
        """Return True once ``max_pages`` URLs have been claimed."""
        return self.max_pages is not None and len(self.visited) >= self.max_pages

    async def worker(self, session):
        """Process URLs from the queue forever; stopped by cancellation."""
        while True:
            url = await self.queue.get()

            try:
                # Skip duplicates. Once the page budget is spent, drain the
                # queue without fetching so that queue.join() can return.
                if url in self.visited or self._budget_spent():
                    continue

                # There is no await between the check and the add, so two
                # workers cannot claim the same URL.
                self.visited.add(url)

                html = await self.fetch(session, url)
                for link in self.extract_links(url, html):
                    if link not in self.visited:
                        await self.queue.put(link)
            except Exception as e:
                print(f"failed {url}: {e}")
            finally:
                # Exactly one task_done() per get(), on every path.
                self.queue.task_done()

    async def crawl(self, start_url, num_workers=10):
        """Crawl from *start_url* and return the set of claimed URLs.

        Args:
            start_url: First URL to fetch.
            num_workers: Number of worker tasks to run concurrently.

        Returns:
            ``self.visited`` once the queue has been fully processed.
        """
        await self.queue.put(start_url)

        async with aiohttp.ClientSession() as session:
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(num_workers)
            ]

            try:
                await self.queue.join()
            finally:
                # Stop the workers even if join() is interrupted, and wait
                # for them to finish before the session closes.
                for w in workers:
                    w.cancel()
                await asyncio.gather(*workers, return_exceptions=True)

        return self.visited

Process

Processes are a good fit for CPU-heavy work because they can make use of more cores. Threads are preempted by the OS scheduler at arbitrary points, whereas asyncio tasks switch cooperatively, only at `await` points.

Enjoyed this post? Subscribe for more.