Saving data from Spark Streaming to HDFS with Snappy compression
At work I needed to write data consumed from Kafka into HDFS, compressed with Snappy. Without further ado, here is the code:
import java.io.DataOutputStream

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, TextOutputFormat}
import org.apache.hadoop.mapred.lib.MultipleOutputFormat
import org.apache.hadoop.util.{Progressable, ReflectionUtils}

/**
 * Custom output format: writes to multiple directories, appends to existing files,
 * and compresses the output with Snappy.
 * @author demon
 * @version 2019/05/05
 */
class AppendTextOutputFormat extends TextOutputFormat[Any, Any] {

  override def getRecordWriter(ignored: FileSystem, job: JobConf, iname: String, progress: Progressable): RecordWriter[Any, Any] = {
    val isCompressed: Boolean = FileOutputFormat.getCompressOutput(job)
    val keyValueSeparator: String = job.get("mapreduce.output.textoutputformat.separator", "\t")
    // Custom output file name: an optional "filename" job property overrides the default task name
    val name = job.get("filename", iname)
    if (!isCompressed) {
      val file: Path = FileOutputFormat.getTaskOutputPath(job, name)
      val fs: FileSystem = file.getFileSystem(job)
      val newFile: Path = new Path(FileOutputFormat.getOutputPath(job), name)
      val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
        // The file already exists: append to it
        fs.append(newFile)
      } else {
        fs.create(file, progress)
      }
      new TextOutputFormat.LineRecordWriter[Any, Any](fileOut, keyValueSeparator)
    } else {
      val codecClass: Class[_ <: CompressionCodec] = FileOutputFormat.getOutputCompressorClass(job, classOf[GzipCodec])
      // Instantiate the configured codec
      val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, job)
      // Build the file name including the codec's extension
      val file: Path = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension)
      val fs: FileSystem = file.getFileSystem(job)
      val newFile: Path = new Path(FileOutputFormat.getOutputPath(job), name + codec.getDefaultExtension)
      val fileOut: FSDataOutputStream = if (fs.exists(newFile)) {
        // The file already exists: append to it
        fs.append(newFile)
      } else {
        fs.create(file, progress)
      }
      new TextOutputFormat.LineRecordWriter[Any, Any](new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator)
    }
  }
}
class RDDMultipleAppendTextOutputFormat extends MultipleOutputFormat[Any, Any] {

  private var theTextOutputFormat: AppendTextOutputFormat = null

  // Generate the partition directory (and file name) for each record
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    // TODO: derive the partition directory from the key/value (one possible sketch follows this class)
  }

  // Append instead of creating a new writer per batch
  override def getBaseRecordWriter(fs: FileSystem, job: JobConf, name: String, arg3: Progressable): RecordWriter[Any, Any] = {
    if (this.theTextOutputFormat == null) {
      this.theTextOutputFormat = new AppendTextOutputFormat()
    }
    this.theTextOutputFormat.getRecordWriter(fs, job, name, arg3)
  }

  // Reset the key to null so only the value is written
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
}
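The partition-directory logic above is left as a TODO because it depends entirely on your data. As a rough sketch only, assuming each record's key carries a date string such as "20190505" and you want one sub-directory per day (both assumptions, not part of the original post), the override could look like this:

  // Hypothetical example: partition by a date carried in the record key
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    val day = key.toString          // assumed key format: yyyyMMdd
    day + "/" + name                // e.g. "20190505/part-00000": one directory per day
  }

MultipleOutputFormat treats the returned string as a path relative to the job's output directory, so everything before the last "/" becomes a sub-directory and the final segment becomes the file that AppendTextOutputFormat appends to.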
Usage:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

/**
 * HDFS helper utilities
 * @author demon
 * @version 2019/03/29
 */
object HdfsOperationUtil {

  /**
   * Save an RDD to HDFS
   * @param rdd  the (key, value) RDD to save
   * @param path target path on HDFS
   */
  def saveToHDFS(rdd: RDD[(String, String)], path: String): Unit = {
    // if (!rdd.isEmpty())
    //   rdd.saveAsTextFile(path)
    val job = new JobConf()
    job.set("mapred.output.compress", "true")
    job.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    rdd.saveAsHadoopFile(path,
      classOf[Text], classOf[Text], classOf[RDDMultipleAppendTextOutputFormat], job)
  }

  /**
   * Delete a path on HDFS
   * @param path the path to delete
   */
  def deleteToHDFS(path: String): Unit = {
    // Remove the output directory recursively
    val output = new Path(path)
    val hdfs = org.apache.hadoop.fs.FileSystem.get(
      new java.net.URI(PropertiesUtil.getPropertiesToStr("hdfs.hosts")), new org.apache.hadoop.conf.Configuration())
    if (hdfs.exists(output)) hdfs.delete(output, true)
  }
}
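For completeness, here is a minimal sketch of how the helper might be driven from a Spark Streaming job reading from Kafka. The broker list, group id, topic, batch interval and output path are placeholders, the records are assumed to carry non-null keys, and the stream is created with the spark-streaming-kafka-0-10 direct API; none of this driver code is from the original post, so adapt it to your environment:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Kafka2HdfsApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-to-hdfs-snappy")
    val ssc = new StreamingContext(conf, Seconds(60))        // placeholder batch interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",                 // placeholder brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-to-hdfs",                         // placeholder group id
      "auto.offset.reset" -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // The key drives the partition directory chosen by RDDMultipleAppendTextOutputFormat,
        // so this assumes every record has a non-null key
        val pairs = rdd.map(record => (record.key(), record.value()))
        HdfsOperationUtil.saveToHDFS(pairs, "/data/output")   // placeholder output path
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Because AppendTextOutputFormat appends to files that already exist, successive batches keep writing into the same files rather than overwriting them.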