Remote Data

Dask can read data from a variety of data stores including local file systems, network file systems, cloud object stores, and Hadoop. Typically this is done by prepending a protocol like "s3://" to paths used in common data access functions like dd.read_csv:

    import dask.dataframe as dd
    df = dd.read_csv('s3://bucket/path/to/data-*.csv')
    df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

    import json
    import dask.bag as db
    b = db.read_text('hdfs://path/to/*.json').map(json.loads)

Dask uses fsspec for local, cluster and remote data IO. Other file interaction, such as loading of configuration, is done using ordinary Python methods.

The following remote services are well supported and tested against the main codebase:

  • Local or Network File System: file:// - the local file system, default in the absence of any protocol
  • Hadoop File System: hdfs:// - Hadoop Distributed File System, for resilient, replicated files within a cluster. This uses PyArrow as the backend.
  • Amazon S3: s3:// - Amazon S3 remote binary store, often used with Amazon EC2, using the library s3fs
  • Google Cloud Storage: gcs:// or gs:// - Google Cloud Storage, typically used with Google Compute resources, using gcsfs (in development)
  • HTTP(s): http:// or https:// for reading data directly from HTTP web servers
  • Azure Datalake Storage: adl://, for use with the Microsoft Azure platform, using azure-data-lake-store-python. This is unavailable in the current release of fsspec, but a new version using Microsoft’s “protocol 2” should come soon.

fsspec also provides other file systems that may be of interest to Dask users, such as ssh, ftp and webhdfs. See the documentation for more information.

When specifying a storage location, a URL should be provided using the general form protocol://path/to/data. If no protocol is provided, the local file system is assumed (same as file://).
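As a rough illustration of the protocol://path/to/data convention, the sketch below splits a location string into a protocol and a path using plain string handling, falling back to file when no protocol is given. The helper name split_location is hypothetical, and this is not a description of how fsspec itself parses URLs; it only mirrors the rule stated above.

```python
def split_location(urlpath: str) -> tuple:
    """Split 'protocol://path' into (protocol, path).

    Hypothetical helper for illustration only: a location without a
    protocol is treated as local, i.e. the same as 'file://'.
    """
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        return protocol, path
    return "file", urlpath


print(split_location("s3://bucket/path/to/data-*.csv"))
print(split_location("/home/user/data.csv"))
```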

Lower-level details on how Dask handles remote data are described below in the Internals section.

Optional Parameters

Two methods exist for passing parameters to the backend file system driver: extending the URL to include username, password, server, port, etc.; and providing storage_options, a dictionary of parameters to pass on. The second form is more general, as any number of file system-specific options can be passed.

Examples:

    df = dd.read_csv('hdfs://user@server:port/path/*.csv')

    df = dd.read_parquet('s3://bucket/path',
                         storage_options={'anon': True, 'use_ssl': False})
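
To make the relationship between the two forms concrete, the sketch below uses the standard library's urllib.parse to pull connection details out of a URL and then lets an explicit storage_options dictionary add to or override them. The function name connection_options, the example host, port and ticket path, and the precedence rule (explicit options win) are assumptions of this sketch, not a description of what any particular back-end does.

```python
from urllib.parse import urlsplit


def connection_options(url, storage_options=None):
    """Collect connection parameters from a URL and a dict of options."""
    parts = urlsplit(url)
    from_url = {
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
    }
    # Drop anything the URL did not specify.
    options = {k: v for k, v in from_url.items() if v is not None}
    # Assumption of this sketch: explicitly passed options take precedence.
    options.update(storage_options or {})
    return options


opts = connection_options(
    "hdfs://alice@namenode:8020/path/data.csv",  # placeholder URL
    storage_options={"kerb_ticket": "/tmp/krb5cc_1000"},  # placeholder path
)
print(opts)
```

The dictionary form scales to options that have no natural place in a URL, which is why the text above calls it the more general of the two.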

Details on how to provide configuration for the main back-ends are listed next, but further details can be found in the documentation pages of the relevant back-end.

Each back-end has additional installation requirements and may not be available at runtime. The dictionary fsspec.registry contains the currently imported file systems. To see which backends fsspec knows how to import, you can do:

    from fsspec.registry import known_implementations
    known_implementations

Note that some backends appear twice, if they can be referenced with multiple protocol strings, like “http” and “https”.

Local File System

Local files are always accessible, and all parameters passed as part of the URL (beyond the path itself) or with the storage_options dictionary will be ignored.

This is the default back-end, and the one used if no protocol is passed at all.

We assume here that each worker has access to the same file system - either the workers are co-located on the same machine, or a network file system is mounted and referenced at the same path location for every worker node.

Locations specified relative to the current working directory will, in general, be respected (as they would be with the built-in Python open), but this may fail if the client and worker processes do not have the same working directory.
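
One way to reduce the working-directory problem, assuming the file system really is mounted at the same location on every worker, is to resolve relative paths to absolute ones on the client before handing them to Dask. The following minimal sketch uses only the standard library; the file name is a placeholder.

```python
import os

relative = os.path.join("data", "2015-*.csv")  # placeholder relative path

# Resolve against the client's current working directory, so that the
# resulting string no longer depends on where a worker process was started.
absolute = os.path.abspath(relative)

print(absolute)
```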

Hadoop File System

The Hadoop File System (HDFS) is a widely deployed, distributed, data-local file system written in Java. This file system backs many clusters running Hadoop and Spark. HDFS support can be provided by PyArrow.

By default, the back-end attempts to read the default server and port from local Hadoop configuration files on each node, so it may be that no configuration is required. However, the server, port, and user can be passed as part of the URL: hdfs://user:pass@server:port/path/to/data, or using the storage_options= kwarg.

Extra Configuration for PyArrow

The following additional options may be passed to the PyArrow driver via storage_options:

  • host, port, user: Basic authentication
  • kerb_ticket: Path to kerberos ticket cache

PyArrow’s libhdfs driver can also be affected by a few environment variables. For more information on these, see the PyArrow documentation.

Amazon S3

Amazon S3 (Simple Storage Service) is a web service offered by Amazon Web Services.

The S3 back-end available to Dask is s3fs, and is importable when Dask is imported.

Authentication for S3 is provided by the underlying library boto3. As described in the auth docs, this could be achieved by placing credentials files in one of several locations on each node: ~/.aws/credentials, ~/.aws/config, /etc/boto.cfg, and ~/.boto. Alternatively, for nodes located within Amazon EC2, IAM roles can be set up for each node, and then no further configuration is required. As a final option, user credentials can be passed directly in the URL (s3://keyID:keySecret/bucket/key/name) or using storage_options. In this case, however, the key/secret will be passed to all workers in the clear, so this method is only recommended on well-secured networks.

The following parameters may be passed to s3fs using storage_options:

  • anon: Whether access should be anonymous (default False)
  • key, secret: For user authentication
  • token: If authentication has been done with some other S3 client
  • use_ssl: Whether connections are encrypted and secure (default True)
  • client_kwargs: Dict passed to the boto3 client, with keys such as region_name or endpoint_url. Notice: do not pass the config option here; pass its content to config_kwargs instead.
  • config_kwargs: Dict passed to the s3fs.S3FileSystem, which passes it to the boto3 client’s config option.
  • requester_pays: Set True if the authenticated user will assume transfer costs, which is required by some providers of bulk data
  • default_block_size, default_fill_cache: These are not of particular interest to Dask users, as they concern the behaviour of the buffer between successive reads
  • kwargs: Other parameters are passed to the boto3 Session object, such as profile_name, to pick one of the authentication sections from the configuration files referred to above

Using Other S3-Compatible Services

By using the endpoint_url option, you may use other S3-compatible services, for example AlibabaCloud OSS:

    dask_function(...,
        storage_options={
            "key": ...,
            "secret": ...,
            "client_kwargs": {
                "endpoint_url": "http://some-region.some-s3-compatible.com",
            },
            # this dict goes to boto3 client's `config`
            # `addressing_style` is required by AlibabaCloud, other services may not
            "config_kwargs": {"s3": {"addressing_style": "virtual"}},
        })

Google Cloud Storage

Google Cloud Storage is a RESTful online file storage web service for storing and accessing data on Google’s infrastructure.

The GCS back-end is identified by the protocol identifiers gcs and gs, which are identical in their effect.

Multiple modes of authentication are supported. These options should be included in the storage_options dictionary as {'token': ..} submitted with your call to a storage-based Dask function/method. See the gcsfs documentation for further details.

General recommendations for distributed clusters, in order:

  • use anon for public data
  • use cloud if this is available
  • use gcloud to generate a JSON file, and distribute this to all workers, and supply the path to the file
  • use gcsfs directly with the browser method to generate a token cache file (~/.gcs_tokens) and distribute this to all workers, thereafter using method cache
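
The ordering above can be expressed as a small helper that picks a token value for storage_options. This is only a sketch: the function name, its arguments, and the key file path are hypothetical, while the token values anon, cloud and cache are the method names mentioned in the list.

```python
def gcs_storage_options(public=False, cloud_available=False, keyfile=None):
    """Choose a 'token' for storage_options, following the order above."""
    if public:
        token = "anon"     # public data needs no credentials
    elif cloud_available:
        token = "cloud"    # use the "cloud" method when it is available
    elif keyfile is not None:
        token = keyfile    # path to a JSON file distributed to all workers
    else:
        token = "cache"    # token cache file distributed to all workers
    return {"token": token}


print(gcs_storage_options(public=True))
print(gcs_storage_options(keyfile="/etc/secrets/gcs-key.json"))  # placeholder path
```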

A final suggestion is shown below, which may be the fastest and simplest for authenticated access (as opposed to anonymous), since it will not require re-authentication. However, this method is not secure since credentials will be passed directly around the cluster. This is fine if you are certain that the cluster is itself secured. You need to create a GCSFileSystem object using any method that works for you and then pass its credentials directly:

    from gcsfs import GCSFileSystem

    gcs = GCSFileSystem(...)
    dask_function(..., storage_options={'token': gcs.session.credentials})

Azure

Warning

Support for AzureDLFileSystem (ADL) is not currently offered. We hope to provide both datalake and blob support using Protocol 2 soon.

Authentication is given in storage_options=, and all other parameters will be passed on to the AzureDLFileSystem constructor (see its documentation for further information). The auth parameters are passed directly to workers, so this should only be used within a secure cluster.

HTTP(S)

Direct file-like access to arbitrary URLs is available over HTTP and HTTPS. However, there is no such thing as glob functionality over HTTP, so only explicit lists of files can be used.
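
Because a pattern such as data-*.csv cannot be expanded over HTTP, the list of files has to be generated by the caller. A minimal sketch, assuming a hypothetical server whose files follow a predictable monthly naming scheme:

```python
# Hypothetical base URL and naming scheme; adjust to the real server layout.
base = "https://example.com/data"
urls = [f"{base}/2015-{month:02d}.csv" for month in range(1, 13)]

print(urls[0])
print(len(urls))
```

Such a list can then be passed wherever a list of paths is accepted in place of a globstring.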

Server implementations differ in the information they provide - they may or may not specify the size of a file via a HEAD request or at the start of a download - and some servers may not respect byte range requests. The HTTPFileSystem therefore offers best-effort behaviour: the download is streamed but, if more data is seen than the configured block-size, an error will be raised. To be able to access such data you must read the whole file in one shot (and it must fit in memory).

Using a block size of 0 will return normal requests streaming file-like objects, which are stable, but provide no random access.

Developer API

The prototype for any file system back-end can be found in fsspec.spec.AbstractFileSystem. Any new implementation should provide the same API, or directly subclass, and make itself available as a protocol to Dask. For example, the following would register the protocol “myproto”, described by the implementation class MyProtoFileSystem. URLs of the form myproto:// would thereafter be dispatched to the methods of this class:

    fsspec.registry['myproto'] = MyProtoFileSystem

However, it would be better to submit a PR to fsspec to include the class in the known_implementations.

Internals

Dask contains internal tools for extensible data ingestion in the dask.bytes package and using fsspec. These functions are developer-focused rather than for direct consumption by users. They power user-facing functions like dd.read_csv and db.read_text, which are probably more useful for most users.

  • read_bytes(urlpath[, delimiter, not_zero, …]): Given a path or paths, return delayed objects that read from those paths.
  • open_files(urlpath[, mode, compression, …]): Given a path or paths, return a list of OpenFile objects.

These functions are extensible in their output formats (bytes, file objects), their input locations (file system, S3, HDFS), line delimiters, and compression formats.

Both functions are lazy, returning either pointers to blocks of bytes (read_bytes) or open file objects (open_files). They can handle different storage backends by prepending protocols like s3:// or hdfs:// (see above). They handle compression formats listed in fsspec.compression, some of which may require additional packages to be installed.

These functions are not used for all data sources. Some data sources like HDF5 are quite particular and receive custom treatment.

Delimiters

The read_bytes function takes a path (or globstring of paths) and produces a sample of the first file and a list of delayed objects for each of the other files. If passed a delimiter such as delimiter=b'\n', it will ensure that the blocks of bytes start directly after a delimiter and end directly before a delimiter. This allows other functions, like pd.read_csv, to operate on these delayed values with expected behavior.

These delimiters are useful both for typical line-based formats (log files, CSV, JSON) as well as other delimited formats like Avro, which may separate logical chunks by a complex sentinel string. Note that the delimiter finding algorithm is simple, and will not account for characters that are escaped, part of a UTF-8 code sequence or within the quote marks of a string.
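
The idea of delimiter-aligned blocks can be sketched in pure Python on an in-memory bytes object. This is an illustration under simplifying assumptions, not the code Dask uses: the function name split_on_delimiter is hypothetical, the data is held entirely in memory, and each block here keeps its trailing delimiter so that concatenating the blocks reproduces the input.

```python
def split_on_delimiter(data: bytes, blocksize: int, delimiter: bytes) -> list:
    """Cut ``data`` into blocks of roughly ``blocksize`` bytes, moving each
    cut forward to just after the next occurrence of ``delimiter``."""
    if blocksize <= 0 or not delimiter:
        raise ValueError("blocksize must be positive and delimiter non-empty")
    blocks = []
    start, n = 0, len(data)
    while start < n:
        tentative = start + blocksize
        if tentative >= n:
            end = n
        else:
            # Search from just before the tentative cut, so that a delimiter
            # ending exactly at the cut is accepted as the boundary.
            found = data.find(delimiter, max(start, tentative - len(delimiter)))
            end = n if found == -1 else found + len(delimiter)
        blocks.append(data[start:end])
        start = end
    return blocks


rows = b"a,1\nb,2\nc,3\n"
print(split_on_delimiter(rows, blocksize=5, delimiter=b"\n"))

# The search is purely byte-based: a newline inside a quoted field is
# treated as a boundary too, which is the limitation noted above.
quoted = b'1,"x\ny"\n2,"z"\n'
print(split_on_delimiter(quoted, blocksize=3, delimiter=b"\n"))
```

In the second call the first block ends in the middle of the quoted field, so a CSV parser applied to that block alone would see an unterminated string.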

Compression

These functions support widely available compression technologies like gzip, bz2, xz, snappy, and lz4. More compressions can be easily added by inserting functions into dictionaries available in the fsspec.compression module. This can be done at runtime and need not be added directly to the codebase.

However, most compression technologies like gzip do not support efficient random access, and so are useful for streaming open_files but not useful for read_bytes which splits files at various points.
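
The random-access limitation can be seen with the standard library alone. In the sketch below (the payload and helper name are made up for illustration), a gzip stream decompresses fine from its beginning, but a slice taken from the middle of the compressed bytes cannot be decoded on its own, which is why a compressed file cannot simply be cut into independent blocks.

```python
import gzip
import zlib

# Made-up payload: 1000 short text rows.
payload = b"".join(b"row-%d\n" % i for i in range(1000))
compressed = gzip.compress(payload)


def can_decompress(chunk: bytes) -> bool:
    """Return True if ``chunk`` is a complete, valid gzip stream."""
    try:
        gzip.decompress(chunk)
        return True
    except (OSError, EOFError, zlib.error):
        return False


whole_ok = can_decompress(compressed)
# Start reading half-way through the compressed bytes.
middle_ok = can_decompress(compressed[len(compressed) // 2:])

print(whole_ok, middle_ok)
```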

API

  • dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize='128 MiB', sample='10 kiB', compression=None, include_path=False, **kwargs)

Given a path or paths, return delayed objects that read from those paths.

The path may be a filename like '2015-01-01.csv' or a globstring like '2015-*-*.csv'.

The path may be preceded by a protocol, like s3:// or hdfs:// if those libraries are installed.

This cleanly breaks data by a delimiter if given, so that block boundaries start directly after a delimiter and end on the delimiter.

Parameters:

  • urlpath: string or list
    Absolute or relative filepath(s). Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.

  • delimiter: bytes
    An optional delimiter, like b'\n', on which to split blocks of bytes.

  • not_zero: bool
    Force seek of start-of-file delimiter, discarding header.

  • blocksize: int, str
    Chunk size in bytes, defaults to “128 MiB”.

  • compression: string or None
    String like ‘gzip’ or ‘xz’. Must support efficient random access.

  • sample: int, string, or boolean
    Whether or not to return a header sample. Values can be False for “no sample requested”, or an integer or string value like 2**20 or "1 MiB".

  • include_path: bool
    Whether or not to include the path with the bytes representing a particular file. Default is False.

  • **kwargs: dict
    Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Returns:

  • sample: bytes
    The sample header.

  • blocks: list of lists of dask.Delayed
    Each list corresponds to a file, and each delayed object computes to a block of bytes from that file.

  • paths: list of strings, only included if include_path is True
    List of same length as blocks, where each item is the path to the file represented in the corresponding block.

Examples

    >>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n') # doctest: +SKIP
    >>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n') # doctest: +SKIP
    >>> sample, blocks, paths = read_bytes('2015-*-*.csv', include_path=True) # doctest: +SKIP

  • dask.bytes.open_files(urlpath, mode='rb', compression=None, encoding='utf8', errors=None, name_function=None, num=1, protocol=None, newline=None, **kwargs)

Given a path or paths, return a list of OpenFile objects.

For writing, a str path must contain the “*” character, which will be filled in by increasing numbers, e.g., “part*” -> “part1”, “part2” if num=2.

For either reading or writing, you can instead provide an explicit list of paths.
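
The expansion of the "*" character can be pictured with the following pure-Python sketch. The helper expand_paths, its zero-based default numbering, and the example names are assumptions made for illustration; the numbering actually produced by a given installation may differ, so it is worth checking before relying on specific file names.

```python
def expand_paths(path, num, name_function=str):
    """Fill the '*' in ``path`` with one generated name per output file."""
    if "*" not in path:
        raise ValueError("output path must contain a '*' character")
    # name_function maps a sequence number (0, 1, ...) to a string.
    return [path.replace("*", name_function(i)) for i in range(num)]


# Reproduce the "part1", "part2" naming from the text with a one-based function.
print(expand_paths("part*", 2, name_function=lambda i: str(i + 1)))

# A zero-padded variant keeps the file names in lexicographic order.
print(expand_paths("out/part-*.csv", 3, name_function=lambda i: f"{i:03d}"))
```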

Parameters:

  • urlpath: string or list
    Absolute or relative filepath(s). Prefix with a protocol like s3:// to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol.

  • mode: ‘rb’, ‘wt’, etc.

  • compression: string
    Compression to use. See dask.bytes.compression.files for options.

  • encoding: str
    For text mode only.

  • errors: None or str
    Passed to TextIOWrapper in text mode.

  • name_function: function or None
    If opening a set of files for writing, those files do not yet exist, so we need to generate their names by formatting the urlpath for each sequence number.

  • num: int [1]
    If in writing mode, the number of files we expect to create (passed to name_function).

  • protocol: str or None
    If given, overrides the protocol found in the URL.

  • newline: bytes or None
    Used for line terminator in text mode. If None, uses system default; if blank, uses no translation.

  • **kwargs: dict
    Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Returns:

  • List of OpenFile objects.

Examples

    >>> files = open_files('2015-*-*.csv') # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... ) # doctest: +SKIP