Wait the light to fall

Python 演练

焉知非鱼

Python Walkthrough

Python 演练 #

Stateful Functions 为构建健壮的、有状态的事件驱动的应用程序提供了一个平台。它提供了对状态和时间的精细控制,这使得高级系统的实现成为可能。在本步骤指南中,您将学习如何使用 Stateful Functions API 构建有状态的应用程序。

你要构建什么? #

就像软件中所有伟大的介绍一样,这个演练将从开头开始:打招呼。该应用程序将运行一个简单的函数,该函数将接受一个请求并以问候语进行响应。它不会试图涵盖所有复杂的应用程序开发,而是专注于构建一个有状态的函数 - 这是你实现业务逻辑的地方。

先决条件 #

这个演练假设您对 Python 有一定的了解,但即使您来自不同的编程语言,您也应该能够跟上。

帮助,我卡住了 #

如果你被卡住了,请查看社区支持资源。特别是 Apache Flink 的用户邮件列表,一直被认为是 Apache 项目中最活跃的一个,也是快速获得帮助的好方法。

如何跟进 #

如果你想跟上,你需要一台装有 Python 3 以及 Docker 的电脑。

注意:为了简洁起见,本演练中的每个代码块可能不包含完整的周边类。完整的代码可以在本页底部找到。

你可以通过点击这里下载一个包含骨架项目的 zip 文件。

解压包后,你会发现一些文件。这些文件包括 dockerfiles 和数据生成器,用于在本地自包含环境中运行此演练。

$ tree statefun-walkthrough
statefun-walkthrough
├── Dockerfile
├── docker-compose.yml
├── generator
│   ├── Dockerfile
│   ├── event-generator.py
│   └── messages_pb2.py
├── greeter
│   ├── Dockerfile
│   ├── greeter.py
│   ├── messages.proto
│   ├── messages_pb2.py
│   └── requirements.txt
└── module.yaml

从事件开始 #

Stateful Functions 是一个事件驱动的系统,所以开发从定义我们的事件开始。问候者应用程序将使用协议缓冲区定义其事件。当一个特定用户的问候请求被摄入时,它将被路由到相应的函数。响应将返回一个适当的问候。第三种类型,SeenCount,是一个实用类,后期将用于帮助管理用户到目前为止被看到的次数。

syntax = "proto3";

package example;

// External request sent by a user who wants to be greeted
message GreetRequest {
    // The name of the user to greet
    string name = 1;
}
// A customized response sent to the user
message GreetResponse {
    // The name of the user being greeted
    string name = 1;
    // The users customized greeting
    string greeting = 2;
}
// An internal message used to store state
message SeenCount {
    // The number of times a users has been seen so far
    int64 seen = 1;
}

我们的第一个函数 #

在底层,消息是使用有状态的函数来处理的,也就是任何绑定到 StatefulFunction 运行时的两个参数函数。函数用 @function.bind 装饰器绑定到运行时。当绑定一个函数时,它会被注解为一个函数类型。这是在向这个函数发送消息时用来引用它的名称。

当你打开文件 greeter/greeter.py 时,你应该看到以下代码。

from statefun import StatefulFunctions

functions = StatefulFunctions()

@functions.bind("example/greeter")
def greet(context, greet_request):
    pass

一个有状态函数需要两个参数,即上下文和消息。上下文提供了对有状态函数运行时功能的访问,如状态管理和消息传递。您将在本演练中探索其中的一些功能。

另一个参数是传递给这个函数的输入消息。默认情况下,消息是以 protobuf Any 的形式传递的。如果一个函数只接受一个已知的类型,你可以使用 Python 3 类型语法覆盖消息类型。这样您就不需要对消息进行拆包或检查类型。

from messages_pb2 import GreetRequest
from statefun import StatefulFunctions

functions = StatefulFunctions()

@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
    pass

发送回复 #

有状态函数接受消息,也可以将消息发送出去。消息可以被发送到其他函数,以及外部系统(或出口)。

一个流行的外部系统是 Apache Kafka。第一步,让我们更新 greeter/greeter.py 中的函数,通过向 Kafka 主题发送问候语来响应每个输入。

from messages_pb2 import GreetRequest, GreetResponse
from statefun import StatefulFunctions

functions = StatefulFunctions()

@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
    response = GreetResponse()
    response.name = greet_request.name
    response.greeting = "Hello {}".format(greet_request.name)
    
    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
    context.pack_and_send_egress("example/greets", egress_message)

对于每条消息,都会构造一个响应,并发送到一个名为 greetings 的 Kafka 主题,该主题按名称分区。egress_message 被发送到一个名为 example/greets 的出口。这个标识符指向一个特定的 Kafka 集群,并在下面的部署中进行配置。

一个有状态的 Hello #

这是一个很好的开端,但并没有展现出有状态函数的真正威力 - 与状态一起工作。假设你想根据每个用户发送请求的次数,为他们生成个性化的响应。

def compute_greeting(name, seen):
    """
    Compute a personalized greeting, based on the number of times this @name had been seen before.
    """
    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
    if seen < len(templates):
        greeting = templates[seen] % name
    else:
        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)

    response = GreetResponse()
    response.name = name
    response.greeting = greeting

    return response

为了"记住"多条问候信息,你需要将一个持久化的值域( seen_count )关联到 Greet 函数。对于每个用户,函数现在可以跟踪他们被看到的次数。

@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
    state = context.state('seen_count').unpack(SeenCount)
    if not state:
        state = SeenCount()
        state.seen = 1
    else:
        state.seen += 1
    context.state('seen_count').pack(state)

    response = compute_greeting(greet_request.name, state.seen)

    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
    context.pack_and_send_egress("example/greets", egress_message)

状态 seen_count 始终是当前名称的范围,因此它可以独立地跟踪每个用户。

连接在一起 #

有状态的 Function 应用程序使用 http 与 Apache Flink 运行时进行通信。Python SDK 提供了一个 RequestReplyHandler,它可以基于 RESTful HTTP POSTS 自动分配函数调用。RequestReplyHandler 可以使用任何 HTTP 框架暴露。

一个流行的 Python web 框架是 Flask。它可以用来快速、轻松地将应用程序暴露给 Apache Flink 运行时。

from statefun import StatefulFunctions
from statefun import RequestReplyHandler

functions = StatefulFunctions()

@functions.bind("example/greeter")
def greeter(context, message: GreetRequest):
    pass

handler = RequestReplyHandler(functions)

# Serve the endpoint

from flask import request
from flask import make_response
from flask import Flask

app = Flask(__name__)

@app.route('/statefun', methods=['POST'])
def handle():
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response


if __name__ == "__main__":
    app.run()

配置运行时 #

有状态函数运行时通过向 Flask 服务器进行 http 调用来向 greeter 函数发出请求。要做到这一点,它需要知道它可以使用什么端点来到达服务器。这也是配置我们连接到输入和输出 Kafka 主题的好时机。配置在一个名为 module.yaml 的文件中。

version: "1.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/greeter
          spec:
            endpoint: http://python-worker:8000/statefun
            states:
              - seen_count
            maxNumBatchRequests: 500
            timeout: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/routable-protobuf-ingress
            id: example/names
          spec:
            address: kafka-broker:9092
            consumerGroupId: my-group-id
            topics:
              - topic: names
                typeUrl: com.googleapis/example.GreetRequest
                targets:
                  - example/greeter
    egresses:
      - egress:
          meta:
            type: statefun.kafka.io/generic-egress
            id: example/greets
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000

这个配置做了一些有趣的事情。

首先是声明我们的函数 example/greeter。它包括它可以到达的端点以及函数可以访问的状态。

ingress 是将 GreetRequest 消息路由到函数的输入 Kafka 主题。除了 broker 地址和消费者组等基本属性,它还包含一个目标列表。这些是每个消息将被发送到的函数。

出口是输出的 Kafka 集群。它包含 broker 特定的配置,但允许每个消息路由到任何主题。

部署 #

现在已经构建了 greeter 应用程序,是时候部署了。部署 Stateful Function 应用程序最简单的方法是使用社区提供的基础映像并加载你的模块。基础镜像提供了 Stateful Function 运行时,它将使用提供的 module.yaml 来为这个特定的工作进行配置。这可以在根目录下的 Docker 文件中找到。

FROM flink-statefun:2.2.0

RUN mkdir -p /opt/statefun/modules/greeter
ADD module.yaml /opt/statefun/modules/greeter

现在您可以使用提供的 Docker 设置在本地运行此应用程序。

$ docker-compose up -d

那么,要想在行动中看到例子,就看看话题问候出来的内容。

docker-compose logs -f event-generator 

想更进一步? #

这个 Greeter 永远不会忘记一个用户。试着修改这个函数,使它能够为任何没有与系统交互的用户花超过60秒的时间重置 seen_count

查看 Python SDK 页面以获得更多关于如何实现这一功能的信息。

完整应用 #

from messages_pb2 import SeenCount, GreetRequest, GreetResponse

from statefun import StatefulFunctions
from statefun import RequestReplyHandler
from statefun import kafka_egress_record

functions = StatefulFunctions()

@functions.bind("example/greeter")
def greet(context, greet_request: GreetRequest):
    state = context.state('seen_count').unpack(SeenCount)
    if not state:
        state = SeenCount()
        state.seen = 1
    else:
        state.seen += 1
    context.state('seen_count').pack(state)

    response = compute_greeting(greet_request.name, state.seen)

    egress_message = kafka_egress_record(topic="greetings", key=greet_request.name, value=response)
    context.pack_and_send_egress("example/greets", egress_message)


def compute_greeting(name, seen):
    """
    Compute a personalized greeting, based on the number of times this @name had been seen before.
    """
    templates = ["", "Welcome %s", "Nice to see you again %s", "Third time is a charm %s"]
    if seen < len(templates):
        greeting = templates[seen] % name
    else:
        greeting = "Nice to see you at the %d-nth time %s!" % (seen, name)

    response = GreetResponse()
    response.name = name
    response.greeting = greeting

    return response


handler = RequestReplyHandler(functions)

#
# Serve the endpoint
#

from flask import request
from flask import make_response
from flask import Flask

app = Flask(__name__)


@app.route('/statefun', methods=['POST'])
def handle():
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response


if __name__ == "__main__":
    app.run()