MapReduce中的二次排序

  |   0 评论   |   760 浏览

自定义MapOutputKey

public class MissAnalysisPair implements WritableComparable<MissAnalysisPair> {
 private Text clientid;
 private LongWritable sid;
 private Text checkin;
 private Text checkout;
 private LongWritable pvid;
@Override
 public void write(DataOutput out) throws IOException {
 clientid.write(out);
 sid.write(out);
 checkin.write(out);
 checkout.write(out);
 pvid.write(out);
 }
@Override
 public void readFields(DataInput in) throws IOException {
 clientid.readFields(in);
 sid.readFields(in);
 checkin.readFields(in);
 checkout.readFields(in);
 pvid.readFields(in);
 }
@Override
 public int compareTo(MissAnalysisPair o) {
int com1 = clientid.compareTo(o.clientid);
 int com2 = sid.compareTo(o.sid);
 int com3 = checkin.compareTo(o.checkin);
 int com4 = checkout.compareTo(o.checkout);
 int com5 = pvid.compareTo(o.pvid);
return (com1 == 0 ? (com2 == 0 ? (com3 == 0 ? (com4 == 0 ? com5 : com4) : com3) : com2) : com1);
 }
@Override
 public int hashCode() {
 final int prime = 31;
 int result = 1;
 result = prime * result + ((checkin == null) ? 0 : checkin.hashCode());
 result = prime * result + ((checkout == null) ? 0 : checkout.hashCode());
 result = prime * result + ((clientid == null) ? 0 : clientid.hashCode());
 result = prime * result + ((pvid == null) ? 0 : pvid.hashCode());
 result = prime * result + ((sid == null) ? 0 : sid.hashCode());
 return result;
 }
@Override
 public boolean equals(Object obj) {
 if (obj instanceof MissAnalysisPair) {
 MissAnalysisPair that = (MissAnalysisPair) obj;
 return this.compareTo(that) == 0 ? true : false;
 }
 return false;
 }
}

自定义SortComparator

注意此处的构造方法


public class MASortComparator extends WritableComparator {
 public MASortComparator(){
 super(MissAnalysisPair.class,true);
 }
@SuppressWarnings("rawtypes")
 @Override
 public int compare(WritableComparable a, WritableComparable b) {
 MissAnalysisPair t = (MissAnalysisPair)a;
 MissAnalysisPair o = (MissAnalysisPair)b;
 return t.compareTo(o);
 }
}

自定义Partitioner

public class MAPartitioner extends Partitioner<MissAnalysisPair, Text>{
@Override
 public int getPartition(MissAnalysisPair key, Text value, int numPartitions) {
 return Math.abs(key.getClientid().hashCode() * 127) % numPartitions;
 }
}

自定义GroupingComparator

public class MAGroupComparator extends WritableComparator {
 public MAGroupComparator(){
 super(MissAnalysisPair.class, true);
 }
 
 @SuppressWarnings("rawtypes")
 @Override
 public int compare(WritableComparable a, WritableComparable b) {
 MissAnalysisPair t = (MissAnalysisPair)a;
 MissAnalysisPair o = (MissAnalysisPair)b;
 
 int com1 = t.getClientid().compareTo(o.getClientid());
 int com2 = t.getSid().compareTo(o.getSid());
 int com3 = t.getCheckin().compareTo(o.getCheckin());
 int com4 = t.getCheckout().compareTo(o.getCheckout());
 
 
 return (com1==0?(com2==0?(com3==0?com4:com3):com2):com1);
 }
}

注意此处的构造方法

自定义FileOutputFormat

public class HivePreOutputFormat extends FileOutputFormat<UserInfoHiveBean, NullWritable> {
private final Log LOG = LogFactory.getLog(HivePreOutputFormat.class);
 int a = 0, b = 0, c = 0, d = 0, e = 0;
@Override
 public RecordWriter<UserInfoHiveBean, NullWritable> getRecordWriter(TaskAttemptContext context)
 throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
 Path uib = null;
 Path seb = null;
 Path erb = null;
 Path evb = null;
 Path pab = null;
 try {
 uib = new Path("hdfs://ns1/com/cxy7/userinfobean");
 seb = new Path("hdfs://ns1/com/cxy7/sessionbean");
 erb = new Path("hdfs://ns1/com/cxy7/errorbean");
 evb = new Path("hdfs://ns1/com/cxy7/eventbean");
 pab = new Path("hdfs://ns1/com/cxy7/paramterbean");
 } catch (Exception e) {
 e.printStackTrace();
 System.out.println("path后面的参数错了!!!");
 }
FSDataOutputStream usOut = fs.create(uib);
 FSDataOutputStream seOut = fs.create(seb);
 FSDataOutputStream erOut = fs.create(erb);
 FSDataOutputStream evOut = fs.create(evb);
 FSDataOutputStream paOut = fs.create(pab);
return new HivePreRecordWriter(usOut, seOut, erOut, evOut, paOut);
 }
public static class HivePreRecordWriter extends RecordWriter<UserInfoHiveBean, NullWritable> {
private FSDataOutputStream usOut;
 private FSDataOutputStream seOut;
 private FSDataOutputStream erOut;
 private FSDataOutputStream evOut;
 private FSDataOutputStream paOut;
public HivePreRecordWriter(FSDataOutputStream usOut, FSDataOutputStream seOut, FSDataOutputStream erOut,
 FSDataOutputStream evOut, FSDataOutputStream paOut) {
 this.usOut = usOut;
 this.seOut = seOut;
 this.erOut = erOut;
 this.evOut = evOut;
 this.paOut = paOut;
 }
@Override
 public void write(UserInfoHiveBean uib, NullWritable arg1) throws IOException, InterruptedException {
 String uLine = uib.toString();
 System.out.println(uLine + "到了write了");
 usOut.write(uLine.getBytes());
 TreeSetWritable<SessionHiveBean> treeSession = uib.getTreeSession();
 for (SessionHiveBean shb : treeSession) {
 String sline = shb.toString();
 seOut.write(sline.getBytes());
 TreeSetWritable<ErrorsHiveBean> treeErrors = shb.getTreeErrors();
 for (ErrorsHiveBean ehb : treeErrors) {
 String eline = ehb.toString();
 erOut.write(eline.getBytes());
 }
 TreeSetWritable<EventsHiveBean> treeEvents = shb.getTreeEvents();
 for (EventsHiveBean evb : treeEvents) {
 String evline = evb.toString();
 evOut.write(evline.getBytes());
 TreeSetWritable<ParamterHiveBean> treeParams = evb.getTreeParams();
 for (ParamterHiveBean phb : treeParams) {
 String pline = phb.toString();
 paOut.write(pline.getBytes());
 }
 }
 }
 }
@Override
 public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
 usOut.close();
 seOut.close();
 erOut.close();
 evOut.close();
 paOut.close();
 }
 }
}


读后有收获可以支付宝请作者喝咖啡