ReliableMsgAgent で Ap4r と同様のディスパッチを実行する

ReliableMsgAgent で「ap4r と同様のメッセージディスパッチ」をする

  • ap4r の I/F を用いてメッセージを put する
  • put したメッセージは、 ap4r にディスパッチさせずに reliable-msg-agent に自律的に取得させる。要するに pull 型アプローチをとる ap4r のようなもの
  • その際のディスパッチ方法として、 ap4r と同様のルールに則る。
    • たとえば ap4r のディスパッチモード HTTP の場合以下条件が成立する事で成功とみなす
ap4r の準備

以下、ap4r の作業ディレクトリを /home/hamajyotan/ap4r/ と仮定
ap4r のインストール

$ gem install ap4r --no-ri --no-rdoc

activesupport のインストール。 ap4r 初期化に必要

$ gem install activesupport -v "<3.0.0" --no-ri --no-rdoc

ap4rワークスペースを作る

$ cd /home/hamajyotan/ap4r/
$ ap4r_setup .
make application root directory [/home/hamajyotan/ap4r] ...
make directories for AP4R [config, log, public, script, tmp] ...
copy files from $GEM_HOME/gems/ap4r-0.3.7/config to /home/hamajyotan/ap4r/config ...
copy files from $GEM_HOME/gems/ap4r-0.3.7/script to /home/hamajyotan/ap4r/script ...
copy file from $GEM_HOME/gems/ap4r-0.3.7/fresh_rakefile to /home/hamajyotan/ap4r/Rakefile ...

[/home/hamajyotan/ap4r] has successfully set up!

$

以下のように /home/hamajyotan/ap4r/config/queues_notargets.cfg を作成する
ap4r が勝手にメッセージをディスパッチしないようにする

---
store:
  type: disk
drb:
  host:
  port: 6438
  acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8
dispatchers:
  -
    targets: notargets
    threads: 1
#carriers:
#  -
#    source_uri: druby://another.host.local:6438
#    threads: 1
ap4r の起動
$ cd /home/hamajyotan/ap4r/
$ ruby script/mongrel_ap4r start -A config/queues_notargets.cfg
---
===
===
** Starting AP4R Handler with config/queues_notargets.cfg
Loaded queues configuration from: /home/hamajyotan/ap4r/config/queues_notargets.cfg
Using message store: disk
Accepting requests at: druby://localhost:6438
about to start dispatchers with config
---
- threads: 1
  targets: notargets

start dispatcher: targets= #<ReliableMsg::MultiQueue:0x2b89a7d39e90>, index= 0)
dispatch targets are : notargets;
queue manager has forked dispatchers
** Signals ready.  TERM => stop.  USR2 => restart.  INT => stop (no restart).
** Mongrel available at 0.0.0.0:7438
** Use CTRL-C to stop.
** Mongrel start up process completed.
reliable-msg-agent の設定ファイルの準備

ap4r を起動したコンソールとは別窓で作業
以下、reliable-msg-agnet の作業ディレクトリを /home/hamajyotan/reliable-msg-agent/ と仮定
gem をインストールしたディレクトリ配下の resources/ ディレクトリから必要なファイルをコピーする

$ cp $GEM_HOME/gems/reliable-msg-agent-0.1.0/resources/agent.conf /home/hamajyotan/reliable-msg-agent/
$ cp $GEM_HOME/gems/reliable-msg-agent-0.1.0/resources/examples/ap4r-dispatch-agent.rb /home/hamajyotan/reliable-msg-agent/
agent.conf に設定を加える

コンフィグにエージェントの定義ファイルのパスを教える。

$ echo agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb >> /home/hamajyotan/reliable-msg-agent/agent.conf

コンフィグの確認

---
logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } "

consumers:
  -
    source_uri: druby://localhost:6438
    every: 1.0
    target: queue.agent
    threads: 1

agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb

コンフィグを少し書き換える。

  • modify_rules
    • url
      • ディスパッチ先 url http://localhosthttp://localhost:9292 に書き換える
---
logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } "

consumers:
  -
    source_uri: druby://localhost:6438
    every: 1.0
    target: queue.agent
    threads: 1
    modify_rules:
      url: " Proc.new { |url| url.port = 9292; url } "

agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb
ここでエージェントの #call メソッドの定義を確認しておく
# this script is evaluated by the context of ReliableMsg::Agnet::Agent class.

require "yaml"
require "ap4r"

#
# The method of processing the message is defined.
#
# if the evaluation result is nil or false,
# it is considered that it failes.
#
# === Args
#
# +msg+     :: fetched message from reliable-msg queue.
# +conf+    :: consumer configurations.
# +options+ :: the options (it is still unused.)
#
def call msg, conf, options = {}

  # The following codes use the mechanism of sending the message by ap4r.
  dispatcher = Ap4r::Dispatchers.new nil, [], @logger

  @logger.debug { "dispatcher get message\n#{msg.to_yaml}" }
  response = dispatcher.send(:get_dispather_instance,
                             msg.headers[:dispatch_mode],
                             msg,
                             conf).call
  @logger.debug { "dispatcher get response\n#{response.to_yaml}" }

end

ap4r の実装の一部をコンフィグで埋め込んでいる、ほぼおまじない。
これにより、 ap4r で実装されている内容と同様のメッセージディスパッチが可能になる。※ ap4r 0.3.7 で確認

reliable-msg-agent の起動
$ reliable-msg-agent start -c /home/hamajyotan/reliable-msg-agent/agent.conf
*** Starting ReliableMsg-Agent...
I, [2011-04-12T01:22:28.678494 #9696]  INFO -- : *** reliable-msg agent service starting...
I, [2011-04-12T01:22:28.678822 #9696]  INFO -- : --- starting workers.
I, [2011-04-12T01:22:28.680012 #9696]  INFO -- : *** reliable-msg agent service started.
適当な web サーバを準備する

更に別窓で作業する。
以下を満たせばなんでも良い

  • (先に、コンフィグに設定している為)9292番ポートでリスンする
  • ステータスコードに 200 で、レスポンスボディに true (を含む文字列) を返す

今回は rack で適当に作る

$ gem install rack --no-ri --no-rdoc

rack のコンフィグファイルを適当に書く
/home/hamajyotan/stub.ru

#
# /home/hamajyotan/stub.ru
#
require "rubygems"
require "rack"

class Stub
  def call env
    [200, {"Content-Type" => "text/plain"}, ["true"]]
  end
end
run Stub.new

rack で web サーバ起動しておく

$ rackup /home/hamajyotan/stub.ru
ap4r へメッセージを put する

更に別窓で作業する。
以下の代わりに ap4rrails プラグインを使って put しても良い

$ irb -rubygems -r reliable-msg
irb(main):001:0> q = ReliableMsg::Queue.new "queue.agent"
 => #<ReliableMsg::Queue:0x2ac0e926f7a8 @queue="queue.agent">
irb(main):002:0> q.put "", {:dispatch_mode => :HTTP,
irb(main):003:0>            :target_method => :POST,
irb(main):004:0>            :target_url => "http://localhost/",
irb(main):005:0>            :queue => "queue.agent",
irb(main):006:0>            :delivery => :once}
 => "2f5e6c80-4688-012e-8954-f9a630fcd34e"
irb(main):007:0> exit
$
reliable-msg-agent を起動したコンソールを見てみる
  1. サービス起動
  2. メッセージ取得
  3. url 書き換え http://localhost/ -> http://localhost:9292
  4. http://localhost:9292 へアクセスして成功

の一連のログが出力されている模様

$ reliable-msg-agent start -c ./agent.conf
*** Starting ReliableMsg-Agent...
*** reliable-msg agent service starting...
--- starting workers.
*** reliable-msg agent service started.
message fetched - <2f5e6c80-4688-012e-8954-f9a630fcd34e>
dispatcher get message
--- !ruby/object:ReliableMsg::Message
headers:
  :dispatch_mode: :HTTP
  :target_url: http://localhost/
  :created: 1302539965
  :expires:
  :queue: queue.agent
  :delivery: :once
  :id: 2f5e6c80-4688-012e-8954-f9a630fcd34e
  :target_method: :POST
  :priority: 0
  :max_deliveries: 5
id: 2f5e6c80-4688-012e-8954-f9a630fcd34e
object: ""

Ap4r::Dispatcher after modification
--- !ruby/object:ReliableMsg::Message
headers:
  :dispatch_mode: :HTTP
  :target_url: http://localhost:9292/
  :created: 1302539965
  :expires:
  :queue: queue.agent
  :delivery: :once
  :id: 2f5e6c80-4688-012e-8954-f9a630fcd34e
  :target_method: :POST
  :priority: 0
  :max_deliveries: 5
id: 2f5e6c80-4688-012e-8954-f9a630fcd34e
object: ""

response status [200 OK]
dispatcher get response
--- !ruby/object:Net::HTTPOK
body: "true"
body_exist: true
code: "200"
header:
  content-type:
  - text/plain
  connection:
  - close
  date:
  - Mon, 11 Apr 2011 16:39:25 GMT
  transfer-encoding:
  - chunked
http_version: "1.1"
message: OK
read: true
socket:
stub.ru (webサーバ) を起動したコンソールを見てみる

アクセスがあった模様

$ rackup /home/hamajyotan/stub.ru
127.0.0.1 - - [12/Apr/2011 01:39:25] "POST / HTTP/1.1" 200 - 0.0023
内容をマンガで説明
  1. push-msg.sh が ap4r へメッセージを put する
  2. ap4r の周りを巡回している reliable-msg-agent がメッセージを get する
  3. (任意)メッセージヘッダの :target_url の書き換えを行う
    • http://localhost => http://localhost:9292
  4. reliable-msg-agent は ap4r のディスパッチルールに則り http://localhost:9292 へアクセス

[追記] http のタイムアウト

ap4r の dispatchers 機能をエージェントに記述しているため、コンフィグ consumers には、ap4r における dispatchers と同様のコンフィグが書ける。
試しに Ap4r::Dispachers に実装されている http タイムアウトを実験する。

http タイムアウトに関する設定を追記

reliable-msg-agent のコンフィグファイルに http タイムアウトに関する設定を追記
3秒以内に処理が終わらなければ失敗とみなす
/home/hamajyotan/reliable-msg-agent/agent.conf

---
logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } "

consumers:
  -
    source_uri: druby://localhost:6438
    every: 1.0
    target: queue.agent
    threads: 1
    modify_rules:
      url: " Proc.new { |url| url.port = 9292; url } "
    http:
      timeout: 3

agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb
stub.ru が 10秒かかるようにしておく
#
# /home/hamajyotan/stub.ru
#
require "rubygems"
require "rack"

class Stub
  def call env
    sleep 10
    [200, {"Content-Type" => "text/plain"}, ["true"]]
  end
end
run Stub.new
上記設定でメッセージを put してみる

エージェントの処理がタイムアウトで落ちている
reliable-msg-agent では以下のログが出力される

#
# 途中省略
#
message fetched - <52222a00-4692-012e-8f7a-f9afb181b616>
dispatcher get message
--- !ruby/object:ReliableMsg::Message
headers:
  :dispatch_mode: :HTTP
  :target_url: http://localhost/test/index.html
  :created: 1302544318
  :expires:
  :queue: queue.agent
  :delivery: :once
  :id: 52222a00-4692-012e-8f7a-f9afb181b616
  :target_method: :POST
  :priority: 0
  :max_deliveries: 5
id: 52222a00-4692-012e-8f7a-f9afb181b616
object: ""

Ap4r::Dispatcher after modification
--- !ruby/object:ReliableMsg::Message
headers:
  :dispatch_mode: :HTTP
  :target_url: http://localhost:9292/test/index.html
  :created: 1302544318
  :expires:
  :queue: queue.agent
  :delivery: :once
  :id: 52222a00-4692-012e-8f7a-f9afb181b616
  :target_method: :POST
  :priority: 0
  :max_deliveries: 5
id: 52222a00-4692-012e-8f7a-f9afb181b616
object: ""

set HTTP read timeout to 3s
error in fetch-msg/agent-proc: execution expired
/home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/timeout.rb:64:in `rbuf_fill'
        /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:134:in `rbuf_fill'
        /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:116:in `readuntil'
        /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:126:in `readline'
        /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/http.rb:2028:in `read_status_line'
#
# 途中省略
#
msg-agent:75:in `send'
        /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/gems/reliable-msg-agent-0.1.0/bin/reliable-msg-agent:75
        /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/bin/reliable-msg-agent:19:in `load'
        /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/bin/reliable-msg-agent:19