Perlで並列処理

複数のファイルをダウンロードするとき、wget を使っているのですが、複数のファイルをダウンロードするときは、URLのリストをファイルにして、-i でwgetに食わせてました。でもこれだと順番に一個ずつダウンロードしていくので効率が悪いです。サイズの小さなファイルだと気にならないんですが・・・。

で、Perl の system関数で、複数のwgetプロセスを作れば・・・と考えました。けど、ダウンロードするファイル数と同じ数のプロセスを作ると効率が悪いし、ダウンロード先のサーバーに迷惑をかけてしまいます。ので、同時に起動するwgetの数を制限するようにしたらいいんじゃないかと。

起動するプロセス数を制限するには、Parallel::ForkManagerモジュールが簡単で便利でいいんですが・・・Windows環境では不自然・・・ということで、threadsthreads::shared モジュールを選択。

下記のような感じのPerlスクリプトをnohup コマンドで起動すれば、あとはターミナルを切断しても勝手にやってくれる。終わったら携帯にメールを投げるようなコードを付け足せば・・・より便利かなと。

#!/usr/bin/perl

####################################################
# parallel-wget.pl urllist1.txt urllist2.txt ...
#  '-'(ハイフン) を指定すると標準入力から読み込む
#
# Perl 5.8.8 で確認
####################################################
use strict;
use warnings;
use threads;
use threads::shared;
use IO::File;
use File::Basename;

#何個のWGETを起動するか = スレッドの個数
my $NUM = 4;

#wgetコマンド。パスが通ってない場合はフルパスを。
my $WGET_COMMAND = 'wget';

#WGETオプション引数の指定
my @WGET_OPTION = qw(-nd -a Thread%02d.log --content-disposition);
# sprintf関数に渡されます。 %02d には スレッド番号が入ります。

#スレッド間で共有する配列。
my @WGETS : shared;

#Startup code
&{sub
{
  @WGETS = get_wget_commandline(@_);

  my @threads;
  $NUM = scalar(@WGETS) if(@WGETS < $NUM);
  
  #わざわざ $i_ なんて使わず、$_を使えばいいところだけど、
  # $_ を使うと "Scalars leaked: 1" なんてエラーが出るので・・・。
  # ガーベージコレクタのせい? 出たり、出なかったりする。なんで?
  foreach my $i_(1..$NUM)
    {
      push @threads,threads->create(\&ThreadStart,$i_);
    }
  
  $_->join foreach(@threads);
  
}}(@ARGV);

#ワーカースレッド関数
sub ThreadStart
{
  #引数はスレッドの番号
  my $num = shift;
  
  local $| = 1;

  #スレッド間で共有された配列が空になるまでスレッドを回す。
  while(@WGETS)
    {
      my $wget_command_line;
      {
        lock(@WGETS);
        $wget_command_line = shift @WGETS;
      }
      system(sprintf($wget_command_line,$num)) if($wget_command_line);
    }

  print STDERR "Thread($num) terminated...\n";
}

#URLが列挙されているファイルからwgetコマンドラインを組み立て、
#それらを配列に格納して返す。
sub get_wget_commandline
{
  my @retval;

  #code here
  foreach my $ifile_(@_)
    {
      next if($ifile_ ne '-' && !(-e $ifile_));

      my $fin = ($ifile_ eq '-') ? IO::File->new_from_fd(fileno(STDIN),'<') : IO::File->new($ifile_);

      die "can not detect input stream...\n" unless($fin);

      my ($name,$dir,$suffix) = fileparse($ifile_,qr/\.[^\.]*/) if($ifile_ ne '-');

      $name = "wget_$$" unless($name);
      $name = qq("$name") if($name =~ /\s/);

      my $wget_command_line = join(' ',($WGET_COMMAND,@WGET_OPTION,"-P $name",''));
  
      while(my $line = $fin->getline)
        {
          #改行文字、空行を削除
          $line =~ s/[\r\n]+//;
          next if($line =~ /^\s*$/);
          
          push @retval,$wget_command_line . $line;
        }
    }

  return @retval;
}
__END__

Perlでスレッドはなんか怪しい挙動。

Scalars leaked: 1

CentOS環境では上記エラー(警告か?)が出るけど、Windows上のActive Perl では出ない。
foreachとかforのループで、$_を渡すと出たり出なかった・・・?よく分からん。

プロセスの仕組みが根本的に違うWindowsだと並列処理はスレッドベースになって複雑にならざるを得ないのが残念すね。

スレッドプールなモジュールをインストールしようと思ったんですが、単純な問題だったので、そこまでやる必要なかった感じ。

Perlで並列処理」への1件のフィードバック

  1. 匿名 のコメント:

    perlのスレッドのチュートリアルで参考にさせてもらいました。

    #my ($upstream, $cur_prime) = @_; # leakはここが原因。@_を使っているから?

    my $upstream = shift;
    my $cur_prime = shift;
    # shift を使うとleakうんぬんが消えました!

コメントは停止中です。