Andrey Listopadov

Clojure's core.async pipeline-async off-by-two error explained

@programming clojure async ~20 minutes read

Quite recently I’ve been working on my asynchronous programming library for Fennel, and fiddling with clojure.core.async at the same time for my other project. I’ve found an interesting function, called pipeline-async, which seemed to be a good fit for my task. Here’s the documentation for this function:

user> (doc clojure.core.async/pipeline-async)
-------------------------
clojure.core.async/pipeline-async
([n to af from] [n to af from close?])
  Takes elements from the from channel and supplies them to the to
  channel, subject to the async function af, with parallelism n. af
  must be a function of two arguments, the first an input value and
  the second a channel on which to place the result(s). The
  presumption is that af will return immediately, having launched some
  asynchronous operation whose completion/callback will put results on
  the channel, then close! it. Outputs will be returned in order
  relative to the inputs. By default, the to channel will be closed
  when the from channel closes, but can be determined by the close?
  parameter. Will stop consuming the from channel if the to channel
  closes. See also pipeline, pipeline-blocking.

So the basic idea is that you can spawn n asynchronous tasks that will be working in parallel processing data from the from channel, and putting results to the to channel, in the order of the inputs. The order here is important because that’s why this function behaves the way I’ll show later.

Here’s a demonstration of this function processing data:

user> (require '[clojure.core.async :as a])
user> (let [out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value channel]
                   (a/go (a/<! (a/timeout 1000))
                         (a/>! channel (inc value))
                         (a/close! channel)))]
        (a/pipeline-async 3 out task data)
        (time (a/<!! (a/into [] out))))
"Elapsed time: 2017.977058 msecs"
[1 2 3 4 5 6 7 8 9 10]

In this example, I’ve created an out channel, and a data channel, containing numbers from 0 to 9. Then I’ve created a function task that receives a value and some channel, sleeps for one second, and puts incremented value to that channel, then closes it. After that, I started the pipeline-async with a parallelism of 3 and our channels and the task as arguments. And finally, we use a/into to convert a channel to a vector, and await the operation with a/<!!. Kinda like pmap on a limited thread pool, except we’re doing things in an asynchronous way here.

So, as you can see, even though our asynchronous function sleeps for one second on each element, thanks to the parallelism of 3 we’ve finished processing ten numbers in just two seconds! Wait, I don’t think that math checks out… shouldn’t it be close to four seconds instead? I mean, 10 divided by 3 is not 2. Let’s try with the parallelism of 1 instead, it should be around ten seconds:

user> (let [out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/<! (a/timeout 1000))
                         (a/>! chan (inc value))
                         (a/close! chan)))]
        (a/pipeline-async 1 out task data)
        (time (a/<!! (a/into [] out))))
"Elapsed time: 4011.54529 msecs"
[1 2 3 4 5 6 7 8 9 10]

Wait, now it looks like it has a parallelism of three! Let’s add some logging:

user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go  (a/>! log (str "started job  " value))
                          (a/<! (a/timeout 1000))
                          (a/>! chan (inc value))
                          (a/close! chan)
                          (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (a/pipeline-async 1 out task data)
        (time (a/<!! (a/into [] out)))
        (a/close! log))
started job  2
started job  1
started job  0
finished job 1
finished job 0
finished job 2
started job  3
started job  4
started job  5
finished job 4
finished job 3
finished job 5
started job  6
started job  7
started job  8
finished job 6
finished job 7
finished job 8
started job  9
finished job 9
"Elapsed time: 4011.423488 msecs"

Ho, now I see it, there’s an off-by-two error in pipeline-async. Instead of launching 1 task, in this example 3 tasks were launched! That explains, why we see four seconds with parallelism of 1 and only two seconds with parallelism of 3 - in the latter case 5 tasks are running simultaneously. And ten over five is indeed two.

Off-by-two error

Now, this is actually a known bug, submitted to Clojure’s JIRA back in 2016. However, it was closed with a “won’t fix” status, and I don’t think anyone is working on it. I’ve studied the code of the pipeline* function, which is a base for all pipeline-* variants in core.async, and couldn’t figure out why this happens just by looking at the code.

So, I decided to port this function, to Fennel using channel implementation from my fennel-async library. I did it, and as can be seen, the comment in my implementation mentions the same “off-by-two” error.

Because I got it as well.

Somehow.

Initially, I thought that my implementation may avoid this bug, because instead of callbacks I use Lua’s coroutines, to park threads, and a scheduler that runs through all asynchronous tasks in a loop. However, as it turned out later, neither the channel implementation nor the core scheduling principles are strictly related to the problem. It’s just how the code of pipeline-async works.

Let’s look at the code for pipeline* function, which is what pipeline-async calls internally:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
(defn- pipeline*
  ([n to xf from close? ex-handler type]
   (assert (pos? n))
   (let [ex-handler (or ex-handler (fn [ex] ...))
         jobs (chan n)
         results (chan n)
         process (fn [[v p :as job]] ...)
         async (fn [[v p :as job]]
                 (if (nil? job)
                   (do (close! results) nil)
                   (let [res (chan 1)]
                     (xf v res)
                     (put! p res)
                     true)))]
     (dotimes [_ n]
       (case type
         (:blocking :compute) (thread ...)
         :async (go-loop []
                  (let [job (<! jobs)]
                    (when (async job)
                      (recur))))))
     (go-loop []
       (let [v (<! from)]
         (if (nil? v)
           (close! jobs)
           (let [p (chan 1)]
             (>! jobs [v p])
             (>! results p)
             (recur)))))
     (go-loop []
       (let [p (<! results)]
         (if (nil? p)
           (when close? (close! to))
           (let [res (<! p)]
             (loop []
               (let [v (<! res)]
                 (when (and (not (nil? v)) (>! to v))
                   (recur))))
             (recur))))))))
Code Snippet 1: pipeline* source code

I’ve hidden some parts that are uninteresting to us (the ... markers) because this function can work in two ways - asynchronous and parallel. We’re only interested in the asynchronous one, so the process function is not interesting for our case, same for the (:blocking :compute) branch.

If called with the value of n equal to 1 this function spawns three go-loop threads, which are asynchronous threads running on the core.async thread pool. These threads can be parked when needed, and are based on the callback system, this library implements.

I’m not going to explain to you how core.async works, because it is a huge topic, well covered by the author of the go macro. In short, each <! in this code is a point of transformation, which turns all the code after the <! call to a callback, attached to the channel of the go block. Maybe not exactly like that, but it’s beside the point, really.

What you need to know about this function is, that at the very least it will spawn three internal threads, and instinctively you might think that this is why there are three tasks running instead of one, as one would expect. And you’re right, well, in a way, but not exactly. It’s the interplay of these threads, and the channels they use for synchronization that’s causing this. Let me explain.

Explanation

So let’s try to understand why the off-by-two error happens in this code. After reimplementing this function in Fennel, and spending a good amount of time thinking instead of sleeping, I think I got why it works the way it works.

The main source of the issue is that this function wants to retain the order of outputs relative to inputs. I don’t exactly know why this was a decision because when processing data, asynchronously transferring them between channels, I kinda expect the order to be mixed, because some tasks may take more time then others, but for some reason, this function was implemented specifically to prevent this from happening. So, how it does it?

I’m going to refer to specific lines in the pipeline* source code I’ve shown above, which is not equal to the original source, so keep that in mind. Line numbers are links that should move your view to the mentioned line, but if you’re reading this on a big display, it’s better to open the code and the explanation side by side. Sorry for the inconvenience.

In lines 5 and 6 we create two channels jobs and results. These channels are buffered with the same buffer size as our parallelism parameter n and this is what should give us the back-pressure needed to achieve a limited amount of tasks running. Then there are three go-loop blocks defined at lines 18, 22, and 30. I’ll call them go-1, go-2, and go-3 respectively.

So, for the parallelism of 1 the process goes like this:

  1. go-2 takes data from the from channel.
  2. go-2 puts the job to the jobs channel, and a new channel to the results channel in the next line.

Current state: jobs: full, results: full

  1. go-1 takes a job from the jobs channel, and spawns the asynchronous task at line 12.

    This task receives a value v and a channel res, to which it will put the result and close! it. This will be important for the go-3 channel later.

  2. go-1 puts res to the p channel.

Current state: jobs: empty, results: full

  1. go-3 takes a value from the results channel.

  2. go-3 waits for the p channel to get the res channel from the step 3.

  3. go-3 waits for the asynchronous job to finish.

    This is when the asynchronous block started by the function we’ve passed as an argument to pipeline-async is awaited. Until it closes the res channel, the go-3 thread will be parked.

Current state: jobs: empty, results: empty, go-3: parked, one task is running.

  1. go-2 takes data from the from channel.
  2. go-2 puts the job to the jobs channel, and a new channel to the results channel in the next line.

Current state: jobs: full, results: full, go-3: parked, one task is running.

  1. go-1 takes a job from the jobs channel, and spawns the asynchronous task at line 12.

    This is when the second task is spawned, despite the fact that the first one is still running.

  2. go-1 puts res to the p channel.

    This should advance go-3 block, but it is already parked.

Current state: jobs: empty, results: full, go-3: parked, two tasks are running. We’re still not done.

  1. go-3 is still parked, awaiting the first task.
  2. go-2 takes data from the from channel yet again.
  3. go-2 puts the job to the jobs channel.
  4. go-2 wants to put a new channel to the results channel but the results channel is full, so the go-2 parks.

Current state: jobs: full, results: full, go-3: parked, go-2: parked, two tasks are running.

  1. go-1 takes a job from the jobs channel, and spawns the asynchronous task at line 12.

    This is when the third task is spawned, despite the fact that the first one is still running.

  2. go-1 puts res to the p channel.

    This should advance go-3 block, but it is already parked.

  3. go-1 wants to take a new job from the jobs channel, but the producer go-2 is parked, so go-1 parks as well.

Current state: jobs: empty, results: full, go-3: parked, go-2: parked, go-1: parked, three tasks are running.

Phew! This is quite a convoluted process if you ask me!

So the reason for the extra two jobs running is tied to the fact that between the go-3 thread parking, go-2 is able to push two more tasks before parking on results channel being full. I’m not sure if you’ve noticed this, but we can turn this off-by-two error into an off-by-one error simply by changing the order we put stuff to the jobs and results channel.

So we simply need to change this:

22
23
24
25
26
27
28
29
(go-loop []
  (let [v (<! from)]
    (if (nil? v)
      (close! jobs)
      (let [p (chan 1)]
        (>! jobs [v p])
        (>! results p)
        (recur))))

Into this:

22
23
24
25
26
27
28
29
(go-loop []
  (let [v (<! from)]
    (if (nil? v)
      (close! jobs)
      (let [p (chan 1)]
        (>! results p)
        (>! jobs [v p])
        (recur))))

Now the whole process will park a bit earlier and only two tasks will run, as per the 10th step. This doesn’t fix the error but makes it a bit more manageable, in cases when you’re not aware of the problem in the first place, and you’re trying to access a resource with a limited amount of connections. Still can cause problems.

Unordered pipeline-async implementation

The issue can be fixed if we don’t maintain the order of results, and instead, put them by the order of task completion. Here’s the implementation of such a function:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
(defn pipeline-async-unordered
  "Takes elements from the from channel and supplies them to the to
  channel, subject to the async function af, with parallelism n. af
  must be a function of two arguments, the first an input value and
  the second a channel on which to place the result(s). The
  presumption is that af will return immediately, having launched some
  asynchronous operation whose completion/callback will put results on
  the channel, then close! it. Outputs will be returned in order
  of completion. By default, the to channel will be closed
  when the from channel closes, but can be determined by the close?
  parameter. Will stop consuming the from channel if the to channel
  closes. See also pipeline, pipeline-blocking."
  ([n to af from]
   (pipeline-async-unordered n to af from true))
  ([n to af from close?]
   (let [closes (to-chan! (repeat (dec n) :close))]
     (dotimes [_ n]
       (go-loop []
         (if-some [v (<! from)]
           (let [c (chan 1)]
             (af v c)
             (when (loop []
                     (if-some [res (<! c)]
                       (when (>! to res)
                         (recur))
                       true))
               (recur)))
           (when (and close?
                      (nil? (<! closes)))
             (a/close! to)))))
     to)))

An interesting trick happens at line 16. We’re creating a channel that has N-1 elements in it. Each thread when it is done processing all data and the from channel was closed will try to take from that channel. If the result is nil it means other threads already exhausted the channel, and we’re the last one, and it is our responsibility to close! the to channel, if desired.

The same example from above using this variant of pipeline shows that there are exactly 3 tasks running at a time:

user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go  (a/>! log (str "started job  " value))
                          (a/<! (a/timeout 1000))
                          (a/>! chan (inc value))
                          (a/close! chan)
                          (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (pipeline-async-unordered 3 out task data)
        (time (a/<!! (a/into [] out))))
started job  2
started job  1
started job  0
finished job 0
finished job 1
finished job 2
started job  3
started job  4
started job  5
finished job 4
finished job 3
finished job 5
started job  6
started job  7
started job  8
finished job 6
finished job 7
finished job 8
started job  9
finished job 9
"Elapsed time: 4007.215739 msecs"
[2 3 1 5 4 6 7 8 9 10]

As can be seen, the order of elements is not the same as the order of inputs, but we get exactly three tasks running at a time. This actually has another huge benefit over to default pipeline-async.

You see, if the first-ever task in pipeline-async takes the most amount of time to complete, this will be the time needed to get the first result from the to channel. Yes, other N-1 elements will be processed in parallel, however, until the first-ever task is finished, no more tasks will be taken at all! See for yourself:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go  (a/>! log (str "started job  " value))
                          (when (= value 0)
                            (a/<! (a/timeout 10000)))
                          (a/>! chan (inc value))
                          (a/close! chan)
                          (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (a/pipeline-async 1 out task data) ;; 1 here is 3 actually
        (time (a/<!! out)))
started job  2
started job  1
started job  0
finished job 2
finished job 1
finished job 0
started job  3
"Elapsed time: 10013.587567 msecs"
finished job 3
1

In this example, I’ve made it so only the first task will sleep for 10 seconds. In the log, you can see that we’ve started three jobs (because of the off-by-two error), and then jobs 2 and 1 finished immediately. Then we waited for job 0 to finish before starting the job 3. Thus the time for the first value to appear in the out channel is 10 seconds.

If we use pipeline-async-unordered here, you can see that we have a much higher throughput:

(let [log (a/chan 10)
        out (a/chan)
        data (a/to-chan! (range 10))
        task (fn [value chan]
               (a/go  (a/>! log (str "started job  " value))
                      (when (= value 0)
                        (a/<! (a/timeout 10000)))
                      (a/>! chan (inc value))
                      (a/close! chan)
                      (a/>! log (str "finished job " value))))]
    (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
    (pipeline-async-unordered 3 out task data)
    (time (a/<!! out)))
started job  2
started job  1
started job  0
"Elapsed time: 4.602578 msecs"
finished job 1
finished job 2
started job  3
finished job 3
finished job 0
3

In this case, the first job to complete was the job 2, and then job 1, 3, and long after that job 0. If we change the example a bit and make other jobs sleep for, say 1 second, we’ll still get more throughput with an unordered version:

user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go  (a/>! log (str "started job  " value))
                          (if (= value 0)
                            (a/<! (a/timeout 10000))
                            (a/<! (a/timeout 1000)))
                          (a/>! chan (inc value))
                          (a/close! chan)
                          (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (a/pipeline-async 1 out task data) ;; 1 here is 3 actually
        (time (a/<!! (a/into [] out))))
started job  2
started job  0
started job  1
finished job 1
finished job 2
finished job 0
started job  3
started job  4
started job  5
finished job 3
finished job 4
finished job 5
started job  6
started job  7
started job  8
finished job 6
finished job 7
finished job 8
started job  9
finished job 9
"Elapsed time: 13011.037871 msecs"
[1 2 3 4 5 6 7 8 9 10]

Here you can see that until we’ve finished the job 0 we were waiting before we could take other jobs. Because of that we first spent 10 seconds on the first three tasks, and then 3 seconds on the remaining task. With the unordered version and the same parallelism, we spend at most 10 seconds, because other tasks can still run even though the first task is not completed yet:

user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go  (a/>! log (str "started job  " value))
                          (if (= value 0)
                            (a/<! (a/timeout 10000))
                            (a/<! (a/timeout 1000)))
                          (a/>! chan (inc value))
                          (a/close! chan)
                          (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (pipeline-async-unordered 3 out task data)
        (time (a/<!! (a/into [] out))))
started job  0
started job  2
started job  1
finished job 1
finished job 2
started job  3
started job  4
finished job 4
finished job 3
started job  5
started job  6
finished job 5
finished job 6
started job  7
started job  8
finished job 7
finished job 8
started job  9
finished job 9
finished job 0
"Elapsed time: 10003.531318 msecs"
[2 3 5 4 6 7 9 8 10 1]

I hope this gives a clear illustration of why you might want unordered version.

A possible fix for pipeline-async

Apart from the already mentioned line swap, which turns this problem from off-by-two to off-by-one, I don’t think I can come up with any other ideas. We kinda have to put channels to the results channel, and then take them out, awaiting on the inner channel in order to preserve the order. As a hack, we can subtract one from the number of threads we spawn in the dotimes block, if n is greater than 1, but this still leaves us with situations where two threads instead of one will run. Disallowing parallelism of 1 is an option too, as it doesn’t make any sense to use this function with such parallelism. In other words:

(defn pipeline-async
  "..."
  ([n to af from] (pipeline-async n to af from true))
  ([n to af from close?]
   (assert (> n 1) "don't use pipeline-async with parallelism of 1")
   (pipeline* (dec n) to af from close? nil :async)))

This should take care of most use cases, but isn’t a proper solution by any means.

Given that I have the same problem in my fennel-async library, I may eventually find the solution for it and propose it for the Clojure implementation too. Unless it would use some Lua-specific features, that can’t easily be achieved without full coroutine support in the JVM, or other runtimes Clojure uses.

Until then, I hope this was a useful and interesting read!

A proper fix for pipeline-async

After a bit more time, I think I figured out a proper way to fix the pipeline-async function. All we really need is a way to synchronize the channel that processes the result and the channel that processes the jobs. We can add one more channel to the mix like so:

diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj
index 55ae450..66bfad0 100644
--- a/src/main/clojure/clojure/core/async.clj
+++ b/src/main/clojure/clojure/core/async.clj
@@ -530,6 +530,7 @@ to catch and handle."
                                        nil))
            jobs (chan n)
            results (chan n)
+           finishes (and (= type :async) (chan n))
            process (fn [[v p :as job]]
                      (if (nil? job)
                        (do (close! results) nil)
@@ -540,7 +541,9 @@ to catch and handle."
                          true)))
            async (fn [[v p :as job]]
                    (if (nil? job)
-                     (do (close! results) nil)
+                     (do (close! results)
+                         (close! finishes)
+                         nil)
                      (let [res (chan 1)]
                        (xf v res)
                        (put! p res)
@@ -554,6 +557,7 @@ to catch and handle."
                :async (go-loop []
                                  (let [job (<! jobs)]
                                    (when (async job)
+                                     (<! finishes)
                                      (recur))))))
        (go-loop []
                   (let [v (<! from)]
@@ -572,6 +576,8 @@ to catch and handle."
                           (let [v (<! res)]
                             (when (and (not (nil? v)) (>! to v))
                               (recur))))
+                        (when finishes
+                          (>! finishes :done))
                         (recur))))))))
  ;;todo - switch pipe arg order to match these (to/from)

This essentially means that the thread that spawns jobs will park until the thread that processes job results signals that the job is finished. This way we basically ensure that there will be no more than n active jobs. I will look into the process of submitting this fix to the core.async library soon.

Edit Nov 23 2022:

I’ve submitted the patch from above as a Clojure Q&A question because it seems there’s no other way to suggest changes unless you’re a part of the Clojure development team who has access to the project’s JIRA. The post is here. Alex Miller replied that they don’t think this is a problem that needs fixing, so I guess the bug is here to stay.

It is interesting, however, that neither pipeline or pipeline-blocking share this behavior, as their implementation waits for the task result directly on the job consumer thread. Which makes it more awkward to combine them with the asynchronous variant, when you need to have the same amount of threads.

It is also a bit weird that all variants of pipeline functions don’t return the out channel, so you can pass it through, and connect pipelines, so I end up using something like this when I need to mix pipelines:

(defn process [parallelism blocking-fn data-ch]
  (let [out-ch (chan)]
    (pipeline-blocking parallelism out-ch (map blocking-fn) data-ch)
    out-ch))

(defn process-async [parallelism async-fn data-ch]
  (assert (> parallelism 2)) ; Compensate for the ASYNC-163
  (let [out-ch (chan)]
    (pipeline-async (- parallelism 2) out-ch async-fn data-ch)
    out-ch))

Much like other Clojure functions that work with collections, these two accept the data channel as the last argument, so you can use the thread-last macro with them:

user> (->> (range 10)
           (to-chan!)
           (process 10 (fn [v] (Thread/sleep 1000) (inc v)))
           (process-async 10 (fn [v c] (go (<! (timeout 1000)) (>! c (inc v)) (close! c))))
           (a/into [])
           <!!
           time)
"Elapsed time: 2011.683893 msecs"
[2 3 4 5 6 7 8 9 10 11]

This comes at a cost that you can’t use pipeline-async with parallelism less than three, but it doesn’t make sense to do it anyway.

Now, for real, thanks for reading!