Greenplum MapReduce规范

该规范描述了用于定义Greenplum MapReduce作业的文档格式和模式。

MapReduce是由Google开发的一种编程模型,用于在商用服务器阵列上处理和生成大型数据集。 Greenplum MapReduce允许熟悉MapReduce模型的程序员编写map和reduce函数,并将它们提交给Greenplum数据库并行引擎进行处理。

要使Greenplum能够处理MapReduce函数,请在文档中定义函数,然后将文档传递给Greenplum MapReduce程序gpmapreduce,以便由Greenplum数据库并行引擎执行。 Greenplum数据库系统分配输入数据,在一组机器上执行程序,处理机器故障,并管理所需的机器间通信。

关于gpmapreduce的信息请见Greenplum数据库工具指南

Parent topic: Greenplum数据库参考指南

Greenplum MapReduce文档格式

本节介绍Greenplum MapReduce文档格式的一些基础知识,以帮助您开始创建自己的Greenplum MapReduce文档。 Greenplum使用YAML 1.1文档格式,然后实现自己的模式,以定义MapReduce作业的各个步骤。

所有Greenplum MapReduce文件必须首先声明它们正在使用的YAML规范的版本。 之后,三个破折号(-—)表示文档的开头,三个点(…)表示文档的结尾而不启动新文档。 注释行以井号(#)为前缀。 可以在同一个文件中声明多个Greenplum MapReduce文档:

  1. %YAML 1.1
  2. ---
  3. # Begin Document 1
  4. # ...
  5. ---
  6. # Begin Document 2
  7. # ...

在Greenplum MapReduce文档中,有三种基本类型的数据结构或节点:标量,序列和映射。

标量是由空格缩进的基本文本字符串。 如果您有跨越多行的标量输入,则前面的管道(|)表示文字样式,其中所有换行符都很重要。 或者,前一个尖括号(>)将单个换行符折叠到具有相同缩进级别的后续行的空格。 如果字符串包含具有保留含义的字符,则必须引用该字符串,或者必须使用反斜杠(\)转义特殊字符。

  1. # Read each new line literally
  2. somekey: | this value contains two lines
  3. and each line is read literally
  4. # Treat each new line as a space
  5. anotherkey: >
  6. this value contains two lines
  7. but is treated as one continuous line
  8. # This quoted string contains a special character
  9. ThirdKey: "This is a string: not a mapping"

序列是列表,列表中的每个条目都在其自己的行上,用短划线和空格(- )表示。 或者,您可以将内联序列指定为方括号内的逗号分隔列表。 序列提供一组数据并为其提供订单。 将列表加载到Greenplum MapReduce程序时,将保留订单。

  1. # list sequence
  2. - this
  3. - is
  4. - a list
  5. - with
  6. - five scalar values
  7. # inline sequence
  8. [this, is, a list, with, five scalar values]

映射用于将数据值与称为键的标识符配对。 映射对每个key: value对使用冒号和空格(: ),或者也可以内联指定为花括号内的逗号分隔列表。 该密钥用作从映射中检索数据的索引。

  1. # a mapping of items
  2. title: War and Peace
  3. author: Leo Tolstoy
  4. date: 1865
  5. # same mapping written inline
  6. {title: War and Peace, author: Leo Tolstoy, date: 1865}

密钥用于将元信息与每个节点相关联,并指定预期的节点类型(标量,序列或映射)。 有关Greenplum MapReduce程序所需的键,请参阅Greenplum MapReduce文档模式

Greenplum MapReduce程序按顺序处理文档的节点,并使用缩进(空格)来确定文档层次结构和节点之间的关系。 使用空白区域非常重要。 不应仅将白色空间用于格式化目的,并且根本不应使用制表符。

Greenplum MapReduce文档模式

Greenplum MapReduce使用YAML文档框架并实现自己的YAML模式。 Greenplum MapReduce文档的基本结构是:

  1. %YAML 1.1
  2. ---
  3. VERSION: 1.0.0.2
  4. DATABASE: dbname
  5. USER: db_username
  6. HOST: master_hostname
  7. PORT: master_port
  1. DEFINE:
  2. - INPUT:
  3. NAME: input_name
  4. FILE:
  5. - hostname:/path/to/file
  6. GPFDIST:
  7. - hostname:port/file_pattern
  8. TABLE: table_name
  9. QUERY: SELECT_statement
  10. EXEC: command_string
  11. COLUMNS:
  12. - field_name data_type
  13. FORMAT: TEXT | CSV
  14. DELIMITER: delimiter_character
  15. ESCAPE: escape_character
  16. NULL: null_string
  17. QUOTE: csv_quote_character
  18. ERROR_LIMIT: integer
  19. ENCODING: database_encoding
  1. - OUTPUT:
  2. NAME: output_name
  3. FILE: file_path_on_client
  4. TABLE: table_name
  5. KEYS:
  6. - column_name
  7. MODE: REPLACE | APPEND
  1. - MAP:
  2. NAME: function_name
  3. FUNCTION: function_definition
  4. LANGUAGE: perl | python | c
  5. LIBRARY: /path/filename.so
  6. PARAMETERS:
  7. - nametype
  8. RETURNS:
  9. - nametype
  10. OPTIMIZE: STRICT IMMUTABLE
  11. MODE: SINGLE | MULTI
  1. - TRANSITION | CONSOLIDATE | FINALIZE:
  2. NAME: function_name
  3. FUNCTION: function_definition
  4. LANGUAGE: perl | python | c
  5. LIBRARY: /path/filename.so
  6. PARAMETERS:
  7. - nametype
  8. RETURNS:
  9. - nametype
  10. OPTIMIZE: STRICT IMMUTABLE
  11. MODE: SINGLE | MULTI
  1. - REDUCE:
  2. NAME: reduce_job_name
  3. TRANSITION: transition_function_name
  4. CONSOLIDATE: consolidate_function_name
  5. FINALIZE: finalize_function_name
  6. INITIALIZE: value
  7. KEYS:
  8. - key_name
  1. - TASK:
  2. NAME: task_name
  3. SOURCE: input_name
  4. MAP: map_function_name
  5. REDUCE: reduce_function_name
  6. EXECUTE
  1. - RUN:
  2. SOURCE: input_or_task_name
  3. TARGET: output_name
  4. MAP: map_function_name
  5. REDUCE: reduce_function_name...

VERSION

必须。Greenplum MapReduce YAML规范的版本。当前版本为1.0.0.1。

DATABASE

可选。指定Greenplum中要连接的数据库。如果未指定,则默认为默认数据库或$PGDATABASE(如果已设置)。

USER

可选。指定要用于连接的数据库角色。 如果未指定,则默认为当前用户或$PGUSER(如果已设置)。 您必须是Greenplum超级用户才能运行用不受信任的Python和Perl编写的函数。 常规数据库用户可以运行用可信Perl编写的函数。 您还必须是数据库超级用户才能运行包含FILEGPFDISTEXEC输入类型的MapReduce作业。

HOST

可选。指定Greenplum master主机名。如果未指定,则默认为localhost或$PGHOST(如果已设置)。

PORT

可选。指定Greenplum主端口。如果未指定,则默认为5432或$PGPORT(如果已设置)。

DEFINE

必须。此MapReduce文档的一系列定义。DEFINE部分必须至少有一个INPUT定义。- INPUT

  1. 必须。定义输入数据。每个MapReduce文档必须至少定义一个输入。 文档中允许多个输入定义,但每个输入定义只能指定其中一种访问类型:文件,gpfdist文件分发程序,数据库中的表,SQL命令或操作系统命令。 有关gpfdist的信息,请参阅*Greenplum数据库实用程序指南*。- NAME
  2. 此输入的名称。关于此MapReduce作业中其他对象的名称(例如map函数,taskreduce函数和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。
  3. FILE
  4. 一个或多个输入文件的序列,格式为:seghostname:/path/to/filename 您必须是Greenplum数据库超级用户才能使用FILE输入运行MapReduce作业。 该文件必须位于Greenplum segment主机上。
  5. - GPFDIST
  6. 一个或多个运行gpfdist文件分发程序的序列,格式为:hostname\[:port\]/file\_pattern 除非服务器配置参数[服务器配置参数]($8618fe5efcd3ee4d.md#topic1)设置为on, 否则您必须是Greenplum数据库超级用户才能使用GPFDIST输入运行MapReduce作业。
  7. TABLE
  8. 数据库中现有表的名称。
  9. QUERY
  10. 要在数据库中运行的SQL SELECT命令。
  11. EXEC
  12. 要在Greenplum segment主机上运行的操作系统命令。 默认情况下,该命令由系统中的所有segment实例运行。 例如,如果每个segment主机有四个segment实例,则该命令将在每个主机上运行四次。 您必须是Greenplum数据库超级用户才能使用EXEC输入运行MapReduce作业, 并且服务器配置参数[服务器配置参数]($8618fe5efcd3ee4d.md#topic1)设置为on。
  13. COLUMNS
  14. 可选。列指定为:column\_name \[data\_type\] 如果未指定,则默认值为value text [DELIMITER](#topic3__DELIMITER)字符用于分隔两个数据值字段(列)。 行由换行符(0x0a)确定。
  15. FORMAT
  16. 可选。指定数据的格式 - 分隔文本(TEXT)或逗号分隔值(CSV)格式。 如果未指定数据格式,则默认为TEXT
  17. DELIMITER
  18. 可选[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入。 指定用于分隔数据值的单个字符。 默认值为TEXT模式下的制表符,CSV模式下为逗号。 分隔符字符只能出现在任意两个数据值字段之间。 不要在行的开头或结尾放置分隔符。
  19. ESCAPE
  20. 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定用于C转义序列的单个字符(例如\\n,\\t,\\100等)以及转义可能以行或列分隔符形式取出的数据字符。 确保选择实际列数据中未使用的转义字符。 默认转义字符是文本格式文件的\\(反斜杠)和csv格式文件的"(双引号),但是可以指定另一个字符来表示转义。 也可以通过指定禁用转义值'OFF'作为转义值。 这对于诸如文本格式的Web日志数据之类的数据非常有用,这些数据具有许多不打算转义的嵌入式反斜杠。
  21. NULL
  22. 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定表示空值的字符串。 默认值为TEXT格式的\\N,以及CSV格式没有引号的空值。 如果您不想将空值与空字符串区分开来,即使在TEXT模式下,您可能更喜欢空字符串。 与此字符串匹配的任何输入数据项都将被视为空值。
  23. QUOTE
  24. 对于[FILE](#topic3__FILE), [GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入可选。 指定CSV格式文件的引用字符。 默认值为双引号(")。 CSV格式的文件中,如果数据值字段包含任何逗号或嵌入的新行,则必须用双引号括起来。 包含双引号字符的字段必须用双引号括起来,并且嵌入双引号必须由一对连续的双引号表示。 始终正确打开和关闭引号以便正确解析数据行非常重要。
  25. ERROR\_LIMIT
  26. 如果输入行具有格式错误,则只要在输入处理期间未在任何Greenplum segment实例上达到错误限制计数,它们将被丢弃。 如果未达到错误限制,则将处理所有正常行并丢弃任何错误行。
  27. - ENCODING
  28. 用于数据的字符集编码。 指定字符串常量(例如'SQL\_ASCII'),整数编码号或DEFAULT以使用默认客户端编码。 有关更多信息,请参阅[字符集支持]($8b969ad83b61690e.md#topic1)。
  29. OUTPUT
  30. 可选。定义输出此MapReduce作业的格式化数据的位置。 如果未定义输出,则默认为STDOUT(客户端的标准输出)。 您可以将输出发送到客户端主机上的文件或数据库中的现有表。- NAME
  31. 此输出的名称。默认输出名称为STDOUT 关于MapReduce作业中其他对象的名称(例如map函数,taskreduce函数和输入名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。
  32. FILE
  33. 指定MapReduce客户端计算机上的文件位置,以如下格式输出数据:/path/to/filename
  34. TABLE
  35. 指定数据库中用于输出数据的表的名称。 如果在运行MapReduce作业之前该表不存在,则将使用[KEYS](#topic3__KEYS)指定的分发策略创建该表。
  36. KEYS
  37. [TABLE](#topic3__OUTPUTTABLE)输出的可选项。 指定要用作Greenplum数据库分发键的列。 如果[EXECUTE](#topic3__EXECUTE)任务包含[REDUCE](#topic3__REDUCE)定义, 则默认情况下REDUCE键将用作表分发键。 否则,表的第一列将用作分发键。
  38. MODE
  39. [TABLE](#topic3__OUTPUTTABLE)输出的可选项。 如果未指定,则默认为创建表(如果该表尚不存在),但如果表存在则输出错误。 声明APPEND将输出数据添加到现有表(前提是表模式与输出格式匹配),而不删除任何现有数据。 如果表存在,则声明REPLACE将删除该表,然后重新创建它。 如果不存在,APPENDREPLACE都将创建一个新表。
  40. MAP
  41. 必须。每个MAP函数采用以(key, value)对构造的数据, 处理每对,并生成零个或多个输出(key, value)对。 然后,Greenplum MapReduce框架从所有输出列表中收集具有相同密钥的所有对,并将它们组合在一起。 然后将此输出传递给[REDUCE](#topic3__TASKREDUCE)任务, 该任务由[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成。 有一个名为IDENTITY的预定义MAP函数,它返回的(key, value)对不变。 虽然(key, value)是默认参数,但您可以根据需要指定其他原型。
  42. TRANSITION | CONSOLIDATE | FINALIZE
  43. TRANSITIONCONSOLIDATEFINALIZE都是[REDUCE](#topic3__REDUCE)的组成部分。 需要TRANSITION函数。 CONSOLIDATEFINALIZE函数是可选的。 默认情况下,所有将state作为其输入[PARAMETERS](#topic3__PARAMETERS)的第一个,但也可以定义其他原型。
  44. TRANSITION函数遍历给定键的每个值,并在state变量中累积值。 当在键的第一个值上调用转换函数时,state将设置为[REDUCE](#topic3__REDUCE)作业的[INITIALIZE](#topic3__INITIALIZE)指定的值(或数据类型的默认状态值)。 转换需要两个参数作为输入; 密钥减少的当前状态和下一个值,然后产生一个新state
  45. 如果指定了CONSOLIDATE函数,则在segment级别执行TRANSITION处理, 然后在Greenplum互连上重新分配密钥以进行最终聚合(两阶段聚合)。 仅重新分配给定密钥的结果state值,从而导致更低的互连流量和更高的并行度。 CONSOLIDATETRANSITION一样处理,除了(state + value)=> state 它是(state + state)=> state
  46. 如果指定了FINALIZE函数,它将采用CONSOLIDATE(如果存在)或TRANSITION生成的最终state 并在发出最终结果之前进行任何最终处理。 TRANSITIONCONSOLIDATE函数不能返回一组值。 如果需要REDUCE作业来返回一个集合,则需要FINALIZE将最终状态转换为一组输出值。- NAME
  47. 必须。函数的名称。关于此MapReduce作业中其他对象的名称(例如函数,任务,输入和输出名称),名称必须是唯一的。 您还可以指定Greenplum数据库内置函数的名称。 如果使用内置函数,请不要提供[LANGUAGE](#topic3__LANGUAGE)或[FUNCTION](#topic3__FUNCTION)正文。
  48. FUNCTION
  49. 可选。使用指定的[LANGUAGE](#topic3__LANGUAGE)指定函数的完整主体。 如果未指定FUNCTION 则使用与[NAME](#topic3__TCFNAME)对应的内置数据库函数。
  50. LANGUAGE
  51. 使用[FUNCTION](#topic3__FUNCTION)时需要。指定用于解释函数的实现语言。 此版本具有对perlpythonC的语言支持。 如果调用内置数据库函数,则不应指定LANGUAGE
  52. LIBRARY
  53. [LANGUAGE](#topic3__LANGUAGE)为C时必需(不允许用于其他语言函数)。 要使用此属性,[VERSION](#topic3__VERSION)必须为1.0.0.2 必须在运行MapReduce作业之前安装指定的库文件,并且该文件必须存在于所有Greenplum主机(mastersegment)上的相同文件系统位置。
  54. PARAMETERS
  55. 可选。函数输入参数。默认类型是text
  56. MAP default - key text, value text
  57. TRANSITION default - state text, value text
  58. CONSOLIDATE default - state1 text, state2 text (必须具有相同数据类型的两个输入参数)
  59. FINALIZE default - state text (仅限单个参数)
  60. RETURNS
  61. 可选。默认返回类型是text
  62. MAP default - key text, value text
  63. TRANSITION default - state text (仅限单个参数)
  64. CONSOLIDATE default - state text (仅限单个参数)
  65. FINALIZE default - value text
  66. OPTIMIZE
  67. 该函数的可选优化参数:
  68. STRICT - 函数不受NULL值的影响
  69. IMMUTABLE - 函数将始终返回给定输入的相同值
  70. MODE
  71. 可选。指定函数返回的行数。
  72. MULTI - 每个输入记录返回0行或更多行。 函数的返回值必须是要返回的行数组,或者必须使用Python中的yieldPerl中的return\_next将函数写为迭代器。 MULTIMAPFINALIZE函数的默认模式。
  73. SINGLE - 每个输入记录只返回一行。 SINGLETRANSITIONCONSOLIDATE函数支持的唯一模式。 当与MAPFINALIZE函数一起使用时,SINGLE模式可以提供适度的性能改进。
  74. REDUCE
  75. 必须。REDUCE定义命名[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数,包括将(keyvalue)对缩减到最终结果集。 您还可以执行几个预定义的REDUCE作业,这些作业都在名为value的列上运行:
  76. IDENTITY - 返回(键,值)对不变
  77. SUM - 计算数值数据的总和
  78. AVG - 计算数字数据的平均值
  79. COUNT - 计算输入数据的计数
  80. MIN - 计算数值数据的最小值
  81. MAX - 计算数值数据的最大值
  82. - NAME
  83. 必须。这个REDUCE工作的名称。 关于此MapReduce作业中的其他对象的名称(函数,任务,输入和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。
  84. TRANSITION
  85. 必须。TRANSITION函数名称。
  86. CONSOLIDATE
  87. 可选。CONSOLIDATE函数名称。
  88. FINALIZE
  89. 可选。FINALIZE函数名称。
  90. INITIALIZE
  91. textfloat数据类型的可选项。 所有其他数据类型都需要。 文本的默认值为'' float的默认值为0.0 设置TRANSITION函数的初始state值。
  92. KEYS
  93. 可选。默认为\[key, \*\] 使用多列缩减时,可能需要指定哪些列是键列,哪些列是值列。 默认情况下,未传递给TRANSITION函数的任何输入列都是键列, 名为key的列始终是键列,即使它传递给TRANSITION函数也是如此。 特殊指示符\*表示未传递给TRANSITION函数的所有列。 如果该指示符不存在于键列表中,则丢弃任何不匹配的列。
  94. TASK
  95. 可选。TASKGreenplum MapReduce作业管道中定义了完整的端到端INPUT/MAP/REDUCE阶段。 它与[EXECUTE](#topic3__EXECUTE)类似,但不会立即执行。 可以被称为[INPUT](#topic3__INPUT)的任务对象进入进一步处理阶段。- NAME
  96. 必须。此任务的名称。 关于此MapReduce作业中其他对象的名称(例如map函数,reduce函数,输入和输出名称),名称必须是唯一的。 此外,名称不能与数据库中的现有对象(例如表,函数或视图)冲突。
  97. SOURCE
  98. [INPUT](#topic3__INPUT)或其他TASK的名称。
  99. MAP
  100. 可选。[MAP](#topic3__MAP)函数的名称。 如果未指定,则默认为IDENTITY
  101. REDUCE
  102. 可选。[REDUCE](#topic3__REDUCE)函数的名称。 如果未指定,则默认为IDENTITY

EXECUTE

必须。EXECUTE定义Greenplum MapReduce作业管道中的最终INPUT/MAP/REDUCE阶段。- RUN

  1. - SOURCE
  2. 必须。[INPUT](#topic3__INPUT)或[TASK](#topic3__TASK)的名称。
  3. TARGET
  4. 可选。[OUTPUT](#topic3__OUTPUT)的名称。默认值为STDOUT
  5. MAP
  6. 可选。[MAP](#topic3__MAP)函数名称。 如果未指定,默认为IDENTITY
  7. REDUCE
  8. 可选。[REDUCE](#topic3__REDUCE)函数的名称。默认为IDENTITY

示例Greenplum MapReduce文档

  1. # This example MapReduce job processes documents and looks for keywords in them.
  2. # It takes two database tables as input:
  3. # - documents (doc_id integer, url text, data text)
  4. # - keywords (keyword_id integer, keyword text)#
  5. # The documents data is searched for occurrences of keywords and returns results of
  6. # url, data and keyword (a keyword can be multiple words, such as "high performance # computing")
  7. %YAML 1.1
  8. ---
  9. VERSION:1.0.0.1
  10. # Connect to Greenplum Database using this database and role
  11. DATABASE:webdata
  12. USER:jsmith
  13. # Begin definition section
  14. DEFINE:
  15. # Declare the input, which selects all columns and rows from the
  16. # 'documents' and 'keywords' tables.
  17. - INPUT:
  18. NAME:doc
  19. TABLE:documents
  20. - INPUT:
  21. NAME:kw
  22. TABLE:keywords
  23. # Define the map functions to extract terms from documents and keyword
  24. # This example simply splits on white space, but it would be possible
  25. # to make use of a python library like nltk (the natural language toolkit)
  26. # to perform more complex tokenization and word stemming.
  27. - MAP:
  28. NAME:doc_map
  29. LANGUAGE:python
  30. FUNCTION:|
  31. i = 0 # the index of a word within the document
  32. terms = {}# a hash of terms and their indexes within the document
  33. # Lower-case and split the text string on space
  34. for term in data.lower().split():
  35. i = i + 1# increment i (the index)
  36. # Check for the term in the terms list:
  37. # if stem word already exists, append the i value to the array entry
  38. # corresponding to the term. This counts multiple occurrences of the word.
  39. # If stem word does not exist, add it to the dictionary with position i.
  40. # For example:
  41. # data: "a computer is a machine that manipulates data"
  42. # "a" [1, 4]
  43. # "computer" [2]
  44. # "machine" [3]
  45. # …
  46. if term in terms:
  47. terms[term] += ','+str(i)
  48. else:
  49. terms[term] = str(i)
  50. # Return multiple lines for each document. Each line consists of
  51. # the doc_id, a term and the positions in the data where the term appeared.
  52. # For example:
  53. # (doc_id => 100, term => "a", [1,4]
  54. # (doc_id => 100, term => "computer", [2]
  55. # …
  56. for term in terms:
  57. yield([doc_id, term, terms[term]])
  58. OPTIMIZE:STRICT IMMUTABLE
  59. PARAMETERS:
  60. - doc_id integer
  61. - data text
  62. RETURNS:
  63. - doc_id integer
  64. - term text
  65. - positions text
  66. # The map function for keywords is almost identical to the one for documents
  67. # but it also counts of the number of terms in the keyword.
  68. - MAP:
  69. NAME:kw_map
  70. LANGUAGE:python
  71. FUNCTION:|
  72. i = 0
  73. terms = {}
  74. for term in keyword.lower().split():
  75. i = i + 1
  76. if term in terms:
  77. terms[term] += ','+str(i)
  78. else:
  79. terms[term] = str(i)
  80. # output 4 values including i (the total count for term in terms):
  81. yield([keyword_id, i, term, terms[term]])
  82. OPTIMIZE:STRICT IMMUTABLE
  83. PARAMETERS:
  84. - keyword_id integer
  85. - keyword text
  86. RETURNS:
  87. - keyword_id integer
  88. - nterms integer
  89. - term text
  90. - positions text
  91. # A TASK is an object that defines an entire INPUT/MAP/REDUCE stage
  92. # within a Greenplum MapReduce pipeline. It is like EXECUTION, but it is
  93. # executed only when called as input to other processing stages.
  94. # Identify a task called 'doc_prep' which takes in the 'doc' INPUT defined earlier
  95. # and runs the 'doc_map' MAP function which returns doc_id, term, [term_position]
  96. - TASK:
  97. NAME:doc_prep
  98. SOURCE:doc
  99. MAP:doc_map
  100. # Identify a task called 'kw_prep' which takes in the 'kw' INPUT defined earlier
  101. # and runs the kw_map MAP function which returns kw_id, term, [term_position]
  102. - TASK:
  103. NAME:kw_prep
  104. SOURCE:kw
  105. MAP:kw_map
  106. # One advantage of Greenplum MapReduce is that MapReduce tasks can be
  107. # used as input to SQL operations and SQL can be used to process a MapReduce task.
  108. # This INPUT defines a SQL query that joins the output of the 'doc_prep'
  109. # TASK to that of the 'kw_prep' TASK. Matching terms are output to the 'candidate'
  110. # list (any keyword that shares at least one term with the document).
  111. - INPUT:
  112. NAME: term_join
  113. QUERY: |
  114. SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
  115. doc.positions as doc_positions,
  116. kw.positions as kw_positions
  117. FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)
  118. # In Greenplum MapReduce, a REDUCE function is comprised of one or more functions.
  119. # A REDUCE has an initial 'state' variable defined for each grouping key. that is
  120. # A TRANSITION function adjusts the state for every value in a key grouping.
  121. # If present, an optional CONSOLIDATE function combines multiple
  122. # 'state' variables. This allows the TRANSITION function to be executed locally at
  123. # the segment-level and only redistribute the accumulated 'state' over
  124. # the network. If present, an optional FINALIZE function can be used to perform
  125. # final computation on a state and emit one or more rows of output from the state.
  126. #
  127. # This REDUCE function is called 'term_reducer' with a TRANSITION function
  128. # called 'term_transition' and a FINALIZE function called 'term_finalizer'
  129. - REDUCE:
  130. NAME:term_reducer
  131. TRANSITION:term_transition
  132. FINALIZE:term_finalizer
  133. - TRANSITION:
  134. NAME:term_transition
  135. LANGUAGE:python
  136. PARAMETERS:
  137. - state text
  138. - term text
  139. - nterms integer
  140. - doc_positions text
  141. - kw_positions text
  142. FUNCTION: |
  143. # 'state' has an initial value of '' and is a colon delimited set
  144. # of keyword positions. keyword positions are comma delimited sets of
  145. # integers. For example, '1,3,2:4:'
  146. # If there is an existing state, split it into the set of keyword positions
  147. # otherwise construct a set of 'nterms' keyword positions - all empty
  148. if state:
  149. kw_split = state.split(':')
  150. else:
  151. kw_split = []
  152. for i in range(0,nterms):
  153. kw_split.append('')
  154. # 'kw_positions' is a comma delimited field of integers indicating what
  155. # position a single term occurs within a given keyword.
  156. # Splitting based on ',' converts the string into a python list.
  157. # add doc_positions for the current term
  158. for kw_p in kw_positions.split(','):
  159. kw_split[int(kw_p)-1] = doc_positions
  160. # This section takes each element in the 'kw_split' array and strings
  161. # them together placing a ':' in between each element from the array.
  162. # For example: for the keyword "computer software computer hardware",
  163. # the 'kw_split' array matched up to the document data of
  164. # "in the business of computer software software engineers"
  165. # would look like: ['5', '6,7', '5', '']
  166. # and the outstate would look like: 5:6,7:5:
  167. outstate = kw_split[0]
  168. for s in kw_split[1:]:
  169. outstate = outstate + ':' + s
  170. return outstate
  171. - FINALIZE:
  172. NAME: term_finalizer
  173. LANGUAGE: python
  174. RETURNS:
  175. - count integer
  176. MODE:MULTI
  177. FUNCTION:|
  178. if not state:
  179. return 0
  180. kw_split = state.split(':')
  181. # This function does the following:
  182. # 1) Splits 'kw_split' on ':'
  183. # for example, 1,5,7:2,8 creates '1,5,7' and '2,8'
  184. # 2) For each group of positions in 'kw_split', splits the set on ','
  185. # to create ['1','5','7'] from Set 0: 1,5,7 and
  186. # eventually ['2', '8'] from Set 1: 2,8
  187. # 3) Checks for empty strings
  188. # 4) Adjusts the split sets by subtracting the position of the set
  189. # in the 'kw_split' array
  190. # ['1','5','7'] - 0 from each element = ['1','5','7']
  191. # ['2', '8'] - 1 from each element = ['1', '7']
  192. # 5) Resulting arrays after subtracting the offset in step 4 are
  193. # intersected and their overlapping values kept:
  194. # ['1','5','7'].intersect['1', '7'] = [1,7]
  195. # 6) Determines the length of the intersection, which is the number of
  196. # times that an entire keyword (with all its pieces) matches in the
  197. # document data.
  198. previous = None
  199. for i in range(0,len(kw_split)):
  200. isplit = kw_split[i].split(',')
  201. if any(map(lambda(x): x == '', isplit)):
  202. return 0
  203. adjusted = set(map(lambda(x): int(x)-i, isplit))
  204. if (previous):
  205. previous = adjusted.intersection(previous)
  206. else:
  207. previous = adjusted
  208. # return the final count
  209. if previous:
  210. return len(previous)
  211. # Define the 'term_match' task which is then executed as part
  212. # of the 'final_output' query. It takes the INPUT 'term_join' defined
  213. # earlier and uses the REDUCE function 'term_reducer' defined earlier
  214. - TASK:
  215. NAME:term_match
  216. SOURCE:term_join
  217. REDUCE:term_reducer
  218. - INPUT:
  219. NAME:final_output
  220. QUERY:|
  221. SELECT doc.*, kw.*, tm.count
  222. FROM documents doc, keywords kw, term_match tm
  223. WHERE doc.doc_id = tm.doc_id
  224. AND kw.keyword_id = tm.keyword_id
  225. AND tm.count > 0
  226. # Execute this MapReduce job and send output to STDOUT
  227. EXECUTE:
  228. - RUN:
  229. SOURCE:final_output
  230. TARGET:STDOUT

MapReduce示例的流程图

下图显示了示例中定义的MapReduce作业的作业流程:

Greenplum MapReduce规范 - 图1