DataLoader


```python
# The examples on this page assume the fastai/fastcore test environment,
# e.g. `from fastai.data.load import *` and `from fastcore.test import *`,
# plus the usual fastai notebook imports (`L`, `tensor`, `noop`, ...).
import random, string, sys, time

bs = 4
letters = list(string.ascii_lowercase)
```

DataLoader helpers

fastai includes a replacement for PyTorch's DataLoader which is largely API-compatible, and adds a lot of useful functionality and flexibility. Before we look at the class, there are a couple of helpers we'll need to define.
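First, though, a taste of the drop-in compatibility (a minimal sketch, assuming fastai is installed):

```python
from fastai.data.load import DataLoader

# A plain list acts as a map-style dataset; batches are collated to tensors
dl = DataLoader(list(range(8)), bs=4)
print(list(dl))  # [tensor([0, 1, 2, 3]), tensor([4, 5, 6, 7])]
```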

fa_collate[source]

`fa_collate(t)`

A replacement for PyTorch's `default_collate` which maintains types and handles `Sequence`s.

```python
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])

t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])
```

fa_convert[source]

`fa_convert(t)`

A replacement for PyTorch's `default_convert` which maintains types and handles `Sequence`s.

```python
t0 = array([1,2])
t = [t0,(t0,t0)]
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])
```

class SkipItemException[source]

`SkipItemException() :: Exception`

Raised to notify `DataLoader` to skip an item.
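For example, raising it from `create_item` lets a `DataLoader` drop items on the fly (a minimal sketch; the `EvenOnlyDL` subclass is made up for illustration):

```python
from fastai.data.load import DataLoader, SkipItemException

class EvenOnlyDL(DataLoader):  # hypothetical subclass
    def create_item(self, s):
        if s % 2: raise SkipItemException()  # odd indices are skipped, not yielded
        return s

# Only even items survive: [tensor([0, 2]), tensor([4, 6]), tensor([8])]
print(list(EvenOnlyDL(list(range(10)), bs=2)))
```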

class DataLoader[source]

```
DataLoader(dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0,
           batch_size=None, shuffle=False, drop_last=False, indexed=None, n=None,
           device=None, persistent_workers=False, wif=None, before_iter=None,
           after_item=None, before_batch=None, after_batch=None, after_iter=None,
           create_batches=None, create_item=None, create_batch=None, retain=None,
           get_idxs=None, sample=None, shuffle_fn=None, do_batch=None) :: GetAttr
```

API compatible with PyTorch DataLoader, with a lot more callbacks and flexibility

Arguments to `DataLoader`:

  • `dataset`: the dataset from which to load the data. Can be either a map-style or an iterable-style dataset.
  • `bs` (int): how many samples per batch to load (if `batch_size` is provided, it overrides `bs`). If `bs=None`, then it is assumed that `dataset.__getitem__` returns a batch.
  • `num_workers` (int): how many subprocesses to use for data loading. `0` means that the data will be loaded in the main process.
  • `pin_memory` (bool): if `True`, the data loader will copy tensors into CUDA pinned memory before returning them.
  • `timeout` (float>0): the timeout value in seconds for collecting a batch from workers.
  • `batch_size` (int): only provided for PyTorch compatibility; use `bs` instead.
  • `shuffle` (bool): if `True`, the data is shuffled every time the dataloader is fully read/iterated.
  • `drop_last` (bool): if `True`, the last incomplete batch is dropped.
  • `indexed` (bool): the `DataLoader` will guess whether the dataset can be indexed (or is iterable), but you can override that guess with this parameter. `True` by default.
  • `n` (int): defaults to `len(dataset)`. If you are using an iterable-style dataset, you can specify the size with `n` (see the sketch after this list).
  • `device` (torch.device): defaults to `default_device()`, which is CUDA if available. You can specify the device explicitly, e.g. `torch.device('cpu')`.
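A quick illustration of `bs`, `n`, and `device` together (a hedged sketch: the endless generator standing in for an iterable-style dataset, and the expected values in the comments, are mine, not from the original docs):

```python
import torch
from itertools import count
from fastai.data.load import DataLoader

# `count()` is endless, so `n=10` is what gives the loader a length
dl = DataLoader(count(), bs=4, n=10, device=torch.device('cpu'))
print(len(dl))               # expected: 3 batches (4 + 4 + 2 items)
print([len(b) for b in dl])  # expected: [4, 4, 2]
```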

Override `create_item` and use the default infinite sampler to get a stream of unknown length (call `stop()` when you want to end the stream).

```python
class RandDL(DataLoader):
    def create_item(self, s):
        r = random.random()
        return r if r<0.95 else stop()

L(RandDL())
```

```
(#80) [0.7080409288034344,0.03380592302379748,0.3295210988884517,0.3984895442219716,0.2466406073732288,0.784596719349558,0.7405184556807134,0.8781785423004165,0.06125487321640455,0.49829694909644495...]
```
```python
L(RandDL(bs=4, drop_last=True)).map(len)
```

```
(#2) [4,4]
```
```python
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
```

```
(#20) [4,4,4,4,4,4,4,4,4,4...]
```
```python
test_num_workers = 0 if sys.platform == "win32" else 4
test_eq(dl.fake_l.num_workers, test_num_workers)
with dl.fake_l.no_multiproc():
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, test_num_workers)
```
```python
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item))
```

```
(#50) [0.013926194175715834,0.5455661909308923,0.5540701885046865,0.38608142489299,0.11039197023987835,0.5609994837228025,0.511809356329029,0.21402937998255644,0.2856955111775398,0.3976737532547229...]
```

If you don't set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a batch.

```python
ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

t3 = L(array([0,1,2], dtype=np.int64),array([3,4,5], dtype=np.int64))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))

ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)
```

If you do set `bs`, then `dataset` is assumed to provide an iterator or a `__getitem__` that returns a single item of a batch.

```python
def twoepochs(d): return ' '.join(''.join(list(o)) for _ in range(2) for o in d)
```

```python
ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
```

Iterable dataloaders require specific tests.

```python
from torch.utils.data import IterableDataset

class DummyIterableDataset(IterableDataset):
    def __iter__(self):
        yield from range(11)

ds1 = DataLoader(DummyIterableDataset(), bs=4)
# Check it yields fine, and check we can do multiple passes
for i in range(3):
    test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10])))

# Check `drop_last` works fine (with multiple passes, since this will prematurely terminate the iterator)
ds1 = DataLoader(DummyIterableDataset(), bs=4, drop_last=True)
for i in range(3):
    test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7])))
```
```python
class SleepyDL(list):
    def __getitem__(self,i):
        time.sleep(random.random()/50)
        return super().__getitem__(i)

t = SleepyDL(letters)

%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
L(dl)
```

```
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 268 ms
CPU times: user 12 ms, sys: 28 ms, total: 40 ms
Wall time: 196 ms
CPU times: user 32 ms, sys: 20 ms, total: 52 ms
Wall time: 137 ms
```

```
(#26) ['b','z','j','i','c','m','l','p','u','v'...]
```
```python
# `Queue` and `queues` are presumably provided by the notebook's hidden setup;
# importing them from multiprocessing makes the cell self-contained
from multiprocessing import Queue, queues

class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q=q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: yield self.q.get_nowait()
            except queues.Empty: return

q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)

if not (sys.platform == "win32" and IN_NOTEBOOK):
    %time test_shuffled(L(DataLoader(it, num_workers=4)), L(range(30)))
```

```
CPU times: user 8 ms, sys: 44 ms, total: 52 ms
Wall time: 110 ms
```
```python
class A(TensorBase): pass

for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)

    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
```
```python
list(DataLoader(list(range(50)),bs=32,shuffle=True,num_workers=3))
```

```
[tensor([24, 25, 15, 38, 36, 42,  6, 18, 17,  3, 47, 33, 22, 44, 11, 23,  9, 49,
         31, 30,  8, 37, 14, 41, 27, 13, 34,  1, 12, 21,  2, 28]),
 tensor([45, 39, 43, 26,  7, 35, 32, 20, 46, 48, 40, 19,  5, 16, 10,  0, 29,  4])]
```
```python
class A(TensorBase): pass

t = A(tensor(1,2))
tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`
test_eq(tdl.pop(), tensor(1,2))
```

Override `get_idxs` to return the same index until consumption of the DL. This is intended to test consistent sampling behavior when `num_workers`>1.

```python
class AdamantDL(DataLoader):
    def get_idxs(self):
        r = random.randint(0,self.n-1)
        return [r] * self.n

test_eq(torch.cat(tuple(AdamantDL((list(range(50))),bs=16,num_workers=4))).unique().numel(),1)
```
```python
from subprocess import Popen, PIPE

# test num_workers > 0 in scripts works when python process start method is spawn
process = Popen(["python", "dltest.py"], stdout=PIPE)
_, err = process.communicate(timeout=15)
exit_code = process.wait()
test_eq(exit_code, 0)
```
