第六章 - MapReduce

MapReduce是一种数据处理的方法,有相比较为传统的方案它有两个显著的优势。第一个优势是它卓越的性能,也是最初开发MapReduce的主要目的。理论上MapReduce可以并行工作,可以利用多核/多CPU/多机器同时处理非常大量的数据。我们也说过,这点优势MongoDB无法利用上。第二个优势就是用户可以为数据处理编写真正的程序。与SQL相比,用MapReduce可以实现无限多种功能,在逼不得已寻求更专业的方案之前,MapReduce提供了更多的可能。

MapReduce这种模式越来越普及,几乎任何语言上都有它的实现:C#,Ruby,Java,Python等等。我要说的是一开始它看起来和其他方案很不一样而且很复杂,不过不要泄气,花些时间来实践。无论您用不用MongoDB,它都很值得您去了解。

理论与实践

MapReduce的流程分两步。首先做映射(map)然后做缩减(reduce)。在映射时转换输入的文档并输出(emit)键-值组合(键或值可以很复杂)。在缩减时将一个键以及为该键输出的值的数组生成最终的结果。我们来看看这当中的每一步以及相应的输出。

下面的例子假设为某个数据源(比如说一个网页)生成每天的点击数。这相当于MapReduce的hello world。为了实现这个应用,我们需要有一个hits集合,其中有两个域:resourcedate。我们设计的输出分为:resourceyearmonthday以及count

又假设hits的数据如下:

  1. resource date
  2. index Jan 20 2010 4:30
  3. index Jan 20 2010 5:30
  4. about Jan 20 2010 6:00
  5. index Jan 20 2010 7:00
  6. about Jan 21 2010 8:00
  7. about Jan 21 2010 8:30
  8. index Jan 21 2010 8:30
  9. about Jan 21 2010 9:00
  10. index Jan 21 2010 9:30
  11. index Jan 22 2010 5:00

我们希望最终有下面的输出:

  1. resource year month day count
  2. index 2010 1 20 3
  3. about 2010 1 20 1
  4. about 2010 1 21 3
  5. index 2010 1 21 2
  6. index 2010 1 22 1

当前分析的这个方法有一个好处,那就是通过存储输出的数据,报告很快就可以生成,且数据的增长是可控的。(对于上面的数据源,每天只需要增加最多一个文档)

我们先专注于概念的理解,到了本章快结束时,会有数据和代码的示例供您亲自实验。

首先来看看映射函数。映射的目的在于输出(emit)值以便后续缩减。一个映射有可能不输出或者输出多次值。在我们的例子中,映射总是会输出一次(很正常的做法)。可以把这里的映射想象成遍历hits中的每一个文档。对于每个文档我们要输出一个包含了resource,year,month和day的键,还有一个简单的值,1:

  1. function() {
  2. var key = {
  3. resource: this.resource,
  4. year: this.date.getFullYear(),
  5. month: this.date.getMonth(),
  6. day: this.date.getDate()
  7. };
  8. emit(key, {count: 1});
  9. }

this指的是当前正在分析的文档。希望看到下面映射输出可以让这个过程清楚一些。基于前面的数据,完整的映射输出应该是:

  1. {resource: 'index', year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
  2. {resource: 'about', year: 2010, month: 0, day: 20} => [{count: 1}]
  3. {resource: 'about', year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}, {count:1}]
  4. {resource: 'index', year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}]
  5. {resource: 'index', year: 2010, month: 0, day: 22} => [{count: 1}]

了解这一中间步骤是了解MapReduce的关键。输出的值根据键的不同被组织成相应的数组。.NET和Java的程序员可以把这视为类型IDictionary<object, IList<object>>(.NET)或者是HashMap<Object, ArrayList>(Java)。

接下来我们人为的修改一下映射函数的行为:

  1. function() {
  2. var key = {resource: this.resource, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
  3. if (this.resource == 'index' && this.date.getHours() == 4) {
  4. emit(key, {count: 5});
  5. } else {
  6. emit(key, {count: 1});
  7. }
  8. }

第一个中间输出因此变成:

  1. {resource: 'index', year: 2010, month: 0, day: 20} => [{count: 5}, {count: 1}, {count:1}]

值得注意的是每一次输出是如何按照键的不同来分组生成新的值的。

缩减函数接受中间结果后产生了最后的结果。例子中的缩减函数见下:

  1. function(key, values) {
  2. var sum = 0;
  3. values.forEach(function(value) {
  4. sum += value['count'];
  5. });
  6. return {count: sum};
  7. };

得到的结果是:

  1. {resource: 'index', year: 2010, month: 0, day: 20} => {count: 3}
  2. {resource: 'about', year: 2010, month: 0, day: 20} => {count: 1}
  3. {resource: 'about', year: 2010, month: 0, day: 21} => {count: 3}
  4. {resource: 'index', year: 2010, month: 0, day: 21} => {count: 2}
  5. {resource: 'index', year: 2010, month: 0, day: 22} => {count: 1}

MongoDB中的输出是:

  1. _id: {resource: 'home', year: 2010, month: 0, day: 20}, value: {count: 3}

希望您注意到这个就是我们想要的结果了。

如果您有注意到,可能会问为什么不直接用sum = values.length如果在计算值都是1的数组,这个方法确实是很有效的。可是实际上缩减函数不见得总是会得到完整的中间数据,比如说,不是:

  1. {resource: 'home', year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]

而是像下面这样调用Reduce:

  1. {resource: 'home', year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}]
  2. {resource: 'home', year: 2010, month: 0, day: 20} => [{count: 2}, {count: 1}]

结果应该还是3,不过计算的路径就不一样了。因此,缩减函数必须具有幂等性。也就是说,多次调用该函数和只调用一次的效果应该是一样的。

一个比较常见的做法是将多个缩减函数链接起来实现更加复杂的分析功能,不过我们在这里就不再深入了。

Pure Practical

MongoDB中是对集合使用mapReduce的。mapReduce需要一个映射函数,一个缩减函数以及一个输出指令。在shell中我们可以创建并传递一个JavaScript函数的调用。大多数的库都支持这种将函数当作字符串值的方式(虽然有点难看)。首先我们还是来创建一些数据:

  1. db.hits.insert({resource: 'index', date: new Date(2010, 0, 20, 4, 30)});
  2. db.hits.insert({resource: 'index', date: new Date(2010, 0, 20, 5, 30)});
  3. db.hits.insert({resource: 'about', date: new Date(2010, 0, 20, 6, 0)});
  4. db.hits.insert({resource: 'index', date: new Date(2010, 0, 20, 7, 0)});
  5. db.hits.insert({resource: 'about', date: new Date(2010, 0, 21, 8, 0)});
  6. db.hits.insert({resource: 'about', date: new Date(2010, 0, 21, 8, 30)});
  7. db.hits.insert({resource: 'index', date: new Date(2010, 0, 21, 8, 30)});
  8. db.hits.insert({resource: 'about', date: new Date(2010, 0, 21, 9, 0)});
  9. db.hits.insert({resource: 'index', date: new Date(2010, 0, 21, 9, 30)});
  10. db.hits.insert({resource: 'index', date: new Date(2010, 0, 22, 5, 0)});

然后创建我们自己的映射和缩减函数(MongoDB的shell允许多行声明,回车之后您会看到说明shell在等待后续的输入):

  1. var map = function() {
  2. var key = {resource: this.resource, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
  3. emit(key, {count: 1});
  4. };
  5. var reduce = function(key, values) {
  6. var sum = 0;
  7. values.forEach(function(value) {
  8. sum += value['count'];
  9. });
  10. return {count: sum};
  11. };

有了上面两个函数,就可以对hits集合使用mapReduce命令了:

  1. db.hits.mapReduce(map, reduce, {out: {inline:1}})

执行上面的命令后,应该就可以看到期望的输出了。把out设成inline是为了把mapReduce的输出流直接返回到shell中显示。这个功能目前只能用于最多16MB的结果。另外的一个方法就是使用{out: 'hit_stats'}以把结果存储在hit_stats集合里:

  1. db.hits.mapReduce(map, reduce, {out: 'hit_stats'});
  2. db.hit_stats.find();

上面的命令执行之后,hit_stats中既有的数据就会丢失。如果用的是{out: {merge: 'hit_stats'}}已有键的值就会被新的值覆盖且新的键-值组合就会被作为新的文档插入到集合中。最后,我们可以用reduce函数中的out选项处理更复杂的情况(比如说插新)。

第三个参数是一个可选项,比如可以用来过滤、排序或是限制需要分析的文档。也可以将一个finalize方法应用到reduce之后的结果上。

本章小结

这是介绍了MongoDB真正与众不同指出的第一个章节。如果您觉得很不自在,要知道您总是可以使用MongoDB的其他聚合能力事情变得简单一些。不过归根结底,MapReduce是MongoDB最吸引人的功能之一。学会编写映射函数和缩减函数的关键在于把映射输出的数据以及缩减所需要的数据可视化,并真正了解这些数据。