package duogemap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinJob {

    public static final String DELIMITER = "\u0009"; // tab character

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Validate the argument count: one output path plus at least one input path.
        if (args.length < 2) {
            System.out.println("Wrong number of arguments; expected at least two: <output path> <input path...>");
            System.exit(1);
        }
        // Output path for the job result.
        String dataOutput = args[0];
        // One or more input paths.
        String[] inputs = new String[args.length - 1];
        System.arraycopy(args, 1, inputs, 0, inputs.length);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "join test");
        job.setJarByClass(JoinJob.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Wrap the input and output paths in Path objects.
        Path[] inputPaths = new Path[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            inputPaths[i] = new Path(inputs[i]);
        }
        Path outputPath = new Path(dataOutput);
        FileInputFormat.setInputPaths(job, inputPaths);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String inputPath;
        private String fileCode = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Derive a per-file tag from the name of the split being processed.
            FileSplit input = (FileSplit) context.getInputSplit();
            inputPath = input.getPath().getName();
            // Take the part of the file name before the first underscore
            // (for a name like "A.txt" this is the whole file name).
            fileCode = inputPath.split("_")[0];
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split(DELIMITER);
            // Prefix the value with the file tag so the reducer can tell
            // which table each record came from.
            StringBuilder sb = new StringBuilder();
            sb.append(fileCode).append("#");
            // Append every column except the first (the join key).
            for (int i = 1; i < values.length; i++) {
                sb.append(values[i]);
                if (i < values.length - 1) {
                    sb.append(DELIMITER);
                }
            }
            context.write(new Text(values[0]), new Text(sb.toString()));
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> left = new ArrayList<String>();
            List<String> right = new ArrayList<String>();
            for (Text value : values) {
                // Split the tag from the payload; the limit of 2 keeps a "#"
                // inside the payload from breaking the split.
                String[] vv = value.toString().split("#", 2);
                String fileCode = vv[0];
                if (fileCode.equals("A.txt")) {
                    // Record from the left table.
                    left.add(vv[1]);
                } else {
                    // Record from the right table.
                    right.add(vv[1]);
                }
            }
            // The nested loop only emits output when both left and right have
            // data for this key, which gives inner-join semantics.
            for (String l : left) {
                for (String r : right) {
                    context.write(key, new Text(l + DELIMITER + r));
                }
            }
        }
    }
}

// Test data:
// Suppose we have two tables.
//
// Table A (left table) data:
// 1    a
// 2    b
// 3    c
// 3    d
// 4    e
// 6    f
//
// Table B (right table) data:
// 1    10
// 2    20
// 3    30
// 4    40
// 4    400
// 5    50
//
// The result we need is:
// 1    a    10
// 2    b    20
// 3    c    30
// 3    d    30
// 4    e    40
// 4    e    400
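
// A minimal sketch of how the job might be invoked; the jar name and the
// HDFS paths below are assumptions, not part of the original listing:
//
//   hadoop jar join-job.jar duogemap.JoinJob /user/demo/join-output \
//       /user/demo/input/A.txt /user/demo/input/B.txt
//
// Note that the reducer identifies the left table by the literal file name
// "A.txt", so the left input file must be named accordingly; every other
// input file is treated as the right table.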