InMemoryDataset

class paddle.distributed.InMemoryDataset [源代码]

InMemoryDataset,它将数据加载到内存中,并在训练前随机整理数据。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()

init ( \*kwargs* )

注意:

1. 该API只在非 Dygraph 模式下生效

对InMemoryDataset的实例进行配置初始化。

参数:

  • kwargs - 可选的关键字参数,由调用者提供, 目前支持以下关键字配置。

  • batch_size (int) - batch size的大小. 默认值为1。

  • thread_num (int) - 用于训练的线程数, 默认值为1。

  • use_var (list) - 用于输入的variable列表,默认值为[]。

  • input_type (int) - 输入到模型训练样本的类型. 0 代表一条样本, 1 代表一个batch。 默认值为0。

  • fs_name (str) - hdfs名称. 默认值为””。

  • fs_ugi (str) - hdfs的ugi. 默认值为””。

  • pipe_command (str) - 在当前的 dataset 中设置的pipe命令用于数据的预处理。pipe命令只能使用UNIX的pipe命令,默认为”cat”。

  • download_cmd (str) - 数据下载pipe命令。 pipe命令只能使用UNIX的pipe命令, 默认为”cat”。

返回:None。

代码示例

  1. import paddle
  2. import os
  3. paddle.enable_static()
  4. with open("test_queue_dataset_run_a.txt", "w") as f:
  5. data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
  6. data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
  7. data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
  8. data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
  9. f.write(data)
  10. with open("test_queue_dataset_run_b.txt", "w") as f:
  11. data = "2 1 2 2 5 4 2 2 7 2 1 3\n"
  12. data += "2 6 2 2 1 4 2 2 4 2 2 3\n"
  13. data += "2 5 2 2 9 9 2 2 7 2 1 3\n"
  14. data += "2 7 2 2 1 9 2 3 7 2 5 3\n"
  15. f.write(data)
  16. slots = ["slot1", "slot2", "slot3", "slot4"]
  17. slots_vars = []
  18. for slot in slots:
  19. var = paddle.static.data(
  20. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  21. slots_vars.append(var)
  22. dataset = paddle.distributed.InMemoryDataset()
  23. dataset.init(
  24. batch_size=1,
  25. thread_num=2,
  26. input_type=1,
  27. pipe_command="cat",
  28. use_var=slots_vars)
  29. dataset.set_filelist(
  30. ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
  31. dataset.load_into_memory()
  32. place = paddle.CPUPlace()
  33. exe = paddle.static.Executor(place)
  34. startup_program = paddle.static.Program()
  35. main_program = paddle.static.Program()
  36. exe.run(startup_program)
  37. exe.train_from_dataset(main_program, dataset)
  38. os.remove("./test_queue_dataset_run_a.txt")
  39. os.remove("./test_queue_dataset_run_b.txt")

_init_distributed_settings ( \*kwargs* )

注意:

1. 该API只在非 Dygraph 模式下生效 2. 本api需要在机大规模参数服务器训练下生效,敬请期待详细使用文档

对InMemoryDataset的实例进行分布式训练相关配置的初始化。

参数:

  • kwargs - 可选的关键字参数,由调用者提供, 目前支持以下关键字配置。

  • merge_size (int) - 通过样本id来设置合并,相同id的样本将会在shuffle之后进行合并,你应该在一个data生成器里面解析样本id。merge_size表示合并的最小数量,默认值为-1,表示不做合并。

  • parse_ins_id (bool) - 是否需要解析每条样的id,默认值为False。

  • parse_content (bool) - 是否需要解析每条样本的content, 默认值为False。

  • fleet_send_batch_size (int) - 设置发送batch的大小,默认值为1024。

  • fleet_send_sleep_seconds (int) - 设置发送batch后的睡眠时间,默认值为0。

  • fea_eval (bool) - 设置特征打乱特征验证模式,来修正特征级别的重要性, 特征打乱需要 fea_eval 被设置为True. 默认值为False。

  • candidate_size (int) - 特征打乱特征验证模式下,用于随机化特征的候选池大小. 默认值为10000。

返回:None。

代码示例

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. dataset.init(
  5. batch_size=1,
  6. thread_num=2,
  7. input_type=1,
  8. pipe_command="cat",
  9. use_var=[])
  10. dataset._init_distributed_settings(
  11. parse_ins_id=True,
  12. parse_content=True,
  13. fea_eval=True,
  14. candidate_size=10000)

update_settings ( \*kwargs* )

注意:

1. 该API只在非 Dygraph 模式下生效

对InMemoryDataset的实例通过init和_init_distributed_settings初始化的配置进行更新。

参数:

  • kwargs - 可选的关键字参数,由调用者提供, 目前支持以下关键字配置。

  • batch_size (int) - batch size的大小. 默认值为1。

  • thread_num (int) - 用于训练的线程数, 默认值为1。

  • use_var (list) - 用于输入的variable列表,默认值为[]。

  • input_type (int) - 输入到模型训练样本的类型. 0 代表一条样本, 1 代表一个batch。 默认值为0。

  • fs_name (str) - hdfs名称. 默认值为””。

  • fs_ugi (str) - hdfs的ugi. 默认值为””。

  • pipe_command (str) - 在当前的 dataset 中设置的pipe命令用于数据的预处理。pipe命令只能使用UNIX的pipe命令,默认为”cat”。

  • download_cmd (str) - 数据下载pipe命令。 pipe命令只能使用UNIX的pipe命令, 默认为”cat”。

  • merge_size (int) - 通过样本id来设置合并,相同id的样本将会在shuffle之后进行合并,你应该在一个data生成器里面解析样本id。merge_size表示合并的最小数量,默认值为-1,表示不做合并。

  • parse_ins_id (bool) - 是否需要解析每条样的id,默认值为False。

  • parse_content (bool) 是否需要解析每条样本的content, 默认值为False。

  • fleet_send_batch_size (int) - 设置发送batch的大小,默认值为1024。

  • fleet_send_sleep_seconds (int) - 设置发送batch后的睡眠时间,默认值为0。

  • fea_eval (bool) - 设置特征打乱特征验证模式,来修正特征级别的重要性, 特征打乱需要 fea_eval 被设置为True. 默认值为False。

  • candidate_size (int) - 特征打乱特征验证模式下,用于随机化特征的候选池大小. 默认值为10000。

返回:None。

代码示例

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. dataset.init(
  5. batch_size=1,
  6. thread_num=2,
  7. input_type=1,
  8. pipe_command="cat",
  9. use_var=[])
  10. dataset._init_distributed_settings(
  11. parse_ins_id=True,
  12. parse_content=True,
  13. fea_eval=True,
  14. candidate_size=10000)
  15. dataset.update_settings(batch_size=2)

load_into_memory ( )

注意:

1. 该API只在非 Dygraph 模式下生效

向内存中加载数据。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.load_into_memory()

preload_into_memory ( thread_num=None )

向内存中以异步模式加载数据。

参数:

  • thread_num (int) - 异步加载数据时的线程数。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.preload_into_memory()
  19. dataset.wait_preload_done()

wait_preload_done ( )

等待 preload_into_memory 完成。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.preload_into_memory()
  19. dataset.wait_preload_done()

local_shuffle ( )

局部shuffle。加载到内存的训练样本进行单机节点内部的打乱

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.load_into_memory()
  19. dataset.local_shuffle()

global_shuffle ( fleet=None, thread_num=12 )

全局shuffle。只能用在分布式模式(单机多进程或多机多进程)中。您如果在分布式模式中运行,应当传递fleet而非None。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.load_into_memory()
  19. dataset.global_shuffle()

参数:

  • fleet (Fleet) – fleet单例。默认为None。

  • thread_num (int) - 全局shuffle时的线程数。

release_memory ( )

当数据不再使用时,释放InMemoryDataset内存数据。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.load_into_memory()
  19. dataset.global_shuffle()
  20. exe = paddle.static.Executor(paddle.CPUPlace())
  21. startup_program = paddle.static.Program()
  22. main_program = paddle.static.Program()
  23. exe.run(startup_program)
  24. exe.train_from_dataset(main_program, dataset)
  25. dataset.release_memory()

get_memory_data_size ( fleet=None )

用户可以调用此函数以了解加载进内存后所有workers中的样本数量。

注解

该函数可能会导致性能不佳,因为它具有barrier。

参数:

  • fleet (Fleet) – fleet对象。

返回:内存数据的大小。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. slots = ["slot1", "slot2", "slot3", "slot4"]
  5. slots_vars = []
  6. for slot in slots:
  7. var = paddle.static.data(
  8. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  9. slots_vars.append(var)
  10. dataset.init(
  11. batch_size=1,
  12. thread_num=2,
  13. input_type=1,
  14. pipe_command="cat",
  15. use_var=slots_vars)
  16. filelist = ["a.txt", "b.txt"]
  17. dataset.set_filelist(filelist)
  18. dataset.load_into_memory()
  19. print dataset.get_memory_data_size()

get_shuffle_data_size ( fleet=None )

获取shuffle数据大小,用户可以调用此函数以了解局域/全局shuffle后所有workers中的样本数量。

注解

该函数可能会导致局域shuffle性能不佳,因为它具有barrier。但其不影响局域shuffle。

参数:

  • fleet (Fleet) – fleet对象。

返回:shuffle数据的大小。

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. dataset = paddle.distributed.InMemoryDataset()
  5. slots = ["slot1", "slot2", "slot3", "slot4"]
  6. slots_vars = []
  7. for slot in slots:
  8. var = paddle.static.data(
  9. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  10. slots_vars.append(var)
  11. dataset.init(
  12. batch_size=1,
  13. thread_num=2,
  14. input_type=1,
  15. pipe_command="cat",
  16. use_var=slots_vars)
  17. filelist = ["a.txt", "b.txt"]
  18. dataset.set_filelist(filelist)
  19. dataset.load_into_memory()
  20. dataset.global_shuffle()
  21. print dataset.get_shuffle_data_size()

slots_shuffle ( slots )

该方法是在特征层次上的一个打乱方法,经常被用在有着较大缩放率实例的稀疏矩阵上,为了比较metric,比如auc,在一个或者多个有着baseline的特征上做特征打乱来验证特征level的重要性。

参数:

  • slots (list[string]) - 要打乱特征的集合

代码示例:

  1. import paddle
  2. paddle.enable_static()
  3. dataset = paddle.distributed.InMemoryDataset()
  4. dataset._init_distributed_settings(fea_eval=True)
  5. slots = ["slot1", "slot2", "slot3", "slot4"]
  6. slots_vars = []
  7. for slot in slots:
  8. var = paddle.static.data(
  9. name=slot, shape=[None, 1], dtype="int64", lod_level=1)
  10. slots_vars.append(var)
  11. dataset.init(
  12. batch_size=1,
  13. thread_num=2,
  14. input_type=1,
  15. pipe_command="cat",
  16. use_var=slots_vars)
  17. filelist = ["a.txt", "b.txt"]
  18. dataset.set_filelist(filelist)
  19. dataset.load_into_memory()
  20. dataset.slots_shuffle(['slot1'])