Andrey Listopadov

Fixed version of pipeline-async and unordered pipeline variants

This is a follow-up to my previous post regarding the bug in the clojure.core.async/pipeline-async function.

As I’ve mentioned at the end of that post, I’ve submitted a patch to Ask Clojure that should fix the off by two error for the asynchronous pipeline. It spawned some discussion, but as far as I can see, at least for now, there aren’t many chances for this issue to be fixed any time soon.

Since I need this function at work, and I need to be able to fine-tune the number of tasks running simultaneously, I’ve put the fixed version into a library, for now. This also should make it easier to test the function in the wild, as people don’t tend to patch libraries in Clojure, or in the JVM world in general. The library source code is located at GitHub and I’ve also submitted it to Clojars. Note, that the library doesn’t include core.async as a dependency, so it is a requirement for your project to pull it. The supplementary library should work on any version of the core.async library, so I don’t want to force any version or update the library when core.async updates.

In addition to fixing the bug, I’ve also included a generalized unordered variant of the pipeline* function, which is a basis for all other pipeline implementations. I’ve mentioned pipeline-async-unordered in my previous post, and the main difference from the ordered one, apart from results coming in an arbitrary order, is higher throughput. Here’s an example:

user> (require '[pipeline-extras.core :as p])
user> (require '[clojure.core.async :as a])
user> (let [data (range 1 10)
            a (a/chan 10)
            b (a/chan 10)
            xf (map (fn [x] (Thread/sleep (if (zero? (mod x 3)) 5000 100)) x))]
        (a/pipeline 3 a xf (a/to-chan! data))
        (time (println (a/<!! (a/into [] a))))
        (p/pipeline-unordered 3 b xf (a/to-chan! data))
        (time (println (a/<!! (a/into [] b)))))
[1 2 3 4 5 6 7 8 9]
"Elapsed time: 10003.213591 msecs"
[2 1 4 5 7 8 3 6 9]
"Elapsed time: 5404.309124 msecs"

As can be seen here, in the xf function every third element is processed longer than any other elements. When we use clojure.core.async/pipeline and measure the time it takes to process elements from 1 to 9, it takes 10 seconds, because pipeline waits before putting results, thus it can’t take any more data to process. Because of that, every third element stops the conveyor.

pipeline-extras.core/pipeline-unordered on the other hand doesn’t care about the order, so it puts elements as they’re ready. As can be seen, elements 3, 6, and 9 came last, because their processing took the longest time. The total time needed to process the same amount of data is cut in half here, though it completely depends on the data order in this particular case. E.g. if the first three elements were the longest to process, both functions would take the same time to finish.

In general, the main difference two approaches is that in the unordered case, if one of the tasks is extremely slow, others can process data as usual. In the ordered case if one of the tasks is extremely slow, others have to wait before putting data into the output channel, so the order is preserved.

Hope this will be useful for someone, and if any problems arise, you can always file an issue at the project’s GitHub repo.