IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]Hadoop get JobId

    fansy1990发表于 2016-05-30 10:48:36
    love 0

    软件版本:hadoop2.6.4 ; jdk1.7,eclipse:Myeclipse2014;


    问题描述:在进行MR任务提交的时候,想获取每次提交任务的jobId,这样在提交任务后,就可以根据此JobId来获取MR任务的进度了。

    解决方案:

    1. 之前想过的一种方式是:直接获取所有完成的JobID 列表,然后对其进行排序,取其最大的,接着使用最大的jobId来构造下一个jobId;但是这样会有问题,参考:http://blog.csdn.net/fansy1990/article/details/17426945 ;问题是当集群是第一次启动的时候,这个时候是没有已经完成JobId的列表的,所以想到了使用获取集群启动时间的解决办法,也就是刚才的链接内容;

    2. 第二种方式是:既然提交MR任务的时候可以获取Job参数,那么是否可以通过该job参数来获取jobID呢?Job在调用job.waitForComplete后才会去集群中获取jobId ,并且此jobid在每次调用的时候都会递增1,而不管任务时候完成,即如果在jobId获取后的地方加个断点:


    那么,后面直接关闭调试程序,下次再次运行此程序,发现获取的jobID是跳跃的,如下:


    并且,中间的jobid的任务没有任何信息;

    那么在Job初始化jobId后,如何获取这个JobId呢? 采用静态类:

    如下,定义静态类:

    package ssh.getjobid;
    
    import java.util.Date;
    
    import org.apache.hadoop.mapreduce.Job;
    
    public class StaticInfo {
    
    	private static Job job = null;
    
    	public static Job getApplicationId() {
    		return job;
    	}
    
    	public static synchronized void setApplicationId(Job applicationId) {
    		StaticInfo.job = applicationId;
    	}
    
    	public static String getJobId() {
    		long start = System.currentTimeMillis();
    		while (noJobId()) {
    			try {
    				Thread.sleep(200);
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    			System.out.println(new Date() + ":getting job id ...");
    		}
    		long end = System.currentTimeMillis();
    
    		System.out.println("获取jobId,耗时:" + (end - start) * 1.0 / 1000 + "s");
    		return job.getJobID().getJtIdentifier() + job.getJobID().getId() + "-"
    				+ job.getJobID().toString();
    	}
    
    	private static boolean noJobId() {
    		if (job == null || job.getJobID() == null)
    			return true;
    
    		return false;
    	}
    
    }
    

    在静态类中,使用一个静态Job变量来获取提交的MR任务的Job,然后在获取JobId的时候,采用一个while循环,不断的去遍历,看是否可以获取JobId,如果可以获取,则直接返回jobId,否则进行查询;

    此Job变量在MR任务中需要进行设置


    同时,测试程序需要采用多线程的方式,这主程序中就可以通过while循环来获取jobID了;


    而Runnable里面则是调用MR任务的实际代码:

    package ssh.getjobid;
    
    import org.apache.hadoop.util.ToolRunner;
    
    import ssh.WordCount;
    
    public class WcRunnable implements Runnable {
    	private String input;
    	private String output;
    
    	public WcRunnable(String in, String out) {
    		input = in;
    		output = out;
    	}
    
    	@Override
    	public void run() {
    		String[] args = new String[] { input, output };
    		try {
    			ToolRunner.run(TestGetJobID.getConf(), new WordCount(), args);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    测试结果如下:



    注意点: 此方式只能针对每次一个MR任务的提交进行监控,并且确认可以修改MR源码(即在job.waitForComplete前面加上静态类的set方法)


    分享,成长,快乐

    脚踏实地,专注

    转载请注明blog地址:http://blog.csdn.net/fansy1990




沪ICP备19023445号-2号
友情链接