Andrey Listopadov

Comparison of manifold and clojure.core.async

@programming async clojure ~20 minutes read

I’ve been into clojure.core.async lately, and I like the overall design of this library - it’s simple and easy to understand. However, at work, we use manifold in our projects, mainly because it is supported by the aleph server, which was chosen because it can asynchronously read data coming from the client. That’s beside the point today, though. What matters is that manifold and core.async take different approaches to building concurrent code, and I will try to explain both and compare the two. Let’s start with how core.async works on the surface.

core.async basics

The minimum you need to know about core.async is how to properly use channels, because the channel is the central abstraction in this library. Everything is represented as a channel, which is basically a queue of puts, a queue of takes, plus an optional buffer queue that holds values. The basic idea is that you share channels between asynchronous threads, and when one thread puts a value onto the channel it parks until someone else takes the value off the channel (if there’s no buffer). There are a lot of things that can be built on top of this kind of abstraction, and in practice programming with core.async looks a lot like programming with queues.
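To make the parking behavior a bit more concrete, here is a minimal sketch that assumes nothing beyond the public clojure.core.async API. The var names (unbuffered, buffered, and so on) are made up for this illustration:

```clojure
(require '[clojure.core.async :as a])

;; An unbuffered channel: a put completes only once somebody takes.
(def unbuffered (a/chan))

;; This go thread parks on >! until the value is taken below.
(a/go (a/>! unbuffered :hello))

;; <!! blocks the calling (real) thread until a value arrives.
(def taken (a/<!! unbuffered))

;; A buffered channel: puts complete immediately while the buffer has room.
(def buffered (a/chan 2))
(def put-ok? (a/>!! buffered 1)) ; >!! returns true unless the channel is closed
(a/>!! buffered 2)
(def first-out (a/<!! buffered)) ; values come out in the order they were put
```

The blocking variants (>!! and <!!) are used here only because the snippet runs at the top level of a REPL; inside a go block you would use the parking >! and <! instead.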

I will expand upon this later when I get to the actual code, but this should be fine as an abstract introduction. Now, let’s look at manifold in a similar way.

manifold basics

Manifold actually has a similar abstraction, but it is called a stream. Streams can be sources, sinks, or both at the same time, and they provide an interface similar to core.async, where you can put a value to or take a value from the stream, as well as do a lot of different operations on streams directly, e.g. connecting, filtering, etc. In addition to streams, however, manifold provides another abstraction, called a deferred. It is a lot like Clojure’s built-in promise, e.g. you can use deref on it, and it will cache the value that was delivered to it, but in addition, you can attach callbacks to deferreds, and they can also represent errors.
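Here’s a small sketch of the deferred features mentioned above: delivering a value, attaching callbacks, and representing an error. It assumes only the public manifold.deferred API; the names result, failed, and seen are hypothetical:

```clojure
(require '[manifold.deferred :as d])

(def result (d/deferred))
(def seen (atom []))

;; Callbacks can be attached before the value exists.
(d/on-realized result
               (fn [v] (swap! seen conj [:ok v]))
               (fn [e] (swap! seen conj [:error (ex-message e)])))

;; Deliver the value, much like `deliver` on a promise.
(d/success! result 42)

;; The value is cached, so it can be dereferenced any number of times.
(def value @result)

;; A deferred can also hold an error, which is thrown on deref.
(def failed (d/deferred))
(d/error! failed (ex-info "boom" {}))

(def failure-message
  (try
    @failed
    (catch Exception e
      (ex-message e))))
```

With no executor configured, the callbacks registered via on-realized run on the thread that realizes the deferred, which is why seen already holds the result right after success! returns.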

What sets manifold apart is that this library is aimed to be a translation layer between different asynchronous implementations. For example, you can convert core.async channels into sources and act upon them as you would on manifold streams. The same can be done with lazy sequences or Java’s BlockingQueues.
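As a rough illustration of this translation layer, the sketch below turns a plain sequence and a core.async channel into manifold sources. It assumes core.async is on the classpath (manifold’s channel support is only available then); the var names are made up:

```clojure
(require '[clojure.core.async :as a]
         '[manifold.stream :as s])

;; A sequence becomes a source that is drained once the items run out.
(def incremented
  (->> [1 2 3]
       s/->source
       (s/map inc)
       s/stream->seq))

;; A core.async channel can be treated as a source as well.
(def c (a/chan 3))
(a/>!! c :a)
(a/>!! c :b)

(def from-chan (s/->source c))

;; take! returns a deferred, so we deref it to get the value.
(def first-from-chan @(s/take! from-chan))
```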

Now that we have a general understanding of both, let’s look at how the same code can be written using both libraries. The examples won’t involve complex scenarios, but I think the patterns I’ll show are general enough to appear in most of the code you’ll see when dealing with these libraries.

Channels vs deferreds

It may seem weird that I’m going to compare core.async channels to manifold deferreds instead of streams, but that’s how we use manifold at work, so I’m much more familiar with deferreds. What you should understand, however, is that both core.async and manifold aim at giving you a way of writing code that looks linear, even though it isn’t. The ways of doing so differ quite a lot between these libraries, though.

Let’s start simple - say, we want to off-load an abstract operation to another asynchronous thread, and once it completes we’d continue working with a result in our asynchronous thread. First, how do you actually create a “thread”[1] in core.async? The answer is a go macro:

(require '[clojure.core.async :as a])

(defn offloaded-task []
  (a/go
    (a/<! (a/timeout 1000))
    (rand-int 100)))

(a/go
  (let [res (a/<! (offloaded-task))]
    (println res)
    res))

Again, nothing fancy for now. The offloaded-task function spawns an asynchronous thread which immediately parks on a timeout operation, simulating some asynchronous work that takes time, like a web request, for example. Then it just returns a random integer. The result of calling this function is a channel, created by the go macro. Once the rand-int completes, this channel will contain the resulting number.

Then, we create a second go thread. Inside it, we write a let block, as we normally would, then call our offloaded-task and park until it completes. Once it is complete, we print the result and return it. The result of the go block is again a channel, which will contain the value of res.

So what does this <! thing actually do? Well, there is a lot of machinery happening inside the go macro. You don’t really need to know it in detail, but the go macro transforms this linear code into a callback-based state machine, and each time it meets <! it transforms the rest of the code into a callback, which will be executed once a value appears on the channel. So, essentially, the go macro is doing inversion of control for us, transforming our linear code into code that can be executed in an asynchronous manner. I may be a bit incorrect here on the details, but that’s essentially how I understand the go block. Here’s a much more thorough and complicated explanation by the author of the go block. It comes in two parts, so if you want to know more, make sure you’ve watched the second part too.
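To get a feeling for what “the rest of the code becomes a callback” means, here is a hand-written approximation using take! and put! directly. This is not the actual expansion of the go macro (the real state machine is far more involved), just a sketch of the same control flow. The function name offloaded-task-callbacks is hypothetical:

```clojure
(require '[clojure.core.async :as a])

;; Roughly what offloaded-task does, written with explicit callbacks.
(defn offloaded-task-callbacks []
  (let [out (a/chan 1)]
    (a/take! (a/timeout 100)
             (fn [_]
               ;; everything after the parking point lives in this callback
               (a/put! out (rand-int 100))
               (a/close! out)))
    out))

;; The consumer side: the body that follows <! becomes a callback too.
(a/take! (offloaded-task-callbacks)
         (fn [res]
           (println res)))
```

Compared to this, the go version reads top to bottom, which is the whole point of the macro.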

Now, let’s look at the same thing, but with manifold:

(require '[manifold.deferred :as d])

(defn offloaded-task []
  (d/timeout! (d/deferred) 1000 (rand-int 100)))

(d/chain
 (offloaded-task)
 (fn [res]
   (println res)
   res))

This looks less linear when compared to the core.async version, mainly because we specify callbacks as literal callbacks. However, manifold provides the let-flow macro, which I, personally, never use, but for the sake of completeness we can change the code to:

(defn offloaded-task []
  (d/let-flow [_ (d/timeout! (d/deferred) 1000 nil)]
    (rand-int 100)))

(d/let-flow [res (offloaded-task)]
  (println res)
  res)

This now looks even more linear than the core.async version, thanks to the let-flow macro, which basically expands to code similar to what we’ve written manually before, except it uses d/zip to await multiple deferreds (even though right now there’s only one):

(let [d (offloaded-task)]
  (d/chain
   (d/zip d)
   (fn [[res]] (println res) res)))

So, what’s happening here? In the case of core.async, everything was represented as a channel. In the case of manifold, however, everything was represented as a deferred. Functions such as chain and timeout!, and later the let-flow macro, produce a deferred, which represents a value that will eventually be there. As you can see, chain is the central mechanism here, as we can literally chain tasks, each of which will consume the value of a deferred when it’s ready and produce another deferred upon completion. This allows us to add more and more actions to a given deferred, which will be executed linearly. And let-flow simply provides a more convenient interface.

manifold streams but they’re also deferreds

We can, of course, do the same thing using streams:

(require '[manifold.stream :as s])

(defn offloaded-task []
  (let [s (s/stream)]
    (d/chain
     (d/timeout! (d/deferred) 1000 nil)
     (fn [_] (s/put! s (rand-int 100))))
    s))

(d/chain
 (s/take! (offloaded-task))
 (fn [res]
   (println res)
   res))

But again, as you can see, we still use chain because operations on streams are always asynchronous and return deferreds. It works this way because manifold is mainly based on executors and callbacks attached to deferreds, so streams share the API and use deferreds when possible.
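A quick REPL-style sketch of that last point, assuming only the public manifold.stream and manifold.deferred APIs (the var names are made up): both put! and take! hand back deferreds, which is why they compose with chain.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.stream :as s])

(def st (s/stream 1)) ; a stream with a buffer of one

(def put-result (s/put! st :x))  ; a deferred, not a plain boolean
(def put-accepted? @put-result)  ; true once the stream accepted the value
(def taken @(s/take! st))        ; take! returns a deferred as well

(s/close! st)

;; After closing, puts are rejected and takes yield nil.
(def put-after-close @(s/put! st :y))
(def take-after-close @(s/take! st))
```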

Some technical differences between core.async and manifold

Now that we’ve seen the basic usage of both libraries, we can talk about some technical differences in their implementation. The most prominent difference is that core.async supports ClojureScript out of the box. There is a version of manifold for ClojureScript, manifold-cljs, however, it has some differences and missing functionality, like let-flow. So if you want a cross-platform library for asynchronous programming, I believe core.async is in the lead here for now.

Function boundaries

core.async, however, has what is possibly a pretty serious design flaw, or at least one can look at it that way. The go macro can’t reach across function boundaries, because there’s no meaningful way to transform code into a state machine when parking happens in a completely different stack frame. Thus, you can’t do something like this in core.async without additional go blocks and extra parking:

(def s (s/stream))

;; feed all stream values through `println`
(s/consume println s)

(d/chain
 s
 (fn [s]
   (mapv (partial s/put! s)
         (repeatedly 10 #(rand-int 100)))))

In core.async you’d have to do this:

(def c (a/chan))

;; consume `c`: `println` returns nil, so `filter` prints each value and drops it
(a/pipe c (a/chan 1 (filter println)))

(a/go
  ;; NOTE: you can't do (fn [val] (a/>! c val)) here
  (mapv (fn [val] (a/go (a/>! c val)))
        (repeatedly 10 #(rand-int 100))))

;; or, better
(a/go
  (mapv (fn [val] (a/put! c val))
        (repeatedly 10 #(rand-int 100))))

Note that this is not a proper way of writing such code, just a demonstration that doing a non-blocking put with >! may require an additional go block when crossing function boundaries. The documentation for core.async suggests that in this kind of situation you can just use put!, which is what >! compiles to after the macroexpansion of go. But this kind of code doesn’t really respect back-pressure and can lead to an error when too many puts are enqueued (core.async allows at most 1024 pending puts per channel). The same point about back-pressure applies to the manifold version above too.

In both cases, go and chain around mapv are not necessary. In fact, they’re both harmful because we’re basically putting a synchronous loop inside an asynchronous thread. Such threads run on a limited thread pool, so this can occupy a thread for long enough to cause problems across the whole system. This code may look like it does things asynchronously, given that both s/put! and a/put! are non-blocking, and we’re inside a go / chain context, but it does not.

A better way to write both examples would be:

;; manifold
(def s (s/stream))

(s/consume println s)

(d/loop [vals (repeatedly 10 #(rand-int 100))]
  (when-let [[val & vals] (seq vals)]
    (-> (s/put! s val)
        (d/chain (fn [_] (d/recur vals))))))

;; core.async
(def c (a/chan))

;; print and drop every value, as before
(a/pipe c (a/chan 1 (filter println)))

(a/go-loop [vals (repeatedly 10 #(rand-int 100))]
  (when-let [[val & vals] (seq vals)]
    (a/>! c val)
    (recur vals)))

In these examples, the code properly parks itself on s/put! and a/>!, when the consumer isn’t ready to accept the value or the buffer is full (if any).

So why did I say that core.async has a design flaw at the beginning of this section? Well, if you look at what the go macro does with your code, and if you’re familiar with other languages, such as Lua or Python, you might get the idea that what the go macro really tries to do is to create something similar to coroutines or generators.

In Lua, a coroutine is an ordinary function with the only difference that it can pause its execution when its body calls the yield function, and then continue its execution when desired. Python’s generators work in a similar way.

So in Lua, it would be possible to write such a system without complex transformations to state machines: the >! or <! functions would simply yield from the current coroutine stack frame, and the scheduler would resume them when needed. This is how my library for Fennel works, and it doesn’t have the same limitation as core.async, as you can yield across function boundaries. As far as I understand, this is also how the new virtual threads in the upcoming Project Loom work, and thus it will be possible to use them with things like agents. core.async still has the advantage that it will work on runtimes that have neither coroutines nor threads, such as JavaScript.

Iteration

The previous example shows another difference between core.async and manifold - iteration handling.

The go-loop macro simply expands to (go (loop [] ...)), and parking is handled in the same way: the rest of the loop is transformed into a callback, which is attached to a channel. This model is simple to understand and obeys the same restrictions as the ordinary loop special form in Clojure.

In manifold, however, we’re required to use d/loop, which is a different implementation of a loop, implemented as a macro. This macro expands into an ordinary loop that runs on an asynchronous thread-pool, but inside that loop there’s a cond which properly manages deferreds, thus parking itself instead of blocking. It relies on the fact that somebody will call the d/recur function, which doesn’t do anything by itself. Instead, it returns an object that is an instance of manifold.deferred.Recur, which d/loop expects. Once it sees such an object, it goes to the next iteration.
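A tiny sketch of this mechanism, assuming only the public manifold.deferred API (the name total is made up): the loop body returns a deferred whose eventual value is the object produced by d/recur, and d/loop itself returns a deferred holding the final result.

```clojure
(require '[manifold.deferred :as d])

;; Sum the numbers 5, 4, 3, 2, 1, one asynchronous step at a time.
(def total
  (d/loop [n 5
           acc 0]
    (if (pos? n)
      (d/chain
       (d/success-deferred n) ; stands in for some asynchronous work
       (fn [v]
         ;; d/recur only builds a marker object; d/loop does the iteration
         (d/recur (dec n) (+ acc v))))
      acc)))

@total
;; => 15
```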

This method allows us to get fancy since we are no longer required to use recur in the same lexical scope of loop, and we can actually return it from any other function. For example:

(defn offloaded-task []
  (d/timeout! (d/deferred) 100 (rand-int 100)))

(defn handle [x retries]
  (if (< x 50)
    (do (println (format "%s < 50, retrying" x))
        (d/recur (inc retries)))
    (println (format "%s >= 50, stop" x))))

(d/loop [retries 0]
  (if (< retries 10)
    (d/chain
     (offloaded-task)
     (fn [res]
       (handle res retries)))
    (println "max retry count exceeded")))

This actually allows us to build code that has a lot of logic scattered across different functions, each of which, for example, may decide to retry the whole thing. In core.async this can be done as well, except it would require creating a custom API, e.g. by returning a map that has a value under the :val key, and if it is equal to, say, ::recur, then we need to call the real recur:

(defn offloaded-task []
  (a/go
    (a/<! (a/timeout 100))
    (rand-int 100)))

(defn handle [x retries]
  (if (< x 50)
    (do (println (format "%s < 50, retrying" x))
        {:val ::recur :args [(inc retries)]})
    (println (format "%s >= 50, stop" x))))

(a/go-loop [retries 0]
  (if (< retries 10)
    (when-let [res (a/<! (offloaded-task))]
      (let [res (handle res retries)]
        (when (identical? (:val res) ::recur)
          (recur (nth (:args res) 0)))))
    (println "max retry count exceeded")))

So, it’s definitely doable, manifold just has a dedicated API for that, which jumps through all the hoops for you. Though it’s better to avoid explicit iteration in both libraries, and instead use pipelining facilities that do the processing of items.

Dynamic binding

I’m not entirely sure about this, but it seems that core.async works better with dynamic scoping, while manifold doesn’t work with it at all. A lot of macros in manifold do generate calls to bound-fn, which should capture the dynamic scope, so perhaps it’s the user’s responsibility to do the same when using plain d/chain. I’m not a big fan of bound-fn, so most of the time when working with manifold I pass such bindings explicitly as function arguments. Here’s a demonstration of using a dynamically bound var *token* in both core.async and manifold:

(def ^:dynamic *token* 42)

(defn send-to-server [data]
  (d/chain
   (d/timeout! (d/deferred) 100 nil)
   (fn [_]
     {:headers "some: headers"
      :body (str data)})))

(defn send-to-server-async [data]
  (let [c (a/chan 1)]
    (d/chain
     (send-to-server data)
     (fn [resp]
       (a/put! c resp)
       (a/close! c)))
    c))

(defn core-async [data]
  (a/go
    (let [resp (a/<! (send-to-server-async data))]
      (assoc resp :token *token*))))

(defn manifold [data]
  (d/chain
   (send-to-server data)
   (fn [resp]
     (assoc resp :token *token*))))

(binding [*token* 1337] @(manifold 42))
;; => {:headers "some: headers", :body "42", :token 42}

(binding [*token* 1337] (a/<!! (core-async 42)))
;; => {:headers "some: headers", :body "42", :token 1337}

As you can see, in manifold’s case, the :token key holds the root binding value of *token*, which is 42. In the core.async case, the binding value is properly captured by the go thread. This even works if we move the <!! call out of the binding form:

(a/<!! (binding [*token* 1337] (core-async 42)))
;; => {:headers "some: headers", :body "42", :token 1337}

So, in manifold I’d usually do this:

(defn manifold [data]
  (let [token *token*]
    (d/chain
     (send-to-server data)
     (fn [resp] ; you can use `bound-fn` here, and it will work without `let`
       (assoc resp :token token)))))

@(binding [*token* 1337] (manifold 42))
;; => {:headers "some: headers", :body "42", :token 1337}

Now it’s working properly. Or, if I have a more complex example, with functions generating functions and whatnot, I pass such bindings around as additional parameters, or as a map, if there are too many:

(defn add-token [token]
  (fn [resp]
    (assoc resp :token token)))

(defn manifold [data]
  (let [token *token*]
    (d/chain
     (send-to-server data)
     (add-token token))))

@(binding [*token* 1337] (manifold 42))
;; => {:headers "some: headers", :body "42", :token 1337}

Though I could use bound-fn here, for some reason it makes me worry about performance too much. The point is core.async properly captures bindings when it creates the go block, so it’s less likely that you’ll get errors with it since you don’t have to remember to use bound-fn or manually capture bindings with lexical scoping mechanisms.
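For completeness, here is what the bound-fn variant might look like. This is a sketch under the same assumptions as the examples above; the function name manifold-bound is hypothetical, and the timeout stands in for a real request:

```clojure
(require '[manifold.deferred :as d])

(def ^:dynamic *token* 42)

(defn manifold-bound [data]
  (d/chain
   ;; imitation of a request that completes on another thread
   (d/timeout! (d/deferred) 50 {:body (str data)})
   ;; bound-fn captures the bindings active when this function is created
   (bound-fn [resp]
     (assoc resp :token *token*))))

@(binding [*token* 1337] (manifold-bound 42))
;; => {:body "42", :token 1337}
```

The callback still runs on a different thread, but bound-fn reinstalls the captured bindings around the call, so *token* resolves to 1337 instead of the root value.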

Handling errors

Error handling in an asynchronous context is hard in general. The approach each library takes is a bit different - core.async is more barebones but can be managed, and manifold can represent an error in the system pretty well.

Unlike Clojure’s promises, deferreds in manifold can store errors, and when you dereference such a deferred, the error is thrown:

@(d/chain nil inc)
;; Execution error (NullPointerException) at manifold.deferred/eval24263$chain (deferred.clj:838).

Thus, you can use ordinary try and catch around points where your program becomes synchronous by explicitly awaiting a value. Or, you can use d/catch to catch errors in an asynchronous way:

(-> 0
    (d/chain #(/ 1 %))
    (d/catch
        java.lang.ArithmeticException
        (fn [e]
          (println (format "error: %s" (ex-message e)))
          nil))
    deref)

This d/catch function internally wraps the code into a try block that catches exceptions and deals with them via deferreds. You can specify what kind of exception you want to catch, just like you would do for a real catch clause. If the kind is unspecified, any Throwable is caught, which is a bit broad, but sadly there aren’t many options here on the JVM.
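One more sketch of d/catch, this time showing that an error skips the remaining steps of a chain, and that the value returned by the handler flows into whatever is chained afterwards. It assumes only the public manifold.deferred API; the name recovered is made up:

```clojure
(require '[manifold.deferred :as d])

(def recovered
  (-> (d/success-deferred 0)
      ;; the division throws, so `inc` is never called
      (d/chain #(/ 1 %) inc)
      ;; the handler's return value replaces the error
      (d/catch ArithmeticException (fn [_] :fallback))
      ;; and the chain continues with that value
      (d/chain name)))

@recovered
;; => "fallback"
```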

core.async, on the other hand, doesn’t have any mechanism for representing an error. Channels can’t have nil as a value, because receiving nil from a channel means that the channel is closed. So, if any kind of error happens in the asynchronous part of your code, you either have to pass it through your pipeline as an exception, or log the error and drop the data completely. Some functions in the core.async library take an additional exception handler. The default handler simply logs the exception, which may not fly for your use case. Unlike manifold, however, you don’t need any fancy catch replacements, you can just use plain try in your go blocks:

(let [c (a/chan 1)]
  (a/>!! c 0)
  (a/go
    (try
      (/ 10 (a/<! c))
      (catch java.lang.ArithmeticException e
        (println (format "error: %s" (ex-message e)))))))

Again, the go macro takes care of transforming this code into a meaningful state machine that handles errors the way you’d expect it to.
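As for the exception handlers mentioned earlier, a/chan is one of the functions that accepts one: when a channel has a transducer, a third argument handles exceptions thrown during the transformation, and any non-nil value it returns is put onto the channel. A minimal sketch (the var names and the shape of the error map are my own choice for this example):

```clojure
(require '[clojure.core.async :as a])

(def divided
  (a/chan 1
          (map #(/ 10 %))
          ;; called with the Throwable; a non-nil result goes onto the channel
          (fn [e] {:error (ex-message e)})))

(a/>!! divided 5)
(def ok-result (a/<!! divided))

(a/>!! divided 0)
(def handled (a/<!! divided))
;; handled => {:error "Divide by zero"}
```

This keeps errors in the data flow as ordinary values, at the cost of every consumer having to check for them.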

It is possible to define your own operations, like a throwing take, that will throw when it receives an object that is an instance of Throwable:

(defmacro <!?
  "Throw an exception when the `port` returns an exception."
  [port]
  `(let [v# (a/<! ~port)]
     (if (instance? java.lang.Throwable v#)
       (throw v#)
       v#)))

(let [c (doto (a/chan 1)
          (a/put! (Exception. "bang!")))]
  (a/go
    (try
      (<!? c)
      (catch Exception e
        (println (ex-message e))))))
;; prints: bang!

You can build your application with this kind of technique, but you need to make sure that all errors are fed back into channels, to be caught later, in order to establish a proper data flow.

Debugging asynchronous code

The last topic that I’d like to discuss in this section is debugging. I use CIDER, which has an interactive step debugger that I find quite helpful, and I use it a lot when I want to understand what’s going on in some code that I didn’t write. The debugger has gotten me through a lot, and when it comes to debugging asynchronous code, core.async and manifold are at opposite ends of the spectrum.

Because CIDER’s debugger instruments the code by injecting breakpoints into it, and then evals the form as usual, it’s not a problem for manifold at all, because all functions accepted by chain are ordinary functions. You can instrument them, redefine them, and even mock them via with-redefs if you need to.

The go macro in the core.async library, however, is not a good friend for debugging. If you try to debug a function with a go block in it, CIDER will throw an error that can be quite hard to understand:

#dbg
(defn core-async [data]
  (a/go
    (let [resp (a/<! (send-to-server-async data))]
      (assoc resp :token *foo*))))

2. Unhandled clojure.lang.Compiler$CompilerException
   Error compiling *cider-repl localhost:42295(clj)* at (1:8246)
   #:clojure.error{:phase :macro-syntax-check,
                   :line 1,
                   :column 8246,
                   :source
                   "*cider-repl Git/async_vs_manifold:localhost:42295(clj)*",
                   :symbol a/go}
             Compiler.java: 7027  clojure.lang.Compiler/macroexpand1
                  ...
                  meta.clj:  324  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all/fn
                  meta.clj:  313  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all
               RestFn.java:  423  clojure.lang.RestFn/invoke
                  meta.clj:  319  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all/fn
                  core.clj: 2772  clojure.core/map/fn
              LazySeq.java:   42  clojure.lang.LazySeq/sval
                  ...
                  walk.clj:   47  clojure.walk/walk
                  meta.clj:  319  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all
                  meta.clj:  313  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all
               RestFn.java:  423  clojure.lang.RestFn/invoke
                  meta.clj:  319  cider.nrepl.inlined.deps.orchard.v0v11v0.orchard.meta/macroexpand-all/fn
                  core.clj: 2772  clojure.core/map/fn
              LazySeq.java:   42  clojure.lang.LazySeq/sval
                  ...
    interruptible_eval.clj:   84  nrepl.middleware.interruptible-eval/evaluate
    interruptible_eval.clj:   56  nrepl.middleware.interruptible-eval/evaluate
    interruptible_eval.clj:  152  nrepl.middleware.interruptible-eval/interruptible-eval/fn/fn
                  AFn.java:   22  clojure.lang.AFn/run
               session.clj:  218  nrepl.middleware.session/session-exec/main-loop/fn
               session.clj:  217  nrepl.middleware.session/session-exec/main-loop
                  AFn.java:   22  clojure.lang.AFn/run
               Thread.java:  833  java.lang.Thread/run

1. Caused by clojure.lang.ExceptionInfo
   Could not resolve var: data
   {:var data,
    :file "*cider-repl localhost:42295(clj)*",
    :column 28,
    :line 13}

Debugging manifold’s chains, on the other hand, can be done without any issues. That’s another downside of relying on code transformation instead of proper support from the platform, but it is a downside the developers of core.async are willing to accept, because supporting platforms that lack concurrency support is more important here.

Mixing core.async and manifold

One of the goals of the manifold project is to be general enough to be considered a lingua franca and used as a translation layer when working with several different asynchronous libraries. I once used both core.async and manifold in the same project, because I wanted to use core.async parallel pipelines, so I needed the data I provide to be stored in a channel. It is definitely possible to mix and match go and chain, though the resulting code is a convoluted mess, so I’d suggest sticking to one library and not using both simultaneously as I did. In short, I had a go block which called a function that created a channel, then did a request to a socket server with the aleph.tcp module, and chained the result with manifold.deferred/chain to a function that used core.async/put! on the channel:

(defn send-to-server [req]
  (let [c (a/chan 1)]
    (d/chain
     ;; no real request, just an imitation here
     (d/timeout! (d/deferred) 100
                 {:body "some body"
                  :headers "once: told me"})
     (fn [resp]
       (a/put! c resp)
       (a/close! c)))
    c))

(defn communicate [req]
  (a/go
    (let [resp (a/<! (send-to-server req))]
      (process-response resp))))

Instead, what I could have done is use manifold streams all the way through:

(defn send-to-server [req]
  ;; no real request, just an imitation here
  (d/timeout! (d/deferred) 100
              {:body "some body"
               :headers "once: told me"}))

(defn communicate [req]
  (let [c (a/chan)
        s (s/->sink c)]
    (d/chain (send-to-server req)
             (fn [resp]
               (s/put! s resp)
               (s/close! s)))
    c))

Now I only need to deal with a manifold stream, representing a channel, and a deferred returned by the server. The channel gets closed once we close the stream that was created from it, so for the outside world, it looks like this function was implemented with core.async, even though technically it isn’t.

There are many ways to set up communication between the two libraries. The example above creates a one-way port from the manifold side to the core.async side, but you often need to communicate both ways. To achieve that, manifold provides spliced streams, which are constructed from a separate sink and a source. This way you can do bidirectional communication between core.async and manifold:

(defn ->stream [port]
  (s/splice (s/->sink port) (s/->source port)))

(let [c (a/chan)
      s (->stream c)]
  (a/put! c 42)
  (println "manifold took:" @(s/take! s))
  (s/put! s 28)
  (println "core.async took:" (a/<!! c)))
;; prints:
;; manifold took: 42
;; core.async took: 28

There are more ways of combining both libraries, which I’m not going to describe here, but the overall feeling is that manifold has everything you’ll need to work seamlessly between the two. Note, however, that this comes at a cost, and probably introduces intermediate buffers you don’t usually expect to have.

My personal opinion on both libraries

The first library I used for asynchronous programming in Clojure, apart from agents, was manifold, and I remember that it was hard for me to understand the code. After some practice, I got the hang of it, and it was pretty easy to think in terms of chain and streams. I’ve managed to write a lot of horribly complicated code with it, though.

After that, I started looking at core.async as a possible candidate to replace manifold in our project. It was much easier to understand, and honestly, I think that go blocks lead to better code that has the benefit of being just like your ordinary Clojure code.

However, I am a bit biased here, because I hadn’t worked with asynchronous code before I saw manifold, so since it was my first experience, and it wasn’t the most pleasant one, I may see core.async as a better library solely based on that impression. I do prefer the fact that in core.async there’s only one concept, the channel, and everything nicely integrates with it. You can use transducers with channels, you can wait for multiple channels at once, and cancel those that were too slow. The fact that core.async uses channels similarly to promises, by simply opening a channel and closing it immediately after putting a single value onto it, is an elegant solution for not having two asynchronous types, like in manifold.
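A small sketch of the “wait for multiple channels at once” part, using alts! with a timeout channel. The helpers answer-after and take-within are hypothetical names for this example. Note that this only stops waiting for the slow channel, the work behind it isn’t interrupted:

```clojure
(require '[clojure.core.async :as a])

;; A go block used like a promise: its channel receives one value and closes.
(defn answer-after [ms v]
  (a/go
    (a/<! (a/timeout ms))
    v))

;; Take from `ch`, but give up once `limit-ms` milliseconds have passed.
(defn take-within [ch limit-ms]
  (a/go
    (let [[v port] (a/alts! [ch (a/timeout limit-ms)])]
      (if (= port ch)
        v
        :timed-out))))

(def fast (a/<!! (take-within (answer-after 10 :fast) 1000)))
(def slow (a/<!! (take-within (answer-after 2000 :slow) 10)))
```

Here fast ends up holding :fast, while slow holds :timed-out, because the timeout channel closes long before the slow answer arrives.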

In manifold there are both deferreds and streams. Deferreds can be successful or erroneous, so you have to be careful with them. And streams can be sources, sinks, or both, and you sometimes have to check if something is a sink or not. Sure, it provides more flexibility, but I enjoyed working with a simpler system more.

On the other hand, core.async is a thing in itself. Once you’ve committed to using core.async, you can’t really interact with other things, like Java streams, without defining your own wrappers. Here, the manifold library has you covered.

You can use Clojure’s default concurrency primitives, such as atoms or agents, with both of these libraries, which I find great. There’s no need to limit yourself to only using channels, streams, or deferreds: you can always call swap! on an atom or send a task to an agent if it makes sense.

I think both manifold and core.async are great libraries, both having their upsides and downsides. So use what fits the task you’re doing and don’t be afraid to experiment!


  1. Not to be confused with an OS or JVM thread, or clojure.core.async/thread.