This blog post presents a simple implementation of Bluebird’s Promise.map function in Python. I tend to code most of my bots in NodeJS since I find it really convenient to manage asynchronous tasks (HTTP requests here) using await/async. A NodeJS library I heavily rely on is Bluebird, and in particular, its Promise.map function. Promise.map provides a simple mechanism to manage a pool of asynchronous tasks (promises) so that, at a given time, there are at most N tasks running (where N can be passed as an argument).

I was interested to know if there was an equivalent in Python. After a quick search, I found: concurrent.futures.ThreadPoolExecutor. However, concurrent.futures.ThreadPoolExecutor.map seems to take a synchronous function as input, which is not convenient if your code leverages asynchronous code, e.g. if you use aiohttp to make HTTP requests.

I didn’t search further and used this as a pretext to get more familiar with Python asyncio and to code an equivalent of Bluebird’s Promise.map in Python.

Warning: I don’t think my implementation is really Pythonic, it must just be seen as an exercise ;)

Promise.map: Example with aiohttp

To showcase our implementation of Promise.map in Python, we use an example of a parallel crawler, i.e. a crawler that fetches several pages in parallel.

First, we define an asynchronous get_url function to fetch the content of a single URL using aiohttp and extract all the links of the blog posts present on the page.

import asyncio
import aiohttp
from bs4 import BeautifulSoup


async def get_url(url):
    connector = aiohttp.TCPConnector()
    session = aiohttp.ClientSession(connector=connector)

    headers = {
        "accept-language": "en-US;q=0.8,en;q=0.7",
        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36"
        # Add any header you want
    }

    print("Crawling: {}".format(url))
    results = []
    resp = await session.get(url, headers=headers)
    print("Crawled: {}".format(url))
    html_content = await resp.text()
    await connector.close()

    soup = BeautifulSoup(html_content, 'html.parser')
    for link in soup.select('a.storylink'):
        results.append('{};{}'.format(link.get('href'), link.text))

    return results

Now that we have our get_url function to asynchronously fetch the list of blog posts on an URL, we define the promise_map function that mimics the behavior of Bluebird’s Promise.map function.

It takes 3 values as input:

  1. values: the values you want to iterate on;
  2. mapper: the function you want to apply on each value of values. Note that my implementation requires the mapper to have a single argument as a parameter;
  3. concurrency: the number of parallel tasks.
async def promise_map(values, mapper, concurrency):

    async def mapper_wrapper(iterator, mapper):
        # store all results of a given coroutine in a list
        res_coroutine = []
        
        # Iterate on the iterator
        # Note that this iterator is shared among all coroutines
        # so that each element of "values" is handled only once
        for elt in iterator:
            # Call the "mapper" function and wait for it to finish
            res_mapper = await mapper(elt)
            # Store the result of the function into res_coroutine
            res_coroutine.append(res_mapper)

        # When there's no more value to iterate on, the coroutine
        # return all its results
        return res_coroutine

    coroutines = []
    # get an iteror on the values we want to iterate on
    # the iterator will be shared among the "concurrency" workers
    values_iterator = iter(values)

    # Spawn "concurrency" coroutines
    for idx_concurrency in range(0, concurrency):
        # we store all returned coroutines in a list 
        coroutines.append(mapper_wrapper(values_iterator, mapper))

    # Once all coroutines have been spawned, we await them
    results = await asyncio.gather(*coroutines)

    # Each coroutine returns a list
    # We flatten the list of lists to obtain a list of raw value element
    res_coroutines = []
    for res_coroutine in results:
        for v in res_coroutine:
            res_coroutines.append(v)

    return res_coroutines


Finally, we define an asynchronous main function and apply our Pythonic implementation of Promise.map on a set of Hacker news URLs.

async def main():
    # Number of pages we want to iterate on
    num_pages = 15

    # Number of concurrent requests
    concurrency = 4
    urls = ['https://news.ycombinator.com/news?p={}'.format(idx_page) for idx_page in range(1, num_pages)]

    # Launch  "concucrrency" coroutines (here 4) to fetch blog post titles present on Hacker news
    res = await promise_map(urls, get_url, concurrency)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

You can find the code of this bot on Github in my Bots zoo repository. You’ll also find other examples of bots in Python/NodeJS, as well as lists of user-agents and HTTP headers:

  • Selenium, Puppeteer, Playwright bots;
  • Parallel/sequential bots using simple NodeJS HTTP requests.

Antoine Vastel

Head of research at Datadome.