D-7 <altijd in beweging>

Day to day life of a Perl/Go/C/C++/whatever hacker. May include anything from tech, food, and family.

タグ:AnyEvent

ちょっとだけ躓いたりしたので、備忘録のため。AnyEventでData::MessagePackを受け取るサーバー
use strict;
use AnyEvent;
use AnyEvent::Socket;
use Data::MessagePack;
use Data::Dumper;

main();

sub main {
    my $host = undef;
    my $port = 8888;
    my $guard = tcp_server $host, $port, sub {
        my ($fh) = @_;
        handle($fh);
    };

    my $cv = AE::cv;
    my $w; $w = AE::signal 'INT' => sub {
        undef $w;
        undef $guard;
        $cv->send;
    };

    $cv->recv;
}


sub handle {
    my $fh = shift;

    my $packer = Data::MessagePack::Unpacker->new;
    my $buf = '';
    my $offset = 0;
    my $w; $w = AE::io $fh, 0, sub {
        my $n = sysread $fh, $buf, 65536, length $buf;
        if ( $n == 0 ) {
            undef $w;
        }

        while (length $buf > 0) {
            $offset = $packer->execute( $buf, $offset );
            if (! $packer->is_finished) {
                last;
            }

            warn Dumper( $packer->data );
            substr( $buf, 0, $offset, '' );
            $offset = 0;
            $packer->reset;
        }
    };
    my $s; $s = AE::signal INT => sub {
        undef $w;
        undef $s;
    };
}
適当なクライアントスクリプト
use strict;
use Data::MessagePack;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Util;
my $count = shift @ARGV || 100;

my $cv = AE::cv;
my $w; $w = tcp_connect '127.0.0.1' => '8888' => sub {
    my $fh = shift;

    AnyEvent::Util::fh_nonblocking($fh, 1);
    my $i = 0;
    my $h = AnyEvent::Handle->new(fh => $fh);
    my $next = sub {
        $cv->begin;
        $h->push_write( Data::MessagePack->pack({ foo => $i }) );
    };

    $next->();
    $h->on_drain(sub {
        my $h = shift;
        $cv->end;
        if (++$i < $count) {
            $next->();
        } else {
            undef $w;
        }
    });
};

$cv->recv;
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr

Web+DB Press vol.56でAnyEvent入門記事を書きました。

いつも使うわけではないだろうけれども、これからはイベント駆動、非同期処理はサーバーサイドのエンジニアには必須なツールの一つになっていくと思います。

AnyEventを使うと簡単に非同期処理が手軽に書けるので、興味のあるかたは是非確認してみてください!
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr

まだこう、細かいpros/consがわからないのでなんとも言えないんだけど、とりあえずaio_open/aio_writeと普通のopen/print/closeで同じ事した場合とでベンチマークとか取ってみた。これでいいのかなー

環境はMac OS X 10.5.8, 2.4 GHz Intel Core 2 Duo, 4GM RAM.

Comparing with buffer size 10...
         Rate normal    aio
normal 80.0/s     --   -19%
aio    99.0/s    24%     --
Comparing with buffer size 100...
         Rate normal    aio
normal 80.0/s     --   -18%
aio    97.1/s    21%     --
Comparing with buffer size 1000...
         Rate normal    aio
normal 76.9/s     --   -13%
aio    88.5/s    15%     --
Comparing with buffer size 10000...
         Rate normal    aio
normal 52.4/s     --   -27%
aio    71.9/s    37%     --
Comparing with buffer size 100000...
         Rate normal    aio
normal 15.9/s     --   -63%
aio    42.7/s   169%     --

コードはこちら、githubで。
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr



例えば、AnyEvent::DBIで Q4Mを使って、*常に*なんらかのイベントをqueue_wait()する状態にしたいとする。するとexec()が終わったらまた同じexec()を呼ぶ事になるので、例えばこんなコードを書くかもしれない。

    use strict;
    use AnyEvent::DBI;

    my $sql = "SELECT ....  FROM queue_table WHERE queue_wait('queue_table', 10)";
    my $dispatch;
    $dispatch = sub {
         $dbh->exec( $sql, sub {
              # 返ってきた値でなんかする
              $dispatch->();
         });
    };

    $dispatch->();

    # まぁ他の事してるプログラムで使うだろうから、本来はいらないけど・・・
    AE::cv->recv;
でもこれだと再帰的に exec()を呼んで、さらにまた$dispatchからexecを呼んで・・・って事になってしまうよね。これを無くしたい。検証のためにもっとも単純なクロージャを使った再帰呼び出しをまず書いてみる。ここではスタックの深さをついでに表示しておく:
use strict;
 
my $dispatch; $dispatch = sub {
    my $i = 0;
    while(1) {
        my @caller = caller($i++);
        # noop
        last if ! scalar @caller;
    }
    print "$i\n";
    $dispatch->();
};

$dispatch->();
このコードを実行すると、エラーを起こすまで、少しずつ表示される数が大きくなっていく。これは再帰的に関数を呼び出し続けている数なわけで、どこかの時点でdeep recursion云々言われる。当たり前だよね。

さて、ではこれをAnyEventを使ってる場合はどうするか。まずひとつ簡単な方法としては idleウォッチャーを設定する事ができる:
use strict;
use AnyEvent;

my $idle;
my $dispatch; $dispatch = sub {
    my $i = 0;
    while(1) {
        my @caller = caller($i++);
        # noop
        last if ! scalar @caller;
    }
    print "$i\n";
    $idle = AE::idle $dispatch;

};

$idle = AE::idle $dispatch;

AE::cv->recv;
このコードを実行した場合、自分の環境では"6"が繰り返し表示されるだけ。idle()はイベントループが「次にビジーでない状態」に与えられた関数を実行するウォッチャーなので、続々と実行してくれるわけです。

「次にビジーでない状態」という曖昧な状態ではなく、確実にすぐ実行したい場合はタイマーを使ってもよいかも。この場合最初の引数に0を渡しておいてやれば、とにかく次の機会にすぐクロージャを実行してくれる。この場合も自分の環境では"6"が繰り返し表示される:
use strict;
use AnyEvent;

my $count = 1;
my $w;
my $dispatch; $dispatch = sub {
    my $i = 0;
    while(1) {
        my @caller = caller($i++);
        # noop
        last if ! scalar @caller;
    }
    print "$i\n";
    $w = AE::timer 0, 0, $dispatch;
};

$w = AE::timer 0, 0, $dispatch;

AE::cv->recv;
というわけで、直接Perlの関数を呼び出すのではなく、イベントループに一旦預ける事によって再帰的呼び出しを続けるのを減らすことができるのです。
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr

# 追記: 2010 19 Feb - このコード何回も書くの面倒くさくなったので、
# CPANにモジュールアップロードしてしまいました
# http://search.cpan.org/dist/AnyEvent-FIFO/

まぁ考えてみれば単純な話ですけど、いわゆるGuardを使うとよいです。参考はAnyEvent::HTTP

コールバック$cbの最初の引数をguard変数にしておき、キューの1スロットを使用している間はこのguard変数をundefしないようにしておく感じです。
   use strict;
   use AnyEvent;
   use AnyEvent::Util;

   my @q; # 実際にコールバックを入れておくところ
   my $ACTIVE = 0; # 現在の使用中スロット
   my $MAX_ACTIVE = 1; # 最大何個のコールバックを「同時」に行うか(もちろん本当に同時じゃないよ!)

  sub drain_queue {
      while ( @q && $ACTIVE < $MAX_ACTIVE ) {
          if (my $cb = shift @q) {
              $ACTIVE++;
              $cb->( AnyEvent::Util::guard {
                  $ACTIVE--;
                  drain_queue();
              });
          }
      }
  }
デモ程度に 1から10までの数字を(最低)0.5秒あけてから表示するコード:
my $cv = AE::cv;

# XXX - 12/1: よくよく考えたらここはbegin()/end()使うところだったので
# 修正しておいた
for my $x (1..10) {
    $cv->begin;
    push @q, sub {
        my $guard = shift;
        my $w; $w = AE::timer 0.5, 0, sub {
            undef $guard;
            undef $w;
            warn $x;
            $cv->end;
        };
    };
}

drain_queue();

$cv->recv;
これ以外の実装の仕方もあるんだけれども、これだと次のqueue popとかを自前で呼ぶ必要がないのが素敵です。
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr

Morris君の中身を書き換えてAnyEvent::DBIにしてみたわけだが、一個だけはまった部分があったので、メモっておく。

例えば以下のようなコードをAnyEvent::DBIを使うように変更したいとする。
use strict; use DBI; my $dbh = DBI->connect('dbi:SQLite:dbname=test.db'); $dbh->do( "CREATE TABLE IF NOT EXIST t1 (c1 int not null, c2 int not null)"); $dbh->do( "DELETE FROM t1"); for my $i (0..10) { $dbh->do( "INSERT INTO t1 (c1, c2) VALUES (?, ?)", $i, $i * 2); } my $sth = $dbh->prepare("SELECT * FROM t1"); # まぁ本当ならbind_columns使うけど while ( my $row = $sth->fetchrow_arrayref ) ) { print "c1 = $row->[0], c2 = $row->[1]\n"; }
これをまずさくっと以下のようにすると動かない
use strict; use AnyEvent::DBI; my $dbh = AnyEvent::DBI->new( "dbi:SQLite:dbname=test.db", undef, undef, undef, exec_server => 1, ); $dbh->exec( "CREATE TABLE IF NOT EXIST t1 (c1 int not null, c2 int not null)"); $dbh->exec( "DELETE FROM t1"); for my $i (0..10) { $dbh->exec( "INSERT INTO t1 (c1, c2) VALUES (?, ?)", $i, $i * 2); } my $cv = AnyEvent->condvar; $dbh->exec("SELECT * FROM t1", sub { my ($dbh, $rows, $rv) = @_; foreach my $row (@$rows) { print "c1 = $row->[0], c2 = $row->[1]\n"; } $cv->send; }); $cv->recv;

理由はシンプル。exec()の最後の引数としてコールバックを指定していないから。もちろん、本来であれば一個一個戻り値とかを確認すべきなんだけど、とりあえず動かしたい場合とかあるじゃない。そういう場合に結構はまりがちなので、ここは何もしない空のコールバックを用意する
use strict; use AnyEvent::DBI; my $noop = sub {}; my $dbh = AnyEvent::DBI->new( "dbi:SQLite:dbname=test.db", undef, undef, undef, exec_server => 1, ); $dbh->exec( "CREATE TABLE IF NOT EXIST t1 (c1 int not null, c2 int not null)", $noop); $dbh->exec( "DELETE FROM t1", $noop); for my $i (0..10) { $dbh->exec( "INSERT INTO t1 (c1, c2) VALUES (?, ?)", $i, $i * 2, $noop); } my $cv = AnyEvent->condvar; $dbh->exec("SELECT * FROM t1", sub { my ($dbh, $rows, $rv) = @_; foreach my $row (@$rows) { print "c1 = $row->[0], c2 = $row->[1]\n"; } $cv->send; }); $cv->recv;
ちょっぴり面倒くさいね!
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr


Moose + POEなIRCボット作ろうと前に思い立って、結構長いこと弊社内で使ってたんですが、このたびAnyEvent化して色々モダナイズしました。やっぱり非同期なhttp_getしてる部分とかは抜群に速いなぁ。

AnyEvent したついでにAnyEvent::MPにも対応したですよ。POE::IKCと違って、一旦セットアップしちゃえば、Morrisの外から何かをポストするのにコマンドラインから一発でできる;
aemp morris privmsg "#channel" "Your comment"
ただMPってセットアップが結構面倒くさいのね。以下はまったり考え込んだりしてしまったところ:

  • 最初動かない時になにやってんんだかわからないので、PERL_ANYEVENT_MP_TRACEとPERL_ANYEVENT_MP_WARNLEVELを設定したほうがいい。後者は10くらいにしないとなんも有用な事を言ってくれない。
  • seedの役割をするプロセスが必要。Morris自身がseedになれないかなーと思ってやってみたけど、トレースで「refuses to talks to myself」みたいなこと言われた。
  • .perl-anyevent-mpファイルに受け側のbind情報を書いておいたほうがデバッグは楽かも。実際に動かす段階ではどのポートでも使えるようにしたほうが簡単かもしれない。
  • rcv/sndとportの使い方、シンタックスエラーにならない限り間違った使い方をしていてもなんの注意もしてくれないので、よくわかってない間はAnyEvent::MPディストリビューションの eg/内を見たほうがいいかもしれない。
  • MPやAnyEvent::IRCのようなネットワーク接続型のAnyEventモジュールを使っていると環境によっては"Out of memory!"エラーが出てくる事がある。もしバックエンドを特に指定しないでAnyEventを動かしている場合は、素直にEV.pmをアップグレードしてみるとよい。なぜか直る。
以上。Morris君は結構お気に入りなのであった。
    このエントリーをはてなブックマークに追加 mixiチェック Share on Tumblr

このページのトップヘ