ちょっとだけ躓いたので、備忘録として残しておく。AnyEventでData::MessagePackのデータを受け取るサーバー
use strict;
use AnyEvent;
use AnyEvent::Socket;
use Data::MessagePack;
use Data::Dumper;
main();

# Listen on TCP port 8888 (no specific host is given), hand every accepted
# socket to handle(), and keep the event loop running until SIGINT arrives.
sub main {
    my ($host, $port) = (undef, 8888);

    my $guard = tcp_server($host, $port, sub {
        my $fh = shift;
        handle($fh);
    });

    my $done = AE::cv;

    # The watcher variable is declared first so the callback can release it.
    my $sig_watcher;
    $sig_watcher = AE::signal(INT => sub {
        undef $sig_watcher;
        undef $guard;       # dropping the guard releases the listening server
        $done->send;
    });

    $done->recv;
}
# Decode a stream of MessagePack objects arriving on a client socket and
# dump each decoded object with warn (i.e. to STDERR).
#
# Arguments:
#   $fh - connected client socket, as passed to the tcp_server accept callback.
#
# Incoming bytes are appended to $buf and fed to a streaming unpacker.
# $offset records how much of $buf the unpacker has consumed so far, so an
# object that is split across several reads is resumed on the next read.
sub handle {
    my $fh = shift;
    my $packer = Data::MessagePack::Unpacker->new;
    my $buf = '';
    my $offset = 0;
    my ($w, $s);
    $w = AE::io $fh, 0, sub {
        my $n = sysread $fh, $buf, 65536, length $buf;
        if ( !defined $n ) {
            # sysread returns undef on error. Transient conditions (e.g. a
            # spurious readiness notification) just mean "try again later".
            return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};
            warn "sysread failed: $!";
            undef $w;
            undef $s;
            return;
        }
        if ( $n == 0 ) {
            # EOF: release both watchers. Each one is referenced by its own
            # callback, so it stays alive until explicitly undefined.
            undef $w;
            undef $s;
        }
        while ( length $buf > 0 ) {
            $offset = $packer->execute( $buf, $offset );
            # Incomplete object: keep $buf and $offset for the next read.
            last if !$packer->is_finished;
            warn Dumper( $packer->data );
            # Drop the bytes of the decoded object and start a fresh parse.
            substr( $buf, 0, $offset, '' );
            $offset = 0;
            $packer->reset;
        }
    };
    # Stop serving this connection when the process receives SIGINT.
    $s = AE::signal INT => sub {
        undef $w;
        undef $s;
    };
}
適当なクライアントスクリプト
use strict;
use Data::MessagePack;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Util;
# Test client: connects to 127.0.0.1:8888 and sends $count MessagePack-encoded
# hashes ({ foo => 0 }, { foo => 1 }, ...), one after another.
#
# The number of messages comes from the first command-line argument and
# falls back to 100 when it is missing or false.
my $count = shift @ARGV || 100;
my $cv = AE::cv;
my $w; $w = tcp_connect '127.0.0.1' => '8888' => sub {
    my $fh = shift;
    unless ($fh) {
        # On failure tcp_connect invokes the callback without a handle and
        # leaves the reason in $!; release the main loop instead of hanging.
        warn "connect failed: $!";
        undef $w;
        $cv->send;
        return;
    }
    AnyEvent::Util::fh_nonblocking($fh, 1);
    my $i = 0;
    my $h = AnyEvent::Handle->new(fh => $fh);
    my $next = sub {
        $h->push_write( Data::MessagePack->pack({ foo => $i }) );
    };
    # Hold the condvar open for the whole transfer. Pairing begin/end per
    # message lets the counter hit zero (and signal the condvar) as soon as
    # the first message drains, before the remaining ones are written.
    $cv->begin;
    $next->();
    $h->on_drain(sub {
        if (++$i < $count) {
            $next->();
        } else {
            # Everything has been written out: finish.
            undef $w;
            $cv->end;
        }
    });
};
$cv->recv;