spark如何从数据库提取数据 SparkSQL通用数据源加载(1)

SparkSQL能用数据加载(load)和保存(save)

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中

一、数据加载(Load)

在SparkSQL中,默认的数据源是parquet, 除非配置了spark.sql.sources.default选项为其它格式的数据源。

//SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet //由配置项spark.sql.sources.default选项配置 val sanguoDF: DataFrame = spark.read.load("./dataset/sanguo.parquet")

除了SparkSQL默认提供的parquet, 还可以手动指定数据源的选项。通常,要加载的数据源需要使用完全限定名(fully qualified name), 例如org.apche.spark.sql.parquet,但是SparkSQL提供了内置的源,可以使用它们的短名称(shortnames), 例如,json, csv, jdbc, orc, parquet, text, libsvm等。

# 注:这晨的filename.json不一定是json文件,也可能是存放json格式文件的目录 val df: DataFrame = spark.read.format("json").load("path/filename.json")

// 注:people.csv不一定是csv文件,也可以是存放csv文件的目录 val peopleDFCsv = spark.read.format("csv") .option("sep", ",") //指定分隔符 .option("inferSchema", "true") //推断schema信息 .option("header", "true") //是否读取csv头部 .load("examples/src/main/resources/people.csv") //csv文件所在的路径

//load这个方法做了哪些事情呢? //1.会和数据库建立连接,读取元数据信息 //2.调用load时并没读取数据 val employeeDF: DataFrame = spark.read.format("jdbc") .option("url", "jdbc:mysql://node5:3306/spark") // MySQL数据库的URL .option("driver", "com.mysql.jdbc.Driver") //MySQL的jdbc区动 .option("dbtable", "employee") //需要读取的表名 .option("user", "root") //MySQL数据库用户名 .option("password", "XXXXXXXX") //MySQL数据库的密码 .load()

SparkSQL不仅可以使用数据读取的API将文件加载到DataFrame进行查询,也可以直接使用SQL查询文件,例如:

//也可以直接使用SQL查询 val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`") sanguoSQLDF.show()

spark如何从数据库提取数据 SparkSQL通用数据源加载(2)

直接在文件上SQL查询

二、数据保存(Save)

Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

spark如何从数据库提取数据 SparkSQL通用数据源加载(3)

SaveMode几种模式的区别

// 1.保存为parquet文件 //sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet") sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet")

//2.保存为json文件 // sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json") sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json")

//3.保存为csv文件 // sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv") sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv")

//4.保存为jdbc数据库,以MySQL为例 // sanguoSQLDF.write.mode(SaveMode.ErrorIfExists) // .option("driver","com.mysql.jdbc.Driver") // .option("url","jdbc:mysql://node5:3306/spark") // .option("dbtable","t_sanguo") // .option("user","root") // .option("password","Love=-.,me1314") // .save() sanguoSQLDF.write.format("jdbc") .option("driver","com.mysql.jdbc.Driver") .option("url","jdbc:mysql://node5:3306/spark") .option("dbtable","t_sanguo") .option("user","root") .option("password","Love=-.,me1314") .mode(SaveMode.Overwrite) .save()

package com.dvtn.spark.sql.load import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} object SparkSQLGenericLoad { def main(args: Array[String]): Unit = { //创建SparkSQL通往集群的切入点 val spark: SparkSession = SparkSession.builder() .appName(SparkSQLGenericLoad.getClass.getSimpleName) .master("local") .getOrCreate() //设置log的运行级别为"WARN" spark.sparkContext.setLogLevel("WARN") /** * 默认加载数据源格式:parquet * 在SparkSQL中,默认的加载的数据是parquet * 由配置项spark.sql.sources.default选项配置 */ //SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet //由配置项spark.sql.sources.default选项配置 //val sanguoDF: DataFrame = spark.read.load("./dataset/parquet/sanguo.parquet") //sanguoDF.show() println("----------------------------------------------------------------------------") //也可以直接使用SQL查询 val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`") /** * 数据的保存 */ // // 1.保存为parquet文件 // //sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet") // sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet") // // //2.保存为json文件 // // sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json") // sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json") // // //3.保存为csv文件 // // sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv") // sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv") //4.保存为jdbc数据库,以MySQL为例 // sanguoSQLDF.write.mode(SaveMode.ErrorIfExists) // .option("driver","com.mysql.jdbc.Driver") // .option("url","jdbc:mysql://node5:3306/spark") // .option("dbtable","t_sanguo") // .option("user","root") // .option("password","Love=-.,me1314") // .save() sanguoSQLDF.write.format("jdbc") .option("driver","com.mysql.jdbc.Driver") .option("url","jdbc:mysql://node5:3306/spark") .option("dbtable","t_sanguo") .option("user","root") .option("password","Love=-.,me1314") .mode(SaveMode.ErrorIfExists) .save() //释放资源 spark.stop() } }

spark如何从数据库提取数据 SparkSQL通用数据源加载(4)

SaveMode.ErrorIfExists

spark如何从数据库提取数据 SparkSQL通用数据源加载(5)

SaveMode.ErrorIfExists表明如果数据不存在会保存数据,如果数据存在,会报错

spark如何从数据库提取数据 SparkSQL通用数据源加载(6)

spark如何从数据库提取数据 SparkSQL通用数据源加载(7)

SaveMode.Overwrite会覆盖原表数据

spark如何从数据库提取数据 SparkSQL通用数据源加载(8)

MySQL现有表t_sanguo的记录

spark如何从数据库提取数据 SparkSQL通用数据源加载(9)

SaveMode.Append会在原有数据下面追加写入

spark如何从数据库提取数据 SparkSQL通用数据源加载(10)

原表数据

后话

本文所有文章都是原创,相当于个人在学习时整理的笔记分享到大家,欢迎大家阅读,如转载请标明出处。

如您觉得本人写的东西对您还有所帮助,请给予关注或点个赞,送人玫瑰,留有余香。我相信在您的鼓励下,本人会在后面将所有精华干货都慢慢分享出来。

本人在此非常感谢大家的理解和支持!

,