Example: The WordCount Word-Frequency Program

Below is the word-frequency WordCount example program provided by Hadoop. Before running the program, make sure HDFS has been started.
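
If HDFS is not running yet, a standard Hadoop distribution ships start scripts in its sbin directory; the exact paths and script names depend on your version and installation, so treat the following as a sketch:

    $ sbin/start-dfs.sh     # starts the NameNode and DataNodes
    $ sbin/start-yarn.sh    # starts YARN, which runs the MapReduce job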

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.StringUtils;

    public class WordCount2 {

      public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable> {

        static enum CountersEnum { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();

        private Configuration conf;
        private BufferedReader fis;

        @Override
        public void setup(Context context) throws IOException,
            InterruptedException {
          conf = context.getConfiguration();
          caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
          // Read the cached pattern files only when the -skip option set this
          // flag. The default must be false: on runs without -skip,
          // getCacheFiles() returns null and the loop below would throw a
          // NullPointerException.
          if (conf.getBoolean("wordcount.skip.patterns", false)) {
            URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
            for (URI patternsURI : patternsURIs) {
              Path patternsPath = new Path(patternsURI.getPath());
              String patternsFileName = patternsPath.getName().toString();
              parseSkipFile(patternsFileName);
            }
          }
        }

        private void parseSkipFile(String fileName) {
          try {
            fis = new BufferedReader(new FileReader(fileName));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
              patternsToSkip.add(pattern);
            }
          } catch (IOException ioe) {
            System.err.println("Caught exception while parsing the cached file '"
                + StringUtils.stringifyException(ioe));
          }
        }

        @Override
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          String line = (caseSensitive) ?
              value.toString() : value.toString().toLowerCase();
          // Each line of the skip file is treated as a regular expression and
          // removed from the input line before tokenizing.
          for (String pattern : patternsToSkip) {
            line = line.replaceAll(pattern, "");
          }
          StringTokenizer itr = new StringTokenizer(line);
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
            Counter counter = context.getCounter(CountersEnum.class.getName(),
                CountersEnum.INPUT_WORDS.toString());
            counter.increment(1);
          }
        }
      }

      public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        // Valid invocations take either two arguments (<in> <out>) or four
        // (<in> <out> -skip <file>).
        if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
          System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
          System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        List<String> otherArgs = new ArrayList<String>();
        for (int i = 0; i < remainingArgs.length; ++i) {
          if ("-skip".equals(remainingArgs[i])) {
            job.addCacheFile(new Path(remainingArgs[++i]).toUri());
            job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
          } else {
            otherArgs.add(remainingArgs[i]);
          }
        }
        FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
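
The run commands below assume the source has been compiled and packaged into wc.jar. With a standard Hadoop installation this can be done roughly as follows; HADOOP_CLASSPATH must point at the JDK's tools.jar for the compile step, and paths should be adjusted for your environment:

    $ export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
    $ bin/hadoop com.sun.tools.javac.Main WordCount2.java
    $ jar cf wc.jar WordCount2*.class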

The sample input files are as follows:

    $ bin/hadoop fs -ls /user/joe/wordcount/input/
    /user/joe/wordcount/input/file01
    /user/joe/wordcount/input/file02
    $ bin/hadoop fs -cat /user/joe/wordcount/input/file01
    Hello World, Bye World!
    $ bin/hadoop fs -cat /user/joe/wordcount/input/file02
    Hello Hadoop, Goodbye to hadoop.
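
If the input files are not yet in HDFS, they can be uploaded first; file01 and file02 here are assumed to be local files with the contents shown above:

    $ bin/hadoop fs -mkdir -p /user/joe/wordcount/input
    $ bin/hadoop fs -put file01 file02 /user/joe/wordcount/input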

Run the program:

    $ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop, 1
    Hello 2
    World! 1
    World, 1
    hadoop. 1
    to 1
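
Before re-running the job with the same output path, the previous output directory must be removed, since FileOutputFormat refuses to write into a directory that already exists:

    $ bin/hadoop fs -rm -r /user/joe/wordcount/output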

Notice that punctuation is still attached to the words. A word-filtering policy can be set through the DistributedCache; each line of the pattern file below is a regular expression to be stripped from the input:

    $ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
    \.
    \,
    \!
    to
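
The pattern file itself must live in HDFS before it can be placed in the distributed cache; assuming a local patterns.txt with the lines above, it can be uploaded with:

    $ bin/hadoop fs -put patterns.txt /user/joe/wordcount/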

Run the job again, this time with more options: -Dwordcount.case.sensitive=true sets a configuration property via GenericOptionsParser, and -skip adds the pattern file to the distributed cache:

    $ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop 1
    Hello 2
    World 2
    hadoop 1

Run once more, this time with case sensitivity turned off (again removing the previous output directory first):

    $ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

The output is as follows:

    $ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
    bye 1
    goodbye 1
    hadoop 2
    hello 2
    world 2