util.async

This module provides an API to help implement asynchronous non-blocking code. It is divided into a handful of objects, each of which is described below.

runner

A runner essentially makes it possible to temporarily pause a function while it is executing, so that it can wait for the result of an external operation without blocking the server. Runners are implemented using Lua coroutines, but they should only be managed through util.async, the API documented here.
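To illustrate the underlying idea, here is a minimal sketch in plain Lua of how a coroutine lets a function pause partway through and continue later. It is a simplified model and not util.async's actual implementation; the names `log`, `task` and `co` are made up for this illustration.

```lua
-- Simplified model of pausing a function with a coroutine.
-- Not util.async's implementation; all names here are illustrative.
local log = {};

local function task()
    table.insert(log, "started");
    coroutine.yield(); -- pause here; control returns to the caller
    table.insert(log, "resumed");
end

local co = coroutine.create(task);

coroutine.resume(co); -- runs task() until it yields
table.insert(log, "caller is free to do other work");
coroutine.resume(co); -- continues task() from just after the yield

print(table.concat(log, " -> "));
```

A runner takes care of this bookkeeping for you, which is why the coroutine should not be resumed by hand in real code.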

When creating a runner, you need to pass in a function that the runner will execute. This function receives the data that you pass to the runner using the :run() method (see below). Note that there is no direct mechanism for returning values from this function; all return values are ignored. However, it is easy to share results through variables using the usual Lua techniques.

runner:run(data)

Once a runner is created, you can run it as many times as you like. The given data is passed to the function you provided, either immediately or, if the runner is currently busy, once it has finished processing earlier data (each runner processes only one piece of data at a time).
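As a rough illustration, the following sketch creates a runner and feeds it two items. It assumes the code is executed inside Prosody (for example from a module), so that util.async can be loaded; the names `greeter` and `seen` are invented for this example.

```lua
local async = require "util.async";

local seen = {}; -- shared variable used to get results out of the runner

local greeter = async.runner(function (name)
    -- Called once for each item passed to :run()
    table.insert(seen, "Hello " .. name);
end);

greeter:run("alice");
greeter:run("bob"); -- the same runner can be reused

for _, line in ipairs(seen) do
    print(line);
end
```

Because the function here never waits, each :run() call is expected to finish before it returns, so `seen` should hold both greetings by the time the loop runs.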

runner:enqueue(data)

Add data to a runner's queue, without starting the runner if it is currently idle. You may call runner:run() later to start processing the queue.
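A small sketch of queueing work first and starting it later, under the same assumption that the code runs inside Prosody. The names `worker` and `processed` are hypothetical.

```lua
local async = require "util.async";

local processed = {};

local worker = async.runner(function (item)
    table.insert(processed, item);
end);

-- Queue up work without starting the idle runner
worker:enqueue("first");
worker:enqueue("second");

-- No new data is passed here: this just starts working through the queue
worker:run();

print(table.concat(processed, ", "));
```

This can be handy when items arrive in a burst and you prefer to begin processing only once they have all been collected.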

waiter

A waiter can be used within a runner to temporarily suspend the runner until some event happens (typically a callback). The usage is simple.

When you call waiter(), it returns two functions. You must call the first one when you want to pause execution. Call the second one when you want to resume it again. An example explains best:

local async = require "util.async";
local timer = require "util.timer";

-- Must be called from inside a runner
function my_async_func()
    local wait, done = async.waiter(); -- Get the two functions
    timer.add_task(5, function () -- Call done() in 5 seconds
        done();
    end);
    print "Wait 5 seconds..."
    wait(); -- Wait here until done() is called
    print "Hello world"
end

The wait() function must only be used once. To pause execution again, create two new functions by calling waiter().

As an additional feature, if you need to wait for multiple things to happen, you can pass a number to waiter(), and it will wait for done() to be called that number of times before resuming from wait(). For example:

    local wait, done = async.waiter(2);
    timer.add_task(1, function ()
        done(); -- Won't resume yet
    end);
    timer.add_task(5, function ()
        done(); -- This, second, call will resume wait()
    end);
    wait();

Promises

Currently wait_for is only available in the development branch (trunk).

Sometimes you have a promise and you want to wait for it to settle. For this you can use a waiter, like this:

local wait, done = async.waiter();

local result, err;
promise:next(function(result_)
    result = result_;
end, function(err_)
    err = err_;
end):finally(done);
wait();
print(result or err);

To make this more convenient, the wait_for function wraps this pattern:

local result, err = async.wait_for(promise);
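The sketch below shows one way wait_for might be used inside a runner, handling both outcomes. It assumes a Prosody version that provides wait_for, and it uses util.promise's resolve() and reject() helpers to create already-settled promises purely for demonstration. The names `fetcher` and `outcome` are hypothetical.

```lua
local async = require "util.async";
local promise = require "util.promise";

local outcome = {};

local fetcher = async.runner(function (p)
    -- Suspends this runner (not the server) until the promise settles
    local result, err = async.wait_for(p);
    if result then
        table.insert(outcome, "ok: " .. tostring(result));
    else
        table.insert(outcome, "failed: " .. tostring(err));
    end
end);

fetcher:run(promise.resolve("some value"));
fetcher:run(promise.reject("some error"));
```

In real code the promise would typically come from an asynchronous API rather than being settled up front.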

Examples

Basic example

Here is a basic example, using net.adns, Prosody's asynchronous DNS lookup API:

local dns_lookup = require "net.adns".lookup;
local runner, waiter = require "util.async".runner, require "util.async".waiter;
 
local async_func = runner(function (name)
    local wait, done = waiter();
    local result;
    local lookup = dns_lookup(function(answer)
        result = answer;
        done();
    end, name);
    wait();
    print(result);
end);
async_func:run("example.com")

At first glance, it might not be obvious what benefit util.async offers here. Why not just use the callback directly?

Concurrent callbacks example

A good way to demonstrate how util.async makes life easier is to show what happens if we need to do two DNS lookups at the same time. This is a real example - we generally need to look up both A records (IPv4) and AAAA records (IPv6).

Here is what that code might look like using only callbacks, without util.async:

    local dns_lookup = require "net.adns".lookup;

    local function do_lookup(name)
        local result4, result6;
        local have_v4_result, have_v6_result;

        local function have_both_results()
            -- We should have both now
            print("Results for "..name)
            print("IPv4:", result4 or "None");
            print("IPv6:", result6 or "None");
        end

        -- Get the IPv4 address
        dns_lookup(function (answer)
            result4 = answer and answer[1] and answer[1].a;
            have_v4_result = true;
            if have_v6_result then -- We have the other result, so now we...
                have_both_results();
            end
        end, name, "A");

        -- Get the IPv6 address
        dns_lookup(function (answer)
            result6 = answer and answer[1] and answer[1].aaaa;
            have_v6_result = true;
            if have_v4_result then -- We have the other result, so now we...
                have_both_results();
            end
        end, name, "AAAA");
    end

    do_lookup("prosody.im");
    do_lookup("example.com");

Note how, because we cannot tell which request will be answered first, we have to perform the completion check in both callbacks. Thankfully we only have two! Also, to avoid code duplication, we've managed to factor out the "all done" case to its own function, which is called from whichever callback happens second.

However, using util.async, we can forget all this complex logic and write the code step by step, the way we would naturally think about the task in our heads:

    local dns_lookup = require "net.adns".lookup;
    local runner, waiter = require "util.async".runner, require "util.async".waiter;

    local do_lookup = runner(function (name)
        -- Get a waiter for two events
        local wait, done = waiter(2);

        local result4, result6;

        -- Get the IPv4 address
        dns_lookup(function (answer)
            result4 = answer and answer[1] and answer[1].a;
            done();
        end, name, "A");

        -- And the IPv6 address
        dns_lookup(function (answer)
            result6 = answer and answer[1] and answer[1].aaaa;
            done();
        end, name, "AAAA");

        wait(); -- for both to call done()

        -- We should have both now
        print("Results for "..name)
        print("IPv4:", result4 or "None");
        print("IPv6:", result6 or "None");
    end);

    do_lookup:run("prosody.im");
    do_lookup:run("example.com");

An improvement, no?

Sequential callbacks example

The improvement that util.async brings becomes even more striking if we consider a slightly different example. Here we're going to do three things, one after the other. First, we'll wait 5 seconds, then perform a DNS lookup, and then make an HTTP request:

    local timer = require "util.timer";
    local adns = require "net.adns";
    local http = require "net.http";
 
    timer.add_task(5, function ()
        adns.lookup(function (answer)
            if answer and answer[1] then
                local ip = answer[1].a
                if ip then
                    -- The second argument is the (optional) request options table
                    http.request("http://example.com/?ip="..http.urlencode(ip), nil, function (result)
                        print("Response:", result);
                    end);
                end
            end
        end, "example.com");
    end);

This code demonstrates the problem with sequential operations in callback-based APIs, often referred to as the 'Pyramid of Doom'. And this is only a simple example with little actual logic.

Let's see what happens if we rewrite the code using util.async, which allows us to flatten it out using waiters:

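    local timer = require "util.timer";
    local adns = require "net.adns";
    local http = require "net.http";
    local runner, waiter = require "util.async".runner, require "util.async".waiter;

    -- Waiters are used within a runner, so the steps are wrapped in one
    local lookup_and_fetch = runner(function (name)
        local wait, done = waiter();
        timer.add_task(5, done);
        wait(); -- Wait for timer

        local wait, done = waiter();
        local ip;
        adns.lookup(function (answer)
            if answer and answer[1] then
                ip = answer[1].a
            end
            done();
        end, name);
        wait(); -- Wait for DNS result

        if ip then
            -- The second argument is the (optional) request options table
            http.request("http://example.com/?ip="..http.urlencode(ip), nil, function (result)
                print("Response:", result);
            end);
        end
    end);

    lookup_and_fetch:run("example.com");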