From 9d34bd95d230abc6d452f39ad4863b59a0a92d4a Mon Sep 17 00:00:00 2001 From: Andrey Listopadov Date: Tue, 22 Nov 2022 20:34:51 +0300 Subject: [PATCH v2] fix ASYNC-163 Create an additional channel when working in async mode. This channel is then polled by the thread that spawns the asynchronous function task, and filled by the thread that awaits the results of asynchronous tasks. --- src/main/clojure/cljs/core/async.cljs | 8 +++++++- src/main/clojure/clojure/core/async.clj | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/clojure/cljs/core/async.cljs b/src/main/clojure/cljs/core/async.cljs index 2e5664f..0b6e3ed 100644 --- a/src/main/clojure/cljs/core/async.cljs +++ b/src/main/clojure/cljs/core/async.cljs @@ -263,6 +263,7 @@ (assert (pos? n)) (let [jobs (chan n) results (chan n) + finishes (and (= type :async) (chan n)) process (fn [[v p :as job]] (if (nil? job) (do (close! results) nil) @@ -274,7 +275,9 @@ true))) async (fn [[v p :as job]] (if (nil? job) - (do (close! results) nil) + (do (close! results) + (close! finishes) + nil) (let [res (chan 1)] (xf v res) (put! p res) @@ -288,6 +291,7 @@ :async (go-loop [] (let [job (! to v)) (recur)))) + (when finishes + (>! finishes :done)) (recur)))))))) (defn pipeline-async diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index 55ae450..66bfad0 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -530,6 +530,7 @@ to catch and handle." nil)) jobs (chan n) results (chan n) + finishes (and (= type :async) (chan n)) process (fn [[v p :as job]] (if (nil? job) (do (close! results) nil) @@ -540,7 +541,9 @@ to catch and handle." true))) async (fn [[v p :as job]] (if (nil? job) - (do (close! results) nil) + (do (close! results) + (close! finishes) + nil) (let [res (chan 1)] (xf v res) (put! p res) @@ -554,6 +557,7 @@ to catch and handle." :async (go-loop [] (let [job (! to v)) (recur)))) + (when finishes + (>! finishes :done)) (recur)))))))) ;;todo - switch pipe arg order to match these (to/from) -- 2.38.1