欢迎访问宙启技术站
智能推送

pyspark.sql中如何处理缺失值和异常值

发布时间:2023-12-18 23:43:19

在pyspark.sql中,处理缺失值和异常值可以通过使用DataFrame API和SQL查询语言来实现。下面是处理缺失值和异常值的一些常用方法,每种方法都包含一个使用例子。

1. 删除缺失值和异常值:

可以使用dropna函数删除包含缺失值的行或列。默认情况下,该函数会删除包含任何缺失值的行,也可以通过参数how指定删除缺失值的条件。

# 创建一个包含缺失值和异常值的DataFrame
data = [("Alice", 25, 160, 50),
        ("Bob", None, 175, 80),
        ("Charlie", 30, None, 70),
        ("Dave", 35, 180, None)]
df = spark.createDataFrame(data, ["Name", "Age", "Height", "Weight"])

# 删除包含任何缺失值的行
df.dropna().show()

# 删除包含所有缺失值的行
df.dropna(how="all").show()

# 删除包含缺失值的列
df.dropna(subset=["Age", "Height"]).show()

2. 填充缺失值:

可以使用fillna函数将缺失值替换为指定的值或使用不同的填充策略。

# 填充所有缺失值为0
df.fillna(0).show()

# 填充Age列的缺失值为平均值
mean_age = df.select(avg("Age")).head()[0]
df.fillna(mean_age, subset=["Age"]).show()

# 填充Weight列的缺失值为中位数
median_weight = df.approxQuantile("Weight", [0.5], 0.25)[0]
df.fillna(median_weight, subset=["Weight"]).show()

3. 检测并替换异常值:

可以使用DataFrame API和SQL查询语言来检测并替换异常值。

from pyspark.sql.functions import col

# 使用DataFrame API检测并替换Weight列的异常值
mean_weight = df.select(mean("Weight")).head()[0]
std_dev_weight = df.select(stddev("Weight")).head()[0]
df = df.withColumn("Weight", when((col("Weight") < mean_weight - 2 * std_dev_weight) | (col("Weight") > mean_weight + 2 * std_dev_weight), mean_weight).otherwise(col("Weight")))

# 使用SQL查询语言检测并替换Height列的异常值
df.createOrReplaceTempView("data")
df = spark.sql("SELECT Name, Age, CASE WHEN Height < 100 THEN 100 WHEN Height > 200 THEN 200 ELSE Height END AS Height, Weight FROM data")
df.show()

以上是pyspark.sql中处理缺失值和异常值的一些常用方法和使用例子。根据实际情况,你可以选择合适的方法来处理你的数据集中的缺失值和异常值。