当前位置:脚本大全 > > 正文

python对mysql数据分析(python使用adbapi实现MySQL数据库的异步存储)

时间:2021-11-03 15:11:20类别:脚本大全

python对mysql数据分析

python使用adbapi实现MySQL数据库的异步存储

之前一直在写有关scrapy爬虫的事情,今天我们看看使用scrapy如何把爬到的数据放在mysql数据库中保存。

有关python操作mysql数据库的内容,网上已经有很多内容可以参考了,但都是在同步的操作mysql数据库。在数据量不大的情况下,这种方法固然可以,但是一旦数据量增长后,mysql就会出现崩溃的情况,因为网上爬虫的速度要远远高过往数据库中插入数据的速度。为了避免这种情况发生,我们就需要使用异步的方法来存储数据,爬虫与数据存储互不影响。

为了显示方便,我们把程序设计的简单一点,只是爬一页的数据。我们今天选择伯乐在线这个网站来爬取,只爬取第一页的数据。

首先我们还是要启动一个爬虫项目,然后自己建了一个爬虫的文件jobbole.py。我们先来看看这个文件中的代码

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • # -*- coding: utf-8 -*-
  • import io
  • import sys
  • import scrapy
  • import re
  • import datetime
  • from scrapy.http import request
  • from urllib import parse
  • from articlespider.items import jobbolearticleitem, articleitemloader
  • from scrapy.loader import itemloader
  • sys.stdout = io.textiowrapper(sys.stdout.buffer,encoding='utf-8')
  •  
  • class jobbolespider(scrapy.spider):
  •  """docstring for jobbolespider"""
  •  name = "jobbole"
  •  allowed_domain = ["blog.jobbole.com"]
  •  start_urls = ['http://blog.jobbole.com/all-posts/']
  •  
  •  def parse(self, response):
  •  """
  •  1.获取列表页中的文章url
  •  """
  •  # 解析列表汇中所有文章url并交给scrapy下载器并进行解析
  •  post_nodes = response.css("#archive .floated-thumb .post-thumb a")
  •  for post_node in post_nodes:
  •  image_url = post_node.css("img::attr(src)").extract_first("")# 这里取出每篇文章的封面图,并作为meta传入request
  •  post_url = post_node.css("::attr(href)").extract_first("")
  •  yield request(url = parse.urljoin(response.url, post_url), meta = {"front_image_url":image_url}, callback = self.parse_detail)
  •  
  •  def parse_detail(self, response):
  •  article_item = jobbolearticleitem()
  •  # 通过itemloader加载item
  •  # 通过add_css后的返回值都是list型,所有我们再items.py要进行处理
  •  item_loader = articleitemloader(item = jobbolearticleitem(), response = response)
  •  item_loader.add_css("title", ".entry-header h1::text")
  •  item_loader.add_value("url", response.url)
  •  # item_loader.add_value("url_object_id", get_md5(response.url))
  •  item_loader.add_value("url_object_id", response.url)
  •  item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text")
  •  item_loader.add_value("front_image_url", [front_image_url])
  •  item_loader.add_css("praise_nums", ".vote-post-up h10::text")
  •  item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
  •  item_loader.add_css("fav_nums", ".bookmark-btn::text")
  •  item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
  •  item_loader.add_css("content", "li.entry")
  •  
  •  article_item = item_loader.load_item()
  •  print(article_item["tags"])
  •  
  •  yield article_item
  •  pass
  • 这里我把代码进行了简化,首先对列表页发出请求,这里只爬取一页数据,然后分析每一页的url,并且交给scrapy对每一个url进行请求,得到每篇文章的详情页,把详情页的相关内容放在mysql数据库中。
    这里使用itemloader来进行页面的解析,这样解析有个最大的好处就是可以把解析规则存放在数据库中,实现对解析规则的动态加载。但是要注意一点是使用itemloader中css方式和xpath方式得到的数据都是list型,因此还需要在items.py中再对相对应的数据进行处理。

    接下来我们就来看看items.py是如何处理list数据的。

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • # -*- coding: utf-8 -*-
  •  
  • # define here the models for your scraped items
  • #
  • # see documentation in:
  • # https://doc.scrapy.org/en/latest/topics/items.html
  • import datetime
  • import re
  •  
  •  
  • import scrapy
  • from scrapy.loader import itemloader
  • from scrapy.loader.processors import mapcompose, takefirst,join
  • from articlespider.utils.common import get_md5
  •  
  •  
  • def convert_date(value):
  •  try:
  •  create_date = datetime.datetime.strptime(create_date, "%y/%m/%d").date()
  •  except exception as e:
  •  create_date = datetime.datetime.now().date()
  •  return create_date
  •  
  • def get_nums(value):
  •  match_re = re.match(".*?(\d+).*", value)
  •  if match_re:
  •  nums = int(match_re.group(1))
  •  else:
  •  nums = 0
  •  
  •  return nums
  •  
  • def remove_comment_tags(value):
  •  # 去掉tags中的评论内容
  •  if "评论" in value:
  •  # 这里做了修改,如果返回"",则在list中仍然会占位,会变成类似于["程序员",,"解锁"]这样
  •  # return ""
  •  return none
  •  else:
  •  return value
  •  
  • def return_value(value):
  •  return
  •  
  • class articleitemloader(itemloader):
  •  """docstring for ariticleitemloader"""
  •  # 自定义itemloader
  •  default_output_processor = takefirst()
  •  
  • class articlespideritem(scrapy.item):
  •  # define the fields for your item here like:
  •  # name = scrapy.field()
  •  pass
  •  
  • class jobbolearticleitem(scrapy.item):
  •  """docstring for articlespideritem"""
  •  title = scrapy.field()
  •  create_date = scrapy.field(
  •  input_processor = mapcompose(convert_date)
  •  )
  •  url = scrapy.field()
  •  url_object_id = scrapy.field(
  •  output_processor = mapcompose(get_md5)
  •  )
  •  # 这里注意front_image_url还是一个list,在进行sql语句时还需要处理
  •  front_image_url = scrapy.field(
  •  output_processor = mapcompose(return_value)
  •  )
  •  front_image_path = scrapy.field()
  •  praise_nums = scrapy.field(
  •  input_processor = mapcompose(get_nums)
  •  )
  •  comment_nums = scrapy.field(
  •  input_processor = mapcompose(get_nums)
  •  )
  •  fav_nums = scrapy.field(
  •  input_processor = mapcompose(get_nums)
  •  )
  •  # tags要做另行处理,因为tags我们需要的就是list
  •  tags = scrapy.field(
  •  input_processor = mapcompose(remove_comment_tags),
  •  output_processor = join(",")
  •  )
  •  content = scrapy.field()
  • 首先我们看到定义了一个类articleitemloader,在这个类中只有一句话,就是对于每个items都默认采用list中的第一个元素,这样我们就可以把每个items中的第一个元素取出来。但是要注意,有些items我们是必须要用list型的,比如我们给imagepipeline的数据就要求必须是list型,这样我们就需要对front_image_url单独进行处理。这里我们做了一个小技巧,对front_image_url什么都不错,因为我们传过来的front_image_url就是list型
    在items的field中有两个参数,一个是input_processor,另一个是output_processor,这两个参数可以帮助我们对items的list中的每个元素进行处理,比如有些需要用md5进行加密,有些需要用正则表达式进行筛选或者排序等等。

    在进行mysql的pipeline之前,我们需要设计数据库,下面是我自己设计的数据库的字段,仅供参考

    python对mysql数据分析(python使用adbapi实现MySQL数据库的异步存储)

    这里我把url_object_id作为该表的主键,由于它不会重复,所以适合做主键。

    下面我们来看看数据库的pipeline。

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • # -*- coding: utf-8 -*-
  •  
  • # define your item pipelines here
  • #
  • # don't forget to add your pipeline to the item_pipelines setting
  • # see: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
  • import codecs
  • import json
  • from twisted.enterprise import adbapi
  • import mysqldb
  • import mysqldb.cursors
  •  
  •  
  • class mysqltwistedpipeline(object):
  •  """docstring for mysqltwistedpipeline"""
  •  #采用异步的机制写入mysql
  •  def __init__(self, dbpool):
  •  self.dbpool = dbpool
  •  
  •  @classmethod
  •  def from_settings(cls, settings):
  •  dbparms = dict(
  •  host = settings["mysql_host"],
  •  db = settings["mysql_dbname"],
  •  user = settings["mysql_user"],
  •  passwd = settings["mysql_password"],
  •  charset='utf8',
  •  cursorclass=mysqldb.cursors.dictcursor,
  •  use_unicode=true,
  •  )
  •  dbpool = adbapi.connectionpool("mysqldb", **dbparms)
  •  
  •  return cls(dbpool)
  •  
  •  def process_item(self, item, spider):
  •  #使用twisted将mysql插入变成异步执行
  •  query = self.dbpool.runinteraction(self.do_insert, item)
  •  query.adderrback(self.handle_error, item, spider) #处理异常
  •  return item
  •  
  •  def handle_error(self, failure, item, spider):
  •  # 处理异步插入的异常
  •  print (failure)
  •  
  •  def do_insert(self, cursor, item):
  •  #执行具体的插入
  •  #根据不同的item 构建不同的sql语句并插入到mysql中
  •  # insert_sql, params = item.get_insert_sql()
  •  # print (insert_sql, params)
  •  # cursor.execute(insert_sql, params)
  •  insert_sql = """
  •  insert into jobbole_article(title, url, create_date, fav_nums, url_object_id)
  •  values (%s, %s, %s, %s, %s)
  •  """
  •  # 可以只使用execute,而不需要再使用commit函数
  •  cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"], item["url_object_id"]))
  • 在这里我们只是演示一下,我们只向数据库中插入5个字段的数据,分别是title,url,create_date,fav_nums,url_object_id。

    当然你也可以再加入其它的字段。

    首先我们看看from_settings这个函数,它可以从settings.py文件中取出我们想想要的数据,这里我们把数据库的host,dbname,username和password都放在settings.py中。实际的插入语句还是在process_item中进行,我们自己定义了一个函数do_insert,然后把它传给dbpool中用于插入真正的数据。

    最后我们来看看settings.py中的代码,这里就很简单了。

  • ?
  • 1
  • 2
  • 3
  • 4
  • mysql_host = "localhost"
  • mysql_dbname = "article_wilson"
  • mysql_user = "root"
  • mysql_password = "root"
  • 其实这里是和pipeline中的代码是想对应的,别忘了把在settings.py中把pipeline打开。

  • ?
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • item_pipelines = {
  •  # 'articlespider.pipelines.articlespiderpipeline': 300,
  •  # 'articlespider.pipelines.jsonwithencodingpipeline': 1
  •  
  •  # # 'scrapy.pipelines.images.imagepipeline': 1,
  •  # 'articlespider.pipelines.jsonexporterpipleline': 1
  •  # 'articlespider.pipelines.articleimagepipeline': 2
  •  # 'articlespider.pipelines.mysqlpipeline': 1
  •  'articlespider.pipelines.mysqltwistedpipeline': 1
  • }
  • 好了,现在我们可以跑一程序吧。

    scrapy crawl jobbole

    下面是运行结果的截图

    python对mysql数据分析(python使用adbapi实现MySQL数据库的异步存储)

    好了,以上就是今天的全部内容了。

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持开心学习网。

    原文链接:https://blog.csdn.net/Wilson_Iceman/article/details/79270235

    上一篇下一篇

    猜您喜欢

    热门推荐