Introducing Pandas UDF for PySpark

Introducing Pandas UDF for PySpark

更新:此博客于 2018 年 2 月 22 日更新,以包含一些更改。

这篇博文在即将发布的 Apache Spark 2.3 版本中引入了 Pandas UDFs(即 Vectorized UDFs) 特性,这大大提高了 Python 中用户定义函数(UDF)的性能和可用性。

在过去的几年中,Python 已经成为数据科学家的默认语言。像 pandasnumpystatsmodelscikit-learn 这样的软件包已经获得了广泛的采用并成为主流工具包。同时,Apache Spark 已成为处理大数据的事实标准。为了使数据科学家能够利用大数据的价值,Spark 在 0.7 版中添加了 Python API,并支持user-defined functions。这些用户定义的函数一次只能操作一行,因此会遭遇高序列化和调用开销。因此,许多数据管道在 Java 和 Scala 中定义 UDF,然后从 Python 中调用它们。

基于 Apache Arrow 构建的 Pandas UDF 为您提供了两全其美的功能 - 完全用 Python 定义低开销,高性能 UDF的能力。

在 Spark 2.3 中,将会有两种类型的 Pandas UDF: 标量(scalar)和分组映射(grouped map)。接下来,我们使用四个示例程序来说明它们的用法:Plus One,累积概率,减去平均值,普通最小二乘线性回归。

Scalar Pandas UDFs

Scalar Pandas UDFs 用于向量化标量运算。要定义一个标量 Pandas UDF,只需使用 @pandas_udf 来注释一个 Python 函数,该函数接受 pandas.Series 作为参数并返回另一个相同大小的 pandas.Series。下面我们用两个例子来说明:Plus One 和 Cumulative Probability。

Plus One

计算 v + 1 是演示 row-at-a-time UDFs 和 scalar Pandas UDFs 之间差异的简单示例。请注意,在这种情况下内置的列运算符可能执行得更快。

使用一次一行的 UDF:

1
2
3
4
5
6
7
8
9
from pyspark.sql.functions import udf

# 使用 udf 定义一个 row-at-a-time 的 udf
@udf('double')
# 输入/输出都是单个 double 类型的值
def plus_one(v):
return v + 1

df.withColumn('v2', plus_one(df.v))

使用 Pandas UDFs:

1
2
3
4
5
6
7
8
9
10
from pyspark.sql.functions import pandas_udf, PandasUDFType

# 使用 pandas_udf 定义一个 Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# 输入/输出都是 double 类型的 pandas.Series

def pandas_plus_one(v):
return v + 1

df.withColumn('v2', pandas_plus_one(df.v))

上面的例子定义了一次一行的 UDF “plus_one” 和一个执行相同的“加一”计算的 scala Pandas UDF “pandas_plus_one”。除了函数装饰器之外,UDF 的定义是相同的:“udf” vs “pandas_udf”。

在一次一行的版本中,用户定义的函数接收一个 double 类型的参数 “v” 并将 “v + 1” 的结果作为 double 来返回。在 Pandas 版本中,用户定义函数接收 pandas.Series 类型的参数 “v”,并将 “v + 1” 的结果作为pandas.Series 返回。因为 “v + 1” 是在 pandas.Series 上进行矢量化的,所以 Pandas 版本比 row-at-a-time 的版本快得多。

请注意,使用 scala pandas UDF 时有两个重要要求:

  • 输入和输出序列必须具有相同的大小。
  • 如何将一列分割为多个 pandas.Series 是Spark的内部的事,因此用户定义函数的结果必须独立于分割。

累积概率

这个例子展示了 scalar Pandas UDF 更实际的用法:使用 scipy 包计算正态分布 N(0,1) 中值的累积概率

1
2
3
4
5
6
7
8
9
import pandas as pd
from scipy import stats

@pandas_udf('double')
def cdf(v):
return pd.Series(stats.norm.cdf(v))


df.withColumn('cumulative_probability', cdf(df.v))

stats.norm.cdf 在标量值和 pandas.Series 上都是可用的,并且此示例也可以使用一次一行的 UDF 编写。与前面的例子类似,Pandas 版本运行速度更快,如后面的“性能比较”部分所示。

Grouped Map Pandas UDFs

Python 用户对数据分析中的 split-apply-combine 模式非常熟悉。Grouped Map Pandas UDF 是针对这种情况设计的,它们针对某些组的所有数据进行操作,例如“针对每个日期应用此操作”。

Grouped Map Pandas UDF 首先根据 groupby 运算符中指定的条件将 Spark DataFrame 分组,然后将用户定义的函数(pandas.DataFrame -> pandas.DataFrame)应用于每个组,并将结果组合并作为新的 Spark DataFrame 返回。

Grouped map Pandas UDF 使用与 scalar Pandas UDF 使用相同的函数装饰器 pandas_udf,但它们有一些区别:

  • 用户定义函数的输入:

    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • 用户定义函数的输出:

    • Scalar: pandas.Series
    • Grouped map: pandas.DataFrame
  • 分组语义:

    • Scalar: 无分组语义
    • Grouped map: 由 “groupby” 从句定义
  • 输出大小:

    • Scalar: 和输入大小一样
    • Grouped map: 任何尺寸
  • 函数装饰器中的返回类型:

    • Scalar: 一个 DataType,用于指定返回的 pandas.Series 的类型
    • Grouped map: 一个 StructType,用于指定返回的 pandas.DataFrame 中每列的名称和类型

接下来,让我们通过两个示例来说明 grouped map Pandas UDF 的使用场景。

Subtract Mean

此示例显示了简单使用 grouped map Pandas UDFs:从组中的每个值中减去平均值。

1
2
3
4
5
6
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)

在这个例子中,我们从每个组的 v 值中减去 v 的均值。分组语义由 “groupby” 函数定义,即每个输入到用户定义函数的 pandas.DataFrame 具有相同的 “id” 值。这个用户定义函数的输入和输出模式是相同的,所以我们将“df.schema” 传递给装饰器 pandas_udf 来指定模式。

Grouped map Pandas UDF 也可以作为驱动程序上的独立 Python 函数调用。这对于调试非常有用,例如:

1
2
3
4
5
6
sample = df.filter(id == 1).toPandas()
# Run as a standalone function on a pandas.DataFrame and verify result
subtract_mean.func(sample)

# Now run with Spark
df.groupby('id').apply(substract_mean)

在上面的示例中,我们首先将 Spark DataFrame 的一个小子集转换为 pandas.DataFrame,然后将 subtract_mean 作为独立的 Python 函数运行。验证函数逻辑后,我们可以在整个数据集上使用 Spark 调用 UDF。

普通最小二乘线性回归

最后一个示例显示了如何使用 statsmodels 为每个组运行 OLS 线性回归。对于每个组,我们根据统计模型 Y = bX + c 计算对于 X = (x1,x2) 的 beta b = (b1,b2)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
group_key = pdf[group_column].iloc[0]
y = pdf[y_column]
X = pdf[x_columns]
X = sm.add_constant(X)
model = sm.OLS(y, X).fit()

return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]], columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

此示例演示了 grouped map Pandas UDF 可以与任何任意的 python 函数一起使用:pandas.DataFrame -> pandas.DataFrame。返回的 pandas.DataFrame 可以具有与输入不同的行数和列数。

性能比较

最后,我们想要显示 row-at-a-time UDF 和 Pandas UDF 之间的性能比较。我们为以上三个示例运行微基准测试(plus one, cumulative probability 和 subtract mean)。

配置和方法

我们在 Databricks 社区版的单节点 Spark 群集上运行了基准测试。

配置细节:
数据:带有 Int 列和 Double 列的 10M 行 DataFrame
集群:6.0 GB 内存,0.88 核心,1 个 DBU
Databricks 运行时版本:Latest RC(4.0,Scala 2.11)

有关基准的详细实现,请查看 Pandas UDF Notebook

img

如图表所示,Pandas UDF 的表现比 row-at-a-time UDF 好得多,范围从 3倍100倍 不等。

结论和未来工作

即将推出的 Spark 2.3 版本为基本改进Python中用户定义函数的功能和性能奠定了基础。今后,我们计划在聚合和窗口函数中引入对 Pandas UDF 的支持。相关工作可以在 SPARK-22216 中进行跟踪。

Pandas UDFs 是 Spark 社区努力的一个很好的例子。我们要感谢 Bryan Cutler, Hyukjin Kwon, Jeff Reback, Liang-Chi Hsieh, Leif Walsh, Li Jin, Reynold Xin, Takuya Ueshin, Wenchen Fan, Wes McKinney, Xiao Li 以及其他人的贡献。最后,特别感谢 Apache Arrow 社区让这项工作成为可能。

下一步是什么

您可以尝试 Pandas UDF notebook ,并且此功能现在作为 Databricks Runtime 4.0 测试版的一部分提供.