Advanced Machine Learning with MLlib
1. The Pipeline API in Depth
The Pipeline API is a powerful MLlib tool for building, evaluating, and tuning machine learning workflows. It lets you combine multiple machine learning stages (such as feature engineering, model training, and model evaluation) into a single pipeline that is easy to manage and reuse.
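To make the stage-chaining idea concrete, here is a minimal plain-Python sketch (no Spark) of what a pipeline does: each stage exposes a transform() method, and the pipeline feeds each stage's output into the next, in order. The class and stage names here are illustrative, not MLlib APIs.

```python
# Hypothetical stages: each one only needs a transform() method.
class LowercaseStage:
    def transform(self, rows):
        return [r.lower() for r in rows]

class TokenizeStage:
    def transform(self, rows):
        return [r.split() for r in rows]

class SimplePipeline:
    """Applies its stages in order, like MLlib's Pipeline does on DataFrames."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        # The output of each stage becomes the input of the next.
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

pipeline = SimplePipeline([LowercaseStage(), TokenizeStage()])
result = pipeline.transform(["Hello Spark", "MLlib Pipelines"])
```

MLlib's real Pipeline additionally distinguishes transformers from estimators (which must be fit first), but the composition principle is the same.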
1.1 Custom Transformers and Estimators
Beyond the transformers and estimators that ship with MLlib, you can define your own to meet specific business requirements.
// Scala example: a custom transformer
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
class CustomTransformer(override val uid: String)
    extends Transformer with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("customTransformer"))

  val inputCol = new Param[String](this, "inputCol", "input column name")
  val outputCol = new Param[String](this, "outputCol", "output column name")

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  override def transformSchema(schema: StructType): StructType = {
    val inputType = schema($(inputCol)).dataType
    StructType(schema.fields :+ StructField($(outputCol), inputType, true))
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    val customFunc = udf { (value: String) => value.toUpperCase() }
    dataset.withColumn($(outputCol), customFunc(col($(inputCol))))
  }

  override def copy(extra: ParamMap): CustomTransformer = defaultCopy(extra)
}
# Python example: a custom transformer
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

class CustomTransformer(Transformer, HasInputCol, HasOutputCol,
                        DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCol=None, outputCol=None):
        super(CustomTransformer, self).__init__()
        # The shared mixins define the params and getters; set values here.
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def _transform(self, dataset):
        def custom_func(value):
            return value.upper()
        custom_udf = udf(custom_func, StringType())
        return dataset.withColumn(self.getOutputCol(),
                                  custom_udf(dataset[self.getInputCol()]))
1.2 Cross-Validation and Grid Search
Cross-validation and grid search are commonly used techniques for model selection and hyperparameter tuning. MLlib provides two tuning utilities, CrossValidator and TrainValidationSplit, along with ParamGridBuilder for constructing parameter grids.
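Before the Spark examples, the mechanics that CrossValidator automates can be sketched in plain Python (no Spark): for each parameter combination, average a score over k folds and keep the best setting. The threshold "model" and all names here are illustrative toys, not MLlib APIs.

```python
def k_fold_indices(n, k):
    """Yield (train, test) index lists for k equal folds over n samples."""
    fold_size = n // k
    for i in range(k):
        test = list(range(i * fold_size, (i + 1) * fold_size))
        train = [j for j in range(n) if j not in test]
        yield train, test

def accuracy(xs, ys, threshold):
    preds = [1.0 if x >= threshold else 0.0 for x in xs]
    return sum(p == y for p, y in zip(preds, ys)) / len(ys)

xs = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
ys = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]

param_grid = [{"threshold": t} for t in (0.3, 0.5, 0.7)]
best_score, best_params = -1.0, None
for params in param_grid:
    fold_scores = []
    for train, test in k_fold_indices(len(xs), k=4):
        # A real estimator would be fit on the train split here; this toy
        # "model" has no training step, so only the test split is scored.
        fold_scores.append(accuracy([xs[i] for i in test],
                                    [ys[i] for i in test],
                                    params["threshold"]))
    mean_score = sum(fold_scores) / len(fold_scores)
    if mean_score > best_score:
        best_score, best_params = mean_score, params
```

CrossValidator does exactly this loop, except the folds are DataFrame splits, the candidates come from ParamGridBuilder, and the fit/score work is distributed across the cluster.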
// Scala example: optimizing a model with cross-validation and grid search
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
// Sample data
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0),
  (4L, "b spark who", 1.0),
  (5L, "g d a y", 0.0),
  (6L, "spark fly", 1.0),
  (7L, "was mapreduce", 0.0),
  (8L, "e spark program", 1.0),
  (9L, "a e c l", 0.0)
)).toDF("id", "text", "label")
// Define the pipeline stages
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
// Create the pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
// Build the parameter grid
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()
// Create the cross-validator
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)
// Train the model
val cvModel = cv.fit(training)
// Test data
val test = spark.createDataFrame(Seq(
  (10L, "spark i j k"),
  (11L, "l m n"),
  (12L, "mapreduce spark"),
  (13L, "apache hadoop")
)).toDF("id", "text")
// Predict
val predictions = cvModel.transform(test)
predictions.select("id", "text", "probability", "prediction").show()
# Python example: optimizing a model with cross-validation and grid search
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Sample data
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0)
], ["id", "text", "label"])
# Define the pipeline stages
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)
# Create the pipeline
pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
# Build the parameter grid (parentheses let the chained calls span lines)
param_grid = (ParamGridBuilder()
              .addGrid(hashing_tf.numFeatures, [10, 100, 1000])
              .addGrid(lr.regParam, [0.1, 0.01])
              .build())
# Create the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
# Train the model
cv_model = crossval.fit(training)
# Test data
test = spark.createDataFrame([
    (10, "spark i j k"),
    (11, "l m n"),
    (12, "mapreduce spark"),
    (13, "apache hadoop")
], ["id", "text"])
# Predict
predictions = cv_model.transform(test)
predictions.select("id", "text", "probability", "prediction").show()
2. Advanced Feature Engineering
Feature engineering is one of the most important steps in machine learning; it covers operations such as feature extraction, transformation, selection, and dimensionality reduction. MLlib provides a rich set of feature engineering tools for handling various types of data.
2.1 Feature Selection
Feature selection means choosing the features that are useful to the model from the original feature set, in order to reduce dimensionality and improve model performance and training speed. MLlib provides several feature selection methods, including:
- ChiSqSelector: chi-square-test-based selection, suitable for classification problems.
- VarianceThresholdSelector: variance-threshold-based selection, which removes features whose variance falls below a threshold.
- Model-based feature importance: selection based on feature importances reported by models such as decision trees and random forests.
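To show what the chi-square criterion behind ChiSqSelector actually measures, here is a self-contained plain-Python sketch: for each (feature value, label) cell, compare the observed count with the count expected if feature and label were independent. The function name and data are illustrative, not MLlib APIs.

```python
def chi_square(feature, label):
    """Chi-square statistic for two categorical sequences of equal length."""
    f_values, l_values = sorted(set(feature)), sorted(set(label))
    n = len(feature)
    stat = 0.0
    for fv in f_values:
        for lv in l_values:
            observed = sum(1 for f, l in zip(feature, label)
                           if f == fv and l == lv)
            # Expected count under independence of feature and label.
            expected = (feature.count(fv) * label.count(lv)) / n
            stat += (observed - expected) ** 2 / expected
    return stat

# A perfectly predictive feature scores high; an independent one scores 0.
label         = [0, 0, 0, 0, 1, 1, 1, 1]
predictive    = [0, 0, 0, 0, 1, 1, 1, 1]   # matches the label exactly
uninformative = [0, 1, 0, 1, 0, 1, 0, 1]   # independent of the label

score_good = chi_square(predictive, label)
score_bad = chi_square(uninformative, label)
```

ChiSqSelector computes this statistic per feature (over binned values on the cluster) and keeps the top-scoring ones.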
// Scala example: feature selection with ChiSqSelector
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._
// Sample data
val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
).toDF("id", "features", "label")
// Create the ChiSqSelector
val selector = new ChiSqSelector()
  .setNumTopFeatures(2)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")
// Fit the selector
val selectorModel = selector.fit(data)
// Transform the data
val result = selectorModel.transform(data)
result.show()
# Python example: feature selection with ChiSqSelector
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# Sample data
data = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)
], ["id", "features", "label"])
# Create the ChiSqSelector
selector = ChiSqSelector(numTopFeatures=2, featuresCol="features",
                         labelCol="label", outputCol="selectedFeatures")
# Fit the selector
selector_model = selector.fit(data)
# Transform the data
result = selector_model.transform(data)
result.show()
2.2 Dimensionality Reduction
Dimensionality reduction maps a high-dimensional feature space to a lower-dimensional one through a linear or nonlinear transformation, reducing feature dimensionality and improving model performance and training speed. Commonly used methods include:
- PCA: principal component analysis, available in spark.ml and suited to linear structure in the data.
- SVD: singular value decomposition, available in Spark through the RDD-based RowMatrix API.
- t-SNE: t-distributed stochastic neighbor embedding, suited to visualizing nonlinear data (not built into MLlib; third-party implementations exist).
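The core of PCA can be worked through by hand in two dimensions: centre the data, form the 2x2 covariance matrix, and take its leading eigenvector (for a symmetric matrix [[a, b], [b, c]] the eigenvalues are (a + c)/2 ± sqrt(((a - c)/2)^2 + b^2) in closed form). This pure-Python sketch is illustrative only; MLlib's PCA works on distributed vectors of any dimension.

```python
import math

def pca_2d_direction(points):
    """Leading principal direction of a list of (x, y) points."""
    n = len(points)
    mx = sum(x for x, _ in points) / n
    my = sum(y for _, y in points) / n
    centred = [(x - mx, y - my) for x, y in points]
    # Entries of the 2x2 covariance matrix [[a, b], [b, c]].
    a = sum(x * x for x, _ in centred) / n
    c = sum(y * y for _, y in centred) / n
    b = sum(x * y for x, y in centred) / n
    # Leading eigenvalue via the closed form for symmetric 2x2 matrices.
    lam = (a + c) / 2 + math.sqrt(((a - c) / 2) ** 2 + b ** 2)
    # Corresponding eigenvector, normalised (b == 0 means axis-aligned).
    if b != 0:
        vx, vy = b, lam - a
    elif a >= c:
        vx, vy = 1.0, 0.0
    else:
        vx, vy = 0.0, 1.0
    norm = math.hypot(vx, vy)
    return vx / norm, vy / norm

# Points lying on the line y = x: the principal direction is the diagonal.
points = [(1.0, 1.0), (2.0, 2.0), (3.0, 3.0), (4.0, 4.0)]
direction = pca_2d_direction(points)
```

Projecting the data onto the top-k such directions is exactly what the `pcaFeatures` column in the examples below contains.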
// Scala example: dimensionality reduction with PCA
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._
// Sample data
val data = Seq(
  Vectors.dense(2.0, 8.0, 4.0, 5.0),
  Vectors.dense(3.0, 3.0, 3.0, 3.0),
  Vectors.dense(5.0, 5.0, 5.0, 5.0),
  Vectors.dense(7.0, 1.0, 0.0, 0.0)
).map(Tuple1.apply).toDF("features")
// Create and fit the PCA model
val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(data)
// Transform the data
val result = pca.transform(data)
result.show(false)
# Python example: dimensionality reduction with PCA
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
# Sample data
data = spark.createDataFrame([
    (Vectors.dense([2.0, 8.0, 4.0, 5.0]),),
    (Vectors.dense([3.0, 3.0, 3.0, 3.0]),),
    (Vectors.dense([5.0, 5.0, 5.0, 5.0]),),
    (Vectors.dense([7.0, 1.0, 0.0, 0.0]),)
], ["features"])
# Create and fit the PCA model
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(data)
# Transform the data
result = model.transform(data)
result.show(truncate=False)
2.3 Feature Hashing and Word Embeddings
MLlib provides several feature hashing and embedding methods for processing text data:
- HashingTF: hashing-based term frequency counts.
- CountVectorizer: vocabulary-based term frequency counts.
- TF-IDF: term frequency-inverse document frequency weighting.
- Word2Vec: maps words to low-dimensional vectors; a document is represented by the average of its word vectors (MLlib has no separate Doc2Vec implementation).
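The HashingTF idea fits in a few lines of plain Python: map each token to one of numFeatures buckets with a hash function and count occurrences, so no vocabulary needs to be built. Python's built-in hash() stands in here for the hash MLlib actually uses (MurmurHash3), so the bucket indices will not match MLlib's; the sketch only illustrates the technique.

```python
def hashing_tf(tokens, num_features=16):
    """Fixed-length term-frequency vector via the hashing trick."""
    vec = [0] * num_features
    for tok in tokens:
        # Distinct tokens can collide in the same bucket; a larger
        # num_features makes collisions rarer.
        vec[hash(tok) % num_features] += 1
    return vec

tokens = ["spark", "mllib", "spark"]
vec = hashing_tf(tokens)
```

Because the vector length is fixed up front, the transform needs no fitting pass over the data, which is why HashingTF is a Transformer while CountVectorizer is an Estimator.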
// Scala example: word embeddings with Word2Vec
import org.apache.spark.ml.feature.Word2Vec
// Sample data
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" ").toSeq,
  "I wish Java could use case classes".split(" ").toSeq,
  "Logistic regression models are neat".split(" ").toSeq
).map(Tuple1.apply)).toDF("text")
// Create the Word2Vec model
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
// Train the model
val model = word2Vec.fit(documentDF)
// Transform the data
val result = model.transform(documentDF)
result.show(false)
// Find similar words
val synonyms = model.findSynonyms("Spark", 2)
synonyms.show(false)
# Python example: word embeddings with Word2Vec
from pyspark.ml.feature import Word2Vec
# Sample data
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "),),
    ("I wish Java could use case classes".split(" "),),
    ("Logistic regression models are neat".split(" "),)
], ["text"])
# Create the Word2Vec model
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
# Train the model
model = word2Vec.fit(documentDF)
# Transform the data
result = model.transform(documentDF)
result.show(truncate=False)
# Find similar words
synonyms = model.findSynonyms("Spark", 2)
synonyms.show(truncate=False)
3. Model Evaluation and Tuning
Model evaluation and tuning are key steps in the machine learning workflow: they help us assess model performance and find the best hyperparameter combination, improving accuracy and generalization.
3.1 Evaluation Metrics
MLlib provides many metrics for evaluating different types of models:
- Classification metrics: accuracy, precision, recall, F1-score, ROC curve, AUC, etc.
- Regression metrics: mean squared error (MSE), root mean squared error (RMSE), mean absolute error (MAE), R², etc.
- Clustering metrics: silhouette coefficient, Davies-Bouldin index, etc.
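To make the classification metrics concrete before the Spark examples, here is a plain-Python computation of accuracy, precision, recall, and F1 for a binary problem from the four confusion-matrix counts. Note that MLlib's MulticlassClassificationEvaluator reports label-weighted multiclass versions of precision and recall.

```python
def binary_metrics(labels, preds):
    """Accuracy, precision, recall, F1 for binary labels/predictions."""
    tp = sum(1 for l, p in zip(labels, preds) if l == 1 and p == 1)
    fp = sum(1 for l, p in zip(labels, preds) if l == 0 and p == 1)
    fn = sum(1 for l, p in zip(labels, preds) if l == 1 and p == 0)
    tn = sum(1 for l, p in zip(labels, preds) if l == 0 and p == 0)
    accuracy = (tp + tn) / len(labels)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return accuracy, precision, recall, f1

labels = [1, 1, 1, 0, 0, 0]
preds  = [1, 1, 0, 0, 0, 1]   # one false negative, one false positive
acc, prec, rec, f1 = binary_metrics(labels, preds)
```

Here tp=2, fp=1, fn=1, tn=2, so precision and recall are both 2/3 and F1 (their harmonic mean) is also 2/3.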
// Scala example: evaluating a classifier with several metrics
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
// Load the data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Create the logistic regression model
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Train the model
val model = lr.fit(trainingData)
// Predict
val predictions = model.transform(testData)
// Evaluate accuracy
val accuracyEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = accuracyEvaluator.evaluate(predictions)
println(s"Accuracy = $accuracy")
// Evaluate weighted precision
val precisionEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("weightedPrecision")
val precision = precisionEvaluator.evaluate(predictions)
println(s"Precision = $precision")
// Evaluate weighted recall
val recallEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("weightedRecall")
val recall = recallEvaluator.evaluate(predictions)
println(s"Recall = $recall")
// Evaluate F1
val f1Evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("f1")
val f1 = f1Evaluator.evaluate(predictions)
println(s"F1-Score = $f1")
# Python example: evaluating a classifier with several metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Load the data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split into training and test sets
training_data, test_data = data.randomSplit([0.7, 0.3])
# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Train the model
model = lr.fit(training_data)
# Predict
predictions = model.transform(test_data)
# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy}")
# Evaluate weighted precision
evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(predictions)
print(f"Precision = {precision}")
# Evaluate weighted recall
evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(predictions)
print(f"Recall = {recall}")
# Evaluate F1
evaluator.setMetricName("f1")
f1 = evaluator.evaluate(predictions)
print(f"F1-Score = {f1}")
3.2 Model Selection
Model selection means choosing, from several candidates, the model best suited to a given problem. MLlib provides TrainValidationSplit for this: it splits the dataset into a training part and a validation part, trains each candidate on the training part, and evaluates it on the validation part.
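The TrainValidationSplit logic can be shown in miniature in plain Python: make one random train/validation split (trainRatio controls the sizes), score every candidate setting on the validation part, and keep the best. Unlike cross-validation, each candidate is evaluated only once. The threshold "models" and all names are illustrative toys, not MLlib APIs.

```python
import random

def train_validation_select(xs, ys, thresholds, train_ratio=0.8, seed=0):
    """Pick the best threshold rule using a single held-out validation set."""
    rng = random.Random(seed)
    idx = list(range(len(xs)))
    rng.shuffle(idx)
    cut = int(len(idx) * train_ratio)
    val = idx[cut:]                      # held-out validation indices
    best_t, best_score = None, -1.0
    for t in thresholds:
        # A real estimator would be fit on idx[:cut] first; the toy rule
        # has no training step, so it is scored directly on the val part.
        preds = [1.0 if xs[i] >= t else 0.0 for i in val]
        score = sum(p == ys[i] for p, i in zip(preds, val)) / len(val)
        if score > best_score:
            best_t, best_score = t, score
    return best_t, best_score

xs = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]
ys = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
best_threshold, best_score = train_validation_select(xs, ys, [0.3, 0.5, 0.7])
```

Because only one split is scored, TrainValidationSplit is cheaper than CrossValidator but its estimate of each candidate's quality is noisier.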
// Scala example: model selection with TrainValidationSplit
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, LogisticRegression, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.functions.expr
// Load the data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Prepare the indexers
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Define the candidate models
val lr = new LogisticRegression()
val dt = new DecisionTreeClassifier()
val rf = new RandomForestClassifier()
// Prepare the label converter
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)
// Create a pipeline for each model
val lrPipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, lr, labelConverter))
val dtPipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
val rfPipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
// Define the parameter grids
val lrParamGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()
val dtParamGrid = new ParamGridBuilder()
  .addGrid(dt.maxDepth, Array(5, 10))
  .build()
val rfParamGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(10, 20))
  .build()
// Create the TrainValidationSplit instances
val lrTvs = new TrainValidationSplit()
  .setEstimator(lrPipeline)
  .setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("indexedLabel"))
  .setEstimatorParamMaps(lrParamGrid)
  .setTrainRatio(0.8)
val dtTvs = new TrainValidationSplit()
  .setEstimator(dtPipeline)
  .setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("indexedLabel"))
  .setEstimatorParamMaps(dtParamGrid)
  .setTrainRatio(0.8)
val rfTvs = new TrainValidationSplit()
  .setEstimator(rfPipeline)
  .setEvaluator(new MulticlassClassificationEvaluator().setLabelCol("indexedLabel"))
  .setEstimatorParamMaps(rfParamGrid)
  .setTrainRatio(0.8)
// Train the models
val lrModel = lrTvs.fit(trainingData)
val dtModel = dtTvs.fit(trainingData)
val rfModel = rfTvs.fit(trainingData)
// Evaluate each best model on the test set
val lrAccuracy = lrModel.bestModel.asInstanceOf[PipelineModel]
  .transform(testData)
  .select("label", "predictedLabel")
  .filter(expr("label = predictedLabel"))
  .count().toDouble / testData.count()
val dtAccuracy = dtModel.bestModel.asInstanceOf[PipelineModel]
  .transform(testData)
  .select("label", "predictedLabel")
  .filter(expr("label = predictedLabel"))
  .count().toDouble / testData.count()
val rfAccuracy = rfModel.bestModel.asInstanceOf[PipelineModel]
  .transform(testData)
  .select("label", "predictedLabel")
  .filter(expr("label = predictedLabel"))
  .count().toDouble / testData.count()
// Compare model performance
println(s"Logistic Regression Accuracy: $lrAccuracy")
println(s"Decision Tree Accuracy: $dtAccuracy")
println(s"Random Forest Accuracy: $rfAccuracy")
# Python example: model selection with TrainValidationSplit
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
# Load the data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split into training and test sets
training_data, test_data = data.randomSplit([0.7, 0.3])
# Prepare the indexers
label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
feature_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Define the candidate models
lr = LogisticRegression()
dt = DecisionTreeClassifier()
rf = RandomForestClassifier()
# Prepare the label converter
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=label_indexer.labels)
# Create a pipeline for each model
lr_pipeline = Pipeline(stages=[label_indexer, feature_indexer, lr, label_converter])
dt_pipeline = Pipeline(stages=[label_indexer, feature_indexer, dt, label_converter])
rf_pipeline = Pipeline(stages=[label_indexer, feature_indexer, rf, label_converter])
# Define the parameter grids
lr_param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
dt_param_grid = ParamGridBuilder().addGrid(dt.maxDepth, [5, 10]).build()
rf_param_grid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()
# Create the TrainValidationSplit instances
lr_tvs = TrainValidationSplit(estimator=lr_pipeline,
                              estimatorParamMaps=lr_param_grid,
                              evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel"),
                              trainRatio=0.8)
dt_tvs = TrainValidationSplit(estimator=dt_pipeline,
                              estimatorParamMaps=dt_param_grid,
                              evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel"),
                              trainRatio=0.8)
rf_tvs = TrainValidationSplit(estimator=rf_pipeline,
                              estimatorParamMaps=rf_param_grid,
                              evaluator=MulticlassClassificationEvaluator(labelCol="indexedLabel"),
                              trainRatio=0.8)
# Train the models
lr_model = lr_tvs.fit(training_data)
dt_model = dt_tvs.fit(training_data)
rf_model = rf_tvs.fit(training_data)
# Evaluate each best model on the test set
def evaluate_model(model, test_data):
    predictions = model.transform(test_data)
    correct = predictions.filter(predictions.label == predictions.predictedLabel).count()
    total = test_data.count()
    return correct / total

lr_accuracy = evaluate_model(lr_model.bestModel, test_data)
dt_accuracy = evaluate_model(dt_model.bestModel, test_data)
rf_accuracy = evaluate_model(rf_model.bestModel, test_data)
# Compare model performance
print(f"Logistic Regression Accuracy: {lr_accuracy}")
print(f"Decision Tree Accuracy: {dt_accuracy}")
print(f"Random Forest Accuracy: {rf_accuracy}")
4. Model Persistence and Deployment
MLlib supports saving trained models to disk for later reuse or deployment to production. Models are saved in MLlib's native format; with third-party tooling they can also be exported to formats such as PMML or ONNX for integration with other systems.
4.1 Saving and Loading Models
Use the save() method to persist a model to disk and the load() method to read it back.
// Scala example: saving and loading a model
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
// Load the data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Create and train the model
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val model = lr.fit(data)
// Save the model
model.write.overwrite().save("myModelPath")
// Load the model
val sameModel = LogisticRegressionModel.load("myModelPath")
# Python example: saving and loading a model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
# Load the data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Create and train the model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(data)
# Save the model
model.write().overwrite().save("myModelPath")
# Load the model
same_model = LogisticRegressionModel.load("myModelPath")
4.2 Exporting to PMML
PMML (Predictive Model Markup Language) is an XML format for describing predictive models, supported by many machine learning algorithms and tools. MLlib models can be exported to PMML through the third-party JPMML-SparkML library, which makes integration with other systems easier.
// Scala example: exporting a model to PMML (uses the third-party JPMML-SparkML library)
import org.jpmml.sparkml.PMMLBuilder
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
// Load the data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Create and train the model; PMMLBuilder expects a fitted PipelineModel
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val pipeline = new Pipeline().setStages(Array(lr))
val model = pipeline.fit(data)
// Export to PMML
val pmmlBuilder = new PMMLBuilder(data.schema, model)
val pmml = pmmlBuilder.build()
// Write the PMML document to a file
import java.io.FileOutputStream
import org.jpmml.model.PMMLUtil
val fos = new FileOutputStream("model.pmml")
PMMLUtil.marshal(pmml, fos)
fos.close()
4.3 Model Deployment Options
A trained Spark MLlib model can be deployed to production in several ways:
- Spark applications: use the model directly inside a Spark application for batch prediction.
- Spark Streaming / Structured Streaming: use the model for real-time prediction in a streaming application.
- REST API: wrap the model in a REST API with a framework such as Flask or Spring Boot to serve predictions over HTTP.
- Apache Flink: export the model to PMML or another format and use it inside a Flink application.
- ONNX Runtime: export the model to ONNX via third-party converters and deploy it with ONNX Runtime.
- MLeap: serialize the fitted pipeline with the third-party MLeap library for low-latency serving outside Spark.
5. Distributed Model Training
One of MLlib's main advantages is support for distributed model training, which makes it possible to handle large-scale datasets. This capability shows up in several ways:
- Distributed data processing: large datasets are processed on Spark's distributed compute framework.
- Distributed algorithms: many machine learning algorithms are implemented in distributed form, such as distributed random forests and gradient-boosted trees.
- Efficient parameter aggregation: gradients and model updates are aggregated across the cluster (e.g., via treeAggregate); parameter-server-style training is available through third-party integrations rather than built in.
- Fault tolerance: automatic recovery through RDD lineage.
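The data-parallel pattern behind MLlib's iterative training can be simulated locally: each "partition" computes a gradient over its own records, the partial gradients are summed (the role Spark's treeAggregate plays on a cluster), and the driver takes one step. This is an illustrative least-squares toy with a single weight; none of the names are Spark APIs.

```python
def partition_gradient(partition, w):
    """Gradient of 0.5*(w*x - y)^2 summed over one partition's records."""
    g = sum((w * x - y) * x for x, y in partition)
    return g, len(partition)

def distributed_gd(partitions, w=0.0, lr=0.1, steps=200):
    for _ in range(steps):
        # "Map" side: each partition reports its partial gradient and size.
        parts = [partition_gradient(p, w) for p in partitions]
        # "Reduce" side: combine partials, then take one gradient step.
        total_g = sum(g for g, _ in parts)
        total_n = sum(n for _, n in parts)
        w -= lr * total_g / total_n
    return w

# Two "partitions" of points on the line y = 2x; the fit should recover w = 2.
partitions = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0), (4.0, 8.0)]]
w = distributed_gd(partitions)
```

The key property is that summing per-partition gradients gives exactly the full-data gradient, so the distributed loop converges to the same answer as a single-machine one while only model parameters (not data) cross the network.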
5.1 Training Large-Scale Linear Models
MLlib implements several distributed training algorithms for linear models, including:
- SGD (stochastic gradient descent): suitable for very large-scale linear model training.
- L-BFGS (limited-memory BFGS): suitable for small-to-medium-scale linear models, with fast convergence.
- OWLQN (Orthant-Wise Limited-memory Quasi-Newton): suitable for L1-regularized linear model training.
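The SGD update itself is simple enough to show on toy data: visit one example at a time and step along the negative gradient of the logistic loss, which for the sigmoid output p and label y is just (p - y) times the input. This is a plain-Python sketch of the optimizer idea, not MLlib's implementation, and the data and learning rate are illustrative.

```python
import math
import random

def sgd_logistic(data, lr=0.5, epochs=200, seed=42):
    """Fit 1-D logistic regression weights by per-example SGD."""
    rng = random.Random(seed)
    w, b = 0.0, 0.0
    for _ in range(epochs):
        rng.shuffle(data)                # visit examples in random order
        for x, y in data:
            p = 1.0 / (1.0 + math.exp(-(w * x + b)))  # sigmoid output
            grad = p - y                 # dLoss/d(w*x + b) for logistic loss
            w -= lr * grad * x
            b -= lr * grad
    return w, b

# Linearly separable toy data: negatives below zero, positives above.
data = [(-2.0, 0.0), (-1.0, 0.0), (1.0, 1.0), (2.0, 1.0)]
w, b = sgd_logistic(data)

def predict(x):
    return 1.0 if 1.0 / (1.0 + math.exp(-(w * x + b))) >= 0.5 else 0.0
```

In the distributed setting, MLlib computes such gradients over sampled mini-batches of the RDD per iteration rather than one record at a time.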
// Scala example: training a linear model with SGD
// Note: LogisticRegressionWithSGD belongs to the older RDD-based API; it was
// deprecated in Spark 2.0 and removed in Spark 3.0. On recent versions, use
// org.apache.spark.ml.classification.LogisticRegression instead.
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Load the data
val data = spark.sparkContext.textFile("data/mllib/sample_svm_data.txt")
val parsedData = data.map { line =>
  val parts = line.split(" ")
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
}
// Split into training and test sets
val splits = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// Create and train the model
val numIterations = 100
val model = LogisticRegressionWithSGD.train(training, numIterations)
// Evaluate the model
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
val auROC = metrics.areaUnderROC()
println(s"Area under ROC = $auROC")
# Python example: training a linear model with SGD
# Note: LogisticRegressionWithSGD belongs to the older RDD-based API; it was
# deprecated in Spark 2.0 and removed in Spark 3.0. On recent versions, use
# pyspark.ml.classification.LogisticRegression instead.
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
# Load the data
data = spark.sparkContext.textFile("data/mllib/sample_svm_data.txt")
parsed_data = data.map(lambda line: line.split(" ")) \
    .map(lambda parts: LabeledPoint(float(parts[0]),
                                    Vectors.dense([float(x) for x in parts[1:]])))
# Split into training and test sets
splits = parsed_data.randomSplit([0.7, 0.3], seed=11)
training = splits[0].cache()
test = splits[1]
# Create and train the model
num_iterations = 100
model = LogisticRegressionWithSGD.train(training, num_iterations)
# Evaluate the model
prediction_and_labels = test.map(lambda p: (float(model.predict(p.features)), p.label))
metrics = BinaryClassificationMetrics(prediction_and_labels)
au_roc = metrics.areaUnderROC
print(f"Area under ROC = {au_roc}")
5.2 Training Distributed Tree Models
MLlib implements distributed random forest and distributed gradient-boosted tree algorithms, supporting large-scale tree model training.
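The per-node computation these algorithms distribute is impurity-based split selection. Here is a plain-Python sketch of Gini impurity and a brute-force best-split search over one feature; MLlib evaluates the same quantities over binned statistics aggregated from the workers. The function names and data are illustrative.

```python
def gini(labels):
    """Gini impurity of a list of class labels: 1 - sum(p_k^2)."""
    n = len(labels)
    if n == 0:
        return 0.0
    counts = {}
    for l in labels:
        counts[l] = counts.get(l, 0) + 1
    return 1.0 - sum((c / n) ** 2 for c in counts.values())

def best_split(xs, ys):
    """Threshold on xs minimising the weighted impurity of the two children."""
    best_t, best_imp = None, float("inf")
    for t in sorted(set(xs)):
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        imp = (len(left) * gini(left) + len(right) * gini(right)) / len(ys)
        if imp < best_imp:
            best_t, best_imp = t, imp
    return best_t, best_imp

# Two clearly separated groups: the best split falls between them.
xs = [1.0, 2.0, 3.0, 10.0, 11.0, 12.0]
ys = [0, 0, 0, 1, 1, 1]
threshold, impurity = best_split(xs, ys)
```

A random forest repeats this search over bootstrapped samples and random feature subsets, which is what setImpurity("gini") below controls.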
// Scala example: training a distributed random forest model
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// Load the data
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Split into training and test sets
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Create the random forest model
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(100)     // more trees can improve accuracy, at higher cost
  .setMaxDepth(10)      // maximum tree depth
  .setImpurity("gini")  // impurity measure used to choose splits
// Train the model
val model = rf.fit(trainingData)
// Evaluate the model
val predictions = model.transform(testData)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Accuracy = $accuracy")
# Python example: training a distributed random forest model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Load the data
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Split into training and test sets
training_data, test_data = data.randomSplit([0.7, 0.3])
# Create the random forest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            numTrees=100, maxDepth=10, impurity="gini")
# Train the model
model = rf.fit(training_data)
# Evaluate the model
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy = {accuracy}")
Hands-On Exercises
Exercise 1: Custom Transformer
- Implement a custom transformer that converts text to lowercase.
- Integrate the custom transformer into a pipeline.
- Train a model with the pipeline and evaluate its performance.
Exercise 2: Model Selection and Hyperparameter Tuning
- Load a classification dataset, such as the Iris dataset.
- Define at least three different classification algorithms, e.g. logistic regression, decision tree, and random forest.
- Tune each model's hyperparameters with grid search and cross-validation.
- Compare the models' performance and pick the best one.
Exercise 3: End-to-End Feature Engineering
- Load a text classification dataset, such as 20 Newsgroups.
- Extract features with TF-IDF or Word2Vec.
- Apply feature selection to keep the important features.
- Reduce dimensionality with PCA.
- Train a classifier and evaluate its performance.
Exercise 4: Model Deployment
- Train a machine learning model, such as linear regression or a classifier.
- Save the model to disk.
- Build a REST API with Flask or Spring Boot to serve model predictions.
- Test the API and verify it returns correct predictions.
6. Summary
This tutorial covered Spark MLlib's advanced features in detail: the pipeline API in depth, advanced feature engineering, model selection and evaluation, model persistence and deployment, and distributed model training. With these features you can build more powerful and efficient machine learning applications.
After working through this tutorial, you should be able to:
- Extend MLlib's functionality with custom transformers and estimators.
- Tune model hyperparameters with cross-validation and grid search.
- Handle high-dimensional data with a variety of feature selection and dimensionality reduction methods.
- Evaluate model performance with a range of metrics.
- Save, load, and deploy models to production.
- Train large-scale machine learning models with distributed algorithms.
MLlib is a powerful machine learning library that provides a rich set of algorithms and tools for a wide range of tasks. Combined with Spark's distributed computing capability, it can process very large datasets and train complex models. In practice, choose the algorithms and tools that match your business requirements and data characteristics to build efficient, accurate machine learning applications.