MLlib Machine Learning Basics

Learn the basic concepts and architecture of Spark MLlib, and master common machine learning algorithms.

1. MLlib Overview

MLlib is Spark's machine learning library. It provides a rich set of machine learning algorithms and tools for processing large-scale datasets. MLlib supports many kinds of machine learning tasks, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, and it also provides tools for data preprocessing, feature engineering, and model evaluation.

The main features of MLlib include:

  • Distributed computing: built on Spark's distributed computing framework, it can process large-scale datasets.
  • Ease of use: it provides a concise API and supports the Scala, Java, Python, and R programming languages.
  • Integration: it integrates seamlessly with other Spark modules, such as Spark SQL and Spark Streaming.
  • High performance: through in-memory computing and optimized algorithm implementations, it delivers high-performance machine learning.
  • Comprehensive algorithm library: it includes common machine learning algorithms, covering supervised learning, unsupervised learning, semi-supervised learning, and other areas.

MLlib provides two API styles:

  • RDD-based API: MLlib's traditional API, built on the RDD abstraction, suitable for scenarios that require low-level control.
  • DataFrame-based API: MLlib's newer API (Spark 2.0+), built on the DataFrame abstraction. It provides more advanced features such as the Pipeline API, feature engineering, and model selection, and is the recommended choice.

Tip

For new machine learning applications, use the DataFrame-based API: it offers richer functionality and better performance optimization. The RDD-based API remains available but is no longer under active development.

2. MLlib Architecture

The MLlib architecture consists of the following main components:

  • Data representation: supports data structures such as vectors, matrices, and labeled points for representing machine learning data.
  • Feature engineering: provides tools for feature extraction, transformation, and selection, used to process raw data.
  • Algorithm library: contains a variety of machine learning algorithms, such as classification, regression, clustering, and collaborative filtering.
  • Pipeline API: used to build, evaluate, and tune machine learning workflows.
  • Model evaluation: provides tools for model evaluation and selection, such as cross-validation and grid search.
  • Model persistence: supports saving trained models to disk for later use.

3. Data Representation

MLlib uses the following data structures to represent machine learning data:

3.1 Vector

The vector is the most basic data structure in machine learning and is used to represent feature vectors. MLlib supports two types of vectors:

  • Dense vector (DenseVector): stores all elements; suitable when most elements are non-zero.
  • Sparse vector (SparseVector): stores only the non-zero elements; suitable when most elements are zero, saving memory and computation time.
// Scala example
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector
val denseVector: Vector = Vectors.dense(1.0, 0.0, 3.0)

// Create a sparse vector (index array and value array)
val sparseVector: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

# Python example
from pyspark.mllib.linalg import Vectors

# Create a dense vector
dense_vector = Vectors.dense([1.0, 0.0, 3.0])

# Create a sparse vector (size, index list, value list)
sparse_vector = Vectors.sparse(3, [0, 2], [1.0, 3.0])
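To make the sparse layout concrete, the index and value arrays passed to `Vectors.sparse(3, [0, 2], [1.0, 3.0])` can be expanded back into the dense form by hand. This is a plain-Python sketch of the idea (not MLlib code, and no Spark required):

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Same arguments as the Vectors.sparse example above
print(sparse_to_dense(3, [0, 2], [1.0, 3.0]))  # [1.0, 0.0, 3.0]
```

Only two of the three entries are stored; the savings grow with the fraction of zeros, which is why sparse vectors pay off for high-dimensional features such as hashed text.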

3.2 Labeled Point (LabeledPoint)

A labeled point is a data structure for supervised learning; it consists of a feature vector and a label.

// Scala example
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point (classification problem, integer-valued label)
val labeledPoint1 = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point (regression problem, real-valued label)
val labeledPoint2 = LabeledPoint(0.5, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

# Python example
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point (classification problem, integer-valued label)
labeled_point1 = LabeledPoint(1.0, Vectors.dense([1.0, 0.0, 3.0]))

# Create a labeled point (regression problem, real-valued label)
labeled_point2 = LabeledPoint(0.5, Vectors.sparse(3, [0, 2], [1.0, 3.0]))

3.3 Matrix

Matrices represent two-dimensional data. MLlib supports two types of matrices:

  • Dense matrix (DenseMatrix): stores all elements.
  • Sparse matrix (SparseMatrix): stores only the non-zero elements.
// Scala example
import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix (rows, columns, elements in column-major order)
val denseMatrix: Matrix = Matrices.dense(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

// Create a sparse matrix (rows, columns, column pointers, row indices, values)
val sparseMatrix: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 8.0, 6.0))

# Python example
from pyspark.mllib.linalg import Matrices

# Create a dense matrix (rows, columns, elements in column-major order)
dense_matrix = Matrices.dense(2, 3, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0])

# Create a sparse matrix (rows, columns, column pointers, row indices, values)
sparse_matrix = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 8.0, 6.0])
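The five arguments to `Matrices.sparse` describe compressed sparse column (CSC) storage: for column j, the non-zero values live in `values[col_ptrs[j]:col_ptrs[j+1]]`, at the rows given by the matching slice of `row_indices`. A plain-Python decoder (not MLlib code) makes the layout of the example above explicit:

```python
def csc_to_dense(num_rows, num_cols, col_ptrs, row_indices, values):
    """Expand CSC sparse-matrix arrays into a row-major list of lists."""
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for j in range(num_cols):
        # Entries of column j occupy the slice col_ptrs[j]:col_ptrs[j+1]
        for k in range(col_ptrs[j], col_ptrs[j + 1]):
            dense[row_indices[k]][j] = values[k]
    return dense

# Same arguments as the Matrices.sparse example above
print(csc_to_dense(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 8.0, 6.0]))
# [[9.0, 0.0], [0.0, 6.0], [0.0, 8.0]]
```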

4. Classification Algorithms

Classification is a supervised learning task for predicting the class label of a data point. MLlib provides several classification algorithms, including:

4.1 Logistic Regression

Logistic regression is an algorithm for binary or multiclass classification; it maps the output of a linear model into probability space.

// Scala example (DataFrame-based API)
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

val spark = SparkSession.builder()
  .appName("Logistic Regression Example")
  .master("local[*]")
  .getOrCreate()

// Load data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Create a logistic regression model
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Train the model
val model = lr.fit(trainingData)

// Evaluate the model
val predictions = model.transform(testData)
predictions.select("label", "prediction", "probability").show(10)

# Python example (DataFrame-based API)
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder \
    .appName("Logistic Regression Example") \
    .master("local[*]") \
    .getOrCreate()

# Load data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Split into training and test sets
training_data, test_data = data.randomSplit([0.7, 0.3])

# Create a logistic regression model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model
model = lr.fit(training_data)

# Evaluate the model
predictions = model.transform(test_data)
predictions.select("label", "prediction", "probability").show(10)
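The "maps into probability space" step in binary logistic regression is the sigmoid function: the raw linear score w · x + b is squashed into (0, 1) and reported in the `probability` column. A minimal plain-Python illustration (hypothetical weights, not MLlib code):

```python
import math

def sigmoid(z):
    """Map a raw linear score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_proba(weights, bias, features):
    """Probability of the positive class under a logistic model."""
    score = sum(w * x for w, x in zip(weights, features)) + bias
    return sigmoid(score)

print(sigmoid(0.0))  # 0.5: a score of zero is maximally uncertain
# Hypothetical fitted parameters for a 2-feature model
print(round(predict_proba([2.0, -1.0], 0.5, [1.0, 3.0]), 4))
```

Thresholding this probability at 0.5 gives the value in the `prediction` column.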

4.2 Decision Tree Classification

A decision tree is a classification algorithm based on a tree structure; it partitions the data into classes through a sequence of decision rules.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Create a decision tree classifier
val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

// Train the model
val model = dt.fit(trainingData)

// Evaluate the model
val predictions = model.transform(testData)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")

# Python example (DataFrame-based API)
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a decision tree classifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Train the model
model = dt.fit(training_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")

4.3 Random Forest Classification

A random forest is an ensemble learning algorithm composed of many decision trees; the final classification is determined by a voting mechanism.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.RandomForestClassifier

// Create a random forest classifier
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)

// Train the model
val model = rf.fit(trainingData)

// Evaluate the model (reusing the evaluator from section 4.2)
val predictions = model.transform(testData)
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")

# Python example (DataFrame-based API)
from pyspark.ml.classification import RandomForestClassifier

# Create a random forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Train the model
model = rf.fit(training_data)

# Evaluate the model (reusing the evaluator from section 4.2)
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
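The "voting mechanism" is literal: each tree predicts a class, and the forest returns the most common vote. A plain-Python sketch with hypothetical per-tree predictions (real forests also average class probabilities, but majority vote captures the idea):

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Final random-forest class: the most common per-tree prediction."""
    return Counter(tree_predictions).most_common(1)[0][0]

# Hypothetical votes from a 10-tree forest for one data point
votes = [1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0]
print(majority_vote(votes))  # 1.0 (7 of the 10 trees vote for class 1.0)
```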

4.4 Gradient-Boosted Tree Classification

Gradient-boosted trees are an ensemble learning algorithm that trains decision trees iteratively, with each new tree correcting the errors of the previous round.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.classification.GBTClassifier

// Create a gradient-boosted tree classifier
val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)

// Train the model
val model = gbt.fit(trainingData)

// Evaluate the model (reusing the evaluator from section 4.2)
val predictions = model.transform(testData)
val accuracy = evaluator.evaluate(predictions)
println(s"Test Accuracy = $accuracy")

# Python example (DataFrame-based API)
from pyspark.ml.classification import GBTClassifier

# Create a gradient-boosted tree classifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

# Train the model
model = gbt.fit(training_data)

# Evaluate the model (reusing the evaluator from section 4.2)
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
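"Each round corrects the errors of the previous one" can be shown with the simplest possible base learner. Starting from the global mean, each stage fits the current residuals and its output is added to the ensemble. This plain-Python sketch uses a fixed-split stump purely for illustration (a real GBT fits a full regression tree to the residuals and applies a learning rate):

```python
def fit_stump(xs, residuals, split):
    """Fit a fixed-split stump: predict the mean residual on each side."""
    left = [r for x, r in zip(xs, residuals) if x < split]
    right = [r for x, r in zip(xs, residuals) if x >= split]
    return lambda x: sum(left) / len(left) if x < split else sum(right) / len(right)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [1.0, 1.0, 3.0, 3.0]

# Stage 0: start from the global mean
pred = [sum(ys) / len(ys)] * len(xs)           # [2.0, 2.0, 2.0, 2.0]

# Stage 1: fit the residuals of stage 0, then add the correction
residuals = [y - p for y, p in zip(ys, pred)]  # [-1.0, -1.0, 1.0, 1.0]
stump = fit_stump(xs, residuals, split=2.5)
pred = [p + stump(x) for p, x in zip(pred, xs)]
print(pred)  # [1.0, 1.0, 3.0, 3.0]: the stage-0 errors are corrected
```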

5. Regression Algorithms

Regression is a supervised learning task for predicting continuous values. MLlib provides several regression algorithms, including:

5.1 Linear Regression

Linear regression is an algorithm for predicting continuous values; it assumes a linear relationship between the features and the label.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Create a linear regression model
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Train the model
val model = lr.fit(trainingData)

// Evaluate the model
val predictions = model.transform(testData)
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")

# Python example (DataFrame-based API)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create a linear regression model
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model
model = lr.fit(training_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")
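The `rmse` metric reported by the evaluator is simply the square root of the mean squared difference between label and prediction. Computed by hand in plain Python on hypothetical values:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error over paired labels and predictions."""
    n = len(labels)
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, predictions)) / n)

# Hypothetical labels and model outputs
print(round(rmse([3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.0, 8.0]), 4))  # 0.6124
```

Lower is better, and the units are the same as the label's, which makes RMSE easy to interpret for regression.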

5.2 Decision Tree Regression

Decision tree regression is an algorithm for predicting continuous values; it partitions the data into regions through a sequence of decision rules, with each region mapped to a single predicted value.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Create a decision tree regressor
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")

// Train the model
val model = dt.fit(trainingData)

// Evaluate the model (reusing the evaluator from section 5.1)
val predictions = model.transform(testData)
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")

# Python example (DataFrame-based API)
from pyspark.ml.regression import DecisionTreeRegressor

# Create a decision tree regressor
dt = DecisionTreeRegressor(labelCol="label", featuresCol="features")

# Train the model
model = dt.fit(training_data)

# Evaluate the model (reusing the evaluator from section 5.1)
predictions = model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

5.3 Random Forest Regression

Random forest regression is an ensemble learning algorithm composed of many decision tree regressors; the final prediction is obtained by averaging their individual predictions.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.regression.RandomForestRegressor

// Create a random forest regressor
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)

// Train the model
val model = rf.fit(trainingData)

// Evaluate the model (reusing the evaluator from section 5.1)
val predictions = model.transform(testData)
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) = $rmse")

# Python example (DataFrame-based API)
from pyspark.ml.regression import RandomForestRegressor

# Create a random forest regressor
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=10)

# Train the model
model = rf.fit(training_data)

# Evaluate the model (reusing the evaluator from section 5.1)
predictions = model.transform(test_data)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

6. Clustering Algorithms

Clustering is an unsupervised learning task that groups data points into clusters, so that points within a cluster are similar and points in different clusters differ substantially. MLlib provides several clustering algorithms, including:

6.1 K-Means

K-means is a distance-based clustering algorithm; it partitions the data points into K clusters so as to minimize the total within-cluster distance.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Create a K-means model
val kmeans = new KMeans()
  .setK(2)
  .setSeed(1L)

// Train the model
val model = kmeans.fit(data)

// Predict
val predictions = model.transform(data)

// Evaluate the model
val evaluator = new ClusteringEvaluator()
val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Show the cluster centers
println("Cluster Centers:")
model.clusterCenters.foreach(println)

# Python example (DataFrame-based API)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Create a K-means model
kmeans = KMeans().setK(2).setSeed(1)

# Train the model
model = kmeans.fit(data)

# Predict
predictions = model.transform(data)

# Evaluate the model
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

# Show the cluster centers
print("Cluster Centers:")
for center in model.clusterCenters():
    print(center)
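Under the hood this is Lloyd's algorithm: assign each point to its nearest center, then move each center to the mean of its assigned points, and repeat until stable. A 1-D plain-Python sketch of one such step (MLlib's implementation is distributed and uses smarter initialization, but the iteration is the same idea):

```python
def kmeans_step(points, centers):
    """One assign-then-update step of Lloyd's algorithm in 1-D."""
    clusters = [[] for _ in centers]
    for p in points:
        # Assign p to the nearest current center
        nearest = min(range(len(centers)), key=lambda i: abs(p - centers[i]))
        clusters[nearest].append(p)
    # Move each center to the mean of its assigned points
    return [sum(c) / len(c) for c in clusters]

points = [1.0, 2.0, 9.0, 10.0, 11.0]
centers = [0.0, 5.0]
for _ in range(3):
    centers = kmeans_step(points, centers)
print(centers)  # [1.5, 10.0]: one center per natural group
```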

6.2 Gaussian Mixture Model

A Gaussian mixture model is a probability-based clustering algorithm; it assumes the data points are generated from a mixture of several Gaussian distributions.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.clustering.GaussianMixture

// Create a Gaussian mixture model
val gmm = new GaussianMixture()
  .setK(2)

// Train the model
val model = gmm.fit(data)

// Predict
val predictions = model.transform(data)

// Show the parameters of each Gaussian component
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:")
  println(s"Weight: ${model.weights(i)}")
  println(s"Mean: ${model.gaussians(i).mean}")
  println(s"Cov: ${model.gaussians(i).cov}")
}

# Python example (DataFrame-based API)
from pyspark.ml.clustering import GaussianMixture

# Create a Gaussian mixture model
gmm = GaussianMixture().setK(2)

# Train the model
model = gmm.fit(data)

# Predict
predictions = model.transform(data)

# Show the parameters of each Gaussian component
for i in range(model.getK()):
    print(f"Gaussian {i}:")
    print(f"Weight: {model.weights[i]}")
    print(f"Mean: {model.gaussians[i].mean}")
    print(f"Cov: {model.gaussians[i].cov}")
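"Probability-based" means each point gets a soft assignment: the posterior responsibility of each component, proportional to its weight times its density at that point. A 1-D plain-Python sketch with hypothetical mixture parameters (not MLlib code):

```python
import math

def normal_pdf(x, mean, var):
    """Density of a univariate Gaussian with the given mean and variance."""
    return math.exp(-(x - mean) ** 2 / (2 * var)) / math.sqrt(2 * math.pi * var)

def responsibilities(x, weights, means, variances):
    """Posterior probability that each component generated x."""
    likelihoods = [w * normal_pdf(x, m, v)
                   for w, m, v in zip(weights, means, variances)]
    total = sum(likelihoods)
    return [l / total for l in likelihoods]

# Two hypothetical components, N(0, 1) and N(4, 1), with equal weights
r = responsibilities(1.0, [0.5, 0.5], [0.0, 4.0], [1.0, 1.0])
print([round(p, 3) for p in r])  # the point at x=1 mostly belongs to N(0, 1)
```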

7. Collaborative Filtering

Collaborative filtering is a technique for recommender systems; based on users' historical behavior or preferences, it predicts their ratings of, or preferences for, items. MLlib provides a collaborative filtering algorithm based on alternating least squares (ALS).

// Scala example (DataFrame-based API)
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Load data
val ratings = spark.read.option("header", "true").option("inferSchema", "true")
  .csv("data/mllib/sample_movielens_ratings.csv")

// Split into training and test sets
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Create the ALS model
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

// Train the model
val model = als.fit(training)

// Evaluate the model (drop predictions for unseen users/items)
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)

# Python example (DataFrame-based API)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Load data
ratings = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("data/mllib/sample_movielens_ratings.csv")

# Split into training and test sets
training, test = ratings.randomSplit([0.8, 0.2])

# Create the ALS model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

# Train the model
model = als.fit(training)

# Evaluate the model (drop predictions for unseen users/items)
model.setColdStartStrategy("drop")
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Generate recommendations
user_recs = model.recommendForAllUsers(10)
movie_recs = model.recommendForAllItems(10)
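The "alternating" in ALS: holding the item factors fixed, each user's factor has a closed-form least-squares solution, and vice versa; the algorithm alternates between the two until convergence. For rank-1 factors with no regularization, the user update reduces to a simple ratio. A plain-Python sketch of one such update on hypothetical data (real ALS uses higher rank, regularization, and runs distributed):

```python
def user_factor_update(ratings, item_factors):
    """Rank-1, unregularized least-squares update for one user.

    Minimizes sum_j (r_j - u * v_j)^2 over u, giving
    u = sum_j r_j * v_j / sum_j v_j^2.
    """
    num = sum(r * v for r, v in zip(ratings, item_factors))
    den = sum(v * v for v in item_factors)
    return num / den

# One user rated three items; the item factors are held fixed
ratings = [4.0, 2.0, 5.0]
item_factors = [2.0, 1.0, 2.5]
u = user_factor_update(ratings, item_factors)
print(u)  # 2.0
print([u * v for v in item_factors])  # reconstructed ratings u * v_j
```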

8. Feature Engineering

Feature engineering is one of the most important steps in machine learning. It covers operations such as feature extraction, transformation, and selection, turning raw data into features suitable for machine learning algorithms. MLlib provides a rich set of feature engineering tools, including:

  • Feature extraction: extract features from raw data, e.g. TF-IDF, Word2Vec.
  • Feature transformation: change how features are represented, e.g. standardization, normalization, one-hot encoding.
  • Feature selection: select the features useful to the model, e.g. chi-squared tests, feature importance.
  • Dimensionality reduction: reduce the dimensionality of the features, e.g. PCA, SVD.
// Scala example (DataFrame-based API)
import org.apache.spark.ml.feature._

// Sample data
val df = spark.createDataFrame(Seq(
  (0, "a b c d e spark", 1.0),
  (1, "b d", 0.0),
  (2, "spark f g h", 1.0),
  (3, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Tokenize
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsData = tokenizer.transform(df)

// Compute term frequencies (TF)
val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)

// Compute inverse document frequencies (IDF)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)

rescaledData.select("id", "features", "label").show()

# Python example (DataFrame-based API)
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Sample data
df = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(df)

# Compute term frequencies (TF)
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurized_data = hashing_tf.transform(words_data)

# Compute inverse document frequencies (IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

rescaled_data.select("id", "features", "label").show()
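The IDF weights that rescale the raw term frequencies follow the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) the number of documents containing term t (this is the formula documented for MLlib's IDF; check it against your Spark version). Computed by hand in plain Python for the sample data above:

```python
import math

def idf(num_docs, doc_freq):
    """Smoothed inverse document frequency: log((m + 1) / (df + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))

# In the 4 sample documents above, "spark" appears in 2 and "hadoop" in 1
print(round(idf(4, 2), 4))  # a frequent term gets a smaller weight
print(round(idf(4, 1), 4))  # a rare term gets a larger weight
print(idf(4, 4))            # 0.0: a term in every document carries no signal
```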

9. Pipeline API

The Pipeline API is MLlib's tool for building, evaluating, and tuning machine learning workflows. It lets you combine multiple stages (such as feature engineering, model training, and model evaluation) into a single pipeline that is easy to manage and reuse.

// Scala example (DataFrame-based API)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Sample data
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0)
)).toDF("id", "text", "label")

// Define the pipeline stages
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

// Create the pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

// Train the model
val model = pipeline.fit(training)

// Test data
val test = spark.createDataFrame(Seq(
  (10L, "spark i j k"),
  (11L, "l m n"),
  (12L, "mapreduce spark"),
  (13L, "apache hadoop")
)).toDF("id", "text")

// Predict
val predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show()

# Python example (DataFrame-based API)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Sample data
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0)
], ["id", "text", "label"])

# Define the pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Create the pipeline
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])

# Train the model
model = pipeline.fit(training)

# Test data
test = spark.createDataFrame([
    (10, "spark i j k"),
    (11, "l m n"),
    (12, "mapreduce spark"),
    (13, "apache hadoop")
], ["id", "text"])

# Predict
predictions = model.transform(test)
predictions.select("id", "text", "probability", "prediction").show()
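Pipelines are also the unit that MLlib's tuning tools (cross-validation and grid search) operate on: for each candidate parameter combination, average the validation error over k folds and keep the best. The selection loop itself is simple enough to sketch in plain Python; here the "model" is deliberately trivial (a constant prediction shrunk toward zero by a hypothetical regularization strength) so the cross-validation mechanics are the focus:

```python
def fit(train_labels, reg):
    """Toy 'model': the training mean, shrunk toward 0 by strength reg."""
    return (sum(train_labels) / len(train_labels)) / (1.0 + reg)

def mse(labels, constant_pred):
    """Mean squared error of a constant prediction."""
    return sum((y - constant_pred) ** 2 for y in labels) / len(labels)

def cross_validate(labels, reg, k=2):
    """Average validation MSE of parameter `reg` over k contiguous folds."""
    fold = len(labels) // k
    errors = []
    for i in range(k):
        held_out = labels[i * fold:(i + 1) * fold]
        train = labels[:i * fold] + labels[(i + 1) * fold:]
        errors.append(mse(held_out, fit(train, reg)))
    return sum(errors) / k

labels = [1.0, 2.0, 3.0, 2.0]
grid = [0.0, 0.5, 1.0]  # candidate regularization values, as in a param grid
best = min(grid, key=lambda reg: cross_validate(labels, reg))
print(best)
```

In MLlib the same loop is what CrossValidator runs over a ParamGridBuilder grid, with a whole pipeline in place of the toy model.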

Hands-On Exercises

Exercise 1: Logistic Regression Classification

  1. Classify the sample data with MLlib's logistic regression algorithm.
  2. Split the data into training and test sets with a 7:3 ratio.
  3. Train a logistic regression model, tuning the regularization parameter.
  4. Evaluate the model with metrics such as accuracy, precision, and recall.

Exercise 2: K-Means Clustering

  1. Cluster the sample data with MLlib's K-means algorithm.
  2. Try different values of K and observe how the clustering changes.
  3. Evaluate the clustering quality with the silhouette coefficient.
  4. Visualize the clustering results (if possible).

Exercise 3: Collaborative Filtering Recommendations

  1. Build a simple recommender system with MLlib's ALS algorithm.
  2. Load the MovieLens dataset and split it into training and test sets.
  3. Train an ALS model, tuning the regularization parameter and the number of iterations.
  4. Evaluate the model's RMSE.
  5. Generate 10 movie recommendations for each user.

Exercise 4: Applying the Pipeline API

  1. Build a complete machine learning workflow with the Pipeline API.
  2. Include feature extraction, transformation, and model training stages.
  3. Optimize the model's parameters with cross-validation and grid search.
  4. Evaluate the performance of the optimized model.

10. Summary

This tutorial covered the basic concepts and architecture of Spark MLlib, including data representation, classification algorithms, regression algorithms, clustering algorithms, collaborative filtering, feature engineering, and the Pipeline API. MLlib is a powerful machine learning library that can process large-scale datasets and supports many kinds of machine learning tasks.

After working through this tutorial, you should be able to:

  • Understand MLlib's basic concepts and architecture.
  • Master MLlib's data representations.
  • Use MLlib's classification algorithms for classification tasks.
  • Use MLlib's regression algorithms for regression tasks.
  • Use MLlib's clustering algorithms for clustering tasks.
  • Use MLlib's collaborative filtering algorithm to build recommender systems.
  • Use MLlib's feature engineering tools to process raw data.
  • Use MLlib's Pipeline API to build machine learning workflows.

In later tutorials we will dive into MLlib's advanced features, including advanced machine learning algorithms, model tuning, and model deployment, and master the various application scenarios and best practices of machine learning with Spark.