Andrey Listopadov

Naive async implementation in Fennel

@programming fennel lua async ~17 minutes read

Lately, I’ve been into understanding asynchronous programming. Since my background is mostly bare metal C, which has no asynchronous programming whatsoever (apart from running on multiple chips, and communicating via shared memory), and I did only a little bit of C++ and Rust, it’s fair to say, that async is pretty new for me. And because I now work in Clojure, I sometimes have to use asynchronous programming libraries like core.async, and manifold. Both are great libraries, providing ways to write asynchronous applications and stay sane.

Clojure has really good facilities for writing concurrent programs, as it was designed with that in mind. Fennel, on the other hand, wasn’t, as it targets Lua, and Lua doesn’t have multithreading capabilities at all. Because of that, there are no concurrency primitives in the Lua language, besides coroutines. Coroutines, however, provide a certain degree of multitasking but run on a single thread. It is possible to implement some kind of multitasking via coroutines, but it requires writing a program in a certain way.

I’ve decided to try implementing something similar to Clojure’s core.async for Fennel, but probably naive and much simpler, and see where I can get with it. This article will go into detail on how to implement a rather simple event loop, with a way of spawning concurrent tasks, which rely only on coroutines.

Writing an event loop

Event loop in our case will be an infinite loop, that cycles through all tasks over and over, executing each in a blocking way. The only way of writing a non-blocking code in Lua is to use coroutine.create and then coroutine.yield to temporarily stop the execution. Let’s start with a task queue:

(local queue [])

For our purposes, just a sequential table will do. The queue will hold coroutines, and we need to check their statuses in the loop. Let’s write the loop itself:

(fn worker []
  (while true
    (each [i task (ipairs queue)]
      (match (coroutine.status task)
        :suspended (coroutine.resume task)
        _ (table.remove queue i)))))

If the coroutine is suspended, we resume it, and go to the next one, once it yields. If coroutine has died, we remove it from the task queue to keep the queue clean.

Now we need a way to spawn coroutines. Let’s write a function, that accepts a function, creates the coroutine, and puts it in our queue:

(fn spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))

That’s pretty much it! Upon invocation, this function would return something like #<thread: 0x564c2c5e2ae8>, which is a newly created thread object. Not to be confused with real threads, this is just the way Lua refers to coroutines.

We now have a very, very simple event loop, that we can use to spawn coroutines like this:

(spawn (fn [] (for [i 1 5] (print "foo" i) (coroutine.yield))))
;; #(...) is the same as (fn [] ...) but shorter
(spawn       #(for [i 1 5] (print "bar" i) (coroutine.yield)))

Right now nothing happens, because we need to start the worker to see the result.

(worker)

We should see something like this:

foo 1
bar 1
foo 2
bar 2
foo 3
bar 3
foo 4
bar 4
foo 5
bar 5

As can be seen, both tasks were executed simultaneously, because each task pauses itself on each iteration step, so other tasks had a chance to run. It is an example of collaborative (preemptive) multithreading. We can’t spawn real threads in stock Lua runtime without special libraries that provide process-based multithreading.

But these libraries spawn separate lua instances that can communicate via message-passing. Coroutines on the other hand can share state via global variables, which is a bit more performant, as you don’t have to serialize and deserialize data. Although, because everything runs in a single process, there’s no real performance gain, when compared to real threading, so where the performance matters, message-passing between real threads is the way to go.

Let’s leave message passing out for now, as right now we don’t have a nice way of working with these tasks interactively. Right now, since we’ve started the worker, our tasks are completed, and the worker now loops in an infinite empty cycle, blocking our REPL. Because of this, it is quite cumbersome to work, so let’s write a REPL that will work asynchronously in our event loop alongside other tasks.

Writing an asynchronous REPL

The REPL code below is based on Fennel’s REPL test code but slightly simplified for the purpose of this article. For those unfamiliar with Lisps, REPL stands for Read Eval Print Loop and is something like an interactive shell for working with a running Lisp process. Fennel has an inbuilt REPL, and we can reuse it, but we need to make sure, that its looping aspect doesn’t block our task queue when we wait for input.

Let’s start by declaring prompt variables:

(local prompt-ready "repl>> ")
(local prompt-multi ".. ")
(var prompt prompt-ready)

I’m deliberately changing the standard ">> " prompt to "repl>> ", so we could see that we’re in our special kind of REPL a bit easier. There are two separate prompts here, and we will change the type if our REPL receives an unfinished expression, indicating that the REPL still waits for input. Next, let’s set up the REPL itself:

(local fennel (require :fennel))
(fn repl [options]
  (fn send []
    (var output []) ; ➊
    (fennel.repl
     {:readChunk (fn [{: stack-size}]
                   (set prompt (if (< 0 stack-size) ; ➋
                                   prompt-multi
                                   prompt-ready))
                   (when (> (length output) 0)
                     (io.write (table.concat output "\t") "\n"))
                   (let [chunk (coroutine.yield)] ; ➌
                     (set output [])
                     (and chunk (.. chunk "\n"))))
      :onValues (fn [x]
                  (table.insert output (table.concat x "\t")))
      :onError (fn [_ e]
                 (table.insert output (.. "error: " e)))
      :pp (fn [x]
            (fennel.view x))}))
  (let [send (coroutine.wrap send)]
    (send) ; ➍
    send))

Let’s look at it step by step.

First, we define an output variable ➊. It will store current output, which we will use in the readChunk method, which we can see in the table, being passed to fennel.repl call.

This method accepts a table with REPL’s state, and we’re interested in the stack-size value. If it is greater than 0, we set the prompt to multiline prompt .., and to repl>> otherwise ➋. Next, we check if the output table contains anything. If it does, we concatenate the elements and print it to standard out. After which we yield ➌ from the REPL until new input arrives. This yield right there is what allows us to use the REPL in a non-blocking way.

The rest methods handle values and errors that happen in the REPL, and pp pretty-prints values for display.

Lastly, we wrap our function, execute it to start the REPL ➍, and return the wrapped function. The resulting function is called send, which sends the input to the REPL for evaluation. But how can we get input? Especially since we don’t want to block other tasks from running.

Input polling

We can get the input with io.read function, and luckily for us, this function works with the stdin buffer:

>> (io.read 0)
1234
""
>> (io.read "l*")
"1234"

We’ve called io.read(0) which reads 0 characters from standard input. It returned an empty string because that’s what zero characters are. Next, we’ve called it again but with the "l*" argument, which means to read a complete line, and the result was "1234" - the data that we’ve previously sent via stdin, and we’ve successfully read it back.

But this has a problem - we don’t know if stdin already has something in the buffer, so if the buffer is empty, the (io.read "l*") call will block until the input is supplied. Unfortunately, Lua runtime doesn’t have any kind of input polling, and its io module is very minimalist. So we need a way to check if stdin has anything in the buffer. For this task we can use luaposix library which can be installed via luarocks:

$ luarocks install --local luaposix

This library provides the rpoll function, that can be used to check if a file descriptor contains anything ready to be read. Let’s write a task for our worker that will first set up a REPL, and then go into an infinite loop, polling for the input:

(local p (require :posix))

(fn async-repl []
  (let [send (repl)]
    (io.write prompt)
    (io.flush)
    (while true
      (when (< 0 (p.rpoll p.STDIN_FILENO 0))
        (send (io.read "l*"))
        (io.write "\r" prompt)
        (io.flush))
      (coroutine.yield))))

(spawn async-repl)
(worker)

The p.rpool function checks if p.STDIN_FILENO has any data in the buffer. When this function returns a non-zero value, we read the whole line from stdio, and send it to the REPL. Then we display the prompt, flush, and finally yield so other spawned processes could cooperate.

Let’s try it out. Te code up to this point can be saved to a file, called async.fnl so we could easier run it from the shell:

$ fennel async.fnl
repl>> (+ 1 2 3)
6
repl>> (+ 1
..  2
..  3)
6
repl>>

Now, when we have the REPL, and it’s working we can spawn some processes!

repl>> (spawn #(for [i 1 5] (print i) (coroutine.yield)))
error: global 'spawn' is not callable (a nil value)

Uh oh, the spawn function is not known by our REPL. But that’s easy to fix. As you may remember, coroutines can access the global state, and we just need to make spawn function and the queue globals:

(global queue [])
(fn _G.spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))

With this change we can start spawning processes from our REPL:

$ fennel async.fnl
repl>> (spawn #(for [i 1 5] (print i) (for [i 1 1000000] (coroutine.yield))))
#<thread: 0x56055ef88ae8>
repl>> 1
2
3
(print :foo)
foo
repl>> 4
5
repl>>

As you can see, I’ve added an empty loop with coroutine.yield call to slow down the process, so I could input the (print :foo) expression. Because our system toggles between processes, we were able to input the expression and see the result without waiting until the first task completes. Which basically means that our non-blocking REPL works.

Though, unfortunately, the prompt gets in the way when printing from different threads. As a workaround, the print function can be redefined to clear the line before printing, but it’s not a proper solution either.

Communicating between threads

The last missing point for our system is a way to communicate between tasks/threads. While global variables can be used by coroutines as a way of sharing state, it is a really poor choice. Global variables don’t have any concurrency semantics and are especially very hard to debug. Another problem is that we will have to implement synchronization primitives ourselves each time we interact with globals, which is not good.

There are two things that can help us solve this problem. First are so-called promises, which can be blocked upon reading if they weren’t realized, and the other one is a channel. Promises are easy to implement, and we can actually write one like this:

(fn _G.promise []
  {:val nil
   :deliver (fn [self val]
              (if (= self.val nil)
                  (do (set self.val val)
                      true)
                  false))
   :deref (fn [self]
            (while (= nil self.val)
              (coroutine.yield))
            self.val)})

It’s a really simple implementation, but we can see that it works as expected:

repl>> (local p (promise))
nil
repl>> (spawn #(print (.. "promise: " (p:deref))))
#<thread: 0x561ae3b49568>
repl>> (p:deliver 42)
true
repl>> promise: 42
repl>>

As we can see, we’ve created a thread, that parked itself when it tried to dereference the promise, and the moment we’ve delivered the promise, we saw the message from that thread.

Note: Once again, repl>> prompt got in the way, and it will be a common thing in all following examples. I’ve thought about removing the prompt from output manually but decided not to, as if someone will try this code, they might think that something is wrong, as the output will differ from the one in this article.

This is a handy construct when we need to deliver value from one thread to another, without blocking our program, but after the deliver is complete, the promise becomes useless. Because we can’t deliver a promise twice - once it was delivered, the value is set in stone1. What if we need to pass several values from one thread to another? Of course, we can deliver a table with values to our promise, but what if we don’t have all the necessary data to produce these values right now?

Channels are really good for this - we can put as many values as we want to a named channel and take from it in another thread, parking the reading thread, if the channel is empty, or putting thread if the channel is full in case of backpressure. Let’s create the chan, take and put functions:

(fn _G.chan [size]
  {:size (or size math.huge)
   :buf []})

For now, chan will simply create a table with our channel buffer buf stored as a sequential table, and optional size, indicating channel size and acting as backpressure. Now we can write a non-blocking put and take:

(fn _G.put [{: buf : size} value]
  (assert (not= nil value) "value must not be nil")
  (while (>= (length buf) size)
    (coroutine.yield))
  (table.insert buf value)
  true)

(fn _G.take [{: buf}]
  (while (= 0 (length buf))
    (coroutine.yield))
  (table.remove buf 1))

You can see, that our put will wait until the length of the channel’s buffer is less than its maximum size. Similarly, take will wait for elements to appear. These operations are non-blocking, which means, that they must be used only in the processes we’re created via spawn function. Let’s try this out:

repl>> (local c (chan 3))
nil
repl>> (spawn #(for [i 1 10] (put c i) (print (.. "put: " i))))
#<thread: 0x564b48408608>
repl>> put: 1
put: 2
put: 3

First, we’ve created a channel c with a size of 3. Next, we’ve spawned a numeric for loop, that will try putting indexes to the channel, and printing the values afterward. You can see that right after that the put: 1 to put: 3 messages appeared, indicating that we’ve successfully put 3 values to our channel c. However, the values 4 to 10 were not put to the channel, because of the backpressure.

Let’s spawn a single take from the channel:

repl>> (spawn #(print (.. "take: " (take c))))
#<thread: 0x564b483f0ae8>
repl>> take: 1
put: 4

As can be seen, we’ve taken the value 1 from the channel, and immediately the next value was put to it, as shown by put: 4 message. Let’s spawn another loop, that repeatedly takes values from our channel and prints them:

repl>> (spawn #(while true (print "take: " (take c))))
#<thread: 0x564b483dbeb8>
repl>> take: 2
take: 3
take: 4
put: 5
put: 6
put: 7
take: 5
take: 6
take: 7
put: 8
put: 9
put: 10
take: 8
take: 9
take: 10

Aha! Now we can see that this spawned task took 3 elements from the channel and parked itself, waiting for elements. Then our other loop inserted values 5, 6, 7 into the channel, and parked itself because of the backpressure. And the cycle repeated itself until the putting loop was exhausted.

We can spawn another put, and we’ll immediately see take:

repl>> (spawn #(put c 42))
#<thread: 0x564b48408608>
repl>> take: 42
repl>>

Because the size of the channel was set to 3, we saw puts and takes in groups of three. If we create a channel with an unlimited buffer size, we can potentially block our program if some process would decide to put an infinite amount of values to the channel. So it is better to have channels with some buffering, to prevent that from happening.

We can create two more channels to illustrate that:

repl>> (local c-no-bp (chan))
nil
repl>> (do (spawn #(for [i 1 5] (put c-no-bp i) (print (.. "put: " i))))
..         (spawn #(while true (print (.. "take: " (take c-no-bp))))))
#<thread: 0x5607d13a0eb8>
repl>> put: 1
put: 2
put: 3
put: 4
put: 5
take: 1
take: 2
take: 3
take: 4
take: 5
repl>>

As can be seen, the first for loop puts all of its elements into a channel c-no-bp, which has no backpressure, and only then the next loop takes each of these elements. On the other hand, if we create a channel with the size of 1, we’ll get this output:

repl>> (local c1 (chan 1))
nil
repl>> (do (spawn #(for [i 1 5] (put c1 i) (print (.. "put: " i))))
..         (spawn #(while true (print (.. "take: " (take c1))))))
#<thread: 0x5607d1365f78>
repl>> put: 1
take: 1
put: 2
take: 2
put: 3
take: 3
put: 4
take: 4
put: 5
take: 5
repl>>

Channel with a fixed size can act as a synchronization primitive to your program. Alternatively, if you really need an unlimited buffer, you could use coroutine.yield directly in the putting loop, to put one value at a time.

Practical example

Now, when our system is mostly ready, let’s create a practical example with our channels!

For this, I’ve decided to use Love2d - a game engine, that can run Lua directly. Love2d actually has its own implementation of threads, that work similarly to as I’ve described at the beginning of this article - by passing messages, serializing, and deserializing data. The real reason is that I want to draw something using collaborative multitasking, and I could as well take Fengari, but I don’t want to mess with the web until necessary.

So let’s put all our code to the file, and set up the LÖVE project, we’ll only need one file main.fnl for this.

First let’s add our spawn, worker, and channel-related code to the main.fnl file:

(local queue [])

(fn spawn [task]
  (let [task (coroutine.create task)]
    (table.insert queue task)
    task))

(fn chan [size]
  {:size (or size math.huge)
   :buf []})

(fn put [{: buf : size} value]
  (assert (not= nil value) "value must not be nil")
  (while (>= (length buf) size)
    (coroutine.yield))
  (table.insert buf value)
  true)

(fn take [{: buf}]
  (while (= 0 (length buf))
    (coroutine.yield))
  (table.remove buf 1))

(fn worker []
  (each [i task (ipairs queue)]
    (match (coroutine.status task)
      :suspended (coroutine.resume task)
      _ (table.remove queue i))))

Note that the worker no longer has the while loop in it, because it will be handled by love.draw function.

Now, let’s set up our channel, and some threads:

(local c (chan 1))

(let [(width height) (love.graphics.getDimensions)]
  (for [i 0 (/ width 8)]
    (for [j 0 (/ height 8)]
      (spawn #(while true
                (put c [(math.random)
                        (math.random)
                        (math.random)])))
      (spawn #(while true
                (love.graphics.setColor (take c))
                (love.graphics.rectangle :fill i j 1 1))))))

We’ve created a channel with a buffer of size 1. Next, for each point on the canvas, we spawn a thread, that will try to put a random color to the channel. Our channel will refuse to take more than 1 color at a time, so these threads will park once it’s full. The next thread reads one color from the channel and draws a rectangle with this color.

Next, we need to draw all of this:

(fn love.load []
  (love.window.setTitle "Async")
  (love.window.setMode 640 480 {:resizable false :vsync false}))

(fn love.draw []
  (love.graphics.scale 8 8)
  (worker))

The love.load simply sets the window parameters and love.draw sets the scaling and calls our worker, which handles our threads. Compiling this code with fennel -c main.fnl > main.lua and running love . in this directory should result in something like this:

The color for each square is set by a separate thread, and the squares are also drawn by separate threads. As you can see here, each thread is a while loop, that is being parked by calls to put and take, each time our channel c is full. Thus, we’ve almost fully abstracted coroutines away.

Further up

This was a very basic implementation that can be expanded upon if one wants to. Obviously, it’s far from being complete, but even in the current state, you can write code with some sort of multitasking in mind. One thing to improve is our implementation of the worker function, which loops over all threads. Right now it’s just an each loop, but a proper scheduler might reorder tasks in a more efficient way, depending on how tasks perform.

The key problem with this system is that your main application must be running inside this worker as well as one of the threads. Because if you want to block on waiting for a promise, you can’t use a conventional while loop, as it will stop worker from running, and your program will hang. This is something that just needed to be kept in mind though, and I believe it is possible to write programs in such a way.

Additionally to parking on channel or promise operations, a set of macros can be created to replace while, for, each, and other blocking things with non-blocking variants. Such macros will allow running loops in threads without blocking other threads:

(macro pwhile [condition ...]
  `(while ,condition
     (do ,...)
     (coroutine.yield)))

(macro pfor [bindings ...]
  `(for ,bindings
     (do ,...)
     (coroutine.yield)))

(macro peach [bindings ...]
  `(each ,bindings
     (do ,...)
     (coroutine.yield)))

Alternatively, a single macro, that wraps user code can be created, that will analyze code for iteration constructs, and upgrade them to use coroutine.yield. This is somewhat similar to what Clojure’s core.async does with its go macro, that transforms the code.

Overall, I think it is an interesting topic to explore, and I might create a library like this, if there will be enough interest.


  1. Nothing is really set in stone, as this is a mutable object, and you can break this guarantee by mutating it. This can be avoided by using proxy tables with special __newindex metamethod, but I’ve left this out for the purpose of this article. ↩︎