在平时使用spark开发中有时我们会有这样一个需求,在Spark读取数据时我们还需要获取到数据的文件名和父目录,比如目录中含有某个关键字数据使用\t分割,否则使用逗号分割等,今天小编就来聊一聊关于spark将数据写入text文件?接下来我们就一起去研究一下吧!

spark将数据写入text文件(Spark技巧之读取输入数据的文件名和目录)

spark将数据写入text文件

在平时使用spark开发中有时我们会有这样一个需求,在Spark读取数据时我们还需要获取到数据的文件名和父目录,比如目录中含有某个关键字数据使用\t分割,否则使用逗号分割等。

下面就介绍一下实现方式:

object Test { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local") .setAppName(this.getClass().getSimpleName().filter(!_.equals('$'))) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array( classOf[Array[String]], classOf[util.LinkedHashMap[String, Integer]], classOf[DoubleWritable], classOf[LongWritable], classOf[IntWritable], classOf[mutable.HashMap[Long, Set[(Int, Int, Long, String)]]] )) val sc = new SparkContext(conf) val input = "D:\\test\\dirs\\*.gz"//args(0) val fileRDD = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](input) val hadooprdd = fileRDD.asInstanceOf[NewHadoopRDD[LongWritable, Text]] val rdd = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, iter) =>{ val fileSplit = inputSplit.asInstanceOf[FileSplit] iter.map(x =>{ fileSplit.getPath.toString "|" fileSplit.getPath.getName "|" x._2.toString }) }) rdd.foreach(println) } }

我们使用SparkContext的newAPIHadoopFile方法,并制定key和value的类型,并将返回的rdd类型强制为NewHadoopRDD,接着调用mapPartitionsWithInputSplit方法,该方法有两个参数,第一个参数inputSplit就是分片信息,iter是迭代器,代表了数据。

输出:

file:/D:/test/dirs/1.txt.gz|1.txt.gz|1 file:/D:/test/dirs/1.txt.gz|1.txt.gz|11 file:/D:/test/dirs/1.txt.gz|1.txt.gz|111 file:/D:/test/dirs/2.txt.gz|2.txt.gz|2 file:/D:/test/dirs/2.txt.gz|2.txt.gz|22

感兴趣的朋友可以点个赞,加个关注,共同学习进步!

,