少しずつ条件が違うクエリを複数非同期で投げて、集約するモジュールを書いてみたんだけど、名前とかがイマイチ決まらない。なんかいいアイデアありますかねぇ。それともこれって使う条件が特殊すぎてモジュールにするにはアレかな。一応これをベースにして、PoCo::Server::TCPみたいなのとつなげて検索サーバーにしたてあげようと思ってるんだけど。

使い方はこんな感じ

# Usage example for POE::Component::MultiDB::DBIC.
#
# initialize...
# ConnectInfo holds the argument list handed to SchemaClass->connect().
POE::Component::MultiDB::DBIC->spawn(
    Alias       => $alias,
    ConnectInfo => [ $dsn, $user, $password ],
    SchemaClass => 'MyApp::Schema',
);

# some where else...
# Each element of "args" is one query; the queries are dispatched
# asynchronously. "aggregator" is invoked once per finished query and
# "finalizer" once after the last query of the request has reported back.
$_[KERNEL]->post(
    $alias,
    'search_multi',
    {
        args => [
            { where => $where1, attrs => $attrs1 },
            { where => $where2, attrs => $attrs2 },
            { where => $where3, attrs => $attrs3 },
        ],
        aggregator => $_[SESSION]->postback('aggregate'),
        finalizer  => $_[SESSION]->postback('finalize'),
    }
);

ソースコードはこんな感じ。

package POE::Component::MultiDB::DBIC;
use strict;
use warnings;

use Carp ();
use POE qw(Component::Generic);
use Digest::MD5 ();
use Data::Dumper ();

use constant DEBUG => 0;

# Number of backend workers created when spawn() gets no PoolSize.
use constant DEFAULT_POOL_SIZE => 10;

# spawn(%args)
#
# Creates a pool of POE::Component::Generic workers (each wrapping a
# POE::Component::MultiDB::DBIC::db object) and a front-end session that
# distributes queries over them.
#
# Arguments:
#   Alias       - alias set on the front-end session
#   SchemaClass - schema class name, loaded and connected by each worker
#   ConnectInfo - array ref passed to SchemaClass->connect()
#   PoolSize    - optional, number of workers (default 10)
#   ResultSet   - optional, resultset name to search (default 'Movie')
#
# Returns whatever POE::Session->create() returns.
# Croaks when called with an odd number of arguments.
sub spawn {
    my $class = shift;

    # Validate before building the hash so a malformed call fails cleanly.
    Carp::croak("$class->spawn() requires an even number of arguments")
        if @_ & 1;

    my %args      = @_;
    my $alias     = delete $args{Alias};
    my $pool_size = $args{PoolSize} || DEFAULT_POOL_SIZE;

    my @object_options = (
        schema_class => $args{SchemaClass},
        connect_info => $args{ConnectInfo},
    );
    push @object_options, ( resultset => $args{ResultSet} )
        if defined $args{ResultSet};

    my @db;
    for my $id ( 1 .. $pool_size ) {
        push @db, POE::Component::Generic->spawn(
            debug          => DEBUG,
            package        => 'POE::Component::MultiDB::DBIC::db',
            object_options => [@object_options],
            # NOTE(review): this declares 'search' as taking a postback
            # argument; confirm against the POE::Component::Generic docs
            # that this is intended, since search() receives plain
            # where/attrs hashes.
            packages => {
                'POE::Component::MultiDB::DBIC::db' => {
                    postbacks => [ 'search', ]
                }
            }
        );
    }

    return POE::Session->create(
        heap          => { db => \@db },
        inline_states => {
            _start => sub {
                $_[KERNEL]->alias_set($alias) if defined $alias;
                return;
            },
            _stop        => \&_stop,
            search       => \&_search,
            search_multi => \&_search_multi,
            aggregate    => \&_aggregate,
        }
    );
}

# _stop: posts 'shutdown' to every worker session and drops the aliases
# of the front-end session.
sub _stop {
    my ( $kernel, $heap ) = @_[ KERNEL, HEAP ];

    my $dblist = delete $heap->{db} || [];
    foreach my $db (@$dblist) {
        $kernel->post( $db->session_id, 'shutdown' );
    }

    # alias_remove() takes one alias at a time.
    $kernel->alias_remove($_) for $kernel->alias_list;
    return;
}

# _signature(@data): MD5 hex digest of a key-sorted Data::Dumper
# serialization of the arguments.
sub _signature {
    local $Data::Dumper::Indent   = 1;
    local $Data::Dumper::Terse    = 1;
    local $Data::Dumper::Sortkeys = 1;
    return Digest::MD5::md5_hex( Data::Dumper::Dumper( \@_ ) );
}

# search_multi handler.
#
# ARG0 is a hash ref with:
#   args       - array ref of { where => ..., attrs => ... } query specs
#   aggregator - optional callback, invoked once per finished query
#   finalizer  - optional callback, invoked once all queries are done
#
# Queries are spread round-robin over the worker pool; every worker
# answers through the 'aggregate' event of this session.
sub _search_multi {
    my ( $kernel, $session, $heap, $args ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $aggregator  = $args->{aggregator};
    my $finalizer   = $args->{finalizer};
    my $search_args = $args->{args} || [];

    # Nothing to dispatch: no 'aggregate' event would ever arrive, so
    # finish right away instead of leaving a request registered forever.
    unless (@$search_args) {
        $finalizer->() if $finalizer;
        return;
    }

    # The per-session sequence number keeps ids unique even when two
    # identical requests arrive within the same second.
    my $query_id =
        _signature( 'search', $args, $$, ++$heap->{query_seq}, time() );

    my %meta = (
        aggregator => $aggregator,
        finalizer  => $finalizer,
        queries    => {}
    );
    $heap->{active_queries} ||= {};
    $heap->{active_queries}->{$query_id} = \%meta;

    my $dblist = $heap->{db};
    my $count  = 0;
    foreach my $sa (@$search_args) {
        my $db_idx = $count % scalar(@$dblist);
        my $db     = $dblist->[$db_idx];
        my $where  = $sa->{where};
        my $attrs  = $sa->{attrs};
        my $id     = join( '.', $query_id, ++$count );

        # Track the sub-query as outstanding until _aggregate sees it.
        $meta{queries}{$id} = 1;

        if (DEBUG) {
            print STDERR " ---- Sending query $id ----\n";
        }

        # The leading hash ref tells the worker where to send the answer;
        # _aggregate reads query_id and id back from it.
        $db->search(
            {
                session  => $session->ID,
                event    => 'aggregate',
                query_id => $query_id,
                id       => $id,
            },
            $where,
            $attrs
        );
    }
    return;
}

# search handler: sends the same where/attrs query to every worker by
# expanding it into a search_multi request.
sub _search {
    my ( $kernel, $session, $heap, $args ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $sa = {
        where => delete $args->{where},
        attrs => delete $args->{attrs}
    };

    # _search_multi reads the query list from the request itself (ARG0),
    # so it has to be stored there rather than on the heap.
    $args->{args} = [ ($sa) x scalar( @{ $heap->{db} } ) ];

    return _search_multi(@_);
}

# aggregate handler: receives one worker answer.
#
# ARG0 is the hash ref that was passed to the worker in _search_multi,
# ARG1 is the search result. The aggregator callback is invoked with
# ($result, $ref); once no sub-query is outstanding the request is
# forgotten and the finalizer callback is invoked.
sub _aggregate {
    my ( $kernel, $ref, $result, $heap ) = @_[ KERNEL, ARG0, ARG1, HEAP ];

    my $query_id = ref $ref eq 'HASH' ? $ref->{query_id} : undef;
    return unless defined $query_id;

    # Ignore answers for requests that are unknown or already finished
    # instead of autovivifying bookkeeping entries for them.
    my $meta = $heap->{active_queries} && $heap->{active_queries}->{$query_id};
    return unless $meta;

    my $map = $meta->{queries};
    delete $map->{ $ref->{id} };

    if ( my $aggregator = $meta->{aggregator} ) {
        $aggregator->( $result, $ref );
    }

    return if keys %$map;

    # Last answer arrived: release the bookkeeping entry so that
    # active_queries does not grow without bound.
    delete $heap->{active_queries}->{$query_id};
    $meta->{finalizer}->() if $meta->{finalizer};
    return;
}

package POE::Component::MultiDB::DBIC::db;
use strict;
use warnings;
use UNIVERSAL::require;

# new(%args)
#
# Loads the schema class and connects it.
#   schema_class - class name to load
#   connect_info - array ref passed to schema_class->connect()
#   resultset    - optional resultset name used by search()
#                  (default 'Movie')
# Dies when the schema class cannot be loaded.
sub new {
    my ( $class, %args ) = @_;

    my $schema_class = $args{schema_class};
    die "schema_class is required" unless defined $schema_class;
    $schema_class->require
        or die "Cannot load schema class '$schema_class': $@";

    my $schema = $schema_class->connect( @{ $args{connect_info} } );

    my $resultset = defined $args{resultset} ? $args{resultset} : 'Movie';
    return bless { schema => $schema, resultset => $resultset }, $class;
}

# search($where, $attrs)
#
# Runs resultset->search() in list context and returns the results as an
# array ref. A non-hash $where is replaced by undef, a non-hash $attrs
# by { rows => 1 }.
sub search {
    my ( $self, $where, $attrs ) = @_;

    my $ret = [
        $self->{schema}->resultset( $self->{resultset} )->search(
            ref $where eq 'HASH' ? $where : undef,
            ref $attrs eq 'HASH' ? $attrs : { rows => 1 }
        )
    ];
    return $ret;
}

1;