Problem 1: for each student record, compute the total and the average of three course scores and write them out keyed by student ID.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRXXY01 {
    public static class Map extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // With the default TextInputFormat each call receives a single line,
            // so the outer tokenizer normally iterates exactly once.
            StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
            while (tokenizer.hasMoreElements()) {
                StringTokenizer lineTokenizer = new StringTokenizer(tokenizer.nextToken());
                String studentId = lineTokenizer.nextToken();              // student ID
                String classId = lineTokenizer.nextToken();                // class ID
                int course1 = Integer.parseInt(lineTokenizer.nextToken()); // course 1 score
                int course2 = Integer.parseInt(lineTokenizer.nextToken()); // course 2 score
                int course3 = Integer.parseInt(lineTokenizer.nextToken()); // course 3 score
                int sum = course1 + course2 + course3;
                int average = sum / 3;                                     // integer average (fraction truncated)
                context.write(new Text(studentId), new Text(classId + " " + average + " " + sum));
            }
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Identity reduce: emit each "classId average sum" value under its student ID.
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        String input = "hdfs://localhost:9000/user/xuxuanyu/input01",
                output = "hdfs://localhost:9000/user/xuxuanyu/output01";
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MRXXY01.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
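A quick way to sanity-check the parsing and aggregation logic outside Hadoop is to run it on a single made-up record. The sketch below is hypothetical: the class name MRXXY01LocalCheck and the sample line "20230001 C01 80 90 85" are invented, assuming the whitespace-separated format "studentId classId score1 score2 score3" that the mapper expects.

import java.util.StringTokenizer;

public class MRXXY01LocalCheck {
    public static void main(String[] args) {
        // Hypothetical record in the mapper's expected format: studentId classId score1 score2 score3
        String line = "20230001 C01 80 90 85";
        StringTokenizer t = new StringTokenizer(line);
        String studentId = t.nextToken();
        String classId = t.nextToken();
        int sum = Integer.parseInt(t.nextToken())
                + Integer.parseInt(t.nextToken())
                + Integer.parseInt(t.nextToken());
        int average = sum / 3; // same integer average as the mapper
        // Mirrors the reducer's output line for this record: "20230001	C01 85 255"
        System.out.println(studentId + "\t" + classId + " " + average + " " + sum);
    }
}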
Problem 2: a reduce-side join of the Problem 1 output (part-r-00000) with the class information file (mrxxu02.txt) on class ID.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class MRXXY02 {
    public static class RelationMap extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // The next two lines obtain the name of the input file the current record comes from.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String name = fileSplit.getPath().getName();
            // Convert the current record to a plain String.
            String line = value.toString();
            // Discard empty records.
            if (line == null || line.equals("")) return;
            // Split on whitespace.
            String[] s = line.split("\\s+");
            if (name.contains("part-r-00000")) {
                // Score record from the Problem 1 output: studentId classId average sum.
                String classId = s[1];
                String studentId = s[0];
                String avg = s[2];
                String sum = s[3];
                context.write(new Text(classId), new Text("#" + studentId + " " + avg + " " + sum));
            } else if (name.contains("mrxxu02.txt")) {
                // Class information record: classId className deptName.
                String classId = s[0];
                String className = s[1];
                String deptName = s[2];
                context.write(new Text(classId), new Text("$" + className + " " + deptName));
            }
        }
    }
    public static class RelationReduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // list1 holds the records coming from table 1 (the score records).
            List<String> list1 = new LinkedList<>();
            // list2 holds the records coming from table 2 (the class information).
            List<String> list2 = new LinkedList<>();
            // Iterate over the values grouped under this class ID.
            for (Text text : values) {
                String value = text.toString();
                // A value starting with "#" comes from table 1; strip the tag and add it to list1.
                if (value.startsWith("#")) {
                    value = value.substring(1);
                    list1.add(value);
                } else if (value.startsWith("$")) {
                    // A value starting with "$" comes from table 2; strip the tag and add it to list2.
                    value = value.substring(1);
                    list2.add(value);
                }
            }
            // Cartesian product of the records sharing the same class ID:
            // the key is the class ID, the value combines a table 2 record with a table 1 record.
            for (String a : list1) {
                for (String b : list2) {
                    context.write(key, new Text(b + " " + a));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(); // load the Hadoop configuration
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        // Hard-coded paths: the Problem 1 output file, the class information file, and the output directory.
        String[] otherArgs = new String[]{"output01/part-r-00000", "input02/mrxxu02.txt", "output02"};
        if (otherArgs.length < 2) {
            System.err.println("Usage: Relation <in> <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "MRXXY02"); // set up the job
        job.setJarByClass(MRXXY02.class); // set the main class
        job.setMapperClass(RelationMap.class); // set the user-defined Mapper class
        job.setReducerClass(RelationReduce.class); // set the user-defined Reducer class
        job.setOutputKeyClass(Text.class); // set the output key type
        job.setOutputValueClass(Text.class); // set the output value type
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i])); // add an input path
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); // set the output path
        System.exit(job.waitForCompletion(true) ? 0 : 1); // submit the job and wait for completion
    }
}
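To see what the reduce-side join produces for a single class ID, the following standalone sketch repeats the reducer's tag handling and Cartesian product on made-up values. The class name MRXXY02JoinCheck and the sample records are hypothetical, assuming the "#"/"$" tags written by RelationMap above.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class MRXXY02JoinCheck {
    public static void main(String[] args) {
        // Hypothetical values grouped under one class ID, tagged as RelationMap would tag them:
        // "#" marks a score record (studentId average sum), "$" marks a class-info record (className deptName).
        String key = "C01";
        List<String> values = Arrays.asList("#20230001 85 255", "#20230002 78 234", "$ClassOne ComputerScience");
        List<String> list1 = new LinkedList<>(); // records from table 1
        List<String> list2 = new LinkedList<>(); // records from table 2
        for (String value : values) {
            if (value.startsWith("#")) {
                list1.add(value.substring(1));
            } else if (value.startsWith("$")) {
                list2.add(value.substring(1));
            }
        }
        // Same Cartesian product as RelationReduce: one joined line per (table 2, table 1) pair.
        for (String a : list1) {
            for (String b : list2) {
                System.out.println(key + "\t" + b + " " + a);
            }
        }
    }
}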