数据集
- 下载链接:cal_housing
- 数据集格式
- longitude:区域中心的纬度
- latitude:区域中心的经度
- housingMedianAge:区域内所有房屋年龄的中位数
- totalRooms:区域内总房间数
- totalBedrooms:区域内总卧室数
- population:区域内总人口数
- households:区域内总家庭数
- medianIncome:区域内人均收入中位数
- medianHouseValue:区域房价的中位数
- 前面8个属性都可能对房价有影响,假设影响是线性的,可以得到类似的公式
A=bB+cC+...iI
,A代表房价,B~I代表属性
数据清洗
创建RDD
把房屋信息数据和每个属性的定义读入到Spark,并创建两个相应的RDD
1 | from pyspark.sql import SparkSession |
collect + take
1 | # collect函数会把所有数据都加载到内存中,常用方法是用take函数去只读取RDD中的某几个元素 |
map
用SparkContext.textFile函数创建的RDD,每个数据都是一个大字符串,各个属性用逗号分隔,用map函数把大字符串分隔成数组
1 | rdd = rdd.map(lambda line: line.split(",")) |
doDF
Spark SQL的DataFrame API在查询结构化数据时更加方便,且性能更好,先把RDD转换为DataFrame
1 | from pyspark.sql import Row |
1 | df.show() |
cast
每一列的数据格式都是string,通过cast()函数把每一列的类型转换成float
1 | from pyspark.sql.types import FloatType |
1 | # 转换成数字有很多优势,例如可以统计出所有建造年限各有多少个房子 |
预处理
- 房价的值普遍都很大,需要把它们调整成相对较小的数字
- 有的属性没太大意义,例如区域内的总房间数和总卧室数,更应该关心的是平均房间数
- 房价是结果,其他属性是输入参数,需要把它们分离处理
- 有的属性最大值和最小值范围很大,需要把它们标准化处理
调小房价
大部分房价都是10万起
1 | from pyspark.sql import functions |
添加新列
- 每个家庭的平均房间数:roomsPerHousehold
- 每个家庭的平均人数:populationPerHousehold
- 卧室在总房间的占比:bedroomsPerRoom
1 | df = df.withColumn("roomsPerHousehold", functions.col("totalRooms") / functions.col("households")) \ |
筛选列
去除没有太大价值的列,例如经纬度,保留有价值的列
1 | df = df.select("medianHouseValue", |
分离处理
先把DataFrame转换到RDD,然后用map函数把每个对象分成两部分,最后再转换回DataFrame
1 | from pyspark.ml.linalg import DenseVector |
标准化
数据的标准化,可以借助Spark ML来完成,增加了features_scaled列,里面每个数据都是标准化过的,用于训练模型
1 | from pyspark.ml.feature import StandardScaler |
1 | [Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264]))] |
创建模型
- 把数据集分成训练集和测试集,训练集用来训练模型,测试集用来评估模型的正确性
- DataFrame的randomSplit函数很容易将数据随机分割,将80%的数据用于训练,20%的数据用于测试
- Spark ML提供的LinearRegression功能,很容易构建一个线性回归模型
1 | from pyspark.ml.regression import LinearRegression |
模型评估
可以用linearModel的transform函数来预测测试集中的房价,与真实情况进行对比
1 | predicted = linearModel.transform(test_data) |