一、文件加载1. spark.read.load

是加载数据的通用方法, 默认 加载和保存的是 parquet 格式文件

read可读格式

spark sql 分析外部文件(SparkSQL的文件加载和保存)(1)

2. spark.read.format("…")[.option("…")].load("…")

也可以直接在文件上进行查询: 文件格式 .` 文件路径 `

spark.sql("select * from json.`文件路径`").show

二、保存数据1.df.write.save 保存数据的通用方法

spark sql 分析外部文件(SparkSQL的文件加载和保存)(2)

保存不同格式的数据,可以对不同的数据格式进行设定

2.df.write.format("…")[.option("…")].save("…")

其中SaveMode 是一个枚举类,其中的常量包括:

Scala/Java

Any Language

Meaning

SaveMode.ErrorIfExists(default)

"error"(default)

如果文件已经存在则抛出异常

SaveMode.Append

"append"

如果文件已经存在则追加

SaveMode.Overwrite

"overwrite"

如果文件已经存在则覆盖

SaveMode.Ignore

"ignore"

如果文件已经存在则忽略

例如:

df.write.mode("append").json("output")

三、特殊数据源的加载和保存1.MySQL1)加载数据(需要导入MySQL连接依赖)

导入依赖

<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>

案例实操:

val conf: SparkConf = newSparkConf().setMaster("local[*]").setAppName("SparkSQL") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ //方式 1:通用的 load 方法读取 spark.read.format("jdbc") .option("url", "jdbc:mysql://node1:3306/sparkSql") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123123") .option("dbtable", "user") .load().show //方式 2:通用的 load 方法读取 参数另一种形式 spark.read.format("jdbc") .options(Map("url"->"jdbc:mysql://node1:3306/sparkSql?user=root&password= 123123", "dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show //方式 3:使用 jdbc 方法读取 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123") val df: DataFrame = spark.read.jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props) df.show //释放资源spark.stop()

2)保存数据

case class User2(name: String, age: Long) val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL") //创建 SparkSession 对象 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() import spark.implicits._ val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30))) val ds: Dataset[User2] = rdd.toDS //方式 1:通用的方式 format 指定写出类型 ds.write .format("jdbc") .option("url", "jdbc:mysql://node1:3306/sparkSql") .option("user", "root") .option("password", "123123") .option("dbtable", "user") .mode(SaveMode.Append) .save() //方式 2:通过 jdbc 方法 val props: Properties = new Properties() props.setProperty("user", "root") props.setProperty("password", "123123") ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://node1:3306/sparkSql", "user", props) //释放资源spark.stop()

2.hive1)内嵌hive

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可(使用 spark.sql() 方式).

Hive 的元数据存储在 derby 中, 默认仓库地址: $SPARK_HOME/spark-warehouse

scala> spark.sql("show tables").show scala> spark.sql("create table aa(id int)") scala> spark.sql("show tables").show //向表加载本地数据 scala> spark.sql("load data local inpath 'input/ids.txt' into table aa") scala> spark.sql("select * from aa").show

2)外部的 HIVE连接外部hive步骤:

连接后和内嵌hive操作相同

3)运行 Spark SQL CLI

Spark SQL CLI 可以很方便的在本地运行Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI,直接执行 SQL 语句,类似一Hive 窗口

bin/spark-sql

4)运行Spark beeline

Spark Thrift Server 是Spark 社区基于HiveServer2 实现的一个Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和HiveServer2 完全一致,因此我们部署好 Spark Thrift Server 后,可以直接使用hive 的 beeline 访问Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代HiveServer2,因此它依旧可以和 Hive Metastore 进行交互,获取到hive 的元数据。

如果想连接Thrift Server,需要通过以下几个步骤:

bin/beeline -u jdbc:hive2:// node1 :10000 -n root

5)代码操作 Hive

1)导入依赖

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>

2)将hive-site.xml 文件拷贝到项目的 resources 目录中,案列如下:

//创建 SparkSession val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .master("local[*]") .appName("sql") .getOrCreate()

注意:在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址 : config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")

若出现以下情况

spark sql 分析外部文件(SparkSQL的文件加载和保存)(3)

可以代码 最前面 增加如下代码解决:

System.setProperty("HADOOP_USER_NAME", "root")

此处的 root 改为自己的 hadoop 用户名称

来源:https://blog.csdn.net/m0_46782746/article/details/126769601

,