4.4 生产者-消费者

很多OpenCL应用中,前一个内核的输出会作为下一个内核的输入。换句话说,第一个内核是生产者,第二个内核是消费者。在很多应用中,生产者和消费者是并发工作的,生产者将产生的数据逐个交给消费者。OpenCL 2.0提供了管道内存对象,用来支持这类生产者-消费者应用。无论生产者和消费者内核是串行执行还是并发执行,管道都能派上用场。

本节中,我们将使用管道创建一个生产者-消费者应用,其中生产者和消费者各是一个内核。这两个内核分别来自本章前面的两个例子:卷积和直方图。卷积内核对图像进行处理,然后通过管道将输出图像传给直方图内核(如图4.5所示)。为了展示管道的另一项用途——借助多个处理单元提高应用效率——本节的例子将使用多个设备完成:卷积内核在GPU设备上执行,直方图内核在CPU设备上执行。在多个设备上执行内核,可以让两个内核并发执行,管道则负责把生产者产生的数据传递给消费者。关于管道对象的详细描述将在第6章展开。现在,让我们先了解本节例子所需的一些基本知识。

管道内存中的数据(称为packets)组织为先入先出(FIFO)结构。管道对象的内存在全局内存上开辟,所以可以被多个内核同时访问。这里需要注意的是,管道上存储的数据,主机端无法访问。

内核中管道的访问属性可以是只读(read_only)或只写(write_only),但不能是读写。如果没有为管道指定只读或只写,编译器默认将其视为只读。管道在内核的参数列表中通过关键字pipe进行声明,前面是访问限定符,后面是数据包的数据类型。例如,__read_only pipe float input将声明一个只读管道,该管道中的数据包为单精度浮点类型。

4.5 生产者-消费者 - 图1

图4.5 生产者内核将滤波后生成的像素点,通过管道传递给消费者内核,让消费者内核产生直方图:(a)为原始图像;(b)为滤波后图像;(c)为生成的直方图。

为了访问管道,OpenCL C提供了内置函数read_pipe()和write_pipe():

  1. int read_pipe(read_only pipe gentype p, gentype *ptr);         /* 0 on success, negative if p is empty */
  2. int write_pipe(write_only pipe gentype p, const gentype *ptr); /* 0 on success, negative if p is full */

当一个工作项调用read_pipe()时(见程序清单4.10中的读取循环),会从管道p中读取一个包并存放到ptr中。如果读取成功,该函数返回0;如果管道为空,则返回一个负值。write_pipe()(见程序清单4.9末尾)与之类似,会将ptr指向的包写入管道p中。如果写入成功,该函数返回0;如果管道已满,则返回一个负值。

程序清单4.9和4.10展示了我们应用中内核的实现。由于我们指定消费者内核运行在CPU上,因此只使用一个工作项来创建直方图。同样,由于显式地指定了CPU设备,我们可以直接将直方图的结果存放在全局内存中(第8章将对这样的权衡做更细致的讨论)。

  1. __constant sampler_t sampler =
  2. CLK_NORMALIZED_COORDS_FALSE |
  3. CLK_FILTER_NEAREST |
  4. CLK_ADDRESS_CLAMP_TO_EDGE;
  5. /* Producer: convolves inputImage with a filterWidth x filterWidth
  6. * filter and writes one float packet per pixel into outputPipe.
  7. * Pipes require OpenCL C 2.0 (build with -cl-std=CL2.0). */
  8. __kernel
  9. void producerKernel(
  10. __read_only image2d_t inputImage,
  11. __write_only pipe float outputPipe,
  12. __constant float *filter,
  13. int filterWidth)
  14. {
  15. /* Store each work-item's unique row and column */
  16. int column = get_global_id(0);
  17. int row = get_global_id(1);
  18. /* Work-items outside the image (e.g. if the global size is rounded
  19. * up) must not write, or the consumer would get extra packets */
  20. if (column >= get_image_width(inputImage) ||
  21. row >= get_image_height(inputImage))
  22. {
  23. return;
  24. }
  25. /* Half the width of the filter is needed for indexing
  26. * memory later */
  27. int halfWidth = filterWidth / 2;
  28. /* Used to hold the value of the output pixel */
  29. float sum = 0.0f;
  30. /* Iterator for the filter (row-major order) */
  31. int filterIdx = 0;
  32. /* Each work-item iterates around its local area on the basis of the
  33. * size of the filter */
  34. int2 coords; // Coordinates for accessing the image
  35. /* Iterate the filter rows */
  36. for (int i = -halfWidth; i <= halfWidth; i++)
  37. {
  38. coords.y = row + i;
  39. /* Iterate over the filter columns */
  40. for (int j = -halfWidth; j <= halfWidth; j++)
  41. {
  42. coords.x = column + j;
  43. /* Read a pixel from the image. A single channel image
  44. * stores the pixel in the x coordinate of the returned
  45. * vector. */
  46. float4 pixel;
  47. pixel = read_imagef(inputImage, sampler, coords);
  48. sum += pixel.x * filter[filterIdx++];
  49. }
  50. }
  51. /* Write the output pixel to the pipe. The host sizes the pipe to
  52. * hold one packet per pixel, so this write is not expected to fail */
  53. write_pipe(outputPipe, &sum);
  54. }

程序清单4.9 卷积内核(生产者)

  1. /* Consumer: reads totalPixels float packets from inputPipe and
  2. * accumulates them into a 256-bin histogram. The histogram update
  3. * is not atomic, so this kernel is meant to run as one work-item.
  4. * Pipes require OpenCL C 2.0 (build with -cl-std=CL2.0). */
  5. __kernel
  6. void consumerKernel(
  7. __read_only pipe float inputPipe,
  8. int totalPixels,
  9. __global int *histogram)
  10. {
  11. int pixelCnt;
  12. float pixel;
  13. /* Loop to process all pixels from the producer kernel */
  14. for (pixelCnt = 0; pixelCnt < totalPixels; pixelCnt++)
  15. {
  16. /* Keep trying to read a pixel from the pipe until one becomes
  17. * available (read_pipe returns non-zero while it is empty) */
  18. while (read_pipe(inputPipe, &pixel) != 0);
  19. /* Add the pixel value to the histogram. The bin index is clamped
  20. * to [0, 255] so an out-of-range value cannot write past the end */
  21. histogram[clamp((int)pixel, 0, 255)]++;
  22. }
  23. }

程序清单4.10 直方图内核(消费者)

虽然存储在管道中的数据不能被主机访问,但主机端仍需要使用相应的API来创建管道对象。该API如下所示:

  1. /* Creates a pipe memory object; the pipe is returned as a cl_mem */
  2. cl_mem clCreatePipe(
  3. cl_context context,
  4. cl_mem_flags flags,
  5. cl_uint pipe_packet_size,
  6. cl_uint pipe_max_packets,
  7. const cl_pipe_properties *properties,
  8. cl_int *errcode_ret);

我们需要考虑两个内核没有并发执行的情况(例如生产者在消费者开始之前就已执行完毕)。因此,需要创建足够大的管道对象,以便容纳与图像像素数量相同的包:

  1. /* One float packet per pixel */
  2. cl_mem pipe = clCreatePipe(context, 0, sizeof(float), imageRows * imageCols, NULL, &status);

使用多个设备时,主机端需要多做几步。创建上下文对象时,需要提供两个设备(一个CPU设备,一个GPU设备),并且每个设备都需要有自己的命令队列。另外,程序对象需要为这两个设备构建,并从中创建两个内核。入队内核时,需要分别将其放入各自的命令队列:生产者(卷积)内核入队到GPU命令队列,消费者(直方图)内核入队到CPU命令队列。完整的代码见程序清单4.11。

  1. /* System includes */
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. /* OpenCL includes */
  6. #include <CL/cl.h>
  7. /* Utility functions */
  8. #include "utils.h"
  9. #include "bmp-utils.h"
  10. /* Filter for the convolution: 5x5 Gaussian blur whose weights sum to 1 */
  11. static float gaussianBlurFilter[25] = {
  12. 1.0f / 273.0f, 4.0f / 273.0f, 7.0f / 273.0f, 4.0f / 273.0f, 1.0f / 273.0f,
  13. 4.0f / 273.0f, 16.0f / 273.0f, 26.0f / 273.0f, 16.0f / 273.0f, 4.0f / 273.0f,
  14. 7.0f / 273.0f, 26.0f / 273.0f, 41.0f / 273.0f, 26.0f / 273.0f, 7.0f / 273.0f,
  15. 4.0f / 273.0f, 16.0f / 273.0f, 26.0f / 273.0f, 16.0f / 273.0f, 4.0f / 273.0f,
  16. 1.0f / 273.0f, 4.0f / 273.0f, 7.0f / 273.0f, 4.0f / 273.0f, 1.0f / 273.0f
  17. };
  18. static const int filterWidth = 5;
  19. static const int filterSize = 25 * sizeof(float);
  20. /* Number of histogram bins */
  21. static const int HIST_BINS = 256;
  22. int main(int argc, char *argv[])
  23. {
  24. /* Host data */
  25. float *hInputImage = NULL;
  26. int *hOutputHistogram = NULL;
  27. /* Allocate space for the input image and read the
  28. * data from disk */
  29. int imageRows;
  30. int imageCols;
  31. hInputImage = readBmpFloat("../../Images/cat.bmp", &imageRows, &imageCols);
  32. if (!hInputImage) { exit(-1); } // NOTE(review): assumes NULL on failure
  33. const int imageElements = imageRows * imageCols;
  34. /* Allocate space for the histogram on the host */
  35. const int histogramSize = HIST_BINS * sizeof(int);
  36. hOutputHistogram = (int *)malloc(histogramSize);
  37. if (!hOutputHistogram) { exit(-1); }
  38. /* Use this to check the output of each API call */
  39. cl_int status;
  40. /* Get the first platform */
  41. cl_platform_id platform;
  42. status = clGetPlatformIDs(1, &platform, NULL);
  43. check(status);
  44. /* Get one GPU (runs the producer) and one CPU (runs the consumer) */
  45. cl_device_id devices[2];
  46. cl_device_id gpuDevice;
  47. cl_device_id cpuDevice;
  48. status = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 1, &gpuDevice, NULL);
  49. check(status);
  50. status = clGetDeviceIDs(platform, CL_DEVICE_TYPE_CPU, 1, &cpuDevice, NULL);
  51. check(status);
  52. devices[0] = gpuDevice;
  53. devices[1] = cpuDevice;
  54. /* Create a context and associate it with both devices */
  55. cl_context context;
  56. context = clCreateContext(NULL, 2, devices, NULL, NULL, &status);
  57. check(status);
  58. /* Create one in-order command-queue per device (OpenCL 2.0 API) */
  59. cl_command_queue gpuQueue;
  60. cl_command_queue cpuQueue;
  61. gpuQueue = clCreateCommandQueueWithProperties(context, gpuDevice, NULL, &status);
  62. check(status);
  63. cpuQueue = clCreateCommandQueueWithProperties(context, cpuDevice, NULL, &status);
  64. check(status);
  65. /* The image descriptor describes how the data will be stored
  66. * in memory. This descriptor initializes a 2D image with no pitch */
  67. cl_image_desc desc;
  68. desc.image_type = CL_MEM_OBJECT_IMAGE2D;
  69. desc.image_width = imageCols;
  70. desc.image_height = imageRows;
  71. desc.image_depth = 0;
  72. desc.image_array_size = 0;
  73. desc.image_row_pitch = 0;
  74. desc.image_slice_pitch = 0;
  75. desc.num_mip_levels = 0;
  76. desc.num_samples = 0;
  77. desc.buffer = NULL;
  78. /* The image format describes the properties of each pixel */
  79. cl_image_format format;
  80. format.image_channel_order = CL_R; // single channel
  81. format.image_channel_data_type = CL_FLOAT;
  82. /* Create the input image; its data is copied in below with
  83. * clEnqueueWriteImage */
  84. cl_mem inputImage;
  85. inputImage = clCreateImage(context, CL_MEM_READ_ONLY, &format, &desc, NULL, &status);
  86. check(status);
  87. /* Create a buffer object for the output histogram. The consumer
  88. * kernel both reads and writes it (histogram[i]++), so READ_WRITE */
  89. cl_mem outputHistogram;
  90. outputHistogram = clCreateBuffer(context, CL_MEM_READ_WRITE, histogramSize, NULL, &status);
  91. check(status);
  92. /* Create a buffer for the filter */
  93. cl_mem filter;
  94. filter = clCreateBuffer(context, CL_MEM_READ_ONLY, filterSize, NULL, &status);
  95. check(status);
  96. /* Create the pipe with room for one float packet per pixel, so the
  97. * producer can finish even if the kernels do not run concurrently */
  98. cl_mem pipe;
  99. pipe = clCreatePipe(context, 0, sizeof(float), imageElements, NULL, &status);
  100. check(status);
  101. /* Copy the host image data to the GPU */
  102. size_t origin[3] = {0, 0, 0}; // Offset within the image to copy from
  103. size_t region[3] = {imageCols, imageRows, 1}; // Elements per dimension
  104. status = clEnqueueWriteImage(gpuQueue, inputImage, CL_TRUE, origin, region, 0, 0, hInputImage, 0, NULL, NULL);
  105. check(status);
  106. /* Write the filter to the GPU */
  107. status = clEnqueueWriteBuffer(gpuQueue, filter, CL_TRUE, 0, filterSize, gaussianBlurFilter, 0, NULL, NULL);
  108. check(status);
  109. /* Initialize the output histogram with zeros */
  110. int zero = 0;
  111. status = clEnqueueFillBuffer(cpuQueue, outputHistogram, &zero, sizeof(int), 0, histogramSize, 0, NULL, NULL);
  112. check(status);
  113. /* Create a program with source code */
  114. char *programSource = readFile("producer-consumer.cl");
  115. if (!programSource) { exit(-1); } // NOTE(review): assumes NULL on failure
  116. size_t programSourceSize = strlen(programSource);
  117. cl_program program = clCreateProgramWithSource(context, 1, (const char**)&programSource, &programSourceSize, &status);
  118. check(status);
  119. /* Build (compile) the program for both devices. Pipes are an
  120. * OpenCL C 2.0 feature; without -cl-std=CL2.0 the compiler
  121. * targets OpenCL C 1.x and rejects the pipe kernels */
  122. status = clBuildProgram(program, 2, devices, "-cl-std=CL2.0", NULL, NULL);
  123. if (status != CL_SUCCESS)
  124. {
  125. printCompilerError(program, gpuDevice);
  126. printCompilerError(program, cpuDevice);
  127. exit(-1);
  128. }
  129. /* Create the kernels */
  130. cl_kernel producerKernel;
  131. cl_kernel consumerKernel;
  132. producerKernel = clCreateKernel(program, "producerKernel", &status);
  133. check(status);
  134. consumerKernel = clCreateKernel(program, "consumerKernel", &status);
  135. check(status);
  136. /* Producer arguments: (inputImage, outputPipe, filter, filterWidth) */
  137. status = clSetKernelArg(producerKernel, 0, sizeof(cl_mem), &inputImage);
  138. status |= clSetKernelArg(producerKernel, 1, sizeof(cl_mem), &pipe);
  139. status |= clSetKernelArg(producerKernel, 2, sizeof(cl_mem), &filter);
  140. status |= clSetKernelArg(producerKernel, 3, sizeof(int), &filterWidth);
  141. check(status);
  142. /* Consumer arguments: (inputPipe, totalPixels, histogram) */
  143. status = clSetKernelArg(consumerKernel, 0, sizeof(cl_mem), &pipe);
  144. status |= clSetKernelArg(consumerKernel, 1, sizeof(int), &imageElements);
  145. status |= clSetKernelArg(consumerKernel, 2, sizeof(cl_mem), &outputHistogram);
  146. check(status);
  147. /* Define the index space and work-group size: one producer
  148. * work-item per pixel, and a single consumer work-item */
  149. size_t producerGlobalSize[2];
  150. producerGlobalSize[0] = imageCols;
  151. producerGlobalSize[1] = imageRows;
  152. size_t producerLocalSize[2];
  153. producerLocalSize[0] = 8;
  154. producerLocalSize[1] = 8;
  155. size_t consumerGlobalSize[1];
  156. consumerGlobalSize[0] = 1;
  157. size_t consumerLocalSize[1];
  158. consumerLocalSize[0] = 1;
  159. /* Enqueue the producer and flush the GPU queue so it is actually
  160. * submitted; otherwise the consumer could spin on an empty pipe
  161. * while the producer is still waiting in an unflushed queue */
  162. status = clEnqueueNDRangeKernel(gpuQueue, producerKernel, 2, NULL, producerGlobalSize, producerLocalSize, 0, NULL, NULL);
  163. check(status);
  164. status = clFlush(gpuQueue);
  165. check(status);
  166. /* The consumer NDRange is 1-D: its size arrays have one element */
  167. status = clEnqueueNDRangeKernel(cpuQueue, consumerKernel, 1, NULL, consumerGlobalSize, consumerLocalSize, 0, NULL, NULL);
  168. check(status);
  169. /* Read the output histogram buffer to the host (blocking) */
  170. status = clEnqueueReadBuffer(cpuQueue, outputHistogram, CL_TRUE, 0, histogramSize, hOutputHistogram, 0, NULL, NULL);
  171. check(status);
  172. /* Wait for the producer as well before releasing shared objects */
  173. status = clFinish(gpuQueue);
  174. check(status);
  175. /* Free OpenCL resources */
  176. clReleaseKernel(producerKernel);
  177. clReleaseKernel(consumerKernel);
  178. clReleaseProgram(program);
  179. clReleaseCommandQueue(gpuQueue);
  180. clReleaseCommandQueue(cpuQueue);
  181. clReleaseMemObject(inputImage);
  182. clReleaseMemObject(outputHistogram);
  183. clReleaseMemObject(filter);
  184. clReleaseMemObject(pipe);
  185. clReleaseContext(context);
  186. /* Free host resources */
  187. free(hInputImage);
  188. free(hOutputHistogram);
  189. free(programSource);
  190. return 0;
  191. }

程序清单4.11 生产者-消费者主机端完整代码