五、解析数据
5.1 使用 lxml + XPath 解析数据
1
2
3
4
5
6
7
8
9
10
11
12
13
import requests as rq
from lxml import etree
headers = {'Users-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Mobile Safari/537.36'}
res = rq.get('https://github.com', headers=headers)
# 使用 etree 解析元素
html = etree.HTML(res.text)
# 通过 XPath 定位元素
html.xpath('/html/body/div[4]/main/div[5]/div/div[1]/div[2]/p[1]')[0].text
# 注意 XPath 就可以精简,不需要全部复制
# 取最后一部分相对路径一般就能较准确定位元素
5.2 使用 BeautifulSoup 解析数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from bs4 import BeautifulSoup
# 使用 BeautifulSoup 类解析
soup = BeautifulSoup(res.text, 'lxml')
# 精确(惰性)查找标签并输出文本
soup.find(name='p', attrs={'class': 'lead-mktg mb-4'}).text
soup.find('p', class_='lead-mktg mb-4') # 常用的属性也可以直接作为 kwargs
soup.find_all('link') # 一网打尽相关标签
# 通过 CSS 选择器定位
soup.select('p.lead-mktg.text-gray.text-center.col-md-8.mx-auto.mb-4')
# 更多函数方法
help(BeautifulSoup)
5.3 使用 re + 正则表达式解析数据
1
2
3
4
5
6
7
import re
reg = r'<li .*?>(.*?)</li>' # 定义正则表达式(r表示忽略转义字符的转义)
pattern = re.compile(reg) # 编译成 re.Pattern 对象
lis = pattern.findall(res.text) # 返回所有匹配列表
# 移除源码的换行符更加稳健:res.text.replace('\n', '')
# 或者添加一个参数匹配换行符:re.compile(reg, re.S)
元字符 | 说明 | |
---|---|---|
0 | . | 代表任意字符 |
1 | | | 逻辑或操作符 |
2 | [ ] | 匹配内部的任一字符或子表达式 |
3 | [^] | 对字符集和取非 |
4 | - | 定义一个区间 |
5 | \ | 对下一字符取非(通常是普通变特殊,特殊变普通) |
6 | * | 匹配前面的字符或者子表达式 0 次或多次 |
8 | + | 匹配前一个字符或子表达式 1 次或多次 |
10 | ? | 加在任意字符或子表达式后,表示惰性匹配 |
11 | {n} | 匹配前一个字符或子表达式 |
12 | {m,n} | 匹配前一个字符或子表达式至少m次至多n次 |
13 | {n,} | 匹配前一个字符或者子表达式至少n次 |
15 | ^ | 匹配字符串的开头 |
17 | $ | 匹配字符串结束 |
19 | \c | 匹配一个控制字符 |
20 | \d | 匹配任意数字,等同于[0-9] |
21 | \D | 匹配数字以外的字符,等同于[^0-9] |
22 | \t | 匹配制表符 |
23 | \w | 匹配任意数字字母下划线,等同于[a-z0-9A-Z_] |
24 | \W | 不匹配数字字母下划线,等同于[^a-z0-9A-Z_] |
5.4 直接解析 json 数据
1
2
3
4
5
6
7
8
import json
# 对于 json 数据的接口,返回的数据已经是规整的了,可直接转换成字典
url = 'http://m.maoyan.com/mmdb/comments/movie/343208.json?_v_=yes&offset=0&startTime=2018-12-13%2000%3A28%3A40'
res = rq.get(url, headers=headers)
# loads 函数可以处理相应格式的文本数据
data = json.loads(res.text)
data.keys()
5.5 使用 selenium 解析数据
上一章请求数据中已经演示了使用 selenium 定位元素,一般来说,解析数据会多种方法结合使用,来达到最准确最稳健的处理流程。
例如对于爬取网页表格,会先用 selenium 或 BeautifulSoup 或 lxml 定位表格具体位置,再将对应 HTML 文本传给 pandas.read_html 解析。对于爬取其他文本数据或数值数据,也是先用以上方法初步确定元素位置,再用正则表达式、或 python 内置的字符处理函数(如 strip、replace 等)以及 pandas 和 numpy 的其他函数完成具体数据的获取和清洗。
个人更推荐使用正则表达式方法,因为一步到位简单粗暴。甚至网页重构了,只要元素还在,都能找到。而且写起来直观易懂,基本上只需掌握一个表达式就可以解析所有的网页了: (.*?)
。
六、储存数据
6.1 写入本地文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
"""
将 pandas.DataFrame 写入磁盘,包括 CSV、Markdown、Excel 等格式
"""
import pandas as pd
df = pd.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
# 写出为 CSV
df.to_csv('test.csv', index=False)
# 在爬虫中将数据追加到 CSV
df.to_csv('test.csv', mode='a', encoding='utf-8', header=0, index=False)
# 写出为 Excel(Excel 是默认 GBK 的)
df.to_excel('test.xlsx', encoding='gbk', index=False)
# 写出为 Markdown
with open('test.md', 'w', ecoding='utf-8') as f:
f.write(df.to_markdown())
# 写出为 Pickle(python 专用数据形式)
df.to_pickle('test.pkl')
# 写出为 MAT(matlab 专用数据形式)
from scipy.io import savemat, loadmat
savemat('test.mat', df.to_dict())
6.2 写入 MySQL 数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding: utf-8 -*-
import pymysql
import sqlalchemy
import numpy as np
import pandas as pd
from getpass import getpass
from warnings import filterwarnings
filterwarnings('ignore')
class MysqlHelper:
'''Python + MySQL 常用读写功能集成'''
def __init__(self, host='localhost', port=3306, charset='utf8'):
'''Initializing'''
self.host=host
self.port=port
self.charset=charset
self.user = input('>>> 请输入账户名:\n')
self.password = getpass('>>> 请输入密码:\n')
self.database = input('>>> 请输入要访问的数据库:\n')
self.conn = pymysql.connect(
host=self.host,port=self.port,
charset=self.charset,
user=self.user,
password=self.password,
database=self.database
)
self.cur = self.conn.cursor()
self.engine = sqlalchemy.create_engine(
f'mysql+pymysql://{self.user}:{self.password}' +
f'@{host}:{port}/{self.database}?charset={charset}'
)
def execute(self, sql, params=None):
'''重新定义 pymysql 的 execute 方法,一步到位'''
rows = self.cur.execute(sql, params)
results = self.cur.fetchall()
return {'affected rows': rows, 'returns': results}
def write_df(self, df, table):
'''将 DataFrame 写入已经存在的表'''
df.to_sql(
table,
con=self.engine,
if_exists='append',
index=False)
print('>>> 成功写入数据库!')
def create_table(self, tbname, sql=None):
'''智能创建建表语句'''
if sql:
self.execute(sql)
else:
fields, types, nulls = [], [], []
while True:
field = input('>>> 请输入字段名:\n')
type = input('>>> 请定义数据类型:\n')
null = input('>>> 是否允许空值?(y/n) \n')
fields.append(field)
types.append(type)
nulls.append(null)
ctn = input('继续(1)| 撤销上次输入(2)| 结束(3):\n')
if ctn == '3':
pk = input('>>> 请定义主键:(多个用 "," 隔开)\n')
break
elif ctn == '2':
fields.pop()
types.pop()
nulls.pop()
nulls = ['NOT NULL' if n=='n' else 'NULL' for n in nulls]
fields = [' '.join(tpl) for tpl in zip(fields, types, nulls)]
sql = f"CREATE TABLE IF NOT EXISTS {tbname}("
sql += f"{', '.join(fields)}, PRIMARY KEY ({pk}));"
self.execute(sql)
print(f'\n>>> 成功创建表 {tbname} !')
def returns_to_df(self, sql, cols, show=False):
'''将查询结果输出为 DataFrame,cols 需要与查询的字段一一对应'''
result = self.execute(sql)
rows = result['affected rows']
returns = result['returns']
df = pd.DataFrame(np.random.randn(rows, len(cols)), columns=cols)
for i, r in enumerate(returns):
for j in range(len(r)):
df[cols[j]][i] = r[j]
if show:
print(df.to_markdown())
else:
return df
def flash_connection(self):
'''建表或更新后需要重建连接才能查询新内容'''
self.conn.close()
self.conn = pymysql.connect(
host=self.host, port=self.port, charset=self.charset,
user=self.user, password=self.password, database=self.database
)
self.cur = self.conn.cursor()
self.engine = sqlalchemy.create_engine(
f'mysql+pymysql://{self.user}:{self.password}' +
f'@{self.host}:{self.port}/{self.database}?charset={self.charset}'
)
将上面的脚本命名为 mysql_helper.py
,然后复制到 Python 安装目录下的 ./Lib/site-packages/
文件夹下,就可以在任意位置引用其功能。以下是操作示例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import numpy as np
import pandas as pd
from mysql_helper import MysqlHelper
# 新建 helper 对象(此处需要输入用户名、密码和数据库名)
helper = MysqlHelper()
# 查看数据库中的表格
helper.execute('show tables;') # 直接查询
# 更友好的方式(可用于所有查询语句,需要自定义cols参数)
helper.returns_to_df('show tables;', cols=['tables'], show=True)
description = ['filed', 'type', 'null', 'key', 'default', 'extra']
helper.returns_to_df('describe jdphone;', cols=description, show=True)
# 可以直接将查询结果返回为 DataFrame
sql = '''
SELECT 品牌, COUNT(*) AS 计数
FROM jdphone
GROUP BY 品牌
ORDER BY 计数 DESC;
'''
result = helper.returns_to_df(sql, cols=['品牌', '计数'])
# 写入数据
df = pd.DataFrame(np.intp(np.random.randn(4,4)*100), columns=['A','B','C','D'])
# 按照提示建表(与待插入的 df 相对应)
helper.create_table('test')
# 查看建表情况
helper.returns_to_df('describe test;', cols=description, show=True)
# 写入数据
helper.write_df(df, 'test')
# 刷新连接
helper.flush_connection()
# 查看数据
helper.returns_to_df('select * from test;', cols=['A','B','C','D'], show=True)
# 结束:
helper.conn.close()
6.3 写入 MongoDB 数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
"""
MongoDB 数据存取操作
"""
import numpy as np
import pandas as pd
from pymongo import MongoClient
class MongoHelper():
def __init__(self, dbname='paradise', collection='test'):
'''Initializing'''
exec(f'self.db = MongoClient().{dbname}')
exec(f'self.tb = self.db.{collection}')
def write_table(self, dict):
'''Writing dict to MongoDB'''
self.tb.insert(dict)
def read_table(self, output=False):
'''Reading table from MongoDB'''
content_dict_list = [item for item in self.tb.find()]
if not output:
return content_dict_list
else:
df = pd.DataFrame(content_dict_list).drop('_id', axis=1)
filename = f'MongoDB.{self.tb.full_name}.csv'
df.to_csv(filename, index=False)
print(f'>>> 成功写出文件: {filename}')
def utils(self):
'''数据库读写中的常用操作,持续更新'''
def df_to_dict(df):
return df.to_dict(orient='records')
def dict_to_df(_dict_):
df = pd.DataFrame(_dict_)
if '_id' in _dict_[0].keys():
df.drop('_id', axis=1, inplace=True)
return df
return {'df_to_dict': df_to_dict,
'dict_to_df': dict_to_df}
if __name__ == "__main__":
# 操作示例
df = pd.DataFrame(
np.intp(np.random.randn(4,4)*100),
columns=["A", "B", "C", "D"])
helper = MongoHelper()
helper.tb.drop()
helper.write_table(helper.utils()['df_to_dict'](df))
helper.utils()['dict_to_df'](helper.read_table())