在上一篇文章《神器!五分钟完成大型爬虫项目!》,我们介绍了一个类似于 Scrapy 的开源爬虫框架——feapder,并着重介绍了该框架的一种应用——AirSpider,它是一个轻量级的爬虫。
接下来我们再来介绍另一种爬虫应用——Spider,它是是一款基于 redis 的分布式爬虫,适用于海量数据采集,支持断点续爬、爬虫报警、数据自动入库等功能。
安装
和 AirSpider 一样,我们也是通过命令行安装。
由于 Spider 是分布式爬虫,可能涉及到多个爬虫,所以最好以项目的方式来创建。
创建项目
我们首先来创建项目:
feapder create -p spider-project
创建的项目目录是这样的:
创建好项目后,开发时我们需要将项目设置为工作区间,否则引入非同级目录下的文件时,编译器会报错。
设置工作区间方式(以pycharm为例):项目->右键->Mark Directory as -> Sources Root。
创建爬虫
创建爬虫的命令行语句为:
feapder create -s <spider_name> <spider_type>
- AirSpider 对应的 spider_type 值为 1
- Spider 对应的 spider_type 值为 2
- BatchSpider 对应的 spider_type 值为 3
默认 spider_type 值为 1。
所以创建 Spider 的语句为:
feapder create -s spider_test 2
运行语句后,我们可以看到在 spiders 目录下生成了 spider_test.py 文件。
对应的文件内容为:
import feapderclass SpiderTest(feapder.Spider): # 自定义数据库,若项目中有setting.py文件,此自定义可删除 __custom_setting__ = dict( REDISDB_IP_PORTS="localhost:6379", REDISDB_USER_PASS=", REDISDB_DB=0 ) def start_requests(self): yield feapder.Request("http://www.baidu.com") def parse(self, request, response): print(response)if __name__ == "__main__": SpiderTest(redis_key="xxx:xxx").start()
因Spider是基于redis做的分布式,因此模板代码默认给了redis的配置方式。关于 Redis 的配置信息:
- REDISDB_IP_PORTS:连接地址,若为集群或哨兵模式,多个连接地址用逗号分开,若为哨兵模式,需要加个REDISDB_SERVICE_NAME参数。
- REDISDB_USER_PASS:连接密码。
- REDISDB_DB:数据库。
在 main 函数中,我们可以看到有个 redis_key 的参数,这个参数是redis中存储任务等信息的 key 前缀,如 redis_key=”feapder:spider_test”, 则redis中会生成如下:
特性
我们在 AirSpider 里面讲的方法,在 Spider 这里都支持,下面我们来看看 Spider 相对于 AirSpider 的不同之处。
数据自动入库
写过爬虫的人都知道,如果要将数据持久化到 MySQL 数据库,如果碰到字段特别多的情况,就会很烦人,需要解析之后手写好多字段,拼凑 SQL 语句。
这个问题,Spider 帮我们想到了,我们可以利用框架帮我们自动入库。
建表
第一步,我们需要在数据库中创建一张数据表,这个大家都会,这里就不说了。
配置 setting
将 setting.py 里面的数据库配置改为自己的配置:
# # MYSQLMYSQL_IP = "MYSQL_PORT = MYSQL_DB = "MYSQL_USER_NAME = "MYSQL_USER_PASS = "
也就是这几个配置。
生成实体类 Item
接着,我们将命令行切换到我们项目的 items 目录,运行命令:
feapder create -i <item_name>
我这里数据库里使用的是 report 表,所以命令为:
feapder create -i report
然后,我们就可以在 items 目录下看到生成的 report_item.py 实体类了。我这里生成的实体类内容是:
from feapder import Itemclass ReportItem(Item): "" This class was generated by feapder. command: feapder create -i report. "" __table_name__ = "report" def __init__(self, *args, **kwargs): self.count = None self.emRatingName = None # 评级名称 self.emRatingValue = None # 评级代码 self.encodeUrl = None # 链接 # self.id = None self.indvInduCode = None # 行业代码 self.indvInduName = None # 行业名称 self.lastEmRatingName = None # 上次评级名称 self.lastEmRatingValue = None # 上次评级代码 self.orgCode = None # 机构代码 self.orgName = None # 机构名称 self.orgSName = None # 机构简称 self.predictNextTwoYearEps = None self.predictNextTwoYearPe = None self.predictNextYearEps = None self.predictNextYearPe = None self.predictThisYearEps = None self.predictThisYearPe = None self.publishDate = None # 发表时间 self.ratingChange = None # 评级变动 self.researcher = None # 研究员 self.stockCode = None # 股票代码 self.stockName = None # 股票简称 self.title = None # 报告名称
若字段有默认值或者自增,则默认注释掉,可按需打开。大家可以看到我这张表的 id 字段在这里被注释了。
若item字段过多,不想逐一赋值,可通过如下方式创建:
feapder create -i report 1
这时候生成的实体类是这样的:
class ReportItem(Item): "" This class was generated by feapder. command: feapder create -i report 1. "" __table_name__ = "report 1" def __init__(self, *args, **kwargs): self.count = kwargs.get('count') self.emRatingName = kwargs.get('emRatingName') # 评级名称 self.emRatingValue = kwargs.get('emRatingValue') # 评级代码 self.encodeUrl = kwargs.get('encodeUrl') # 链接 # self.id = kwargs.get('id') self.indvInduCode = kwargs.get('indvInduCode') # 行业代码 self.indvInduName = kwargs.get('indvInduName') # 行业名称 self.lastEmRatingName = kwargs.get('lastEmRatingName') # 上次评级名称 self.lastEmRatingValue = kwargs.get('lastEmRatingValue') # 上次评级代码 self.orgCode = kwargs.get('orgCode') # 机构代码 self.orgName = kwargs.get('orgName') # 机构名称 self.orgSName = kwargs.get('orgSName') # 机构简称 self.predictNextTwoYearEps = kwargs.get('predictNextTwoYearEps') self.predictNextTwoYearPe = kwargs.get('predictNextTwoYearPe') self.predictNextYearEps = kwargs.get('predictNextYearEps') self.predictNextYearPe = kwargs.get('predictNextYearPe') self.predictThisYearEps = kwargs.get('predictThisYearEps') self.predictThisYearPe = kwargs.get('predictThisYearPe') self.publishDate = kwargs.get('publishDate') # 发表时间 self.ratingChange = kwargs.get('ratingChange') # 评级变动 self.researcher = kwargs.get('researcher') # 研究员 self.stockCode = kwargs.get('stockCode') # 股票代码 self.stockName = kwargs.get('stockName') # 股票简称 self.title = kwargs.get('title') # 报告名称
这样当我们请求回来的json数据时,可直接赋值,如:
response_data = {"title":" 测试"} # 模拟请求回来的数据item = SpiderDataItem(**response_data)
想要数据自动入库也比较简单,在解析完数据之后,将数据赋值给 Item,然后 yield 就行了:
def parse(self, request, response): html = response.content.decode("utf-8") if len(html): content = html.replace('datatable1351846(', '')[:-1] content_json = json.loads(content) print(content_json) for obj in content_json['data']: result = ReportItem() result['orgName'] = obj['orgName'] #机构名称 result['orgSName'] = obj['orgSName'] #机构简称 result['publishDate'] = obj['publishDate'] #发布日期 result['predictNextTwoYearEps'] = obj['predictNextTwoYearEps'] #后年每股盈利 result['title'] = obj['title'] #报告名称 result['stockName'] = obj['stockName'] #股票名称 result['stockCode'] = obj['stockCode'] #股票code result['orgCode'] = obj['stockCode'] #机构code result['predictNextTwoYearPe'] = obj['predictNextTwoYearPe'] #后年市盈率 result['predictNextYearEps'] = obj['predictNextYearEps'] # 明年每股盈利 result['predictNextYearPe'] = obj['predictNextYearPe'] # 明年市盈率 result['predictThisYearEps'] = obj['predictThisYearEps'] #今年每股盈利 result['predictThisYearPe'] = obj['predictThisYearPe'] #今年市盈率 result['indvInduCode'] = obj['indvInduCode'] # 行业代码 result['indvInduName'] = obj['indvInduName'] # 行业名称 result['lastEmRatingName'] = obj['lastEmRatingName'] # 上次评级名称 result['lastEmRatingValue'] = obj['lastEmRatingValue'] # 上次评级代码 result['emRatingValue'] = obj['emRatingValue'] # 评级代码 result['emRatingName'] = obj['emRatingName'] # 评级名称 result['ratingChange'] = obj['ratingChange'] # 评级变动 result['researcher'] = obj['researcher'] # 研究员 result['encodeUrl'] = obj['encodeUrl'] # 链接 result['count'] = int(obj['count']) # 近一月个股研报数 yield result
返回item后,item 会流进到框架的 ItemBuffer, ItemBuffer 每.05秒或当item数量积攒到5000个,便会批量将这些 item 批量入库。表名为类名去掉 Item 的小写,如 ReportItem 数据会落入到 report 表。
调试
开发过程中,我们可能需要针对某个请求进行调试,常规的做法是修改下发任务的代码。但这样并不好,改来改去可能把之前写好的逻辑搞乱了,或者忘记改回来直接发布了,又或者调试的数据入库了,污染了库里已有的数据,造成了很多本来不应该发生的问题。
本框架支持Debug爬虫,可针对某条任务进行调试,写法如下:
if __name__ == "__main__": spider = SpiderTest.to_DebugSpider( redis_key="feapder:spider_test", request=feapder.Request("http://www.baidu.com") ) spider.start()
对比之前的启动方式:
spider = SpiderTest(redis_key="feapder:spider_test")spider.start()
可以看到,代码中 to_DebugSpider 方法可以将原爬虫直接转为 debug 爬虫,然后通过传递 request 参数抓取指定的任务。
通常结合断点来进行调试,debug 模式下,运行产生的数据默认不入库。
除了指定 request 参数外,还可以指定 request_dict 参数,request_dict 接收字典类型,如 request_dict={“url”:”http://www.baidu.com”}, 其作用于传递 request 一致。request 与 request_dict 二者选一传递即可。
运行多个爬虫
通常,一个项目下可能存在多个爬虫,为了规范,建议启动入口统一放到项目下的 main.py 中,然后以命令行的方式运行指定的文件。
例如如下项目:
项目中包含了两个spider,main.py写法如下:
from spiders import *from feapder import Requestfrom feapder import ArgumentParserdef test_spider(): spider = test_spider.TestSpider(redis_key="spider:report") spider.start()def test_spider2(): spider = test_spider.TestSpider2(redis_key="spider:report") spider.start()if __name__ == "__main__": parser = ArgumentParser(description="Spider测试") parser.add_argument( "--test_spider", action="store_true", help="测试Spider", function=test_spider ) parser.add_argument( "--test_spider2", action="store_true", help="测试Spider2", function=test_spider2 ) parser.start()
这里使用了 ArgumentParser 模块,使其支持命令行参数,如运行 test_spider:
python3 main.py –test_spider
分布式
分布式说白了就是启动多个进程,处理同一批任务。Spider 支持启动多份,且不会重复发下任务,我们可以在多个服务器上部署启动,也可以在同一个机器上启动多次。
总结
到这里, Spider 分布式爬虫咱就讲完了,还有一些细节的东西,大家在用的时候还需要琢磨一下。总体来说,这个框架还是比较好用的,上手简单,应对一些不是很复杂的场景绰绰有余,大家可以尝试着将自己的爬虫重构一下,试试这款框架