Python 爬虫快速入门(下)

一个可复制的爬虫工作流 -- 解析与存储数据

Posted by Paradise on March 9, 2021

五、解析数据

5.1 使用 lxml + XPath 解析数据

1
2
3
4
5
6
7
8
9
10
11
12
13
import requests as rq
from lxml import etree

headers = {'Users-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Mobile Safari/537.36'}
res = rq.get('https://github.com', headers=headers)

# 使用 etree 解析元素
html = etree.HTML(res.text)
# 通过 XPath 定位元素
html.xpath('/html/body/div[4]/main/div[5]/div/div[1]/div[2]/p[1]')[0].text

# 注意 XPath 就可以精简,不需要全部复制
# 取最后一部分相对路径一般就能较准确定位元素

5.2 使用 BeautifulSoup 解析数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from bs4 import BeautifulSoup

# 使用 BeautifulSoup 类解析
soup = BeautifulSoup(res.text, 'lxml')

# 精确(惰性)查找标签并输出文本
soup.find(name='p', attrs={'class': 'lead-mktg mb-4'}).text    
soup.find('p', class_='lead-mktg mb-4')     # 常用的属性也可以直接作为 kwargs
soup.find_all('link')                       # 一网打尽相关标签
# 通过 CSS 选择器定位
soup.select('p.lead-mktg.text-gray.text-center.col-md-8.mx-auto.mb-4')

# 更多函数方法
help(BeautifulSoup)

5.3 使用 re + 正则表达式解析数据

1
2
3
4
5
6
7
import re

reg = r'<li .*?>(.*?)</li>'     # 定义正则表达式(r表示忽略转义字符的转义)
pattern = re.compile(reg)       # 编译成 re.Pattern 对象
lis = pattern.findall(res.text) # 返回所有匹配列表
# 移除源码的换行符更加稳健:res.text.replace('\n', '')
# 或者添加一个参数匹配换行符:re.compile(reg, re.S)
  元字符 说明
0 . 代表任意字符
1 | 逻辑或操作符
2 [ ] 匹配内部的任一字符或子表达式
3 [^] 对字符集和取非
4 - 定义一个区间
5 \ 对下一字符取非(通常是普通变特殊,特殊变普通)
6 * 匹配前面的字符或者子表达式 0 次或多次
8 + 匹配前一个字符或子表达式 1 次或多次
10 ? 加在任意字符或子表达式后,表示惰性匹配
11 {n} 匹配前一个字符或子表达式
12 {m,n} 匹配前一个字符或子表达式至少m次至多n次
13 {n,} 匹配前一个字符或者子表达式至少n次
15 ^ 匹配字符串的开头
17 $ 匹配字符串结束
19 \c 匹配一个控制字符
20 \d 匹配任意数字,等同于[0-9]
21 \D 匹配数字以外的字符,等同于[^0-9]
22 \t 匹配制表符
23 \w 匹配任意数字字母下划线,等同于[a-z0-9A-Z_]
24 \W 不匹配数字字母下划线,等同于[^a-z0-9A-Z_]

5.4 直接解析 json 数据

1
2
3
4
5
6
7
8
import json

# 对于 json 数据的接口,返回的数据已经是规整的了,可直接转换成字典
url = 'http://m.maoyan.com/mmdb/comments/movie/343208.json?_v_=yes&offset=0&startTime=2018-12-13%2000%3A28%3A40'
res = rq.get(url, headers=headers)
# loads 函数可以处理相应格式的文本数据
data = json.loads(res.text)
data.keys()

5.5 使用 selenium 解析数据

上一章请求数据中已经演示了使用 selenium 定位元素,一般来说,解析数据会多种方法结合使用,来达到最准确最稳健的处理流程。

例如对于爬取网页表格,会先用 selenium 或 BeautifulSoup 或 lxml 定位表格具体位置,再将对应 HTML 文本传给 pandas.read_html 解析。对于爬取其他文本数据或数值数据,也是先用以上方法初步确定元素位置,再用正则表达式、或 python 内置的字符处理函数(如 strip、replace 等)以及 pandas 和 numpy 的其他函数完成具体数据的获取和清洗。

个人更推荐使用正则表达式方法,因为一步到位简单粗暴。甚至网页重构了,只要元素还在,都能找到。而且写起来直观易懂,基本上只需掌握一个表达式就可以解析所有的网页了: (.*?)

六、储存数据

6.1 写入本地文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding: utf-8 -*-
"""
将 pandas.DataFrame 写入磁盘,包括 CSV、Markdown、Excel 等格式
"""
import pandas as pd
df = pd.DataFrame({'a':[1,2,3], 'b':[4,5,6]})

# 写出为 CSV
df.to_csv('test.csv', index=False)
# 在爬虫中将数据追加到 CSV
df.to_csv('test.csv', mode='a', encoding='utf-8', header=0, index=False)

# 写出为 Excel(Excel 是默认 GBK 的)
df.to_excel('test.xlsx', encoding='gbk', index=False)

# 写出为 Markdown
with open('test.md', 'w', ecoding='utf-8') as f:
    f.write(df.to_markdown())

# 写出为 Pickle(python 专用数据形式)
df.to_pickle('test.pkl')

# 写出为 MAT(matlab 专用数据形式)
from scipy.io import savemat, loadmat
savemat('test.mat', df.to_dict())

6.2 写入 MySQL 数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding: utf-8 -*-
import pymysql
import sqlalchemy
import numpy as np
import pandas as pd
from getpass import getpass
from warnings import filterwarnings

filterwarnings('ignore')

class MysqlHelper:
    '''Python + MySQL 常用读写功能集成'''

    def __init__(self, host='localhost', port=3306, charset='utf8'):
        '''Initializing'''
        self.host=host
        self.port=port
        self.charset=charset
        self.user = input('>>> 请输入账户名:\n')
        self.password = getpass('>>> 请输入密码:\n')
        self.database = input('>>> 请输入要访问的数据库:\n')
        self.conn = pymysql.connect(
            host=self.host,port=self.port,
            charset=self.charset,
            user=self.user,
            password=self.password,
            database=self.database
            )
        self.cur = self.conn.cursor()
        self.engine = sqlalchemy.create_engine(
            f'mysql+pymysql://{self.user}:{self.password}' +
            f'@{host}:{port}/{self.database}?charset={charset}'
            )

    def execute(self, sql, params=None):
        '''重新定义 pymysql 的 execute 方法,一步到位'''
        rows = self.cur.execute(sql, params)
        results = self.cur.fetchall()
        return {'affected rows': rows, 'returns': results}

    def write_df(self, df, table):
        '''将 DataFrame 写入已经存在的表'''
        df.to_sql(
            table, 
            con=self.engine, 
            if_exists='append', 
            index=False)
        print('>>> 成功写入数据库!')

    def create_table(self, tbname, sql=None):
        '''智能创建建表语句'''
        if sql:
            self.execute(sql)
        else:
            fields, types, nulls = [], [], []
            while True:
                field = input('>>> 请输入字段名:\n')
                type = input('>>> 请定义数据类型:\n')
                null = input('>>> 是否允许空值?(y/n) \n')
                fields.append(field)
                types.append(type)
                nulls.append(null)
                ctn = input('继续(1)| 撤销上次输入(2)| 结束(3):\n')
                if ctn == '3':
                    pk = input('>>> 请定义主键:(多个用 "," 隔开)\n')
                    break
                elif ctn == '2':
                    fields.pop()
                    types.pop()
                    nulls.pop()
            nulls = ['NOT NULL' if n=='n' else 'NULL' for n in nulls]
            fields = [' '.join(tpl) for tpl in zip(fields, types, nulls)]
                
            sql = f"CREATE TABLE IF NOT EXISTS {tbname}("
            sql += f"{', '.join(fields)}, PRIMARY KEY ({pk}));"
            self.execute(sql)
            print(f'\n>>> 成功创建表 {tbname} !')

    def returns_to_df(self, sql, cols, show=False):
        '''将查询结果输出为 DataFrame,cols 需要与查询的字段一一对应'''
        result = self.execute(sql)
        rows = result['affected rows']
        returns = result['returns']
        df = pd.DataFrame(np.random.randn(rows, len(cols)), columns=cols)
        for i, r in enumerate(returns):
            for j in range(len(r)):
                df[cols[j]][i] = r[j]
        if show:
            print(df.to_markdown())
        else:
            return df
    
    def flash_connection(self):
        '''建表或更新后需要重建连接才能查询新内容'''
        self.conn.close()
        self.conn = pymysql.connect(
            host=self.host, port=self.port, charset=self.charset, 
            user=self.user, password=self.password, database=self.database
        )
        self.cur = self.conn.cursor()
        self.engine = sqlalchemy.create_engine(
            f'mysql+pymysql://{self.user}:{self.password}' + 
            f'@{self.host}:{self.port}/{self.database}?charset={self.charset}'
        )

将上面的脚本命名为 mysql_helper.py,然后复制到 Python 安装目录下的 ./Lib/site-packages/ 文件夹下,就可以在任意位置引用其功能。以下是操作示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import numpy as np
import pandas as pd
from mysql_helper import MysqlHelper

# 新建 helper 对象(此处需要输入用户名、密码和数据库名)
helper = MysqlHelper()

# 查看数据库中的表格
helper.execute('show tables;')      # 直接查询
# 更友好的方式(可用于所有查询语句,需要自定义cols参数)
helper.returns_to_df('show tables;', cols=['tables'], show=True)
description = ['filed', 'type', 'null', 'key', 'default', 'extra']
helper.returns_to_df('describe jdphone;', cols=description, show=True)

# 可以直接将查询结果返回为 DataFrame
sql = '''
    SELECT 品牌, COUNT(*) AS 计数
    FROM jdphone 
    GROUP BY 品牌
    ORDER BY 计数 DESC;
'''
result = helper.returns_to_df(sql, cols=['品牌', '计数'])

# 写入数据
df = pd.DataFrame(np.intp(np.random.randn(4,4)*100), columns=['A','B','C','D'])
# 按照提示建表(与待插入的 df 相对应)
helper.create_table('test')
# 查看建表情况
helper.returns_to_df('describe test;', cols=description, show=True)
# 写入数据
helper.write_df(df, 'test')
# 刷新连接
helper.flush_connection()
# 查看数据
helper.returns_to_df('select * from test;', cols=['A','B','C','D'], show=True)

# 结束:
helper.conn.close()

6.3 写入 MongoDB 数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
"""
MongoDB 数据存取操作
"""
import numpy as np
import pandas as pd
from pymongo import MongoClient


class MongoHelper():

    def __init__(self, dbname='paradise', collection='test'):
        '''Initializing'''
        exec(f'self.db = MongoClient().{dbname}')
        exec(f'self.tb = self.db.{collection}')
    
    def write_table(self, dict):
        '''Writing dict to MongoDB'''
        self.tb.insert(dict)

    def read_table(self, output=False):
        '''Reading table from MongoDB'''
        content_dict_list = [item for item in self.tb.find()]
        if not output:
            return content_dict_list
        else:
            df = pd.DataFrame(content_dict_list).drop('_id', axis=1)
            filename = f'MongoDB.{self.tb.full_name}.csv'
            df.to_csv(filename, index=False)
            print(f'>>> 成功写出文件: {filename}')

    def utils(self):
        '''数据库读写中的常用操作,持续更新'''
        def df_to_dict(df):
            return df.to_dict(orient='records')
        def dict_to_df(_dict_):
            df = pd.DataFrame(_dict_)
            if '_id' in _dict_[0].keys():
                df.drop('_id', axis=1, inplace=True)
            return df
        return {'df_to_dict': df_to_dict,
                'dict_to_df': dict_to_df}

if __name__ == "__main__":
    # 操作示例
    df = pd.DataFrame(
        np.intp(np.random.randn(4,4)*100),
        columns=["A", "B", "C", "D"])
    helper = MongoHelper()
    helper.tb.drop()
    helper.write_table(helper.utils()['df_to_dict'](df))
    helper.utils()['dict_to_df'](helper.read_table())