Thread Jobs

The Thread Jobs library is the lib-common implementation of the job posting programming model. Its main inspiration is Apple’s GCD library. The main differences lie in GCD’s integration into the operating system and in the roots of the runtime.

The POSIX model for parallelism has its roots at a time when computers had few processors and cores. It uses threads with explicit locking (using mutexes, condition variables, semaphores). This has some issues:

  1. the optimal cases are those with very little concurrency, since contended locks behave badly

  2. neither the language nor the compiler knows about locks, so neither can provide good-practice guidance or static validation; as a consequence, this approach is error-prone

  3. everything in that context bears some overhead: allocations are slower, taking/releasing locks must be done explicitly (and in the right order), …​

  4. the code is hard for developers to debug and validate

The job posting paradigm provides a highly scalable approach to concurrent programming since there are no locks anymore. Instead, each concurrent piece of code (named a job) is posted in queues. The actual parallelism is then performed by the runtime, which schedules the jobs on the available CPUs. Serialization is performed using queues or synchronization points.

Jobs and Queues

Jobs

In the thr-job library, a job can be either a small context structure or a block. The first solution can be used when your jobs are complex, while the second offers great readability by inlining the code of the job as if it were simply a branch of the enclosing function.

To use the block variant, simply use the _b variants of the functions. The context-based variant requires that you create a structure type that extends the thr_job_t structure, create one instance, post the job and then destroy the object.

The following code example (taken from the Block documentation) shows, on a simple use case (doing "something" on every element of a vector), how both approaches differ.

Without parallelism
void apply_to_all_map(qv_t(map) *maps)
{
    qv_for_each_pos(map, pos, maps) {
        map_t *map = maps->tab[pos];

        do_something1(map);
        do_something2(map);
    }
}
Parallelism without blocks
struct apply_to_map_ctx_t {
    thr_job_t job;
    map_t *map;
};

static void apply_to_map_job(thr_job_t *job)
{
    struct apply_to_map_ctx_t *ctx
        = container_of(job, struct apply_to_map_ctx_t, job);

    do_something1(ctx->map);
    do_something2(ctx->map);
    p_delete(&job); /* Don't forget to deallocate the job */
}

void apply_to_all_map(qv_t(map) *maps)
{
    thr_syn_t syn;

    thr_syn_init(&syn);
    qv_for_each_pos(map, pos, maps) {
        /* We allocate the context because it needs
         * to persist until the job is executed
         * (which is out of the scope of the loop).
         */
        struct apply_to_map_ctx_t *ctx
            = p_new(struct apply_to_map_ctx_t, 1);

        ctx->job.run = &apply_to_map_job;
        ctx->map = maps->tab[pos];
        thr_syn_schedule(&syn, &ctx->job);
    }
    thr_syn_wait(&syn);
    thr_syn_wipe(&syn);
}
Parallelism with blocks
void apply_to_all_map(qv_t(map) *maps)
{
    thr_syn_t syn;

    thr_syn_init(&syn);
    qv_for_each_pos(map, pos, maps) {
        map_t *map = maps->tab[pos];

        /* The allocation and variable
         * capture is automatic thanks
         * to the Block's runtime.
         */
        thr_syn_schedule_b(&syn, ^{
            do_something1(map);
            do_something2(map);
        });
    }
    thr_syn_wait(&syn);
    thr_syn_wipe(&syn);
}

As you can see, using the Block version is much less intrusive (in that case), since it allows jobs to be posted by simply wrapping the body of the loop in a thr_syn_schedule_b call.

Unordered execution

Jobs can be posted in an unordered fashion. For this, use the thr_schedule function family. Unless there are too many awaiting jobs, these functions simply put the job in a pending job queue and return immediately (if the queue is full, i.e. holds 256 awaiting jobs, the job gets executed immediately). Jobs posted that way can be executed concurrently, which means they must be careful not to have side effects on shared memory.

Scheduling jobs for unordered execution is the most efficient solution in terms of parallelism. It scales gracefully with the number of CPUs, so it is the solution of choice whenever you have to bring parallelism to some code. However, it has constraints, the main one being that you must ensure that each job is self-contained and won’t touch memory used by other jobs. Moreover, if you want optimal performance, you’ll want the context of each job to be 64-byte aligned in order to avoid sharing a CPU cache line with another job, which would hurt performance because of CPU-level locking.
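For illustration, here is a minimal sketch of such a cache-line-aligned job context; the aligned attribute is standard GCC/Clang, the structure name and payload fields are hypothetical, and keep in mind that a plain malloc-based allocation only guarantees 16-byte alignment, so an aligned allocator may be needed to actually obtain 64-byte aligned contexts.

/* One job context per 64-byte cache line, so that two jobs running on
 * different CPUs never write to the same line (hypothetical payload). */
struct crunch_ctx_t {
    thr_job_t job;     /* embedded job, as in the context-based example above */
    uint32_t  first;
    uint32_t  count;
    uint64_t  result;
} __attribute__((aligned(64)));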

If we go back to our loop example, we post one job per entry of the vector using thr_syn_schedule_b. This means that we post a lot of jobs that can be executed in a totally random order (and they will be, because some jobs get transferred from one CPU queue to another).

Ordered execution

Sometimes we need to ensure that actions are executed sequentially. This happens when some action interacts with thread-unsafe resources, for example, if you want to interact with the event loop (or ichannels). In that case, the solution is to put the job in a queue.

Queues are instantiated using thr_queue_create() (except the special default queue thr_queue_main_g, which is created by the thr-job runtime). Then you can post jobs in them using the thr_queue function family. Job posting on a queue is thread-safe, and the execution of a queue is guaranteed to be ordered and sequential. However, there is absolutely no guarantee that a job posted on a queue will be executed immediately, or in the same thread as the one in which it got posted.

The most commonly used queue is thr_queue_main_g. Every job posted on that queue is guaranteed to be executed on the main thread, and thus it is used, for example, to send the result of a job back in an ichannel. That particular queue is automatically consumed whenever the event loop is executed or when thr_syn_wait is used on the main thread. Moreover, when the event loop is idle and a job is queued on thr_queue_main_g, the event loop is automatically woken up and the job gets consumed immediately.
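As a small illustration, here is a minimal sketch combining ordered execution and a notification on the main thread, using only the calls described in this document; audit_queue_g, handle_event() and notify_listeners() are hypothetical names:

static thr_queue_t *audit_queue_g;

static void audit_init(void)
{
    audit_queue_g = thr_queue_create();
}

static void audit_post(int event_id)
{
    /* jobs posted on audit_queue_g run one at a time, in posting order,
     * but not necessarily on the thread that posted them */
    thr_queue_b(audit_queue_g, ^{
        handle_event(event_id);

        /* once done, notify the main thread */
        thr_queue_b(thr_queue_main_g, ^{
            notify_listeners(event_id);
        });
    });
}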

Synchronization points

In order to be able to wait for the completion of some jobs, you can place a synchronization point by calling thr_syn_wait(). For this, you must queue or schedule your jobs with the _syn variants of the calls and provide a previously initialized thr_syn_t.

A thr_syn_t can be used several times and can be waited on concurrently by several threads.

The queue API offers a helper, thr_queue_sync, that posts a job on a queue and waits for the job to be finished. Be careful when using it: jobs posted with that function must not themselves call thr_queue_sync on the same queue (the queue is not reentrant).

As a general rule of thumb, synchronization must be done carefully because misuse can lead to deadlocks.

Moreover, using a thr_syn_t is often mandatory, even for purely asynchronous jobs, because it can be used to wait for the termination of all background tasks.

Our loop example above uses a thr_syn_t to ensure that even if each individual job is performed asynchronously, the whole processing remains synchronous.

thr_syn_wait() may spawn new threads, so using it inside recursive jobs is forbidden. If a job spawns children jobs (recursively), we usually want the job to wait for all its children before returning, so we would use thr_syn_wait() to synchronize at their end. However, this could suddenly create a huge number of threads, similar to a fork bomb.

For example, this pattern (spawn/wait/done) should never be used:

void do_job()
{
   thr_syn_t syn;

   /* do stuff */

   /* spawn children */
   thr_syn_init(&syn);
   for (int i = 0; i < n; i++) {
      thr_syn_schedule_b(&syn, ^{
         do_job();
      });
   }
   thr_syn_wait(&syn);
   /* all data is ready, end of children */
   end_of_current_job_level();
}

Instead, you should use only one thr_syn_t (hence only one thr_syn_wait()), and pass a callback that will be executed by the last job:

void do_job(thr_syn_t *syn, block_t end_of_parent_job)
{
   /* shared by all this level's children; __block so that a single,
    * heap-hosted counter is captured by every child block */
   __block atomic_uint count = ATOMIC_VAR_INIT(n);

   /* do stuff */

   /* spawn children */
   for (int i = 0; i < n; i++) {
       thr_syn_schedule_b(syn, ^{
           do_job(syn, ^{
               if (atomic_fetch_sub(&count, 1) == 1) {
                   /* the last child executes this level's completion */
                  end_of_current_job_level();
                  /* last child also executes parent continuation */
                  end_of_parent_job();
               }
           });
       });
   }
}

void main_call()
{
    thr_syn_t syn;

    thr_syn_init(&syn);
    do_job(&syn, ^{
        /* called after everything ended, you probably want to
         * set a local variable */
    });
    thr_syn_wait(&syn);
}

Usage patterns

This section shows some simple usage patterns that can easily be reproduced.

Divide & Conquer

Base

This is the use case demonstrated above:

  • a data set gets split into a multitude of small data chunks

  • each data chunk is processed independently

  • we wait for the completion of the job

Vector
void apply_to_all_map(qv_t(map) *maps)
{
    thr_syn_t syn;

    thr_syn_init(&syn);
    qv_for_each_pos(map, pos, maps) {
        map_t *map = maps->tab[pos];

        thr_syn_schedule_b(&syn, ^{
            do_something(map);
        });
    }
    thr_syn_wait(&syn);
    thr_syn_wipe(&syn);
}
Tree
typedef struct tree_t {
    tree_t *left;
    tree_t *right;

    map_t *map; /* ... payload */
} tree_t;

static void apply_to_tree(thr_syn_t *syn, tree_t *tree)
{
    if (!tree) {
        return;
    }
    thr_syn_schedule_b(syn, ^{
        apply_to_tree(syn, tree->left);
    });
    thr_syn_schedule_b(syn, ^{
        apply_to_tree(syn, tree->right);
    });
    do_something(tree->map);
}

void apply_to_all_tree(tree_t *tree)
{
    thr_syn_t syn;

    thr_syn_init(&syn);
    apply_to_tree(&syn, tree);
    thr_syn_wait(&syn);
    thr_syn_wipe(&syn);
}

Double explosion

In case you are using a large data set, it can be clever to first post coarse-grained jobs which will themselves post fine-grained jobs. This is because the CPU-local queue is limited to 256 pending jobs, so if you post a lot of jobs in it, many of them will get executed immediately, hurting peak parallelism. Moreover, since posting a job is not totally free (a few hundred CPU cycles), you will want the fine-grained jobs to be large enough to outweigh that cost.

A constant named thr_parallelism_g exposes the number of processing threads. You can use it to estimate the number of coarse-grained jobs to post (usually a small factor of that constant, such as 2 times).

Here is the vector example rewritten with that double-explosion pattern:

void apply_to_all_map(qv_t(map) *maps)
{
    t_scope;
    int jobs_per_coarse = DIV_ROUND_UP(maps->len, 2 * thr_parallelism_g);
    thr_syn_t *syn = t_new_raw(thr_syn_t, 1);

    thr_syn_init(syn);

    for (int i = 0; i < maps->len; i += jobs_per_coarse) {
        thr_syn_schedule_b(syn, ^{
            for (int j = i; j < MIN(maps->len, i + jobs_per_coarse); j++) {
                map_t *map = maps->tab[j];
                thr_syn_schedule_b(syn, ^{
                    do_something(map);
                });
            }
        });
    }
    thr_syn_wait(syn);
    thr_syn_wipe(syn);
}

Post & Notify

Base

The post-notify pattern lets you perform some task in the background and then notify the main thread when the task is finished. It may use a queue for the background task and always uses the main (or another) queue for the notification. This is a useful pattern when you have long-running tasks that must not block the program but need to provide user feedback upon termination.

Background computation
void IOP_RPC_IMPL(compute, do)
{
    thr_schedule_b(^{
        int res = do_computation();
        thr_queue_b(thr_queue_main_g, ^{
            ic_reply(NULL, slot, .res = res);
        });
    });
}
Background fsync()
static thr_queue_t *queue_g;

void IOP_RPC_IMPL(data, sync)
{
    thr_queue_b(queue_g, ^{
        fsync(_G.data_fd);
        thr_queue_b(thr_queue_main_g, ^{
            ic_reply(NULL, slot);
        });
    });
}

Map-Reduce

A common variant is the Map-Reduce pattern. In that pattern, we have two passes:

  • the map pass, which applies a processing independently to each element of the dataset

  • the reduce pass, which aggregates the results

void map_reduce(qv_t(map) *maps)
{
    __block reduce_t res;
    __block int pending = maps->len;
    thr_queue_t *reduce_queue = thr_queue_create();

    qv_for_each_pos(map, pos, maps) {
        map_t *map = maps->tab[pos];

        thr_schedule_b(^{
            intermediate_t map_res = do_map(map);
            thr_queue_b(reduce_queue, ^{
                do_reduce(res, map_res);
                pending--;
                if (pending == 0) {
                    thr_queue_b(thr_queue_main_g, ^{
                        has_result(res);
                    });
                }
            });
        });
    }
}

mpsc-queue

The MPSC queue (multiple producers, single consumer) provides a lower-level approach. Instead of posting jobs in a queue, you post data in a queue, and if the queue was empty you then post a job that will consume it. This approach could, for example, be used for database commit management. However, it has caveats (mostly because you have no ordering guarantee on the notification part), so use it with a lot of care.
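As a purely illustrative sketch of that pattern (mpsc_queue_t, mpsc_queue_push() returning whether the queue was empty before the push, mpsc_queue_pop() and commit_apply() are hypothetical names, not the actual lib-common API), the consumer side is posted on a queue so that consumption stays single-threaded even if two consumer jobs happen to be posted:

static mpsc_queue_t  commits_g;
static thr_queue_t  *commit_queue_g; /* created once with thr_queue_create() */

void commit_post(commit_t *commit)
{
    /* producers, from any thread, only push data */
    if (mpsc_queue_push(&commits_g, commit)) {
        /* the queue was empty: post a single consumer job */
        thr_queue_b(commit_queue_g, ^{
            commit_t *c;

            while ((c = mpsc_queue_pop(&commits_g)) != NULL) {
                commit_apply(c);
            }
        });
    }
}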

Limited parallelism

Sometimes, you may want to post asynchronous jobs but limit the actual amount of parallelism to something well under the number of CPUs. For this, you can instantiate a set of queues (no more than the maximum parallelism you want to allow) and dispatch your jobs in those queues. Jobs are executed sequentially in each queue but in random order across queues, so you fall back to the "unordered" processing case.
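Here is a sketch of that approach, using only the calls shown above; MAX_PARALLEL, item_t and process_one() are hypothetical:

#define MAX_PARALLEL  4

static thr_queue_t *limited_queues_g[MAX_PARALLEL];

static void limited_init(void)
{
    for (int i = 0; i < MAX_PARALLEL; i++) {
        limited_queues_g[i] = thr_queue_create();
    }
}

static void limited_post(uint32_t hint, item_t *item)
{
    /* at most MAX_PARALLEL jobs run concurrently (one per queue);
     * jobs sharing the same hint are additionally serialized */
    thr_queue_b(limited_queues_g[hint % MAX_PARALLEL], ^{
        process_one(item);
    });
}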