Threading Nqp Through a Channel
— 焉知非鱼categories: [“Raku”] #
考虑到 nqp 比普通 Perl 6 和线程快,两者结合起来应该会给我们带来不错的速度。在上一篇文章中承诺的使用 Supply 并没有真正的帮助。emit
会阻塞,直到 Supply 的内部队列被清空。如果我们想递归处理文件,文件系统可能会在递归线程被解除阻塞后停滞。如果我们在消费者中对文件系统施加压力,我们最好使用一个能迅速填满文件路径的 Channel
。
让我们先模拟一个每时每刻都会停滞的消费者,并以 $c
为 Channel
。
my @files;
react {
whenever $c -> $path {
@files.push: $path;
sleep 1 if rand < 0.00001;
}
}
如果我们尽可能快地抽出路径,我们可以填满相当多的 RAM,并给 CPU 缓存带来很大的压力。经过一些试验和错误,我发现当有超过64个工作线程等待被放到机器线程上时,在通道上的 .send
之前 sleep
会有帮助。这些信息可以通过 Telemetry::Instrument::ThreadPool::Snap.new<gtq>
访问。
my $c = Channel.new;
start {
my @dirs = '/snapshots/home-2019-01-29';
while @dirs.shift -> str $dir {
my Mu $dirh := nqp::opendir(nqp::unbox_s($dir));
while my str $name = nqp::nextfiledir($dirh) {
next if $name eq '.' | '..';
my str $abs-path = nqp::concat( nqp::concat($dir, '/'), $name);
next if nqp::fileislink($abs-path);
if Telemetry::Instrument::ThreadPool::Snap.new<gtq> > 64 {
say Telemetry::Instrument::ThreadPool::Snap.new<gtq>;
say 'sleeping';
sleep 0.1;
}
$c.send($abs-path) if nqp::stat($abs-path, nqp::const::STAT_ISREG);
@dirs.push: $abs-path if nqp::stat($abs-path, nqp::const::STAT_ISDIR);
}
CATCH { default { put BOLD .Str, ' ⟨', $dir, '⟩' } }
nqp::closedir($dirh); }
$c.close;
}
睡眠0.1s再发送下一个值,有点天真。最好是观察等待的工人数量,只有当它下降到64以下时才继续。但这是不同模块的任务。在 Perl 6 中,我们并没有在具有阻塞性质的 Supply
和值泵送 Channel
之间找到一个中间地带。所以这样的模块可能会很有用。
但这还需要等待。在处理大的二进制文件时,我似乎踩到了 IO::Handle.read
的一个 bug。我们在 roast 上做了大量的测试来处理小数据。对大数据的处理并没有得到很好的测试,我不知道那里潜伏着什么怪物。
by gfldex.