MapReduce开发基础知识

  |   0 评论   |   814 浏览

基本流程

写Mapper

public static class TokenizerMapper
  extends Mapper<Object, Text, Text, IntWritable>{
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
  }
}

  Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。

写Reducer

public static class IntSumReducer
  extends Reducer<Text,IntWritable,Text,IntWritable> {
  private IntWritable result = new IntWritable();
  public void reduce(Text key, Iterable<IntWritable> values,Context context)
     throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

  Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

写驱动类

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
    System.err.println("Usage: wordcount <in> <out>");
    System.exit(2);
  }
  Job job = new Job(conf, "cxy7.com-word_count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}

打包运行

设置

指定输入格式

通过参数

conf.set("mapreduce.inputformat.class", "com.hadoop.mapreduce.LzoTextInputFormat");

通过org.apache.hadoop.mapreduce.Job类

job.setInputFormatClass(LzoTextInputFormat.class);

添加输入文件

添加一批

FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));

添加单一路径

FileInputFormat.addInputPath(job, new Path(""));

输出压缩

boolean isCompress = conf.getBoolean(FileOutputFormat.COMPRESS, false);
 if (isCompress) {
 TextOutputFormat.setCompressOutput(job, true);
 TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
 }

多输入(单任务处理多目录、多格式文件)

List<Path> paths = HDFSUtil.getMultDataSourcesByPattern(conf, pattern);
 for (Path path : paths) {
 logger.info("point:{}, path:{}", point, path.toString());
 MultipleInputs.addInputPath(job, path, AvroKeyInputFormat.class, SearchMapper.class);
 }

多输出(单任务输入多类数据、多格式)

普通格式

定义

private MultipleOutputs<AvroKey<GenericRecord>, NullWritable> mo = null;

初始化

@Override
protected void setup(Context context) throws IOException, InterruptedException {
 this.mo = new MultipleOutputs<AvroKey<GenericRecord>, NullWritable>(context);
}

使用

mo.write(new AvroKey<GenericRecord>(datum), NullWritable.get(), dt + "/" + taskId);

关闭(切记)

@Override
 protected void cleanup(
 Reducer<MissAnalysisPair, Text, AvroKey<GenericRecord>, NullWritable>.Context context)
 throws IOException, InterruptedException {
 super.cleanup(context);
 mo.close();
 }

Avro格式

声明

job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setOutputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "MissingAnalysis", AvroKeyOutputFormat.class, schema);

定义

private AvroMultipleOutputs amos = null;

初始化

@Override
 protected void setup(Context context) throws IOException, InterruptedException {
 this.amos = new AvroMultipleOutputs(context);
}

使用

amos.write("MissingAnalysis", new AvroKey<GenericRecord>(datum), NullWritable.get(), dt + "/" + taskId);

关闭(切记)

@Override
 protected void cleanup(
 Reducer<MissAnalysisPair, Text, AvroKey<GenericRecord>, NullWritable>.Context context)
 throws IOException, InterruptedException {
 super.cleanup(context);
 amos.close();
 }

Mapper中获取当前输入文件的全路径及父目录

private Path getPath(Context context) {
 InputSplit split = context.getInputSplit();
 Class<? extends InputSplit> splitClass = split.getClass();
FileSplit fileSplit = null;
 if (splitClass.equals(FileSplit.class)) {
 fileSplit = (FileSplit) split;
 } else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
 try {
 Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
 getInputSplitMethod.setAccessible(true);
 fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 return fileSplit.getPath();
 }
/**
 * 获取当前map读取文件的全路径
 * @author    cxy7.com
 * @param context
 * @return
 */
 protected String getFilePath(Context context) {
 return getPath(context).toString();
 }
/**
 * 获取当前map读取文件所属目录全路径(末尾不带/)
 * @author    cxy7.com
 * @param context
 * @return
 */
 protected String getDirectoryPath(Context context) {
 String path = getPath(context).getParent().toString();
 if (path.endsWith("/"))
 path = path.substring(0, path.length() - 1);
 return path;
 }

为lzo文件建索引

代码指定

DistributedLzoIndexer indexer = new DistributedLzoIndexer();
 Job indexerJob = indexer.getJob(conf);
 if (indexerJob != null)
 {
 if(!indexerJob.waitForCompletion(true)){
 logger.error("create lzo index failed.");
 SendMailUtils.sendMail(conf.get(CommonConstant.COMMON_TASK_ALARM_MAIL_ADDRESSES),conf.get(CommonConstant.MAPREDUCE_JOB_NAME) + "Lzo索引创建失败");
 return false;
 }
 }

命令行为HDFS文件创建索引

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/lib/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer $DATA_DIR

命令行为本地文件创建索引

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/lib/hadoop-lzo.jar com.hadoop.compression.lzo.LzoIndexer $DATA_DIR
读后有收获可以支付宝请作者喝咖啡