Wait the light to fall

Threading Nqp Through a Channel

焉知非鱼

categories: [“Raku”] #

考虑到 nqp 比普通 Perl 6 和线程快,两者结合起来应该会给我们带来不错的速度。在上一篇文章中承诺的使用 Supply 并没有真正的帮助。emit 会阻塞,直到 Supply 的内部队列被清空。如果我们想递归处理文件,文件系统可能会在递归线程被解除阻塞后停滞。如果我们在消费者中对文件系统施加压力,我们最好使用一个能迅速填满文件路径的 Channel

让我们先模拟一个每时每刻都会停滞的消费者,并以 $cChannel

my @files;
react {
    whenever $c -> $path {
        @files.push: $path;
        sleep 1 if rand < 0.00001;
    }
}

如果我们尽可能快地抽出路径,我们可以填满相当多的 RAM,并给 CPU 缓存带来很大的压力。经过一些试验和错误,我发现当有超过64个工作线程等待被放到机器线程上时,在通道上的 .send 之前 sleep 会有帮助。这些信息可以通过 Telemetry::Instrument::ThreadPool::Snap.new<gtq> 访问。

my $c = Channel.new;
start {
    my @dirs = '/snapshots/home-2019-01-29';
    while @dirs.shift -> str $dir {
        my Mu $dirh := nqp::opendir(nqp::unbox_s($dir));
        while my str $name = nqp::nextfiledir($dirh) {
            next if $name eq '.' | '..';
            my str $abs-path = nqp::concat( nqp::concat($dir, '/'), $name);
            next if nqp::fileislink($abs-path);
            if Telemetry::Instrument::ThreadPool::Snap.new<gtq> > 64 {
                say Telemetry::Instrument::ThreadPool::Snap.new<gtq>;
                say 'sleeping';
                sleep 0.1;
        }
        $c.send($abs-path) if nqp::stat($abs-path, nqp::const::STAT_ISREG);
        @dirs.push: $abs-path if nqp::stat($abs-path, nqp::const::STAT_ISDIR);
    }
    CATCH { default { put BOLD .Str, '', $dir, '' } }
    nqp::closedir($dirh); }
    $c.close;
}

睡眠0.1s再发送下一个值,有点天真。最好是观察等待的工人数量,只有当它下降到64以下时才继续。但这是不同模块的任务。在 Perl 6 中,我们并没有在具有阻塞性质的 Supply 和值泵送 Channel 之间找到一个中间地带。所以这样的模块可能会很有用。

但这还需要等待。在处理大的二进制文件时,我似乎踩到了 IO::Handle.read 的一个 bug。我们在 roast 上做了大量的测试来处理小数据。对大数据的处理并没有得到很好的测试,我不知道那里潜伏着什么怪物。

by gfldex.