IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Spark Streaming 从 Kafka 读取 binlog 转换成 Json

    klion26发表于 2016-08-27 02:12:36
    love 0

    在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果

    如果本文有讲述不详细,或者错误指出,肯请指出,谢谢

    对于 binlog 数据,每一次操作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)

    在数据库中,有 id, name 两个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下

    id name age
    1 john 30
    2 john 40

    那么你进行操作

    update table set age = 50 where name = john

    的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。

    下面,我们给出具体的代码,然后对代码进行分析

    def desirializeByte(b: (String, Array[Byte])) : (String, String) = { 
      val binlogEntry = BinlogEntryUtil.serializeToBean(b._2)   //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录
       
      val pkeys = binlogEntry.getPrimaryKeys.asScala   //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 List
      val rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList  //获取具体的信息
      val strRowDatas = rowDatas.map(a => {            //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化
        val b = a.getBeforeColumns.asScala    //获取 beforColumns
        val c = a.getAfterColumns.asScala     //获取 afterColumns
        val mb = b.map(d => (d._1, d._2.getValue))  //去掉所有不需要的信息,只保留每个字段的值
        val mc = c.map(c => (c._1, c._2.getValue))  //去掉所有不需要的信息,只保留每个字段的值
        (mb, mc) //返回转换后的 beforeColumns 和 afterColumns
      })
      //下面利用 json4s 进行 Json 化
      (binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{
        w => List("row_data" -> ("before" -> w._1.toMap) ~ ("after" -> w._2.toMap))  //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,
                                                                                     //w._1 按理是 Map类型,为什么还需要强制转换成 Map
                                                                                  //而且用 strRowDatas.foreach(x => println(s"${x._1}  ${x._2}")打印的结果表名是 Map
      }))

    desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。

    BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。

    第 4 行,我们得到表对应的主键,第 5 行获得具体的数据

    第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。

    第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)

    这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap 操作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。

    利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的)

    {"rowdata":
       [{"row_data":
           {"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},
            "after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}
           }
        }]
    }"

    到这里,基本就完成了一种将 binlog 数据 Json 化的代码。

    附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。

    public static BinlogEntry serializeToBean(byte[] input) {
          BinlogEntry binlogEntry = null;
          Entry entry = deserializeFromProtoBuf(input);//从 protobuf 反序列化
          if(entry != null) {
             binlogEntry = serializeToBean(entry);
          }
          return binlogEntry;
        }
    
    public static Entry deserializeFromProtoBuf(byte[] input) {
            Entry entry = null;
    
            try {
                entry = Entry.parseFrom(input);
    //com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成
            } catch (InvalidProtocolBufferException var3) {
                logger.error("Exception:" + var3);
            }
    
            return entry;
        }
    //将 Entry 解析为一个 bean 类
    public static BinlogEntry serializeToBean(Entry entry) {
            RowChange rowChange = null;
    
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception var8) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);
            }
    
            BinlogEntry binlogEntry = new BinlogEntry();
            String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");
            String logFileNo = "000000";
            if(logFileNames.length > 1) {
                logFileNo = logFileNames[1];
            }
    
            binlogEntry.setBinlogFileName(logFileNo);
            binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());
            binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());
            binlogEntry.setTableName(entry.getHeader().getTableName());
            binlogEntry.setEventType(entry.getHeader().getEventType().toString());
            Iterator primaryKeysList = rowChange.getRowDatasList().iterator();
    
            while(primaryKeysList.hasNext()) {
                RowData rowData = (RowData)primaryKeysList.next();
                BinlogRow row = new BinlogRow(binlogEntry.getEventType());
                row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));
                row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));
                binlogEntry.addRowData(row);
            }
    
            if(binlogEntry.getRowDatas().size() >= 1) {
                BinlogRow primaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);
                binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));
            } else {
                ArrayList primaryKeysList2 = new ArrayList();
                binlogEntry.setPrimaryKeys(primaryKeysList2);
            }
    
            return binlogEntry;
        }
    
    public class BinlogEntry implements Serializable {
        private String binlogFileName;
        private long binlogOffset;
        private long executeTime;
        private String tableName;
        private String eventType;
        private List<String> primaryKeys;
        private List<BinlogRow> rowDatas = new ArrayList();
    }
    public class BinlogRow implements Serializable {
        public static final String EVENT_TYPE_INSERT = "INSERT";
        public static final String EVENT_TYPE_UPDATE = "UPDATE";
        public static final String EVENT_TYPE_DELETE = "DELETE";
        private String eventType;
        private Map<String, BinlogColumn> beforeColumns;
        private Map<String, BinlogColumn> afterColumns;
    }
    public class BinlogColumn implements Serializable {
        private int index;
        private String mysqlType;
        private String name;
        private boolean isKey;
        private boolean updated;
        private boolean isNull;
        private String value;
    }

     



沪ICP备19023445号-2号
友情链接