一、MR数据概述
MapReduce(MR)是一种分布式计算框架,它能够处理大型数据集。MR框架将数据分成数个块,由集群中的各个节点并行计算,最终将计算结果合并返回给用户。
MR数据包括输入数据和输出数据,其中输入数据可以来自HDFS、HBase、本地文件系统等各种数据源。输出数据包括任务的计算结果和日志信息。
/* MR数据示例代码 */ public static class WordCountMapper extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
二、MR数据输入
MR数据输入方式有多种,下面将介绍几种常见的输入方式。
1. HDFS文件输入
MR常用的数据源之一是Hadoop分布式文件系统(HDFS),可以直接从HDFS文件中读取数据。使用HDFS文件输入时,需要指定文件路径。
/* HDFS文件输入示例代码 */ FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/input"));
2. 本地文件输入
当需要对本地文件进行处理时,需要使用本地文件系统来读取数据。使用本地文件输入时,需要给定本地文件路径。
/* 本地文件输入示例代码 */ FileInputFormat.addInputPath(job, new Path("/input"));
三、MR数据输出
MR任务的计算结果可以输出到多个数据源,下面就几种常见的输出方式进行介绍。
1. HDFS文件输出
和数据输入一样,HDFS也可以作为MR任务的输出数据源。使用HDFS文件输出时,需要指定输出文件路径。
/* HDFS文件输出示例代码 */ FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/output"));
2. HBase表输出
HBase是分布式非关系型数据库系统,可以将MR任务的计算结果输出到HBase表中。
/* HBase表输出示例代码 */ TableMapReduceUtil.initTableReducerJob("tableName", MyTableReducer.class, job);
3. 数据库表输出
MR任务的计算结构还可以输出到关系型数据库(如MySQL)中,可以通过JDBC驱动将结果写入到数据库表中。
/* MySQL表输出示例代码 */ String output = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=utf-8"; job.setOutputFormatClass(DBOutputFormat.class); DBOutputFormat.setOutput(job, "tableName", "column1", "column2"); DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver", output, "root", "password");
四、MR数据处理
MR框架是分布式处理数据的,每个节点都会被分配一个任务来处理数据分块,下面将介绍几种常见的数据处理。
1. WordCount统计
WordCount是MR任务的入门案例,它的目的是统计文本中每个单词出现的次数。
/* WordCount示例代码 */ public static class WordCountMapper extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class WordCountReducer extends Reducer { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
2. 数据排序
MR任务可以根据某个值对数据进行排序。排序的方式有多种,包括升序、降序等。
/* 数据排序示例代码 */ public static class SortMapper extends Mapper { private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); int num = Integer.parseInt(word.toString()); context.write(new IntWritable(num), new Text("")); } } } public static class SortReducer extends Reducer { public void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, new Text("")); } }
3. 数据聚合
MR任务可以将数据进行聚合,将结果输出到单个文件中。
/* 数据聚合示例代码 */ public static class GroupMapper extends Mapper { private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, new IntWritable(1)); } } } public static class GroupReducer extends Reducer { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } /* 将结果聚合成一个文件 */ job.setNumReduceTasks(1);
五、MR数据输出控制
MR框架默认将所有的Mapper和Reducer的输出结果都输出到文件中,用户也可以指定一部分结果输出到控制台或者其他数据源。
1. 输出到控制台
可以将Mapper或Reducer的结果输出到控制台。在Mapper或Reducer中,可以使用System.out.println语句进行输出。
/* 输出到控制台示例代码 */ public static class ConsoleMapper extends Mapper { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); System.out.println(word.toString() + "t" + 1); context.write(word, one); } } }
2. 输出到多个文件
MR任务的输出结果还可以指定写入到多个文件中,使用MultipleOutputs类可以轻松实现该功能。
/* 输出到多个文件示例代码 */ private MultipleOutputs mos; public void setup(Context context) { mos = new MultipleOutputs(context); } public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } mos.write("output1", key, new IntWritable(sum), "/output1/part"); mos.write("output2", key, new IntWritable(sum), "/output2/part"); } public void cleanup(Context context) throws IOException, InterruptedException { mos.close(); }
六、MR数据调试
在MR任务执行过程中,可能会出现各种问题,比如输入数据格式错误、Mapper和Reducer处理逻辑有误等。为了能够快速定位和解决问题,需要对MR任务进行调试。
可以使用本地模式、伪分布式模式进行调试,在本地模式下MR任务会运行在单机上。
/* MR任务本地模式示例代码 */ Job job = Job.getInstance(conf); job.setJarByClass(WordCount.class); job.setMapperClass(WordCountMapper.class); job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/input")); FileOutputFormat.setOutputPath(job, new Path("/output")); System.exit(job.waitForCompletion(true) ? 0 : 1);