使用 Python 解压缩 gzip 数据流

有一个使用场景:将 tar.gz 文件读成字节数组(byte array),然后发往 Kafka。我想查看 Kafka 里面的消息,但直接查看只能看到二进制乱码。首先想到的办法是把 Kafka 里保存的字节数组存储到本地,每条消息存成一个 .tar.gz 文件:

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import gzip

def run(args):
    """Save the payload of every consumed Kafka message as a ``.tar.gz`` file.

    Files are written to ``/data/app/tar/<n>.tar.gz`` where ``<n>`` counts the
    messages written so far, starting at 0.

    Args:
        args: ``sys.argv``-style sequence; ``args[1]`` is passed as
            ``bootstrap.servers``, ``args[2]`` as ``group.id`` and ``args[3]``
            is the topic to subscribe to.

    The loop ends when ``poll`` returns a message carrying an error other
    than ``KafkaError._PARTITION_EOF``; the consumer is always closed on exit.
    """
    c = Consumer({
        'bootstrap.servers': args[1],  # broker
        'group.id': args[2],  # group id, different on every run
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # topic
        i = 0
        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            # Best effort per message: report the failure and keep consuming.
            try:
                # ``with`` closes the file even when the write fails.
                with open('/data/app/tar/' + str(i) + ".tar.gz", 'wb') as out_f:
                    out_f.write(msg.value())
                i += 1
                print(i)
            except Exception as e:
                print(e)
    finally:
        # Close the consumer even on KeyboardInterrupt or unexpected errors.
        c.close()


if __name__ == '__main__':
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; fail with a usage hint
    # instead of an IndexError when they are missing.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

这样就能打开压缩文件查看了,但是每次都要先落盘再打开,并不方便。所以改为直接在程序里把字节数组解压并打印出来:

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import gzip,io
import zlib

def run(args):
    """Decompress the payload of every consumed Kafka message and print it.

    Args:
        args: ``sys.argv``-style sequence; ``args[1]`` is passed as
            ``bootstrap.servers``, ``args[2]`` as ``group.id`` and ``args[3]``
            is the topic to subscribe to.

    The loop ends when ``poll`` returns a message carrying an error other
    than ``KafkaError._PARTITION_EOF``; the consumer is always closed on exit.
    """
    c = Consumer({
        'bootstrap.servers': args[1],
        'group.id': args[2],  # group id
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # topic
        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            # Best effort per message: report the failure and keep consuming.
            try:
                # MAX_WBITS | 32 lets zlib auto-detect a zlib or gzip header.
                decompressed_data = zlib.decompress(msg.value(), zlib.MAX_WBITS | 32)
                print(decompressed_data)
            except Exception as e:
                print(e)
    finally:
        # Close the consumer even on KeyboardInterrupt or unexpected errors.
        c.close()


if __name__ == '__main__':
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; fail with a usage hint
    # instead of an IndexError when they are missing.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

更进一步,可以在内存中解压缩字节数组,并逐个读取归档内各文件的内容:

# -*- coding: utf-8 -*-
import sys,os
from confluent_kafka import Consumer, KafkaError
import tarfile, io

def _print_tar_members(payload):
    """Open *payload* as an in-memory tar archive and print each member.

    For every member the ``TarInfo`` is printed; members that
    ``extractfile`` can open also have their content printed.
    """
    file_like_object = io.BytesIO(payload)
    # ``with`` closes the archive even if reading a member fails.
    with tarfile.open(fileobj=file_like_object) as tar:
        for member in tar.getmembers():
            print(member)
            f = tar.extractfile(member)
            if f is None:
                # extractfile() returns None for members that are not
                # regular files or links (e.g. directories); nothing to read.
                continue
            print(f.read())


def run(args):
    """Print the members of the tar archive carried by each Kafka message.

    Args:
        args: ``sys.argv``-style sequence; ``args[1]`` is passed as
            ``bootstrap.servers``, ``args[2]`` as ``group.id`` and ``args[3]``
            is the topic to subscribe to.

    The loop ends when ``poll`` returns a message carrying an error other
    than ``KafkaError._PARTITION_EOF``; the consumer is always closed on exit.
    """
    c = Consumer({
        'bootstrap.servers': args[1],  # "10.10.20.11:9092"
        'group.id': args[2],  # group id, different on every run
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }
    })

    try:
        c.subscribe([args[3]])  # dc-diagnostic-report
        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(msg.error())
                break

            # Best effort per message: report the failure and keep consuming.
            try:
                _print_tar_members(msg.value())
            except Exception as e:
                print(e)
    finally:
        # Close the consumer even on KeyboardInterrupt or unexpected errors.
        c.close()


if __name__ == '__main__':
    print(sys.argv)
    # run() reads args[1], args[2] and args[3]; fail with a usage hint
    # instead of an IndexError when they are missing.
    if len(sys.argv) < 4:
        sys.exit("usage: %s <bootstrap.servers> <group.id> <topic>" % sys.argv[0])
    run(sys.argv)

参考链接