七周七并发之函数式

Posted on 2018-01-01 01:56:59

函数式编程

没有可变状态,多线程不使用锁就可以安全访问。

  • 可变状态的隐藏和逃逸
    对象内部可能隐藏有可变状态
    同步的方法返回值可能引用可变状态

Clojure

  • s-表达式
  • def 定义常量
  • 矢量(数组)使用方括号表示
  • map字面量 花括号 key以:开头
(+ 1 (* 2 3))

(def meaning-of-life 42)

(if (< meaning-of-life 0) "negative" "none-negative")

(defn percentage [x p] (* x (/ p 100.0)))
数列求和
  • 递归
    (defn recursive-sum [numbers]
      (if (empty? numbers)
           0
           (+ (first numbers) (recursive-sum (rest numbers)))))
    
  • reduce函数
    为集合中的每个元素都调用一次化简函数
(defn reduce-sum [numbers]
    (reduce (fn [acc x] ( + acc x)) 0 numbers))
  • 替换匿名函数fn
(defn sum[numbers]
    (reduce + numbers))

操作符有其特征值,当0参数调用时,将返回特征值。

Clojure中序列元素仅在需要时被求值。

简单并行

使用 clojure.core.reducers包的 fold

(ns sum.core
    (:require [clojure.core.reducers :as r]))

(defn parallel-sum [numbers]
    (r/fold + numbers))

词频统计

几个函数
  • get

从map中查找键并返回

  • assoc

在原有map基础上返回一个包含指定键值对的新map

(defn word-frequencies [words]
    (reduce 
       (fn [counts word ] (assoc counts word (inc (get counts word 0))))
{ } words))

使用标准库

(frequencies [])
  • map
    接受一个函数f和一个序列,返回一个新序列
    对每个元素调用一次f
  • partial
    接受一个函数和若干参数,返回被局部代入的函数

字符串分割

(defn get-words [text] (re-seq #"\w+" text))
user=> (map get-words ["aas sad dd" "a b c"])
(("aas" "sad" "dd") ("a" "b" "c"))

对字符串序列使用 get-words显然会得到二维序列

使用 mapcat代替 map
相当于 (apply concat ( map f colls))

其中, concat返回一个包含传入 collection 全部元素的惰性序列。

组装词频统计

(defn count-words-sequential [pages]
     (frequencies (mapcat get-words pages)))
lazy-seq

Clojure 的序列是惰性的,仅在需要时被求值。
(take 10 (range 0 10000000000))将立刻得到输出,而 (range 0 10000000000)。。。。
头元素在使用后可以被舍弃,尾元素需要时才生成。

并行化

使用 pmap,类似 map,可在需要结果时并行计算,仅生成所需要的结果(semi-lazy)。

(pmap #(frequencies (get-words %)) pages)
(merge-with f  & maps)

利用宏 #()快速创建匿名函数, %标识参数。
merge-with合并map,调用 (f val-in-result val-in latter)

并行版本词频统计

(defn count-words-parallel [pages]
    (reduce (partial merge-with +)
         (pmap #(frequencies ( get-words %))  pages )))

优化

大量合并操作影响性能,进行批处理,使用 partition-all对序列元素分区,构成多个序列。

( defn count-words [pages]
    (reduce (partical merge-with +)
        (pmap count-words-sequential ( partition-all 100 pages))))

以下是对reducer源码的分析和思考

reducer

clojure.core.reducers中的 map接受函数和序列,返回化简器 reducible,化简器表述如何产生结果,不能直接使用,而要作为参数传给 reduce/fold进行求值,从而避免构造中间状态的序列,对整个嵌套链的集合操作也可以使用 fold并行化。

实现

协议(protocol),类似接口,一系列方法的集合,定义了一个抽象的概念。

(defprotocol CollReduce
    (coll-reduce [ coll f ] [ coll f init ]))

第一个参数类似于 this(coll-reduce coll f)可理解为 coll.collReduce(f)

reduce中只是简单调用 coll-reduce,执行交给集合本身

(defn reduce
  ([f coll]
        (coll-reduce coll f))
  ([f init coll]
        (coll-reduce coll f val)))

对函数进行转换,返回转换函数 transformf

(defn mapping [f]
    (fn [f1]
      (fn [result input]
        (f1 result (f input)))))

reducer接受一个 reducible和转换函数 xf,返回新的 reducibleCollReduce协议的实例),其 coll-reduce方法将以 xf转换 f,作为参数调用输入 reduciblecoll-reduce方法。
这个嵌套还蛮绕的= ̄ω ̄=

(defn reducer
    ([coll xf]
     (reify
      CollReduce
      (coll-reduce [_ f1 init]
        (coll-reduce coll (xf f1) init)))))
(defn map [f coll]
    (reducer coll (mapping f)))

map则仅需调用 reducer,嵌套使用时, mapping函数将组合不同的 f,从而 reducible传给 reduce计算时只进行一次化简。

fold

分治整个集合。

(defn fold
    ([reducef coll] (fold reducef reducef coll))
    ([combinef reducef coll] (fold 512 combinef reducef coll))
    ([n combinef reducef coll]
        (coll-fold coll n combinef reducef)))

(defn folder
  ([coll xf]
      (reify
        CollReduce
          (coll-reduce [_ f1]
            (coll-reduce coll (xf f1) (f1)))
          (coll-reduce [_ f1 init]
            (coll-reduce coll (xf f1) init))
        CollFold
          (coll-fold [_ n combinef reducef]
            (coll-fold coll n combinef (xf reducef))))))

fold实现词频统计

(defn parallel-frequencies [coll]
  (r/fold
    (partical merge-with +)
    (fn [counts x] (assoc counts x (inc (get counts x 0))))
    coll))

可以参考这

并发

函数式程序描述结果是什么样的而非如何得到结果,因此,如何安排求值顺序相对自由。

引用透明性

任何调用函数的地方都可以用其运行结果替换,而不产生副作用。

数据流

引自wiki

dataflow programming emphasizes the movement of data and models programs as a series of connections. Explicitly defined inputs and outputs connect operations, which function like black boxes. An operation runs as soon as all of its inputs become valid.

当某函数依赖的数据可用时,函数才可执行,所有函数能够同时执行。

Future模型

future函数接受并在单独的线程中执行一段代码,返回一个 future对象。

使用 deref@future对象解引用获取其代表的值,解引用将阻塞当前线程直到其值可用。

Promise模型

类似 future对象,但仅在使用 deliverpromise对象赋值后,使用该对象的代码才会执行。

user=> (def stupid (promise))
#'user/stupid

user=> (future (println @stupid))
#object[clojure.core$future_call$reify__6962 0x19b89d4 {:status :pending, :val nil}]

user=> (deliver stupid "sb")
sb
#object[clojure.core$promise$reify__7005 0x205d38da {:status :ready, :val "sb"}]

函数式web服务

接收实时文本数据(带有序号的片段)并翻译。
如图

(def snippets (repeatedly promise)) 
#无穷懒惰序列

(defn accept-snippet [n text]
    (deliver (nth snippets n) text))
#某个片段可用时,调用函数处理

( future
    (doseq [snippet (map deref snippets)]
        (println snippet)))
#创建线程串行处理,例子而已

就不纠结于具体实现的细节了。
值得注意的是,占用内存将随着序列snippets会不断增长。