Greenplum MapReduce Specification

This specification describes the document format and schema for defining Greenplum MapReduce jobs.

MapReduce is a programming model developed by Google for processing and generating large data sets on an array of commodity servers. Greenplum MapReduce allows programmers who are familiar with the MapReduce model to write map and reduce functions and submit them to the Greenplum Database parallel engine for processing.

To enable Greenplum to process MapReduce functions, define the functions in a document, then pass the document to the Greenplum MapReduce program, gpmapreduce, for execution by the Greenplum Database parallel engine. The Greenplum Database system distributes the input data, executes the program across a set of machines, handles machine failures, and manages the required inter-machine communication.

See the Greenplum Database Utility Guide for information about gpmapreduce.

Greenplum MapReduce Document Format

This section explains some basics of the Greenplum MapReduce document format to help you get started creating your own Greenplum MapReduce documents. Greenplum uses the YAML 1.1 document format and then implements its own schema for defining the various steps of a MapReduce job.

All Greenplum MapReduce files must first declare the version of the YAML specification they are using. After that, three dashes (---) denote the start of a document, and three dots (...) indicate the end of a document without starting a new one. Comment lines are prefixed with a pound symbol (#). It is possible to declare multiple Greenplum MapReduce documents in the same file:

    %YAML 1.1
    ---
    # Begin Document 1
    # ...
    ---
    # Begin Document 2
    # ...

Within a Greenplum MapReduce document, there are three basic types of data structures or nodes: scalars, sequences and mappings.

A scalar is a basic string of text indented by a space. If you have a scalar input that spans multiple lines, a preceding pipe ( | ) denotes a literal style, where all line breaks are significant. Alternatively, a preceding angle bracket ( > ) folds a single line break to a space for subsequent lines that have the same indentation level. If a string contains characters that have reserved meaning, the string must be quoted or the special character must be escaped with a backslash ( \ ).

    # Read each new line literally
    somekey: |
       this value contains two lines
       and each line is read literally
    # Treat each new line as a space
    anotherkey: >
       this value contains two lines
       but is treated as one continuous line
    # This quoted string contains a special character
    ThirdKey: "This is a string: not a mapping"

A sequence is a list with each entry in the list on its own line denoted by a dash and a space (-). Alternatively, you can specify an inline sequence as a comma-separated list within square brackets. A sequence provides a set of data and gives it an order. When you load a list into the Greenplum MapReduce program, the order is kept.

    # list sequence
    - this
    - is
    - a list
    - with
    - five scalar values
    # inline sequence
    [this, is, a list, with, five scalar values]

A mapping is used to pair up data values with identifiers called keys. Mappings use a colon and space (: ) for each key: value pair, or can also be specified inline as a comma-separated list within curly braces. The key is used as an index for retrieving data from a mapping.

    # a mapping of items
    title: War and Peace
    author: Leo Tolstoy
    date: 1865
    # same mapping written inline
    {title: War and Peace, author: Leo Tolstoy, date: 1865}

Keys are used to associate meta information with each node and specify the expected node type (scalar, sequence or mapping). See Greenplum MapReduce Document Schema for the keys expected by the Greenplum MapReduce program.

The Greenplum MapReduce program processes the nodes of a document in order and uses indentation (spaces) to determine the document hierarchy and the relationships of the nodes to one another. The use of white space is significant. White space should not be used simply for formatting purposes, and tabs should not be used at all.
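
For example, in the following fragment (an illustrative extension of the earlier book mapping, not part of the MapReduce schema), the deeper indentation of `first` and `last` makes them children of the `author` node, while `title` and `author` are siblings:

    # nesting is expressed through indentation with spaces (never tabs)
    book:
      title: War and Peace
      author:
        first: Leo
        last: Tolstoy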

Greenplum MapReduce Document Schema

Greenplum MapReduce uses the YAML document framework and implements its own YAML schema. The basic structure of a Greenplum MapReduce document is:

    %YAML 1.1
    ---
    VERSION: 1.0.0.2
    DATABASE: dbname
    USER: db_username
    HOST: master_hostname
    PORT: master_port
    DEFINE:
      - INPUT:
          NAME: input_name
          FILE:
            - hostname:/path/to/file
          GPFDIST:
            - hostname:port/file_pattern
          TABLE: table_name
          QUERY: SELECT_statement
          EXEC: command_string
          COLUMNS:
            - field_name data_type
          FORMAT: TEXT | CSV
          DELIMITER: delimiter_character
          ESCAPE: escape_character
          NULL: null_string
          QUOTE: csv_quote_character
          ERROR_LIMIT: integer
          ENCODING: database_encoding
      - OUTPUT:
          NAME: output_name
          FILE: file_path_on_client
          TABLE: table_name
          KEYS:
            - column_name
          MODE: REPLACE | APPEND
      - MAP:
          NAME: function_name
          FUNCTION: function_definition
          LANGUAGE: perl | python | c
          LIBRARY: /path/filename.so
          PARAMETERS:
            - nametype
          RETURNS:
            - nametype
          OPTIMIZE: STRICT IMMUTABLE
          MODE: SINGLE | MULTI
      - TRANSITION | CONSOLIDATE | FINALIZE:
          NAME: function_name
          FUNCTION: function_definition
          LANGUAGE: perl | python | c
          LIBRARY: /path/filename.so
          PARAMETERS:
            - nametype
          RETURNS:
            - nametype
          OPTIMIZE: STRICT IMMUTABLE
          MODE: SINGLE | MULTI
      - REDUCE:
          NAME: reduce_job_name
          TRANSITION: transition_function_name
          CONSOLIDATE: consolidate_function_name
          FINALIZE: finalize_function_name
          INITIALIZE: value
          KEYS:
            - key_name
      - TASK:
          NAME: task_name
          SOURCE: input_name
          MAP: map_function_name
          REDUCE: reduce_function_name
    EXECUTE:
      - RUN:
          SOURCE: input_or_task_name
          TARGET: output_name
          MAP: map_function_name
          REDUCE: reduce_function_name...

VERSION

Required. The version of the Greenplum MapReduce YAML specification. Current versions are 1.0.0.1 and 1.0.0.2.

DATABASE

Optional. Specifies which database in Greenplum to connect to. If not specified, defaults to the default database or $PGDATABASE if set.

USER

Optional. Specifies which database role to use to connect. If not specified, defaults to the current user or $PGUSER if set. You must be a Greenplum superuser to run functions written in untrusted Python and Perl. Regular database users can run functions written in trusted Perl. You also must be a database superuser to run MapReduce jobs that contain FILE, GPFDIST and EXEC input types.

HOST

Optional. Specifies Greenplum master host name. If not specified, defaults to localhost or $PGHOST if set.

PORT

Optional. Specifies Greenplum master port. If not specified, defaults to 5432 or $PGPORT if set.
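
For example, the header of a MapReduce document that sets these connection values explicitly might look like the following sketch (the database, role, host, and port values are placeholders):

    %YAML 1.1
    ---
    VERSION: 1.0.0.1
    DATABASE: mydb
    USER: gpadmin
    HOST: mdw
    PORT: 5432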

DEFINE

Required. A sequence of definitions for this MapReduce document. The DEFINE section must have at least one INPUT definition.

  • INPUT

    Required. Defines the input data. Every MapReduce document must have at least one input defined. Multiple input definitions are allowed in a document, but each input definition can specify only one of these access types: a file, a gpfdist file distribution program, a table in the database, an SQL command, or an operating system command. See the Greenplum Database Utility Guide for information about gpfdist. A sample INPUT definition appears after the key descriptions below.

    NAME
    : A name for this input. Names must be unique with regards to the names of other objects in this MapReduce job (such as map function, task, reduce function and output names). Also, names cannot conflict with existing objects in the database (such as tables, functions or views).

    FILE
    : A sequence of one or more input files in the format: `seghostname:/path/to/filename`. You must be a Greenplum Database superuser to run MapReduce jobs with `FILE` input. The file must reside on a Greenplum segment host.

    GPFDIST
    : A sequence of one or more running `gpfdist` file distribution programs in the format: `hostname[:port]/file_pattern`. You must be a Greenplum Database superuser to run MapReduce jobs with `GPFDIST` input, unless the applicable server configuration parameter is set to `on` (see the Server Configuration Parameters reference).

    TABLE
    : The name of an existing table in the database.

    QUERY
    : A SQL `SELECT` command to run within the database.

    EXEC
    : An operating system command to run on the Greenplum segment hosts. The command is run by all segment instances in the system by default. For example, if you have four segment instances per segment host, the command will be run four times on each host. You must be a Greenplum Database superuser to run MapReduce jobs with `EXEC` input, and the applicable server configuration parameter must be set to `on` (see the Server Configuration Parameters reference).

    COLUMNS
    : Optional. Columns are specified as: `column_name [data_type]`. If not specified, the default is `value text`. The DELIMITER character is what separates two data value fields (columns). A row is determined by a line feed character (`0x0a`).

    FORMAT
    : Optional. Specifies the format of the data - either delimited text (`TEXT`) or comma separated values (`CSV`) format. If the data format is not specified, defaults to `TEXT`.

    DELIMITER
    : Optional for FILE, GPFDIST and EXEC inputs. Specifies a single character that separates data values. The default is a tab character in `TEXT` mode, a comma in `CSV` mode. The delimiter character must only appear between any two data value fields. Do not place a delimiter at the beginning or end of a row.

    ESCAPE
    : Optional for FILE, GPFDIST and EXEC inputs. Specifies the single character that is used for C escape sequences (such as `\n`, `\t`, `\100`, and so on) and for escaping data characters that might otherwise be taken as row or column delimiters. Make sure to choose an escape character that is not used anywhere in your actual column data. The default escape character is a `\` (backslash) for text-formatted files and a `"` (double quote) for csv-formatted files, however it is possible to specify another character to represent an escape. It is also possible to deactivate escaping by specifying the value `'OFF'` as the escape value. This is very useful for data such as text-formatted web log data that has many embedded backslashes that are not intended to be escapes.

    NULL
    : Optional for FILE, GPFDIST and EXEC inputs. Specifies the string that represents a null value. The default is `\N` in `TEXT` format, and an empty value with no quotations in `CSV` format. You might prefer an empty string even in `TEXT` mode for cases where you do not want to distinguish nulls from empty strings. Any input data item that matches this string will be considered a null value.

    QUOTE
    : Optional for FILE, GPFDIST and EXEC inputs. Specifies the quotation character for `CSV` formatted files. The default is a double quote (`"`). In `CSV` formatted files, data value fields must be enclosed in double quotes if they contain any commas or embedded new lines. Fields that contain double quote characters must be surrounded by double quotes, and the embedded double quotes must each be represented by a pair of consecutive double quotes. It is important to always open and close quotes correctly in order for data rows to be parsed correctly.

    ERROR_LIMIT
    : If the input rows have format errors, they are discarded provided that the error limit count is not reached on any Greenplum segment instance during input processing. If the error limit is not reached, all good rows are processed and any error rows are discarded.

    ENCODING
    : Character set encoding to use for the data. Specify a string constant (such as `'SQL_ASCII'`), an integer encoding number, or `DEFAULT` to use the default client encoding. See Character Set Support for more information.
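
As an illustration only (the input name, host names, and file paths below are hypothetical), an INPUT definition that reads one line of text per row from files on two segment hosts might look like this:

    DEFINE:
      - INPUT:
          NAME: book_lines
          FILE:
            - seghost1:/data/books/part1.txt
            - seghost2:/data/books/part2.txt
          COLUMNS:
            - line text
          FORMAT: TEXT
          NULL: ''
          ERROR_LIMIT: 25
          ENCODING: 'UTF8'

Because this input uses FILE, the job would have to be run by a Greenplum Database superuser, as noted above.
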
  • OUTPUT

    Optional. Defines where to output the formatted data of this MapReduce job. If output is not defined, the default is STDOUT (standard output of the client). You can send output to a file on the client host or to an existing table in the database.

    NAME
    : A name for this output. The default output name is `STDOUT`. Names must be unique with regards to the names of other objects in this MapReduce job (such as map function, task, reduce function and input names). Also, names cannot conflict with existing objects in the database (such as tables, functions or views).

    FILE
    : Specifies a file location on the MapReduce client machine to output data in the format: `/path/to/filename`.

    TABLE
    : Specifies the name of a table in the database to output data. If this table does not exist prior to running the MapReduce job, it will be created using the distribution policy specified with KEYS.

    KEYS
    : Optional for TABLE output. Specifies the column(s) to use as the Greenplum Database distribution key. If the EXECUTE task contains a REDUCE definition, then the `REDUCE` keys will be used as the table distribution key by default. Otherwise, the first column of the table will be used as the distribution key.

    MODE
    : Optional for TABLE output. If not specified, the default is to create the table if it does not already exist, but error out if it does exist. Declaring `APPEND` adds output data to an existing table (provided the table schema matches the output format) without removing any existing data. Declaring `REPLACE` will drop the table if it exists and then recreate it. Both `APPEND` and `REPLACE` will create a new table if one does not exist.
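
Continuing the same hypothetical sketch, an OUTPUT entry in the DEFINE sequence that replaces a results table distributed on a word column could look like this (the table and column names are placeholders):

      - OUTPUT:
          NAME: word_counts_out
          TABLE: word_counts
          KEYS:
            - word
          MODE: REPLACE
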
  • MAP

    Required. Each MAP function takes data structured in (key, value) pairs, processes each pair, and generates zero or more output (key, value) pairs. The Greenplum MapReduce framework then collects all pairs with the same key from all output lists and groups them together. This output is then passed to the REDUCE task, which is comprised of TRANSITION | CONSOLIDATE | FINALIZE functions. There is one predefined MAP function named IDENTITY that returns (key, value) pairs unchanged. Although (key, value) are the default parameters, you can specify other prototypes as needed. A sample MAP definition appears after the key descriptions below.

    TRANSITION | CONSOLIDATE | FINALIZE

    TRANSITION, CONSOLIDATE and FINALIZE are all component pieces of REDUCE. A TRANSITION function is required. CONSOLIDATE and FINALIZE functions are optional. By default, all take state as the first of their input PARAMETERS, but other prototypes can be defined as well.

    A TRANSITION function iterates through each value of a given key and accumulates values in a state variable. When the transition function is called on the first value of a key, the state is set to the value specified by INITIALIZE of a REDUCE job (or the default state value for the data type). A transition takes two arguments as input: the current state of the key reduction, and the next value, which then produces a new state.

    If a CONSOLIDATE function is specified, TRANSITION processing is performed at the segment-level before redistributing the keys across the Greenplum interconnect for final aggregation (two-phase aggregation). Only the resulting state value for a given key is redistributed, resulting in lower interconnect traffic and greater parallelism. CONSOLIDATE is handled like a TRANSITION, except that instead of (state + value) => state, it is (state + state) => state.

    If a FINALIZE function is specified, it takes the final state produced by CONSOLIDATE (if present) or TRANSITION and does any final processing before emitting the final result. TRANSITION and CONSOLIDATE functions cannot return a set of values. If you need a REDUCE job to return a set, then a FINALIZE is necessary to transform the final state into a set of output values.

    NAME
    : Required. A name for the function. Names must be unique with regards to the names of other objects in this MapReduce job (such as function, task, input and output names). You can also specify the name of a function built-in to Greenplum Database. If using a built-in function, do not supply LANGUAGE or a FUNCTION body.

    FUNCTION
    : Optional. Specifies the full body of the function using the specified LANGUAGE. If `FUNCTION` is not specified, then a built-in database function corresponding to NAME is used.

    LANGUAGE
    : Required when FUNCTION is used. Specifies the implementation language used to interpret the function. This release has language support for `perl`, `python`, and `C`. If calling a built-in database function, `LANGUAGE` should not be specified.

    LIBRARY
    : Required when LANGUAGE is C (not allowed for other language functions). To use this attribute, VERSION must be 1.0.0.2. The specified library file must be installed prior to running the MapReduce job, and it must exist in the same file system location on all Greenplum hosts (master and segments).

    PARAMETERS
    : Optional. Function input parameters. The default type is `text`.
      - `MAP` default - `key text`, `value text`
      - `TRANSITION` default - `state text`, `value text`
      - `CONSOLIDATE` default - `state1 text`, `state2 text` (must have exactly two input parameters of the same data type)
      - `FINALIZE` default - `state text` (single parameter only)

    RETURNS
    : Optional. The default return type is `text`.
      - `MAP` default - `key text`, `value text`
      - `TRANSITION` default - `state text` (single return value only)
      - `CONSOLIDATE` default - `state text` (single return value only)
      - `FINALIZE` default - `value text`

    OPTIMIZE
    : Optional optimization parameters for the function:
      - `STRICT` - function is not affected by `NULL` values
      - `IMMUTABLE` - function will always return the same value for a given input

    MODE
    : Optional. Specifies the number of rows returned by the function.
      - `MULTI` - returns 0 or more rows per input record. The return value of the function must be an array of rows to return, or the function must be written as an iterator using `yield` in Python or `return_next` in Perl. `MULTI` is the default mode for `MAP` and `FINALIZE` functions.
      - `SINGLE` - returns exactly one row per input record. `SINGLE` is the only mode supported for `TRANSITION` and `CONSOLIDATE` functions. When used with `MAP` and `FINALIZE` functions, `SINGLE` mode can provide modest performance improvement.
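
The following sketch (hypothetical names; it continues the word-count fragments shown earlier) defines a MULTI-mode MAP function that emits one row per word and a TRANSITION function that accumulates an integer state:

      - MAP:
          NAME: wordsplit_map
          LANGUAGE: python
          PARAMETERS:
            - line text
          RETURNS:
            - word text
            - n int8
          MODE: MULTI
          FUNCTION: |
            # emit one (word, 1) row for every word in the input line
            for word in line.lower().split():
                yield [word, 1]
      - TRANSITION:
          NAME: sum_transition
          LANGUAGE: python
          PARAMETERS:
            - state int8
            - n int8
          RETURNS:
            - state int8
          FUNCTION: |
            # add the incoming value to the running state for this key
            return state + n

Because wordsplit_map declares `MODE: MULTI` and is written as a Python iterator, it can emit zero or more rows per input line, as described above.
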
  • REDUCE

    Required. A REDUCE definition names the TRANSITION | CONSOLIDATE | FINALIZE functions that comprise the reduction of (key, value) pairs to the final result set (a sample definition appears after the key descriptions below). There are also several predefined REDUCE jobs you can execute, which all operate over a column named value:

    - `IDENTITY` - returns (key, value) pairs unchanged
    - `SUM` - calculates the sum of numeric data
    - `AVG` - calculates the average of numeric data
    - `COUNT` - calculates the count of input data
    - `MIN` - calculates minimum value of numeric data
    - `MAX` - calculates maximum value of numeric data

    NAME
    : Required. The name of this `REDUCE` job. Names must be unique with regards to the names of other objects in this MapReduce job (function, task, input and output names). Also, names cannot conflict with existing objects in the database (such as tables, functions or views).

    TRANSITION
    : Required. The name of the `TRANSITION` function.

    CONSOLIDATE
    : Optional. The name of the `CONSOLIDATE` function.

    FINALIZE
    : Optional. The name of the `FINALIZE` function.

    INITIALIZE
    : Optional for `text` and `float` data types. Required for all other data types. The default value for text is `''`. The default value for float is `0.0`. Sets the initial `state` value of the `TRANSITION` function.

    KEYS
    : Optional. Defaults to `[key, *]`. When using a multi-column reduce it may be necessary to specify which columns are key columns and which columns are value columns. By default, any input columns that are not passed to the `TRANSITION` function are key columns, and a column named `key` is always a key column even if it is passed to the `TRANSITION` function. The special indicator `*` indicates all columns not passed to the `TRANSITION` function. If this indicator is not present in the list of keys then any unmatched columns are discarded.
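
A REDUCE definition that wires the hypothetical sum_transition function from the previous sketch into a reduce job might look like this; INITIALIZE is required here because the state type is int8 rather than text or float:

      - REDUCE:
          NAME: wordcount_reducer
          TRANSITION: sum_transition
          INITIALIZE: 0
          KEYS:
            - word

When the value column is already numeric, the predefined `SUM` reduce job could be used instead of defining these functions.
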
  • TASK

    Optional. A TASK defines a complete end-to-end INPUT/MAP/REDUCE stage within a Greenplum MapReduce job pipeline. It is similar to EXECUTE except it is not immediately executed. A task object can be called as INPUT to further processing stages. A sample TASK definition appears after the key descriptions below.

    NAME
    : Required. The name of this task. Names must be unique with regards to the names of other objects in this MapReduce job (such as map function, reduce function, input and output names). Also, names cannot conflict with existing objects in the database (such as tables, functions or views).

    SOURCE
    : The name of an INPUT or another `TASK`.

    MAP
    : Optional. The name of a MAP function. If not specified, defaults to `IDENTITY`.

    REDUCE
    : Optional. The name of a REDUCE function. If not specified, defaults to `IDENTITY`.
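
Continuing the same hypothetical fragments, a TASK can chain the input, map function, and reduce job together without executing them immediately:

      - TASK:
          NAME: count_words
          SOURCE: book_lines
          MAP: wordsplit_map
          REDUCE: wordcount_reducer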

EXECUTE

Required. EXECUTE defines the final INPUT/MAP/REDUCE stage within a Greenplum MapReduce job pipeline.

  • RUN

    SOURCE
    : Required. The name of an INPUT or TASK.

    TARGET
    : Optional. The name of an OUTPUT. The default output is STDOUT.

    MAP
    : Optional. The name of a MAP function. If not specified, defaults to IDENTITY.

    REDUCE
    : Optional. The name of a REDUCE function. If not specified, defaults to IDENTITY.
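
To finish the hypothetical word-count document sketched in the previous sections, an EXECUTE stage runs the task and writes to the defined output:

    EXECUTE:
      - RUN:
          SOURCE: count_words
          TARGET: word_counts_out

Alternatively, RUN can name MAP and REDUCE functions directly instead of referencing a TASK.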

Example Greenplum MapReduce Document

    # This example MapReduce job processes documents and looks for keywords in them.
    # It takes two database tables as input:
    #   - documents (doc_id integer, url text, data text)
    #   - keywords (keyword_id integer, keyword text)
    # The documents data is searched for occurrences of keywords and returns results of
    # url, data and keyword (a keyword can be multiple words, such as
    # "high performance computing")
    %YAML 1.1
    ---
    VERSION: 1.0.0.1
    # Connect to Greenplum Database using this database and role
    DATABASE: webdata
    USER: jsmith
    # Begin definition section
    DEFINE:
      # Declare the input, which selects all columns and rows from the
      # 'documents' and 'keywords' tables.
      - INPUT:
          NAME: doc
          TABLE: documents
      - INPUT:
          NAME: kw
          TABLE: keywords
      # Define the map functions to extract terms from documents and keyword
      # This example simply splits on white space, but it would be possible
      # to make use of a python library like nltk (the natural language toolkit)
      # to perform more complex tokenization and word stemming.
      - MAP:
          NAME: doc_map
          LANGUAGE: python
          FUNCTION: |
            i = 0  # the index of a word within the document
            terms = {}  # a hash of terms and their indexes within the document
            # Lower-case and split the text string on space
            for term in data.lower().split():
                i = i + 1  # increment i (the index)
                # Check for the term in the terms list:
                # if stem word already exists, append the i value to the array entry
                # corresponding to the term. This counts multiple occurrences of the word.
                # If stem word does not exist, add it to the dictionary with position i.
                # For example:
                # data: "a computer is a machine that manipulates data"
                # "a" [1, 4]
                # "computer" [2]
                # "machine" [3]
                # …
                if term in terms:
                    terms[term] += ','+str(i)
                else:
                    terms[term] = str(i)
            # Return multiple lines for each document. Each line consists of
            # the doc_id, a term and the positions in the data where the term appeared.
            # For example:
            # (doc_id => 100, term => "a", [1,4]
            # (doc_id => 100, term => "computer", [2]
            # …
            for term in terms:
                yield([doc_id, term, terms[term]])
          OPTIMIZE: STRICT IMMUTABLE
          PARAMETERS:
            - doc_id integer
            - data text
          RETURNS:
            - doc_id integer
            - term text
            - positions text
      # The map function for keywords is almost identical to the one for documents
      # but it also counts the number of terms in the keyword.
      - MAP:
          NAME: kw_map
          LANGUAGE: python
          FUNCTION: |
            i = 0
            terms = {}
            for term in keyword.lower().split():
                i = i + 1
                if term in terms:
                    terms[term] += ','+str(i)
                else:
                    terms[term] = str(i)
            # output 4 values including i (the total count for term in terms):
            yield([keyword_id, i, term, terms[term]])
          OPTIMIZE: STRICT IMMUTABLE
          PARAMETERS:
            - keyword_id integer
            - keyword text
          RETURNS:
            - keyword_id integer
            - nterms integer
            - term text
            - positions text
      # A TASK is an object that defines an entire INPUT/MAP/REDUCE stage
      # within a Greenplum MapReduce pipeline. It is like EXECUTE, but it is
      # executed only when called as input to other processing stages.
      # Identify a task called 'doc_prep' which takes in the 'doc' INPUT defined earlier
      # and runs the 'doc_map' MAP function which returns doc_id, term, [term_position]
      - TASK:
          NAME: doc_prep
          SOURCE: doc
          MAP: doc_map
      # Identify a task called 'kw_prep' which takes in the 'kw' INPUT defined earlier
      # and runs the kw_map MAP function which returns kw_id, term, [term_position]
      - TASK:
          NAME: kw_prep
          SOURCE: kw
          MAP: kw_map
      # One advantage of Greenplum MapReduce is that MapReduce tasks can be
      # used as input to SQL operations and SQL can be used to process a MapReduce task.
      # This INPUT defines a SQL query that joins the output of the 'doc_prep'
      # TASK to that of the 'kw_prep' TASK. Matching terms are output to the 'candidate'
      # list (any keyword that shares at least one term with the document).
      - INPUT:
          NAME: term_join
          QUERY: |
            SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
                   doc.positions as doc_positions,
                   kw.positions as kw_positions
            FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)
      # In Greenplum MapReduce, a REDUCE function is comprised of one or more functions.
      # A REDUCE has an initial 'state' variable defined for each grouping key.
      # A TRANSITION function adjusts the state for every value in a key grouping.
      # If present, an optional CONSOLIDATE function combines multiple
      # 'state' variables. This allows the TRANSITION function to be executed locally at
      # the segment-level and only redistribute the accumulated 'state' over
      # the network. If present, an optional FINALIZE function can be used to perform
      # final computation on a state and emit one or more rows of output from the state.
      #
      # This REDUCE function is called 'term_reducer' with a TRANSITION function
      # called 'term_transition' and a FINALIZE function called 'term_finalizer'
      - REDUCE:
          NAME: term_reducer
          TRANSITION: term_transition
          FINALIZE: term_finalizer
      - TRANSITION:
          NAME: term_transition
          LANGUAGE: python
          PARAMETERS:
            - state text
            - term text
            - nterms integer
            - doc_positions text
            - kw_positions text
          FUNCTION: |
            # 'state' has an initial value of '' and is a colon delimited set
            # of keyword positions. keyword positions are comma delimited sets of
            # integers. For example, '1,3,2:4:'
            # If there is an existing state, split it into the set of keyword positions
            # otherwise construct a set of 'nterms' keyword positions - all empty
            if state:
                kw_split = state.split(':')
            else:
                kw_split = []
                for i in range(0,nterms):
                    kw_split.append('')
            # 'kw_positions' is a comma delimited field of integers indicating what
            # position a single term occurs within a given keyword.
            # Splitting based on ',' converts the string into a python list.
            # add doc_positions for the current term
            for kw_p in kw_positions.split(','):
                kw_split[int(kw_p)-1] = doc_positions
            # This section takes each element in the 'kw_split' array and strings
            # them together placing a ':' in between each element from the array.
            # For example: for the keyword "computer software computer hardware",
            # the 'kw_split' array matched up to the document data of
            # "in the business of computer software software engineers"
            # would look like: ['5', '6,7', '5', '']
            # and the outstate would look like: 5:6,7:5:
            outstate = kw_split[0]
            for s in kw_split[1:]:
                outstate = outstate + ':' + s
            return outstate
      - FINALIZE:
          NAME: term_finalizer
          LANGUAGE: python
          RETURNS:
            - count integer
          MODE: MULTI
          FUNCTION: |
            if not state:
                return 0
            kw_split = state.split(':')
            # This function does the following:
            # 1) Splits 'kw_split' on ':'
            #    for example, 1,5,7:2,8 creates '1,5,7' and '2,8'
            # 2) For each group of positions in 'kw_split', splits the set on ','
            #    to create ['1','5','7'] from Set 0: 1,5,7 and
            #    eventually ['2', '8'] from Set 1: 2,8
            # 3) Checks for empty strings
            # 4) Adjusts the split sets by subtracting the position of the set
            #    in the 'kw_split' array
            #    ['1','5','7'] - 0 from each element = ['1','5','7']
            #    ['2', '8'] - 1 from each element = ['1', '7']
            # 5) Resulting arrays after subtracting the offset in step 4 are
            #    intersected and their overlapping values kept:
            #    ['1','5','7'].intersect['1', '7'] = [1,7]
            # 6) Determines the length of the intersection, which is the number of
            #    times that an entire keyword (with all its pieces) matches in the
            #    document data.
            previous = None
            for i in range(0,len(kw_split)):
                isplit = kw_split[i].split(',')
                if any(map(lambda(x): x == '', isplit)):
                    return 0
                adjusted = set(map(lambda(x): int(x)-i, isplit))
                if (previous):
                    previous = adjusted.intersection(previous)
                else:
                    previous = adjusted
            # return the final count
            if previous:
                return len(previous)
      # Define the 'term_match' task which is then executed as part
      # of the 'final_output' query. It takes the INPUT 'term_join' defined
      # earlier and uses the REDUCE function 'term_reducer' defined earlier
      - TASK:
          NAME: term_match
          SOURCE: term_join
          REDUCE: term_reducer
      - INPUT:
          NAME: final_output
          QUERY: |
            SELECT doc.*, kw.*, tm.count
            FROM documents doc, keywords kw, term_match tm
            WHERE doc.doc_id = tm.doc_id
              AND kw.keyword_id = tm.keyword_id
              AND tm.count > 0
    # Execute this MapReduce job and send output to STDOUT
    EXECUTE:
      - RUN:
          SOURCE: final_output
          TARGET: STDOUT

Flow Diagram for MapReduce Example

The following diagram shows the job flow of the MapReduce job defined in the example:

[Figure: Job flow for the MapReduce job]