ちょっとだけ躓いたりしたので、備忘録のため。AnyEventでData::MessagePackを受け取るサーバー
use strict; use AnyEvent; use AnyEvent::Socket; use Data::MessagePack; use Data::Dumper; main(); sub main { my $host = undef; my $port = 8888; my $guard = tcp_server $host, $port, sub { my ($fh) = @_; handle($fh); }; my $cv = AE::cv; my $w; $w = AE::signal 'INT' => sub { undef $w; undef $guard; $cv->send; }; $cv->recv; } sub handle { my $fh = shift; my $packer = Data::MessagePack::Unpacker->new; my $buf = ''; my $offset = 0; my $w; $w = AE::io $fh, 0, sub { my $n = sysread $fh, $buf, 65536, length $buf; if ( $n == 0 ) { undef $w; } while (length $buf > 0) { $offset = $packer->execute( $buf, $offset ); if (! $packer->is_finished) { last; } warn Dumper( $packer->data ); substr( $buf, 0, $offset, '' ); $offset = 0; $packer->reset; } }; my $s; $s = AE::signal INT => sub { undef $w; undef $s; }; }
適当なクライアントスクリプト
use strict; use Data::MessagePack; use AnyEvent; use AnyEvent::Handle; use AnyEvent::Socket; use AnyEvent::Util; my $count = shift @ARGV || 100; my $cv = AE::cv; my $w; $w = tcp_connect '127.0.0.1' => '8888' => sub { my $fh = shift; AnyEvent::Util::fh_nonblocking($fh, 1); my $i = 0; my $h = AnyEvent::Handle->new(fh => $fh); my $next = sub { $cv->begin; $h->push_write( Data::MessagePack->pack({ foo => $i }) ); }; $next->(); $h->on_drain(sub { my $h = shift; $cv->end; if (++$i < $count) { $next->(); } else { undef $w; } }); }; $cv->recv;