1 public class GroupComparator implements RawComparator{ 2 3 @Override 4 public int compare(MyBinaryKey o1, MyBinaryKey o2) { 5 return o1.toString().compareTo(o2.toString()); 6 } 7 8 @Override 9 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {10 return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2, Long.SIZE / 8 + Integer.SIZE / 8 * 3);11 }12 13 }14 15 public abstract class UVBinaryKey extends BinaryComparable implements WritableComparable {16 //根据需要添加属性;17 @Override18 public void readFields(DataInput in) throws IOException {19 20 } 21 22 @Override23 public byte[] getBytes() {24 25 }26 27 }28 29 public class MyPartitioner extends Partitioner {30 31 /**32 * 根据uv/ip取模分区,保证相同uv/ip落在同一分区33 */34 @Override35 public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {36 37 int k=0;38 for(byte b : key.getAttr()){39 k+=b&0xff;40 }41 return k%numPartitions;42 }43 44 }45 46 47 48 job.setMapOutputKeyClass(UVBinaryKey.class);49 job.setGroupingComparatorClass(GroupComparator.class);50 job.setPartitionerClass(MyPartitioner.class);51 52 map 略
1 combiner(根据需要添加) 2 reduce中的实现: 3 @Override 4 protected void reduce(UVBinaryKey key, Iterablevalues, Context context) 5 throws IOException, 6 InterruptedException { 7 long count = 0; 8 byte[] tbsign = null; 9 for (NullWritable nullWritable : values) {10 byte[] attr = key.getAttr();11 if (tbsign == null) {12 tbsign = attr;13 count++;14 }15 if (tbsign != null) {16 if (tbsign.length != attr.length) {17 count++;18 tbsign = attr;19 } else {20 for (int i = 0; i < tbsign.length; i++) {21 if (tbsign[i] != attr[i]) {22 count++;23 tbsign = attr;24 break;25 }26 }27 }28 }29 30 }31 StringBuffer out = new StringBuffer();32 out.append(new String(key.getCity()))33 .append(Constants.FIELDS_TERMINATED).append(count);34 context.write(new Text(out.toString()), NullWritable.get());35 36 }