D-7 <altijd in beweging>

Day to day life of a Perl/Go/C/C++/whatever hacker. May include anything from tech, food, and family.

2011年09月

目的:オブジェクトストレージの旧ストレージサーバーから新ストレージサーバーにデータを引っ越しすること。rsyncやディスクを単純に交換するという方法も考えたが、ついでにデータのリバランスを行いたいので、ツールで移行する方法を採る。

概要:オブジェクトとは、ユーザーから見る「1ファイル」で、この1オブジェクトに付き、1個かそれ以上の実体(entity)が存在する。1ストレージには任意のオブジェクトの実体が最大1個保存される。この実体が複数あることによって、ストレージがダウンしたとしてもオブジェクト自体は生き残る事ができる。複数の実体がある場合は必ず複数のストレージサーバーに実体のコピーが保存されている。これを1ストレージサーバーずつ、新ストレージサーバーのほうに移動する

・・・という命題で、データの大移動を行っているのだが、ここでの顛末をつらつらと書く。

まずとりあえず基本的な構成を作るために、シングルプロセスで直線的に任意のストレージに格納されている実体を転送するプログラムから手をつけ始めた:基本的にはオブジェクトID単位で実体を取得し、必要なコピーの数を把握してから、その分新ストレージに転送、DBにデータ作成、という感じ。
use strict;
use DBI;
use Furl;

my $dbh = DBI->connect( ... );
my $sth  = $dbh->prepare( "SELECT object_id ... WHERE storage_id = ? ORDER BY object_id DESC" );


my $furl = Furl::HTTP->new;
my $object_id;
$sth->bind_columns( \($object_id) );
while ( $sth->fetchrow_arrayref ) {
     # XXX ERROR CHECKING et al IS DELIBERATELY SKIPPED IN THIS EXAMPLE
     my $url = make_url( $object_id, $storage_id );
     my @res = $furl->get( $url );

     my $count = get_needed_copy_count( $object_id );
     my @storages = get_destination_storages( $object_id );
     while ( $count > 0 && @storages ) {
           my $new_storage = shift @storages;
           if ( migrate_to_storage( $furl, $object_id, $res[4], $new_storage ) ) {
                $count--;
           }
     }
     if ($count > 0) {
           # Failed to copy?
     }
}

ちなみに各サーバーには実体が700万件から1500万件ほど格納されているので、当然これではおっつかない。もう少し頭を良くしないといけない。で、当然まずマルチプロセスにするべきだとは思ったが、多分I/O待ち時間とかも発生することを考慮してほげほげしないといけないんだろうなぁと思い、I/Oの多重化から手をつける。

もちろんマルチプロセス化から先にしてもいいんだけど、経験上1プロセスがちゃんとできてからマルチプロセス化するのと、マルチプロセス化してから各プロセスをきちんと構成していくというのを比べると、先に1プロセスでちゃんと動くように書いてからのほうが実装が楽な気がしたのでまず先にこっちから。

やったことは2点:まずオブジェクトID をループで回しつつ、1オブジェクトID=1 Coroで処理開始。ただ、Coroを全てのオブジェクトに対して即座に作成してしまうとパンクしてしまうので制御が必要なのでCoro::Semaphoreで一度に生成できるCoroを制限してみた:
use strict;
use Coro;
use DBI;
use FurlX::Coro;

my $dbh = DBI->connect( ... );
my $sth  = $dbh->prepare( "SELECT object_id ... WHERE storage_id = ? ORDER BY object_id DESC" );

my $sem = Coro::Semaphore->new(10);
my $object_id;
$sth->bind_columns( \($object_id) );
while ( $sth->fetchrow_arrayref ) {
     my $guard = $sem->guard;
     async_pool {
          my $guard = shift;
          process( $object_id );
     } $guard;
}

Coro::AnyEvent::sleep 1 while $sem->count < 10; # WAIT


sub process {
    my $furl = FurlX::Coro::HTTP->new( ... );

}
慣れてないと気づかないのがループが終わった後もCoroが終了したかという確認をしないといけないこと。全てのオブジェクトIDをDBから持ってくるという処理が終わってても、HTTP操作とかをしているcoro自体はまだ作業中かもしれないからだ。

ここではCoro::AnyEventとか突然でてきて「?」状態かもしれないが、これには理由がある。最初はここで以下のようなコードを書いていた:
while ( ... ) {
      async_pool {
          ....
      }
}

cede while $sem->count < 10;
これ、要はCoro::Semaphoreが元の値(10)まで戻っていればビジーになってるcoroはいない、という状態を待ちつつ、自分以外のcoroにタイムスライスを譲ってる。・・・はずだった。

ところがこれが終了しないのだ。色々print文を仕込んでみたところFurlX::Coro::HTTP::do_select()のselectでがっつり止まってる。なんでやねん。

で、みてるとCoro::Select::selectはAnyEvent使ってて、ってことはCoro::AnyEventを使ってて・・・ってことは基本的にはまだ生きてるcoroにタイムスライスを渡してやればよくて・・・って色々試してみた。そしたらCoro::AnyEvent::poll; Coro::AnyEvent::scheduleあたりはSEGVで死んだのだけど、Coro::AnyEvent::sleep でうまくいった、というわけ。

で、ここまでできてれば今度はプロセスを分けて処理する。オブジェクトIDは64ビットのでっかい整数なので最初はまぁこいつを均等に分割してやればいいやーという感じで以下のようなコードを書いた

use Parallel::ForkManager;

my $WORKERS = 10;
my $pfm = Parallel::ForkManager->new($WORKERS);

my $max_object_id = get_max_object_id_in_storage( $storage_id );
my $min_object_id = get_min_object_id_in_storage( $storage_id );
my $diff = $max_object_id - $min_object_id;
if ( my $remainder = $diff % $WORKERS ) {
    $diff += $remainder;
}
my $segment_size = $diff / $WORKERS;


for my $i ( 1..$WORKERS ) {
     $min_object_id = $max_object_id - $segment_size;
     if (my $pid = $pfm->start) {
          $max_object_id = $min_object_id;
          next;
     }
     process( $max_object_id, $min_object_id );
     $pfm->finish;
}

sub process {
     my $dbh =  ...;
     my $sth = $dbh->prepare("SELECT ... FROM ... WHERE object_id <= ? and object_id > ?" );

     my $sem = Coro::Semaphore->new(10);
     $sth->execute( $max_object_id, $min_object_id );
     my $object_id;
     $sth->bind_columns( \($object_id) );
     while ( $sth->fetchrow_arrayref ) {
          my $guard = $sem->guard;
          async_pool {
               my ($guard, $object_id) = @_;
               ....
          } $guard, $object_id;
     }
}
これで10プロセスでそれぞれ最大coro数10で処理ができるようになる感じ。

・・・なんだけど、これ、オブジェクトIDの範囲が例えば1から100まであったとしても、そこに必ず50オブジェクト前後はいってる、とかじゃなくて、1から100までの間にはオブジェクトが1個、101から200までの間にはオブジェクトが99個、とかいう状態がありえるのですね。そうすると、オブジェクトIDの範囲で割っても無駄が多すぎて駄目。じゃあ今度は任意の数のオブジェクトを引っ張り出してきて、その数ごとにプロセスを立ち上げる、ってやらないといけないはずなのでそうした:
my $sth = $dbh->prepare( <<EOSQL );
    SELECT object_id FROM entity WHERE
          object_id <= ?
          ORDER BY object_id DESC
          LIMIT 5000,1
EOSQL

while( $loop ) {
    $sth->execute( $max_object_id );
    if ( my $pid = $pfm->start ) {
         $max_object_id = $min_object_id;
         next;
    }
    process( $max_object_id, $min_object_id );
    $pfm->finish;
}

とりあえずポイントとしてはcount(*)とかしたら負けなので、キーのLIMIT付きORDER BYで遷移していって、どんどん処理していく事。countしたら負けです!

これだけだとまだ運用上は問題があって、例えばプロセスを途中で殺すことになったらオブジェクトIDを最初からなめるのもあほらしいのでオブジェクトIDの範囲を指定したいし、同時に動くプロセス数やcoroの数も調節したいし・・・とかあるので、その辺りはMouseでオブジェクト化してdefaultやらなにやらでごにょごにょやってボチボチきれいにした感じ。5プロセス x 10 coroで古いサーバーはだいたいiowait 90%の所くらいまではいじめられるので、その辺りのパラメターは要調整。

あとは細かいところでは$0 に現在の状況とかを逐次入れていって、psした時に何がおきてるのかわかるようにした。これは結構重宝してる。

まぁ要はFurlX::Coro(というかCoro::Select)使ってる時にcedeだけに頼ってるとプロセス終了しねぇかもよ!っというのが最大の教訓かな。cedeしてるのにFurlX::Coroが返ってこない時は普通に絶望したけどなんとかなってよかった!

ちなみに同じツールで今度は数億オブジェクトくらいあるシステムも引っ越しします。数億だって。笑っちゃうね!
    このエントリーをはてなブックマークに追加 mixiチェック

Devel::Coverって便利な道具なんですが、なんかfork + execするとうまく動きませんでした。ずっと見て見ぬ振りしてたんだけど、そういうわけにもいかなくなったので、二日ほどずっとその挙動をprintデバッグで追いかけていったら大体把握できた。

すごくはしょって言うと、Devel::Coverは裏のXSレベルでEND {} にあたる部分とかにコードを挿入してて、元のコードが

   #!perl
   exec "/bin/ls"
だったとしたら、Devel::Coverをuseすると概念的には

   #!perl
   END { do_interesting_stuff() }

   exec "/bin/ls"
という事をするようになる。PPの場合だとexec時にこのENDブロックはうまく無視されるんだけど、なんせDevel::Coverは無理矢理この動作を行っているので、ENDに当たる部分がexec()の直前に実行される。そしてもしここでdieとかCレベルでのエラーとかあるとしずかーに落ちてしまう。

ということで、自分の環境では以下のレポジトリの変更でこれが回避できた:


ちなみにこのあたりの修正ナシでTest::mysqldを使ったコードをDevel::Coverすると↑の問題にぶち当たる。本体のほうにも修正してもらうように言うつもりだけど、もし今困ってたらこれでなんとかなるはず。
    このエントリーをはてなブックマークに追加 mixiチェック

このページのトップヘ