少しずつ条件が違うクエリを複数非同期で投げて、集約するモジュールを書いてみたんだけど、名前とかがイマイチ決まらない。なんかいいアイデアありますかねぇ。それともこれって使う条件が特殊すぎてモジュールにするにはアレかな。一応これをベースにして、PoCo::Server::TCPみたいなのとつなげて検索サーバーにしたてあげようと思ってるんだけど。
使い方はこんな感じ
# initialize...
POE::Component::MultiDB::DBIC->spawn(
Alias => $alias,
ConnectInfo => [ .... ],
SchemaClass => 'MyApp::Schema'
);
# some where else...
$_[KERNEL]->post( $alias, 'search_multi', {
args => [
{ where => $where1, attrs => $attrs1 },
{ where => $where1, attrs => $attrs1 },
{ where => $where1, attrs => $attrs1 },
],
aggregator => $_[SESSION]->postback('aggregate'),
finalizer => $_[SESSION]->postback('finalize'),
});
ソースコードはこんな感じ。
package POE::Component::MultiDB::DBIC;
use strict;
use warnings;
use POE qw(Component::Generic);
use Digest::MD5();
use Data::Dumper();
use constant DEBUG => 0;
sub spawn
{
my $class = shift;
my $args = { @_ };
my $alias = delete $args->{Alias};
Carp::croak "$class->spawn() requires an even number of arguments" if (@_ & 1);
my @db;
for my $id (1..10) {
push @db, POE::Component::Generic->spawn(
debug => DEBUG,
package => 'POE::Component::MultiDB::DBIC::db',
object_options => [ schema_class => $args->{SchemaClass}, connect_info => $args->{ConnectInfo} ],
packages => {
'POE::Component::MultiDB::DBIC::db' => {
postbacks => [
'search',
]
}
}
);
}
POE::Session->create(
heap => { db => \@db },
inline_states => {
_start => sub { $_[KERNEL]->alias_set($alias) },
_stop => \&_stop,
search => \&_search,
search_multi => \&_search_multi,
aggregate => \&_aggregate,
}
);
}
sub _stop
{
my ($kernel, $heap) = @_[KERNEL, HEAP];
my $dblist = delete $heap->{db};
foreach my $db (@$dblist) {
$kernel->post($db->session_id, 'shutdown');
}
$kernel->alias_remove( $kernel->alias_list );
}
sub _signature
{
local $Data::Dumper::Indent = 1;
local $Data::Dumper::Terse = 1;
local $Data::Dumper::Sortkeys = 1;
Digest::MD5::md5_hex( Data::Dumper::Dumper(\@_) );
}
sub _search_multi
{
my($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
my $aggregator = $args->{aggregator};
my $finalizer = $args->{finalizer};
my $search_args = $args->{args};
my $query_id = _signature('search', $args, $$, {}, time());
my %meta = (
aggregator => $aggregator,
finalizer => $finalizer,
queries => {}
);
$heap->{active_queries} ||= {};
$heap->{active_queries}->{$query_id} = \%meta;
my $dblist = $heap->{db};
my $count = 0;
foreach my $sa (@$search_args) {
my $db_idx = $count % scalar(@$dblist);
my $db = $dblist->[$db_idx];
my $where = $sa->{where};
my $attrs = $sa->{attrs};
my $id = join('.', $query_id, ++$count);
$meta{queries}{ $id } = 1;
if (DEBUG) {
print STDERR " ---- Sending query $id ----\n";
}
$db->search(
{
session => $session->ID,
event => 'aggregate',
query_id => $query_id,
id => $id,
},
$where,
$attrs
);
}
}
sub _search
{
my($kernel, $session, $heap, $args) = @_[KERNEL, SESSION, HEAP, ARG0];
my $sa = { where => delete $args->{where}, attrs => delete $args->{attrs} };
$heap->{args} = [ ($sa) x scalar (@{$heap->{db}}) ];
_search_multi(@_);
}
sub _aggregate
{
my ($kernel, $ref, $result, $heap) = @_[KERNEL, ARG0, ARG1, HEAP];
my $meta = $heap->{active_queries}->{$ref->{query_id}};
my $map = $meta->{queries};
delete $map->{ $ref->{id} };
if (my $aggregator = $meta->{aggregator}) {
$aggregator->($result, $ref);
}
if (keys %$map == 0 && $meta->{finalizer}) {
$meta->{finalizer}->();
}
}
package POE::Component::MultiDB::DBIC::db;
use strict;
use warnings;
use UNIVERSAL::require;
sub new
{
my $class = shift;
my %args = @_;
my $schema_class = $args{schema_class};
$schema_class->require or die;
my $schema = $schema_class->connect(@{ $args{connect_info} });
bless { schema => $schema }, $class;
}
sub search
{
my ($self, $where, $attrs) = @_;
my $ret = [ $self->{schema}->resultset('Movie')->search(
ref $where eq 'HASH' ? $where : undef,
ref $attrs eq 'HASH' ? $attrs : { rows => 1 }
) ];
return $ret;
}
1;
コメント