MapReduce 编程实践

发布于 2022-05-09  123 次阅读


数据去重

/**
 * Mapper for the de-duplication job: emits every input line as the output key with an
 * empty ({@link NullWritable}) value.
 */
class MyMapper extends Mapper<Object, Text, Text, NullWritable> {
    /**
     * Emits the input line itself as the key.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one line of input text
     * @param context task context used to emit the output pair
     * @throws java.io.IOException  if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(val, NullWritable.get());
    }
}
class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Overeide
    protected void reduce(Text key, Iterable<NullWritable> val, Context context) {
        context.write(key, NullWritable.get());
    }
}
/**
 * Driver for the de-duplication job: reads text from {@code /a} and writes the
 * distinct lines to {@code /b}.
 */
class MyDriver {
    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     *
     * @param args command-line arguments (unused; paths are fixed)
     * @throws Exception if the job cannot be configured, submitted, or awaited
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");

        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyMapper.class);
        // A combiner must be a Reducer. The de-duplicating reducer has identical
        // input and output types, so it can safely run as the combiner as well.
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path("/a"));
        FileOutputFormat.setOutputPath(job, new Path("/b"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

数据排序

/**
 * Mapper for the sorting job: parses each input line as an integer and emits it as the
 * key, so the numbers arrive at the reducer in key order.
 */
class MyMapper extends Mapper<Object, Text, IntWritable, IntWritable> {
    /**
     * Emits {@code (number, 1)} for the integer contained in the input line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one line of input text holding a single integer
     * @param context task context used to emit the output pair
     * @throws java.io.IOException   if the output pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the line is not a valid integer
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        // Parse the String form of the line (not the Text object); trim so that
        // surrounding whitespace does not make parsing fail.
        String line = val.toString().trim();
        context.write(new IntWritable(Integer.parseInt(line)), new IntWritable(1));
    }
}
class MyReducer extends Reducer<IntWritable, IntWritable, IntWritale, IntWritable> {
    private static k = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> vals, Context context) {
        for(IntWritable val : vals)
            context.write(new IntWritable(++k), new IntWritable(key));
    }
}
class MyDriver {
    public static void main(){
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");

        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutpiyValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/a"));
        FileOutputFormat.setOutputPaths(job, new Path(:/b));

        System.exit(job.waitForCompletion(ture) ? 0 : 1);
    }
}

平均成绩

/**
 * Mapper for the average-score job: splits each line on a single space and emits
 * {@code (first field, second field parsed as int)}.
 */
class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    /**
     * Emits the name/score pair found on the input line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line of the form {@code "<name> <score>"}
     * @param context task context used to emit the output pair
     * @throws java.io.IOException   if the output pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if the second field is not a valid integer
     */
    @Override
    protected void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().split(" ");
        context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
    }
}
class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key,Iterable<IntWritable> vals, Context context) {
        int sum = 0;
        int cnt = 0;
        for(IntWritable val : vals) {
            cnt++;
            sum += val.get()
        }
        context.write(key, new IntWritable(sum/cnt));
    }
}
class MyDriver(){
    public static void main(){
        Configuration conf = new Configuraion();
        Job job = Job.getInstance(conf, "work_name");

        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/a"));
        FileOutputFormat.setOutputPath(job, new Path("/b"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

多表关联

/**
 * Mapper for the "all cities" input of the join job: emits each line as the key and
 * the same line prefixed with {@link #LABEL} as the value, so the reducer can tell
 * which input a record came from.
 */
class AllCity extends Mapper<LongWritable, Text, Text, Text> {
    /** Prefix marking values that originate from the "all cities" input. */
    public static final String LABEL = "a_";

    /**
     * Emits {@code (line, LABEL + line)}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line, used as the city name
     * @param context task context used to emit the output pair
     * @throws java.io.IOException  if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String name = val.toString();
        context.write(val, new Text(LABEL + name));
    }
}
/**
 * Mapper for the "some cities" input of the join job: takes the first space-separated
 * field of each line as the city name and emits it with a value prefixed by
 * {@link #LABEL}.
 */
class SomeCity extends Mapper<LongWritable, Text, Text, Text> {
    /** Prefix marking values that originate from the "some cities" input. */
    public static final String LABEL = "s_";

    /**
     * Emits {@code (name, LABEL + name)} where {@code name} is the first field of the line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line whose first space-separated field is the city name
     * @param context task context used to emit the output pair
     * @throws java.io.IOException  if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().split(" ");
        String name = s[0];
        context.write(new Text(name), new Text(LABEL + name));
    }
}
/**
 * Reducer for the join job: emits the cities that appear in the "all cities" input
 * but not in the "some cities" input.
 */
class MyReducer extends Reducer<Text, Text, Text, NullWritable> {
    /**
     * Inspects the source labels of all values for {@code key} and writes the key only
     * when an {@link AllCity} record was seen and no {@link SomeCity} record was.
     *
     * @param key     the city name
     * @param vals    labelled values emitted by {@link AllCity} and {@link SomeCity}
     * @param context task context used to emit the output pair
     * @throws java.io.IOException  if the output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> vals, Context context)
            throws java.io.IOException, InterruptedException {
        boolean a = false; // seen in the "all cities" input
        boolean s = false; // seen in the "some cities" input
        for (Text val : vals) {
            String ss = val.toString();
            if (ss.startsWith(SomeCity.LABEL)) {
                s = true;
            } else if (ss.startsWith(AllCity.LABEL)) {
                a = true;
            }
        }
        if (a && !s) {
            context.write(key, NullWritable.get());
        }
    }
}
/**
 * Driver for the join job: feeds {@code /allcity} through {@link AllCity} and
 * {@code /somecity} through {@link SomeCity}, and writes the result to {@code /b}.
 */
class MyDriver {
    /**
     * Configures and submits the job, then exits with 0 on success or 1 on failure.
     *
     * @param args command-line arguments (unused; paths are fixed)
     * @throws Exception if the job cannot be configured, submitted, or awaited
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");
        job.setJarByClass(MyDriver.class);

        // Each input path gets its own mapper.
        MultipleInputs.addInputPath(job, new Path("/allcity"), TextInputFormat.class, AllCity.class);
        MultipleInputs.addInputPath(job, new Path("/somecity"), TextInputFormat.class, SomeCity.class);

        FileOutputFormat.setOutputPath(job, new Path("/b"));

        job.setReducerClass(MyReducer.class);

        // The mappers emit (Text, Text) while the reducer emits (Text, NullWritable),
        // so the intermediate types must be declared separately from the final ones.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

二次排序

class MyKey implements WritableComparable<MyKey> {
    private int a, b;

    public MyKey(){}
    public MyKey(int a, int b){
        this.a = a;
        this.b = b;
    }

    @Override
    public void write(DataOutput out){
        out.writeInt(a);
        out.writeInt(b);
    }
    @Override
    public void readFields(DataInput in){
        this.a = in.readInt();
        this.b = in.readInt(;)
    }

    @Override
    public String toString(){
        return a + " " + b;
    }

    @Override
    public int compareTo(MyKey x){
        if(x.a != this.a) return this.a - x.a;
        else return this.b - x.b;
    }
}
/**
 * Mapper for the secondary-sort job: parses two space-separated ints from each line
 * and emits them as a composite {@link MyKey}.
 */
class MyMapper extends Mapper<Object, Text, MyKey, IntWritable> {
    // Shared placeholder value; only the key carries information.
    private static IntWritable num = new IntWritable(1);

    /**
     * Emits {@code (MyKey(a, b), 1)} for a line of the form {@code "<a> <b>"}.
     *
     * @param key     input key supplied by the input format (unused)
     * @param val     one input line holding two space-separated integers
     * @param context task context used to emit the output pair
     * @throws java.io.IOException   if the output pair cannot be written
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if either field is not a valid integer
     */
    @Override
    public void map(Object key, Text val, Context context)
            throws java.io.IOException, InterruptedException {
        String[] s = val.toString().split(" ");
        int a = Integer.parseInt(s[0]);
        int b = Integer.parseInt(s[1]);

        context.write(new MyKey(a, b), num);
    }
}
/**
 * Reducer for the secondary-sort job: writes each key once per value grouped under
 * it, so duplicated input pairs are preserved in the output.
 *
 * <p>Named {@code MyReducer} to match the class the driver registers via
 * {@code job.setReducerClass(MyReducer.class)}.
 */
class MyReducer extends Reducer<MyKey, IntWritable, MyKey, NullWritable> {
    /**
     * Emits {@code key} with an empty value for every entry in {@code vals}.
     *
     * @param key     the composite key
     * @param vals    one entry per occurrence of {@code key} in the input
     * @param context task context used to emit the output pairs
     * @throws java.io.IOException  if an output pair cannot be written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MyKey key, Iterable<IntWritable> vals, Context context)
            throws java.io.IOException, InterruptedException {
        for (IntWritable val : vals) {
            context.write(key, NullWritable.get());
        }
    }
}
class MyDriver() {
    public static void main(){
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "work_name");

        job.setJarByClass(MuDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(MyKey.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, "/a");
        FileOutputFormat.setOutputPaths(job, "/b");

        System.exit(job.waitForCompletion(true) ? 0 : 1)
    }
}