Item Pipeline

After an item has been scraped by a spider, it is sent to the Item Pipelinewhich processes it through several components that are executed sequentially.

Each item pipeline component (sometimes referred as just “Item Pipeline”) is aPython class that implements a simple method. They receive an item and performan action over it, also deciding if the item should continue through thepipeline or be dropped and no longer processed.

Typical uses of item pipelines are:

  • cleansing HTML data
  • validating scraped data (checking that the items contain certain fields)
  • checking for duplicates (and dropping them)
  • storing the scraped item in a database

Writing your own item pipeline

Each item pipeline component is a Python class that must implement the following method:

  • processitem(_self, item, spider)
  • This method is called for every item pipeline component. process_item()must either: return a dict with data, return an Item(or any descendant class) object, return aDeferred or raiseDropItem exception. Dropped items are no longerprocessed by further pipeline components.

Parameters:

  • item (Item object or a dict) – the item scraped
  • spider (Spider object) – the spider which scraped the item

Additionally, they may also implement the following methods:

  • openspider(_self, spider)
  • This method is called when the spider is opened.

Parameters:spider (Spider object) – the spider which was opened

  • closespider(_self, spider)
  • This method is called when the spider is closed.

Parameters:spider (Spider object) – the spider which was closed

  • fromcrawler(_cls, crawler)
  • If present, this classmethod is called to create a pipeline instancefrom a Crawler. It must return a new instanceof the pipeline. Crawler object provides access to all Scrapy corecomponents like settings and signals; it is a way for pipeline toaccess them and hook its functionality into Scrapy.

Parameters:crawler (Crawler object) – crawler that uses this pipeline

Item pipeline example

Price validation and dropping items with no prices

Let’s take a look at the following hypothetical pipeline that adjusts theprice attribute for those items that do not include VAT(price_excludes_vat attribute), and drops those items which don’tcontain a price:

  1. from scrapy.exceptions import DropItem
  2.  
  3. class PricePipeline(object):
  4.  
  5. vat_factor = 1.15
  6.  
  7. def process_item(self, item, spider):
  8. if item.get('price'):
  9. if item.get('price_excludes_vat'):
  10. item['price'] = item['price'] * self.vat_factor
  11. return item
  12. else:
  13. raise DropItem("Missing price in %s" % item)

Write items to a JSON file

The following pipeline stores all scraped items (from all spiders) into asingle items.jl file, containing one item per line serialized in JSONformat:

  1. import json
  2.  
  3. class JsonWriterPipeline(object):
  4.  
  5. def open_spider(self, spider):
  6. self.file = open('items.jl', 'w')
  7.  
  8. def close_spider(self, spider):
  9. self.file.close()
  10.  
  11. def process_item(self, item, spider):
  12. line = json.dumps(dict(item)) + "\n"
  13. self.file.write(line)
  14. return item

Note

The purpose of JsonWriterPipeline is just to introduce how to writeitem pipelines. If you really want to store all scraped items into a JSONfile you should use the Feed exports.

Write items to MongoDB

In this example we’ll write items to MongoDB using pymongo.MongoDB address and database name are specified in Scrapy settings;MongoDB collection is named after item class.

The main point of this example is to show how to use from_crawler()method and how to clean up the resources properly.:

  1. import pymongo
  2.  
  3. class MongoPipeline(object):
  4.  
  5. collection_name = 'scrapy_items'
  6.  
  7. def __init__(self, mongo_uri, mongo_db):
  8. self.mongo_uri = mongo_uri
  9. self.mongo_db = mongo_db
  10.  
  11. @classmethod
  12. def from_crawler(cls, crawler):
  13. return cls(
  14. mongo_uri=crawler.settings.get('MONGO_URI'),
  15. mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
  16. )
  17.  
  18. def open_spider(self, spider):
  19. self.client = pymongo.MongoClient(self.mongo_uri)
  20. self.db = self.client[self.mongo_db]
  21.  
  22. def close_spider(self, spider):
  23. self.client.close()
  24.  
  25. def process_item(self, item, spider):
  26. self.db[self.collection_name].insert_one(dict(item))
  27. return item

Take screenshot of item

This example demonstrates how to return aDeferred from the process_item() method.It uses Splash to render screenshot of item url. Pipelinemakes request to locally running instance of Splash. After request is downloaded,it saves the screenshot to a file and adds filename to the item.

  1. import scrapy
  2. import hashlib
  3. from urllib.parse import quote
  4.  
  5.  
  6. class ScreenshotPipeline(object):
  7. """Pipeline that uses Splash to render screenshot of
  8. every Scrapy item."""
  9.  
  10. SPLASH_URL = "http://localhost:8050/render.png?url={}"
  11.  
  12. async def process_item(self, item, spider):
  13. encoded_item_url = quote(item["url"])
  14. screenshot_url = self.SPLASH_URL.format(encoded_item_url)
  15. request = scrapy.Request(screenshot_url)
  16. response = await spider.crawler.engine.download(request, spider)
  17.  
  18. if response.status != 200:
  19. # Error happened, return item.
  20. return item
  21.  
  22. # Save screenshot to file, filename will be hash of url.
  23. url = item["url"]
  24. url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
  25. filename = "{}.png".format(url_hash)
  26. with open(filename, "wb") as f:
  27. f.write(response.body)
  28.  
  29. # Store filename in item.
  30. item["screenshot_filename"] = filename
  31. return item

Duplicates filter

A filter that looks for duplicate items, and drops those items that werealready processed. Let’s say that our items have a unique id, but our spiderreturns multiples items with the same id:

  1. from scrapy.exceptions import DropItem
  2.  
  3. class DuplicatesPipeline(object):
  4.  
  5. def __init__(self):
  6. self.ids_seen = set()
  7.  
  8. def process_item(self, item, spider):
  9. if item['id'] in self.ids_seen:
  10. raise DropItem("Duplicate item found: %s" % item)
  11. else:
  12. self.ids_seen.add(item['id'])
  13. return item

Activating an Item Pipeline component

To activate an Item Pipeline component you must add its class to theITEM_PIPELINES setting, like in the following example:

  1. ITEM_PIPELINES = {
  2. 'myproject.pipelines.PricePipeline': 300,
  3. 'myproject.pipelines.JsonWriterPipeline': 800,
  4. }

The integer values you assign to classes in this setting determine theorder in which they run: items go through from lower valued to highervalued classes. It’s customary to define these numbers in the 0-1000 range.