SparkSQL语法用法介绍(SQL从入门到精通)

一、基础语法

SparkSQL是一种基于SQL语法的分布式数据处理引擎,它允许我们在Spark上使用SQL语句进行数据处理和查询。在进行数据处理时,我们需要首先创建SparkSession对象,其中包含了数据源的配置信息以及上下文环境信息。接下来,我们可以使用SQL语句进行数据处理。以下是一个基础的SparkSQL语法示例。

from pyspark.sql import SparkSession

# 创建SparkSession对象
spark = SparkSession.builder.appName("Basics").getOrCreate()

# 加载数据集
df = spark.read.format("csv").option("header", "true").load("data.csv")

# 执行SQL语句
df.createOrReplaceTempView("data")
result = spark.sql("SELECT * FROM data where age > 21")

result.show()

这个例子中,我们创建了一个SparkSession对象,并使用read()方法加载了一个csv文件,并将数据集data注册为一个临时表,然后使用SQL语句查询满足条件(age > 21)的数据并输出结果。

二、数据类型

在SparkSQL中,支持多种数据类型,包括数值型、字符串型、日期型等。下面是一些基本的数据类型及其示例。

1. 数值型

在SparkSQL中,常用的数值类型有整型、长整型、浮点型等。以下是一些示例。

from pyspark.sql.functions import col

# 创建DataFrame
df = spark.createDataFrame([(1, 2.2, "a"), (2, 3.3, "b"), (3, 4.4, "c")], ["int", "float", "string"])

# 使用col函数进行数据处理
df.select(col("int") + 1, col("float") * 2).show()

这个例子中,我们创建了一个DataFrame对象,并使用col()函数对数据进行处理。col()函数用于获取DataFrame中指定列的数据,然后使用算术运算符对列进行计算。

2. 字符串型

SparkSQL中的字符串类型常用于对文本数据的处理。以下是一个例子。

# 创建DataFrame
df = spark.createDataFrame([("Alice", 25), ("Bob", 30), ("Charlie", 35)], ["name", "age"])

# 使用SQL语句进行处理
df.createOrReplaceTempView("people")
result = spark.sql("SELECT concat(name, age) as info FROM people")

result.show()

这个例子中,我们定义了一个数据集people,其中包含了姓名和年龄两个字段。然后使用concat()函数将姓名和年龄的字符串拼接在一起,作为新的字段info,并输出结果。

3. 日期型

SparkSQL中的日期类型通常用于具有时间戳的数据的处理。以下是一个示例。

from pyspark.sql.functions import to_date

# 创建DataFrame
df = spark.createDataFrame([(1, "2021-01-01"), (2, "2021-02-28"), (3, "2021-03-31")], ["id", "date"])

# 使用to_date函数进行数据清洗
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

df.show()

这个例子中,我们创建了一个DataFrame对象,并使用to_date()函数对字段“date”进行清洗。to_date()函数将字符串日期转换为日期格式,并将其插入到新的列中。

三、函数处理

SparkSQL中包含多种函数,包括字符串函数、数学函数、日期函数等等。以下是一些常用的函数及其示例。

1. 字符串函数

SparkSQL中的字符串函数用于对字符串数据进行操作。以下是一些示例。

from pyspark.sql.functions import initcap, lower, upper

# 创建DataFrame
df = spark.createDataFrame([("baseball bat", "FOOTBALL"), ("basketball", "Tennis Ball"), ("cricket bat", "GOLF BALL")], ["col1", "col2"])

# 使用字符串函数进行处理
df = df.withColumn("col1", initcap(col("col1")))
df = df.withColumn("col2", lower(col("col2")))
df = df.withColumn("col2", upper(col("col2")))

df.show()

这个例子中,我们创建了一个DataFrame对象,并使用一些字符串函数对其中的字段进行处理。initcap()函数将字符串的首字母大写,lower()函数将字符串转换为小写,upper()函数将字符串转换为大写。

2. 数学函数

SparkSQL中的数学函数用于对数字数据进行操作。以下是一些示例。

from pyspark.sql.functions import abs, ceil, floor, round

# 创建DataFrame
df = spark.createDataFrame([(-1.5, 2.3), (3.4, -5.5), (6.7, 2.8)], ["col1", "col2"])

# 使用数学函数进行处理
df = df.withColumn("col1", abs(col("col1")))
df = df.withColumn("col1", ceil(col("col1")))
df = df.withColumn("col2", floor(col("col2")))
df = df.withColumn("col2", round(col("col2"), 1))

df.show()

这个例子中,我们创建了一个DataFrame对象,并使用一些数学函数对其中的字段进行处理。abs()函数返回数字的绝对值,ceil()函数返回数字的上限,floor()函数返回数字的下限,round()函数用于对数字进行四舍五入。

3. 日期函数

SparkSQL中的日期函数用于对日期数据进行操作。以下是一些示例。

from pyspark.sql.functions import add_months, month, year

# 创建DataFrame
df = spark.createDataFrame([("2020-07-01", 5), ("2022-01-01", 9), ("2020-06-01", 12)], ["date", "months"])

# 使用日期函数进行处理
df = df.withColumn("date", add_months(col("date"), col("months")))
df = df.withColumn("month", month(col("date")))
df = df.withColumn("year", year(col("date")))

df.show()

这个例子中,我们创建了一个DataFrame对象,并使用一些日期函数对其中的字段进行处理。add_months()函数用于将给定的日期加上指定个月,month()函数用于提取日期的月份,year()函数用于提取日期的年份。

四、数据聚合

SparkSQL中的聚合函数用于对数据集进行汇总操作,例如计算平均值、求和等等。以下是一些示例。

1. 平均值

SparkSQL中的平均值函数用于计算数据集中某一列的平均值。以下是一个示例。

# 创建DataFrame
df = spark.createDataFrame([(1, 2), (2, 3), (3, 4)], ["id", "value"])

# 使用聚合函数计算平均值
result = df.selectExpr("avg(value)").collect()

print(result)

这个例子中,我们创建了一个DataFrame对象,并使用avg()函数计算了其中“value”列的平均值。

2. 求和

SparkSQL中的求和函数用于计算数据集中某一列的总和。以下是一个示例。

# 创建DataFrame
df = spark.createDataFrame([(1, 2), (2, 3), (3, 4)], ["id", "value"])

# 使用聚合函数计算总和
result = df.selectExpr("sum(value)").collect()

print(result)

这个例子中,我们创建了一个DataFrame对象,并使用sum()函数计算了其中“value”列的总和。

3. 分组统计

SparkSQL中的分组统计函数用于对数据集进行分组汇总操作。以下是一个示例。

# 创建DataFrame
df = spark.createDataFrame([(1, "a", 10), (2, "a", 20), (3, "b", 15), (4, "b", 25)], ["id", "group", "value"])

# 使用聚合函数进行分组统计
result = df.groupBy("group").agg({"value": "sum", "id": "count"})

result.show()

这个例子中,我们创建了一个DataFrame对象,并使用groupBy()函数对其中的“group”列进行分组操作,然后使用agg()函数对多个聚合函数进行应用。

五、窗口函数

SparkSQL中的窗口函数用于对数据集中的数据按照指定窗口和排序方式进行分组操作。以下是一些常用的示例。

1. 排名

SparkSQL中的排名函数用于对数据集中的数据按照指定列排序,并为每个分组生成对应的排名。以下是一个示例。

from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# 创建DataFrame
df = spark.createDataFrame([(1, "a", 10), (2, "a", 20), (3, "b", 15), (4, "b", 25)], ["id", "group", "value"])

# 使用窗口函数进行排名
windowSpec = Window.partitionBy("group").orderBy(col("value").desc())
result = df.select(col("id"), col("group"), col("value"), rank().over(windowSpec).alias("rank"))

result.show()

这个例子中,我们创建了一个DataFrame对象,并使用窗口函数对其中的“value”列按照分组并按照降序排序,并为每个分组的数据生成对应的排名。

2. 移动平均

SparkSQL中的移动平均函数用于对数据集中的数据进行一定范围内的平均值计算。以下是一个示例。

# 创建DataFrame
df = spark.createDataFrame([(1, 1), (2, 3), (3, 5), (4, 7), (5, 9)], ["id", "value"])

# 使用窗口函数进行移动平均
windowSpec = Window.orderBy(col("id")).rowsBetween(-1, 1)
result = df.select(col("id"), col("value"), avg(col("value")).over(windowSpec).alias("moving_avg"))

result.show()

这个例子中,我们创建了一个DataFrame对象,并使用窗口函数对其中的“value”列进行-1到1行的移动平均计算。

3. 区间查询

SparkSQL中的区间查询函数用于对数据集中的数据在指定的范围内进行查询。以下是一个示例。

# 创建DataFrame
df = spark.createDataFrame([(1, "2019-01-01"), (2, "2019-02-01"), (3, "2019-03-01"), (4, "2019-04-01"), (5, "2019-05-01")], ["id", "time"])

# 使用窗口函数进行区间查询
windowSpec = Window.orderBy(col("time")).rangeBetween(-30, 30)
result = df.select(col("id"), col("time"), avg(col("id")).over(windowSpec).alias("moving_avg"))

result.show()

这个例子中,我们创建了一个DataFrame对象,并使用窗口函数对其中的“time”列进行-30到30天的区间查询,并计算对应区间的数据平均值。

六、高级操作

SparkSQL中还有许多高级操作,例如多表联合查询、窗口分组、复杂数据类型处理等等。以下是一些示例。

1. 多表联合查询

在SparkSQL中,可以使用join操作实现多个表的联合查询。以下是一个示例。

# 创建DataFrame
dept = spark.createDataFrame([(1, "HR"), (2, "Finance"), (

Published by

风君子

独自遨游何稽首 揭天掀地慰生平