Multithreaded Crawler in Practice: Qiushibaike

This builds on the single-process Qiushibaike project from the previous lesson.

Thinking about multithreading in Python

Queue is a standard-library module in Python, so you can simply import Queue to use it; a queue is the most common way for threads to exchange data.

Locking is an important step whenever threads share a resource, because Python's built-in list, dict, and similar containers are not thread safe. Queue, on the other hand, is thread safe, so whenever a queue fits the use case it is the recommended choice.
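As a minimal illustration, the producer/consumer sketch below passes work between two threads through a Queue with no explicit lock. It is a standalone demo, not part of the project code; the names task_queue and worker are illustrative.

    # Minimal producer/consumer sketch (Python 2; on Python 3 the
    # module is renamed to lowercase `queue`). The names task_queue
    # and worker are illustrative, not from the project code.
    from Queue import Queue
    import threading

    task_queue = Queue()

    def worker():
        while True:
            n = task_queue.get()   # blocks until an item is available
            if n is None:          # use None as a shutdown sentinel
                break
            print n * n

    t = threading.Thread(target=worker)
    t.start()
    for n in range(5):
        task_queue.put(n)          # put()/get() are thread safe, no lock needed
    task_queue.put(None)           # tell the worker to stop
    t.join()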

The Python Queue module provides three kinds of queues, each with its own constructor (a short sketch comparing them follows this list):

1. The FIFO queue: first in, first out. class Queue.Queue(maxsize)

2. The LIFO queue, which behaves like a stack: last in, first out. class Queue.LifoQueue(maxsize)

3. The priority queue: the lower the priority value, the earlier the item comes out. class Queue.PriorityQueue(maxsize)
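Here is a quick sketch of the three orderings (a standalone demo; the values are illustrative):

    # Comparing the three queue classes (Python 2 module name;
    # on Python 3 the module is lowercase `queue`).
    import Queue

    fifo = Queue.Queue()
    for n in (1, 2, 3):
        fifo.put(n)
    print [fifo.get() for _ in range(3)]   # [1, 2, 3] -- first in, first out

    lifo = Queue.LifoQueue()
    for n in (1, 2, 3):
        lifo.put(n)
    print [lifo.get() for _ in range(3)]   # [3, 2, 1] -- last in, first out

    pq = Queue.PriorityQueue()
    for pair in ((3, 'low'), (1, 'high'), (2, 'mid')):
        pq.put(pair)
    print [pq.get() for _ in range(3)]     # (1, 'high') first: lowest value wins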

Queue (the queue object)

  • Initialization: class Queue.Queue(maxsize) creates a FIFO (first in, first out) queue

  • Common methods in the module:

    Queue.qsize() returns the current size of the queue

    Queue.empty() returns True if the queue is empty, otherwise False

    Queue.full() returns True if the queue is full, otherwise False

    Queue.full() corresponds to the maxsize setting

    Queue.get([block[, timeout]]) takes an item from the queue; timeout sets how long to wait (see the sketch after this list)

  1. Calling the queue object's get() method removes and returns an item from the head of the queue. The optional block argument defaults to True.
  2. If the queue is empty and block is True, get() suspends the calling thread until an item becomes available.
  3. If the queue is empty and block is False, the queue raises the Empty exception.
  • Create a "queue" object

    import Queue
    myqueue = Queue.Queue(maxsize=10)

  • Put a value into the queue

    myqueue.put(10)

  • Take a value out of the queue

    myqueue.get()
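The three get() behaviours described in the list above can be seen in a short sketch (a standalone demo, not part of the project code):

    from Queue import Queue, Empty

    q = Queue(maxsize=10)
    q.put(10)

    print q.get()               # 10 -- an item is available, returns at once
    try:
        q.get(False)            # non-blocking: the queue is now empty ...
    except Empty:
        print 'queue is empty'  # ... so Empty is raised immediately
    try:
        q.get(True, 2)          # block for at most 2 seconds ...
    except Empty:
        print 'timed out'       # ... then give up and raise Empty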

Multithreaded project walkthrough

Diagram of the multithreaded design

[Figure 1: multithreaded Qiushibaike crawler, page queue feeding crawler threads and data queue feeding parser threads]

    # -*- coding:utf-8 -*-
    import requests
    from lxml import etree
    from Queue import Queue, Empty
    import threading
    import json


    class thread_crawl(threading.Thread):
        '''
        Crawler thread: takes page numbers from the page queue, downloads
        each page, and puts the raw HTML onto data_queue.
        '''
        def __init__(self, threadID, q):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.q = q

        def run(self):
            print "Starting " + self.threadID
            self.qiushi_spider()
            print "Exiting ", self.threadID

        def qiushi_spider(self):
            while True:
                if self.q.empty():
                    break
                else:
                    page = self.q.get()
                    print 'qiushi_spider=', self.threadID, ',page=', str(page)
                    url = 'http://www.qiushibaike.com/hot/page/' + str(page) + '/'
                    headers = {
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
                        'Accept-Language': 'zh-CN,zh;q=0.8'}
                    # Retry a few times and then give up, so a dead page
                    # cannot trap the thread in an infinite loop
                    succeeded = False
                    timeout = 4
                    while timeout > 0:
                        timeout -= 1
                        try:
                            content = requests.get(url, headers=headers)
                            data_queue.put(content.text)
                            succeeded = True
                            break
                        except Exception, e:
                            print 'qiushi_spider', e
                    if not succeeded:
                        print 'timeout', url


    class Thread_Parser(threading.Thread):
        '''
        Parser thread: takes raw HTML from data_queue, extracts the fields
        we care about, and appends them to the output file as JSON lines.
        '''
        def __init__(self, threadID, queue, lock, f):
            threading.Thread.__init__(self)
            self.threadID = threadID
            self.queue = queue
            self.lock = lock
            self.f = f

        def run(self):
            print 'starting ', self.threadID
            global total, exitFlag_Parser
            while not exitFlag_Parser:
                try:
                    # get(False) is the non-blocking form: if the queue is
                    # empty it raises Empty immediately instead of waiting.
                    item = self.queue.get(False)
                    self.parse_data(item)
                    self.queue.task_done()
                    print 'Thread_Parser=', self.threadID, ',total=', total
                except Empty:
                    pass
            print 'Exiting ', self.threadID

        def parse_data(self, item):
            '''
            Parse one page of HTML.
            :param item: the page content
            :return:
            '''
            global total
            try:
                html = etree.HTML(item)
                sites = html.xpath('//div[contains(@id,"qiushi_tag")]')
                for site in sites:
                    try:
                        imgUrl = site.xpath('.//img/@src')[0]
                        title = site.xpath('.//h2')[0].text
                        content = site.xpath('.//div[@class="content"]')[0].text.strip()
                        vote = None
                        comments = None
                        try:
                            vote = site.xpath('.//i')[0].text
                            comments = site.xpath('.//i')[1].text
                        except:
                            pass
                        result = {
                            'imgUrl': imgUrl,
                            'title': title,
                            'content': content,
                            'vote': vote,
                            'comments': comments,
                        }
                        with self.lock:
                            # print 'write %s' % json.dumps(result)
                            self.f.write(json.dumps(result, ensure_ascii=False).encode('utf-8') + "\n")
                    except Exception, e:
                        print 'site in result', e
            except Exception, e:
                print 'parse_data', e
            with self.lock:
                total += 1


    data_queue = Queue()
    exitFlag_Parser = False
    lock = threading.Lock()
    total = 0


    def main():
        output = open('qiushibaike.json', 'a')
        # Seed the page queue with page numbers 1 through 10
        pageQueue = Queue(50)
        for page in range(1, 11):
            pageQueue.put(page)
        # Start the crawler threads
        crawlthreads = []
        crawlList = ["crawl-1", "crawl-2", "crawl-3"]
        for threadID in crawlList:
            thread = thread_crawl(threadID, pageQueue)
            thread.start()
            crawlthreads.append(thread)
        # Start the parser threads
        parserthreads = []
        parserList = ["parser-1", "parser-2", "parser-3"]
        for threadID in parserList:
            thread = Thread_Parser(threadID, data_queue, lock, output)
            thread.start()
            parserthreads.append(thread)
        # Busy-wait until the page queue is drained
        while not pageQueue.empty():
            pass
        # Wait for the crawler threads to finish
        for t in crawlthreads:
            t.join()
        # Busy-wait until the data queue is drained
        while not data_queue.empty():
            pass
        # Tell the parser threads it is time to exit
        global exitFlag_Parser
        exitFlag_Parser = True
        for t in parserthreads:
            t.join()
        print "Exiting Main Thread"
        with lock:
            output.close()


    if __name__ == '__main__':
        main()
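Two design choices in this program are worth noting. The busy-wait loops (while not pageQueue.empty(): pass) keep the main thread spinning at full CPU while it waits; since the parser already calls task_done() after every get(), replacing the busy-waits and the exitFlag_Parser flag with data_queue.join() would arguably be the more idiomatic queue-based shutdown. The code above is Python 2; to run it under Python 3, the Queue import becomes lowercase queue, and the print statements and the "except Exception, e" syntax need their Python 3 forms.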