MapReduce is a programming model developed by Google for processing and generating large data sets on an array of commodity servers. Greenplum MapReduce allows programmers who are familiar with the MapReduce model to write map and reduce functions and submit them to the Greenplum Database parallel engine for processing.

You configure a Greenplum MapReduce job via a YAML-formatted configuration file, then pass the file to the Greenplum MapReduce program, gpmapreduce, for execution by the Greenplum Database parallel engine. The Greenplum Database system distributes the input data, runs the program across a set of machines, handles machine failures, and manages the required inter-machine communication.
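
At a high level, a configuration file identifies the database connection, a DEFINE section that declares inputs, functions, and tasks, and an EXECUTE section that runs them. The following minimal sketch shows the general shape of a job file; the database and table names here are placeholders, and gpmapreduce.yaml documents the full schema:

   %YAML 1.1
   ---
   VERSION: 1.0.0.2
   DATABASE: mydb        # placeholder database name
   USER: gpadmin
   DEFINE:
     - INPUT:
         NAME: in_1
         TABLE: mytable  # placeholder source table
   EXECUTE:
     - RUN:
         SOURCE: in_1    # stream the input directly to standard output
         TARGET: STDOUT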

Refer to gpmapreduce for details about running the Greenplum MapReduce program.


About the Greenplum MapReduce Configuration File

This section explains the basics of the Greenplum MapReduce configuration file format to help you get started creating your own configuration files. Greenplum uses the YAML 1.1 document format and implements its own schema for defining the various steps of a MapReduce job.

All Greenplum MapReduce configuration files must first declare the version of the YAML specification they are using. After that, three dashes (---) denote the start of a document, and three dots (...) indicate the end of a document without starting a new one. (A document in this context is equivalent to a MapReduce job.) Comment lines are prefixed with a pound symbol (#). You can declare multiple Greenplum MapReduce documents/jobs in the same file:

   %YAML 1.1
   ---
   # Begin Document 1
   # ...
   ---
   # Begin Document 2
   # ...

Within a Greenplum MapReduce document, there are three basic types of data structures or nodes: scalars, sequences and mappings.

A scalar is a basic string of text indented by a space. If you have a scalar input that spans multiple lines, a preceding pipe ( | ) denotes a literal style, where all line breaks are significant. Alternatively, a preceding angle bracket ( > ) folds a single line break to a space for subsequent lines that have the same indentation level. If a string contains characters that have reserved meaning, the string must be quoted or the special character must be escaped with a backslash ( \ ).

   # Read each new line literally
   somekey: |
     this value contains two lines
     and each line is read literally
   # Treat each new line as a space
   anotherkey: >
     this value contains two lines
     but is treated as one continuous line
   # This quoted string contains a special character
   ThirdKey: "This is a string: not a mapping"

A sequence is a list with each entry in the list on its own line, denoted by a dash and a space (- ). Alternatively, you can specify an inline sequence as a comma-separated list within square brackets. A sequence provides an ordered set of data. When you load a list into the Greenplum MapReduce program, the order is preserved.

   # list sequence
   - this
   - is
   - a list
   - with
   - five scalar values

   # inline sequence
   [this, is, a list, with, five scalar values]

A mapping is used to pair up data values with identifiers called keys. Mappings use a colon and space (: ) for each key: value pair, and can also be specified inline as a comma-separated list within curly braces. The key is used as an index for retrieving data from a mapping.

   # a mapping of items
   title: War and Peace
   author: Leo Tolstoy
   date: 1865

   # same mapping written inline
   {title: War and Peace, author: Leo Tolstoy, date: 1865}

Keys are used to associate meta information with each node and specify the expected node type (scalar, sequence or mapping).

The Greenplum MapReduce program processes the nodes of a document in order and uses indentation (spaces) to determine the document hierarchy and the relationships of the nodes to one another. The use of white space is significant. White space should not be used simply for formatting purposes, and tabs should not be used at all.
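
For example, in this fragment (taken from the job definition later in this topic), the indentation of NAME and TABLE marks them as keys of the INPUT mapping, and the leading dash marks that mapping as one entry in the DEFINE sequence:

   DEFINE:
     - INPUT:
         NAME: doc
         TABLE: documents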

Refer to gpmapreduce.yaml for detailed information about the Greenplum MapReduce configuration file format and the keys and values supported.

Example Greenplum MapReduce Job

In this example, you create a MapReduce job that processes text documents and reports on the number of occurrences of certain keywords in each document. The documents and keywords are stored in separate Greenplum Database tables that you create as part of the exercise.

This example MapReduce job uses the untrusted plpythonu language; as such, you must run the job as a user with Greenplum Database administrative privileges.

  1. Log in to the Greenplum Database master host as the gpadmin administrative user and set up your environment. For example:

     $ ssh gpadmin@<gpmaster>
     gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
  2. Create a new database for the MapReduce example. For example:

     gpadmin@gpmaster$ createdb mapredex_db
  3. Start the psql subsystem, connecting to the new database:

     gpadmin@gpmaster$ psql -d mapredex_db
  4. Register the PL/Python language in the database. For example:

     mapredex_db=> CREATE EXTENSION plpythonu;
  5. Create the documents table and add some data to the table. For example:

     CREATE TABLE documents (doc_id int, url text, data text);
     INSERT INTO documents VALUES (1, 'http://url/1', 'this is one document in the corpus');
     INSERT INTO documents VALUES (2, 'http://url/2', 'i am the second document in the corpus');
     INSERT INTO documents VALUES (3, 'http://url/3', 'being third never really bothered me until now');
     INSERT INTO documents VALUES (4, 'http://url/4', 'the document before me is the third document');
  6. Create the keywords table and add some data to the table. For example:

     CREATE TABLE keywords (keyword_id int, keyword text);
     INSERT INTO keywords VALUES (1, 'the');
     INSERT INTO keywords VALUES (2, 'document');
     INSERT INTO keywords VALUES (3, 'me');
     INSERT INTO keywords VALUES (4, 'being');
     INSERT INTO keywords VALUES (5, 'now');
     INSERT INTO keywords VALUES (6, 'corpus');
     INSERT INTO keywords VALUES (7, 'is');
     INSERT INTO keywords VALUES (8, 'third');
  7. Construct the MapReduce YAML configuration file. For example, open a file named mymrjob.yaml in the editor of your choice and copy/paste the following text block:

     # This example MapReduce job processes documents and looks for keywords in them.
     # It takes two database tables as input:
     #   - documents (doc_id integer, url text, data text)
     #   - keywords (keyword_id integer, keyword text)
     # The documents data is searched for occurrences of keywords and returns results
     # of url, data and keyword (a keyword can be multiple words, such as
     # "high performance computing")
     %YAML 1.1
     ---
     VERSION: 1.0.0.2

     # Connect to Greenplum Database using this database and role
     DATABASE: mapredex_db
     USER: gpadmin

     # Begin definition section
     DEFINE:

       # Declare the inputs, which select all columns and rows from the
       # 'documents' and 'keywords' tables.
       - INPUT:
           NAME: doc
           TABLE: documents
       - INPUT:
           NAME: kw
           TABLE: keywords

       # Define the map functions to extract terms from documents and keywords.
       # This example simply splits on white space, but it would be possible
       # to make use of a python library like nltk (the natural language toolkit)
       # to perform more complex tokenization and word stemming.
       - MAP:
           NAME: doc_map
           LANGUAGE: python
           FUNCTION: |
             i = 0       # the index of a word within the document
             terms = {}  # a hash of terms and their indexes within the document

             # Lower-case and split the text string on space
             for term in data.lower().split():
                 i = i + 1  # increment i (the index)

                 # Check for the term in the terms list:
                 # if the stem word already exists, append the i value to the
                 # array entry corresponding to the term. This counts multiple
                 # occurrences of the word. If the stem word does not exist,
                 # add it to the dictionary with position i.
                 # For example:
                 #   data: "a computer is a machine that manipulates data"
                 #   "a" [1, 4]
                 #   "computer" [2]
                 #   "machine" [3]
                 #   ...
                 if term in terms:
                     terms[term] += ',' + str(i)
                 else:
                     terms[term] = str(i)

             # Return multiple rows for each document. Each row consists of
             # the doc_id, a term and the positions in the data where the term
             # appeared. For example:
             #   (doc_id => 100, term => "a", positions => "1,4")
             #   (doc_id => 100, term => "computer", positions => "2")
             #   ...
             for term in terms:
                 yield([doc_id, term, terms[term]])
           OPTIMIZE: STRICT IMMUTABLE
           PARAMETERS:
             - doc_id integer
             - data text
           RETURNS:
             - doc_id integer
             - term text
             - positions text

       # The map function for keywords is almost identical to the one for
       # documents, but it also counts the number of terms in the keyword.
       - MAP:
           NAME: kw_map
           LANGUAGE: python
           FUNCTION: |
             i = 0
             terms = {}
             for term in keyword.lower().split():
                 i = i + 1
                 if term in terms:
                     terms[term] += ',' + str(i)
                 else:
                     terms[term] = str(i)

             # output 4 values, including i (the total term count), for each
             # term in terms:
             for term in terms:
                 yield([keyword_id, i, term, terms[term]])
           OPTIMIZE: STRICT IMMUTABLE
           PARAMETERS:
             - keyword_id integer
             - keyword text
           RETURNS:
             - keyword_id integer
             - nterms integer
             - term text
             - positions text

       # A TASK is an object that defines an entire INPUT/MAP/REDUCE stage
       # within a Greenplum MapReduce pipeline. It is like an EXECUTE stage,
       # but it runs only when called as input to other processing stages.
       # Identify a task called 'doc_prep' which takes in the 'doc' INPUT
       # defined earlier and runs the 'doc_map' MAP function, which returns
       # doc_id, term, [term_position]
       - TASK:
           NAME: doc_prep
           SOURCE: doc
           MAP: doc_map

       # Identify a task called 'kw_prep' which takes in the 'kw' INPUT
       # defined earlier and runs the 'kw_map' MAP function, which returns
       # kw_id, term, [term_position]
       - TASK:
           NAME: kw_prep
           SOURCE: kw
           MAP: kw_map

       # One advantage of Greenplum MapReduce is that MapReduce tasks can be
       # used as input to SQL operations, and SQL can be used to process a
       # MapReduce task. This INPUT defines a SQL query that joins the output
       # of the 'doc_prep' TASK to that of the 'kw_prep' TASK. Matching terms
       # are output to the 'candidate' list (any keyword that shares at least
       # one term with the document).
       - INPUT:
           NAME: term_join
           QUERY: |
             SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
                    doc.positions AS doc_positions,
                    kw.positions AS kw_positions
             FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)

       # In Greenplum MapReduce, a REDUCE function is comprised of one or more
       # functions. A REDUCE has an initial 'state' variable defined for each
       # grouping key. A TRANSITION function adjusts the state for every value
       # in a key grouping. If present, an optional CONSOLIDATE function
       # combines multiple 'state' variables. This allows the TRANSITION
       # function to be run locally at the segment level and only redistribute
       # the accumulated 'state' over the network. If present, an optional
       # FINALIZE function can be used to perform final computation on a state
       # and emit one or more rows of output from the state.
       #
       # This REDUCE function is called 'term_reducer', with a TRANSITION
       # function called 'term_transition' and a FINALIZE function called
       # 'term_finalizer'.
       - REDUCE:
           NAME: term_reducer
           TRANSITION: term_transition
           FINALIZE: term_finalizer

       - TRANSITION:
           NAME: term_transition
           LANGUAGE: python
           PARAMETERS:
             - state text
             - term text
             - nterms integer
             - doc_positions text
             - kw_positions text
           FUNCTION: |
             # 'state' has an initial value of '' and is a colon-delimited set
             # of keyword positions. Keyword positions are comma-delimited sets
             # of integers. For example, '1,3,2:4:'
             # If there is an existing state, split it into the set of keyword
             # positions; otherwise construct a set of 'nterms' keyword
             # positions - all empty.
             if state:
                 kw_split = state.split(':')
             else:
                 kw_split = []
                 for i in range(0, nterms):
                     kw_split.append('')

             # 'kw_positions' is a comma-delimited field of integers indicating
             # what position a single term occupies within a given keyword.
             # Splitting on ',' converts the string into a python list.
             # Add doc_positions for the current term.
             for kw_p in kw_positions.split(','):
                 kw_split[int(kw_p) - 1] = doc_positions

             # This section takes each element in the 'kw_split' array and
             # strings them together, placing a ':' between each element from
             # the array.
             # For example: for the keyword "computer software computer hardware",
             # the 'kw_split' array matched up to the document data of
             # "in the business of computer software software engineers"
             # would look like: ['5', '6,7', '5', '']
             # and the outstate would look like: 5:6,7:5:
             outstate = kw_split[0]
             for s in kw_split[1:]:
                 outstate = outstate + ':' + s
             return outstate

       - FINALIZE:
           NAME: term_finalizer
           LANGUAGE: python
           RETURNS:
             - count integer
           MODE: MULTI
           FUNCTION: |
             if not state:
                 yield 0
                 return  # no accumulated state; nothing more to count

             kw_split = state.split(':')

             # This function does the following:
             # 1) Splits 'kw_split' on ':'
             #    for example, 1,5,7:2,8 creates '1,5,7' and '2,8'
             # 2) For each group of positions in 'kw_split', splits the set on
             #    ',' to create ['1','5','7'] from Set 0: 1,5,7 and
             #    eventually ['2','8'] from Set 1: 2,8
             # 3) Checks for empty strings
             # 4) Adjusts the split sets by subtracting the position of the set
             #    in the 'kw_split' array
             #    ['1','5','7'] - 0 from each element = ['1','5','7']
             #    ['2','8'] - 1 from each element = ['1','7']
             # 5) Resulting arrays after subtracting the offset in step 4 are
             #    intersected and their overlapping values kept:
             #    ['1','5','7'].intersect['1','7'] = [1,7]
             # 6) Determines the length of the intersection, which is the
             #    number of times that an entire keyword (with all its pieces)
             #    matches in the document data.
             previous = None
             for i in range(0, len(kw_split)):
                 isplit = kw_split[i].split(',')
                 if any(map(lambda x: x == '', isplit)):
                     yield 0
                     return  # a keyword term never matched; incomplete match
                 adjusted = set(map(lambda x: int(x) - i, isplit))
                 if previous:
                     previous = adjusted.intersection(previous)
                 else:
                     previous = adjusted

             # return the final count
             if previous:
                 yield len(previous)

       # Define the 'term_match' task, which is then run as part
       # of the 'final_output' query. It takes the INPUT 'term_join' defined
       # earlier and uses the REDUCE function 'term_reducer' defined earlier.
       - TASK:
           NAME: term_match
           SOURCE: term_join
           REDUCE: term_reducer

       - INPUT:
           NAME: final_output
           QUERY: |
             SELECT doc.*, kw.*, tm.count
             FROM documents doc, keywords kw, term_match tm
             WHERE doc.doc_id = tm.doc_id
               AND kw.keyword_id = tm.keyword_id
               AND tm.count > 0

     # Execute this MapReduce job and send output to STDOUT
     EXECUTE:
       - RUN:
           SOURCE: final_output
           TARGET: STDOUT
  8. Save the file and exit the editor.

  9. Run the MapReduce job. For example:

     gpadmin@gpmaster$ gpmapreduce -f mymrjob.yaml

    The job displays the number of occurrences of each keyword in each document to stdout.
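
To persist the results instead of printing them, the configuration schema also supports OUTPUT definitions in the DEFINE section. The following fragment is a sketch only; the output and table names are hypothetical, and gpmapreduce.yaml documents the exact OUTPUT syntax. It defines a named output backed by a table and points the RUN target at it:

   DEFINE:
     # ... the INPUT/MAP/TASK/REDUCE definitions shown above ...
     - OUTPUT:
         NAME: out_table
         TABLE: keyword_counts   # hypothetical results table
   EXECUTE:
     - RUN:
         SOURCE: final_output
         TARGET: out_table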

Flow Diagram for MapReduce Example

The following diagram shows the flow of the MapReduce job defined in the example: the doc and kw inputs feed the doc_prep and kw_prep map tasks, the term_join query joins their outputs, the term_match task reduces the joined rows, and the final_output query joins the resulting counts back to the source tables:

[Figure: MapReduce job flow]