单线程串行与多线程(进程)并行
在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。
1. 单线程串行
# -*- coding: UTF-8 -*-
"""Fetch each URL one after another on a single thread and time the whole run."""
import time

import requests


def be_async(url):
    """Print *url*, perform a blocking GET on it, then print the response object."""
    print(url)
    print(requests.get(url))


url_list = ["https://www.baidu.com", "http://github.com/"]

a = time.time()  # start time
for target in url_list:
    # Each call blocks until its response arrives, so the waits add up.
    be_async(target)
b = time.time()  # end time
print("cost time: %s s" % (b - a))
2. 多线程
# -*- coding: UTF-8 -*-
"""Fetch the URLs on a pool of four worker threads and time the whole run."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Print *url*, perform a blocking GET on it, then print the response object."""
    print(url)
    response = requests.get(url)
    print(response)


url_list = ["https://www.baidu.com", "http://github.com/"]

pool = ThreadPoolExecutor(4)
a = time.time()  # start time
with pool:
    # Leaving the ``with`` block calls pool.shutdown(wait=True), i.e. it
    # waits for every submitted job to finish.
    for target in url_list:
        pool.submit(be_async, target)
b = time.time()  # end time
print("cost time: %s s" % (b - a))
注:python2里没有线程池,只有python3里面有
+回调函数
# -*- coding: UTF-8 -*-
"""Thread-pool fetch where a done-callback reports each job's result."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return the response.

    The response is returned (not only printed) so that the future created
    by ``pool.submit`` carries a value for ``callback`` to read.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res


def callback(future):
    """Print the outcome of a finished future.

    ``result`` is a method and has to be called; printing ``future.result``
    would only show the bound-method object.
    """
    error = future.exception()
    if error is not None:
        # Calling result() here would re-raise inside the callback.
        print("request failed:", error)
        return
    print(future.result())


url_list = ["https://www.baidu.com", "http://github.com/"]

pool = ThreadPoolExecutor(4)
a = time.time()  # start time
for url in url_list:
    r = pool.submit(be_async, url)
    r.add_done_callback(callback)
pool.shutdown(wait=True)  # block until every job (and its callback) is done
b = time.time()  # end time
print("cost time: %s s" % (b - a))
3. 多进程
# -*- coding: UTF-8 -*-
"""Fetch the URLs on a pool of four worker processes and time the whole run."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Print *url*, perform a blocking GET on it, then print the response object."""
    print(url)
    response = requests.get(url)
    print(response)


url_list = ["https://www.baidu.com", "http://github.com/"]


def main():
    """Submit one job per URL, wait for all of them, and report the elapsed time."""
    pool = ProcessPoolExecutor(4)
    started = time.time()  # start time
    with pool:
        # Leaving the ``with`` block calls pool.shutdown(wait=True).
        for target in url_list:
            pool.submit(be_async, target)
    finished = time.time()  # end time
    print("cost time: %s s" % (finished - started))


# The pool is only created when the file is run as a script, exactly as in
# the original guard.
if __name__ == '__main__':
    main()
+回调函数
# -*- coding: UTF-8 -*-
"""Process-pool fetch where a done-callback reports each job's result."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return its HTTP status code.

    A value is returned so the future has something for ``callback`` to
    read. The status code is used because the result has to be sent back
    from the worker process to the parent, and a plain int is trivially
    picklable.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res.status_code


def callback(future):
    """Print the outcome of a finished future.

    ``result`` is a method and has to be called; printing ``future.result``
    would only show the bound-method object.
    """
    error = future.exception()
    if error is not None:
        # Calling result() here would re-raise inside the callback.
        print("request failed:", error)
        return
    print(future.result())


if __name__ == '__main__':
    url_list = ["https://www.baidu.com", "http://github.com/"]

    pool = ProcessPoolExecutor(4)
    a = time.time()  # start time
    for url in url_list:
        r = pool.submit(be_async, url)
        r.add_done_callback(callback)
    pool.shutdown(wait=True)  # block until every job is done
    b = time.time()  # end time
    print("cost time: %s s" % (b - a))
异步非阻塞IO
通过上述代码均可以完成对请求性能的提高,对于多线程和多进行的缺点是在IO阻塞时会造成了线程和进程的浪费,所以异步会是首选:
1. asyncio 示例,该模块只能发送tcp协议的请求,python3.3之后才有
原理:
代码:
# -*- coding: UTF-8 -*-
"""Send hand-written HTTP/1.0 GET requests concurrently over asyncio TCP streams."""
import asyncio


async def be_async(host, url="/"):
    """Open a TCP connection to *host*:80, GET *url*, and print the raw reply.

    Written as a native coroutine (``async def`` / ``await``): the
    generator-based ``@asyncio.coroutine`` / ``yield from`` form was removed
    in Python 3.11.

    :param host: host name to connect to (also sent as the Host header)
    :param url: request path, defaults to "/"
    """
    print(host, url)
    reader, writer = await asyncio.open_connection(host, 80)
    try:
        req_header = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)
        writer.write(req_header.encode(encoding="utf-8"))
        await writer.drain()
        # read() with no size reads until EOF, i.e. until the peer closes.
        text = await reader.read()
        print(host, url, text)
    finally:
        # Close the transport even if writing or reading raised.
        writer.close()


tasks = [
    be_async("www.cnblogs.com", url="/gaosy-math"),
    be_async("github.com", url="/gaoshao52"),
]


async def main():
    """Run every prepared request concurrently and return their results."""
    return await asyncio.gather(*tasks)


# An explicitly created loop avoids depending on an implicit default loop.
loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
知识点:伪造http请求
GET: GET <url地址> HTTP/1.0\r\nHost: <ip或主机名>\r\n\r\n
POST: POST <url地址> HTTP/1.0\r\nHost: <ip或主机名>\r\n\r\n<请求体> 请求头的数据是用\r\n隔开的,请求头与请求体是用\r\n\r\n隔开的
格式: 请求方式 请求地址 协议 请求头数据(用\r\n隔开) 请求体(k1=v1&k2=v2 格式)
例:基于socket实现http请求
# -*- coding: UTF-8 -*-
"""Perform an HTTP/1.0 GET by hand over a plain blocking socket."""
import socket

# The ``with`` block closes the socket even if connect/send/recv raises.
with socket.socket() as client:
    # connect
    client.connect(("www.51cto.com", 80))

    # send the request
    client.sendall(b'GET / HTTP/1.0\r\nHost: www.51cto.com\r\n\r\n')

    # receive: one recv() call only returns what is currently buffered, so
    # keep reading until recv() returns b"" (the peer closed the connection).
    chunks = []
    while True:
        chunk = client.recv(8192)
        if not chunk:
            break
        chunks.append(chunk)

res = b"".join(chunks)
# NOTE(review): the page is assumed to be GBK-encoded, as in the original;
# errors="replace" keeps a stray byte from aborting the print.
print(res.decode("gbk", errors="replace"))
2. asyncio+aiohttp 示例
原理: 与上面的一致
代码:
# -*- coding: UTF-8 -*-
"""Fetch several pages concurrently with asyncio + aiohttp."""
import asyncio

import aiohttp


async def _fetch(session, url):
    """GET *url* through *session*, read the whole body, and print it."""
    # ``async with`` releases the response/connection when the block exits.
    async with session.get(url) as res:
        data = await res.read()
    print(url, data)


async def be_async(url, session=None):
    """Download *url* and print its body.

    :param url: absolute URL to fetch
    :param session: optional ``aiohttp.ClientSession`` to reuse; when omitted
        a temporary session is created just for this call
    """
    print(url)
    if session is not None:
        await _fetch(session, url)
        return
    async with aiohttp.ClientSession() as own_session:
        await _fetch(own_session, url)


url_list = [
    "https://github.com/gaoshao52",
    "https://www.cnblogs.com/gaosy-math",
]


async def main():
    """Fetch every URL concurrently through one shared client session."""
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(be_async(url, session) for url in url_list))


# An explicitly created loop avoids depending on an implicit default loop.
loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
注:python3运行有问题
3. asyncio+requests 示例
原理:一致
代码:
# -*- coding: UTF-8 -*-
"""Run blocking ``requests`` calls concurrently via asyncio's default executor."""
import asyncio

import requests


async def be_async(func, *args):
    """Run the blocking ``func(*args)`` in the default executor and print the reply.

    Written as a native coroutine (``async def`` / ``await``): the
    generator-based ``@asyncio.coroutine`` / ``yield from`` form was removed
    in Python 3.11.

    :param func: blocking callable returning an object with ``url`` and
        ``text`` attributes (``requests.get`` in this script)
    :param args: positional arguments forwarded to *func*
    """
    loop = asyncio.get_event_loop()
    # Passing None selects the loop's default executor.
    res = await loop.run_in_executor(None, func, *args)
    print(res.url, res.text)


tasks = [
    be_async(requests.get, "https://github.com/gaoshao52"),
    be_async(requests.get, "https://www.cnblogs.com/gaosy-math"),
]


async def main():
    """Run every prepared request concurrently and return their results."""
    return await asyncio.gather(*tasks)


# An explicitly created loop avoids depending on an implicit default loop.
loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
4. gevent+requests 示例
原理:一致
代码:
# -*- coding: UTF-8 -*-
"""Issue several ``requests`` calls concurrently as gevent greenlets."""
import gevent
from gevent import monkey

# Patching happens before ``requests`` is imported, same order as the original.
monkey.patch_all()

import requests  # noqa: E402


def be_async(method, url, req_kwargs):
    """Send one HTTP request and print the final URL and status code.

    :param method: HTTP method name, e.g. 'GET'
    :param url: absolute URL to request
    :param req_kwargs: extra keyword arguments forwarded to ``requests.request``
    """
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.status_code)


target_urls = (
    "https://github.com/gaoshao52",
    "https://www.cnblogs.com/gaosy-math",
)

# One greenlet per URL; joinall() waits until all of them have finished.
jobs = [
    gevent.spawn(be_async, method='GET', url=target, req_kwargs={})
    for target in target_urls
]
gevent.joinall(jobs)

# Alternative: send the requests through a greenlet pool, which can cap the
# number of concurrent greenlets.
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(be_async, method='GET', url="https://github.com/gaoshao52", req_kwargs={}),
#     pool.spawn(be_async, method='GET', url="https://www.cnblogs.com/gaosy-math", req_kwargs={})
# ])
monkey.patch_all() 会把原来的socket 设置成非阻塞
5. grequests 示例
原理:gevent+requests的封装
代码:
# -*- coding: UTF-8 -*-
"""Send a batch of requests with grequests and print the list of responses."""
import grequests


def exception_handler(request, exception):
    """Report a request that raised instead of producing a response."""
    print(request, exception)
    print("Request failed")


page_urls = (
    "http://www.runoob.com/python/python-json.html",
    "https://www.git-scm.com/download/win",
)
request_list = [grequests.get(page) for page in page_urls]

# Variant without error handling:
# response_list = grequests.map(request_list)
# print(response_list)

# Execute the batch; failures are passed to exception_handler.
response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
6. twisted 示例
原理:
代码:
# -*- coding: UTF-8 -*-
"""Fetch several pages concurrently with Twisted and stop the reactor when all are done."""
# ``defer`` is imported from its home package rather than through
# twisted.web.client's namespace.
from twisted.internet import defer, reactor
# NOTE(review): getPage is marked deprecated in newer Twisted releases —
# confirm it exists in the installed version, otherwise migrate to Agent.
from twisted.web.client import getPage


def all_done(arg):
    """Stop the reactor once every request has finished, whatever the outcome."""
    reactor.stop()


def callback(contents):
    """Print a downloaded page body, which arrives as bytes."""
    print(contents.decode("utf-8"))


def errback(failure):
    """Report a failed request so it does not surface as an unhandled Deferred error."""
    print("Request failed:", failure.getErrorMessage())


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(url.encode(encoding="utf-8"))
    deferred.addCallback(callback)
    # Added after the callback so it also catches errors raised by it
    # (for example a body that is not valid UTF-8).
    deferred.addErrback(errback)
    deferred_list.append(deferred)

# consumeErrors=True keeps any remaining failure inside the DeferredList.
dlist = defer.DeferredList(deferred_list, consumeErrors=True)
dlist.addBoth(all_done)

reactor.run()
# -*- coding: UTF-8 -*-
"""Send one form-encoded POST with Twisted and stop the reactor when it completes."""
import urllib.parse

from twisted.internet import reactor
# NOTE(review): getPage is marked deprecated in newer Twisted releases —
# confirm it exists in the installed version, otherwise migrate to Agent.
from twisted.web.client import getPage


def one_done(arg):
    """Print the outcome of the request and always stop the reactor.

    This is registered with ``addBoth``, so *arg* is the response body
    (bytes) on success and a failure object on error. Only bytes are
    decoded. The reactor is stopped in ``finally`` so that a failed request
    or a decode error cannot leave ``reactor.run()`` blocked forever.
    """
    try:
        if isinstance(arg, bytes):
            print(arg.decode("utf-8"))
        else:
            print(arg)
    finally:
        reactor.stop()


post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = post_data.encode(encoding="utf-8")
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage('http://dig.chouti.com/login'.encode(encoding="utf-8"),
                   method='POST'.encode(encoding="utf-8"),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()
Twisted模块安装:
1. https://www.lfd.uci.edu/~gohlke/pythonlibs/ 下载
2. pip install Twisted-18.7.0-cp36-cp36m-win_amd64.whl
7.tornado 示例
原理:与twisted一致
代码:
# -*- coding: UTF-8 -*-
"""Fetch several pages concurrently with Tornado and stop the IOLoop when all are done."""
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest

# Number of requests still in flight; the IOLoop is stopped when it reaches 0.
_pending = 0


def handle_response(response):
    """Print the error of a failed response, or the body of a successful one.

    :param response: the response object produced by ``AsyncHTTPClient.fetch``
    :return: None
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


def _on_fetch_done(future):
    """Handle one finished fetch, then stop the loop after the last one."""
    global _pending
    try:
        response = future.result()
    except Exception as exc:
        # Errors that are not delivered as a response object (e.g. a failed
        # connection) are raised by result(); report them the same way.
        print("Error:", exc)
    else:
        handle_response(response)
    finally:
        _pending -= 1
        if _pending == 0:
            ioloop.IOLoop.current().stop()


def func():
    """Start one fetch per URL and register the completion handler for each."""
    global _pending
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    loop = ioloop.IOLoop.current()
    if not url_list:
        # Nothing to wait for; do not leave the loop running forever.
        loop.stop()
        return

    _pending = len(url_list)
    http_client = AsyncHTTPClient()
    for url in url_list:
        print(url)
        # fetch() returns a future. The completion handler is attached with
        # add_future instead of the positional ``callback`` argument, which
        # newer Tornado releases no longer accept. raise_error=False lets
        # HTTP error statuses arrive as a response with ``error`` set.
        fetch_future = http_client.fetch(HTTPRequest(url), raise_error=False)
        loop.add_future(fetch_future, _on_fetch_done)


ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
8. 自定义一个异步非阻塞模块,与twisted tornado原理一致
# -*- coding: UTF-8 -*-
"""A minimal select()-based asynchronous, non-blocking HTTP/1.0 client."""
import select
import socket


class Request(object):
    """Pair a non-blocking socket with the request description it serves."""

    def __init__(self, sock, info):
        self.sock = sock
        self.info = info
        # Response bytes received so far; joined when the peer closes.
        self.chunks = []

    def fileno(self):
        """Return the socket's file descriptor so select() accepts this object."""
        return self.sock.fileno()


class MyName(object):
    """Event loop that sends queued GET requests and dispatches their responses."""

    def __init__(self):
        self.sock_list = []  # requests still waiting for (more of) a response
        self.conns = []      # requests whose GET has not been sent yet

    def add_request(self, req_info):
        """Create a non-blocking socket for *req_info* and start connecting.

        :param req_info: dict with the keys "host", "port", "path" and
            "callback" (the callback receives the response bytes)
        :return: None
        """
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect((req_info.get("host"), req_info.get("port")))
        except BlockingIOError:
            # Expected for a non-blocking connect: it completes later and
            # select() reports the socket as writable.
            pass
        obj = Request(client, req_info)
        self.sock_list.append(obj)
        self.conns.append(obj)

    def _finish(self, obj):
        """Close *obj*'s socket and stop watching it."""
        obj.sock.close()
        for watched in (self.sock_list, self.conns):
            if obj in watched:
                watched.remove(obj)

    def run(self):
        """Run the event loop until every request has completed or failed.

        :return: None
        """
        # select() accepts any object that has a fileno() method, so Request
        # objects are passed directly.
        while self.sock_list:
            # Poll every 0.05 seconds for sockets that changed state.
            r, w, e = select.select(self.sock_list, self.conns, [], 0.05)

            # Writable: the connect attempt finished, so send the request once.
            for obj in w:
                data = "GET {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(obj.info['path'], obj.info['host'])
                try:
                    obj.sock.send(data.encode("utf-8"))
                except OSError as exc:
                    # One failing host must not abort the other requests.
                    print(obj.info['host'], "send failed:", exc)
                    self._finish(obj)
                    continue
                self.conns.remove(obj)

            # Readable: response data arrived, or the peer closed.
            for obj in r:
                if obj not in self.sock_list:
                    continue  # already dropped by a send failure in this pass
                try:
                    chunk = obj.sock.recv(8096)
                except BlockingIOError:
                    continue  # nothing to read after all; poll again
                except OSError as exc:
                    print(obj.info['host'], "recv failed:", exc)
                    self._finish(obj)
                    continue
                if chunk:
                    # One recv() may return only part of the response, so
                    # accumulate until the peer closes the connection.
                    obj.chunks.append(chunk)
                    continue
                # Empty read: the peer closed, so the response is complete.
                print(obj.info['host'])
                obj.info['callback'](b"".join(obj.chunks))
                self._finish(obj)


def done01(res):
    """Print the complete raw response."""
    print(res)


def done02(res):
    """Print the complete raw response."""
    print(res)


url_list = [
    {"host": "www.baidu.com", "port": 80, "path": "/", "callback": done01},
    {"host": "www.51cto.com", "port": 80, "path": "/", "callback": done02},
    {"host": "www.cnblogs.com", "port": 80, "path": "/alex3714/articles/5885096.html", "callback": done01},
]

my_name = MyName()
for item in url_list:
    my_name.add_request(item)

my_name.run()
本文参考