Andrey Listopadov

Clojure's core.async port for the Fennel language

After I explored asynchronous programming in my first naive implementation a few years ago, I was hooked on the idea of asynchronous communication in programs. Motivated to take this concept further, I created the fennel-async library, which I presented at FennelConf 2022. While this library works, and I’ve managed to write a simple socket-based REPL and integrate it into Emacs as an experiment, I wasn’t satisfied with the implementation, mainly because the library is pull-based: tasks are pulled by a continuously running scheduler. This poses some difficulties in the Lua runtime, as there’s no support for multithreading, so the scheduler can’t be offloaded to a separate thread. I soon realized that a proper asynchronous system is push-based, meaning that pushing to a task triggers its activation. With that in mind, I started working on a newer version of the library: async.fnl.

The library started as a small channel-based communication system, where each channel had its own queue of puts, a queue of takes, and an optional buffer, much like core.async from Clojure. I was planning to implement only the put!, take! and alts! operations, as these are the bare minimum needed for the library to be functional, and everything else can be bootstrapped on top of them. However, it was hard to test the library this way, so I started adding more and more operations, like merge and into, eventually added support for transducers, and before I knew it I was thinking about how to port Mult, Mix, and Pub. At that moment I decided that I would simply port everything from core.async, including the tests, and that’s where my implementation of channels started failing. So I ported the channels too.

Now this library is a mostly complete port of the ClojureScript version of the library. I chose ClojureScript as a base because it is also a single-threaded system, so it should behave similarly to Lua, which Fennel uses as its compilation target. Interestingly enough, and you don’t have to believe me [1], up until I decided to port the tests, and later the channel implementation, I was writing this library without looking at the core.async source code, and the resulting Fennel code was quite similar. This is largely because channels are well covered by Rich Hickey in his talk, and I had already made three wrong implementations of them, so for the most part I knew what I needed to do. I’m also a bit familiar with Clojure’s core.async code, but only with the public API. I had never looked into the implementation of channels, as my tinkering with the library only needed to go as far as the implementation of pipelines, which I’ve covered in my post about fixing the off-by-two error.

So when the tests started to fail, I knew that something wasn’t right. After some hours of debugging, I ditched my own implementation of channels and ported the code from ClojureScript into Fennel. This was a fun experience, and I’ve even “ported” reify and defprotocol so that I wouldn’t have to think about how to write the same thing manually in every function. I’ve put “ported” in quotes because it’s not a true port, just a similar macro that expands to the runtime definition of an object that conforms to the defprotocol spec:

(macro defprotocol [name ...]
  `(local ,name
     ,(faccumulate [methods {} i 1 (select :# ...)]
        (let [method (select i ...)]
          (assert-compile (list? method) "expected method declaration")
          (let [[name arglist body] method]
            (assert-compile (sym? name) "expected named method" name)
            (assert-compile (sequence? arglist) (.. "expected arglist for method " (tostring name)) arglist)
            (assert-compile (or (= :string (type body)) (= nil body)) (.. "expected no body for method " (tostring name)) body)
            (doto methods
              (tset (tostring name) `#(<= $ ,(length arglist)))))))))

(macro reify [...]
  (let [index (gensym)
        protocols []
        actions '(do)]
    (var current nil)
    ((fn loop [x ...]
       (assert-compile (or (sym? x) (list? x)) "expected symbol or fnspec" x)
       (if (sym? x)
           (do (set current x)
               (table.insert protocols (tostring x)))
           (list? x)
           (let [[name & [arglist &as fnspec]] x]
             (assert-compile (sym? name) "expected method name" name)
             (assert-compile (sequence? arglist) "expected method arglist" arglist)
             ;; queue a form that registers this method on the index table
             (table.insert
              actions
              `(case (. ,current ,(tostring name))
                 f# (if (f# ,(length arglist))
                        (tset ,index ,(tostring name) (fn ,(unpack fnspec)))
                        (error ,(.. "arity mismatch for method " (tostring name))))
                 ,(sym :_) (error ,(.. "Protocol " (tostring current) " doesn't define method " (tostring name)))))))
       (when (not= 0 (select :# ...))
         (loop ...)))
     ...)
    `(let [,index {}]
       ;; run the queued registrations, then expose them through __index
       ,actions
       (setmetatable
        {}
        {:__index ,index
         :name "reify"
         :__fennelview
         #(.. "#<" (: (tostring $) :gsub "table:" "reify:")
              ": " ,(table.concat protocols ", ") ">")}))))

It’s a little messy but works fine. I added it when I was porting Mult, which looks a lot like the ClojureScript version:

(defprotocol Mux
  (muxch [_]))

(defprotocol Mult
  (tap [_ ch close?])
  (untap [_ ch])
  (untap-all [_]))

(fn mult [ch]
  "Creates and returns a mult(iple) of the supplied channel
`ch`.  Channels containing copies of the channel can be created with
'tap', and detached with 'untap'.

Each item is distributed to all taps in parallel and synchronously,
i.e. each tap must accept before the next item is distributed.  Use
buffering/windowing to prevent slow taps from holding up the mult.

Items received when there are no taps get dropped.

If a tap puts to a closed channel, it will be removed from the mult."
  (var dctr nil)
  (let [atom {:cs {}}
        m (reify
           (muxch [_] ch)
           (tap [_ ch close?] (tset atom :cs ch close?) nil)
           (untap [_ ch] (tset atom :cs ch nil) nil)
           (untap-all [_] (tset atom :cs {}) nil))
        dchan (chan 1)
        done (fn [_]
               (set dctr (- dctr 1))
               (when (= 0 dctr)
                 (put! dchan true)))]
    (go-loop []
      (let [val (<! ch)]
        (if (= nil val)
            (each [c close? (pairs atom.cs)]
              (when close? (close! c)))
            (let [chs (icollect [k (pairs atom.cs)] k)]
              (set dctr (length chs))
              (each [_ c (ipairs chs)]
                (when (not (put! c val done))
                  (m:untap c)))
              ;; wait for all taps to accept the value
              (when (next chs)
                (<! dchan))
              (recur)))))
    m))

Sure, Fennel doesn’t have sequences and atoms by default, so this uses mutable tables, but the semantics are the same.

In general, Fennel isn’t exactly compatible with Clojure, as it is a different language. There’s no arity overloading, data structures aren’t persistent, and there are no lazy sequences, only iterators over tables. All these features can be implemented as libraries, and I’ve already made a bunch that should make a Clojure programmer feel more at home. Well, at least the strings are immutable by default, which is good!

I’ve chosen the same versioning scheme as the Clojure version of the library, but it is still in an experimental state. More tests should be added, and I also need to test on different runtimes, as this is only tested to work on Lua 5.4 at the moment.

Differences from the Clojure version

One tricky thing this library requires is the presence of the debug Lua library. It is included by default in most Lua implementations, but it may be absent in an embedded Lua.

The debug library is required to run timers. When the library is loaded, it uses debug.sethook to install a hook that checks whether any of the timeout channels have expired. The hook runs every 1000 Lua instructions and on each function return. The optimal number of instructions between runs still needs profiling, but the tests show good accuracy, with an error of about 10ms. That error is largely due to the library trying to minimize the number of timeout channels by rounding their durations to 10ms. It’s similar to what the Clojure version does, e.g. the expression (= (timeout 1000) (timeout 1000)) is true because only one timeout channel is created under the hood. When the debug library is unavailable, the timeout function throws an error, but the rest of the library should work fine, as timeout channels aren’t used anywhere else in the library.
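To make the mechanism a bit more concrete, here is a rough sketch of hook-driven timers. This is not the code from async.fnl: the names resolution-ms, slots, now-ms, round-deadline, timeout-slot and run-expired are made up for illustration, and os.clock (CPU time) stands in for a real clock only to keep the example dependency-free.

```fennel
;; Sketch only; every name below is hypothetical.
(local resolution-ms 10)
(local slots {}) ; rounded deadline (ms) -> {:deadline n :callbacks [...]}

(fn now-ms []
  ;; CPU time in milliseconds, an assumption made to avoid dependencies
  (* 1000 (os.clock)))

(fn round-deadline [ms]
  "Round `ms` up to the next multiple of `resolution-ms`."
  (* resolution-ms (math.ceil (/ ms resolution-ms))))

(fn timeout-slot [ms]
  "Return the slot for a timeout `ms` milliseconds from now.
Timeouts that land on the same rounded deadline share one slot."
  (let [deadline (round-deadline (+ (now-ms) ms))
        slot (or (. slots deadline) {:deadline deadline :callbacks []})]
    (tset slots deadline slot)
    slot))

(fn run-expired []
  "Fire and drop every slot whose deadline has passed."
  (let [now (now-ms)
        expired []]
    ;; collect first, so callbacks can add timeouts without
    ;; modifying the table while it is being traversed
    (each [deadline slot (pairs slots)]
      (when (<= deadline now)
        (table.insert expired slot)))
    (each [_ slot (ipairs expired)]
      (tset slots slot.deadline nil)
      (each [_ callback (ipairs slot.callbacks)]
        (callback)))))

;; run the check on every function return and every 1000 VM instructions
(debug.sethook run-expired :r 1000)
```

In this sketch, two timeout-slot calls that round to the same deadline return the same table, which mirrors the idea of sharing a single timeout channel between nearby timeouts.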

Another tricky thing is that Lua by default doesn’t support sub-second precision for measuring time. The default os.time reports the number of seconds since the epoch, and there’s no way to get the number of milliseconds. The solution is to check if the luasocket or luaposix libraries are present on the LUA_PATH. If so, their implementations of the gettime function are used, which gives millisecond precision. Otherwise, the number of milliseconds passed to the timeout function is rounded up to the next whole second. It’s not ideal, but it works, especially since asynchronous code shouldn’t depend too much on specific timings. The library will warn you if you don’t have either of these installed.
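The fallback logic can be sketched roughly as follows. This is an illustration under assumptions, not the library’s actual code: now-ms, round-up-to-second and normalize-timeout are hypothetical names, only the luasocket branch is shown (its socket.gettime returns seconds as a floating-point number), and the luaposix branch is left out.

```fennel
;; Sketch only; the function names are hypothetical.
(local (has-socket? socket) (pcall require :socket))

(when (not has-socket?)
  (io.stderr:write "warning: luasocket not found, timeouts are rounded to whole seconds\n"))

(fn now-ms []
  "Current time in milliseconds, as precise as the available libraries allow."
  (if has-socket?
      (math.floor (* 1000 (socket.gettime)))
      ;; os.time only has one-second resolution
      (* 1000 (os.time))))

(fn round-up-to-second [ms]
  "Round a duration in milliseconds up to the next whole second."
  (* 1000 (math.ceil (/ ms 1000))))

(fn normalize-timeout [ms]
  "Keep `ms` as is when a precise clock exists, otherwise round it up."
  (if has-socket?
      ms
      (round-up-to-second ms)))
```

Checking for the library once at load time, rather than on every call, keeps the per-timeout cost down to a single branch.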

And the final difference from Clojure is that there are no inversion of control (IOC) transformations involved. Lua, unlike the JVM, supports stackful coroutines, which means that any function can be run as a coroutine that suspends itself with coroutine.yield and is later resumed with coroutine.resume. Thus, all operations that would otherwise block, like <! or >!, actually just capture the running thread with coroutine.running and put it into the respective queue on the channel where the operation occurs, and the channel calls coroutine.yield. Later, when this channel receives a matching put or take and there is such a pending thread, the thread is resumed with the value of the operation. Stackful coroutines make the implementation much simpler, without the need to write complex macros that transform the code into state machines. I wish more languages had them, as it is a great feature that can be used in a variety of ways. Generators are cool, but Lua coroutines are much cooler. But I digress.
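As a minimal sketch of this parking mechanism, consider a toy channel that supports only a parking take and a put. The names chan, take-park and put are hypothetical and far simpler than what async.fnl actually implements (no buffers, no parked puts, no closing).

```fennel
;; Sketch only; a toy channel with hypothetical names.
(fn chan []
  {:takes [] :values []})

(fn take-park [ch]
  "Take a value from `ch`, parking the current coroutine if none is queued."
  (if (< 0 (length ch.values))
      (let [val (table.remove ch.values 1)]
        val)
      ;; bind the thread first: coroutine.running may return two values
      (let [thread (coroutine.running)]
        (table.insert ch.takes thread)
        ;; execution stops here until `put` resumes the thread with a value
        (let [val (coroutine.yield)]
          val))))

(fn put [ch val]
  "Resume a parked taker with `val`, or queue `val` if nobody is waiting."
  (let [taker (table.remove ch.takes 1)]
    (if taker
        (coroutine.resume taker val)
        (table.insert ch.values val))))

;; usage: the consumer parks twice and is resumed by each put
(local ch (chan))
(local results [])
(local consumer
  (coroutine.create
   (fn []
     (table.insert results (take-park ch))
     (table.insert results (take-park ch)))))

(coroutine.resume consumer) ; runs until the first take parks
(put ch :a)                 ; resumes the consumer, which parks again
(put ch :b)                 ; resumes it once more, and it finishes
```

The consumer reads like ordinary sequential code, and no macro has to rewrite it into a state machine: the coroutine’s own stack keeps track of where to continue.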

Future plans

Overall, this was a fun experiment, and while I really like asynchronous programming, I feel that I need to stop myself here. There are a lot of other topics I want to get into, like the bytecode VM design, gamedev, and other cool stuff. So hopefully there will be no more async shenanigans from me. Though, maybe I’ll try to port the TCP module from fennel-async into this library, but we’ll see. That may be difficult, as there’s no way to register a callback on a socket object with the luasocket library, which almost defeats the purpose of being push-based.

If you’re using Fennel, and you’re familiar with core.async, do give this library a test. I’m curious how it will work in other programs because I’m making all these libraries, but I don’t really have any time to use them as parts of other projects, besides maybe fenneldoc, which uses the cljlib library.

Anyway, thanks for reading, and I hope this library will be useful!

  1. You don’t have to, but if you read my blog, you may remember that this is the way I usually like to do stuff: diving in with little or no prior knowledge of the topic and thinking about how I would implement it. Whether the result is similar to the reference implementation or differs from it tells me if my assumptions were correct or if I made any mistakes. This library started the same way.