第十二天 - 在 Dancer 应用程序中使用 Minion

$work,我们使用 Dancer 构建了一个 API,用于生成 PDF 文档和 XML 文件。此 API 是保险登记系统的重要组成部分:生成 PDF 以立即在 Web 浏览器中传送给客户端,并且 XML 一旦可用就会立即传送给运营商。由于XML 通常需要花费大量时间来生成,因此在后台生成作业,以免在较长时间内占用应用程序服务器。完成后,开发了一个自行开发的流程管理系统,并通过 fork() 进程,跟踪其 pid,并希望我们以后可以成功获得完成的流程。

这种方法存在一些问题: - 它很脆弱 - 它不能扩展 - 作为开发人员搞砸某事太容易了

在2019年,我们不得不加大承担更大的工作量。目前的解决方案根本无法处理我们预计需要处理的工作量。z直到遇见 Minion

注意:本文中使用的技术与 Dancer 或 Dancer2 同样适用。

为何选择 Minion?

我们研究了 Minion 的几种替代品,包括 beanstalkd 和 celeryd。然而,使用其中任何一个意味着涉及我们已经过度征税的基础设施团队;使用 Minion 允许我们使用我的团队已经拥有的专业知识,而不必向其他人提供帮助。从开发的角度来看,使用 Perl 开发的产品为我们提供了最快的实施时间。

扩展我们现有的设置几乎是不可能的。处理我们分叉的进程所消耗的资源不仅难以处理,而且不可能在多个服务器上运行作业。从 Minion 开始也给了我们一个急需的机会来清理一些需要重构的代码。通过最少量的工作,我们能够清理我们的 XML 呈现代码并使其在 Minion 中运行。通过此清理,我们可以更轻松地获取有关XML呈现作业消耗了多少内存和 CPU 的信息。这些信息对于我们规划未来容量至关重要。

进入 Minion

由于我们是一个舞蹈工作室,而不是 Mojolicious,因此我们可以从 Mojolicious 那里获得很多与 Minion 合作的便利。鉴于我们还在商业模式中共享一些基于 Minion 的代码,我们必须围绕 Minion 构建一些自己的管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package MyJob::JobQueue;

use Moose;
use Minion;

use MyJob::Models::FooBar;
with 'MyJob::Roles::ConfigReader';

has 'runner' => (
is => 'ro',
isa => 'Minion',
lazy => 1,
default => sub( $self ) {
$ENV{ MOJO_PUBSUB_EXPERIMENTAL } = 1;
Minion->new( mysql => MyJob::DBConnectionManager->new->get_connection_uri({
db_type => 'feeds',
io_type => 'rw',
}));
},
);

我们在 Minion 周围包含了一个简单的 Moose 类,以便使用我们想要的额外功能轻松添加到任何类或 Dancer 应用程序。

我们在作业未运行的某一点遇到了问题,因为我们将它们添加到没有工作程序配置为处理的队列中。为了防止再次发生这种情况,我们添加了代码以防止我们将代码添加到不存在的队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
my @QUEUE_TYPES = qw( default InstantXML PayrollXML ChangeRequest );

sub has_invalid_queues( $self, @queues ) {
return 1 if $self->get_invalid_queues( @queues );
return 0;
}

sub get_invalid_queues( $self, @queues ) {
my %queue_map;
@queue_map{ @QUEUE_TYPES } = ();
my @invalid_queues = grep !exists $queue_map{ $_ }, @queues;
return @invalid_queues;
}

有了这个,如果开发人员试图将作业添加到无效队列,我们​​的 queue_job() 方法很容易抛出错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
sub queue_job( $self, $args ) {
my $job_name = $args->{ name } or die "queue_job(): must define job name!";
my $guid = $args->{ guid } or die "queue_job(): must have GUID to process!";
my $title = $args->{ title } // $job_name;
my $queue = $args->{ queue } // 'default';
my $job_args = $args->{ job_args };

die "queue_job(): Invalid job queue '$queue' specified" if $self->has_invalid_queues( $queue );

my %notes = ( title => $title, guid => $guid );

return $self->runner->enqueue( $job_name => $job_args => { notes => \%notes, queue => $queue });
}

创建 jobs

在我们的基础模型类(基于 Moose)中,我们将为我们的作业运行器创建一个属性

1
2
3
4
5
6
has 'job_runner' => (
is => 'ro',
isa => 'Empowered::JobQueue',
lazy => 1,
default => sub( $self ) { return Empowered::JobQueue->new->runner; },
);

在模型本身中,创建新的可排队任务非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$self->runner->add_task( InstantXML =>
sub( $job, $request_path, $guid, $company_db, $force, $die_on_error = 0 ) {
$job->note(
request_path => $request_path,
feed_id => 2098,
group => $company_db,
);
MyJob::Models::FooBar->new( request_path => $request_path )->generate_xml({
pdf_guid => $guid,
group => $company_db,
force => $force,
die_on_error => $die_on_error,
});
});

运行 jobs

从 Dancer 开始工作非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
use Dancer2;
use MyJob::JobQueue;

sub job_queue {
return MyJob::JobQueue->new;
}

get '/my/api/route/:guid/:group/:force' => sub {
my $guid = route_parameters->get( 'guid' );
my $group = route_parameters->get( 'group' );
my $force = route_parameters->get( 'force' );

debug "GENERATING XML ONLY FOR $guid";
job_queue->queue_job({
name => "InstantXML",
guid => $guid,
title => "Instant XML Generator",
queue => 'InstantXML',
job_args => [ $self->request_path, $guid, $group, $force ],
});
}

创建和配置作业队列工作者

我们希望在一个位置轻松配置所有主机和环境的 Minions。由于我们在 Dancer 中使用了大量的 YAML,因此在 YAML 中指定 Minion 配置对我们来说很有意义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# What port does the dashboard listen on?
dashboard_port: 4000

# Add the rest later.
dashboards:
UNKNOWN: http://localhost:3000/
DEV: http://my.development.host.tld:8001/

# Hosts that have no entry assume the default configuration
default:
max_children: 4
queues:
- default

# Host-specific settings
jcrome-precision-3510:
max_children: 8
queues:
- default
- InstantXML
- PayrollXML
- ChangeRequest

我们的排队工作人员看起来像:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env perl

use MyJob::Base;
use MyJob::JobQueue;
use MyJob::Log4p;
use MyJob::Util::Logger;
use MyJob::Util::SysTools qw(get_hostname);

my $config = MyJob::Config->new->config;
my $hostconfig = get_hostconfig();
my $minion = MyJob::JobQueue->new;
my $worker = $minion->runner->worker;

my $log_eng = MyJob::Log4p->new({ logger_name => "Minion" });
my $logger = MyJob::Util::Logger->new->logger($log_eng);

以上是我们的典型样板。阅读我们的配置,并创建一个工人可以使用的记录器。

接下来,当作业出列时,我们要记录工作人员选择了一个作业(审计目的所需),并且我们更改了进程名称,因此如果进程挂起,我们就知道该进程正在尝试运行什么。如果作业中发生未经检查的异常,则工作人员将捕获该异常并将其记录给我们:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$worker->on( dequeue => sub( $worker, $job ) {
my $id = $job->id;
my $notes = $job->info->{ notes };
my $title = $notes->{ title };
my $guid = $notes->{ guid };

$job->on( spawn => sub( $job, $pid ) {
$0 = "$title $guid";
$logger->info( "$title: Created child process $pid for job $id by parent $$ - $guid");
});

$job->on( failed => sub( $job, $error ) {
chomp $error;
$logger->error( $error );
});
});

为了帮助我们进行未来的容量规划,我们希望我们的工作人员告诉我们他们是否正在以峰值容量运行,因此请在发生此事件时进行记录:

1
2
3
4
$worker->on( busy => sub( $worker ) {
my $max = $worker->status->{ jobs };
$logger->log( "$0: Running at capacity (performing $max jobs)." );
});

现在,我们将配置(如下所示)应用于 worker。当 worker 启动时,它会告诉我们有关它是如何配置的信息(这在开发期间非常有用):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
my $max_jobs = $hostconfig->{ max_children };
my @queues = @{ $hostconfig->{ queues }};

if( $minion->has_invalid_queues( @queues ) ){
print "Invalid job queues specified: " . join( ',', $minion->get_invalid_queues( @queues ) );
say ". Aborting!";
exit 1;
}

say "Starting Job Queue Worker on " . get_hostname();
say "- Configured to run a max of $max_jobs jobs";
say "- Listening for jobs on queues: ", join(', ', @queues );
$worker->status->{ jobs } = $max_jobs;
$worker->status->{ queues } = \@queues;
$worker->run;

还记得我们上面配置的 YAML 文件吗?最后一位为该工作者运行的主机提取信息( get_hostname() 是本地主机名函数):

1
2
3
4
5
6
7
8
9
10
sub get_hostconfig {
my $minion_config = MyJob::Config->new({ filename => "environments/minions.yml" })->config;
my $hostname = get_hostname();

if( exists $minion_config->{ $hostname }) {
return $minion_config->{ $hostname };
} else {
return $minion_config->{ default };
}
}

监督工人

我们的 Minion 仪表板几乎与 @preaction 在 Who Watches the Minions 中发布的仪表板完全相同。如果您想了解更多,我强烈建议您阅读他的文章。

结果

在大约两周的时间内,我们从没有对 Minion 的实际知识到开始运行。我们在此过程中做了一些改进和改进,但快速转变是与 Minion 合作简单的真实证明。

我们现在已经拥有了所有必要的部分来横向和纵向扩展 XML 渲染:感谢 Minion,我们可以轻松地跨多个框运行 XML 作业,并且可以在以前的同一硬件上更高效地同时运行更多作业。这种设置使我们能够像客户群一样快速增长。

进一步阅读