alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (parallel.clj)

This example Java source code file (parallel.clj) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

binaryop, clojure, comparable, comparator, hickey, intandobjectpredicate, note, ops, parallelarray, parallelarraywithmapping, predicate, returns, rich, the

The parallel.clj Java example source code

;   Copyright (c) Rich Hickey. All rights reserved.
;   The use and distribution terms for this software are covered by the
;   Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
;   which can be found in the file epl-v10.html at the root of this distribution.
;   By using this software in any fashion, you are agreeing to be bound by
;   the terms of this license.
;   You must not remove this notice, or any other, from this software.

(ns ^{:doc "DEPRECATED Wrapper of the ForkJoin library (JSR-166)."
       :author "Rich Hickey"}
    clojure.parallel)
(alias 'parallel 'clojure.parallel)

(comment "
The parallel library wraps the ForkJoin library scheduled for inclusion in JDK 7:

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

You'll need jsr166y.jar in your classpath in order to use this
library.  The basic idea is that Clojure collections, and most
efficiently vectors, can be turned into parallel arrays for use by
this library with the function par, although most of the functions
take collections and will call par if needed, so normally you will
only need to call par explicitly in order to attach bound/filter/map
ops. Parallel arrays support the attachment of bounds, filters and
mapping functions prior to realization/calculation, which happens as
the result of any of several operations on the
array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
composite operations in steps, as would normally be done with
sequences, maps and filters are instead attached and thus composed by
providing ops to par. Note that there is an order sensitivity to the
attachments - bounds precede filters precede mappings.  All operations
then happen in parallel, using multiple threads and a sophisticated
work-stealing system supported by fork-join, either when the array is
realized, or to perform aggregate operations like preduce/pmin/pmax
etc. A parallel array can be realized into a Clojure vector using
pvec.
")

(import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter 
                           ParallelArrayWithMapping 
                           Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate 
                           Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))

(defn- op [f]
  (proxy [Ops$Op] []
    (op [x] (f x))))

(defn- binary-op [f]
  (proxy [Ops$BinaryOp] []
    (op [x y] (f x y))))

(defn- int-and-object-to-object [f]
  (proxy [Ops$IntAndObjectToObject] []
    (op [i x] (f x i))))

(defn- reducer [f]
  (proxy [Ops$Reducer] []
    (op [x y] (f x y))))

(defn- predicate [f]
  (proxy [Ops$Predicate] []
    (op [x] (boolean (f x)))))

(defn- binary-predicate [f]
  (proxy [Ops$BinaryPredicate] []
    (op [x y] (boolean (f x y)))))

(defn- int-and-object-predicate [f]
  (proxy [Ops$IntAndObjectPredicate] []
    (op [i x] (boolean (f x i)))))

(defn par
  "Creates a parallel array from coll. ops, if supplied, perform
  on-the-fly filtering or transformations during parallel realization
  or calculation. ops form a chain, and bounds must precede filters,
  must precede maps. ops must be a set of keyword value pairs of the
  following forms:

     :bound [start end] 

  Only elements from start (inclusive) to end (exclusive) will be
  processed when the array is realized.

     :filter pred 

  Filter preds remove elements from processing when the array is realized. pred
  must be a function of one argument whose return will be processed
  via boolean.

     :filter-index pred2 

  pred2 must be a function of two arguments, which will be an element
  of the collection and the corresponding index, whose return will be
  processed via boolean.

     :filter-with [pred2 coll2] 

  pred2 must be a function of two arguments, which will be
  corresponding elements of the 2 collections.

     :map f 

  Map fns will be used to transform elements when the array is
  realized. f must be a function of one argument.

     :map-index f2 

  f2 must be a function of two arguments, which will be an element of
  the collection and the corresponding index.

     :map-with [f2 coll2]

  f2 must be a function of two arguments, which will be corresponding
  elements of the 2 collections."

  ([coll] 
     (if (instance? ParallelArrayWithMapping coll)
       coll
       (. ParallelArray createUsingHandoff  
        (to-array coll) 
        (. ParallelArray defaultExecutor))))
  ([coll & ops]
     (reduce (fn [pa [op args]] 
                 (cond
                  (= op :bound) (. pa withBounds (args 0) (args 1))
                  (= op :filter) (. pa withFilter (predicate args))
                  (= op :filter-with) (. pa withFilter (binary-predicate (args 0)) (par (args 1)))
                  (= op :filter-index) (. pa withIndexedFilter (int-and-object-predicate args))
                  (= op :map) (. pa withMapping (parallel/op args))
                  (= op :map-with) (. pa withMapping (binary-op (args 0)) (par (args 1)))
                  (= op :map-index) (. pa withIndexedMapping (int-and-object-to-object args))
                  :else (throw (Exception. (str "Unsupported par op: " op)))))
             (par coll) 
             (partition 2 ops))))

;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
(defn pany
  "Returns some (random) element of the coll if it satisfies the bound/filter/map"
  [coll] 
  (. (par coll) any))

(defn pmax
  "Returns the maximum element, presuming Comparable elements, unless
  a Comparator comp is supplied"
  ([coll] (. (par coll) max))
  ([coll comp] (. (par coll) max comp)))

(defn pmin
  "Returns the minimum element, presuming Comparable elements, unless
  a Comparator comp is supplied"
  ([coll] (. (par coll) min))
  ([coll comp] (. (par coll) min comp)))

(defn- summary-map [s]
  {:min (.min s) :max (.max s) :size (.size s) :min-index (.indexOfMin s) :max-index (.indexOfMax s)})

(defn psummary 
  "Returns a map of summary statistics (min. max, size, min-index, max-index, 
  presuming Comparable elements, unless a Comparator comp is supplied"
  ([coll] (summary-map (. (par coll) summary)))
  ([coll comp] (summary-map (. (par coll) summary comp))))

(defn preduce 
  "Returns the reduction of the realized elements of coll
  using function f. Note f will not necessarily be called
  consecutively, and so must be commutative. Also note that 
  (f base an-element) might be performed many times, i.e. base is not
  an initial value as with sequential reduce."
  [f base coll]
  (. (par coll) (reduce (reducer f) base)))

;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;

(defn- pa-to-vec [pa]
  (vec (. pa getArray)))

(defn- pall
  "Realizes a copy of the coll as a parallel array, with any bounds/filters/maps applied"
  [coll]
  (if (instance? ParallelArrayWithMapping coll)
    (. coll all)
    (par coll)))

(defn pvec 
  "Returns the realized contents of the parallel array pa as a Clojure vector"
  [pa] (pa-to-vec (pall pa)))

(defn pdistinct
  "Returns a parallel array of the distinct elements of coll"
  [coll]
  (pa-to-vec (. (pall coll) allUniqueElements)))

;this doesn't work, passes null to reducer?
(defn- pcumulate [coll f init]
  (.. (pall coll) (precumulate (reducer f) init)))

(defn psort 
  "Returns a new vector consisting of the realized items in coll, sorted, 
  presuming Comparable elements, unless a Comparator comp is supplied"
  ([coll] (pa-to-vec (. (pall coll) sort)))
  ([coll comp] (pa-to-vec (. (pall coll) sort comp))))

(defn pfilter-nils
  "Returns a vector containing the non-nil (realized) elements of coll"
  [coll]
  (pa-to-vec (. (pall coll) removeNulls)))

(defn pfilter-dupes 
  "Returns a vector containing the (realized) elements of coll, 
  without any consecutive duplicates"
  [coll]
  (pa-to-vec (. (pall coll) removeConsecutiveDuplicates)))


(comment
(load-file "src/parallel.clj")
(refer 'parallel)
(pdistinct [1 2 3 2 1])
;(pcumulate [1 2 3 2 1] + 0) ;broken, not exposed
(def a (make-array Object 1000000))
(dotimes i (count a)
  (aset a i (rand-int i)))
(time (reduce + 0 a))
(time (preduce + 0 a))
(time (count (distinct a)))
(time (count (pdistinct a)))

(preduce + 0 [1 2 3 2 1])
(preduce + 0 (psort a))
(pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
(pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))

(psummary ;or pvec/pmax etc
 (par [11 2 3 2] 
      :filter-with [(fn [x y] (> y x)) 
                    [110 2 33 2]]
      :map #(* % 2)))

(preduce + 0
  (par [11 2 3 2] 
       :filter-with [< [110 2 33 2]]))

(time (reduce + 0 (map #(* % %) (range 1000000))))
(time (preduce + 0 (par (range 1000000) :map-index *)))
(def v (range 1000000))
(time (preduce + 0 (par v :map-index *)))
(time (preduce + 0 (par v :map  #(* % %))))
(time (reduce + 0 (map #(* % %) v)))
)

Other Java examples (source code examples)

Here is a short list of links related to this Java parallel.clj source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.