When programming in direct style, the primitives have the signatures shown in Figure 4. Note that some of the operations are potentially blocking. The source code for the sieve is shown in Figure ??(a).
In direct style, a potentially blocking operation looks just like an ordinary function. This is what preemptive scheduling systems provide, since preemption can occur anywhere. Thus, standard OCaml support for threads can be used to implement our concurrency model in direct style. We don’t describe this implementation, since its only purpose is to let us measure how light our lightweight implementations really are.
(a)
type α mvar
val make_mvar : unit → α mvar
val take_mvar : α mvar → α
val put_mvar : α mvar → α → unit
(b)
Figure 4: Direct style signatures for the thread primitives
We want to be able to suspend a running thread and activate it again later, so we need some way to save the current thread state, or rather continuation. The continuation of a computation at some point is what remains to be done at this point, in other words the rest of the computation. It is represented by the context of the computation [4]. The control flow of a program can be treated in terms of continuations.
The call-with-current-continuation primitive (often abbreviated as call/cc) was first defined in Scheme [12]. It captures (makes a copy of) the current continuation and reifies it (makes it available in the program) as a value of type α cont. Thus continuations, which are implicit in most languages, can be explicitly captured and manipulated (passed as parameters, saved in data structures, etc.) like any other value. These “first-class continuations” can also be thrown (and given a parameter of type α), meaning that the current continuation is discarded and replaced with the thrown one, so that execution resumes at the point where the continuation was captured. Continuations can be used to implement all sorts of control-flow manipulations, including multi-threading.
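To make “the rest of the computation” concrete without relying on callcc, the following minimal sketch writes continuations out by hand as ordinary functions, in continuation-passing style. All names (rest_of_computation, mul_k, saved) are hypothetical and serve only as illustration; this is not how callcc is implemented, it only shows what a captured continuation stands for.

```ocaml
(* Hypothetical illustration: a continuation written by hand as a function.
   In 1 + (2 * 3), the continuation of the subterm (2 * 3) is "add 1 to it". *)
let rest_of_computation : int -> int = fun hole -> 1 + hole

(* CPS multiplication: instead of returning, it passes its result
   to the explicit continuation k. *)
let mul_k x y (k : int -> 'r) : 'r = k (x * y)

(* Normal flow: the product is handed to the rest of the computation. *)
let normal = mul_k 2 3 rest_of_computation

(* "Throwing" a saved continuation: the usual one is ignored and control
   continues elsewhere, here in a continuation that builds a string. *)
let saved : int -> string = fun n -> "resumed with " ^ string_of_int n
let thrown = mul_k 2 3 saved
```

With callcc the function passed as k does not have to be written by the programmer: the system builds it from the current context.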
This implementation is designed along the lines described in [8]. The queue (of type queue_t) stores the continuations of the suspended threads but also the initial continuation e (of the call to start) to be thrown when the queue becomes empty so that control returns after the call to start.
type α t = α → unit
type queue_t = { mutable e:unit t; q:unit t Queue.t }
let q = { e = (fun()→()); q = Queue.create () }
let enqueue t = Queue.push t q.q
let dequeue () = try Queue.take q.q with Queue.Empty → q.e
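The fallback behaviour of dequeue can be checked in isolation. The sketch below is an ASCII transcription of the queue above in which continuations are modelled as plain unit -> unit functions, so that it runs without callcc; the trace, log and order names are hypothetical helpers added only to observe the execution order.

```ocaml
(* ASCII transcription of the run queue; continuations are modelled as
   plain unit -> unit functions (an assumption made to avoid callcc). *)
type 'a t = 'a -> unit
type queue_t = { mutable e : unit t; q : unit t Queue.t }

let q = { e = (fun () -> ()); q = Queue.create () }
let enqueue t = Queue.push t q.q
let dequeue () = try Queue.take q.q with Queue.Empty -> q.e

(* Hypothetical trace used only to record the order of execution. *)
let trace = ref []
let log s = trace := s :: !trace

let () =
  q.e <- (fun () -> log "exit");   (* stands in for the continuation of start *)
  enqueue (fun () -> log "a");
  enqueue (fun () -> log "b");
  dequeue () ();                   (* runs "a" *)
  dequeue () ();                   (* runs "b" *)
  dequeue () ()                    (* queue is empty: falls back to q.e *)

let order = List.rev !trace
```

Storing the exit continuation in the same record as the queue means that an empty queue needs no special case at the call sites: dequeue always returns something that can be run.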
There is no scheduler proper; rather, each yielding thread captures its continuation with callcc, then packages and enqueues it before dequeueing and throwing the next one. spawn inserts halt at the end of a new thread, so that it dequeues and throws the next thread continuation when it terminates.
exception Stop
let stop () = raise Stop
let start () =
try
callcc (fun exitk →
q.e ← (fun () → throw exitk ());
dequeue () ())
with Stop → ()
let yield () =
callcc (fun k → enqueue (fun () → throw k ()); dequeue () ())
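(* halt does not stop anything: it abandons the current thread
and resumes the next one from the queue *)
let halt () = dequeue () ()
let spawn p = enqueue (fun () → p (); halt ())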
We implement MVars as a record containing three option values (each may be empty): the value stored in the MVar, the continuation of the thread blocked on a take_mvar operation, and the continuation of the thread blocked on a put_mvar operation along with the value it wanted to put. Suppose a thread blocks on put_mvar: its continuation is captured by callcc, packaged and stored in the MVar write field. The call to halt does not halt anything but ensures the next thread is resumed. When a thread later performs take_mvar on the same MVar, it removes the packaged continuation from the write field and enqueues it to be run. The code for Fifos is similar.
type α mvar = { mutable v:α option;
mutable read: α t option;
mutable write: (unit t × α) option }
let make_mvar () = { v=None; read=None; write=None }
let put_mvar out v =
match out with
∣ { v=Some _; read=_; write=None } →
(* MVar full: block the writer, keeping the value v it wanted to put *)
callcc (fun k →
out.write ← Some ((fun () → throw k ()),v); halt ())
∣ { v=None; read=Some r; write=None } →
out.read ← None; enqueue (fun () → r v)
∣ { v=None; read=None; write=None } → out.v ← Some v; ()
∣ _ → failwith "failed put_mvar"
let take_mvar inp =
match inp with
∣ { v=Some v; read=None; write=None } → inp.v ← None; v
∣ { v=Some v; read=None; write=Some(c, v′) } →
inp.v ← Some v′; inp.write ← None; enqueue c; v
∣ { v=None; read=None; write=_ } →
callcc (fun k →
inp.read ← Some (fun v → throw k v);
Obj.magic halt ())
∣ _ → failwith "failed take_mvar"
Note that we have to fool the typechecker with Obj.magic in take_mvar (and take_fifo) to ensure these functions are polymorphic. Otherwise, the call to halt makes the typechecker decide that the function must return unit, and the MVars lose their polymorphism.
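The blocking and hand-off protocol of the MVar can be traced without callcc by passing each continuation by hand. The following is a minimal sketch under that assumption: put_mvar_k and take_mvar_k are hypothetical variants that take the continuation k as an explicit argument, the run queue is a plain Queue of thunks, and the record patterns are slightly simplified. It is meant to illustrate the three-field state machine, not to replace the callcc code above.

```ocaml
type 'a t = 'a -> unit
type 'a mvar = { mutable v : 'a option;
                 mutable read : 'a t option;
                 mutable write : (unit t * 'a) option }
let make_mvar () = { v = None; read = None; write = None }

let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq

(* Hypothetical explicit-continuation variants: k is what callcc would capture. *)
let put_mvar_k out v (k : unit t) =
  match out with
  | { v = Some _; write = None; _ } ->
      out.write <- Some (k, v)                 (* full: the writer blocks *)
  | { v = None; read = Some r; _ } ->
      out.read <- None; enqueue (fun () -> r v); k ()
  | { v = None; read = None; _ } -> out.v <- Some v; k ()
  | _ -> failwith "failed put_mvar"

let take_mvar_k inp (k : 'a t) =
  match inp with
  | { v = Some v; write = None; _ } -> inp.v <- None; k v
  | { v = Some v; write = Some (c, v'); _ } ->
      (* hand over v, move the blocked writer's value in, wake the writer *)
      inp.v <- Some v'; inp.write <- None; enqueue c; k v
  | { v = None; read = None; _ } -> inp.read <- Some k   (* empty: the reader blocks *)
  | _ -> failwith "failed take_mvar"

let mv : int mvar = make_mvar ()
let taken = ref []
let writer_resumed = ref false

let () =
  put_mvar_k mv 1 (fun () -> ());                      (* empty MVar: stores 1 *)
  put_mvar_k mv 2 (fun () -> writer_resumed := true);  (* full MVar: writer blocks *)
  take_mvar_k mv (fun x -> taken := x :: !taken);      (* gets 1, enqueues the writer *)
  Queue.take runq ();                                  (* the queue resumes the writer *)
  take_mvar_k mv (fun x -> taken := x :: !taken)       (* gets 2 *)
```

In this form take_mvar_k is polymorphic without any cast, because the blocked reader never has to produce a value of type α itself; the value only ever flows into k.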
callcc captures a whole continuation. An alternative is to capture delimited continuations, as provided by the caml-shift library [23]. A delimited continuation (also called a partial continuation, composable continuation, or subcontinuation) is a prefix of the rest of the computation, represented by a delimited part of the context of the computation. Unlike regular continuations, delimited continuations return a value, and thus may be reused and composed.
Several slightly different operators have been proposed in the literature, but the general idea is that such a continuation is delimited by first pushing a delimiter (often called a prompt) on the stack, and later capturing the continuation up to the first prompt.
In this library, push_prompt pushes a prompt on the stack, marking the delimitation, while take_subcont turns the part of the stack up to (and not including) the first prompt into a (α,β) subcont value and removes it (along with the prompt) from the stack. Here α is the type of values that must be given when throwing the continuation, and β is the type of values returned by the continuation. push_subcont pushes a delimited continuation on the stack (i.e. throws it).
We can now use a simple FIFO queue (as provided by the OCaml Queue module) to implement our scheduler:
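As a rough model of why a delimited continuation “returns a value”, the captured slice of context can be pictured as an ordinary function from the hole (type α) to the result produced at the prompt (type β). The sketch below uses that model only; it does not call the library, and the name subcont is a hypothetical stand-in for a captured (int,int) subcont.

```ocaml
(* Hypothetical model: the delimited context  10 * (1 + [ ])  up to the
   prompt, represented as a function from the hole to the result. *)
let subcont : int -> int = fun hole -> 10 * (1 + hole)

(* Because it returns a value, it can be invoked several times ... *)
let once = subcont 2
let again = subcont 5

(* ... and composed, here with itself: subcont 0 = 10, subcont 10 = 110. *)
let twice = subcont (subcont 0)
```

A continuation captured by callcc has no such reading: throwing it never returns to the point of the throw, so its result cannot be fed into anything else.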
let runq = Queue.create ()
let enqueue t = Queue.push t runq
let dequeue () = Queue.take runq
exception Stop
let stop () = raise Stop
let start () =
try
while true do
dequeue () ()
done
with Queue.Empty ∣ Stop → ()
let shift0 p f = take_subcont p (fun sk () →
(f (fun c → push_prompt_subcont p sk (fun () → c))))
let yield () = shift0 prompt (fun f → enqueue f)
let halt () = shift0 prompt (fun f → ())
Basically, when suspending a thread we need to capture the continuation and store it. When resuming a thread we need to re-push the prompt and the subcont. When capturing a continuation we will package it in a function that pushes the prompt and this continuation. This is the behavior of the shift0 operator [4]. The scheduler just needs to run the packaged function for resuming a thread (thus the dequeue () () in the start loop above).
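The scheduler loop itself does not depend on how the packaged functions were produced, so it can be exercised with plain thunks. In this minimal sketch the queue, start and stop are ASCII transcriptions of the code above, while worker and trace are hypothetical: each worker step re-enqueues its own next step by hand, standing in for what yield does with a captured continuation.

```ocaml
let runq : (unit -> unit) Queue.t = Queue.create ()
let enqueue t = Queue.push t runq
let dequeue () = Queue.take runq

exception Stop
let stop () = raise Stop

(* Run packaged threads until the queue is empty or a thread calls stop. *)
let start () =
  try
    while true do
      dequeue () ()
    done
  with Queue.Empty | Stop -> ()

(* Hypothetical thread body: logs one step, then enqueues the rest of its
   work, which stands in for a continuation captured by yield. *)
let trace = Buffer.create 16
let rec worker name n () =
  if n > 0 then begin
    Buffer.add_string trace (name ^ string_of_int n ^ " ");
    enqueue (worker name (n - 1))
  end

let () =
  enqueue (worker "a" 2);
  enqueue (worker "b" 2);
  start ()

let result = Buffer.contents trace
```

The two workers interleave in FIFO order, and start returns normally once Queue.take raises Queue.Empty.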
halt removes any trailing context along with the prompt to “clean up” the stack. The shift0 definition above is an optimized version of
let shift0 p f = take_subcont p (fun sk () →
(f (fun c → push_prompt p
(fun () → push_subcont sk (fun () → c)))))
The spawn function (which adds a thread to the queue) packages its argument thunk so that it first pushes the prompt and then calls halt at the end, to ensure the prompt is removed when the thread terminates.
let spawn t = enqueue (fun () → push_prompt prompt (fun () → t (); halt ()))
The code for MVars is similar to the version using callcc. The only difference, besides the continuation being captured by shift0, is that a blocking thread simply returns to the scheduler, which will itself resume the next thread.
type α t = α → unit
type α mvar = { mutable v:α option;
mutable read: α t option;
mutable write: (unit t × α) option }
let make_mvar () = { v=None; read=None; write=None }
let put_mvar out v =
match out with
∣ { v=Some v′; read=_; write=None } →
shift0 prompt (fun f → out.write ← Some (f,v))
∣ { v=None; read=Some r; write=None } →
out.read ← None; enqueue (fun () → r v)
∣ { v=None; read=None; write=None } → out.v ← Some v
let take_mvar inp =
match inp with
∣ { v=Some v; read=None; write=None } → inp.v ← None; v
∣ { v=Some v; read=None; write=Some(c,v′) } →
inp.v ← Some v′; inp.write ← None; enqueue c; v
∣ { v=None; read=None; write=_ } →
shift0 prompt (fun f → inp.read ← Some f)