
    Abnormal varchar Length When Spark Reads Doris via JDBC

    hpkaiq@qq.com (hpkaiq), published 2025-06-25 00:27:44

    When reading Alibaba Cloud SelectDB (Doris) data in Spark via JDBC, varchar values shorter than 85 characters always come back padded with trailing spaces to a length of 85. For example, given a column declared as name varchar(20) whose value is 'xxx', Spark reads it back as 'xxx' followed by spaces up to 85 characters in total, and length(name) inside Spark also returns 85. The likely cause is that the JDBC driver reports the column with a fixed-width CHAR metadata type, which Spark maps to CharType and therefore blank-pads to the reported width; overriding the dialect so CHAR/VARCHAR both map to StringType avoids the padding.
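    To make the symptom concrete, here is a minimal reproduction sketch (not from the original post; it assumes an active SparkSession `spark`, placeholder credentials, and a table `t` whose `name varchar(20)` column holds 'xxx'):

    // Read through the stock MySQL dialect (no custom dialect registered yet).
    val df = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", jdbcUrl)        // placeholder JDBC URL to SelectDB/Doris
      .option("dbtable", "t")
      .option("user", user)          // placeholder credentials
      .option("password", password)
      .load()

    // length(name) reports 85 instead of 3:
    df.selectExpr("name", "length(name)").show()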

    1. Custom MySQL Dialect Class

    Versions:

     <scala.version>2.12.18</scala.version>
     <spark.version>3.5.3</spark.version>

    The custom Dialect class:

    import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
    import org.apache.spark.sql.types._
    import java.sql.{Connection, Types}
    import scala.collection.mutable.ArrayBuilder

    object MyDorisDialect {
      def useMyJdbcDialect(jdbcUrl: String): Unit = {
        // Unregister the JdbcDialect currently resolved for this URL, then register ours.
        JdbcDialects.unregisterDialect(JdbcDialects.get(jdbcUrl))
        if (jdbcUrl.contains("jdbc:mysql")) {
          JdbcDialects.registerDialect(new CustomMySqlJdbcDialect)
        } else if (jdbcUrl.contains("jdbc:postgresql")) {
          // Placeholders for other protocols, intentionally left empty in the original.
        } else if (jdbcUrl.contains("jdbc:sqlserver")) {
        } else if (jdbcUrl.contains("jdbc:oracle")) {
        } else if (jdbcUrl.contains("jdbc:informix")) {
        }
      }
    }

    class CustomMySqlJdbcDialect extends JdbcDialect {

      override def quoteIdentifier(colName: String): String = s"`$colName`"

      override def schemasExists(conn: Connection, options: JDBCOptions, schema: String): Boolean = {
        listSchemas(conn, options).exists(_.head == schema)
      }

      override def listSchemas(conn: Connection, options: JDBCOptions): Array[Array[String]] = {
        val schemaBuilder = ArrayBuilder.make[Array[String]]
        try {
          JdbcUtils.executeQuery(conn, options, "SHOW SCHEMAS") { rs =>
            while (rs.next()) {
              schemaBuilder += Array(rs.getString("Database"))
            }
          }
        } catch {
          case _: Exception =>
            logWarning("Cannot show schemas.")
        }
        schemaBuilder.result
      }

      override def isCascadingTruncateTable(): Option[Boolean] = Some(false)

      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")

      override def getCatalystType(
          sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
        if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
          // MariaDB connector behaviour
          // This could instead be a BinaryType if we'd rather return bit-vectors of up to
          // 64 bits as byte arrays instead of longs.
          md.putLong("binarylong", 1)
          Option(LongType)
        } else if (sqlType == Types.BIT && size > 1) {
          // MySQL connector behaviour
          md.putLong("binarylong", 1)
          Option(LongType)
        } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
          Option(BooleanType)
        } else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
          // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
          Some(StringType)
        } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) {
          // Some MySQL JDBC drivers convert the JSON type into Types.VARCHAR with a precision
          // of -1. Explicitly convert it into StringType here.
          Some(StringType)
        } else if (sqlType == Types.VARCHAR || sqlType == Types.CHAR) {
          // The key change for the padding issue: map CHAR to StringType as well, so values
          // are not read with fixed-width CharType semantics and blank-padded to the
          // reported width (85 in this case).
          Some(StringType)
        } else if (sqlType == Types.TINYINT) {
          if (md.build().getBoolean("isSigned")) {
            Some(ByteType)
          } else {
            Some(ShortType)
          }
        } else if (sqlType == Types.TIMESTAMP || sqlType == Types.DATE ||
            sqlType == -101 || sqlType == -102) {
          // Return TIMESTAMP/DATE (and the unsupported TIMESTAMP WITH (LOCAL) TIME ZONE,
          // sqlType -101/-102) as StringType; the TimestampType mapping is kept commented out.
          // Some(TimestampType)
          Some(StringType)
        } else None
      }

      /**
       * Data type mapping from Spark (DataType) to MySQL (SQL type).
       */
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
        case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
        case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
        case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
        case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
        case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
        case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
        case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
        case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
        case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
        case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
        case t: DecimalType => Option(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
        case _ => JdbcUtils.getCommonJDBCType(dt)
      }
    }
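    Note that useMyJdbcDialect must run before any spark.read against the Doris URL; otherwise the stock MySQL dialect still handles it. A quick sanity check (a sketch, not from the original post; the URL is a placeholder):

    // Hypothetical check: after registration, Spark should resolve our dialect for the URL.
    val url = "jdbc:mysql://doris-host:9030/demo"
    MyDorisDialect.useMyJdbcDialect(url)
    assert(JdbcDialects.get(url).isInstanceOf[CustomMySqlJdbcDialect])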

    2. Usage

    MyDorisDialect.useMyJdbcDialect(spark.conf.get("spark.job.doris.jdbcurl"))

    val df = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", spark.conf.get("spark.job.doris.jdbcurl"))
      .option("dbtable", s"($sql) as df")
      .option("user", spark.conf.get("spark.job.doris.username"))
      .option("password", spark.conf.get("spark.job.doris.password"))
      .load()

    df.show()
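    With the custom dialect registered, the padding from the beginning of the post should disappear; a quick verification sketch (assuming the query result still contains the name varchar(20) column from the example):

    // 'xxx' now comes back with length 3 instead of 85.
    df.selectExpr("name", "length(name)").show()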

    3. References

    1. jdbc获取元数据类型问题 (JDBC metadata type issue) · apache/doris · Discussion #24809
    2. bigdata-learning-notes/note/spark/Spark读取JDBC数据Time类型字段异常解析.md (analysis of abnormal Time-type fields when Spark reads JDBC data) at master · kinoxyz1/bigdata-learning-notes

