将 kafka 上的数据实时同步到 HDFS,不能有太多小文件
Spark Streaming 支持 RDD#saveAsTextFile,将数据以 纯文本 方式写到 HDFS,我们查看 RDD#saveAsTextFile 可以看到
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
从上面这句话我们可以知道,首先将 RDD 转化为 PariRDD(PariRDD 的数据是 (K,V) 类型的),然后再调用 saveAsHadoopFile 函数进行实际的操作。上面的语句中 r
是原始 RDD,nullWritableClassTag
和 textClassTag
表示所写数据的类型(分别代表 PariRDD 的 K 和 V 的类型),使用 nullWritableClassTag
是因为 HDFS 不会将 PairRDD 的 key 进行实际写入,从效果上看就只写入了 PariRDD 的 V 字段。TextOutputFormat
是一个格式化函数,后面我们再来看这个函数,NullWritable
则表示一个占位符,同样是这个字段不需要实际写入 HDFS,Text
表示我们将写入文本类型的数据。
我们看到 TextOutputFormat
这个类中有一个函数是 RecordWriter
用于操作没一条记录的写入,代码如下
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t"); if(!isCompressed) { Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name); FileSystem codec1 = codecClass1.getFileSystem(job); FSDataOutputStream file1 = codec1.create(codecClass1, progress); return new TextOutputFormat.LineRecordWriter(file1, keyValueSeparator); } else { Class codecClass = getOutputCompressorClass(job, GzipCodec.class); CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job); Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } }
从上面的代码中我们可以的知道,首先打开需要的文件,获得对应的 Stream,然后直接往里面写数据就行了。接下来我们需要做还有:1)自定义文件名;2)往文件追加数据
而这两个需求都可以在 RecordWriter 中进行实现。
对于自定义文件名,重写下面这句话就行了
Path codecClass1 = FileOutputFormat.getTaskOutputPath(job, name);
其中 name 就是文件名,我们自定义 name 就 OK 了
如果希望往文件追加数据的话(不然会有很多小文件):
我们可以在获取文件流的时候,传入已经存在的文件,然后往里面追加就行了。而且将 creat 函数换成 append 即可,具体参考下面的代码:
override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String, progress: Progressable): RecordWriter[K, V] = { val isCompressed: Boolean = FileOutputFormat.getCompressOutput(job) val keyValueSeparator: String = job.get("mapreduce.output.textoutputformat.separator", "\t") val iname = name + System.currentTimeMillis() / HDFSService.getBatchInteral if (!isCompressed) { val file: Path = FileOutputFormat.getTaskOutputPath(job, iname) val fs: FileSystem = file.getFileSystem(job) val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname) val fileOut : FSDataOutputStream = if (fs.exists(newFile)) { fs.append(newFile) } else { fs.create(file, progress) } new TextOutputFormat.LineRecordWriter[K, V](fileOut, keyValueSeparator) } else { val codecClass: Class[_ <: CompressionCodec] = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec]) // create the named codec val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, job) // build the filename including the extension val file: Path = FileOutputFormat.getTaskOutputPath(job, iname + codec.getDefaultExtension) val fs: FileSystem = file.getFileSystem(job) val newFile : Path = new Path(FileOutputFormat.getOutputPath(job), iname + codec.getDefaultExtension) val fileOut: FSDataOutputStream = if (fs.exists(newFile)) { fs.append(newFile) } else { fs.create(file, progress) } new TextOutputFormat.LineRecordWriter[K, V](new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator) } }