The basic idea of indirect style is to write the thread code so that the continuations are made explicit (as closures) at each potentially blocking point. This way, the continuation can be manipulated without the need for any continuation-capture primitive.
Look at the code of the sift thread of the sieve on Figure ??(b). We use >>= (pronounced bind) as the thread sequential composition operator.7 This operator appears at cooperation points, between a potentially blocking operation and its continuation. The continuation is a closure that is executed once the blocking operation has completed; its parameter receives the result of the operation. Imperative loops must be turned into (tail-)recursive functions if they contain a blocking operation. One more operation is useful in indirect style: skip, the no-op.
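As a minimal sketch of the loop transformation, assume operations that take their continuation as an extra argument and a hypothetical queue-based scheduler (runq, enqueue and start below are illustrative names, written in plain ASCII OCaml). A for loop that yields at each iteration then becomes a recursive function:

```ocaml
(* Hypothetical minimal scheduler: a FIFO queue of suspended continuations. *)
let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue k = Queue.push k runq
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done

(* yield stores its continuation and returns to the scheduler. *)
let yield k = enqueue k
(* bind applies the blocking operation to its continuation. *)
let ( >>= ) f k = f k

let trace = ref []   (* visited indices, most recent first *)

(* Direct style would read:  for i = 1 to n do record i; yield () done *)
let count n k =
  let rec loop i =
    if i > n then k ()
    else begin
      trace := i :: !trace;
      yield >>= fun () -> loop (i + 1)
    end
  in
  loop 1

let () =
  enqueue (fun () -> count 3 (fun () -> trace := 0 :: !trace));
  start ()
```

The recursive call sits inside the continuation passed to yield, so each iteration returns to the scheduler before the next one starts.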
Trampolined style (derived from continuation passing style), monadic style (based either on continuations or on promises), and event-based programming are all variants of the indirect style. In the following, we describe an implementation of our concurrency model in each of them.
The idea of trampolined style [9] is that the code is written so that a potentially blocking function is explicitly given its continuation as a closure. It can then manipulate it just as the direct style versions based on continuation capture do. The code must be written in a way similar to continuation passing style [25], but the continuations need to be made explicit only at cooperation points.
Figure 5 shows the signatures of the operations. As we can see, each potentially blocking operation is given (as an additional parameter) the (continuation) function to run when the operation has been performed.
val skip : (unit → unit) → unit
val ( >>= ) : ((α → unit) → unit) → (α → unit) → unit
val yield : (unit → unit) → unit
val spawn : (unit → unit) → unit
val halt : unit → unit
(a)
type α mvar
val make_mvar : unit → α mvar
val take_mvar : α mvar → (α → unit) → unit
val put_mvar : α mvar → α → (unit → unit) → unit
(b)
Figure 5: Trampolined style signatures for the thread primitives
For example, the yield operation can be used as shown below on the left, where the argument is the continuation, i.e. the function to be executed when the thread is resumed. By defining “bind” (infix >>=) to take two arguments and apply its first to the second (the continuation), we obtain an arguably more pleasant syntax.8 Adopting an indentation better suited to the intended “sequential execution” semantics, the code can now be written as below on the right.
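A sketch of the two layouts, in plain ASCII OCaml. The queue-based yield and the start loop are assumptions made to keep the example runnable, and the out buffer is only there to record the interleaving:

```ocaml
(* Hypothetical minimal scheduler, for illustration only. *)
let runq : (unit -> unit) Queue.t = Queue.create ()
let yield k = Queue.push k runq
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done

(* bind applies its first argument to the second (the continuation). *)
let ( >>= ) f k = f k

let out = Buffer.create 16

(* Continuation passed as an ordinary argument. *)
let nested () =
  Buffer.add_string out "a1 ";
  yield (fun () ->
    Buffer.add_string out "a2 ";
    yield (fun () ->
      Buffer.add_string out "a3 "))

(* Same structure, written with >>= and a "sequential" indentation. *)
let sequential () =
  Buffer.add_string out "b1 ";
  yield >>= fun () ->
  Buffer.add_string out "b2 ";
  yield >>= fun () ->
  Buffer.add_string out "b3 "

let () = nested (); sequential (); start ()
```

Both functions have the same behaviour; only the layout differs, since fun extends as far to the right as possible.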
Writing code so that continuations are explicit is often not as intrusive as one might initially expect. As the code for the sieve shows, continuations often do not even appear explicitly. In fact, the extra parameter needs to be manipulated explicitly only when procedural abstraction is used to build complex blocking operations. Even then it is easy, as the following two functions show. The first one abstracts the operation of yielding three times and the second one the transfer of a value from one MVar to another:
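A possible rendering of these two functions, given the signatures of Figure 5. The scheduler is a hypothetical queue, and the MVar operations are deliberately simplified stand-ins that fail where a real MVar would block; they are only there so that the sketch type-checks and runs:

```ocaml
(* Hypothetical minimal scheduler, for illustration only. *)
let runq : (unit -> unit) Queue.t = Queue.create ()
let yield k = Queue.push k runq
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done
let ( >>= ) f k = f k

(* Stand-in MVar operations that never block: a one-slot cell that fails
   where a real MVar would suspend the thread. *)
type 'a mvar = { mutable v : 'a option }
let make_mvar () = { v = None }
let take_mvar m k =
  match m.v with
  | Some x -> m.v <- None; k x
  | None -> failwith "take_mvar: would block"
let put_mvar m x k =
  match m.v with
  | None -> m.v <- Some x; k ()
  | Some _ -> failwith "put_mvar: would block"

(* Yield three times, then run the continuation k. *)
let yield3 k =
  yield >>= fun () ->
  yield >>= fun () ->
  yield >>= k

(* Move a value from m1 to m2, then run the continuation k. *)
let transfer_mvar m1 m2 k =
  take_mvar m1 >>= fun v ->
  put_mvar m2 v >>= k

let m1 : int mvar = make_mvar ()
let m2 : int mvar = make_mvar ()
let () =
  m1.v <- Some 42;
  yield3 (fun () -> transfer_mvar m1 m2 (fun () -> ()));
  start ()
```

In both functions the extra parameter k only appears once, as the last continuation of the chain.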
Since a potentially blocking function, such as yield or take_mvar, takes its continuation as an additional parameter, it can execute it immediately or, if it needs to block, store it for later resumption before returning to the scheduler.
The code is very similar to the one using delimited continuations. The only difference is that continuations do not have to be captured, since they are provided explicitly, as illustrated below:
∣ { v=Some v; read=None; write=Some(c,v′) } →
inp.v ← Some v′; inp.write ← None; enqueue c; k v
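To put the fragment above in context, here is a sketch of complete trampolined MVar operations. It assumes a record holding the current value, at most one blocked reader (its continuation) and at most one blocked writer (its continuation and value); the catch-all cases and the queue-based scheduler are additions made for the example:

```ocaml
let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done

type 'a mvar = {
  mutable v : 'a option;                         (* current content *)
  mutable read : ('a -> unit) option;            (* blocked reader *)
  mutable write : ((unit -> unit) * 'a) option;  (* blocked writer, its value *)
}
let make_mvar () = { v = None; read = None; write = None }

let put_mvar out v k =
  match out with
  | { v = Some _; read = _; write = None } ->
      out.write <- Some (k, v)          (* full: store k, back to scheduler *)
  | { v = None; read = Some r; write = None } ->
      out.read <- None; enqueue (fun () -> r v); k ()
  | { v = None; read = None; write = None } ->
      out.v <- Some v; k ()
  | _ -> failwith "put_mvar"            (* states this sketch does not handle *)

let take_mvar inp k =
  match inp with
  | { v = Some v; read = None; write = None } ->
      inp.v <- None; k v
  | { v = Some v; read = None; write = Some (c, v') } ->
      inp.v <- Some v'; inp.write <- None; enqueue c; k v
  | { v = None; read = None; write = _ } ->
      inp.read <- Some k                (* empty: store k, back to scheduler *)
  | _ -> failwith "take_mvar"

let got = ref []
let () =
  let m = make_mvar () in
  (* The second put blocks until the consumer takes the first value. *)
  enqueue (fun () -> put_mvar m 1 (fun () -> put_mvar m 2 (fun () -> ())));
  enqueue (fun () ->
    take_mvar m (fun a -> take_mvar m (fun b -> got := [a; b])));
  start ()
```

Blocking amounts to storing k in the MVar and returning without calling it; resuming amounts to enqueuing the stored continuation.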
A small note about spawn. Composing fragments with >>= produces a function accepting a continuation, which must be given a final dummy continuation before it can be executed. spawn could do that but, as the sieve example shows, when composing recursively we may omit the thread continuation argument. We thus define spawn as acting on thunks, and define a close function that builds a thunk by providing the dummy continuation.
Trampolined style was created in the dynamically typed language Scheme. As we’ll see, the continuation monad [5] is basically a formulation of the same ideas in a statically typed language. But let’s first quickly define monads.
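One way spawn and close could be written, under the assumption of a queue-based scheduler (runq, enqueue, start, and the helper say are illustrative names):

```ocaml
let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq
let yield k = enqueue k
let ( >>= ) f k = f k
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done

(* spawn acts on thunks. *)
let spawn (t : unit -> unit) = enqueue t

(* close turns a fragment expecting a continuation into a thunk
   by supplying the final dummy continuation. *)
let close (f : (unit -> unit) -> unit) : unit -> unit =
  fun () -> f (fun () -> ())

let out = Buffer.create 16
(* Hypothetical helper: record s, yield, then continue with k. *)
let say s k = Buffer.add_string out s; yield >>= k

(* A fragment built with >>= still expects a continuation: close it. *)
let greet k = say "hello " >>= fun () -> say "world " >>= k

(* A recursive thread may omit the continuation: it is already a thunk. *)
let rec countdown n () =
  if n > 0 then (say (string_of_int n ^ " ") >>= countdown (n - 1))

let () = spawn (close greet); spawn (countdown 2); start ()
```

countdown never mentions a continuation parameter because its recursive call plays that role, so it can be spawned directly.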
Monads are useful in a variety of situations for dealing with effects in a functional setting [20]. A monad is a (parameterized) data type α t with (at least) the return and bind (noted infix >>=) primitives, whose types are shown in Figure 6. As the types suggest, return v builds a monadic α t value “containing” v, and m >>= f “opens up” m to get its enclosed value v, gives it to f and returns the monadic value returned by f.
Here we can define type α t = (α → unit) → unit so that a “monadic value” α t is a function taking an α → unit continuation. We can now think of blocking functions as returning a value of type α t, as can be seen in Figure 6.
type α t
val return : α → α t
val ( >>= ) : α t → (α → β t) → β t
val spawn : (unit → unit t) → unit
val skip : unit t
val yield : unit → unit t
val halt : unit → unit t
val stop : unit → unit t
val start : unit → unit
(a)
type α mvar
val make_mvar : unit → α mvar
val put_mvar : α mvar → α → unit t
val take_mvar : α mvar → α t
(b)
Figure 6: Monadic style signatures for the thread primitives
This bind is obviously different from the trivial application operator that we used in trampolined style. Apart from ensuring that all functions take one argument (besides the continuation), it removes the need to explicitly manage the continuation parameter when composing thread fragments. Here’s our yield3 example (we have cosmetically changed the definition of yield so that it takes a () argument to be compatible with the new bind):
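A sketch of what yield3 may look like in this style. The queue-based yield and the driver loop counting scheduler steps are assumptions made for the example:

```ocaml
type 'a t = ('a -> unit) -> unit

(* Continuation-based bind: run m, feed its result to f, continue with k. *)
let ( >>= ) (m : 'a t) (f : 'a -> 'b t) : 'b t =
  fun k -> m (fun r -> f r k)

(* Hypothetical run queue of suspended continuations. *)
let runq : (unit -> unit) Queue.t = Queue.create ()
let yield () : unit t = fun k -> Queue.push k runq

(* No continuation parameter appears any more. *)
let yield3 () =
  yield () >>= fun () ->
  yield () >>= fun () ->
  yield ()

let finished = ref false
let steps = ref 0
let () =
  (* Supply the final continuation, then drive the queue by hand. *)
  yield3 () (fun () -> finished := true);
  while not (Queue.is_empty runq) do
    incr steps;
    (Queue.pop runq) ()
  done
```

Compared with trampolined style, yield3 no longer takes or forwards a parameter k: bind threads the continuation through the chain.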
Just to be explicit about how this works, the “expanded” types and definitions of return and bind are:
val return : α → (α → unit) → unit
let return a = fun k → k a
val (>>=) : ((α → unit) → unit) → (α → (β → unit) → unit) → (β → unit) → unit
let (>>=) f k′ = fun k → f (fun r → k′ r k)
that is, bind returns a function accepting a continuation. Let’s see this on our transfer_mvar example by expanding its definition:
fun k → take_mvar m1 (fun r → (fun v → put_mvar m2 v) r k)
Finally, spawn provides the final dummy continuation to get a thunk it then enqueues.
let spawn (t : unit → unit t) = enqueue (fun () → t () (fun () → ()))
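Putting the pieces together, the following sketch runs two threads under the continuation monad. The start loop and the say helper are hypothetical additions; spawn follows the definition above:

```ocaml
type 'a t = ('a -> unit) -> unit
let ( >>= ) (m : 'a t) (f : 'a -> 'b t) : 'b t =
  fun k -> m (fun r -> f r k)

let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq
let yield () : unit t = fun k -> enqueue k

(* spawn supplies the final dummy continuation and enqueues the thunk. *)
let spawn (t : unit -> unit t) = enqueue (fun () -> t () (fun () -> ()))

(* Hypothetical scheduler loop: run thunks until the queue is empty. *)
let start () =
  while not (Queue.is_empty runq) do (Queue.pop runq) () done

let out = Buffer.create 16
(* Hypothetical non-blocking operation: record s, then continue. *)
let say s : unit t = fun k -> Buffer.add_string out s; k ()

let worker name () =
  say (name ^ "1 ") >>= fun () ->
  yield () >>= fun () ->
  say (name ^ "2 ") >>= fun () ->
  yield () >>= fun () ->
  say (name ^ "3 ")

let () = spawn (worker "a"); spawn (worker "b"); start ()
```

The two workers alternate at each yield, since yield puts the current continuation at the back of the run queue.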
A promise [18] is a “proxy” value that can be used to later access a value that is not immediately available. Using promises, blocking operations need not block: they immediately return a promise for the requested value. A promise is either ready, if the value is indeed available, or blocked, if it is not available yet. The promise can be passed along until its value is actually needed: the claim operation is then used to get the promised value. Of course claim itself may block.
Using the promise monad to implement threading, operations will still have the signatures of Figure 6. Blocking operations will return a promise and the duty of claim will be performed by bind. In t >>= f, bind will either pass the value of t to f at once, if t is ready, or, if t is blocked, register a thunk that will do so when t becomes ready and return a fresh blocked promise in the meantime.
Here’s a brief description of how it can be implemented (Figure 7). This implementation is heavily inspired by Lwt (light weight threads) [26], a cooperative threads library for OCaml. Our implementation tries to retain only the core workings of Lwt so that it can be compared to the other implementations. Also, [26] describes it differently, by interpreting α t as the type of threads rather than that of promises.
α t is the type of promises for a value of type α. It is a record with one mutable field denoting the current promise state. The state can be Ready, containing the promised value; Blocked, holding a list of thunks (the “waiters”) to execute when the promise becomes ready; or a Link to another promise (meaning that it behaves the same as that promise, having been connected to it). repr follows Links to return the promise a given promise is ultimately linked to.
Consider the evaluation of t >>= f, following the code for >>=. Evaluating t provides a promise. If it’s ready, its value v is passed to f. If it’s blocked, its state is Blocked w: a new blocked promise res is created, a thunk is added to the w list, and res is returned.
Thus, when t becomes ready (through fullfill), the thunk is executed. It passes the value to f, which returns a new promise (let’s call it p). res is then connected to p. The code for connect shows that if p is ready, res is fulfilled with its value. If p is blocked, it is turned into a Link to res: this ensures that any later operation (such as fullfill) involving p is actually performed on res.
type α state =
  ∣ Ready of α
  ∣ Blocked of (α t → unit) list ref
  ∣ Link of α t
and α t = { mutable st : α state }

let rec repr t =
  match t.st with
  ∣ Link t′ → repr t′
  ∣ _ → t

let blocked () = { st = Blocked (ref [ ]) }
let ready v = { st = Ready v }

let runq = Queue.create ()
let enqueue t = Queue.push t runq
let dequeue () = Queue.take runq

let fullfill t v =
  let t = repr t in
  match t.st with
  ∣ Blocked w →
    t.st ← Ready v;
    List.iter (fun f → f t) !w
  ∣ _ → failwith "fullfill"

let connect t t′ =
  let t′ = repr t′ in
  match t′.st with
  ∣ Ready v → fullfill t v
  ∣ Blocked w′ →
    let t = repr t in
    match t.st with
    ∣ Blocked w → w := !w @ !w′;
      t′.st ← Link t
    ∣ _ → failwith "connect"

let (>>=) t f =
  match (repr t).st with
  ∣ Ready v → f v
  ∣ Blocked w → let res = blocked () in
    w := (fun t → let Ready v = t.st in
                   connect res (f v)):: !w;
    res
Figure 7: Promise monad: core
Figure 8 shows that the scheduler first fulfills the wait_start promise. spawn makes threads wait on it to ensure they don’t start running before they are allowed to. The code for yield shows how we suspend a thread: we create a blocked promise p, enqueue a function that will fulfill it, and return p. bind will add to p a thunk connecting the promise of the next computation to its own res. When the scheduler dequeues the function enqueued by yield, it fulfills p and so executes the thunk. The same applies to MVars.
let skip = ready ()
let halt () = ready ()
let yield () = let p = blocked () in enqueue (fun () → fullfill p ()); p
let wait_start = blocked ()
let spawn t = wait_start >>= t; ()

exception Stop
let stop () = raise Stop

let start () =
  fullfill wait_start ();
  try
    while true do
      dequeue () ()
    done
  with Queue.Empty ∣ Stop → ()

type α mvar = { mutable v:α option;
                mutable read: α t option;
                mutable write: (unit t × α) option }

let put_mvar out v =
  match out with
  ∣ { v=Some v′; read=_; write=None } →
    let w = blocked () in out.write ← Some (w,v); w
  ∣ { v=None; read=Some r; write=None } →
    out.read ← None; enqueue (fun () → fullfill r v); ready ()
  ∣ { v=None; read=None; write=None } → out.v ← Some v; ready ()
Figure 8: Promise monad: scheduler, MVars
It is not obvious how operations chain, so we illustrate the execution of our yield3 example in Figure 9. A promise is shown as a square (R for ready, B for blocked, pointing to its waiter thunk if any, L for a link). (a) In the first occurrence of bind (subscripted 1 for convenience), the first yield (subscripted a) returns a blocked promise pa. bind adds a waiter thunk to it and returns a fresh blocked promise res1. Remember that yield has enqueued a thunk to fulfill pa. (b) When this occurs, pa’s waiter thunk is executed. yieldb returns a blocked promise pb with the rest of the computation as waiter thunk. bind2 returns a new blocked promise res2, which is connected to res1. Since res2 is blocked, it is turned into a link to res1. (c) When pb’s waiter thunk is executed, yieldc again returns a blocked promise pc, which connect turns into a link to res1 (since res2 is itself a link to res1). (d) Finally, when pc is fulfilled, res1 becomes ready.
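The trace can be replayed with the following plain ASCII transcription of the core of Figure 7 and of yield. The matches are made exhaustive with assert false on cases that repr rules out, and is_ready, history and the driver loop are hypothetical helpers added for observation:

```ocaml
type 'a state =
  | Ready of 'a
  | Blocked of ('a t -> unit) list ref
  | Link of 'a t
and 'a t = { mutable st : 'a state }

let rec repr t = match t.st with Link t' -> repr t' | _ -> t
let blocked () = { st = Blocked (ref []) }

let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq

let fullfill t v =
  let t = repr t in
  match t.st with
  | Blocked w -> t.st <- Ready v; List.iter (fun f -> f t) !w
  | _ -> failwith "fullfill"

let connect t t' =
  let t' = repr t' in
  match t'.st with
  | Ready v -> fullfill t v
  | Blocked w' ->
      let t = repr t in
      (match t.st with
       | Blocked w -> w := !w @ !w'; t'.st <- Link t
       | _ -> failwith "connect")
  | Link _ -> assert false            (* repr never returns a Link *)

let ( >>= ) t f =
  match (repr t).st with
  | Ready v -> f v
  | Blocked w ->
      let res = blocked () in
      w := (fun t -> match t.st with
                     | Ready v -> connect res (f v)
                     | _ -> assert false) :: !w;
      res
  | Link _ -> assert false

let yield () = let p = blocked () in enqueue (fun () -> fullfill p ()); p
let yield3 () = yield () >>= fun () -> yield () >>= fun () -> yield ()

(* Observe res1 after each scheduler step, as in steps (b), (c), (d). *)
let is_ready p = match (repr p).st with Ready _ -> true | _ -> false
let res1 = yield3 ()
let history = ref []
let () =
  while not (Queue.is_empty runq) do
    (Queue.pop runq) ();
    history := is_ready res1 :: !history
  done
```

res1 stays blocked through the first two steps and becomes ready only when pc, by then a link to res1, is fulfilled.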
A popular paradigm supporting user-level concurrency is event-driven programming. The OCamlNet library [24] provides an equeue (for event queue) module in which handlers (or callbacks) are set up to process events.
We describe it briefly. First, an event system (called esys here) must be created. Events are generated by an event source (here the function fun _ → (), which generates none) but can also be added by the handlers themselves. Each event is presented to each handler in turn, until one accepts it (or it is dropped if none does). A handler rejects an event by raising the Reject exception; otherwise the event is accepted. If the handler, having accepted the event, wants to remove itself, it must raise the Terminate exception.
The event system is activated by the Equeue.run function. The function returns when all events have been consumed and the event source does not add any.
In our implementation, handlers will always be “one shot”, so they will always raise Terminate after having accepted an event. Before doing so, they will have registered a new handler representing the thread continuation.
For yielding, a thread creates a new eventid, registers its continuation as a handler to the Go event with the correct id, and adds this precise event to the system.
type eventid = unit ref
type α event =
  ∣ Written of eventid
  ∣ Read of eventid × α
  ∣ Go of eventid
let esys : int event Equeue.t =
Equeue.create (fun _ → ())
let yield k =
let id = make_eventid () in
Equeue.add_handler esys
(fun esys e →
match e with
∣ Go id′ when id′ ≡ id → k ()
∣ _ → raise Equeue.Reject);
Equeue.add_event esys (Go id);
raise Equeue.Terminate
A thread blocked on an MVar waits for a unique event allowing it to proceed. Blocked writers create a new eventid that they register in the control information of the MVar, along with the value they want to write. They then wait for a Written event with the correct id. Such an event is generated once the value has actually been put in the MVar, which happens when another thread takes the current MVar value. Blocked readers create a new eventid and wait for the Read event that will carry the value taken from the MVar. Again, this event is generated when some thread puts a value in the MVar.
type α mvar = { mutable v:α option;
mutable read:eventid option;
mutable write:(eventid × α) option }
let make_mvar () = { v=None; read=None; write=None }
let put_mvar out v k =
match out with
∣ { v=Some v′; read=_; write=None } →
let id = make_eventid () in out.write ← Some (id, v);
Equeue.add_handler esys (fun esys e →
match e with
∣ Written id′ when id′ ≡ id → k ()
∣ _ → raise Equeue.Reject);
raise Equeue.Terminate
∣ { v=None; read=Some id; write=None } →
out.read ← None;
Equeue.add_event esys (Read(id, v));
k ()
∣ { v=None; read=None; write=None } → out.v ← Some v; k ()
Since each blocking operation (in case it actually blocks) registers a new handler and then raises Terminate, threads must be running as handlers from the very beginning (for the Terminate exception to be caught by the event system). To ensure this, spawn registers the new thread as a handler for a new Go event, then adds the event to the system.
let spawn t =
let id = make_eventid () in
Equeue.add_handler esys (fun esys e →
match e with
∣ Go id′ when id′ ≡ id → t ()
∣ _ → raise Equeue.Reject);
Equeue.add_event esys (Go id)
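The handler protocol can be exercised without OCamlNet by modelling the event system described above with a small stand-in. The functions create, add_handler, add_event, dispatch and run below are a hypothetical model written for this sketch, not the Equeue API, and make_eventid is assumed to return a fresh reference:

```ocaml
exception Reject
exception Terminate

(* Stand-in event system: a queue of events and a list of handlers. *)
type 'e esys = {
  events : 'e Queue.t;
  mutable handlers : ('e esys -> 'e -> unit) list;
}
let create () = { events = Queue.create (); handlers = [] }
let add_event es e = Queue.push e es.events
let add_handler es h = es.handlers <- es.handlers @ [h]

(* Present e to each handler in turn until one accepts it. *)
let dispatch es e =
  let rec go = function
    | [] -> ()                                   (* nobody accepts: dropped *)
    | h :: rest ->
        (try h es e with
         | Reject -> go rest
         | Terminate ->                          (* accepted, handler removed *)
             es.handlers <- List.filter (fun h' -> h' != h) es.handlers)
  in
  go es.handlers

let run es =
  while not (Queue.is_empty es.events) do
    dispatch es (Queue.pop es.events)
  done

type eventid = unit ref
type 'a event = Written of eventid | Read of eventid * 'a | Go of eventid
let make_eventid () : eventid = ref ()   (* assumed definition *)
let esys : int event esys = create ()

let yield k =
  let id = make_eventid () in
  add_handler esys (fun _ e ->
    match e with
    | Go id' when id' == id -> k ()
    | _ -> raise Reject);
  add_event esys (Go id);
  raise Terminate

let spawn t =
  let id = make_eventid () in
  add_handler esys (fun _ e ->
    match e with
    | Go id' when id' == id -> t ()
    | _ -> raise Reject);
  add_event esys (Go id)

let ( >>= ) f k = f k
let out = Buffer.create 16
let worker name () =
  Buffer.add_string out (name ^ "1 ");
  yield >>= fun () ->
  Buffer.add_string out (name ^ "2 ")

let () = spawn (worker "a"); spawn (worker "b"); run esys
```

The Terminate raised by yield travels up through the running handler to dispatch, which removes that handler; the continuation lives on in the handler registered just before.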
There’s one serious pitfall with this implementation: MVar operations are not polymorphic, because the event system is a monomorphic queue (esys is declared above as an int event Equeue.t). Thus, all MVars are required to store the same type of value, which is a severe limitation.
The code for the applications is strictly the same as for the trampolined implementation. Indeed, the threads are written in trampolined style and the event framework is only used to build the scheduler. This implementation can be seen more as an exercise in style.9