Wait the light to fall

第十二天 - 在 Dancer 应用程序中使用 Minion

焉知非鱼

$work,我们使用 Dancer 构建了一个 API,用于生成 PDF 文档和 XML 文件。此 API 是保险登记系统的重要组成部分:生成 PDF 以立即在 Web 浏览器中传送给客户端,并且 XML 一旦可用就会立即传送给运营商。由于XML 通常需要花费大量时间来生成,因此在后台生成作业,以免在较长时间内占用应用程序服务器。完成后,开发了一个自行开发的流程管理系统,并通过 fork() 进程,跟踪其 pid,并希望我们以后可以成功获得完成的流程。

这种方法存在一些问题: - 它很脆弱 - 它不能扩展 - 作为开发人员搞砸某事太容易了

在2019年,我们不得不加大承担更大的工作量。目前的解决方案根本无法处理我们预计需要处理的工作量。z直到遇见 Minion

注意:本文中使用的技术与 Dancer 或 Dancer2 同样适用。

为何选择 Minion? #

我们研究了 Minion 的几种替代品,包括 beanstalkd 和 celeryd。然而,使用其中任何一个意味着涉及我们已经过度征税的基础设施团队;使用 Minion 允许我们使用我的团队已经拥有的专业知识,而不必向其他人提供帮助。从开发的角度来看,使用 Perl 开发的产品为我们提供了最快的实施时间。

扩展我们现有的设置几乎是不可能的。处理我们分叉的进程所消耗的资源不仅难以处理,而且不可能在多个服务器上运行作业。从 Minion 开始也给了我们一个急需的机会来清理一些需要重构的代码。通过最少量的工作,我们能够清理我们的 XML 呈现代码并使其在 Minion 中运行。通过此清理,我们可以更轻松地获取有关XML呈现作业消耗了多少内存和 CPU 的信息。这些信息对于我们规划未来容量至关重要。

进入 Minion #

由于我们是一个舞蹈工作室,而不是 Mojolicious,因此我们可以从 Mojolicious 那里获得很多与 Minion 合作的便利。鉴于我们还在商业模式中共享一些基于 Minion 的代码,我们必须围绕 Minion 构建一些自己的管道:

package MyJob::JobQueue;

use Moose;
use Minion;

use MyJob::Models::FooBar;
with 'MyJob::Roles::ConfigReader';

has 'runner' => (
    is      => 'ro',
    isa     => 'Minion',
    lazy    => 1,
    default => sub( $self ) {
        $ENV{ MOJO_PUBSUB_EXPERIMENTAL } = 1;
        Minion->new( mysql => MyJob::DBConnectionManager->new->get_connection_uri({
            db_type => 'feeds',
            io_type => 'rw',
        }));
    },
);

我们在 Minion 周围包含了一个简单的 Moose 类,以便使用我们想要的额外功能轻松添加到任何类或 Dancer 应用程序。

我们在作业未运行的某一点遇到了问题,因为我们将它们添加到没有工作程序配置为处理的队列中。为了防止再次发生这种情况,我们添加了代码以防止我们将代码添加到不存在的队列中:

my @QUEUE_TYPES = qw( default InstantXML PayrollXML ChangeRequest );

sub has_invalid_queues( $self, @queues ) {
    return 1 if $self->get_invalid_queues( @queues );
    return 0;
}

sub get_invalid_queues( $self, @queues ) {
    my %queue_map;
    @queue_map{ @QUEUE_TYPES } = ();
    my @invalid_queues = grep !exists $queue_map{ $_ }, @queues;
    return @invalid_queues;
}

有了这个,如果开发人员试图将作业添加到无效队列,我们​​的 queue_job() 方法很容易抛出错误:

sub queue_job( $self, $args ) {
    my $job_name = $args->{ name     } or die "queue_job(): must define job name!";
    my $guid     = $args->{ guid     } or die "queue_job(): must have GUID to process!";
    my $title    = $args->{ title    } // $job_name;
    my $queue    = $args->{ queue    } // 'default';
    my $job_args = $args->{ job_args };

    die "queue_job(): Invalid job queue '$queue' specified" if $self->has_invalid_queues( $queue );

    my %notes = ( title => $title, guid  => $guid );

    return $self->runner->enqueue( $job_name => $job_args => { notes => \%notes, queue => $queue });
}

创建 jobs #

在我们的基础模型类(基于 Moose)中,我们将为我们的作业运行器创建一个属性

has 'job_runner' => (
    is      => 'ro',
    isa     => 'Empowered::JobQueue',
    lazy    => 1,
    default => sub( $self ) { return Empowered::JobQueue->new->runner; },
);

在模型本身中,创建新的可排队任务非常简单:

$self->runner->add_task( InstantXML =>
    sub( $job, $request_path, $guid, $company_db, $force, $die_on_error = 0 ) {
        $job->note(
            request_path => $request_path,
            feed_id      => 2098,
            group        => $company_db,
        );
        MyJob::Models::FooBar->new( request_path => $request_path )->generate_xml({
            pdf_guid     => $guid,
            group        => $company_db,
            force        => $force,
            die_on_error => $die_on_error,
        });
});

运行 jobs #

从 Dancer 开始工作非常简单:

use Dancer2;
use MyJob::JobQueue;

sub job_queue {
    return MyJob::JobQueue->new;
}

get '/my/api/route/:guid/:group/:force' => sub {
    my $guid  = route_parameters->get( 'guid' );
    my $group = route_parameters->get( 'group' );
    my $force = route_parameters->get( 'force' );

    debug "GENERATING XML ONLY FOR $guid";
    job_queue->queue_job({
        name     => "InstantXML",
        guid     => $guid,
        title    => "Instant XML Generator",
        queue    => 'InstantXML',
        job_args => [ $self->request_path, $guid, $group, $force ],
    });
}

创建和配置作业队列工作者 #

我们希望在一个位置轻松配置所有主机和环境的 Minions。由于我们在 Dancer 中使用了大量的 YAML,因此在 YAML 中指定 Minion 配置对我们来说很有意义:

# What port does the dashboard listen on?
dashboard_port: 4000

# Add the rest later.
dashboards:
    UNKNOWN: http://localhost:3000/
    DEV: http://my.development.host.tld:8001/

# Hosts that have no entry assume the default configuration
default:
    max_children: 4
    queues:
        - default

# Host-specific settings
jcrome-precision-3510:
    max_children: 8
    queues:
        - default
        - InstantXML
        - PayrollXML
        - ChangeRequest

我们的排队工作人员看起来像:

#!/usr/bin/env perl

use MyJob::Base;
use MyJob::JobQueue;
use MyJob::Log4p;
use MyJob::Util::Logger;
use MyJob::Util::SysTools qw(get_hostname);

my $config     = MyJob::Config->new->config;
my $hostconfig = get_hostconfig();
my $minion     = MyJob::JobQueue->new;
my $worker     = $minion->runner->worker;

my $log_eng = MyJob::Log4p->new({ logger_name => "Minion" });
my $logger  = MyJob::Util::Logger->new->logger($log_eng);

以上是我们的典型样板。阅读我们的配置,并创建一个工人可以使用的记录器。

接下来,当作业出列时,我们要记录工作人员选择了一个作业(审计目的所需),并且我们更改了进程名称,因此如果进程挂起,我们就知道该进程正在尝试运行什么。如果作业中发生未经检查的异常,则工作人员将捕获该异常并将其记录给我们:

$worker->on( dequeue => sub( $worker, $job ) {
    my $id    = $job->id;
    my $notes = $job->info->{ notes };
    my $title = $notes->{ title };
    my $guid  = $notes->{ guid };

    $job->on( spawn => sub( $job, $pid ) {
        $0 = "$title $guid";
        $logger->info( "$title: Created child process $pid for job $id by parent $$ - $guid");
    });

    $job->on( failed => sub( $job, $error ) {
        chomp $error;
        $logger->error( $error );
    });
});

为了帮助我们进行未来的容量规划,我们希望我们的工作人员告诉我们他们是否正在以峰值容量运行,因此请在发生此事件时进行记录:

$worker->on( busy => sub( $worker ) {
    my $max = $worker->status->{ jobs };
    $logger->log( "$0: Running at capacity (performing $max jobs)." );
});

现在,我们将配置(如下所示)应用于 worker。当 worker 启动时,它会告诉我们有关它是如何配置的信息(这在开发期间非常有用):

my $max_jobs = $hostconfig->{ max_children };
my @queues   = @{ $hostconfig->{ queues }};

if( $minion->has_invalid_queues( @queues ) ){
    print "Invalid job queues specified: " . join( ',', $minion->get_invalid_queues( @queues ) );
    say ". Aborting!";
    exit 1;
}

say "Starting Job Queue Worker on " . get_hostname();
say "- Configured to run a max of $max_jobs jobs";
say "- Listening for jobs on queues: ", join(', ', @queues );
$worker->status->{ jobs }   = $max_jobs;
$worker->status->{ queues } = \@queues;
$worker->run;

还记得我们上面配置的 YAML 文件吗?最后一位为该工作者运行的主机提取信息( get_hostname() 是本地主机名函数):

sub get_hostconfig {
    my $minion_config = MyJob::Config->new({ filename => "environments/minions.yml" })->config;
    my $hostname      = get_hostname();

    if( exists $minion_config->{ $hostname }) {
        return $minion_config->{ $hostname };
    } else {
        return $minion_config->{ default };
    }
}

监督工人 #

我们的 Minion 仪表板几乎与 @preaction 在 Who Watches the Minions 中发布的仪表板完全相同。如果您想了解更多,我强烈建议您阅读他的文章。

结果 #

在大约两周的时间内,我们从没有对 Minion 的实际知识到开始运行。我们在此过程中做了一些改进和改进,但快速转变是与 Minion 合作简单的真实证明。

我们现在已经拥有了所有必要的部分来横向和纵向扩展 XML 渲染:感谢 Minion,我们可以轻松地跨多个框运行 XML 作业,并且可以在以前的同一硬件上更高效地同时运行更多作业。这种设置使我们能够像客户群一样快速增长。

进一步阅读 #