Wait the light to fall

Sdk

焉知非鱼

Sdk

SDK #

有状态函数应用程序由一个或多个模块组成。一个模块是一个由运行时加载的函数捆绑,并提供给消息。来自所有加载模块的函数都是多路复用的,并且可以自由地相互发送消息。

有状态函数支持两种类型的模块。远程模块和嵌入式模块。

远程模块 #

远程模块作为 Apache Flink® 运行时的外部进程运行;在同一容器中,作为 sidecar,使用无服务器平台或其他外部位置。这种模块类型可以支持任何数量的语言 SDK。远程模块通过 YAML 配置文件在系统中注册。

技术指标 #

一个远程模块配置由一个元部分和一个规范部分组成。meta 包含了模块的辅助信息;而 spec 则描述了模块中包含的功能并定义了它们的持久值。

定义函数 #

module.spec.functions 声明了一个由远程模块实现的函数对象列表。一个函数通过一些属性来描述。

  • function.meta.kind
    • 用于与远程功能通信的协议。
    • 所支持的值 - http
  • function.meta.type
    • 函数类型那个, 被定义为 <namespace>/<name>
  • function.spec.endpoint
    • 函数可到达的端点。
    • 所支持的 schemes: http, https.
    • 使用 http+unix 或 https+unix 方案支持通过 UNIX 域套接字进行传输。
    • 当使用 UNIX 域套接字时,端点格式是: http+unix://<socket-file-path>/<serve-url-path>。例如, http+unix:///uds.sock/path/of/url
  • function.spec.states
    • 在远程函数中声明的持久化值的列表
    • 每个条目由 name 属性和可选的 expireAfter 属性组成。
    • expireAfter 的默认值为 0,表示状态过期被禁用。
  • function.spec.maxNumBatchRequests
    • 在调用系统背压之前,一个函数可以处理的特定地址的最大记录数。
    • 默认值:1000
  • function.spec.timeout
    • 运行时在失败前等待远程函数返回的最长时间。这涵盖了整个调用过程,包括连接到函数端点、编写请求、函数处理和读取响应。
    • 默认值:1分钟
  • function.spec.connectTimeout
    • 运行时等待连接到远程函数端点的最长时间。
    • 默认值:10秒。
  • function.spec.readTimeout
    • 运行时等待单个读IO操作的最大时间,如读取调用响应。
    • 默认值:10秒。
  • function.spec.writeTimeout
    • 运行时等待单个写IO操作的最大时间,比如写调用请求。
    • 默认值:10秒。

完整示例 #

version: "2.0"

module:
  meta:
    type: remote
  spec:
    functions:
      - function:
        meta:
          kind: http
          type: example/greeter
        spec:
          endpoint: http://<host-name>/statefun
          states:
            - name: seen_count
              expireAfter: 5min
          maxNumBatchRequests: 500
          timeout: 2min

嵌入式模块 #

嵌入式模块与 Apache Flink® 运行时共存,并嵌入其中。

这种模块类型只支持基于 JVM 的语言,并通过实现 StatefulFunctionModule 接口来定义。嵌入模块提供了一个单一的配置方法,有状态的函数根据其函数类型与系统绑定。运行时配置可以通过 globalConfiguration 来实现,它是应用程序 flink-conf.yaml 中前缀 statefun.module.global-config 下的所有配置以及以 --key value 形式传递的任何命令行参数的联合。

package org.apache.flink.statefun.docs;

import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class BasicFunctionModule implements StatefulFunctionModule {

	public void configure(Map<String, String> globalConfiguration, Binder binder) {

		// Declare the user function and bind it to its type
		binder.bindFunctionProvider(FnWithDependency.TYPE, new CustomProvider());

		// Stateful functions that do not require any configuration
		// can declare their provider using java 8 lambda syntax
		binder.bindFunctionProvider(Identifiers.HELLO_TYPE, unused -> new FnHelloWorld());
	}
}

嵌入式模块利用 Java 的服务提供者接口(SPI)进行发现。这意味着每个 JAR 都应该在 META_INF/services 资源目录下包含一个文件 org.apache.flink.statefun.sdk.spi.StatefulFunctionModule,该文件列出了它提供的所有可用模块。

org.apache.flink.statefun.docs.BasicFunctionModule