Skip to content

Commit

Permalink
use impl/dispatch executors
Browse files Browse the repository at this point in the history
  • Loading branch information
richhickey committed Feb 25, 2025
1 parent d922995 commit c56fd02
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/main/clojure/clojure/core/async/flow.clj
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"starts the entire flow from init values. The processes start paused.
Call 'resume' or 'resume-proc' to start flow. Returns a map with keys:
:report-chan - a core.async chan for reading.'ping' reponses
:report-chan - a core.async chan for reading.'ping' responses
will show up here, as will any explicit ::flow/report outputs
from :transform
Expand Down
21 changes: 7 additions & 14 deletions src/main/clojure/clojure/core/async/flow/impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,14 @@
[clojure.core.async.flow :as-alias flow]
[clojure.core.async.flow.spi :as spi]
[clojure.core.async.flow.impl.graph :as graph]
[clojure.core.async.impl.dispatch :as disp]
[clojure.walk :as walk]
[clojure.datafy :as datafy])
(:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
[java.util.concurrent.locks ReentrantLock]))

(set! *warn-on-reflection* true)

;;TODO - something specific, e.g. make aware of JDK version and vthreads
(defonce mixed-exec clojure.lang.Agent/soloExecutor)
(defonce io-exec clojure.lang.Agent/soloExecutor)
(defonce compute-exec clojure.lang.Agent/pooledExecutor)

(defn datafy [x]
(condp instance? x
clojure.lang.Fn (-> x str symbol)
Expand All @@ -32,11 +28,9 @@

(defn futurize ^Future [f {:keys [exec]}]
(fn [& args]
(let [^ExecutorService e (case exec
:compute compute-exec
:io io-exec
:mixed mixed-exec
exec)]
(let [^ExecutorService e (if (instance? ExecutorService exec)
exec
(disp/executor-for exec))]
(.submit e ^Callable #(apply f args)))))

(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
Expand All @@ -53,12 +47,11 @@

(defn create-flow
"see lib ns for docs"
[{:keys [procs conns mixed-exec io-exec compute-exec]
:or {mixed-exec mixed-exec, io-exec io-exec, compute-exec compute-exec}}]
[{:keys [procs conns mixed-exec io-exec compute-exec]}]
(let [lock (ReentrantLock.)
chans (atom nil)
execs {:mixed mixed-exec :io io-exec :compute compute-exec}
_ (assert (every? #(instance? ExecutorService %) (vals execs))
_ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs))
"mixed-exe, io-exec and compute-exec must be ExecutorServices")
pdescs (reduce-kv prep-proc {} procs)
allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs))))
Expand Down Expand Up @@ -136,7 +129,7 @@
resolver (reify spi/Resolver
(get-write-chan [_ coord]
(write-chan coord))
(get-exec [_ context] (execs context)))
(get-exec [_ context] (or (execs context) (disp/executor-for context))))
start-proc
(fn [{:keys [pid proc args ins outs]}]
(try
Expand Down

0 comments on commit c56fd02

Please sign in to comment.