Crawler-related notes

An asynchronous crawler based on Tornado

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import time
    import logging
    from datetime import timedelta

    from tornado import httpclient, gen, ioloop, queues
    from bs4 import BeautifulSoup


    def logged(class_):
        """Class decorator that attaches a configured logger to the class."""
        logging.basicConfig(level=logging.INFO)
        class_.logger = logging.getLogger(class_.__name__)
        return class_


    @logged
    class AsyncSpider(object):
        """A simple asynchronous spider."""

        def __init__(self, urls, concurrency=10, results=None, **kwargs):
            super(AsyncSpider, self).__init__(**kwargs)
            self.concurrency = concurrency
            self._q = queues.Queue()
            self._fetching = set()
            self._fetched = set()
            self.results = results if results is not None else []
            for url in urls:
                self._q.put(url)
            httpclient.AsyncHTTPClient.configure(
                "tornado.curl_httpclient.CurlAsyncHTTPClient"
            )

        def fetch(self, url, **kwargs):
            fetch = getattr(httpclient.AsyncHTTPClient(), 'fetch')
            http_request = httpclient.HTTPRequest(url, **kwargs)
            return fetch(http_request, raise_error=False)

        def handle_html(self, url, html):
            """Process an HTML page."""
            print(url)

        def handle_response(self, url, response):
            """Process an HTTP response: handle the HTML directly for a 200,
            otherwise deal with other status codes as needed."""
            if response.code == 200:
                self.handle_html(url, response.body)
            elif response.code == 599:  # connection/timeout error, retry
                self._fetching.remove(url)
                self._q.put(url)

        @gen.coroutine
        def get_page(self, url):
            # yield gen.sleep(10)  # sleep when needed
            try:
                response = yield self.fetch(url)
                self.logger.debug('###### fetched %s' % url)
            except Exception as e:
                self.logger.debug('Exception: %s %s' % (e, url))
                raise gen.Return(e)
            raise gen.Return(response)  # on py3 you can just `return response`

        @gen.coroutine
        def _run(self):

            @gen.coroutine
            def fetch_url():
                current_url = yield self._q.get()
                try:
                    if current_url in self._fetching:
                        return
                    self.logger.debug('fetching****** %s' % current_url)
                    self._fetching.add(current_url)
                    response = yield self.get_page(current_url)
                    self.handle_response(current_url, response)  # handle response
                    self._fetched.add(current_url)
                finally:
                    self._q.task_done()

            @gen.coroutine
            def worker():
                while True:
                    yield fetch_url()

            # Start workers, then wait for the work queue to be empty.
            for _ in range(self.concurrency):
                worker()
            yield self._q.join(timeout=timedelta(seconds=300000))
            try:
                assert self._fetching == self._fetched
            except AssertionError:  # some http errors were not handled
                print(self._fetching - self._fetched)
                print(self._fetched - self._fetching)

        def run(self):
            io_loop = ioloop.IOLoop.current()
            io_loop.run_sync(self._run)


    class MySpider(AsyncSpider):

        def fetch(self, url, **kwargs):
            """Override the parent fetch() to add cookies, headers, timeout, etc."""
            cookies_str = "PHPSESSID=j1tt66a829idnms56ppb70jri4; pspt=%7B%22id%22%3A%2233153%22%2C%22pswd%22%3A%228835d2c1351d221b4ab016fbf9e8253f%22%2C%22_code%22%3A%22f779dcd011f4e2581c716d1e1b945861%22%7D; key=%E9%87%8D%E5%BA%86%E5%95%84%E6%9C%A8%E9%B8%9F%E7%BD%91%E7%BB%9C%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8; think_language=zh-cn; SERVERID=a66d7d08fa1c8b2e37dbdc6ffff82d9e|1444973193|1444967835; CNZZDATA1254842228=1433864393-1442810831-%7C1444972138"
            headers = {
                'User-Agent': 'mozilla/5.0 (compatible; baiduspider/2.0; +http://www.baidu.com/search/spider.html)',
                'cookie': cookies_str
            }
            return super(MySpider, self).fetch(
                url, headers=headers,
                # proxy_host="127.0.0.1", proxy_port=8787,  # for a proxy
            )

        def handle_html(self, url, html):
            print(url)
            # print(BeautifulSoup(html, 'lxml').find('title'))


    def main():
        st = time.time()
        urls = []
        n = 1000
        for page in range(1, n):
            urls.append('http://www.jb51.net/article/%s.htm' % page)
        s = MySpider(urls, 10)
        s.run()
        print(time.time() - st)
        print(60.0 / (time.time() - st) * 1000, 'per minute')
        print(60.0 / (time.time() - st) * 1000 / 60.0, 'per second')


    if __name__ == '__main__':
        main()
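
The MySpider subclass above only prints each URL. As a minimal sketch of how parsed data could be collected, a subclass (the name TitleSpider is hypothetical, not part of the original script) can store page titles in the results list that AsyncSpider already creates:

    from bs4 import BeautifulSoup

    class TitleSpider(AsyncSpider):
        """Hypothetical subclass: collect <title> tags instead of printing URLs."""
        def handle_html(self, url, html):
            title = BeautifulSoup(html, 'lxml').find('title')
            self.results.append({url: title})

    # spider = TitleSpider(['http://httpbin.org/html'], concurrency=5)
    # spider.run()
    # print(spider.results)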

Utility functions you will run into when writing crawlers

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    Chrome lets you right-click a request and "copy as curl", so you can replay
    it from the command line with curl. These helpers turn such a curl string
    into the argument formats the requests library expects, see
    http://stackoverflow.com/questions/23118249/whats-the-difference-between-request-payload-vs-form-data-as-seen-in-chrome
    """
    import re
    from functools import wraps
    import traceback

    import requests


    def encode_to_dict(encoded_str):
        """Split a urlencoded string into a dict.

        >>> encode_to_dict('name=foo')
        {'name': 'foo'}
        >>> encode_to_dict('name=foo&val=bar')
        {'name': 'foo', 'val': 'bar'}
        """
        pair_list = encoded_str.split('&')
        d = {}
        for pair in pair_list:
            if pair:
                key = pair.split('=')[0]
                val = pair.split('=')[1]
                d[key] = val
        return d


    def parse_curl_str(s):
        """Convert a Chrome "copy as curl" string to url, headers_dict and data."""
        pat = re.compile("'(.*?)'")
        str_list = [i.strip() for i in re.split(pat, s)]  # split the curl string
        url = ''
        headers_dict = {}
        data = ''
        for i in range(0, len(str_list) - 1, 2):
            arg = str_list[i]
            string = str_list[i + 1]
            if arg.startswith('curl'):
                url = string
            elif arg.startswith('-H'):
                header_key = string.split(':', 1)[0].strip()
                header_val = string.split(':', 1)[1].strip()
                headers_dict[header_key] = header_val
            elif arg.startswith('--data'):
                data = string
        return url, headers_dict, data


    def retry(retries=3):
        """Retry a failed request, or use the more powerful retrying library:
        pip install retrying
        https://github.com/rholder/retrying

        :param retries: int, number of retry attempts.
        """
        def _retry(func):
            @wraps(func)
            def _wrapper(*args, **kwargs):
                index = 0
                while index < retries:
                    index += 1
                    try:
                        response = func(*args, **kwargs)
                        if response.status_code == 404:
                            print(404)
                            break
                        elif response.status_code != 200:
                            print(response.status_code)
                            continue
                        else:
                            break
                    except Exception:
                        traceback.print_exc()
                        response = None
                return response
            return _wrapper
        return _retry


    _get = requests.get


    @retry(5)
    def get(*args, **kwds):
        if 'timeout' not in kwds:
            kwds['timeout'] = 10
        if 'headers' not in kwds:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36',
            }
            kwds['headers'] = headers
        return _get(*args, **kwds)


    requests.get = get


    def retry_get_html(*args, **kwargs):
        try:
            return get(*args, **kwargs).content
        except AttributeError:
            return ''


    def lazy_property(fn):
        attr_name = '_lazy_' + fn.__name__

        @property
        def _lazy_property(self):
            if not hasattr(self, attr_name):
                setattr(self, attr_name, fn(self))
            return getattr(self, attr_name)
        return _lazy_property


    def my_ip():
        # url = 'https://api.ipify.org?format=json'
        url = 'http://httpbin.org/ip'
        return requests.get(url).text


    def form_data_to_dict(s):
        """Turn a form-data string copied from Chrome into a dict.
        Note: the string *must* be passed as a raw string r"".

        :param s: form-data string
        """
        arg_list = [line.strip() for line in s.split('\n')]
        d = {}
        for i in arg_list:
            if i:
                k = i.split(':', 1)[0].strip()
                v = ''.join(i.split(':', 1)[1:]).strip()
                d[k] = v
        return d


    if __name__ == '__main__':
        import sys
        from pprint import pprint
        try:
            curl_str = sys.argv[1]  # pass the whole curl string as one quoted argument
            url, headers_dict, data = parse_curl_str(curl_str)
            print(url)
            pprint(headers_dict)
            print(data)
        except IndexError:
            exit(0)
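
For reference, a small usage sketch of parse_curl_str and encode_to_dict; the curl string below is a made-up example, but real "copy as curl" output has the same shape:

    curl_str = "curl 'http://httpbin.org/post' -H 'Accept: application/json' -H 'User-Agent: Mozilla/5.0' --data 'name=foo&val=bar'"
    url, headers, data = parse_curl_str(curl_str)
    print(url)                   # http://httpbin.org/post
    print(headers['Accept'])     # application/json
    print(encode_to_dict(data))  # {'name': 'foo', 'val': 'bar'}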

How to use a proxy

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # requests proxy demo
    import requests

    # install lantern first; this is lantern's local proxy address
    proxies = {
        "http": "http://127.0.0.1:8787",
        "https": "http://127.0.0.1:8787",
    }
    url = 'http://httpbin.org/ip'
    r = requests.get(url, proxies=proxies)
    print(r.text)

    # requests supports SOCKS proxies from version 2.10.0
    # pip install -U requests[socks]
    proxies = {'http': "socks5://myproxy:9191"}
    requests.get('http://example.org', proxies=proxies)

    # tornado proxy demo
    # sudo apt-get install libcurl-dev librtmp-dev
    # pip install tornado pycurl
    from tornado import httpclient, ioloop

    config = {
        'proxy_host': 'YOUR_PROXY_HOSTNAME_OR_IP_ADDRESS',
        'proxy_port': 3128
    }
    httpclient.AsyncHTTPClient.configure(
        "tornado.curl_httpclient.CurlAsyncHTTPClient")


    def handle_request(response):
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)
        ioloop.IOLoop.instance().stop()


    http_client = httpclient.AsyncHTTPClient()
    http_client.fetch("http://twitter.com/",
                      handle_request, **config)
    ioloop.IOLoop.instance().start()
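
requests also picks up the standard proxy environment variables when no proxies argument is given, so the same lantern address can be set once per process; a minimal sketch assuming the 127.0.0.1:8787 address from the demo above:

    import os
    import requests

    # requests reads HTTP_PROXY / HTTPS_PROXY by default (trust_env is True)
    os.environ['HTTP_PROXY'] = 'http://127.0.0.1:8787'   # lantern address, as above
    os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:8787'
    print(requests.get('http://httpbin.org/ip').text)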

Using a thread pool

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import concurrent.futures
    import traceback

    import bs4
    import requests


    class ThreadPoolCrawler(object):

        def __init__(self, urls, concurrency=10, **kwargs):
            self.urls = urls
            self.concurrency = concurrency
            self.results = []

        def handle_response(self, url, response):
            pass

        def get(self, *args, **kwargs):
            return requests.get(*args, **kwargs)

        def run(self):
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrency) as executor:
                future_to_url = {
                    executor.submit(self.get, url): url for url in self.urls
                }
                for future in concurrent.futures.as_completed(future_to_url):
                    url = future_to_url[future]
                    try:
                        response = future.result()
                    except Exception:
                        traceback.print_exc()
                    else:
                        self.handle_response(url, response)


    class TestCrawler(ThreadPoolCrawler):

        def handle_response(self, url, response):
            soup = bs4.BeautifulSoup(response.text, 'lxml')
            title = soup.find('title')
            self.results.append({url: title})


    def main():
        import time
        urls = ['http://localhost:8000'] * 300
        for nums in [2, 5, 10, 15, 20, 50, 70, 100]:
            beg = time.time()
            s = TestCrawler(urls, nums)
            s.run()
            print(nums, time.time() - beg)


    if __name__ == '__main__':
        main()
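
When per-URL error handling is not needed, the same fan-out can be written more compactly with Executor.map, which yields responses in input order and re-raises the first exception it hits; a minimal sketch under that assumption (fetch_all is a hypothetical helper, not part of the class above):

    import concurrent.futures
    import requests

    def fetch_all(urls, concurrency=10):
        """Fetch all urls with a thread pool; results come back in input order."""
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            return list(executor.map(requests.get, urls))

    # responses = fetch_all(['http://httpbin.org/ip'] * 10)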

Using Tor proxy IPs

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # http://ningning.today/2016/03/07/python/python-requests-tor-crawler/
    import os

    import requests
    import requesocks

    # url = 'https://api.ipify.org?format=json'
    url = 'http://httpbin.org/ip'


    def getip_requests(url):
        print("(+) Sending request with plain requests...")
        r = requests.get(url)
        print("(+) IP is: " + r.text.replace("\n", ""))


    def getip_requesocks(url):
        print("(+) Sending request with requesocks...")
        session = requesocks.session()
        session.proxies = {'http': 'socks5://127.0.0.1:9050',
                           'https': 'socks5://127.0.0.1:9050'}
        r = session.get(url)
        print("(+) IP is: " + r.text.replace("\n", ""))


    def tor_requests():
        # newer requests versions (>= 2.10.0 with requests[socks]) can talk to Tor directly
        proxies = {
            'http': 'socks5://127.0.0.1:9050',
            'https': 'socks5://127.0.0.1:9050',
        }
        r = requests.get(url, proxies=proxies)
        print(r.text)


    def main():
        print("Running tests...")
        getip_requests(url)
        getip_requesocks(url)
        # ask Tor for a new identity via its control port (9051)
        os.system("""(echo authenticate '"yourpassword"'; echo signal newnym; echo quit) | nc localhost 9051""")
        getip_requesocks(url)


    if __name__ == "__main__":
        main()
        # tor_requests()
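
The nc one-liner above asks Tor's control port for a new identity. The same signal can be sent from Python with the stem library (pip install stem); a sketch assuming the control port is 9051 and password authentication, as in the script above:

    from stem import Signal
    from stem.control import Controller

    def renew_tor_identity(password='yourpassword'):
        """Ask Tor for a new circuit, equivalent to 'signal newnym' over nc."""
        with Controller.from_port(port=9051) as controller:
            controller.authenticate(password=password)
            controller.signal(Signal.NEWNYM)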