IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    Spark Streaming 往 HDFS 写文件,自定义文件名

    klion26发表于 2016-11-26 08:09:25
    love 0

    需求

    将 kafka 上的数据实时同步到 HDFS,不能有太多小文件

    实现过程

    Spark Streaming 支持 RDD#saveAsTextFile,将数据以 纯文本 方式写到 HDFS,我们查看 RDD#saveAsTextFile 可以看到

    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
          .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

    从上面这句话我们可以知道,首先将 RDD 转化为 PariRDD(PariRDD 的数据是 (K,V) 类型的),然后再调用 saveAsHadoopFile 函数进行实际的操作。上面的语句中 r 是原始 RDD,nullWritableClassTag 和 textClassTag 表示所写数据的类型(分别代表 PariRDD 的 K 和 V 的类型),使用 nullWritableClassTag 是因为 HDFS 不会将 PairRDD 的 key 进行实际写入,从效果上看就只写入了 PariRDD 的 V 字段。TextOutputFormat 是一个格式化函数,后面我们再来看这个函数,NullWritable 则表示一个占位符,同样是这个字段不需要实际写入 HDFS,Text 表示我们将写入文本类型的数据。

    我们看到 TextOutputFormat 这个类中有一个函数是 RecordWriter 用于操作没一条记录的写入,代码如下

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
            if(!isCompressed) {
                Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name);
                FileSystem codec1 = codecClass1.getFileSystem(job);
                FSDataOutputStream file1 = codec1.create(codecClass1, progress);
                return new TextOutputFormat.LineRecordWriter(file1, keyValueSeparator);
            } else {
                Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
                Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
                FileSystem fs = file.getFileSystem(job);
                FSDataOutputStream fileOut = fs.create(file, progress);
                return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            }
        }

    从上面的代码中我们可以的知道,首先打开需要的文件,获得对应的 Stream,然后直接往里面写数据就行了。接下来我们需要做还有:1)自定义文件名;2)往文件追加数据

    而这两个需求都可以在 RecordWriter 中进行实现。

    对于自定义文件名,重写下面这句话就行了

    Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name);

    其中 name 就是文件名,我们自定义 name 就 OK 了

    如果希望往文件追加数据的话(不然会有很多小文件):

    我们可以在获取文件流的时候,传入已经存在的文件,然后往里面追加就行了。而且将 creat 函数换成 append 即可,具体参考下面的代码:

    override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String, progress: Progressable): RecordWriter[K, V] = {
            val isCompressed: Boolean = FileOutputFormat.getCompressOutput(job)
            val keyValueSeparator: String = job.get("mapreduce.output.textoutputformat.separator", "\t")
            val iname = name + System.currentTimeMillis() / HDFSService.getBatchInteral
            if (!isCompressed) {
                val file: Path = FileOutputFormat.getTaskOutputPath(job, iname)
                val fs: FileSystem = file.getFileSystem(job)
                val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname)
                val fileOut : FSDataOutputStream = if (fs.exists(newFile)) {
                    fs.append(newFile)
                } else {
                    fs.create(file, progress)
                }
                new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator)
            } else {
                val codecClass: Class[_ <: CompressionCodec] = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec])
                // create the named codec
                val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, job)
                // build the filename including the extension
                val file: Path = FileOutputFormat.getTaskOutputPath(job, iname + codec.getDefaultExtension)
                val fs: FileSystem = file.getFileSystem(job)
                val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname + codec.getDefaultExtension)
    
                val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
                    fs.append(newFile)
                } else {
                    fs.create(file, progress)
                }
                new TextOutputFormat.LineRecordWriter[K, V](new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator)
            }
        }

     

    您可能也喜欢:

    Spark Streaming 自适应上游 kafka topic partition 数目变化

    Spark Streaming 中使用 zookeeper 保存 offset 并重用

    Spark Streaming 中使用 zookeeper 保存 offset 并重用(二)

    Spark Streaming 从 Kafka 读取 binlog 转换成 Json

    Linux下单用户模式修改错误配置文件
    无觅


沪ICP备19023445号-2号
友情链接