python3.8爬虫需要的包
python爬取基于m3u8协议的ts文件并合并前言
简单学习过网络爬虫,只是之前都是照着书上做并发,大概能理解,却还是无法自己用到自己项目中,这里自己研究实现一个网页嗅探html5播放控件中基于m3u8协议ts格式视频资源的项目,并未考虑过复杂情况,毕竟只是练练手.
源码
|
# coding=utf-8 import asyncio import multiprocessing import os import re import time from math import floor from multiprocessing import manager import aiohttp import requests from lxml import html import threading from src.my_lib import retry from src.my_lib import time_statistics class m3u8download: _path = "./resource\\" # 本地文件路径 _url_seed = none # 资源所在链接前缀 _target_url = {} # 资源任务目标字典 _mode = "" _headers = { "user-agent" : "mozilla/5.0" } # 浏览器代理 _target_num = 100 def __init__( self ): self ._ml = manager(). list () # 进程通信列表 if not os.path.exists( self ._path): # 检测本地目录存在否 os.makedirs( self ._path) exec_str = r 'chcp 65001' os.system(exec_str) # 先切换utf-8输出,防止控制台乱码 def sniffing( self , url): self ._url = url print ( "开始嗅探..." ) try : r = requests.get( self ._url) # 访问嗅探网址,获取网页信息 except : print ( "嗅探失败,网址不正确" ) os.system( "pause" ) else : tree = html.fromstring(r.content) try : source_url = tree.xpath( '//video//source/@src' )[ 0 ] # 嗅探资源控制文件链接,这里只针对一个资源控制文件 # self._url_seed = re.split("/\w+\.m3u8", source_url)[0] # 从资源控制文件链接解析域名 except : print ( "嗅探失败,未发现资源" ) os.system( "pause" ) else : self .analysis(source_url) def analysis( self , source_url): try : self ._url_seed = re.split( "/\w+\.m3u8" , source_url)[ 0 ] # 从资源控制文件链接解析域名 with requests.get(source_url) as r: # 访问资源控制文件,获得资源信息 src = re.split( "\n*#.+\n" , r.text) # 解析资源信息 for sub_src in src: # 将资源地址储存到任务字典 if sub_src: self ._target_url[sub_src] = self ._url_seed + "/" + sub_src except exception as e: print ( "资源无法成功解析" , e) os.system( "pause" ) else : self ._target_num = len ( self ._target_url) print ( "sniffing success!!!,found" , self ._target_num, "url." ) self ._mode = input ( "1:-> 单进程(low b)\n2:-> 多进程+多线程(网速开始biubiu飞起!)\n3:-> 多进程+协程(最先进的并发!!!)\n" ) if self ._mode = = "1" : for path, url in self ._target_url.items(): self ._download(path, url) elif self ._mode = = "2" or self ._mode = = "3" : self ._multiprocessing() def _multiprocessing( self , processing_num = 4 ): # 多进程,多线程 target_list = {} # 进程任务字典,储存每个进程分配的任务 pool = multiprocessing.pool(processes = processing_num) # 开启进程池 i = 0 # 任务分配标识 for path, url in self ._target_url.items(): # 分配进程任务 target_list[path] = url i + = 1 if i % 10 = = 0 or i = = len ( self ._target_url): # 每个进程分配十个任务 if self ._mode = = "2" : pool.apply_async( self ._sub_multithreading, kwds = target_list) # 使用多线程驱动方法 else : pool.apply_async( self ._sub_coroutine, kwds = target_list) # 使用协程驱动方法 target_list = {} pool.close() # join函数等待所有子进程结束 pool.join() # 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool while true: if self ._judge_over(): self ._combine() break def _sub_multithreading( self , * * kwargs): for path, url in kwargs.items(): # 根据进程任务开启线程 t = threading.thread(target = self ._download, args = (path, url,)) t.start() @retry () def _download( self , path, url): # 同步下载方法 with requests.get(url, headers = self ._headers) as r: if r.status_code = = 200 : with open ( self ._path + path, "wb" )as file : file .write(r.content) self ._ml.append( 0 ) # 每成功一个就往进程通信列表增加一个值 percent = '%.2f' % ( len ( self ._ml) / self ._target_num * 100 ) print ( len ( self ._ml), ": " , path, "->ok" , "\tcomplete:" , percent, "%" ) # 显示下载进度 else : print (path, r.status_code, r.reason) def _sub_coroutine( self , * * kwargs): tasks = [] for path, url in kwargs.items(): # 根据进程任务创建协程任务列表 tasks.append(asyncio.ensure_future( self ._async_download(path, url))) loop = asyncio.get_event_loop() # 创建异步事件循环 loop.run_until_complete(asyncio.wait(tasks)) # 注册任务列表 async def _async_download( self , path, url): # 异步下载方法 async with aiohttp.clientsession() as session: async with session.get(url, headers = self ._headers) as resp: try : assert resp.status = = 200 , "e" # 断言状态码为200,否则抛异常,触发重试装饰器 with open ( self ._path + path, "wb" )as file : file .write(await resp.read()) except exception as e: print (e) else : self ._ml.append( 0 ) # 每成功一个就往进程通信列表增加一个值 percent = '%.2f' % ( len ( self ._ml) / self ._target_num * 100 ) print ( len ( self ._ml), ": " , path, "->ok" , "\tcomplete:" , percent, "%" ) # 显示下载进度 def _combine( self ): # 组合资源方法 try : print ( "开始组合资源..." ) identification = str (floor(time.time())) exec_str = r 'copy /b "' + self ._path + r '*.ts" "' + self ._path + 'video' + identification + '.mp4"' os.system(exec_str) # 使用cmd命令将资源整合 exec_str = r 'del "' + self ._path + r '*.ts"' os.system(exec_str) # 删除原来的文件 except : print ( "资源组合失败" ) else : print ( "资源组合成功!" ) def _judge_over( self ): # 判断是否全部下载完成 if len ( self ._ml) = = len ( self ._target_url): return true return false @time_statistics def app(): multiprocessing.freeze_support() url = input ( "输入嗅探网址:\n" ) m3u8 = m3u8download() m3u8.sniffing(url) # m3u8.analysis(url) if __name__ = = "__main__" : app() |
这里是两个装饰器的实现:
|
import time def time_statistics(fun): def function_timer( * args, * * kwargs): t0 = time.time() result = fun( * args, * * kwargs) t1 = time.time() print ( "total time running %s: %s seconds" % (fun.__name__, str (t1 - t0))) return result return function_timer def retry(retries = 3 ): def _retry(fun): def wrapper( * args, * * kwargs): for _ in range (retries): try : return fun( * args, * * kwargs) except exception as e: print ( "@" , fun.__name__, "->" , e) return wrapper return _retry |
打包成exe文件
使用pyinstaller -f download.py将程序打包成单个可执行文件.
这里需要注意一下,因为程序含有多进程,需要在执行前加一句multiprocessing.freeze_support(),不然程序会反复执行多进程前的功能.
关于协程
协程在python3.5进化到了async await版本,用 async 标记异步方法,在异步方法里对耗时操作使用await标记.这里使用了一个进程驱动协程的方法,在进程池创建多个协程任务,使用asyncio.get_event_loop()创建协程事件循环,使用run_until_complete()注册协程任务,asyncio.wait()方法接收一个任务列表进行协程注册.
关于装饰器
装饰器源于闭包原理,这里使用了两种装饰器.
- @time_statistics:统计耗时,装饰器自己无参型
- @retry():设置重试次数,装饰器自己有参型
- 按我理解是有参型是将无参型装饰器包含在内部,而调用是加()的,关于():
- 不带括号时,调用的是这个函数本身
- 带括号(此时必须传入需要的参数),调用的是函数的return结果
关于cmd控制台
程序会使用cmd命令来将下载的ts文件合并.
因为cmd默认使用gb2312编码,调用os.system()需要先切换成通用的utf-8输出,否则系统信息会乱码.
而且使用cmd命令时参数最好加双引号,以避免特殊符号报错.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持开心学习网。
原文链接:https://blog.csdn.net/qq_37258787/article/details/80298084