爬到的数据当然要固化下来了,一般是存储到本地就可以了,简单的直接用字符串、json存储即可,相对复杂、庞大的数据可以用sqlite存储,有条件的可以直接通过接口存储到服务器,调用接口前文已经讲了不少,这里就不赘述了,下面讲一些常用的数据存储方法。

操作文件和目录

大部分文件和目录操作在os模块,还有部分在shutil模块。

操作目录

>>> import os

# 查看当前目录的绝对路径:
>>> path = os.path.abspath('.')
>>> print(path)
/Users/linxiaobin/Developer/python

# 在某个目录下创建一个新目录,首先把新目录的完整路径表示出来:
>>> path = os.path.join(path, 'testdir')
>>> print(path)
/Users/linxiaobin/Developer/python/testdir

# 创建一级目录:
>>> os.mkdir(path)
# 创建多级目录:
>>> os.makedirs('/tmp/a/b/c')
# 删掉一个目录:
>>> os.rmdir(path)

>>> import shutil
# 删除目录
>>> shutil.rmtree('testdir')
# 拷贝目录
>>> shutil.copytree('srcdir', 'desdir')

# 拆分路径
>>> print(os.path.split(path))
('/Users/linxiaobin/Developer/python', 'testdir')

# 拆分文件扩展名
>>> print(os.path.splitext('/path/to/file.txt'))
('/path/to/file', '.txt')

把两个路径合成一个时,不要直接拼字符串,而要通过os.path.join()函数,这样可以正确处理不同操作系统的路径分隔符。同样,要拆分路径时不要直接拆分字符串,而要通过os.path.split()函数,这样可以把一个路径拆分为两部分,后一部分总是最后级别的目录或文件名。

这些合并、拆分路径的函数并不要求目录和文件要真实存在,它们只对字符串进行操作。

检索目录

# 判断目录是否存在
>>> os.path.isdir('hello.py')
False
# 判断文件是否存在
>>> os.path.isfile('hello.py')
True
# 列出目录下所有文件
>>> os.listdir('.')
['.DS_Store', 'hello.py', 'hello.txt']

# 列出所有的.py文件
>>> [x for x in os.listdir('.') if os.path.isfile(x) and os.path.splitext(x)[1]=='.py']
['apis.py', 'config.py', 'models.py', 'pymonitor.py', 'test_db.py', 'urls.py', 'wsgiapp.py']

操作文件

>>> import os
# 对文件重命名:
>>> os.rename('test.txt', 'test.py')
# 删掉文件:
>>> os.remove('test.py')

>>> import shutil
# 拷贝文件
>>> shutil.copyfile('hello.py', 'hello.txt')

str

读文件

>>> with open('/tmp/aaa.txt', 'r') as f:
... print(f.read())
...
测试

调用read()会一次性读取文件的全部内容,如果文件有10G,内存就爆了,所以,要保险起见,可以反复调用read(size)方法,每次最多读取size个字节的内容。另外,调用readline()可以每次读取一行内容,调用readlines()一次读取所有内容并按行返回list。因此,要根据需要决定怎么调用。
如果文件很小,read()一次性读取最方便;如果不能确定文件大小,反复调用read(size)比较保险;如果是配置文件,调用readlines()最方便:

上面的方法默认以UTF-8编码读取字符串,可以通过设置 open() 函数的 encoding 参数来指定编码格式:

>>> with open('/tmp/aaa.txt', 'r' encoding='gbk') as f:
... print(f.read())

写文件

with open('/tmp/aaa.txt', 'w') as f:
f.write('测试')

二进制

读取文件

>>> with open('/tmp/aaa.txt', 'rb') as f:
... print(f.read())
...
b'a\xe5\x93\x88\n'

代码中的rb表示以只读二进制格式打开文件。

写文件

>>> with open('/tmp/aaa.txt', 'wb') as f:
... f.write('测试'.encode('utf_8'))

代码中的rw表示以写二进制格式打开文件。

json

读文件

>>> import json
>>> with open('/tmp/aaa.txt', 'r') as f:
... d = json.loads(f.read())
... print(d)
...
{'age': 20, 'score': 88, 'name': 'Bob'}

写文件

>>> import json
>>> d = dict(name='Bob', age=20, score=88)
>>> with open('/tmp/aaa.txt', 'w') as f:
... f.write(json.dumps(d))

json 除了可以序列化 dict,还可以用于序列化对象的属性,见这里

sqlite

sqlite 的应用有兴趣的读者可以看看这个封装库apsw,笔者在此次开发是直接使用系统的 sqlite3 模块,先上代码,后面再解释需要注意的地方。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
from singleton import *
import sqlite3
import threading

__author__ = 'Lin Xiaobin'

__all__ = ['Cacher']


@singleton
class Cacher(object):

def __init__(self):
path = self._dbPath()
self._connection = sqlite3.connect(path, check_same_thread=False)
self._lock = threading.Lock()

def _directory(self):
path = os.getcwd()
path = os.path.split(path)[0]
path = os.path.join(path, 'data')
return path

def _dbPath(self):
dir = self._directory()
path = os.path.join(dir, 'data.db')
return path

def _checkWorkspace(self):
# print('begin check workspace')
dir = self._directory()
if not os.path.isdir(dir):
print('>>>> err: dir %s not exists.' % dir)

path = self._dbPath()
if not os.path.isfile(path):
print('>>>> err: db %s not exists.' % dir)
# print('end check workspace')

def _execute(self, cmd, isQueury=True):
values = []
rowCount = 0
self._lock.acquire()
try:
con = self._connection
cursor = con.cursor()
cursor.execute(cmd)
values = cursor.fetchall()
rowCount = cursor.rowcount
con.commit()
except Exception as e:
print('except: ', e)
raise e
finally:
cursor.close()
self._lock.release()

if isQueury:
return values
else:
return rowCount

def _cleanEmptyData(self):
self._execute('vacuum')

Cacher()._checkWorkspace()

if __name__ == '__main__':
pass
  • singleton

    这里将缓存操作封装成单例,主要是为了让多线程应用在数据操作时有同一个出口,避免造成数据混乱。

  • 检查操作目录和数据库路径

    注意到下面这行代码的缩进,和Cacher类定义是同一级的,在加载这个类文件的时候,会执行该检测方法,这里只检测了工作目录和db文件是否存在,说明db文件是通过其他方式创建后放到指定位置的,这样有个好处是不用写初始化的代码,毕竟初始化表格用图形界面更方便些,也可以在这里进行数据库的初始化操作,这个可据个人喜好而定。

    Cacher()._checkWorkspace()
  • 初始化连接

    self._connection = sqlite3.connect(path, check_same_thread=False)

    check_same_thread 默认值是True,即如检验初始化连接和执行sql语句的线程是否是同一个,如果值为True,而线程又不是同一个,会报异常。因为我们的爬虫程序是多线程的,为了减少创建多个连接带来的性能损耗,以及开发的复杂度,这里我们将这个检查标记设置为False。

  • Lock

    多线程开发,绕不开锁的问题,即使这里使用了单例,如果没有在执行sql语句的时候加锁,仍然会引起操作异常,从而导致数据丢失。

  • _execute()

    _execute()方法封装了sql语句的执行方法,内部处理了加锁、解锁、异常处理,笔者的所有sql操作都是通过这个方法执行的,剩下的只需要拼凑sql语句了。

  • vacuum

    在对 sqlite 进行增删操作后,磁盘文件会越来越大,默认情况下,即使调用sql语句删除旧数据,磁盘文件也不会减小,此时执行 vacuum 可以回收无用的磁盘空间。

  • _directory()

    print(__file__)

    这里还要提一下存储目录的问题,一般数据存储的目录不是绝对路径,我们希望它是相对于可执行程序的一个目录,通过 __file__ 可以获取到 python 文件的所在目录,笔者一开始也是使用这个方法定位目录的,这样在直接运行Python程序的情况下是不会有问题的,但是一旦打包成exe文件,通过该方法获取到的目录就不是我们期望的目录了,因此这里笔者改用 os.getcwd() 获取程序运行时的目录。