Wait the light to fall

User Defined Sources and Sinks

焉知非鱼

User Defined Sources and Sinks

用户自定义源和接收器

动态表是 Flink 的表与 SQL API 的核心概念,用于统一处理有界和无界数据。

因为动态表只是一个逻辑概念,Flink 并不拥有数据本身。相反,动态表的内容存储在外部系统(如数据库、键值存储、消息队列)或文件中。

动态源和动态汇可以用来从外部系统读取和写入数据。在文档中,源和汇通常被总结为连接器一词。

Flink 为 Kafka、Hive 和不同的文件系统提供了预定义的连接器。有关内置表源和汇的更多信息,请参阅连接器部分

本页主要介绍如何开发一个自定义的、用户定义的连接器。

注意: Flink 1.11 中引入了新的表源和表汇接口,作为 FLIP-95 的一部分。同时工厂接口也被重新设计。FLIP-95 还没有完全实现。许多能力接口还不支持(例如用于过滤器或分区推倒)。如果有必要,还请看一下旧 table source 和接收器的页面。为了向后兼容,这些接口仍然被支持。

概述 #

在许多情况下,实现者不需要从头开始创建一个新的连接器,而是希望稍微修改现有的连接器或挂入现有的堆栈。在其他情况下,实现者希望创建专门的连接器。

本节将为这两种用例提供帮助。它解释了表连接器的一般架构,从 API 中的纯声明到将在集群上执行的运行时代码。

填充的箭头显示了在翻译过程中,对象如何从一个阶段转换到下一个阶段的其他对象。

img

元数据 #

表 API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句的结果是更新目标目录中的元数据。

对于大多数目录实现来说,外部系统中的物理数据不会因为这样的操作而被修改。特定于连接器的依赖关系还不必存在于 classpath 中。在 WITH 子句中声明的选项既不进行验证,也不进行其他解释。

动态表(通过 DDL 创建或由目录提供)的元数据被表示为 CatalogTable 的实例。表名将在必要时在内部被解析为 CatalogTable。

计划(Planning) #

当涉及到表程序的规划和优化时,需要将 CatalogTable 解析为 DynamicTableSource(用于在 SELECT 查询中读取)和 DynamicTableSink(用于在 INSERT INTO 语句中写入)。

DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供了连接器特有的逻辑,用于将 CatalogTable 的元数据翻译成 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,工厂的目的是验证选项(如示例中的 ‘port’ = ‘5022’),配置编码/解码格式(如果需要),并创建表连接器的参数化实例。

默认情况下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例是通过 Java 的服务提供商接口(SPI)发现的。连接器选项(如本例中的’连接器’=‘自定义’)必须对应一个有效的工厂标识符。

虽然在类的命名中可能并不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被看作是有状态的工厂,最终产生具体的运行时实现来读取/写入实际数据。

规划者使用源和汇实例来执行特定连接器的双向通信,直到找到一个最佳的逻辑计划。根据可选声明的能力接口(如 Supp SupportsProjectionPushDown 或 Supp SupportsOverwrite),规划者可能会对一个实例进行更改,从而对生成的运行时实现进行突变。

运行时 #

逻辑规划完成后,规划师将从表连接器中获取运行时实现。运行时逻辑在 Flink 的核心连接器接口中实现,如 InputFormat 或 SourceFunction。

这些接口被另一层抽象归为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。

例如,OutputFormatProvider(提供 org.apache.flink.api.common.io.OutputFormat) 和 SinkFunctionProvider(提供 org.apache.flink.streaming.api.function.sink.SinkFunction) 都是规划者可以处理的 SinkRuntimeProvider 的具体实例。

扩展点 #

本节解释了用于扩展 Flink 的表连接器的可用接口。

动态表因素 #

动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器。

org.apache.flink.table.fants.DynamicTableSourceFactory 可以实现来构造一个 DynamicTableSource。

org.apache.flink.table.fants.DynamicTableSinkFactory 可以被实现来构造一个 DynamicTableSink。

默认情况下,使用连接器选项的值作为工厂标识符和 Java 的服务提供者接口来发现工厂。

在 JAR 文件中,可以在服务文件中添加对新实现的引用。

META-INF/services/org.apache.flink.table.factory.Factory。

框架将检查单个匹配的工厂,该工厂由工厂标识符和请求的基类(如 DynamicTableSourceFactory)唯一识别。

如果有必要,工厂发现过程可以由目录实现绕过。为此,目录需要在 org.apache.flink.table.catalog.Catalog#getFactory 中返回一个实现请求的基类的实例。

动态 Table Source #

根据定义,动态表可以随时间变化。

当读取一个动态表时,其内容可以被认为是。

  • 一个变化日志(有限的或无限的),所有的变化都会被持续消耗,直到变化日志耗尽。这由 ScanTableSource 接口来表示。
  • 一个持续变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值。这由 LookupTableSource 接口来表示。

一个类可以同时实现这两个接口。规划师根据指定的查询来决定它们的用途。

扫描 Table Source #

ScanTableSource 在运行时扫描来自外部存储系统的所有行,扫描的行不一定只包含插入,也可以包含更新和删除。

扫描的行不一定只包含插入,也可以包含更新和删除。因此,该表源可用于读取(有限或无限)的变更日志。返回的变更日志模式表示计划员在运行时可以预期的变更集。

对于常规的批处理方案,源可以发出只插入行的有界流。

对于常规的流式方案,源可以发出只插入行的无界流。

对于变化数据捕获(CDC)场景,源可以发出有界或无界的流,包含插入、更新和删除行。

Table Source 可以实现更多的能力接口,如 Supp SupportsProjectionPushDown,可能在规划期间突变一个实例。所有的能力都列在 org.apache.flink.table.connector.source.abilities 包和 org.apache.flink.table.connector.source.ScanTableSource 的文档中。

ScanTableSource 的运行时实现必须产生内部数据结构。因此,记录必须以 org.apache.flink.table.data.RowData 的形式发出。框架提供了运行时转换器,这样一个源仍然可以在普通的数据结构上工作,并在最后进行转换。

查询 Table Source #

LookupTableSource 在运行时通过一个或多个键来查找外部存储系统的行。

与 ScanTableSource 相比,LookupTableSource 不需要读取整个表,可以在必要的时候从外部表(可能是不断变化的)中懒惰地获取单个值。

与 ScanTableSource 相比,LookupTableSource 目前只支持发出只插入的变化。

不支持更多的能力。更多信息请参见 org.apache.flink.table.connector.source.LookupTableSource 的文档。

LookupTableSource 的运行时实现是一个 TableFunction 或 AsyncTableFunction。该函数将在运行时调用给定的查找键的值。

动态 Table Sink #

根据定义,动态表可以随时间变化。

在编写动态表时,可以始终将内容视为一个 changelog(有限或无限),对于这个 changelog,所有的变化都会被连续写出来,直到 changelog 用完为止。返回的 changelog 模式表明了 sink 在运行时接受的变化集。

对于常规的批处理方案,sink 可以只接受只插入的行,并写出有界流。

对于常规的流式方案,sink 可以只接受只插入的行,并且可以写出无约束的流。

对于变化数据捕获(CDC)场景,table sink 可以写出有界流或无界流,有插入、更新和删除行。

Table sink 可以实现更多的能力接口,如 SupportsOverwrite,可能在规划期间突变一个实例。所有的能力都列在 org.apache.flink.table.connector.sink.abilities 包和 org.apache.flink.table.connector.sink.DynamicTableSink 的文档中。

DynamicTableSink 的运行时实现必须消耗内部数据结构。因此,记录必须被接受为 org.apache.flink.table.data.RowData。该框架提供了运行时转换器,这样一个 sink 仍然可以在普通的数据结构上工作,并在开始时执行转换。

编码/解码格式 #

一些表连接器接受不同的格式,对键和/或值进行编码和解码。

格式的工作模式类似于 DynamicTableSourceFactory->DynamicTableSource->ScanRuntimeProvider,工厂负责翻译选项,源头负责创建运行时逻辑。

因为格式可能位于不同的模块中,所以使用 Java 的服务提供者接口发现它们,类似于表工厂。为了发现格式工厂,动态表工厂会搜索与工厂标识符和连接器特定基类相对应的工厂。

例如,Kafka 表源需要一个 DeserializationSchema 作为解码格式的运行时接口。因此,Kafka 表源工厂使用 value.format 选项的值来发现一个 DeserializationFormatFactory。

目前支持以下格式工厂。

  • org.apache.flink.table.factories.DeserializationFormatFactory
  • org.apache.flink.table.factories.SerializationFormatFactory

格式工厂将选项翻译成 EncodingFormat 或 DecodingFormat。这些接口是另一种工厂,为给定的数据类型产生专门的格式运行时逻辑。

例如,对于 Kafka table source 工厂,DeserializationFormatFactory 将返回一个 EncodingFormat<DeserializationSchema>,它可以传递到 Kafka 表源中。

全栈示例 #

本节简要介绍了如何实现一个扫描表源,其解码格式支持 changelog 语义。这个例子说明了所有提到的组件如何一起发挥作用。它可以作为一个参考实现。

特别是,它展示了如何

  • 创建解析和验证选项的工厂。
  • 实现表连接器。
  • 实现和发现自定义格式。
  • 并使用提供的实用程序,如数据结构转换器和 FactoryUtil。

Table Source 使用一个简单的单线程 SourceFunction 来打开一个监听传入字节的套接字。原始字节由一个可插拔的格式解码成行。该格式期望以 changelog 标志作为第一列。

我们将使用上面提到的大部分接口来实现下面的 DDL。

CREATE TABLE UserScores (name STRING, score INT)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
);

由于该格式支持 changelog 语义,我们能够在运行时摄取更新,并创建一个能够持续评估变化数据的更新视图。

SELECT name, SUM(score) FROM UserScores GROUP BY name;

使用以下命令在终端中摄取数据。

> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18

工厂 #

本节说明了如何将来自目录的元数据翻译成具体的连接器实例。

这两个工厂都被添加到 META-INF/services 目录中。

SocketDynamicTableFactory

SocketDynamicTableFactory 将目录表翻译成表源。由于表源需要解码格式,为了方便,我们使用提供的 FactoryUtil 发现格式。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

public class SocketDynamicTableFactory implements DynamicTableSourceFactory {

  // define all options statically
  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
    .stringType()
    .noDefaultValue();

  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
    .intType()
    .noDefaultValue();

  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // corresponds to '\n'

  @Override
  public String factoryIdentifier() {
    return "socket"; // used for matching to `connector = '...'`
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // use pre-defined option for format
    return options;
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    return options;
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // either implement your custom validation logic here ...
    // or use the provided helper utility
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // discover a suitable decoding format
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
      DeserializationFormatFactory.class,
      FactoryUtil.FORMAT);

    // validate all options
    helper.validate();

    // get the validated options
    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // derive the produced data type (excluding computed columns) from the catalog table
    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

    // create and return dynamic table source
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
}

ChangelogCsvFormatFactory

ChangelogCsvFormatFactory 将特定格式的选项翻译成一种格式。SocketDynamicTableFactory 中的 FactoryUtil 负责相应地调整选项键,并处理像 changelog-csv.column-delimiter 那样的前缀。

因为这个工厂实现了 DeserializationFormatFactory,所以它也可以用于其他支持反序列化格式的连接器,比如 Kafka 连接器。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {

  // define all options statically
  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
    .stringType()
    .defaultValue("|");

  @Override
  public String factoryIdentifier() {
    return "changelog-csv";
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(COLUMN_DELIMITER);
    return options;
  }

  @Override
  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
      DynamicTableFactory.Context context,
      ReadableConfig formatOptions) {
    // either implement your custom validation logic here ...
    // or use the provided helper method
    FactoryUtil.validateFactoryOptions(this, formatOptions);

    // get the validated options
    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);

    // create and return the format
    return new ChangelogCsvFormat(columnDelimiter);
  }
}

Table Source 和解码格式 #

本节说明了如何从规划层的实例转化为运到集群的运行时实例。

SocketDynamicTableSource

在规划过程中会用到 SocketDynamicTableSource。在我们的例子中,我们没有实现任何可用的能力接口。因此,主要的逻辑可以在 getScanRuntimeProvider(...) 中找到,我们在其中实例化了所需的 SourceFunction 和其运行时的 DeserializationSchema。这两个实例都被参数化为返回内部数据结构(即 RowData)。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class SocketDynamicTableSource implements ScanTableSource {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
  private final DataType producedDataType;

  public SocketDynamicTableSource(
      String hostname,
      int port,
      byte byteDelimiter,
      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
      DataType producedDataType) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.decodingFormat = decodingFormat;
    this.producedDataType = producedDataType;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // in our example the format decides about the changelog mode
    // but it could also be the source itself
    return decodingFormat.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

    // create runtime classes that are shipped to the cluster

    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
      runtimeProviderContext,
      producedDataType);

    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
      hostname,
      port,
      byteDelimiter,
      deserializer);

    return SourceFunctionProvider.of(sourceFunction, false);
  }

  @Override
  public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }

  @Override
  public String asSummaryString() {
    return "Socket Table Source";
  }
}

ChangelogCsvFormat

ChangelogCsvFormat 是一种解码格式,在运行时使用 DeserializationSchema。它支持发出 INSERT 和 DELETE 更改。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {

  private final String columnDelimiter;

  public ChangelogCsvFormat(String columnDelimiter) {
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  @SuppressWarnings("unchecked")
  public DeserializationSchema<RowData> createRuntimeDecoder(
      DynamicTableSource.Context context,
      DataType producedDataType) {
    // create type information for the DeserializationSchema
    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
      producedDataType);

    // most of the code in DeserializationSchema will not work on internal data structures
    // create a converter for conversion at the end
    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);

    // use logical types during runtime for parsing
    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();

    // create runtime class
    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // define that this format can produce INSERT and DELETE rows
    return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.DELETE)
      .build();
  }
}

运行时 #

为了完整起见,本节说明了 SourceFunction 和 DeserializationSchema 的运行时逻辑。

ChangelogCsvDeserializer

ChangelogCsvDeserializer 包含了一个简单的解析逻辑,用于将字节转换为带有行种类的整数行和字符串。最后的转换步骤将这些转换为内部数据结构。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {

  private final List<LogicalType> parsingTypes;
  private final DataStructureConverter converter;
  private final TypeInformation<RowData> producedTypeInfo;
  private final String columnDelimiter;

  public ChangelogCsvDeserializer(
      List<LogicalType> parsingTypes,
      DataStructureConverter converter,
      TypeInformation<RowData> producedTypeInfo,
      String columnDelimiter) {
    this.parsingTypes = parsingTypes;
    this.converter = converter;
    this.producedTypeInfo = producedTypeInfo;
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    // return the type information required by Flink's core interfaces
    return producedTypeInfo;
  }

  @Override
  public void open(InitializationContext context) {
    // converters must be open
    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
  }

  @Override
  public RowData deserialize(byte[] message) {
    // parse the columns including a changelog flag
    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
    final RowKind kind = RowKind.valueOf(columns[0]);
    final Row row = new Row(kind, parsingTypes.size());
    for (int i = 0; i < parsingTypes.size(); i++) {
      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
    }
    // convert to internal data structure
    return (RowData) converter.toInternal(row);
  }

  private static Object parse(LogicalTypeRoot root, String value) {
    switch (root) {
      case INTEGER:
        return Integer.parseInt(value);
      case VARCHAR:
        return value;
      default:
        throw new IllegalArgumentException();
    }
  }

  @Override
  public boolean isEndOfStream(RowData nextElement) {
    return false;
  }
}

SocketSourceFunction

SocketSourceFunction 打开一个套接字并消耗字节。它通过给定的字节定界符(默认为 \n)分割记录,并将解码委托给一个可插拔的 DeserializationSchema。源函数只能以 1 的并行度工作。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DeserializationSchema<RowData> deserializer;

  private volatile boolean isRunning = true;
  private Socket currentSocket;

  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.deserializer = deserializer;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    return deserializer.getProducedType();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup());
  }

  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // open and consume from socket
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            // buffer until delimiter
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode and emit record
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); // print and continue
      }
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {
    isRunning = false;
    try {
      currentSocket.close();
    } catch (Throwable t) {
      // ignore
    }
  }
}

原文链接: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html