change_stream – Watch changes on a collection, database, or cluster

Watch changes on a collection, a database, or the entire cluster.

  • class pymongo.changestream.ChangeStream(_target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
  • The internal abstract base class for change stream cursors.

Should not be called directly by application developers. Use pymongo.collection.Collection.watch(),pymongo.database.Database.watch(), orpymongo.mongo_client.MongoClient.watch() instead.

New in version 3.6.

See also

The MongoDB documentation on

changeStreams

  • alive
  • Does this cursor have the potential to return more data?

Note

Even if alive is True, next() can raiseStopIteration and try_next() can return None.

New in version 3.8.

  • close()
  • Close this ChangeStream.

  • next()

  • Advance the cursor.

This method blocks until the next change document is returned or anunrecoverable error is raised. This method is used when iterating overall changes in the cursor. For example:

  1. try:
  2. resume_token = None
  3. pipeline = [{'$match': {'operationType': 'insert'}}]
  4. with db.collection.watch(pipeline) as stream:
  5. for insert_change in stream:
  6. print(insert_change)
  7. resume_token = stream.resume_token
  8. except pymongo.errors.PyMongoError:
  9. # The ChangeStream encountered an unrecoverable error or the
  10. # resume attempt failed to recreate the cursor.
  11. if resume_token is None:
  12. # There is no usable resume token because there was a
  13. # failure during ChangeStream initialization.
  14. logging.error('...')
  15. else:
  16. # Use the interrupted ChangeStream's resume token to create
  17. # a new ChangeStream. The new stream will continue from the
  18. # last seen insert change without missing any events.
  19. with db.collection.watch(
  20. pipeline, resume_after=resume_token) as stream:
  21. for insert_change in stream:
  22. print(insert_change)

Raises StopIteration if this ChangeStream is closed.

  • resume_token
  • The cached resume token that will be used to resume after the mostrecently returned change.

New in version 3.9.

  • try_next()
  • Advance the cursor without blocking indefinitely.

This method returns the next change document without waitingindefinitely for the next change. For example:

  1. with db.collection.watch() as stream:
  2. while stream.alive:
  3. change = stream.try_next()
  4. # Note that the ChangeStream's resume token may be updated
  5. # even when no changes are returned.
  6. print("Current resume token: %r" % (stream.resume_token,))
  7. if change is not None:
  8. print("Change document: %r" % (change,))
  9. continue
  10. # We end up here when there are no recent changes.
  11. # Sleep for a while before trying again to avoid flooding
  12. # the server with getMore requests when no changes are
  13. # available.
  14. time.sleep(10)

If no change document is cached locally then this method runs a singlegetMore command. If the getMore yields any documents, the nextdocument is returned, otherwise, if the getMore returns no documents(because there have been no changes) then None is returned.

Returns:The next change document or None when no document is availableafter running a single getMore or when the cursor is closed.

New in version 3.8.

  • class pymongo.changestream.ClusterChangeStream(_target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
  • A change stream that watches changes on all collections in the cluster.

Should not be called directly by application developers. Usehelper method pymongo.mongo_client.MongoClient.watch() instead.

New in version 3.7.

  • class pymongo.changestream.CollectionChangeStream(_target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
  • A change stream that watches changes on a single collection.

Should not be called directly by application developers. Usehelper method pymongo.collection.Collection.watch() instead.

New in version 3.7.

  • class pymongo.changestream.DatabaseChangeStream(_target, pipeline, full_document, resume_after, max_await_time_ms, batch_size, collation, start_at_operation_time, session, start_after)
  • A change stream that watches changes on all collections in a database.

Should not be called directly by application developers. Usehelper method pymongo.database.Database.watch() instead.

New in version 3.7.