MR数据用法介绍(在MR数据集上进行数据预处理)

一、MR数据概述

MapReduce(MR)是一种分布式计算框架,它能够处理大型数据集。MR框架将数据分成数个块,由集群中的各个节点并行计算,最终将计算结果合并返回给用户。

MR数据包括输入数据和输出数据,其中输入数据可以来自HDFS、HBase、本地文件系统等各种数据源。输出数据包括任务的计算结果和日志信息。

    /* MR数据示例代码 */
    public static class WordCountMapper extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

二、MR数据输入

MR数据输入方式有多种,下面将介绍几种常见的输入方式。

1. HDFS文件输入

MR常用的数据源之一是Hadoop分布式文件系统(HDFS),可以直接从HDFS文件中读取数据。使用HDFS文件输入时,需要指定文件路径。

    /* HDFS文件输入示例代码 */
    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/input"));

2. 本地文件输入

当需要对本地文件进行处理时,需要使用本地文件系统来读取数据。使用本地文件输入时,需要给定本地文件路径。

    /* 本地文件输入示例代码 */
    FileInputFormat.addInputPath(job, new Path("/input"));

三、MR数据输出

MR任务的计算结果可以输出到多个数据源,下面就几种常见的输出方式进行介绍。

1. HDFS文件输出

和数据输入一样,HDFS也可以作为MR任务的输出数据源。使用HDFS文件输出时,需要指定输出文件路径。

    /* HDFS文件输出示例代码 */
    FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/output"));

2. HBase表输出

HBase是分布式非关系型数据库系统,可以将MR任务的计算结果输出到HBase表中。

    /* HBase表输出示例代码 */
    TableMapReduceUtil.initTableReducerJob("tableName", MyTableReducer.class, job);

3. 数据库表输出

MR任务的计算结构还可以输出到关系型数据库(如MySQL)中,可以通过JDBC驱动将结果写入到数据库表中。

    /* MySQL表输出示例代码 */
    String output = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf-8";
    job.setOutputFormatClass(DBOutputFormat.class);
    DBOutputFormat.setOutput(job, "tableName", "column1", "column2");
    DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver", output, "root", "password");

四、MR数据处理

MR框架是分布式处理数据的,每个节点都会被分配一个任务来处理数据分块,下面将介绍几种常见的数据处理。

1. WordCount统计

WordCount是MR任务的入门案例,它的目的是统计文本中每个单词出现的次数。

    /* WordCount示例代码 */
    public static class WordCountMapper extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    
    public static class WordCountReducer extends Reducer {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

2. 数据排序

MR任务可以根据某个值对数据进行排序。排序的方式有多种,包括升序、降序等。

    /* 数据排序示例代码 */
    public static class SortMapper extends Mapper {
        private Text word = new Text();
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                int num = Integer.parseInt(word.toString());
                context.write(new IntWritable(num), new Text(""));
            }
        }
    }
    
    public static class SortReducer extends Reducer {
        public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

3. 数据聚合

MR任务可以将数据进行聚合,将结果输出到单个文件中。

    /* 数据聚合示例代码 */
    public static class GroupMapper extends Mapper {
        private Text word = new Text();
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, new IntWritable(1));
            }
        }
    }
    
    public static class GroupReducer extends Reducer {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    
    /* 将结果聚合成一个文件 */
    job.setNumReduceTasks(1);

五、MR数据输出控制

MR框架默认将所有的Mapper和Reducer的输出结果都输出到文件中,用户也可以指定一部分结果输出到控制台或者其他数据源。

1. 输出到控制台

可以将Mapper或Reducer的结果输出到控制台。在Mapper或Reducer中,可以使用System.out.println语句进行输出。

    /* 输出到控制台示例代码 */
    public static class ConsoleMapper extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                System.out.println(word.toString() + "t" + 1);
                context.write(word, one);
            }
        }
    }

2. 输出到多个文件

MR任务的输出结果还可以指定写入到多个文件中,使用MultipleOutputs类可以轻松实现该功能。

    /* 输出到多个文件示例代码 */
    private MultipleOutputs mos;
        
    public void setup(Context context) {
        mos = new MultipleOutputs(context);
    }
    
    public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        mos.write("output1", key, new IntWritable(sum), "/output1/part");
        mos.write("output2", key, new IntWritable(sum), "/output2/part");
    }
    
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

六、MR数据调试

在MR任务执行过程中,可能会出现各种问题,比如输入数据格式错误、Mapper和Reducer处理逻辑有误等。为了能够快速定位和解决问题,需要对MR任务进行调试。

可以使用本地模式、伪分布式模式进行调试,在本地模式下MR任务会运行在单机上。

    /* MR任务本地模式示例代码 */
    Job job = Job.getInstance(conf);
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCountMapper.class);
    job.setCombinerClass(WordCountReducer.class);
    job.setReducerClass(WordCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("/input"));
    FileOutputFormat.setOutputPath(job, new Path("/output"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

Published by

风君子

独自遨游何稽首 揭天掀地慰生平