IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]基于HBase的冠字号查询系统1--理论部分

    fansy1990发表于 2016-06-04 11:02:58
    love 0

    1. 软件版本和部署

    maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014;

    Hadoop2.6.4,HBase1.1.2

    源码下载:https://github.com/fansy1990/ssh_v3/releases

    部署参考:http://blog.csdn.net/fansy1990/article/details/51356583 

    数据下载:http://download.csdn.net/detail/fansy1990/9540865 或  http://pan.baidu.com/s/1dEVeJz7 

    2. 背景&思路

    目前针对钞票识别,一般都是使用看、摸、听、测四种方式,这里使用一种比较客观的方式类进行识别。   建设冠字号管理查询系统,以冠字号查询为手段,有效解决银行对外误付假币的问题。从源头解决伪钞问题。

    本系统就是使用客观的方法来验证伪钞。本系统采用的方案是基于冠字号的,每张人民币的冠字号是唯一的,如果有一个大表可以把所有的人民币以及人民币对应的操作(在什么时间、什么地点存入或获取)记录下来,这样在进行存取时就可以根据冠字号先查询一下,看当前冠字号对应的纸币在大表中的保存的情况,这样就可以确定当前冠字号对应的纸币是否是伪钞了(这里假设在大表中的所有冠字号对应的钞票都是真钞)。

    下面对应存储场景:

     

    存/取

    最近状态(表中有无)

    真钞/伪钞

    场景1

    存

    有

    伪钞

    场景2

    存

    无

    真钞

    场景3

    取

    有(此时没有无状态)

    真钞


    目前,基于传统数据库存储数据一般在千万级别(受限于查询等性能),但是如果要存储所有钞票的信息以及其被存储或获取的记录信息,那么传统数据库肯定是不能胜任的。所以本系统是基于HBase的。

    3. 功能指标

    Ø  存储万级用户信息;

    Ø  存储百万级别钞票信息;

    Ø  支持前端业务每秒500+实时查询请求;

    Ø  数据存储和计算能够可扩展;

    Ø  提供统一接口,支持前端相关查询业务;


    说明: 其中前两条,万级用户信息和百万级钞票信息是根据数据确定的,这里可以根据数据以及集群的大小进行调整(如果集群够大,存储信息也可以很大);

    4. 架构


    冠字号查询系统包括下面5层:

    Ø  数据层:包括基础数据MySQL、文档、Web数据等;

    Ø  数据处理层:主要是数据的加载,包括MR加载方式、Java API加载模式、Sqoop加载模式等;

    Ø  数据存储层:主要是HBase存储,包括钞票的所有信息以及用户信息等;

    Ø  数据服务层:主要是对外提供查询、存储等接口服务;

    Ø  数据应用层:存取钞系统,在存钞时设计到伪钞识别;其他应用系统;

    5. 表设计

    5.1原始数据:

    冠字号存储记录(冠字号,表中是否有该冠字号(0表示没有,1表示有),存储或取时间,存储或取所在银行编号,用户id):


    用户信息表(用户Id,名字,出生日期,性别,地址,手机号,绑定银行编号):



    5.2冠字号记录

    对数据进过初步探索,发现冠字号规律如下:
    AAA[A~Z][0000~9999]
    AAB[A~Z][0000~9999]
    如果集群有四个节点,设置region初始为4,那么三个split点为:AAAM9999,AAAZ9999,AABM9999;
    假设每个用户每天进行10次操作,如果要保存100天数据,那么设置版本数为1000,则建表语句如下:
    create 'records',{NAME=>'info',VERSIONS=>1000},SPLITS =>['AAAM9999','AAAZ9999','AABM9999']

    表结构描述如下:

    主键/列簇

    字段名称

    字段含义

    字段值举例

    备注

    rowkey

    -

    表主键(钞票冠字号)

    AAAA0000

     

    timestamp

    -

    时间戳

    1414939140000

    long型(可以存储用户操作的时间)

    info

    -

    列簇

    -

    who、when、where做了哪些操作

    exist

    是否存在

    1

    如果用户是存储行为,那么在行为结束后,该值为1

    uid

    用户ID

    4113281991XXXX9919

     

    bank

    存取钞银行

    SPDBCNSH

    银行编号



    5.3用户信息

    对数据进过初步探索,发现用户信息规律如下:
    41132819[89~92]XXXX[0000~9999]
    如果集群有四个节点,设置region初始为4,那么三个split点为:4113281990XXXX0000,4113281991XXXX0000,4113281992XXXX0000;
    则建表语句如下:
    create 'user',{NAME=>'info'},SPLITS =>['4113281990XXXX0000','4113281991XXXX0000','4113281992XXXX0000']

    表结构描述如下:

    主键/列簇

    字段名称

    字段含义

    字段值举例

    备注

    Rowkey

    -

    用户主键(身份证号)

    4113281991XXXX9919

     

    Timestamp

    -

    时间戳

    1414939140000

    long型

    info

    -

    列簇

    -

    用户信息

    name

    用户名

    JACO

     

    gender

    用户性别

    femail

     

    bank

    用户注册银行

    SPDBCNSH

    银行编号

    address

    用户住址

    EXX-O94-1319151759

     

     

    birthday

    用户出生年月

    1981-10-20 09:12

     


    6. 数据加载

    系统在投入使用的时候,已经存在历史数据,需要把历史数据批量导入到系统中;在人民币首次发行时,也需要批量导入系统中。这里的导入直接使用MR导入。
    MR设计成一个通用的数据从HDFS导入HBase的MR:

    6.1 主类:

    package ssh.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    
    import ssh.util.HadoopUtils;
    
    /**
     * Job Driver驱动类
     * 
     * @author fansy
     * 
     */
    public class ImportToHBase extends Configured implements Tool {
    	public static final String SPLITTER = "SPLITTER";
    	public static final String COLSFAMILY = "COLSFAMILY";
    	public static final String DATEFORMAT = "DATEFORMAT";
    
    	@Override
    	public int run(String[] args) throws Exception {
    		if (args.length != 5) {
    			System.err
    					.println("Usage:\n demo.job.ImportToHBase <input> <tableName> <splitter> <rk,ts,col1:q1,col2:q1,col2:q2> <date_format>");
    			return -1;
    		}
    		if (args[3] == null || args[3].length() < 1) {
    			System.err.println("column family can't be null!");
    			return -1;
    		}
    		Configuration conf = getConf();
    		conf.set(SPLITTER, args[2]);
    		conf.set(COLSFAMILY, args[3]);
    		conf.set(DATEFORMAT, args[4]);
    		TableName tableName = TableName.valueOf(args[1]);
    		Path inputDir = new Path(args[0]);
    		String jobName = "Import to " + tableName.getNameAsString();
    		Job job = Job.getInstance(conf, jobName);
    		job.setJarByClass(ImportMapper.class);
    		FileInputFormat.setInputPaths(job, inputDir);
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setMapperClass(ImportMapper.class);
    
    		TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(),
    				null, job);
    		job.setNumReduceTasks(0);
    		HadoopUtils.setCurrJob(job);// 设置外部静态Job
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    }
    
    主类的run方法中使用的是传统的MR导入HBase的代码,只是设置了额外的参数,这里主类参数意思解释如下:
    input: HDFS输入数据路径;
    splitter : 输入数据字段分隔符;
    tableName : 表名;
    <rk,ts,col1:q1,col2:q1> : 列描述, rk代表rowkey以及rowkey所在列、ts代表timestamp及其所在列;示例数据说明原始数据,第一列为rowkey,第二列为timestamp所在列,第三列属于列簇col1,同时列名为q1,第4列属于列簇col2,同时列名为q1;
    date_format : timestamp日期格式,如果列描述中没有ts那么就代表原始数据中没有timestamp,则此参数没有意义;

    6.2 Mapper:

    package ssh.mr;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Mapper类,接收HDFS数据,写入到HBase表中
     * @author fansy
     *
     */
    public class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
    	private static final String COMMA = ",";
    	private static final String COLON=":";
    	private String splitter = null;
    //	private String colsStr = null;
    	
    	private int rkIndex =0; // rowkey 下标
    	private int tsIndex =1; // timestamp下标
    	private boolean hasTs = false; // 原始数据是否有timestamp
    	
    	private SimpleDateFormat sf = null;
    	
    	private ArrayList<byte[][]> colsFamily= null;
    	
    	private Put put =null;
    
    	ImmutableBytesWritable rowkey = new ImmutableBytesWritable();
    	@Override
    	protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
    			throws IOException, InterruptedException {
    		splitter = context.getConfiguration().get(ImportToHBase.SPLITTER,",");
    		String colsStr = context.getConfiguration().get(ImportToHBase.COLSFAMILY,null);
    		
    		sf = context.getConfiguration().get(ImportToHBase.DATEFORMAT,null)==null
    				? new SimpleDateFormat("yyyy-MM-dd HH:mm")
    						:new SimpleDateFormat(context.getConfiguration().get(ImportToHBase.DATEFORMAT));
    		
    		String[] cols = colsStr.split(COMMA, -1);
    		
    		colsFamily =new ArrayList<>();
    		for(int i=0;i< cols.length;i++){
    			if("rk".equals(cols[i])){
    				rkIndex= i;
    				colsFamily.add(null);
    				continue;
    			}
    			if("ts".equals(cols[i])){
    				tsIndex = i;
    				colsFamily.add(null);
    				hasTs = true; // 原始数据包括ts
    				continue;
    			}
    			colsFamily.add(getCol(cols[i]));
    		}
    		
    	}
    	/**
    	 * 获取 family:qualifier byte数组
    	 * @param col
    	 * @return
    	 */
    	private byte[][] getCol(String col) {
    		byte[][] fam_qua = new byte[2][];
    		String[] fam_quaStr = col.split(COLON, -1);
    		fam_qua[0]=  Bytes.toBytes(fam_quaStr[0]);
    		fam_qua[1]=  Bytes.toBytes(fam_quaStr[1]);
    	
    		return fam_qua;
    	}
    
    	@Override
    	protected void map(LongWritable key, Text value,
    			Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
    					throws IOException, InterruptedException {
    		String[] words = value.toString().split(splitter, -1);
    		
    		if(words.length!=colsFamily.size()){
    			System.out.println("line:"+value.toString()+" does not compatible!");
    			return ;
    		}
    		
    		rowkey.set(getRowKey(words[rkIndex]));
    		
    		put = getValue(words,colsFamily,rowkey.copyBytes());
    		
    		context.write(rowkey, put);
    		
    	}
    	/**
    	 * 获取Put值
    	 * @param words
    	 * @param colsFamily
    	 * @param bs
    	 * @return
    	 */
    	private Put getValue(String[] words, ArrayList<byte[][]> colsFamily, byte[] bs) {
    		Put put = new Put(bs);
    		
    		for(int i=0;i<colsFamily.size();i++){
    			if(colsFamily.get(i)==null){// rk 或ts
    				continue;// 下一个 列
    			}
    			if(words[i]==null || words[i].length()==0) {
    				// 不添加,直接往下一个value
    				continue;
    			}
    			// 日期异常的记录同样添加
    			if(hasTs){// 插入包含时间的数据
    				put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1],
    					getLongFromDate(words[tsIndex]), Bytes.toBytes(words[i]));
    			}else{// 不包含时间的数据
    				put.addColumn(colsFamily.get(i)[0], colsFamily.get(i)[1],
    						 Bytes.toBytes(words[i]));
    			}
    			
    		}
    		
    		return put;
    	}
    	private long getLongFromDate(String dateStr)  {
    		try{
    			return sf.parse(dateStr).getTime();
    		}catch(ParseException e){
    			System.out.println(dateStr+" 转换失败!");
    			return 0;
    		}
    	}
    	/**
    	 * 获取rowkey byte数组
    	 * @param rowKey
    	 * @return
    	 */
    	private byte[] getRowKey(String rowKey) {
    		
    		return Bytes.toBytes(rowKey);
    	}
    
    }
    

     Mapper是整个流程的核心,主要负责进行数据解析、并从HDFS导入到HBase表中的工作,其各个部分功能如下:

    Ø  setup():获取输入数据字段分隔符,获取列簇、列名,获取rowkey列标,获取ts格式及列标(如果没有的话,就按照插入数据的时间设置);

    Ø  map():解析、过滤并提取数据(需要的字段数据),生成Put对象,写入HBase;


    6.3 针对records,user MR导入:


    只需要进行拼凑参数,然后直接调用即可。

    7. 实时数据加载

    使用Java API来操作HBase数据库,完成实时HBase数据库更新,包括冠字号查询、存取款等功能。




    分享,成长,快乐

    脚踏实地,专注

    转载请注明blog地址:http://blog.csdn.net/fansy1990








沪ICP备19023445号-2号
友情链接