Wait the light to fall

Koalas 和 Apache Spark 之间的互操作性

焉知非鱼

How PySpark users effectively work with Koalas

Koalas 是一个开源项目,它为 pandas 提供了一个 drop-in 的替代品,可以高效地扩展到数百个工人节点,用于日常的数据科学和机器学习。自去年首次推出以来,经过一年多的开发Koalas 1.0 已经发布

pandas 是数据科学家中常用的 Python 包,但它并不能扩展到大数据。当他们的数据变得庞大时,他们必须从一开始就选择和学习另一个系统,如 Apache Spark,以采用和转换他们现有的工作负载。 Koalas 通过提供 pandas 等效的 API 来填补这个空白,这些 API 可以在 Apache Spark 上工作。其中很多在之前的博文中已经介绍过,其中还包括使用 Koalas 时的最佳实践。

Koalas 不仅对 pandas 用户有用,对 PySpark 用户也很有用,因为 Koalas 支持很多 PySpark 难以实现的功能。例如,Spark 用户可以通过 Koalas 绘图 API 直接从 PySpark DataFrame 中绘制数据,类似于 pandas。PySpark DataFrame 更符合 SQL 标准,而 Koalas DataFrame 更接近 Python 本身,这为在某些情况下使用 Python 提供了更直观的工作方式。在 Koalas 文档中,有各种 pandas 对应的 API 实现。

在这篇博文中,我们重点介绍 PySpark 用户如何利用自己的知识和 PySpark 与 Koalas 之间的原生交互,更快地编写代码。我们包含了许多自带的例子,如果你安装了带 Koalas 的 Spark,或者你正在使用 Databricks Runtime,你可以运行这些例子。从 Databricks Runtime 7.1 开始,Koalas 就被打包在一起,所以您无需手动安装就可以运行。

Koalas 和 PySpark DataFrames #

在深究之前,我们先来看看 Koalas 和 PySpark DataFrames 的一般区别。

从外观上看,它们是不同的。Koalas DataFrames 无缝地沿用了 pandas DataFrames 的结构,并在底层下实现了一个索引/标识符。而 PySpark DataFrame 则更趋向于符合关系型数据库中的关系/表,并且没有唯一的行标识符。

在内部,Koalas DataFrames 是建立在 PySpark DataFrames 上的。Koalas 将 pandas APIs 翻译成 Spark SQL 的逻辑计划。该计划由复杂而强大的 Spark SQL 引擎优化和执行,Spark 社区不断对其进行改进。Koalas 还沿用 Spark 的懒惰评估语义,以实现性能的最大化。为了实现 pandas DataFrame 结构和 pandas 丰富的 API,需要隐式排序,Koalas DataFrames 的内部元数据表示 pandas 等价的索引和列标签映射到 PySpark DataFrame 中的列。

即使 Koalas 利用 PySpark 作为执行引擎,但与 PySpark 相比,你仍然可能面临轻微的性能下降。正如在 Virgin Hyperloop One 的迁移经验中所讨论的,主要原因通常是:

  • 使用了默认索引。构建默认索引的开销取决于数据大小、集群组成等。因此,总是希望避免使用默认索引。关于这一点将在下面的其他章节中详细讨论。
  • PySpark 和 pandas 中的一些 API 名称相同,但语义不同。例如,Koalas DataFrame 和 PySpark DataFrame 都有 count API。前者统计每列/行的非 NA/null 条目数,后者统计检索到的行数,包括包含 null 的行。
>>> ks.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}).count()
a    3
b    3

>>> spark.createDataFrame(
...     [[1, 4], [2, 5], [3, 6]], schema=["a", "b"]).count()
3

从 PySpark DataFrames 转换到 PySpark DataFrames #

对于一个 PySpark 用户来说,很高兴知道你可以很容易地在 Koalas DataFrame 和 PySpark DataFrame 之间来回切换,以及在底层发生了什么,这样你就不需要害怕进入 Koalas 世界,在 Spark 上应用高扩展性的 pandas API。

  • to_koalas()

当导入 Koalas 包时,它会自动将 to_koalas()方法附加到 PySpark DataFrames 中。你可以简单地使用这个方法将 PySpark DataFrames 转换为 Koalas DataFrames。

假设你有一个 PySpark DataFrame。

>>> sdf = spark.createDataFrame([(1, 10.0, 'a'), (2, 20.0, 'b'), (3, 30.0, 'c')],schema=['x', 'y', 'z'])
>>> sdf.show()
+---+----+---+
|  x|   y|  z|
+---+----+---+
|  1|10.0|  a|
|  2|20.0|  b|
|  3|30.0|  c|
+---+----+---+ 

首先,导入 Koalas 包。传统上使用 ks 作为包的别名。

>>> import databricks.koalas as ks

如上所述,用 to_koalas()方法将 Spark DataFrame 转换为 Koalas DataFrame。

>>> kdf = sdf.to_koalas()
>>> kdf
    x     y  z
0  1  10.0  a
1  2  20.0  b
2  3  30.0  c   

kdf 是一个由 PySpark DataFrame 创建的 Koalas DataFrame。当真正需要数据时,计算会被懒惰地执行,例如显示或存储计算的数据,与 PySpark 相同。

  • to_spark()

接下来,你还应该知道如何从 Koalas 回到 PySpark DataFrame。你可以在 Koalas DataFrame 上使用 to_spark()方法。

>>> sdf_from_kdf = kdf.to_spark()
>>> sdf_from_kdf.show()
+---+----+---+
|  x|   y|  z|
+---+----+---+
|  1|10.0|  a|
|  2|20.0|  b|
|  3|30.0|  c|
+---+----+---+   

现在你又有了一个 PySpark DataFrame。请注意,现在已经没有 Koalas DataFrame 中包含的索引列了。下面将讨论处理索引的最佳实践。

索引和 index_col #

如上图所示,Koalas 内部管理了几列作为 “索引 “列,以表示 pandas 的索引。这些 “索引 “列用于通过 loc/iloc 索引器访问行,或者用于 sort_index()方法中,而不指定排序键列,甚至用于结合两个以上 DataFrame 或 Series 的操作时匹配相应的行,例如 df1+df2,等等。

如果 PySpark DataFrame 中已经有这样的列,可以使用 index_col 参数来指定索引列。

>>> kdf_with_index_col = sdf.to_koalas(index_col='x')  # or index_col=['x']
>>> kdf_with_index_col
        y  z
x
1  10.0  a
2  20.0  b
3  30.0  c    

这时,列 x 不被视为常规列之一,而是索引。

如果你有多个列作为索引,你可以传递列名列表。

>>> sdf.to_koalas(index_col=['x', 'y'])
    z
x y
1 10.0  a
2 20.0  b
3 30.0  c

当回到 PySpark DataFrame 时,你还可以使用 index_col 参数来保存索引列。

>>> kdf_with_index_col.to_spark(index_col='index').show()  # or index_col=['index']
+-----+----+---+
|index|   y|  z|
+-----+----+---+
|    1|10.0|  a|
|    2|20.0|  b|
|    3|30.0|  c|
+-----+----+---+

否则,就会失去指数,如下图。

>>> kdf_with_index_col.to_spark().show()
+----+---+
|   y|  z|
+----+---+
|10.0|  a|
|20.0|  b|
|30.0|  c|
+----+---+

列名的数量应与索引列的数量一致。

>>> kdf.to_spark(index_col=['index1', 'index2']).show()
Traceback (most recent call last):
...
ValueError: length of index columns is 1; however, the length of the given 'index_col' is 2.  

默认索引 #

正如你所看到的,如果你不指定 index_col 参数,就会创建一个新的列作为索引。

>>> sdf.to_koalas()
    x     y  z
 0  1  10.0  a
 1  2  20.0  b
 2  3  30.0  c 

列从哪里来?

答案是 “默认索引”。如果没有指定 index_col 参数,Koalas 会自动将一列作为索引附加到 DataFrame 中。有三种类型的默认索引。“sequence”、“distributed-sequence “和 “distributed”。每种类型都有其独特的特点和局限性,比如性能惩罚。为了减少性能开销,强烈建议在从 PySpark DataFrame 转换时通过 index_col 指定索引列。

当 Koalas 不知道哪一列是用来做索引时,也会使用默认索引。例如,reset_index()没有任何参数,它试图将所有的索引数据转换为常规列,并重新创建一个索引。

>>> kdf_with_index_col.reset_index()
    x     y  z
 0  1  10.0  a
 1  2  20.0  b
 2  3  30.0  c

你可以通过设置 Koalas 选项 “compute.default_index_type” 来改变默认的索引类型。

ks.set_option('compute.default_index_type', 'sequence')

ks.options.compute.default_index_type = 'sequence'

顺序型 目前 Koalas 中默认使用 “序列 “类型,因为它像 pandas 一样保证了索引的连续递增。但是,它内部使用了一个非分区窗口函数,这意味着所有的数据都需要收集到一个节点中。如果节点的内存不足,性能会明显下降,或者出现 OutOfMemoryError。

>>> ks.set_option('compute.default_index_type', 'sequence')
>>> spark.range(5).to_koalas()
    id
0   0
1   1
2   2
3   3
4   4

分散式 当使用 “分布式-序列 “索引时,性能惩罚没有 “序列 “类型那么显著。它以分布式的方式计算和生成索引,但它需要另一个额外的 Spark Job 来内部生成全局序列。它也不能保证结果的自然顺序。一般来说,它会变成一个不断增加的数字。

>>> ks.set_option('compute.default_index_type', 'distributed-sequence')
>>> spark.range(5).to_koalas()
    id
3   3
1   1
2   2
4   4
0   0

分散型 “分布式 “索引几乎没有性能上的惩罚,而且总是创建单调增加的数字。如果索引只是需要作为每行的唯一数字,或行的顺序,这种索引类型将是最佳选择。但是,这些数字有一个不确定的间隙。这意味着这种索引类型不太可能被用作结合两个以上 DataFrames 或 Series 的操作的索引。

>>> ks.set_option('compute.default_index_type', 'distributed')
>>> spark.range(5).to_koalas()
                id
17179869184   0
34359738368   1
60129542144   2
77309411328   3
94489280512   4

比较 正如你所看到的,每种索引类型都有其独特的特征,如下表所示。考虑到你的工作负载,应该谨慎选择默认的索引类型。

分布式计算 Map 端操作 连续递增 性能
sequence No, 在单个 worker 节点中 No, 需要 shuffle Yes
distributed-sequence Yes Yes, 但需要另一个 Spark job Yes, 在大多数情况下
distributed Yes Yes No

参见 Koalas 文档中的默认索引类型

使用 Spark I/O #

在 pandas 中,有很多函数可以读写数据,在 Koalas 中也是如此。

下面是 pandas 中的函数列表,Koalas 在下面使用了 Spark I/O。

API 和它们的参数沿用了 pandas 对应的 API。不过,目前在行为上有细微的差别。例如,pandas 的 read_csv 可以通过 http 协议读取文件,但 Koalas 仍然不支持,因为底层的 Spark 引擎本身并不支持。

这些 Koalas 函数还有 index_col 参数,用来指定哪些列应该被用作索引,或者索引列名应该是什么,类似于上面介绍的 to_koalas()或 to_spark()函数。如果你不指定,就会附加默认的索引,或者索引列丢失。

例如,如果你不指定 index_col 参数,默认索引就会被附加,如下图所示–为了简单起见,使用了分布式默认索引。

>>> kdf.to_csv('/path/to/test.csv')
>>> kdf_read_csv = ks.read_csv('/path/to/test.csv')
>>> kdf_read_csv
                x     y  z
0            2  20.0  b
8589934592   3  30.0  c
17179869184  1  10.0  a

而如果指定 index_col 参数,指定的列就会变成一个索引。

>>> kdf.to_csv('/path/to/test.csv', index_col='index')
>>> kdf_read_csv_with_index_col = ks.read_csv("/path/to/test.csv", index_col='index')
>>> kdf_read_csv_with_index_col
        x     y  z
index
2      3  30.0  c
1      2  20.0  b
0      1  10.0  a

此外,每个函数都需要关键字参数来设置 Spark 中 DataFrameWriter 和 DataFrameReader 的选项。给定的键直接传递给它们的选项并配置行为。当 pandas-origin 参数不足以操作你的数据,但 PySpark 支持缺失的功能时,这很有用。

>>> # nullValue is the option specific to Spark’s CSV I/O.
>>> ks.read_csv('/path/to/test.csv', index_col='index', nullValue='b')
        x     y     z
index
2      3  30.0     c
1      2  20.0  None
0      1  10.0     a

Koalas 特定的 I/O 功能 #

除了以上来自 pandas 的功能外,Koalas 还有自己的功能。

首先,DataFrame.to_table 和 ks.read_table 是只需指定表名就可以写入和读取 Spark 表。这分别类似于 Spark 中的 DataFrameWriter.saveAsTable 和 DataFrameReader.table。

其次,DataFrame.to_spark_io 和 ks.read_spark_io 是用于一般的 Spark I/O。为了方便使用,有几个可选的参数,其他都是关键字参数。你可以自由设置 Spark 中 DataFrameWriter.save 和 DataFrameReader.load 使用的选项。

>>> # 'compression' is a Spark specific option.
>>> kdf.to_spark_io('/path/to/test.orc', format='orc', index_col='index', compression="snappy")
>>> kdf_read_spark_io = ks.read_spark_io('/path/to/test.orc', format='orc', index_col='index')
>>> kdf_read_spark_io
        x     y  z
index
1      2  20.0  b
0      1  10.0  a
2      3  30.0  c

上例中的 ORC 格式在 pandas 中是不支持的,但 Koalas 可以写和读,因为底层的 Spark I/O 支持它。

最后,如果你安装了 Delta Lake,Koalas 也可以写和读 Delta 表。

Delta Lake 是一个开源的存储层,为数据湖带来了可靠性。Delta Lake 提供了 ACID 事务、可扩展的元数据处理,并统一了流式和批处理数据。

与其他文件源不同的是,read_delta 函数可以让用户指定表的版本来进行时间旅行。

>>> kdf.to_delta('/path/to/test.delta', index_col='index')
>>> kdf_read_delta = ks.read_delta('/path/to/test.delta', index_col='index')
>>> kdf_read_delta
        x     y  z
index
0      1  10.0  a
1      2  20.0  b
2      3  30.0  c

>>> # Update the data and overwrite the Delta table
>>> kdf['x'] = kdf['x'] + 10
>>> kdf['y'] = kdf['y'] * 10
>>> kdf['x'] = kdf['x'] * 2
>>> kdf.to_delta('/path/to/test.delta', index_col='index')

>>> # Read the latest data
>>> ks.read_delta('/path/to/test.delta', index_col='index')
        x      y  z
index
0      22  100.0  a
1      24  200.0  b
2      26  300.0  c

>>> # Read the data of version 0
>>> ks.read_delta('/path/to/test.delta', version=0, index_col='index')
        x     y  z
index
0      1  10.0  a
1      2  20.0  b
2      3  30.0  c

详情请看 Delta Lake

Spark accessor #

Koalas 为用户提供了 spark 接入器,让用户更容易地利用现有的 PySpark API。

Series.spark.transform 和 Series.spark.apply #

Series.spark accessor 有变换和应用函数来处理底层的 Spark Column 对象。

例如,假设你有以下 Koalas DataFrame。

>>> kdf = ks.DataFrame({'a': [1, 2, 3, 4]]})
>>> kdf
    a
0  1
1  2
2  3
3  4

你可以使用 astype 函数来铸造类型,但如果你还不习惯,你可以使用 Series.spark.transform 函数来铸造 Spark 列。

>>> import numpy as np
>>> from pyspark.sql.types import DoubleType
>>> 
>>> kdf['a_astype_double'] = kdf.a.astype(np.float64)
>>> kdf['a_cast_double'] = kdf.a.spark.transform(lambda scol: scol.cast(DoubleType()))
>>> kdf[['a', 'a_astype_double', 'a_cast_double']]
    a  a_astype_double  a_cast_double
0  1              1.0            1.0
1  2              2.0            2.0
2  3              3.0            3.0
3  4              4.0            4.0

传递给 Series.spark.transform 函数的用户函数取用 Spark 的 Column 对象,可以使用 PySpark 函数对其进行操作。

也可以在 transform/apply 函数中使用 pyspark.sql.function 的函数。

>>> from pyspark.sql import functions as F
>>> 
>>> kdf['a_sqrt'] = kdf.a.spark.transform(lambda scol: F.sqrt(scol))
>>> kdf['a_log'] = kdf.a.spark.transform(lambda scol: F.log(scol))
>>> kdf[['a', 'a_sqrt', 'a_log']]
    a    a_sqrt     a_log
0  1  1.000000  0.000000
1  2  1.414214  0.693147
2  3  1.732051  1.098612
3  4  2.000000  1.386294

Series.spark.transform 的用户函数应该返回与其输入相同长度的 Spark 列,而 Series.spark.apply 的用户函数可以返回不同长度的 Spark 列,比如调用聚合函数。

>>> kdf.a.spark.apply(lambda scol: F.collect_list(scol))
0    [1, 2, 3, 4]
Name: a, dtype: object

DataFrame.spark.apply #

同样,DataFrame.spark accessor 也有一个 apply 函数。用户函数接受并返回一个 Spark DataFrame,并可以应用任何转换。如果你想在 Spark DataFrame 中保留索引列,你可以设置 index_col 参数。在这种情况下,用户函数必须在返回的 Spark DataFrame 中包含一个同名的列。

>>> kdf.spark.apply(lambda sdf: sdf.selectExpr("index * 10 as index", "a + 1 as a"), index_col="index")
    a
index
0      2
10     3
20     4
30     5

如果你省略 index_col,它将使用默认的索引。

>>> kdf.spark.apply(lambda sdf: sdf.selectExpr("a + 1 as a"))
    a
17179869184  2
42949672960  3
68719476736  4
94489280512  5

Spark schema #

你可以通过 DataFrame.spark.schema 和 DataFrame.spark.print_schema 查看当前的底层 Spark 模式。如果你想知道包括索引列在内的模式,它们都需要 index_col 参数。

>>> import numpy as np
>>> import pandas as pd
>>> 
>>> kdf = ks.DataFrame({'a': list('abc'),
...                     'b': list(range(1, 4)),
...                     'c': np.arange(3, 6).astype('i1'),
...                     'd': np.arange(4.0, 7.0, dtype='float64'),
...                     'e': [True, False, True],
...                     'f': pd.date_range('20130101', periods=3)},
...                    columns=['a', 'b', 'c', 'd', 'e', 'f'])

>>> # Print the schema out in Spark’s DDL formatted string
>>> kdf.spark.schema().simpleString()
'struct<a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'
>>> kdf.spark.schema(index_col='index').simpleString()
'struct<index:bigint,a:string,b:bigint,c:tinyint,d:double,e:boolean,f:timestamp>'

>>> # Print out the schema as same as Spark’s DataFrame.printSchema()
>>> kdf.spark.print_schema()
root
    |-- a: string (nullable = false)
    |-- b: long (nullable = false)
    |-- c: byte (nullable = false)
    |-- d: double (nullable = false)
    |-- e: boolean (nullable = false)
    |-- f: timestamp (nullable = false)

>>> kdf.spark.print_schema(index_col='index')
root
    |-- index: long (nullable = false)
    |-- a: string (nullable = false)
    |-- b: long (nullable = false)
    |-- c: byte (nullable = false)
    |-- d: double (nullable = false)
    |-- e: boolean (nullable = false)
    |-- f: timestamp (nullable = false)

解释 Spark 计划 #

如果你想知道当前的 Spark 计划,你可以使用 DataFrame.spark.explain()。

>>> # Same as Spark’s DataFrame.explain()
>>> kdf.spark.explain()
== Physical Plan ==
Scan ExistingRDD[...]

>>> kdf.spark.explain(True)
== Parsed Logical Plan ==
...

== Analyzed Logical Plan ==
...

== Optimized Logical Plan ==
...

== Physical Plan ==
Scan ExistingRDD[...]

>>> # New style of mode introduced from Spark 3.0.
>>> kdf.spark.explain(mode="extended")
== Parsed Logical Plan ==
...

== Analyzed Logical Plan ==
...

== Optimized Logical Plan ==
...

== Physical Plan ==
Scan ExistingRDD[...]

缓存 #

spark 访问器还提供了缓存相关的函数,cache、persist、unpersist 和 store_level 属性。你可以使用 cache 函数作为上下文管理器来解除缓存的 persist。让我们看一个例子。

>>> from pyspark import StorageLevel
>>> 
>>> with kdf.spark.cache() as cached:
...   print(cached.spark.storage_level)
...
Disk Memory Deserialized 1x Replicated

>>> with kdf.spark.persist(StorageLevel.MEMORY_ONLY) as cached:
...   print(cached.spark.storage_level)
...
Memory Serialized 1x Replicated

当上下文完成后,缓存会自动清除。如果你想保留它的缓存,你可以按照下面的方法来做。

>>> cached = kdf.spark.cache()
>>> print(cached.spark.storage_level)
Disk Memory Deserialized 1x Replicated

当不再需要它时,你必须显式调用 DataFrame.spark.unpersist()来从缓存中删除它。

>>> cached.spark.unpersist()

提示 #

在 Koalas 中,有一些类似于 join 的操作,比如合并、加入和更新。虽然实际的 join 方法取决于底层的 Spark 计划器,但你仍然可以用 ks.broadcast()函数或 DataFrame.spark.hint()方法指定一个提示。

>>> kdf1 = ks.DataFrame({'key': ['foo', 'bar', 'baz', 'foo'],
...                      'value': [1, 2, 3, 5]},
...                     columns=['key', 'value'])
>>> kdf2 = ks.DataFrame({'key': ['foo', 'bar', 'baz', 'foo'],
...                      'value': [5, 6, 7, 8]},
...                     columns=['key', 'value'])
>>> kdf1.merge(kdf2, on='key').explain()
== Physical Plan ==
...
... SortMergeJoin ...
...

>>> kdf1.merge(ks.broadcast(kdf2), on='key').explain()
== Physical Plan ==
...
... BroadcastHashJoin ...
...

>>> kdf1.merge(kdf2.spark.hint('broadcast'), on='key').explain()
== Physical Plan ==
...
... BroadcastHashJoin ...
...

特别是,如果底层 Spark 是 3.0 或以上版本,DataFrame.spark.hint()更有用,因为 Spark 3.0 中提供了更多的提示。

结束语 #

Koalas DataFrame 与 PySpark DataFrame 相似,因为 Koalas 内部使用 PySpark DataFrame。在外部,Koalas DataFrame 的工作方式就像 pandas DataFrame 一样。

为了填补这个空白,Koalas 有许多功能,对于熟悉 PySpark 的用户来说,可以轻松地使用 Koalas 和 PySpark DataFrame。虽然在转换过程中需要额外的注意处理索引,但 Koalas 为 PySpark 用户提供了两种 DataFrame 之间的简单转换,为 PySpark 提供了读/写的输入/输出 API,并提供了 spark 访问器以暴露 PySpark 友好的功能,如缓存和内部探索 DataFrame。此外,spark 访问器还提供了一种自然的方式来玩弄 Koalas 系列和 PySpark 列。

PySpark 用户可以从 Koalas 中获益,如上图所示。请在 Databricks Runtime 中试用这些示例并了解更多信息。

阅读更多 #

要了解更多关于 Koalas 的信息,请看以下资源。

原文链接: https://databricks.com/blog/2020/08/11/interoperability-between-koalas-and-apache-spark.html