七周七并发之Clojure

Posted on 2018-01-01 01:51:03

分离标识与状态

原子变量与持久数据结构

不纯粹的clojure:)
变量默认状态不易变,仅在十分必要才修改。

原子变量

atom创建原子变量,@ defer获取值,可以为任何类型。
swap!接受一个函数,将原子变量传给函数,以返回值为原子变量的新值,reset!重置原子变量的值。

(def players (atom ()))                                                                

(defn list-players []
  (response (json/encode @players))) 

(defn create-player [player-name]
  (swap! players conj player-name) 
  (status (response "") 201))

(defroutes app-routes 
  (GET "/players" [] (list-players))
  (PUT "/players/:player-name" [player-name] (create-player player-name)))
(defn -main [& args]
  (run-jetty (site app-routes) {:port 3000}))

注意到两个函数都访问了players

持久数据结构

数据结构被修改时总是保留之前的版本,使用共享结构而非创建完整副本。


仅以list为例,当然,尾端不同只能够复制。

状态和标识

持久数据结构分离了标识和状态,状态是随时间变化的一系列值,若获取一个标识的当前状态,将来如何修改标识也不改变获取的状态。

swap!重试。

校验器
(def non-negative (atom 0 :validator #(>= % 0)))

在原子变量的值生效前调用,返回true则允许修改。

监视器
(add-watch a :print #(println "changed from "%3"to"%4))

添加监视器时提供键值(区分监视器),和监视函数,原子变量改变时,监视器接受四个参数,键值、原子变量的引用、原子变量的旧值和新值。
监视器在原子变量的值改变后被调用。

代理和软件事务内存

代理

agent包含对值的引用,@ deref获取值。

send函数类似swap!,但接受的函数异步执行,同一时间只会调用一个,不进行重试,可以有副作用。

awaitawait-for(可设置超时)函数,阻塞直到代理的所有操作全部完成。

错误处理

代理发生错误将默认进入失效状态,任何操作都会失败。
agent-error查看是否失效,restart-agent重置代理。

软件事务内存

通过引用(ref)实现软件事务内存(STM),对多个变量进行并发的一致的修改。
ref-setref赋值,alter修改(必须在事务中使用)。

事务

显然ACID
dosync创建事务
STM运行时检测到并发事务冲突,其中的某(几)个事务将重试,因此,需要保证没有副作用。

代理具有事务性,send仅在事务成功时生效
需要产生副作用,则使用代理。

函数末尾的!表示事务不安全

  • 原子变量对单一值隔离、同步的更新
  • 代理对单一值隔离、异步 的更新
  • 引用对多个值一致、同步的更新

参考
Software Transactional Memory
slide
Clojure STM 笔记

STM还蛮值得研究的,我决定好好学习clojure,话说回来刚在gayhub上star了一个golang实现

one more thing

STM解决哲学家

(ns philosophers.core)

(def philosophers (into [] (repeatedly 5 #(ref :thinking)))) #使用引用表示哲学家

(defn claim-chopsticks [philosopher left right]
  (dosync
    (when (and (= (ensure left) :thinking) (= (ensure right) :thinking))
      (ref-set philosopher :eating))))             
      #创建事务,检查左右状态
      #使用ensure而不用@,因为这里读取了其他事务可能改变的值

(defn release-chopsticks [philosopher]
  (dosync (ref-set philosopher :thinking)))

(defn think []
  (Thread/sleep (rand 1000)))

(defn eat []
  (Thread/sleep (rand 1000)))

(defn philosopher-thread [n]
  (Thread.
    #(let [philosopher (philosophers n)
           left (philosophers (mod (- n 1) 5))
           right (philosophers (mod (+ n 1) 5))]
      (while true 
        (think)
        (when (claim-chopsticks philosopher left right) 
          (eat)
          (release-chopsticks philosopher)))))) 

(defn -main [& args]
  (let [threads (map philosopher-thread (range 5))]
    (doseq [thread threads] (.start thread))
    (doseq [thread threads] (.join thread))))

简洁,无锁

原子变量版本

将需要更新的值整合到一个数据结构,使用单个原子变量管理。

(def philosophers (atom (into [] (repeat 5 :thinking))))