7.load & save在 Spark 中,load 函数用于从外部数据源读取数据并创建 DataFrame , 而 save 函数用于将 DataFrame 保存到外部数据源 。
下面是从 Parquet 文件中读取数据并创建 DataFrame 的示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()val df = spark.read.load("path/to/parquet/file")df.show()下面是将 DataFrame 保存到 Parquet 文件的示例代码:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.write.save("path/to/parquet/file")8.函数Spark SQL 提供了丰富的内置函数,包括数学函数、字符串函数、日期时间函数、聚合函数等 。你可以在 Spark SQL 的官方文档中查看所有可用的内置函数 。
此外,Spark SQL 还支持「自定义函数(User-Defined Function,UDF)」,可以让用户编写自己的函数并在查询中使用 。
下面是一个使用 SQL 语法编写自定义函数的示例代码:
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.functions.udfval spark = SparkSession.builder.appName("UDF Example").getOrCreate()import spark.implicits._val df = Seq(("Alice", 25),("Bob", 30),("Charlie", 35)).toDF("name", "age")df.createOrReplaceTempView("people")val square = udf((x: Int) => x * x)spark.udf.register("square", square)spark.sql("SELECT name, square(age) FROM people").show()在这个示例中,我们首先定义了一个名为 square 的自定义函数,它接受一个整数参数并返回它的平方 。然后 , 我们使用 createOrReplaceTempView 方法创建一个临时视图 , 并使用 udf.register 方法注册自定义函数 。
最后,我们使用 spark.sql 方法执行 SQL 查询,并在查询中调用自定义函数 。
9.DataSetDataSet 是 Spark 1.6 版本中引入的一种新的数据结构,它提供了 RDD 的强类型和 DataFrame 的查询优化能力 。
10.创建DataSet在 Scala 中,可以通过以下几种方式创建 DataSet:
从现有的 RDD 转换而来 。例如:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Create DataSet").getOrCreate()import spark.implicits._case class Person(name: String, age: Int)val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))val ds = rdd.toDS()ds.show()从外部数据源读取 。例如,从 JSON 文件中读取数据并创建 DataSet:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Create DataSet").getOrCreate()import spark.implicits._case class Person(name: String, age: Long)val ds = spark.read.json("path/to/json/file").as[Person]ds.show()通过编程方式创建 。例如,使用 createDataset 方法:
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("Create DataSet").getOrCreate()import spark.implicits._case class Person(name: String, age: Int)val data = https://www.isolves.com/it/cxkf/kj/2023-10-17/Seq(Person("Alice", 25), Person("Bob", 30))val ds = spark.createDataset(data)ds.show()11.DataSet VS DataFrameDataSet 和 DataFrame 都是 Spark 中用于处理结构化数据的数据结构 。它们都提供了丰富的操作,包括筛选、聚合、分组、排序等 。
它们之间的主要区别在于类型安全性 。DataFrame 是一种弱类型的数据结构,它的列只有在运行时才能确定类型 。这意味着,在编译时无法检测到类型错误,只有在运行时才会抛出异常 。
而 DataSet 是一种强类型的数据结构,它的类型在编译时就已经确定 。这意味着,如果你试图对一个不存在的列进行操作,或者对一个列进行错误的类型转换,编译器就会报错 。
此外,DataSet 还提供了一些额外的操作,例如 map、flatMap、reduce 等 。
12.RDD & DataFrame & Dataset 转化RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换 。
DataFrame/Dataset 转 RDD:
val rdd1=testDF.rddval rdd2=testDS.rddRDD 转 DataSet:
import spark.implicits._case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型val testDS = rdd.map {line=>Coltest(line._1,line._2)}.toDS可以注意到,定义每一行的类型(case class)时 , 已经给出了字段名和类型 , 后面只要往case class里面添加值即可 。
推荐阅读
- 烤箱新手教程 烤箱入门窍门
- 指南针的作用和意义 指南针的作用意义是什么
- 精致女生减肥指南,20个习惯,做到一半,就能再瘦70年
- 微波炉怎么选购指南视频 微波炉怎么选购指南
- 指南针怎么看 指南针怎么看海拔
- 抖音产业带服务商入驻详细指南!
- 春节社交礼仪 春节社交礼仪指南
- 中国早期的指南针也被称为什么?A 中国早期的指南针也被称为什么
- Tomcat目录结构详解:从新手到专家的指南
- “十一”长假哪天最堵?这份“避堵”指南请收好
