ReliableMsgAgent で Ap4r と同様のディスパッチを実行する
ReliableMsgAgent で「ap4r と同様のメッセージディスパッチ」をする
- ap4r の I/F を用いてメッセージを put する
- put したメッセージは、 ap4r にディスパッチさせずに reliable-msg-agent に自律的に取得させる。要するに pull 型アプローチをとる ap4r のようなもの
- その際のディスパッチ方法として、 ap4r と同様のルールに則る。
ap4r の準備
以下、ap4r の作業ディレクトリを /home/hamajyotan/ap4r/ と仮定
ap4r のインストール
$ gem install ap4r --no-ri --no-rdoc
activesupport のインストール。 ap4r 初期化に必要
$ gem install activesupport -v "<3.0.0" --no-ri --no-rdoc
$ cd /home/hamajyotan/ap4r/ $ ap4r_setup . make application root directory [/home/hamajyotan/ap4r] ... make directories for AP4R [config, log, public, script, tmp] ... copy files from $GEM_HOME/gems/ap4r-0.3.7/config to /home/hamajyotan/ap4r/config ... copy files from $GEM_HOME/gems/ap4r-0.3.7/script to /home/hamajyotan/ap4r/script ... copy file from $GEM_HOME/gems/ap4r-0.3.7/fresh_rakefile to /home/hamajyotan/ap4r/Rakefile ... [/home/hamajyotan/ap4r] has successfully set up! $
以下のように /home/hamajyotan/ap4r/config/queues_notargets.cfg を作成する
ap4r が勝手にメッセージをディスパッチしないようにする
--- store: type: disk drb: host: port: 6438 acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8 dispatchers: - targets: notargets threads: 1 #carriers: # - # source_uri: druby://another.host.local:6438 # threads: 1
ap4r の起動
$ cd /home/hamajyotan/ap4r/ $ ruby script/mongrel_ap4r start -A config/queues_notargets.cfg --- === === ** Starting AP4R Handler with config/queues_notargets.cfg Loaded queues configuration from: /home/hamajyotan/ap4r/config/queues_notargets.cfg Using message store: disk Accepting requests at: druby://localhost:6438 about to start dispatchers with config --- - threads: 1 targets: notargets start dispatcher: targets= #<ReliableMsg::MultiQueue:0x2b89a7d39e90>, index= 0) dispatch targets are : notargets; queue manager has forked dispatchers ** Signals ready. TERM => stop. USR2 => restart. INT => stop (no restart). ** Mongrel available at 0.0.0.0:7438 ** Use CTRL-C to stop. ** Mongrel start up process completed.
reliable-msg-agent の設定ファイルの準備
ap4r を起動したコンソールとは別窓で作業
以下、reliable-msg-agnet の作業ディレクトリを /home/hamajyotan/reliable-msg-agent/ と仮定
gem をインストールしたディレクトリ配下の resources/ ディレクトリから必要なファイルをコピーする
$ cp $GEM_HOME/gems/reliable-msg-agent-0.1.0/resources/agent.conf /home/hamajyotan/reliable-msg-agent/ $ cp $GEM_HOME/gems/reliable-msg-agent-0.1.0/resources/examples/ap4r-dispatch-agent.rb /home/hamajyotan/reliable-msg-agent/
agent.conf に設定を加える
コンフィグにエージェントの定義ファイルのパスを教える。
$ echo agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb >> /home/hamajyotan/reliable-msg-agent/agent.conf
コンフィグの確認
--- logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } " consumers: - source_uri: druby://localhost:6438 every: 1.0 target: queue.agent threads: 1 agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb
コンフィグを少し書き換える。
- modify_rules
- url
- ディスパッチ先 url http://localhost を http://localhost:9292 に書き換える
- url
--- logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } " consumers: - source_uri: druby://localhost:6438 every: 1.0 target: queue.agent threads: 1 modify_rules: url: " Proc.new { |url| url.port = 9292; url } " agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb
ここでエージェントの #call メソッドの定義を確認しておく
# this script is evaluated by the context of ReliableMsg::Agnet::Agent class. require "yaml" require "ap4r" # # The method of processing the message is defined. # # if the evaluation result is nil or false, # it is considered that it failes. # # === Args # # +msg+ :: fetched message from reliable-msg queue. # +conf+ :: consumer configurations. # +options+ :: the options (it is still unused.) # def call msg, conf, options = {} # The following codes use the mechanism of sending the message by ap4r. dispatcher = Ap4r::Dispatchers.new nil, [], @logger @logger.debug { "dispatcher get message\n#{msg.to_yaml}" } response = dispatcher.send(:get_dispather_instance, msg.headers[:dispatch_mode], msg, conf).call @logger.debug { "dispatcher get response\n#{response.to_yaml}" } end
ap4r の実装の一部をコンフィグで埋め込んでいる、ほぼおまじない。
これにより、 ap4r で実装されている内容と同様のメッセージディスパッチが可能になる。※ ap4r 0.3.7 で確認
reliable-msg-agent の起動
$ reliable-msg-agent start -c /home/hamajyotan/reliable-msg-agent/agent.conf *** Starting ReliableMsg-Agent... I, [2011-04-12T01:22:28.678494 #9696] INFO -- : *** reliable-msg agent service starting... I, [2011-04-12T01:22:28.678822 #9696] INFO -- : --- starting workers. I, [2011-04-12T01:22:28.680012 #9696] INFO -- : *** reliable-msg agent service started.
適当な web サーバを準備する
更に別窓で作業する。
以下を満たせばなんでも良い
- (先に、コンフィグに設定している為)9292番ポートでリスンする
- ステータスコードに 200 で、レスポンスボディに true (を含む文字列) を返す
今回は rack で適当に作る
$ gem install rack --no-ri --no-rdoc
rack のコンフィグファイルを適当に書く
/home/hamajyotan/stub.ru
# # /home/hamajyotan/stub.ru # require "rubygems" require "rack" class Stub def call env [200, {"Content-Type" => "text/plain"}, ["true"]] end end run Stub.new
rack で web サーバ起動しておく
$ rackup /home/hamajyotan/stub.ru
ap4r へメッセージを put する
更に別窓で作業する。
以下の代わりに ap4r の rails プラグインを使って put しても良い
$ irb -rubygems -r reliable-msg irb(main):001:0> q = ReliableMsg::Queue.new "queue.agent" => #<ReliableMsg::Queue:0x2ac0e926f7a8 @queue="queue.agent"> irb(main):002:0> q.put "", {:dispatch_mode => :HTTP, irb(main):003:0> :target_method => :POST, irb(main):004:0> :target_url => "http://localhost/", irb(main):005:0> :queue => "queue.agent", irb(main):006:0> :delivery => :once} => "2f5e6c80-4688-012e-8954-f9a630fcd34e" irb(main):007:0> exit $
reliable-msg-agent を起動したコンソールを見てみる
- サービス起動
- メッセージ取得
- url 書き換え http://localhost/ -> http://localhost:9292
- http://localhost:9292 へアクセスして成功
の一連のログが出力されている模様
$ reliable-msg-agent start -c ./agent.conf *** Starting ReliableMsg-Agent... *** reliable-msg agent service starting... --- starting workers. *** reliable-msg agent service started. message fetched - <2f5e6c80-4688-012e-8954-f9a630fcd34e> dispatcher get message --- !ruby/object:ReliableMsg::Message headers: :dispatch_mode: :HTTP :target_url: http://localhost/ :created: 1302539965 :expires: :queue: queue.agent :delivery: :once :id: 2f5e6c80-4688-012e-8954-f9a630fcd34e :target_method: :POST :priority: 0 :max_deliveries: 5 id: 2f5e6c80-4688-012e-8954-f9a630fcd34e object: "" Ap4r::Dispatcher after modification --- !ruby/object:ReliableMsg::Message headers: :dispatch_mode: :HTTP :target_url: http://localhost:9292/ :created: 1302539965 :expires: :queue: queue.agent :delivery: :once :id: 2f5e6c80-4688-012e-8954-f9a630fcd34e :target_method: :POST :priority: 0 :max_deliveries: 5 id: 2f5e6c80-4688-012e-8954-f9a630fcd34e object: "" response status [200 OK] dispatcher get response --- !ruby/object:Net::HTTPOK body: "true" body_exist: true code: "200" header: content-type: - text/plain connection: - close date: - Mon, 11 Apr 2011 16:39:25 GMT transfer-encoding: - chunked http_version: "1.1" message: OK read: true socket:
stub.ru (webサーバ) を起動したコンソールを見てみる
アクセスがあった模様
$ rackup /home/hamajyotan/stub.ru 127.0.0.1 - - [12/Apr/2011 01:39:25] "POST / HTTP/1.1" 200 - 0.0023
[追記] http のタイムアウト
ap4r の dispatchers 機能をエージェントに記述しているため、コンフィグ consumers には、ap4r における dispatchers と同様のコンフィグが書ける。
試しに Ap4r::Dispachers に実装されている http タイムアウトを実験する。
http タイムアウトに関する設定を追記
reliable-msg-agent のコンフィグファイルに http タイムアウトに関する設定を追記
3秒以内に処理が終わらなければ失敗とみなす
/home/hamajyotan/reliable-msg-agent/agent.conf
--- logger: " Proc.new { |file| l = Logger.new(file); l.level = Logger::DEBUG; l } " consumers: - source_uri: druby://localhost:6438 every: 1.0 target: queue.agent threads: 1 modify_rules: url: " Proc.new { |url| url.port = 9292; url } " http: timeout: 3 agent: /home/hamajyotan/reliable-msg-agent/ap4r-dispatch-agent.rb
stub.ru が 10秒かかるようにしておく
# # /home/hamajyotan/stub.ru # require "rubygems" require "rack" class Stub def call env sleep 10 [200, {"Content-Type" => "text/plain"}, ["true"]] end end run Stub.new
上記設定でメッセージを put してみる
エージェントの処理がタイムアウトで落ちている
reliable-msg-agent では以下のログが出力される
# # 途中省略 # message fetched - <52222a00-4692-012e-8f7a-f9afb181b616> dispatcher get message --- !ruby/object:ReliableMsg::Message headers: :dispatch_mode: :HTTP :target_url: http://localhost/test/index.html :created: 1302544318 :expires: :queue: queue.agent :delivery: :once :id: 52222a00-4692-012e-8f7a-f9afb181b616 :target_method: :POST :priority: 0 :max_deliveries: 5 id: 52222a00-4692-012e-8f7a-f9afb181b616 object: "" Ap4r::Dispatcher after modification --- !ruby/object:ReliableMsg::Message headers: :dispatch_mode: :HTTP :target_url: http://localhost:9292/test/index.html :created: 1302544318 :expires: :queue: queue.agent :delivery: :once :id: 52222a00-4692-012e-8f7a-f9afb181b616 :target_method: :POST :priority: 0 :max_deliveries: 5 id: 52222a00-4692-012e-8f7a-f9afb181b616 object: "" set HTTP read timeout to 3s error in fetch-msg/agent-proc: execution expired /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/timeout.rb:64:in `rbuf_fill' /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:134:in `rbuf_fill' /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:116:in `readuntil' /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/protocol.rb:126:in `readline' /home/hamajyotan/.rvm/rubies/ruby-1.8.7-p302/lib/ruby/1.8/net/http.rb:2028:in `read_status_line' # # 途中省略 # msg-agent:75:in `send' /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/gems/reliable-msg-agent-0.1.0/bin/reliable-msg-agent:75 /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/bin/reliable-msg-agent:19:in `load' /home/hamajyotan/.rvm/gems/ruby-1.8.7-p302/bin/reliable-msg-agent:19