concurrent.el リリース

今まで何の説明も無しに自分のアプリで使ってきた謎ライブラリ concurrent.el ですが、一区切りが付いた気がしましたのでリリースしたいと思います。

この記事では concurrent.el の基盤である deferred.el について簡単に紹介して、 concurrent.el の機能と適用例を紹介します。

あらすじ

  • deferred.el復習
  • concurrent.el紹介
  • 機能一覧、コード例
  • cacoo.elでの設計解説

deferred.el 紹介

deferred.el の詳しい使い方やAPIなどはREADMEの文書がまとまっていますので、手っ取り早く使いたい人はそちらを参照してみてください。 おそらく、他の言語でDeferredに慣れていればすぐに使えるのではないかと思います。

Deferred / 非同期タスクをつなげていく

deferred.el は非同期のタスクをつなげていくプログラムをつくっていきます。具体的には、ある「コールバック1」を実行した後で「コールバック2」を実行するというような、コールバックの連鎖を構築します。書いているプログラムはコールバックのつなげ方を指示するようなプログラムになります。

つまりコールバックの書き方を少し変えただけなのですが、ソースコードが格段に分かりやすくなり、また非同期をつなげるときの柔軟性もはるかに向上します。また、エラー時の処理も分かりやすくもれなく書くことが出来るため、プログラムの信頼性も上がります。

JavaScript上の実装である JSDeferred を参考に実装していますので、そちらになれている人はすぐに使えると思います。また、JavaScriptからすると5年以上後発であるため、基本APIは枯れていてかなり固まっています。おそらく、今後も機能追加はあったとしても非互換な修正はほとんど無いだろうと思っています。

基本機能一覧

簡単に機能を紹介します。

非同期タスクの開始
すぐに開始 deferred:next
一定時間待って開始 deferred:wait
複数のタスクを並行に開始 deferred:parallel, earlier
リストや数を受け取ってループ開始 deferred:loop
同期的に開始 deferred:succeed
外部プロセスを開始 deferred:process など
HTTP接続を開始 deferred:url-retrieve など
タスクをつなぐ
前のタスクにつなぐ deferred:nextc
エラー時のタスクをつなぐ deferred:error
動的につなぐ タスクの返値で次に実行したいDeferredオブジェクトを返す
正常・エラーにかかわらずタスクを割り込ませる deferred:watch
非同期でのtry-catch-finally deferred:try
待ち合わせ
並行タスクが全部終了するまで待つ parallel
早く終了した一つのタスクだけを待つ earlier
非同期で欲しくなるもの

非常に便利な Deferred なのですが、 慣れてくると処理を切ったりつなげたりというかなり低レベルな処理が続くことに気がつきます。また、多くの場面で似たような Deferred 処理を書くことも多くなってきました。

どのような場合があるのかまとめてみました。

  • 非同期プロセス数の制御
    • wgetやconvertなどの同時実行数を制限したい
    • ロックや処理を直列化したい
    • →ロック、セマフォ
  • 実行順序の制御
    • 複数のタスクを非同期に実行したいが、順序や依存関係で制御したい
    • →データフロー
  • スレッド
    • UIにアニメーションを手軽に入れたい
    • バックグラウンドで処理を小刻みに行いたい
  • 非同期プロセス間の通信
    • Windowやバッファ間で通信したい
      • でも依存関係を増やしたくない
      • →非同期通信
    • GUIでよくあるイベント管理システムが欲しい
      • Emacsのイベントはプリミティブ過ぎる
      • 興味あるイベント
      • ブロードキャスト
      • イベントシステムの階層構造
    • →ストリーム、チャンネル

おそらく、これはEmacsだけの問題ではなくて、非同期をたくさん使う場合にはみんな同じ悩みを持っているのではないかと思います。

そこで deferred.el の上に、上記のような良くある機能を構築してみました。

concurrent.el

基本的には、上で述べたような要求を満たすような機能を設計・実装しました。その際、これまでの開発の経験や、マルチスレッド、並行プログラミングの考え方などを参考にしました。

ただ、並行プログラミングやGUIの専門家ではないので、良い方法をがあれば是非知りたいと思っていますし、 concurrent.el の実装自体についても議論があれば教えて欲しいと思っています。是非みなさま、よろしくお願いします。

開発は deferred.el と同じリポジトリに入っています。

機能一覧

現在の機能一覧です。コードサンプルについてはすぐ後で例示します。(Wikipedia便利ですね。)

  • thread
    • 適当な単位で処理を分割して実行するスレッド

ここに挙げた機能は、いくつかツールを作って検証しているので自分の中ではある程度固まっています。ただ、自分一人しか使っていませんので、今後修正や機能追加があれば、互換性を維持しながら積極的に行っていきたいと思っています。

また、今後以下の項目についても必要であれば実装しようかなと思っています。

  • モニタ
  • 遅延実行
  • プロセス間通信、comintのサポート
  • 差分リスト?
コード例

Theadの例:lexical-letを評価するとその場でアニメーションします。引数の時間は、bodyの処理の間隔です。

(lexical-let 
    ((count 0) (anm "-/|\\-")
     (end 50) (pos (point)))
  (cc:thread 
   60 
   (message "Animation started.")
   (while (> end (incf count))
     (save-excursion
       (when (< 1 count)
         (goto-char pos) (delete-char 1))
       (insert (char-to-string 
                (aref anm (% count (length anm)))))))
   (save-excursion
     (goto-char pos) (delete-char 1))
   (message "Animation finished.")))

whileを使うことでスレッドをループさせることが出来ます。whileの中身は一気に実行されます。

無限ループや重い処理でEmacsが固まらないように注意してください。もし無限ループに突入してしまったり、固まってしまったら deferred:clear-queue コマンドで回復できる可能性があります。

Generatorの例:fib-genにジェネレーターを作ります。ジェネレーター生成body内のyield関数で値を返します。非同期なのでしょうがないのですが、コールバックで値を受け取るところがいまいちかも知れません。

(setq fib-list nil)
(setq fib-gen
      (lexical-let ((a1 0) (a2 1))
        (cc:generator
         (lambda (x) (push x fib-list)) ; コールバックで結果受け取り
         (yield a1)
         (yield a2)
         (while t
           (let ((next (+ a1 a2)))
             (setq a1 a2
                   a2 next)
             (yield next))))))

(funcall fib-gen) ; 何度か呼んでみる
(funcall fib-gen) (funcall fib-gen)
(funcall fib-gen) (funcall fib-gen)

fib-list ; => (3 2 1 1 0)

Semaphoreの例:cc:semaphore-acquire 関数が deferred を返すので、それに続けて実行させたいタスクをつなげていきます。時系列で挙動が変わっていくのでコード中に簡単な説明を書いてみました。

;; permit=1のセマフォ作成
(setq smp (cc:semaphore-create 1))

;; 続けて3つ実行しようとする
(deferred:nextc (cc:semaphore-acquire smp)
  (lambda(x) 
    (message "go1")))
(deferred:nextc (cc:semaphore-acquire smp)
  (lambda(x) 
    (message "go2")))
(deferred:nextc (cc:semaphore-acquire smp)
  (lambda(x) 
    (message "go3")))

;; => 1つ目だけ実行されて go1 が表示される

(cc:semaphore-release smp) ; permitを返す

;; => 2つ目が実行されて go2 が表示される

(cc:semaphore-waiting-deferreds smp) ; go3 を表示するdeferred

(cc:semaphore-release-all smp) ; => permitを初期化して go3 を表示するdeferredを返す

(cc:semaphore-waiting-deferreds smp) ; => nil

Dataflowの例: cc:dataflow-environment 関数で変数を格納する「環境」を作ります。 cc:dataflow-get は値の取得とそれに続くタスクをつなげる deferred を返します。 cc:dataflow-set で値をバインドします。例ではキーに文字列を使っていますが、キーには任意のオブジェクトを指定できます。

(setq dfenv (cc:dataflow-environment))

;; ○基本の使い方

;; ↓同期的に値を取得。ブロックしない。
(cc:dataflow-get-sync dfenv "abc") ; => nil まだ値が無い。

(deferred:$ ; abc という値を取ってきて表示する処理
  (cc:dataflow-get dfenv "abc")
  (deferred:nextc it
    (lambda (x) (message "Got abc : %s" x))))
;; => 値がないので処理はブロックしたまま

(cc:dataflow-set dfenv "abc" 256) ; 値をセット
;; => ここで先ほどブロックしていた処理が再開し、 "Got abc : 256" が表示される

(cc:dataflow-get-sync dfenv "abc") ; => 256

(cc:dataflow-clear dfenv "abc") ; 値を未バインドに戻す

(cc:dataflow-get-sync dfenv "abc") ; => nil

;; ○リストをキーにする

(deferred:$
  (cc:dataflow-get dfenv '("http://example.com/a.jpg" 300))
  (deferred:nextc it
    (lambda (x) (message "a.jpg:300 OK %s" x))))

(cc:dataflow-set dfenv '("http://example.com/a.jpg" 300) 'jpeg)

;; => a.jpg:300 OK jpeg

;; ○2つの値を待ち受ける

(deferred:$ ; abc, def の2つの値を使う
  (deferred:parallel
    (cc:dataflow-get dfenv "abc")
    (cc:dataflow-get dfenv "def"))
  (deferred:nextc it
    (lambda (values) 
      (apply 'message "Got values : %s, %s" values)
      (apply '+ values)))
  (deferred:nextc it
    (lambda (x) (insert (format ">> %s" x)))))
;; => もちろんブロックする

(cc:dataflow-get-waiting-keys dfenv) ; => ("def" "abc")
(cc:dataflow-get-avalable-pairs dfenv) ; => ((("http://example.com/a.jpg" 300) . jpeg))

(cc:dataflow-set dfenv "abc" 128) ; ここではまだブロックしたまま
(cc:dataflow-set dfenv "def" 256) ; ここでやっと動く
;; => Got values : 128, 256

Signalの例: cc:signal-channel でシグナルを流すチャンネルを作成します。その後、signalに応答する処理を接続していきます。

;; シグナルのチャンネルを作成
(setq channel (cc:signal-channel))

(cc:signal-connect ; foo というシグナルを拾う
 channel 'foo
 (lambda (event) (message "Signal : %S" event)))

(cc:signal-connect
 channel t  ; t にするとすべてのシグナルを拾う
 (lambda (event) 
   (destructuring-bind (event-name (args)) event
     (message "Listener : %S / %S" event-name args))))

(deferred:$ ; deferred で非同期タスクを接続できる
  (cc:signal-connect channel 'foo)
  (deferred:nextc it
    (lambda (x) (message "Deferred Signal : %S" x))))

(cc:signal-send channel 'foo "hello signal!")
;; =>
;; Listener : foo / "hello signal!"
;; Signal : (foo ("hello signal!"))
;; Deferred Signal : (foo ("hello signal!"))

(cc:signal-send channel 'some "some signal!")
;; =>
;; Listener : some / "some signal!"

dataflowの内部には、変数へのアクセスやバインドのシグナルを発信するchannelがあります。これを使って、未バインドの変数に値を作成してセットするようなことが出来ます。

signalやdataflowは、カスケード接続して親子関係を構築できます。例えば、親dataflowにデフォルト値(フォールバックの値)を入れておくとか、channelで親子関係を構築してローカルなイベントとグローバルなイベントを分けて効率的にイベントを管理するなどが出来ます。

活用例紹介

前述の非同期で欲しくなるもののひっくり返しになりますが、自分のツール(anything-books.el, cacoo.el, 3D迷路)で使っているところを簡単に紹介します。後で、cacoo.el についてはもう少し詳しく解説したいと思います。

  • 通信・プロセス数制限
    • semaphoreで実行数制御
  • アニメーション
    • threadで動作中のアニメーションを表示
    • 処理本体からの signal イベントを拾って、スレッドの開始や停止
  • ウインドウ間の通信
    • signal/channel で表示要求や処理イベントを通知
    • イベント受け取り先のウインドウやバッファがいなくてもかまわない
  • プロセス間の通信
    • 通信プロトコルを signal/channel でラップ
    • 透過的なメッセージパッシング通信を実現
  • キャッシュ、依存関係
    • dataflow でキャッシュ管理
    • キャッシュデータ間の複雑な依存関係やイベントを管理
      • 依存関係により勝手に処理がブロックし、準備が出来たら再開する
    • キャッシュデータ要求の signal を拾って、キャッシュミスならデータの取得や加工を開始

cacoo.elのアーキテクチャ

cacoo.el は本格的に concurrent.el を使ってみたツールです。いろいろなところに concurrent.el の機能を使っていますが、そのうちの重要ないくつかについて解説してみます。

cacoo.el 自体については以前の記事を参照してください。

全体

プログラムは大きく分けて2つに分かれます。一つはバッファの中のマークアップを画像で置き換える処理。もう一つは Cacoo API に接続して、Cacooの絵をAnythingで選択して貼り付ける処理です。



cacoo.elの概要。画像埋め込み周りとAnything周り。

前者のマークアップの画像を置き換える処理では、「wgetで画像取得」「convertでサイズ変更」「画像をバッファに表示」という一連のタスクがあります。また、それぞれのタスクの成果物はキャッシュとしてローカルに保存されます。これらをうまく管理することが一つの目標です。

後者のCacooAPIに接続する部分では、バックグラウンドのネットワーク処理は deferred.el で行いますが、Anythingの起動を遅延させたり、プレビュー画像を表示させたりするところで concurrent.el を使っています。

semaphoreによるプロセス数制御

通信、画像取得、ImageMagickによるリサイズ処理は、外部プロセスを起動しています。これらは非同期に実行されますが、何も考えずに非同期で次々に実行させると、大量の外部プロセスが同時に起動してしまいます。数個程度であればそれほど問題ないのですが、何十個も画像がある場合は、同時に大量のHTTPアクセスが発生してサーバーに拒否されたり、あるいは大量のプロセスが起動してメモリ不足になってしまうなどの、ユーザーにとって好ましくない状況になります。

通常、このような同時実行するタスクの量を制限したい場合にはセマフォを使います。 cacoo.el では、設定変数の cacoo:process-num で制限するプロセス数を指定し、実際には cacoo:process-semaphore にセットされるセマフォオブジェクトで同時実行タスクを管理します。デフォルトは4になっています。

cacoo:process-semaphore は各地の非同期タスクの中で横断的に出てくるのですが、全体を通して起動されるプロセス数が上限を超えないように調整されます。

もし、上限を超えるプロセスの実行が要求された場合、セマフォがその処理をブロックします。そして先に実行されている処理が終了して余裕が出てきた段階で、待たせておいたタスクを実行します。

emacs-w3m にも同様の処理キューのような仕組みがありますが、特別に非同期を扱うためにかなり複雑なコードになっています。非同期タスク用セマフォは一般的にニーズが高いと機能ではないかと思っています。

thread によるアニメーションと signal による処理状況の通知

時間のかかるタスクを実行しているとき、ユーザーに何らかの進行状況の通知をするべきです。通常のGUIアプリであれば、プログレスバーを表示したりアニメーションを表示するものが多いです。

cacoo.el では、Anythingでのプレビュー画像表示のところで、くるくる回るアニメーション表示と、進行状況の通知を行っています。

concurrent.el の thread を使うことでアニメーションの更新・停止処理をシンプルに書くことが出来ました。thread が無ければ自力でタイマーを駆使したりフラグで止めたりしなければならず、やりたいことの割には複雑なコードになります。

また、処理状況の通知については、メインの処理である「取得→リサイズ→表示」の非同期タスクが処理状況を signal でブロードキャストし、それをGUI周りのコンポーネントが適当にイベントを拾ってきて更新するというような構成にしました。実際にはまだ整理し切れてないところもあるのですが、メインの非同期処理とGUIのコードがある程度分離できたように思います。



Anythingでのプレビューの動作。非同期メッセージングで通知する。

Dataflowによる画像キャッシュ管理

画像のマークアップを表示するためには、画像取得とリサイズを行う必要があります。毎回画像を取りに行くのは無駄ですので、取ってきたオリジナルの画像とリサイズした画像はローカルにキャッシュしておきます。この処理の流れとキャッシュデータの依存関係を表すと下の図のようになります。



処理の流れ、画像キャッシュの依存関係

矢印が依存方向ですので、左側がデータの上流です。

最初、データの流れに従って左側から順に非同期タスクを処理するようなプログラムを書いていました。図に書くと、下図のようです。



データの流れに沿って処理をつなげた。左側から順に実行する。

キャッシュについては、キャッシュがあるかどうかを調べて、あればタスクをスキップします。

この方法には問題がいくつかあることが分かりました。まずマークアップごとに独立したタスクになるため、URLが同じ画像があっても、無駄に画像を取りに行ってしまったり、データや処理が競合して正しく画像が表示できなくなります。

また、「画像を取りに行ってリサイズして表示するまで」が一つのアトミックな単位になるため、処理をバラバラにして非同期タスクのスループットを上げるようなことが出来ません。

そこで、データフロー変数を使って依存関係をそのままプログラムに落とし、依存関係の間は非同期メッセージングで接続することにしました。絵に描くと下図のようです。



データフロー変数を使って、依存先のデータがあるつもりのプログラムにする

これにより、アトミックな非同期タスクが依存関係の矢印1つ分に分割され、効率よくタスクを実行することが出来るようになりました。さらに、データフロー変数で全体のデータを共有するようになったため、マークアップ間で同じURLがあった場合は、同一の画像データを使うようになりました。これで、競合や無駄な処理が起きなくなりました。

また、プログラム自体も短くなり、さらに各タスクが粗結合になったため、さらなる拡張もやりやすくなりました。以前の実装ではプラグインはかなり特別なコードを書いて、しかもキャッシュがうまく出来ず、効率が悪かったのですが、今回の実装ではデータフロー変数の枠組みの中で効率よく処理できるようになりました。

一方で、「画像を取りに行ってリサイズして表示する」全体のタスクがバラバラになったため、コードから全体のタスク流れを読み取ることが難しくなりました。このような依存関係をつなげていって全体を構築するアーキテクチャでは、全体を俯瞰するドキュメントが重要になります。

Dataflowによる画像リスト管理

CacooのマイナーモードをONにすると、バックグラウンドで Cacoo API からシートの一覧を取得します。もし、一覧の取得途中でAnythingを起動した場合、ユーザーにしばらく待ってもらって一覧取得後にAnythingを起動するようにします。

今回の実装ではデータフロー変数を使って、シート一覧の取得とAnythingの起動の待ち合わせを自動的に行えるようにしました。Anythingはこの変数を参照するコードを書くだけで、データがまだバインド(取得完了)されていなければ自動的にブロックするようになります。絵に描くと下図のようです。



シート一覧取得時の待ち合わせ

非同期の待ち合わせはフラグなどを多用して分かりにくくなりがちなのですが、このような抽象的な機能を使うことで、誰がデータの生産者で誰が消費者なのかをコードで示すことが出来るようになります。

ただ、何度もAnythingコマンドが発行されると(ありがちなケース)、バインド待ちのタスクが溜まっていってしまうので、やっぱりフラグを使って一つだけになるように制限してしまいました。多分、差分リストのようなテクニックを使うことで、もうちょっとうまく出来るような気がするのですが、あまり高度になりすぎても意味不明になりそうだったので、この実装はこれで良かったと思っています。

まとめ

cacoo.el では concurrent.el の実力テストということで、積極的にいろいろなところに活用してみました。大半はうまくいきましたし、まだまだ設計や実装に改良の余地があるところもありました。

OOPでは、静的な入れ物としてのクラスやインスタンスは便利だったのですが、依存関係や実行順序などの動的な側面の管理はあまり決定打がなかった気がしていました。今回の並列プログラミング言語風の機能は、そういった動的な側面をうまく管理できそうな感触を持ちました。

ということで、今後も concurrent.el を改良していきたいと思います。

今後の予定

なんとか英語でもAPIドキュメントをそろえて、 deferred.el, concurrent.el を宣伝してみようと思います。

deferred.el だけだとプリミティブ過ぎたり足りない部品があったり、良さがうまく伝わらないかなと思っていたので、ちょっと躊躇していました。あと、Node.jsでシングルスレッドの非同期プログラミング環境が広まっているので、そういうものと絡めて「シングルスレッドのEmacsでも十分カッコイイよ!」と宣伝できると良いのかなと思っています。標準で入るといいですね。

後は、並列系がやっとそろったのでやっと skype.el の非同期化と機能改良に取りかかれると思います。
その前に、calfw.el のリリースとか、e2wm.elの改善とかリリースとか、いろいろ山盛りかも知れません。。。