IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Hive ORC数据格式的MapReduce Shuffle

    liuzhoulong发表于 2017-03-03 17:01:25
    love 0

    1,mr代码如下


    package com.test.hadoop;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.mapred.OrcKey;
    import org.apache.orc.mapred.OrcStruct;
    import org.apache.orc.mapred.OrcValue;
    import org.apache.orc.mapreduce.OrcInputFormat;
    import org.apache.orc.mapreduce.OrcOutputFormat;
    
    
    public class ORCStructSample {
    
    	public static class ORCMapper extends
    			Mapper<NullWritable, OrcStruct, Text, OrcValue> {
    		// private OrcKey keyWrapper = new OrcKey();
    		  private OrcValue valueWrapper = new OrcValue();
    		  
    		  
    		public void map(NullWritable key, OrcStruct value, Context output)
    				throws IOException, InterruptedException {
    			// keyWrapper.key = value;
    			valueWrapper.value = value;
    			 output.write((Text) value.getFieldValue(0), valueWrapper);
    			 
    			//output.write(keyWrapper, valueWrapper);
    		}
    	}
    
    	public static class ORCReducer extends
    			Reducer<Text, OrcValue, NullWritable, OrcStruct> {
    		private TypeDescription schema = TypeDescription
    				.fromString("struct<name:string,mobile:string>");
    		private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
    
    		private final NullWritable nw = NullWritable.get();
    
    		public void reduce(Text key, Iterable<OrcValue> values, Context output)
    				throws IOException, InterruptedException {
    			for (OrcValue value : values) {
    				OrcStruct val = (OrcStruct) value.value;
    				pair.setFieldValue(0, val.getFieldValue(0));
    				pair.setFieldValue(1, val.getFieldValue(1));
    				output.write(nw, pair);
    			}
    		}
    	}
    
    	/**
    	 * Configures and runs the ORC MapReduce job, then exits the JVM.
    	 *
    	 * <p>Exit status is 0 when the job succeeds, 1 when it fails, and 2 when
    	 * the required arguments are missing.
    	 *
    	 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
    	 * @throws Exception if job setup or execution fails
    	 */
    	public static void main(String[] args) throws Exception {
    		// Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    		if (args.length < 2) {
    			System.err.println("Usage: ORCStructSample <input path> <output path>");
    			System.exit(2);
    			return;
    		}

    		// The reducer output and the shuffled map output value use the same layout.
    		final String rowSchema = "struct<name:string,mobile:string>";

    		Configuration conf = new Configuration();
    		conf.set("orc.mapred.output.schema", rowSchema);
    		// The shuffle key is a plain Text (see setMapOutputKeyClass below),
    		// so only the map output value schema is configured here.
    		conf.set("orc.mapred.map.output.value.schema", rowSchema);

    		Job job = Job.getInstance(conf, "ORC Test");
    		job.setJarByClass(ORCStructSample.class);
    		job.setMapperClass(ORCMapper.class);
    		job.setReducerClass(ORCReducer.class);
    		job.setInputFormatClass(OrcInputFormat.class);
    		job.setOutputFormatClass(OrcOutputFormat.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(OrcValue.class);
    		job.setOutputKeyClass(NullWritable.class);
    		job.setOutputValueClass(OrcStruct.class);
    		job.setNumReduceTasks(1);
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    }
    

    注意:官网上其实有解释,如果shuffle阶段需要用OrcStruct 或者其他orc结构,必须用OrcKey封装其key,OrcValue封装其value. 

    To enable MapReduce to properly instantiate the OrcStruct and other ORC types, we need to wrap it in either an OrcKey for the shuffle key or OrcValue for the shuffle value.


    这里map 中只是将map输入封装下直接输出,当然map中也可以通过TypeDescription自己构造OrcStruct输出或者经过处理后输出

    2,执行步骤

    hadoop jar MRTest-1.0-jar-with-dependencies.jar com.test.hadoop.ORCStructSample  /Hive/warehouse/mytest.db/t_test_orc /user/testorc3

    其他步骤,参照 http://blog.csdn.net/liuzhoulong/article/details/52048105



沪ICP备19023445号-2号
友情链接