
    [Original] Sqoop 1.99.4 Java API in Practice

    Posted by fansy1990 on 2016-03-08 14:27:09

    Software versions:

    Sqoop 1.99.4; JDK 1.7; Hadoop 2.2

    Environment:

    1. Hadoop cluster:

    node12: NameNode, ResourceManager, JobHistoryServer, DataNode, NodeManager; 1.5 GB RAM + 10 GB disk + 1 core

    node13: SecondaryNameNode, DataNode, NodeManager; 1.5 GB RAM + 10 GB disk + 1 core

    2. Sqoop server:
    The Sqoop server is deployed on node13.

    3. IntelliJ IDEA setup
    Download the Sqoop 1.99.4 release tarball and unpack it.

    (1) Locate sqoop-1.99.4-bin-hadoop200\server\webapps\sqoop.war, then unpack the war file and find the directory

    sqoop-1.99.4-bin-hadoop200\server\webapps\sqoop\WEB-INF\lib; copy the jars under it onto the project's build path (not required at runtime).


    (2) Locate the directory sqoop-1.99.4-bin-hadoop200\shell\lib and copy the jars under it onto the project's build path (required at runtime).


    Basic Sqoop concepts:

    1. Sqoop connector:

    A connector is Sqoop's adapter for a particular data source. Once the Sqoop server is running, typing show connector in the client shell lists the connectors currently registered with the server.
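
    The same information can be pulled from Java. A minimal sketch, assuming SqoopClient#getConnectors() and MConnector#getUniqueName() from the 1.99.4 client API (the same SqoopClient the full listing below uses):

    // Java-side equivalent of "show connector": list registered connectors.
    SqoopClient client = new SqoopClient("http://node13:12000/sqoop/");
    for (MConnector connector : client.getConnectors()) {
        System.out.println(connector.getPersistenceId() + "\t" + connector.getUniqueName());
    }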


    2. Sqoop link:

    A link is backed by a connector, so creating one requires specifying a connector (by its id, as the code below shows). A link is a wrapper around a connector on which connection parameters are set: for MySQL, the JDBC driver class name, connection string, username, password, and so on; for HDFS, the URI. The configurable inputs are listed below:

    hdfs:

    Link configuration:
    linkConfig.uri,HDFS URI : null
    
    From Job configuration:
    fromJobConfig.inputDirectory,Input directory : null
    
    ToJob configuration:
    toJobConfig.outputFormat,Output format : null
    toJobConfig.compression,Compression format : null
    toJobConfig.customCompression,Custom compression format : null
    toJobConfig.outputDirectory,Output directory : null


    jdbc:

    Link configuration:
    linkConfig.jdbcDriver,JDBC Driver Class : null
    linkConfig.connectionString,JDBC Connection String : null
    linkConfig.username,Username : null
    linkConfig.password,Password : null
    linkConfig.jdbcProperties,JDBC Connection Properties : null
    
    From database configuration:
    fromJobConfig.schemaName,Schema name : null
    fromJobConfig.tableName,Table name : null
    fromJobConfig.sql,Table SQL statement : null
    fromJobConfig.columns,Table column names : null
    fromJobConfig.partitionColumn,Partition column name : null
    fromJobConfig.allowNullValueInPartitionColumn,Null value allowed for the partition column : null
    fromJobConfig.boundaryQuery,Boundary query : null
    
    To database configuration:
    toJobConfig.schemaName,Schema name : null
    toJobConfig.tableName,Table name : null
    toJobConfig.sql,Table SQL statement : null
    toJobConfig.columns,Table column names : null
    toJobConfig.stageTableName,Stage table name : null
    toJobConfig.shouldClearStageTable,Should clear stage table : null
    

    Note that a link, when created, is not yet declared to be a source or a destination; a Sqoop transfer simply moves data from one link to another (from and to), and the direction is only fixed when a job is created.

    3. Sqoop job:

    Creating a Sqoop job requires two things. First, the from and to sides, each identified by the id of an existing link. Second, the concrete from/to parameters, meaning the values that vary from job to job, such as the input directory on the HDFS side or the table name and column list on the JDBC side (see createJob in the full listing below, and the sketch right after this paragraph).
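
    Because direction is fixed per job, the same pair of links can be wired either way round. A sketch using only SqoopClient#createJob(fromLinkId, toLinkId) as in the listing below, and assuming link id 1 is the HDFS link and id 2 the MySQL link:

    // A link has no inherent direction; the job decides which side is which.
    MJob hdfsToMysql = client.createJob(1L, 2L);   // HDFS -> MySQL
    MJob mysqlToHdfs = client.createJob(2L, 1L);   // MySQL -> HDFS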

    Sqoop Java API:

    0. Create a connector (nothing to create here: the connectors are available automatically once Sqoop is configured)

    1. Create the links

    1) Before creating any link, check the existing ones in the Sqoop client shell with show link; there are none yet.

    2) Create the HDFS link:

    long connectorId = 1;
    MLink link = createHDFSLink(client, connectorId);
    3) Create the JDBC link (MySQL as the example):

    long connectorId = 2;
    MLink link = createMysqlLink(client, connectorId);
    4) Check again with show link; the Java-side equivalent is sketched below.
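
    A minimal sketch of listing links from Java, assuming SqoopClient#getLinks() from the 1.99.4 client API (getPersistenceId(), getName() and getConnectorId() all appear in the full listing below):

    // Java-side equivalent of "show link": list saved links.
    for (MLink l : client.getLinks()) {
        System.out.println(l.getPersistenceId() + "\t" + l.getName()
                + " (connector " + l.getConnectorId() + ")");
    }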

    2. Create the job:

    0) Check the existing jobs with show job; by default there are none.

    1) Create the job:

    // create the job
    long fromLinkId = 1;
    long toLinkId = 2;
    String input = "/user/fansy/name.data";
    String tableName = "test_name_age";
    String columns = "name,age";
    int mappers = 2;
    MJob job = createJob(client, fromLinkId, toLinkId, input, tableName, columns, mappers);
    System.out.println(job.getEnabled());
    2) Check the job with show job:


    3. Start the job:

    // start the job
    long jobId = 1;
    startJob(client, jobId);

    Once the job has been started, you can check its status with show job --jid 1, watch it on port 8088 (the Hadoop job UI), or query it programmatically (see checkJobStatus in the listing below).
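
    Building on checkJobStatus, a simple polling loop can wait for the job to finish from Java instead of the shell. A sketch, assuming getJobStatus() returns a fresh MSubmission on each call:

    // Poll the job until it leaves the running state -- a rough Java
    // equivalent of repeatedly issuing "show job --jid 1".
    static void waitForJob(SqoopClient client, long jobId) throws InterruptedException {
        MSubmission submission = client.getJobStatus(jobId);
        while (submission.getStatus().isRunning()) {
            if (submission.getProgress() != -1) {
                System.out.println(String.format("Progress : %.2f %%", submission.getProgress() * 100));
            }
            Thread.sleep(5000);   // poll every 5 seconds
            submission = client.getJobStatus(jobId);
        }
        System.out.println("Final status : " + submission.getStatus().name());
    }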


    The complete code is shown below:

    package sqoop;
    
    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.*;
    import org.apache.sqoop.submission.counter.Counter;
    import org.apache.sqoop.submission.counter.CounterGroup;
    import org.apache.sqoop.submission.counter.Counters;
    import org.apache.sqoop.validation.Status;
    
    import java.util.List;
    import java.util.ResourceBundle;
    
    /**
     * Created by fansy on 2016/3/7.
     */
    public class Sqoop1_99_4_Test {
    
        public static void main(String[] args){
            String url = "http://node13:12000/sqoop/";
            SqoopClient client = new SqoopClient(url);
    
            // create the links
    //        long connectorId = 1;
    //        MLink link = createHDFSLink(client,connectorId);
            long connectorId=2;
    //        MLink link = createMysqlLink(client,connectorId);
            listInputAndOutputParameters(client,connectorId);
    // create the job
    //        long fromLinkId=1;
    //        long toLinkId=2;
    //        String input="/user/fansy/name.data";
    //        String tableName="test_name_age";
    //        String columns="name,age";
    //        int mappers = 2;
    //        MJob job = createJob(client,fromLinkId,toLinkId,input,tableName,columns,mappers);
    //        System.out.println(job.getEnabled());
    // start the job
    //        long jobId =1;
    //        startJob(client,jobId);
        }
    
        static void describe(List<MConfig> configs, ResourceBundle resource) {
            for (MConfig config : configs) {
                System.out.println(resource.getString(config.getLabelKey()) + ":");
                List<MInput<?>> inputs = config.getInputs();
                for (MInput input : inputs) {
                    System.out.println(input.getName()+","+resource.getString(input.getLabelKey()) + " : " + input.getValue());
                }
                System.out.println();
            }
        }
    
        /**
         * Print the link config and from/to job config parameters of a connector
         * @param client
         * @param connectorId
         */
        static void listInputAndOutputParameters(SqoopClient client,long connectorId){
    
    //        String url = "http://node13:12000/sqoop/";
    //        SqoopClient client = new SqoopClient(url);
    
    //        long connectorId = link.getConnectorId();
    
    // link config for connector
            describe(client.getConnector(connectorId).getLinkConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
    // from job config for connector
            describe(client.getConnector(connectorId).getFromConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
    // to job config for the connector
            describe(client.getConnector(connectorId).getToConfig().getConfigs(), client.getConnectorConfigBundle(connectorId));
            // create a placeholder for link
        }
    
        /**
         * Create the MySQL JDBC link
         * @param client
         * @param connectorId
         * @return
         */
        static MLink createMysqlLink(SqoopClient client,long connectorId){
            MLink link = client.createLink(connectorId);
            link.setName("mysql");
            link.setCreationUser("fansy");
            MLinkConfig linkConfig = link.getConnectorLinkConfig();
    // fill in the link config values
            linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://192.168.0.1/test_fz");
            linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
            linkConfig.getStringInput("linkConfig.username").setValue("fansy");
            linkConfig.getStringInput("linkConfig.password").setValue("fansy");
    // save the link object that was filled
            Status status = client.saveLink(link);
            if(status.canProceed()) {
                System.out.println("Created Link with Link Id : " + link.getPersistenceId());
            } else {
                System.out.println("Something went wrong creating the link");
            }
            return link;
        }
    
        /**
         * Create the HDFS link
         * @param client
         * @param connectorId
         * @return
         */
        static MLink createHDFSLink(SqoopClient client,long connectorId){
            MLink link = client.createLink(connectorId);
            link.setName("hdfs");
            link.setCreationUser("fansy");
            MLinkConfig linkConfig = link.getConnectorLinkConfig();
    // fill in the link config values
            linkConfig.getStringInput("linkConfig.uri").setValue("hdfs://node12:8020");
    // save the link object that was filled
            Status status = client.saveLink(link);
            if(status.canProceed()) {
                System.out.println("Created Link with Link Id : " + link.getPersistenceId());
            } else {
                System.out.println("Something went wrong creating the link");
            }
            return link;
        }
    
        /**
         * Create the HDFS-to-MySQL job
         * @param client
         * @param fromLinkId
         * @param toLinkId
         * @param input
         * @param tableName
         * @param columns
         * @param mappers
         * @return
         */
        static MJob createJob(SqoopClient client,long fromLinkId,long toLinkId,
                              String input,String tableName,String columns,int mappers){
            MJob job = client.createJob(fromLinkId, toLinkId);
            job.setName("hdfs to mysql");
            job.setCreationUser("fansy");
    // set the "FROM" link job config values
            MFromConfig fromJobConfig = job.getFromJobConfig();
            fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue(input);
    
    // set the "TO" link job config values
            MToConfig toJobConfig = job.getToJobConfig();
            toJobConfig.getStringInput("toJobConfig.tableName").setValue(tableName);
            toJobConfig.getStringInput("toJobConfig.columns").setValue(columns);
    
    // set the driver config values
            MDriverConfig driverConfig = job.getDriverConfig();
    //        driverConfig.getStringInput("throttlingConfig.numExtractors").setValue(String.valueOf(mappers));
            driverConfig.getIntegerInput("throttlingConfig.numExtractors").setValue(mappers);
            Status status = client.saveJob(job);
            if(status.canProceed()) {
                System.out.println("Created Job with Job Id: "+ job.getPersistenceId());
            } else {
                System.out.println("Something went wrong creating the job");
            }
            return job;
        }
    
        /**
         * Start the job and print the submission info
         * @param client
         * @param jobId
         */
        static void startJob(SqoopClient client,long jobId){
            MSubmission submission = client.startJob(jobId);
            System.out.println("Job Submission Status : " + submission.getStatus());
            if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
                System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
            }
            System.out.println("Hadoop job id :" + submission.getExternalId());
            System.out.println("Job link : " + submission.getExternalLink());
            Counters counters = submission.getCounters();
            if(counters != null) {
                System.out.println("Counters:");
                for(CounterGroup group : counters) {
                    System.out.print("\t");
                    System.out.println(group.getName());
                    for(Counter counter : group) {
                        System.out.print("\t\t");
                        System.out.print(counter.getName());
                        System.out.print(": ");
                        System.out.println(counter.getValue());
                    }
                }
            }
            if(submission.getExceptionInfo() != null) {
                System.out.println("Exception info : " + submission.getExceptionInfo());
            }
        }
    
        /**
         * Check the status of a running job
         * @param client
         * @param jobId
         */
        static void checkJobStatus(SqoopClient client,long jobId){
            //Check job status for a running job
            MSubmission submission = client.getJobStatus(jobId);
            if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
                System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
            }
            System.out.println("Job status:"+submission.getStatus().name());
        }
    }
    

    Test:

    1. HDFS data:

    11kate,3
    fansy,22
    12kate,3
    tom,32
    1kate,3
    2kate,3
    2. MySQL:

    CREATE TABLE `test_name_age` (
      `name` varchar(255) DEFAULT NULL,
      `age` int(11) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    3. After the run finishes, check the job and the database.



    Summary:

    1. Data corruption:

    In the database, every value in the name column has lost its first and last character, even though the logs on port 8088 show the input records being read intact. This is plausibly a quoting issue: the intermediate format (CSVIntermediateDataFormat) wraps string fields in single quotes, so a mismatch between quoting on the read side and quote-stripping on the write side would trim exactly one character from each end.


    2. The delimiter cannot be configured: Sqoop 1.99.4 ships with only one intermediate data format implementation,

    CSVIntermediateDataFormat

    so using any other delimiter means implementing one yourself. Sqoop 1.99.6 (the latest release at the time of writing) adds JSON and Avro implementations.



    To put it in one sentence: the Sqoop 1.99 line is still riddled with bugs; for real work, stick with the 1.4 line!



