Pyspark 笔记

反向代理的配置

在服务器中做如下配置:

1
2
3
4
5
6
7
server {                                                       
listen 80;
server_name test.aldwx.com;
location /app.launch.php {
proxy_pass http://127.0.0.1:3000;
}
}

然后在服务器中的终端中输入

1
plackup -E deployment -s Starman --workers=1 -p 3000 -a app.pl

或者:

1
nohup plackup -E deployment -s Starman --workers=10 -p 3000 -a app.pl &

app.pl 是用 dancer 写的一个 demo 程序, 其中的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/perl
use Digest::MD5 qw(md5_hex);
use Dancer;
use JSON qw(encode_json);

# return the json format data
get "/app.launch.info.php" => sub {

my $data = `date`;
my $token = md5_hex($data);
my $r = {
'error' => '0',
'data' => {
'access_token' => $token
},
};
return encode_json($r);
};


get '/hello' => sub {
return "Hello 小程序";
};

get '/l.html' => sub {
return "Hello, World";
};

dance;

然后在浏览其中输入:

1
test.aldwx.com/app.launch.info.php

你会看到浏览器返给你返回一段 json 数据。

查看端口号被哪个进程占用

1
2
netstat -tulnp | grep ':80 '
tcp 0 0 0.0.0.0:80 0.0.0.0:* LISTEN 4021/nginx.conf

nginx: [error] open() “/alidata/server/nginx/logs/nginx.pid” failed (2: No such file or directory) 错误

在使用的阿里云服务器上,进程性的 nginx -s stop后再次启动nginx -s reload ,总是会报错误nginx: [error] open() “/alidata/server/nginx/logs/nginx.pid” failed (2: No such file or directory),这应该是因为把nginx进程杀死后pid丢失了,下一次再开启nginx -s reload时无法启动

1
nginx -c /path/to/config/nginx.conf

查看被占用的端口号

1
2
lsof -i | grep ":7077"
java 8927 root 230u IPv6 100990 0t0 TCP 106.2.1.77:7077 (LISTEN)

php 环境搭建

1
2
sudo apt-get update
sudo apt-get install lib apache2-mod-php5 ...

.conf 文件的配置

.conf 文件一版放在 sites-available 目录中, 用的时候再软链接到 sites-enabled 中。

免密码登录服务器

在服务器上的 ./ssh 目录中添加 ssh key, 比如我更换了新电脑:

1
vim ~/.ssh/authorized_keys

删除掉旧的 ssh keys, 加入新生成的:

1
ssh-rsa AAACCQEEEEddfdfdffrrt3334QWAUUY1223xxxXXXXXXX wodefan@Mac-mini.local

从本地上传/下载文件/文件夹到服务器

上传和下载都是在本地打开命令行终端。

  • 上传
1
2
scp ./app.py root@108.2.5.98:/daly/apk/myapp/ # 从本地上传单个文件到服务器
scp -r myjobs root@108.2.5.98:/daly/apk/ # 从本地上传整个文件夹到服务器
  • 下载
1
2
scp root@108.2.5.98:/daly/apk/myapp/today.txt . # 从服务器下载指定文件到本地目录
scp -r root@108.2.5.98:/daly/apk/apk/ . # 从服务器下载整个文件夹到本地

在 pyspark 中连接数据库

  • 启动 pyspark 时附加上相应的 mysql jar 包:
1
/data/app/spark/bin/pyspark --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar
  • 使用用户名和密码连接 mysql, 并创建一个数据框:
1
2
df = spark.read.format("jdbc").option("url", "jdbc:mysql://8.8.8.8/ald_xinen").option("dbtable", "(select * from ald_session_logs) as df").option('user', "root").option('password', "fish@sky").load()
df.take(2) # 取出前两行

如果已经启动了一个连接 mysql 数据库的 pyspark, 再重新启动一个时会报错, 这个时候就要把之前的启动的杀掉:

1
ps aux | grep 'spark'

然后找到 pyspark-shell 的 进程 id, kill -9 xxxx

  • toDF(*cols) 返回一新的数据框, 指定新的列名。
1
df.select("app_key", "uuid").toDF("f1", 'f2').take(2)

输出:

1
[Row(f1=u'2323423dsfds', f2=u'14797409227806341000'), Row(f1=u'2323423dsfds', f2=u'14797409227806341000')]
  • toJSON() 将数据框转换为字符串 RDD。每一行都被转换为 JSON 文档, 作为所返回的 RDD 中的一个元素。
1
df.select("app_key", "uuid").toJSON().first()

输出:

1
u'{"app_key":"2323423dsfds","uuid":"14797409227806341000"}'
  • toLocalIterator()

返回一个包含该数据框所有行的迭代器。这个迭代器会在该数据框的最大分区中消耗尽可能多的内存。

1
list(df.toLocalIterator())
  • withColumn(colName, col)

通过为原数据框添加一个新列替换已存在的同名列而返回一个新数据框。colName 是一个字符串, 为新列的名字。
col 为这个新列的 Column 表达式。withColumn 的第一个参数必须是已存在的列的名字, withColumn 的第二个参数必须是含有列的表达式。如果不是它会报错 AssertionError: col should be Column

1
df.withColumn('page_count', df.page_count+100).select("app_key","page_count").take(2)

输出:

1
[Row(app_key=u'2323423dsfds', page_count=110), Row(app_key=u'2323423dsfds', page_count=104)]

col 表达式可以使用多个其他列:

1
df.withColumn('avg', df.page_count/df.duration).select("app_key","avg").take(2)

输出:

1
[Row(app_key=u'2323423dsfds', avg=0.00012387736141220192), Row(app_key=u'2323423dsfds', avg=0.16666666666666666)]

withColumn 可以添加一个常数列, 但是要使用 pyspark.sql.functions 中的函数, 例如: unix_timestamp.

可以参考

  • withColumnRenamed(existing, new)

重命名已存在的列并返回一个新数据框。existing 为已存在的要重命名的列, col 为新列的名字。

duration 列重命名为 time_take 列:

1
df.withColumnRenamed('duration', 'time_take').select('app_key', 'time_take').take(2)

输出:

1
[Row(app_key=u'2323423dsfds', time_take=80725), Row(app_key=u'2323423dsfds', time_take=24)]

groupBy 之后可以使用的方法

  • agg(*exprs)

聚合计算并将结果返回为 DataFrame

可用的聚合函数有 avg, max, min, sum, ‘count’.

如果 expr 是从字符串到字符串的单个 dict 映射, 那么其键就是要执行聚合的列, 其值就是该聚合函数。

可选地, expr 还可以是一组聚合 表达式。

参数: exprs - 一个从列名(字符串)到聚合函数(字符串)的字典映射, 或者是一个 Column 列表。

1
2
gdf = df.groupBy(df.app_key)
gdf.agg({"*" : 'count' } ).collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', count(1)=1), Row(app_key=u'AAAAAAAAAAAAAA', count(1)=1), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', count(1)=4), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', count(1)=12), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', count(1)=3), Row(app_key=u'2323423dsfds', count(1)=66), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count(1)=490), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', count(1)=17), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', count(1)=5)]

对 app_key 排序下:

1
sorted(gdf.agg({"*": "count"}).collect())

输出:

1
[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count(1)=490), Row(app_key=u'2323423dsfds', count(1)=66), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', count(1)=3), Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', count(1)=1), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', count(1)=4), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', count(1)=12), Row(app_key=u'AAAAAAAAAAAAAA', count(1)=1), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', count(1)=5), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', count(1)=17)]
1
2
from pyspark.sql import functions as F
sorted(gdf.agg(F.max(df.duration)).collect())

输出:

1
[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', max(duration)=28498), Row(app_key=u'2323423dsfds', max(duration)=80725), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', max(duration)=352), Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', max(duration)=33), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', max(duration)=168), Row(app_key=u'AAAAAAAAAAAAAA', max(duration)=23), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', max(duration)=87), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', max(duration)=7789)]
  • avg(*cols)

为每一组的每个数值列计算平均值。

meanavg 的别名。

参数: cols - 一组列的名字(字符串)。非数值列被忽略。

1
df.groupBy('app_key').avg('duration').collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', avg(duration)=150.0),Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', avg(duration)=153.81428571428572) ...]
1
df.groupBy('app_key').avg('duration', 'page_count').collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', avg(duration)=150.0, avg(page_count)=12.0), avg(page_count)=5.681818181818182), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', avg(duration)=153.81428571428572, avg(page_count)=8.059183673469388) ...]
  • count()

计算每组的记录数。

1
sorted(df.groupBy(df.app_key).count().collect())

输出:

1
[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count=490), Row(app_key=u'2323423dsfds', count=66) ...]
  • max(*cols)

计算每组每个数值列的最大值。

1
df.groupBy('app_key').max('duration').collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150) ...]
1
df.groupBy('app_key').max('duration', 'page_count').collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150, max(page_count)=12)...]
  • mean() 求平均数, 同 avg
  • min() 参考 max

  • pivot(pivot_col, values=None)

透视当前[[DataFrame]]的列并执行指定的聚合。有两个版本的pivot函数:一个需要调用者指定不同值的列表以便进行透视,而另一个不指定。后者更简洁但效率较低,因为Spark需要首先在内部计算不同值的列表。

参数: pivot_col - 要透视的列名; values - 将转换为输出 DataFrame 中的列的值列表。

计算每个 app 下,每个渠道的总停留时长, 每个渠道的结果占一列:

1
df.groupBy('app_key').pivot("source", ["1", "2", "3", "4" ]).sum("duration").collect()

输出:

1
[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', 1=None, 2=None, 3=None, 4=150), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', 1=7208, 2=54929, 3=7078, 4=6139), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', 1=None, 2=94, 3=29, 4=87) ...]

官方的例子:

1
2
3
# Compute the sum of earnings for each year by course with each course as a separate column
df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]

或者不指定列的值(不够有效率)

1
2
df4.groupBy("year").pivot("course").sum("earnings").collect()
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
  • sum(*cols)

为每组计算每个数值列的总和。

参数: cols - 一组列的名字(字符串)。非数值列被忽略。

1
2
3
4
5
# 官方例子:
df.groupBy().sum('age').collect()
# [Row(sum(age)=7)]
df3.groupBy().sum('age', 'height').collect()
# [Row(sum(age)=7, sum(height)=165)]
  • class pyspark.sql.Column(jc)

数据框中的一列。 Column 实例可以通过如下的代码创建:

1
2
3
4
5
6
7
#1. 选择数据框中的一列
df.colName
df["colName"]

#2. 从表达式创建
df.colName + 1
1 / df.colName
  • alias(*alias)

返回这个列的新的别名或别名们(在表达式返回多列的情况下, 例如爆炸)。

1
df.select(df.duration.alias("time_take")).collect()
  • asc()

返回一个排序过的表达式, 基于给定列的名字的升序进行排列。

  • astype() 是 cast() 的别名。

  • between(lowerBound, upperBound)

如果这个表达式的值在给定的列的值之间,则计算结果为布尔真。

1
df.select(df.app_key, df.duration.between(60,80)).show(50)

show()函数输出前 20 行:

1
2
3
4
5
6
7
8
9
+--------------------+---------------------------------------+
| app_key|((duration >= 60) AND (duration <= 80))|
+--------------------+---------------------------------------+
| 2323423dsfds| false|
| 2323423dsfds| false|
| 2323423dsfds| false|
| 2323423dsfds| false|
| 2323423dsfds| true|
...
  • cast(dataType)

将列转换为 dataType 类型。

1
2
3
4
5
# 官方例子:
df.select(df.age.cast("string").alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
df.select(df.age.cast(StringType()).alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
1
2
# 将字符串转为 int 型
df.select(df.source.cast("int").alias('sources')).take(20)

输出:

1
[Row(sources=1), Row(sources=2),...]
  • desc()

根据给定列的名字的倒序返回排序后的表达式。

  • getField(name)

在StructField中按名称获取字段的表达式。

1
2
3
from pyspark.sql import Row
df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df.select(df.r.getField("b")).show()
1
2
3
4
5
+---+
|r.b|
+---+
| b|
+---+
1
df.select(df.r.a).show()
1
2
3
4
5
+---+
|r.a|
+---+
| 1|
+---+
  • isNotNull()

如果当前的表达式不为 null 则为真。

  • isNull()

如果当前的表达式为 null 则为真。

  • isin(*cols)

如果这个表达式的值被包含在参数的计算值之中,则该表达式为真。

1
df[df.network_type.isin("Wifi", "4g")].select('app_key', 'network_type').take(2)

输出:

1
[Row(app_key=u'2323423dsfds', network_type=u'wifi'), Row(app_key=u'2323423dsfds', network_type=u'4g')]
  • name() 是 alias() 的别名

  • otherwise(value)

计算一组条件,并返回多个可能的结果表达式之一。如果未调用 Column.otherwise(),则为不匹配的条件返回 None。

有关使用示例,请参阅 pyspark.sql.functions.when()。

参数:value - 字面值或 Column 表达式。

1
2
from pyspark.sql import functions as F
df.select(df.duration, F.when(df.page_count > 10, 1).otherwise(0)).show()

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
+--------+---------------------------------------------+
|duration|CASE WHEN (page_count > 10) THEN 1 ELSE 0 END|
+--------+---------------------------------------------+
| 80725| 0|
| 24| 0|
| 21| 0|
| 21| 1|
| 66| 1|
| 17| 0|
| 31| 0|
| 1| 0|
| 80| 1|
| 3| 0|
| 11| 1|
| 6| 0|
| 23| 1|
| 4| 0|
| 2| 0|
| 23| 0|
| 60| 1|
| 16| 0|
| 4| 0|
| 4| 0|
+--------+---------------------------------------------+
  • from_unixtime

    将 unix 格式的时间戳转换为指定格式的日期

    1
    2
    3
    4
    from pyspark.sql.functions import from_unixtime

    df = spark.createDataFrame([('1486461323',)], ['start_time'])
    df.withColumn('day', from_unixtime( df.start_time , 'yyyy-MM-dd')).show()

    上面的代码中的createDataFrame 方法创建了一个只有一列的数据框, withColumn 方法为该数据框添加了一个新的名为 day 的列, from_unixtime 方法将该数据框中原来的 start_time 列转换为指定格式的日期。show() 方法打印输出。

  • pyspark.sql.functions.col(col)

    col 方法接收一个字符串列名作为参数, 根据指定的列名返回一个 Column. 作用和 df.columnName 相同。

  • pyspark.sql.functions.when(condition, value)

    计算一组条件并返回多个可能结果的表达式的其中之一。如果 Column.otherwise 没有被调用, 那么对未匹配的条件会返回 None

    参数:

    • condition - 一个布尔 Column 表达式。
    • value - 一个字面值, 或者一个 Column 表达式。
    1
    2
    df.select( when(df['age']==2, 3).otherwise(4).alias("age") ).collect()
    # [Row(age=3), Row(age=4)]
    1
    2
    df.select( when(df.age==2, df.age + 1).alias("age") ).collect()
    # [Row(age=3), Row(age=None)]-pyspark.sql.Column.when
  • pyspark.sql.Column.when(condition, value)

    计算一组条件并返回多个可能结果的表达式的其中之一。如果 Column.otherwise 没有被调用, 那么对未匹配的条件会返回 None。当 when 中有多个条件时,要用 &| 连接起来。Pyspark: multiple conditions in when clause

1
2
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()

从 df 数据框中选择出 nameage 这两列。如果 age>4 则把 age 置为 1; 如果 age<3 就把 age 置为 -1, 否则置为 0。

1
2
3
4
5
6
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+------------------------------------------------------------+
  • left_outer join

    左连接之后如果有重复的列, 则使用 drop 删除不了重复的列。需要用 select。 

  • udf 方法

    定义一个 udf 方法, 用来返回今天的日期(yyyy-MM-dd):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import datetime

# 定义一个 udf 函数
def today(day):
if day==None:
return datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d')
else:
return day

# 返回类型为字符串类型
udfday = udf(today, StringType())
# 使用
df.withColumn('day', udfday(df.day))
  • drop

    有时候使用 drop 来删除某一列时, 会出现错误, 而用 select 就可以。

将 map 转换为 select

之前一直使用 map 方法来做转换, 添加新的列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 # 今天的新访问用户数, 访问人数, 访问次数
_time_slot_df = spark.createDataFrame(slot_session_df.rdd.map(map_sessions)) \
.groupBy('app_key', 'day') \
.agg(
sum('is_first_open').alias('today_new_user_count'), # 新访问用户数
countDistinct('uuid').alias('today_user_count'), # 访问人数
sum('page_count').alias('today_visits_count'), # 访问次数
)

# 对 session_logs 日志做 map 操作, 生成需要的字段
def map_sessions(row):
if row['is_first_open']:
is_first_open = 1
else:
is_first_open = 0

return Row(
app_key = row['app_key'],
uuid = row['uuid'], # 访问人数
is_first_open = is_first_open, # 新访问用户数
page_count = row['page_count'], # 访问次数
day = datetime.datetime.fromtimestamp(int(row['start_time'])).strftime('%Y-%m-%d')
)

现在我们有 select, withColumn, when 来做:

1
2
_result_active_df = aladdin_id_df.join(_result_df, _result_df.app_key==aladdin_id_df.app_key, 'left_outer') \
.drop("_result_df.app_key","_result_df.uid")

先用 select 方法从数据框中选择出已经存在的列(s),这就是临时的新的数据框, 再用 withColumn 向该临时数据框中添加新的列。对于需要进行转换的列, 可以使用 when 方法来做运算。

1
2


 Splitting a row in a PySpark Dataframe into multiple rows

  • pyspark.functions.sql.explode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Create dummy data
df = sc.parallelize([(1, 2, 3, 'a b c'),
(4, 5, 6, 'd e f'),
(7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])


# Explode column
from pyspark.sql.functions import split, explode
df.withColumn('col4',explode(split('col4',' '))).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
| 1| 2| 3| a|
| 1| 2| 3| b|
| 1| 2| 3| c|
| 4| 5| 6| d|
| 4| 5| 6| e|
| 4| 5| 6| f|
| 7| 8| 9| g|
| 7| 8| 9| h|
| 7| 8| 9| i|
+----+----+----+----+

这样就不用使用 flatMap 函数了, 它代替了 map 函数:

1
2
3
4
5
6
7
8
9
10
# 每个电话号码一行     
def map_phones(row):
r = []
for phone in re.split(',', row['phones']):
r.append(Row(
app_key = row['app_key'],
phones = phone,
sms_content = row['sms_content']
))
return r

一个 empty 引发的异常

在脚本中加入这么一句:

1
if yesterday_session_df.rdd.isEmpty: return

上面的 isEmpty 方法没有带圆括号,但是执行没有报错,但是这个语句后面的语句好像都没有执行。

本机测试

1
2
3
4
    path = "/Users/ohmyfish/mdl0731"
page_df = spark.read.json(path)

./bin/spark-submit --master local[4] ~/data_check.py 0
1
2
def calc_convert_rate(spark, one_day):
json_df=spark.read.json("hdfs://10.0.0.55:9000/ald_log_etl/" + str(the_day))