MLlib Machine Learning Basics
1. MLlib Overview
MLlib is Spark's machine learning library. It provides a rich set of machine learning algorithms and tools for processing large-scale datasets. MLlib supports many kinds of machine learning tasks, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, and it also provides tools for data preprocessing, feature engineering, and model evaluation.
The main features of MLlib include:
- Distributed computing: built on Spark's distributed computing framework, it can process large-scale datasets.
- Ease of use: offers a concise API with support for the Scala, Java, Python, and R programming languages.
- Integration: integrates seamlessly with other Spark modules, such as Spark SQL and Spark Streaming.
- High performance: delivers high-performance machine learning through in-memory computing and optimized algorithm implementations.
- Comprehensive algorithm library: covers common machine learning algorithms across supervised, unsupervised, semi-supervised learning, and other areas.
MLlib provides two API styles:
- RDD-based API: MLlib's traditional API, built on the RDD abstraction; suitable for scenarios that require low-level control.
- DataFrame-based API: MLlib's newer API (Spark 2.0+), built on the DataFrame abstraction. It provides higher-level features such as the Pipeline API, feature engineering tools, and model selection, and is the recommended choice.
Tip
For new machine learning applications, the DataFrame-based API is recommended; it offers richer functionality and better performance optimization. The RDD-based API is still available but is no longer under active feature development. The sketch below shows how the two API families are split across packages.
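The two API families live in separate packages: the RDD-based API under org.apache.spark.mllib / pyspark.mllib, and the DataFrame-based API under org.apache.spark.ml / pyspark.ml. As a minimal sketch of the difference, the same dense vector can be built from either package (the data-representation examples in section 3 use the pyspark.mllib types, while the DataFrame-based examples from section 4 onward use pyspark.ml):
# Python example: the same vector type exists in both API families
from pyspark.mllib.linalg import Vectors as MLlibVectors  # RDD-based API
from pyspark.ml.linalg import Vectors as MLVectors        # DataFrame-based API
v_rdd_api = MLlibVectors.dense([1.0, 0.0, 3.0])
v_df_api = MLVectors.dense([1.0, 0.0, 3.0])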
2. MLlib Architecture
The MLlib architecture consists mainly of the following components:
- Data representation: data structures such as vectors, matrices, and labeled points for representing machine learning data.
- Feature engineering: tools for feature extraction, transformation, and selection, used to process raw data.
- Algorithm library: the machine learning algorithms themselves, covering classification, regression, clustering, collaborative filtering, and more.
- Pipeline API: for building, evaluating, and tuning machine learning workflows.
- Model evaluation: tools for model evaluation and selection, such as cross-validation and grid search.
- Model persistence: support for saving trained models to disk for later use (see the sketch after this list).
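Model persistence is exposed through save and load methods on fitted models and pipelines. A minimal sketch, assuming a fitted PipelineModel named model and a writable path /tmp/spark-model (both are placeholders, not objects from this tutorial's examples):
# Python example: saving and reloading a fitted model (hypothetical `model` and path)
from pyspark.ml import PipelineModel
model.save("/tmp/spark-model")                   # write the fitted model to disk
loaded = PipelineModel.load("/tmp/spark-model")  # reload it in a later session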
3. Data Representation
MLlib uses the following data structures to represent machine learning data:
3.1 Vectors (Vector)
A vector is the most basic data structure in machine learning, used to represent feature vectors. MLlib supports two types of vectors:
- Dense vectors (DenseVector): all elements are stored; suitable when most elements are non-zero.
- Sparse vectors (SparseVector): only the non-zero elements are stored; suitable when most elements are zero, which saves memory and computation time.
// Scala example
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector
val denseVector: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (size, index array, value array)
val sparseVector: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
# Python example
from pyspark.mllib.linalg import Vectors
# Create a dense vector
dense_vector = Vectors.dense([1.0, 0.0, 3.0])
# Create a sparse vector (size, index list, value list)
sparse_vector = Vectors.sparse(3, [0, 2], [1.0, 3.0])
3.2 Labeled Points (LabeledPoint)
A labeled point is the data structure used for supervised learning; it consists of a feature vector and a label.
// Scala example
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point (classification problem: the label is an integer class index)
val labeledPoint1 = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point (regression problem: the label is a real number)
val labeledPoint2 = LabeledPoint(0.5, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
# Python example
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
# Create a labeled point (classification problem: the label is an integer class index)
labeled_point1 = LabeledPoint(1.0, Vectors.dense([1.0, 0.0, 3.0]))
# Create a labeled point (regression problem: the label is a real number)
labeled_point2 = LabeledPoint(0.5, Vectors.sparse(3, [0, 2], [1.0, 3.0]))
3.3 Matrices (Matrix)
Matrices represent two-dimensional data. MLlib supports two types of matrices:
- Dense matrices (DenseMatrix): all elements are stored.
- Sparse matrices (SparseMatrix): only the non-zero elements are stored.
// Scala example
import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix (numRows, numCols, element array in column-major order)
val denseMatrix: Matrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
// Create a sparse matrix (numRows, numCols, column pointers, row indices, value array)
val sparseMatrix: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 8.0, 6.0))
# Python example
from pyspark.mllib.linalg import Matrices
# Create a dense matrix (numRows, numCols, element list in column-major order)
dense_matrix = Matrices.dense(2, 3, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
# Create a sparse matrix (numRows, numCols, column pointers, row indices, value list)
sparse_matrix = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 8.0, 6.0])
4. Classification Algorithms
Classification is a supervised learning task used to predict the class label of a data point. MLlib provides several classification algorithms, including:
4.1 Logistic Regression
Logistic regression is an algorithm for binary or multiclass classification; it maps the output of a linear model into probability space.
// Scala example (DataFrame-based API)
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
val spark = SparkSession.builder()
  .appName("Logistic Regression Example")
  .master("local[*]")
  .getOrCreate()
// Load data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Create the logistic regression model
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Train the model
val model = lr.fit(trainingData)
// Evaluate the model
val predictions = model.transform(testData)
predictions.select("label", "prediction", "probability").show(10)
# Python example (DataFrame-based API)
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
spark = SparkSession.builder \
    .appName("Logistic Regression Example") \
    .master("local[*]") \
    .getOrCreate()
# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split into training and test sets
training_data, test_data = data.randomSplit([0.7, 0.3])
# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Train the model
model = lr.fit(training_data)
# Evaluate the model
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(10)
4.2 Decision Tree Classification
A decision tree is a tree-structured classification algorithm that assigns data points to classes through a series of decision rules.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Create the decision tree classifier
val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
// Train the model
val model = dt.fit(trainingData)
// Evaluate the model
val predictions = model.transform(testData)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")
# Python example (DataFrame-based API)
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Create the decision tree classifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
# Train the model
model = dt.fit(training_data)
# Evaluate the model
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
4.3 Random Forest Classification
A random forest is an ensemble learning algorithm composed of many decision trees; the final classification is decided by a vote among the trees.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.RandomForestClassifier
// Create the random forest classifier
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)
// Train the model
val model = rf.fit(trainingData)
// Evaluate the model (reusing the evaluator defined in section 4.2)
val predictions = model.transform(testData)
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")
# Python example (DataFrame-based API)
from pyspark.ml.classification import RandomForestClassifier
# Create the random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
# Train the model
model = rf.fit(training_data)
# Evaluate the model (reusing the evaluator defined in section 4.2)
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
4.4 Gradient-Boosted Tree Classification
Gradient-boosted trees are an ensemble learning algorithm that trains decision trees iteratively, with each new tree fitted to correct the errors of the previous iteration. (In MLlib, GBTClassifier supports binary classification.)
// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.GBTClassifier
// Create the gradient-boosted tree classifier
val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
// Train the model
val model = gbt.fit(trainingData)
// Evaluate the model (reusing the evaluator defined in section 4.2)
val predictions = model.transform(testData)
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")
# Python example (DataFrame-based API)
from pyspark.ml.classification import GBTClassifier
# Create the gradient-boosted tree classifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
# Train the model
model = gbt.fit(training_data)
# Evaluate the model (reusing the evaluator defined in section 4.2)
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
5. Regression Algorithms
Regression is a supervised learning task used to predict continuous values. MLlib provides several regression algorithms, including:
5.1 Linear Regression
Linear regression predicts a continuous value under the assumption of a linear relationship between the features and the label.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Create the linear regression model
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Train the model
val model = lr.fit(trainingData)
// Evaluate the model
val predictions = model.transform(testData)
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")
# Python example (DataFrame-based API)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Create the linear regression model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Train the model
model = lr.fit(training_data)
# Evaluate the model
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
5.2 Decision Tree Regression
Decision tree regression predicts continuous values by partitioning the data into regions through a series of decision rules, with one predicted value per region.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.DecisionTreeRegressor
// Create the decision tree regressor
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
// Train the model
val model = dt.fit(trainingData)
// Evaluate the model (reusing the evaluator defined in section 5.1)
val predictions = model.transform(testData)
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")
# Python example (DataFrame-based API)
from pyspark.ml.regression import DecisionTreeRegressor
# Create the decision tree regressor
dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")
# Train the model
model = dt.fit(training_data)
# Evaluate the model (reusing the evaluator defined in section 5.1)
predictions = model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
5.3 Random Forest Regression
Random forest regression is an ensemble learning algorithm composed of multiple decision tree regressors; the final prediction is obtained by averaging their individual predictions.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.RandomForestRegressor
// Create the random forest regressor
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)
// Train the model
val model = rf.fit(trainingData)
// Evaluate the model (reusing the evaluator defined in section 5.1)
val predictions = model.transform(testData)
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")
# Python example (DataFrame-based API)
from pyspark.ml.regression import RandomForestRegressor
# Create the random forest regressor
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=10)
# Train the model
model = rf.fit(training_data)
# Evaluate the model (reusing the evaluator defined in section 5.1)
predictions = model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
6. Clustering Algorithms
Clustering is an unsupervised learning task that groups data points into clusters, so that points within the same cluster are similar while points in different clusters differ significantly. MLlib provides several clustering algorithms, including:
6.1 K-Means
K-means is a distance-based clustering algorithm that partitions the data points into K clusters so as to minimize the sum of squared distances from each point to its assigned cluster center.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
// Create the K-means model
val kmeans = new KMeans()
  .setK(2)
  .setSeed(1L)
// Train the model (reusing the `data` DataFrame loaded in section 4.1)
val model = kmeans.fit(data)
// Predict
val predictions = model.transform(data)
// Evaluate the model
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")
// Show the cluster centers
println("Cluster Centers:")
model.clusterCenters.foreach(println)
# Python example (DataFrame-based API)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Create the K-means model
kmeans = KMeans().setK(2).setSeed(1)
# Train the model (reusing the `data` DataFrame loaded in section 4.1)
model = kmeans.fit(data)
# Predict
predictions = model.transform(data)
# Evaluate the model
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")
# Show the cluster centers
print("Cluster Centers:")
for center in model.clusterCenters():
    print(center)
6.2 Gaussian Mixture Model
A Gaussian mixture model is a probabilistic clustering algorithm; it assumes the data points are generated from a mixture of several Gaussian distributions.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.clustering.GaussianMixture
// Create the Gaussian mixture model
val gmm = new GaussianMixture()
  .setK(2)
// Train the model
val model = gmm.fit(data)
// Predict
val predictions = model.transform(data)
// Print the parameters of each Gaussian
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:")
  println(s"Weight: ${model.weights(i)}")
  println(s"Mean: ${model.gaussians(i).mean}")
  println(s"Cov: ${model.gaussians(i).cov}")
}
# Python example (DataFrame-based API)
from pyspark.ml.clustering import GaussianMixture
# Create the Gaussian mixture model
gmm = GaussianMixture().setK(2)
# Train the model
model = gmm.fit(data)
# Predict
predictions = model.transform(data)
# Print the parameters of each Gaussian
for i in range(model.getK()):
    print(f"Gaussian {i}:")
    print(f"Weight: {model.weights[i]}")
    print(f"Mean: {model.gaussians[i].mean}")
    print(f"Cov: {model.gaussians[i].cov}")
7. Collaborative Filtering
Collaborative filtering is a technique for recommender systems: based on users' historical behavior or preferences, it predicts their ratings of, or preferences for, items. MLlib provides a collaborative filtering algorithm based on alternating least squares (ALS).
// Scala example (DataFrame-based API)
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator
// Load data
val ratings = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("data/mllib/sample_movielens_ratings.csv")
// Split into training and test sets
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Create the ALS model
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
// Train the model
val model = als.fit(training)
// Evaluate the model (drop rows with NaN predictions for users or items unseen during training)
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate recommendations (top 10 movies per user and top 10 users per movie)
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)
# Python example (DataFrame-based API)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Load data
ratings = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("data/mllib/sample_movielens_ratings.csv")
# Split into training and test sets
training, test = ratings.randomSplit([0.8, 0.2])
# Create the ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
# Train the model
model = als.fit(training)
# Evaluate the model (drop rows with NaN predictions for users or items unseen during training)
model.setColdStartStrategy("drop")
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")
# Generate recommendations (top 10 movies per user and top 10 users per movie)
user_recs = model.recommendForAllUsers(10)
movie_recs = model.recommendForAllItems(10)
8. Feature Engineering
Feature engineering is one of the most important steps in machine learning. It covers operations such as feature extraction, transformation, and selection, which turn raw data into features suited to machine learning algorithms. MLlib provides a rich set of feature engineering tools, including:
- Feature extraction: extract features from raw data, e.g. TF-IDF and Word2Vec.
- Feature transformation: change how features are represented, e.g. standardization, normalization, and one-hot encoding (a transformation sketch follows the TF-IDF example below).
- Feature selection: select the features useful to the model, e.g. chi-squared tests and feature importance.
- Dimensionality reduction: reduce the dimensionality of the features, e.g. PCA and SVD.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.feature._
// Example data
val df = spark.createDataFrame(Seq(
  (0, "a b c d e spark", 1.0),
  (1, "b d", 0.0),
  (2, "spark f g h", 1.0),
  (3, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Tokenize the text
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsData = tokenizer.transform(df)
// Compute term frequencies (TF)
val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// Compute the inverse document frequency (IDF)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("id", "features", "label").show()
# Python example (DataFrame-based API)
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
# Example data
df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Tokenize the text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(df)
# Compute term frequencies (TF)
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashing_tf.transform(words_data)
# Compute the inverse document frequency (IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
rescaled_data.select("id", "features", "label").show()
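The example above covers feature extraction (TF-IDF). As a brief illustration of the feature transformation bullet, here is a minimal standardization sketch with StandardScaler; it assumes a DataFrame named data with a "features" vector column, like the libsvm data loaded in section 4.1:
# Python example: standardizing a feature column (assumes `data` has a "features" vector column)
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withMean=False, withStd=True)  # scale each feature to unit standard deviation
scaler_model = scaler.fit(data)             # compute per-feature statistics
scaled_data = scaler_model.transform(data)  # append the scaled column
scaled_data.select("features", "scaledFeatures").show(5, truncate=False)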
9. Pipeline API
The Pipeline API is MLlib's tool for building, evaluating, and tuning machine learning workflows. It combines multiple machine learning stages (such as feature engineering, model training, and model evaluation) into a single pipeline that is easy to manage and reuse. Pipelines also plug directly into MLlib's model selection tools, as sketched after the examples below.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Example data
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0)
)).toDF("id", "text", "label")
// Define the pipeline stages
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
// Create the pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
// Train the model
val model = pipeline.fit(training)
// Test data
val test = spark.createDataFrame(Seq(
  (10L, "spark i j k"),
  (11L, "l m n"),
  (12L, "mapreduce spark"),
  (13L, "apache hadoop")
)).toDF("id", "text")
// Predict
val predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show()
# Python example (DataFrame-based API)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Example data
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0)
], ["id", "text", "label"])
# Define the pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Create the pipeline
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
# Train the model
model = pipeline.fit(training)
# Test data
test = spark.createDataFrame([
    (10, "spark i j k"),
    (11, "l m n"),
    (12, "mapreduce spark"),
    (13, "apache hadoop")
], ["id", "text"])
# Predict
predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show()
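A whole pipeline can be handed to MLlib's model selection tools (the cross-validation and grid search mentioned in section 2) as a single estimator. A minimal tuning sketch over the Python pipeline defined above; the grid values are purely illustrative:
# Python example: grid search with cross-validation over the pipeline above (illustrative values)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Candidate hyperparameter combinations
param_grid = (ParamGridBuilder()
              .addGrid(hashing_tf.numFeatures, [10, 100, 1000])
              .addGrid(lr.regParam, [0.1, 0.01])
              .build())
# 3-fold cross-validation picks the best combination by area under the ROC curve
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=param_grid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)
cv_model = cv.fit(training)
predictions = cv_model.transform(test)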
Practical Exercises
Exercise 1: Logistic Regression Classification
- Use MLlib's logistic regression algorithm to classify sample data.
- Split the data into training and test sets at a 7:3 ratio.
- Train a logistic regression model, tuning the regularization parameter.
- Evaluate the model with metrics such as accuracy, precision, and recall.
Exercise 2: K-Means Clustering
- Use MLlib's K-means algorithm to cluster sample data.
- Try different values of K and observe how the clustering results change.
- Evaluate the clustering quality with the silhouette coefficient.
- Visualize the clustering results (if possible).
Exercise 3: Collaborative Filtering Recommendations
- Use MLlib's ALS algorithm to build a simple recommender system.
- Load the MovieLens dataset and split it into training and test sets.
- Train an ALS model, tuning the regularization parameter and the number of iterations.
- Evaluate the model with the RMSE metric.
- Generate 10 movie recommendations for each user.
Exercise 4: Applying the Pipeline API
- Use the Pipeline API to build a complete machine learning workflow.
- Include feature extraction, transformation, and model training stages.
- Optimize the model parameters with cross-validation and grid search.
- Evaluate the performance of the optimized model.
10. Summary
This tutorial introduced the basic concepts and architecture of Spark MLlib, covering data representation, classification algorithms, regression algorithms, clustering algorithms, collaborative filtering, feature engineering, and the Pipeline API. MLlib is a powerful machine learning library that can process large-scale datasets and supports many kinds of machine learning tasks.
After working through this tutorial, you should be able to:
- Understand the basic concepts and architecture of MLlib.
- Master MLlib's data representations.
- Use MLlib's classification algorithms for classification tasks.
- Use MLlib's regression algorithms for regression tasks.
- Use MLlib's clustering algorithms for clustering tasks.
- Use MLlib's collaborative filtering algorithm to build recommender systems.
- Use MLlib's feature engineering tools to process raw data.
- Use MLlib's Pipeline API to build machine learning workflows.
Subsequent tutorials will dive deeper into MLlib's advanced features, including more advanced algorithms, model tuning, and model deployment, along with the application scenarios and best practices of machine learning on Spark.