Wait the light to fall

DataStream API 介绍

焉知非鱼

Intro to the DataStream API

本次培训的重点是广泛地介绍 DataStream API,使你能够开始编写流式应用程序。

什么可以被流式化? #

Flink 的 DataStream API(Java 和 Scala)可以让你流化任何可以序列化的东西。Flink 自己的序列化器用于:

  • 基本类型,即 String, Long, Integer, Boolean, Array
  • 复合类型。Tuples, POJOs 和 Scala case classes

而 Flink 又回到了 Kryo 的其他类型。也可以在 Flink 中使用其他序列化器。特别是 Avro,得到了很好的支持。

Java 元组 和 POJO #

Flink 的本地序列化器可以有效地操作元组和 POJO。

元组

对于 Java,Flink 定义了自己的 Tuple0 到 Tuple25 类型。

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;

POJO

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许"按名称"字段引用)。

  • 类是公共的和独立的(没有非静态的内部类)。
  • 该类有一个公共的无参数构造函数。
  • 类(以及所有超级类)中的所有非静态、非瞬态字段要么是公共的(而且是非最终的),要么有公共的 getter- 和 setter- 方法,这些方法遵循 Java beans 中 getter 和 setter 的命名约定。

例如:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {};  
    public Person(String name, Integer age) {  
        . . .
    };  
}  

Person person = new Person("Fred Flintstone", 35);

Flink 的序列化器支持 POJO 类型的模式进化

Scala 元组和 case class #

这些工作就像你期望的那样。

一个完整的例子 #

这个例子将一个关于人的记录流作为输入,并将其过滤为只包括成年人。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {};

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        };

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        };
    }
}

流执行环境 #

每个 Flink 应用都需要一个执行环境,本例中的 env。流式应用需要使用一个 StreamExecutionEnvironment。

在你的应用程序中进行的 DataStream API 调用建立了一个作业图(job graph),这个作业图被附加到 StreamExecutionEnvironment 上。当调用 env.execute() 时,这个图会被打包并发送给 JobManager,JobManager 将作业并行化,并将它的片断分配给 Task Manager 执行。你的作业的每个并行片断将在一个任务槽(task slot)中执行。

注意,如果你不调用 execute(),你的应用程序将不会被运行。

img

这种分布式运行时取决于你的应用程序是可序列化的。它还要求所有的依赖关系都能在集群中的每个节点上使用。

基本的流源 #

上面的例子使用 env.fromElements(...) 构造了一个 DataStream[Person]。这是一种方便的方法,可以将一个简单的流组合起来,用于原型或测试。StreamExecutionEnvironment 上还有一个 fromCollection(Collection) 方法。所以,你可以用这个方法来代替。

val people: List[Person] = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

val flintstones: DataStream[Person] = env.fromCollection(people);

另一种方便的方法是在原型开发时将一些数据导入流中,使用 socket:

val lines: DataStream[String] = env.socketTextStream("localhost", 9999)

或从文件中读取:

val lines: DataStream[String] = env.readTextFile("file:///path");

在实际应用中,最常用的数据源是那些支持低延迟、高吞吐量并行读取并结合倒带和重放的数据源–这是高性能和容错的先决条件–如 Apache Kafka、Kinesis 和各种文件系统。REST API 和数据库也经常被用于流的丰富。

基本的流式接收器 #

上面的例子使用 adults.print() 将其结果打印到 task manager 的日志中(当在 IDE 中运行时,它将出现在你的 IDE 的控制台中)。这将在流的每个元素上调用 toString()

输出结果看起来像这样:

1> Fred: age 35
2> Wilma: age 35

其中 1>2> 表示哪个子任务(即线程)产生的输出。

在生产中,常用的接收器括 StreamingFileSink、各种数据库和一些 pub-sub 系统。

调试 #

在生产中,你的应用程序将在远程集群或一组容器中运行。而如果它失败了,它将会远程失败。JobManager 和 TaskManager 日志对调试此类故障非常有帮助,但在 IDE 内部进行本地调试要容易得多,Flink 支持这一点。你可以设置断点,检查本地变量,并逐步检查你的代码。你也可以步入 Flink 的代码,如果你好奇 Flink 是如何工作的,这可以是一个很好的方式来了解它的内部结构。

实践 #

在这一点上,你知道了足够的知识,可以开始编码和运行一个简单的 DataStream 应用程序。克隆 flink-training repo,按照 README 中的说明操作后,进行第一个练习。过滤一个流(Ride Cleansing)

进一步阅读 #