// 数据去重 (Data de-duplication)
/**
 * Mapper for the de-duplication job: emits every input record as a key with an
 * empty ({@link NullWritable}) value, so that identical records share one key.
 */
class MyMapper extends Mapper<Object, Text, Text, NullWritable> {
    /**
     * Writes the whole input record as the output key.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     the input record
     * @param context task context used to emit the pair
     * @throws java.io.IOException  if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(val, NullWritable.get());
    }
}
class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Overeide
protected void reduce(Text key, Iterable<NullWritable> val, Context context) {
context.write(key, NullWritable.get());
}
}
/**
 * Driver for the de-duplication job: reads from {@code /a} and writes the
 * distinct records to {@code /b}.
 */
class MyDriver {
    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (unused; paths are hard-coded)
     * @throws java.io.IOException    if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args)
            throws java.io.IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");
        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class);
        // A combiner has to be a Reducer. The de-duplication reducer has matching
        // input/output types, so it can run as the combiner as well.
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("/a"));
        FileOutputFormat.setOutputPath(job, new Path("/b"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// 数据排序 (Data sorting)
/**
 * Mapper for the sorting job: parses each input line as an integer and emits it
 * as the key (value is a constant 1), so the framework's key ordering sorts the numbers.
 */
class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
    /**
     * Emits {@code (number, 1)} for the integer found on the line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line holding a single integer
     * @param context task context used to emit the pair
     * @throws java.io.IOException   if the pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if a non-blank line is not a valid integer
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        // Parse the text of the line (not the Text object) and tolerate stray whitespace.
        String line = val.toString().trim();
        if (line.isEmpty()) {
            return; // blank lines carry no number
        }
        context.write(new IntWritable(Integer.parseInt(line)), new IntWritable(1));
    }
}
class MyReducer extends Reducer<IntWritable, IntWritable, IntWritale, IntWritable> {
private static k = 0;
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> vals, Context context) {
for(IntWritable val : vals)
context.write(new IntWritable(++k), new IntWritable(key));
}
}
class MyDriver {
public static void main(){
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "work_name");
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutpiyValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/a"));
FileOutputFormat.setOutputPaths(job, new Path(:/b));
System.exit(job.waitForCompletion(ture) ? 0 : 1);
}
}
// 平均成绩 (Average score)
/**
 * Mapper for the average job: splits each line into a name and an integer score
 * and emits {@code (name, score)}.
 */
class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    /**
     * Emits the first field as the key and the second field, parsed as an int, as the value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line of the form {@code "<name> <score>"}
     * @param context task context used to emit the pair
     * @throws java.io.IOException   if the pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the second field is not a valid integer
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().trim().split("\\s+");
        if (s.length < 2) {
            return; // blank or incomplete line: nothing to average
        }
        context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
    }
}
class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key,Iterable<IntWritable> vals, Context context) {
int sum = 0;
int cnt = 0;
for(IntWritable val : vals) {
cnt++;
sum += val.get()
}
context.write(key, new IntWritable(sum/cnt));
}
}
class MyDriver(){
public static void main(){
Configuration conf = new Configuraion();
Job job = Job.getInstance(conf, "work_name");
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyMapper.class);
job.setCombinerClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/a"));
FileOutputFormat.setOutputPath(job, new Path("/b"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
// 多表关联 (Multi-table join)
/**
 * Mapper for the "all cities" input of the join: emits each city name as the key,
 * with the name prefixed by {@link #LABEL} as the value so the reducer can tell
 * which input the record came from.
 */
class AllCity extends Mapper<LongWritable, Text, Text, Text> {
    /** Prefix marking values that originate from the "all cities" input. */
    public static final String LABEL = "a_";

    /**
     * Emits {@code (name, LABEL + name)} for the city name on the line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line holding a city name
     * @param context task context used to emit the pair
     * @throws java.io.IOException  if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        // Trim so that the key matches the whitespace-free key SomeCity emits.
        String name = val.toString().trim();
        if (name.isEmpty()) {
            return; // blank line
        }
        context.write(new Text(name), new Text(LABEL + name));
    }
}
/**
 * Mapper for the "some cities" input of the join: emits the first field of each
 * line as the key, with the name prefixed by {@link #LABEL} as the value.
 */
class SomeCity extends Mapper<LongWritable, Text, Text, Text> {
    /** Prefix marking values that originate from the "some cities" input. */
    public static final String LABEL = "s_";

    /**
     * Emits {@code (name, LABEL + name)} where {@code name} is the first
     * whitespace-separated field of the line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line whose first field is a city name
     * @param context task context used to emit the pair
     * @throws java.io.IOException  if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().trim().split("\\s+");
        String name = s[0];
        if (name.isEmpty()) {
            return; // blank line
        }
        context.write(new Text(name), new Text(LABEL + name));
    }
}
/**
 * Reducer for the join: writes a key only when it was tagged by {@code AllCity}
 * and never tagged by {@code SomeCity}.
 */
class MyReducer extends Reducer<Text, Text, Text, NullWritable> {
    /**
     * Scans the tagged values for {@code key} and emits the key if an
     * {@code AllCity.LABEL} value was seen but no {@code SomeCity.LABEL} value was.
     *
     * @param key     the city name
     * @param vals    label-prefixed values produced by the two mappers
     * @param context task context used to emit the pair
     * @throws java.io.IOException  if the pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> vals, Context context)
            throws java.io.IOException, InterruptedException {
        boolean a = false;
        boolean s = false;
        for (Text val : vals) {
            String ss = val.toString();
            if (ss.startsWith(SomeCity.LABEL)) {
                s = true;
                break; // present in "some cities": the key can no longer qualify
            } else if (ss.startsWith(AllCity.LABEL)) {
                a = true;
            }
        }
        if (a && !s) {
            context.write(key, NullWritable.get());
        }
    }
}
/**
 * Driver for the join: feeds {@code /allcity} through {@code AllCity} and
 * {@code /somecity} through {@code SomeCity}, and writes the result to {@code /b}.
 */
class MyDriver {
    /**
     * Configures and runs the job, exiting with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (unused; paths are hard-coded)
     * @throws java.io.IOException    if the job cannot be created or submitted
     * @throws InterruptedException   if the wait for completion is interrupted
     * @throws ClassNotFoundException if a job class cannot be resolved
     */
    public static void main(String[] args)
            throws java.io.IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");
        job.setJarByClass(MyDriver.class);
        MultipleInputs.addInputPath(job, new Path("/allcity"), TextInputFormat.class, AllCity.class);
        MultipleInputs.addInputPath(job, new Path("/somecity"), TextInputFormat.class, SomeCity.class);
        FileOutputFormat.setOutputPath(job, new Path("/b"));
        job.setReducerClass(MyReducer.class);
        // The mappers emit (Text, Text) while the reducer emits (Text, NullWritable),
        // so the intermediate types must be declared separately from the final ones.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
// 二次排序 (Secondary sort)
class MyKey implements WritableComparable<MyKey> {
private int a, b;
public MyKey(){}
public MyKey(int a, int b){
this.a = a;
this.b = b;
}
@Override
public void write(DataOutput out){
out.writeInt(a);
out.writeInt(b);
}
@Override
public void readFields(DataInput in){
this.a = in.readInt();
this.b = in.readInt(;)
}
@Override
public String toString(){
return a + " " + b;
}
@Override
public int compareTo(MyKey x){
if(x.a != this.a) return this.a - x.a;
else return this.b - x.b;
}
}
/**
 * Mapper for the secondary-sort job: parses two integers from each line into a
 * {@code MyKey} and emits it with a constant value of 1.
 */
class MyMapper extends Mapper<Object, Text, MyKey, IntWritable> {
    /** Constant value attached to every key; shared because it never changes. */
    private static final IntWritable num = new IntWritable(1);

    /**
     * Emits {@code (MyKey(a, b), 1)} for a line of the form {@code "<a> <b>"}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line holding two integers
     * @param context task context used to emit the pair
     * @throws java.io.IOException   if the pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if either field is not a valid integer
     */
    @Override
    public void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().trim().split("\\s+");
        if (s.length < 2) {
            return; // blank or incomplete line
        }
        int a = Integer.parseInt(s[0]);
        int b = Integer.parseInt(s[1]);
        context.write(new MyKey(a, b), num);
    }
}
/**
 * Reducer for the secondary-sort job: writes each composite key once per value
 * grouped under it, so duplicate input pairs are preserved in the output.
 *
 * <p>Named {@code MyReducer} because that is the class the driver registers.
 */
class MyReducer extends Reducer<MyKey, IntWritable, MyKey, NullWritable> {
    /**
     * Emits {@code key} once for every element of {@code vals}.
     *
     * @param key     the composite key, already in sorted order
     * @param vals    one element per occurrence of {@code key}; contents are not read
     * @param context task context used to emit the pairs
     * @throws java.io.IOException  if a pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<IntWritable> vals, Context context)
            throws java.io.IOException, InterruptedException {
        for (IntWritable ignored : vals) {
            context.write(key, NullWritable.get());
        }
    }
}
class MyDriver() {
public static void main(){
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "work_name");
job.setJarByClass(MuDriver.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(MyKey.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, "/a");
FileOutputFormat.setOutputPaths(job, "/b");
System.exit(job.waitForCompletion(true) ? 0 : 1)
}
}
Comments | NOTHING