Introduction to Aggregation Operations
MongoDB's aggregation operations are a powerful data processing tool for transforming and analyzing the data in a collection. They can perform complex processing tasks such as grouping, filtering, sorting, and computing derived values, much like aggregate functions and GROUP BY in a relational (SQL) database.
Use Cases for Aggregation
- Data statistics: compute sums, averages, maximums, minimums, etc.
- Data grouping: group by a field and compute statistics for each group
- Data transformation: reshape document structure, e.g. unwind arrays or merge fields
- Data filtering: select documents during the aggregation
- Data joining: combine data from multiple collections during the aggregation
MongoDB Aggregation Methods
| Method | Description | Use case |
|---|---|---|
| aggregate() | Processes data with an aggregation pipeline | Complex data processing and analysis |
| mapReduce() | Processes data with map-reduce | Large-scale processing and complex custom analysis (deprecated since MongoDB 5.0) |
| count() | Counts documents | Simple counting |
| distinct() | Returns the distinct values of a field | Getting a list of unique values |
Tip: for most aggregation tasks, the aggregation pipeline (the aggregate() method) is recommended, because it offers more flexible and more efficient data processing.
Aggregation Pipeline
How the Aggregation Pipeline Works
The aggregation pipeline is MongoDB's most powerful aggregation tool. It consists of multiple stages: each stage takes the output of the previous stage as its input and passes its own output on to the next stage. This pipelined processing lets a single aggregation handle complex data transformation and analysis tasks.
// Basic aggregation pipeline syntax
db.collection.aggregate([
{ "$stage1": { /* stage 1 operation */ } },
{ "$stage2": { /* stage 2 operation */ } },
{ "$stage3": { /* stage 3 operation */ } }
])
// Example: count the products and compute the average price in each category
db.products.aggregate([
{ "$match": { "stock": { "$gt": 0 } } }, // keep only products that are in stock
{ "$group": { // group by category
"_id": "$category",
"count": { "$sum": 1 }, // number of products
"averagePrice": { "$avg": "$price" } // average price
} },
{ "$sort": { "count": -1 } } // sort by product count, descending
])
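To make "each stage feeds the next" concrete, here is a minimal in-memory sketch in plain JavaScript (runnable in Node.js, no MongoDB server needed) that mimics the three stages of the example above over an array. The helpers match, group, sortBy and runPipeline are hypothetical illustrations, not MongoDB APIs, and the sample products are made up.

```javascript
// Each "stage" is a function from an array of documents to a new array.
const match = (pred) => (docs) => docs.filter(pred);

// Groups by keyFn and computes the count and average of valueField per group.
const group = (keyFn, valueField) => (docs) => {
  const groups = new Map();
  for (const doc of docs) {
    const key = keyFn(doc);
    const g = groups.get(key) ?? { _id: key, count: 0, total: 0 };
    g.count += 1;
    g.total += doc[valueField];
    groups.set(key, g);
  }
  return [...groups.values()].map(({ _id, count, total }) => ({
    _id,
    count,
    averagePrice: total / count,
  }));
};

// direction: 1 ascending, -1 descending (numeric fields only in this sketch).
const sortBy = (field, direction) => (docs) =>
  [...docs].sort((a, b) => direction * (a[field] - b[field]));

// Runs the stages in order, feeding each stage's output into the next one.
const runPipeline = (docs, stages) => stages.reduce((acc, stage) => stage(acc), docs);

// Hypothetical sample data.
const sampleProducts = [
  { category: "phone", price: 500, stock: 3 },
  { category: "phone", price: 700, stock: 0 },
  { category: "laptop", price: 1200, stock: 1 },
  { category: "phone", price: 300, stock: 5 },
];

const result = runPipeline(sampleProducts, [
  match((p) => p.stock > 0),         // like { $match: { stock: { $gt: 0 } } }
  group((p) => p.category, "price"), // like $group with $sum: 1 and $avg: "$price"
  sortBy("count", -1),               // like { $sort: { count: -1 } }
]);

console.log(result);
// result: [{ _id: "phone", count: 2, averagePrice: 400 }, { _id: "laptop", count: 1, averagePrice: 1200 }]
```

The out-of-stock phone is removed before grouping, which is why filtering early (as the first stage) shrinks the work of every later stage.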
Common Pipeline Stages
| Stage | Description | Example |
|---|---|---|
| $match | Filters documents, similar to the find() method | { "$match": { "age": { "$gt": 30 } } } |
| $group | Groups by the specified key and computes aggregate values | { "$group": { "_id": "$category", "count": { "$sum": 1 } } } |
| $sort | Sorts documents | { "$sort": { "count": -1, "name": 1 } } |
| $limit | Limits the number of output documents | { "$limit": 10 } |
| $skip | Skips the specified number of documents | { "$skip": 5 } |
| $project | Reshapes documents, similar to a projection | { "$project": { "name": 1, "age": 1, "_id": 0 } } |
| $unwind | Deconstructs an array field, outputting one document per element | { "$unwind": "$hobbies" } |
| $lookup | Joins another collection, similar to a SQL (left outer) JOIN | { "$lookup": { "from": "orders", "localField": "_id", "foreignField": "userId", "as": "orders" } } |
| $addFields | Adds new fields to documents | { "$addFields": { "fullName": { "$concat": ["$firstName", " ", "$lastName"] } } } |
| $merge | Merges the aggregation results into the specified collection | { "$merge": { "into": "results", "on": "_id", "whenMatched": "replace" } } |
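$unwind and $lookup are the two stages whose output shape is most often surprising. The plain-JavaScript sketch below illustrates their default behavior, assuming simple equality on scalar fields; unwind and lookup are hypothetical helpers, not MongoDB code. By default $unwind drops documents whose array field is missing, null, or empty, while $lookup always adds an array, which is empty when nothing matches.

```javascript
// $unwind with default options: one output document per array element;
// documents whose field is missing, null, or an empty array produce no output.
function unwind(docs, field) {
  const out = [];
  for (const doc of docs) {
    const value = doc[field];
    if (value === undefined || value === null) continue;
    const items = Array.isArray(value) ? value : [value]; // a non-array value acts like a one-element array
    for (const item of items) out.push({ ...doc, [field]: item });
  }
  return out;
}

// $lookup (equality form): a left outer join that adds an array field named `as`.
// Uses ===, so this sketch assumes scalar keys (real ObjectIds would need .equals()).
function lookup(docs, foreignDocs, localField, foreignField, as) {
  return docs.map((doc) => ({
    ...doc,
    [as]: foreignDocs.filter((f) => f[foreignField] === doc[localField]),
  }));
}

// Hypothetical sample data.
const people = [
  { _id: 1, name: "Ann", hobbies: ["chess", "golf"] },
  { _id: 2, name: "Bo", hobbies: [] },
];
const purchases = [{ _id: 10, userId: 1 }, { _id: 11, userId: 1 }];

const unwound = unwind(people, "hobbies");
// two documents for Ann (hobbies: "chess", then "golf"); Bo is dropped

const joined = lookup(people, purchases, "_id", "userId", "orders");
// Ann gets both purchases; Bo still appears, with orders: []
console.log(unwound, joined);
```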
Aggregation Operators
Arithmetic Operators
| Operator | Description | Example |
|---|---|---|
| $add | Addition | { "$add": ["$price", "$tax"] } |
| $subtract | Subtraction | { "$subtract": ["$total", "$discount"] } |
| $multiply | Multiplication | { "$multiply": ["$price", "$quantity"] } |
| $divide | Division | { "$divide": ["$total", "$count"] } |
| $mod | Modulo (remainder) | { "$mod": ["$value", 2] } |
String Operators
| Operator | Description | Example |
|---|---|---|
| $concat | Concatenates strings | { "$concat": ["$firstName", " ", "$lastName"] } |
| $substrCP | Extracts a substring by code-point index and length ($substr is a deprecated alias of the byte-based $substrBytes) | { "$substrCP": ["$name", 0, 3] } |
| $toUpper | Converts to uppercase | { "$toUpper": "$name" } |
| $toLower | Converts to lowercase | { "$toLower": "$name" } |
| $strLenCP | Returns the string length in UTF-8 code points | { "$strLenCP": "$name" } |
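Why the code-point variants matter: for non-ASCII text such as Chinese names, byte counts and code-point counts differ, so a byte-based substring can cut a character in half. The plain-JavaScript sketch below only illustrates the two counting units ($strLenCP/$substrCP count code points, $strLenBytes/$substrBytes count UTF-8 bytes); the helper names are hypothetical and this is not MongoDB code.

```javascript
// Number of Unicode code points (the unit $strLenCP and $substrCP work in).
const codePointLength = (s) => [...s].length;

// Number of UTF-8 bytes (the unit $strLenBytes and $substrBytes work in).
const utf8ByteLength = (s) => new TextEncoder().encode(s).length;

// Substring by code points, analogous to { $substrCP: [s, start, count] }.
const substrCP = (s, start, count) => [...s].slice(start, start + count).join("");

for (const name of ["Alice", "张三", "😀ok"]) {
  console.log(name, codePointLength(name), utf8ByteLength(name), name.length);
}
// "Alice": 5 code points, 5 bytes, JS length 5
// "张三":  2 code points, 6 bytes, JS length 2
// "😀ok":  3 code points, 6 bytes, JS length 4 (JS .length counts UTF-16 units)

console.log(substrCP("张三丰", 0, 2)); // 张三
```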
Array Operators
| Operator | Description | Example |
|---|---|---|
| $size | Returns the number of elements in an array | { "$size": "$hobbies" } |
| $arrayElemAt | Returns the element at the given index | { "$arrayElemAt": ["$hobbies", 0] } |
| $concatArrays | Concatenates arrays | { "$concatArrays": ["$array1", "$array2"] } |
| $filter | Keeps the array elements that satisfy a condition | { "$filter": { "input": "$scores", "as": "score", "cond": { "$gt": ["$$score", 80] } } } |
| $reduce | Folds the array elements into a single value | { "$reduce": { "input": "$values", "initialValue": 0, "in": { "$add": ["$$value", "$$this"] } } } |
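In plain JavaScript terms, $filter and $reduce behave like Array.prototype.filter and Array.prototype.reduce: in $filter, $$score names the current element (the name comes from "as"), and in $reduce, $$value is the accumulator and $$this is the current element. The sketch below mirrors the table's examples on a made-up document; it illustrates the semantics only.

```javascript
// Hypothetical document with two array fields.
const doc = { scores: [72, 85, 91], values: [1, 2, 3, 4] };

// { $filter: { input: "$scores", as: "score", cond: { $gt: ["$$score", 80] } } }
const highScores = doc.scores.filter((score) => score > 80);

// { $reduce: { input: "$values", initialValue: 0, in: { $add: ["$$value", "$$this"] } } }
const total = doc.values.reduce((value, current) => value + current, 0);

// { $arrayElemAt: ["$scores", 0] }
const first = doc.scores[0];

console.log(highScores, total, first); // [ 85, 91 ] 10 72
```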
Date Operators
| Operator | Description | Example |
|---|---|---|
| $year | Returns the year | { "$year": "$date" } |
| $month | Returns the month (1 to 12) | { "$month": "$date" } |
| $dayOfMonth | Returns the day of the month (1 to 31) | { "$dayOfMonth": "$date" } |
| $hour | Returns the hour (0 to 23) | { "$hour": "$date" } |
| $minute | Returns the minute | { "$minute": "$date" } |
| $second | Returns the second | { "$second": "$date" } |
| $dateToString | Formats a date as a string | { "$dateToString": { "format": "%Y-%m-%d", "date": "$date" } } |
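These operators evaluate dates in UTC unless you pass a timezone, for example { "$month": { "date": "$date", "timezone": "Asia/Shanghai" } }. When cross-checking results in JavaScript, the UTC getters are the matching tool; note that $month is 1-based while getUTCMonth() is 0-based. A small sketch using one of the order dates from the examples that follow:

```javascript
const orderDate = new Date("2020-10-01T08:00:00Z");

// JavaScript UTC equivalents of the aggregation date operators.
const parts = {
  year: orderDate.getUTCFullYear(),   // $year       -> 2020
  month: orderDate.getUTCMonth() + 1, // $month      -> 10 (getUTCMonth() is 0-based)
  dayOfMonth: orderDate.getUTCDate(), // $dayOfMonth -> 1
  hour: orderDate.getUTCHours(),      // $hour       -> 8
};

// { $dateToString: { format: "%Y-%m-%d", date: "$orderDate" } } -> "2020-10-01"
const ymd = orderDate.toISOString().slice(0, 10);

console.log(parts, ymd);
```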
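Aggregation Pipeline Examples
Example 1: Sales Data Analysis
Suppose we have a collection of sales orders with the fields _id, userId, products (an array of items, each with a product ID, quantity, and price), totalAmount, orderDate, and so on. We want to analyze the sales data: monthly revenue, average order amount, best-selling products, and so on.
// Prepare test data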
db.orders.insertMany([
{
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
"products": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "quantity": 2, "price": 100 },
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "quantity": 1, "price": 200 }
],
"totalAmount": 400,
"orderDate": ISODate("2020-10-01T08:00:00Z")
},
{
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
"products": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "quantity": 1, "price": 100 }
],
"totalAmount": 100,
"orderDate": ISODate("2020-10-15T09:00:00Z")
},
{
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"),
"products": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "quantity": 2, "price": 200 },
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b90"), "quantity": 1, "price": 300 }
],
"totalAmount": 700,
"orderDate": ISODate("2020-11-05T10:00:00Z")
}
])
// Query 1: total sales, order count, and average order amount per month
db.orders.aggregate([
{
"$group": {
"_id": {
"year": { "$year": "$orderDate" },
"month": { "$month": "$orderDate" }
},
"totalSales": { "$sum": "$totalAmount" },
"orderCount": { "$sum": 1 },
"averageOrderAmount": { "$avg": "$totalAmount" }
}
},
{
"$sort": {
"_id.year": 1,
"_id.month": 1
}
}
])
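For the three test orders above, the expected output can be worked out by hand: October 2020 has two orders (400 + 100) and November one (700). The plain-JavaScript sketch below reproduces Query 1's $group/$sort logic in memory as a cross-check; it copies only the fields the query uses and is not how MongoDB executes the pipeline.

```javascript
// Only the fields Query 1 uses, copied from the test data.
const orderRows = [
  { totalAmount: 400, orderDate: new Date("2020-10-01T08:00:00Z") },
  { totalAmount: 100, orderDate: new Date("2020-10-15T09:00:00Z") },
  { totalAmount: 700, orderDate: new Date("2020-11-05T10:00:00Z") },
];

// Group by (year, month) in UTC, like $year/$month inside the $group _id.
const byMonth = new Map();
for (const o of orderRows) {
  const year = o.orderDate.getUTCFullYear();
  const month = o.orderDate.getUTCMonth() + 1;
  const key = `${year}-${month}`;
  const g = byMonth.get(key) ?? { _id: { year, month }, totalSales: 0, orderCount: 0 };
  g.totalSales += o.totalAmount; // $sum: "$totalAmount"
  g.orderCount += 1;             // $sum: 1
  byMonth.set(key, g);
}

// $avg is the group's sum divided by its count; then sort by year and month ascending.
const monthly = [...byMonth.values()]
  .map((g) => ({ ...g, averageOrderAmount: g.totalSales / g.orderCount }))
  .sort((a, b) => a._id.year - b._id.year || a._id.month - b._id.month);

console.log(monthly);
// October 2020: totalSales 500, orderCount 2, averageOrderAmount 250
// November 2020: totalSales 700, orderCount 1, averageOrderAmount 700
```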
// Query 2: units sold and total revenue per product
db.orders.aggregate([
{
"$unwind": "$products"
},
{
"$group": {
"_id": "$products.productId",
"totalQuantity": { "$sum": "$products.quantity" },
"totalSales": { "$sum": { "$multiply": ["$products.quantity", "$products.price"] } }
}
},
{
"$sort": {
"totalSales": -1
}
}
])
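Query 2 works because $unwind turns each order into one record per line item before $group sums them. The in-memory sketch below walks the same steps over the test data (product IDs shortened to their last two hex digits for readability). Products 8d and 90 both total 300; MongoDB does not guarantee an order for such ties, so add a second sort key such as _id if you need a deterministic result. JavaScript's sort is stable, so here 8d simply stays ahead of 90.

```javascript
// Line items from the three test orders (IDs shortened for readability).
const testOrders = [
  { products: [{ productId: "8d", quantity: 2, price: 100 }, { productId: "8e", quantity: 1, price: 200 }] },
  { products: [{ productId: "8d", quantity: 1, price: 100 }] },
  { products: [{ productId: "8e", quantity: 2, price: 200 }, { productId: "90", quantity: 1, price: 300 }] },
];

// $unwind: one record per array element.
const items = testOrders.flatMap((o) => o.products);

// $group by productId: $sum of quantity, and $sum of quantity * price ($multiply).
const totals = new Map();
for (const { productId, quantity, price } of items) {
  const t = totals.get(productId) ?? { _id: productId, totalQuantity: 0, totalSales: 0 };
  t.totalQuantity += quantity;
  t.totalSales += quantity * price;
  totals.set(productId, t);
}

// $sort by totalSales descending.
const ranked = [...totals.values()].sort((a, b) => b.totalSales - a.totalSales);
console.log(ranked);
// 8e: 3 units, 600; then 8d (3 units, 300) and 90 (1 unit, 300)
```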
// Query 3: order count and total spending per user (joins a users collection)
db.orders.aggregate([
{
"$group": {
"_id": "$userId",
"orderCount": { "$sum": 1 },
"totalSpent": { "$sum": "$totalAmount" }
}
},
{
"$lookup": {
"from": "users",
"localField": "_id",
"foreignField": "_id",
"as": "userInfo"
}
},
{
"$unwind": "$userInfo" // note: users with no matching document in "users" are dropped here
},
{
"$project": {
"userId": "$_id",
"userName": "$userInfo.name",
"orderCount": 1,
"totalSpent": 1,
"_id": 0
}
}
])
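MapReduce
How MapReduce Works
MapReduce is a distributed computing model for processing large data sets. In MongoDB, a MapReduce operation consists of two main phases:
- Map phase: maps each document to key-value pairs
- Reduce phase: reduces all values that share the same key into a single value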
MapReduce Syntax
// Basic MapReduce syntax
db.collection.mapReduce(
function() { /* map function */ }, // map function
function(key, values) { /* reduce function */ }, // reduce function
{
out: "resultCollection", // output collection
query: { /* query condition */ }, // optional: filters the input documents
sort: { /* sort condition */ }, // optional: sorts the input documents
limit: 100, // optional: limits the number of input documents
finalize: function(key, reducedValue) { /* finalize function */ } // optional: post-processes each reduced value
}
)
MapReduce Example
// Example: count the products and compute the average price per category with MapReduce
db.products.mapReduce(
// map function
function() {
emit(this.category, { count: 1, totalPrice: this.price });
},
// reduce function
function(key, values) {
let result = { count: 0, totalPrice: 0 };
values.forEach(function(value) {
result.count += value.count;
result.totalPrice += value.totalPrice;
});
return result;
},
// options
{
out: "categoryStats",
query: { stock: { $gt: 0 } },
finalize: function(key, reducedValue) {
reducedValue.averagePrice = reducedValue.totalPrice / reducedValue.count;
return reducedValue;
}
}
)
// View the results
db.categoryStats.find()
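The map, reduce, and finalize functions above are plain JavaScript, so their logic can be tried without a server. The sketch below is a simplified, single-process imitation of how MongoDB drives them; localMapReduce and the sample products are hypothetical. It calls map with `this` bound to each document, collects the emitted values per key, and, as MongoDB does, skips reduce for a key with only one value. Real MongoDB may also call reduce several times for the same key, feeding earlier results back in, which is why reduce must return an object of the same shape as the emitted values, as this example's does. Output documents have the form { _id: key, value: ... }, like the documents in categoryStats.

```javascript
// Hypothetical single-process imitation of db.collection.mapReduce (no re-reduce, no sharding).
function localMapReduce(docs, map, reduce, { query = () => true, finalize } = {}) {
  const emitted = new Map();
  // The map function calls a free emit(key, value), as it does inside MongoDB.
  globalThis.emit = (key, value) => {
    if (!emitted.has(key)) emitted.set(key, []);
    emitted.get(key).push(value);
  };
  try {
    for (const doc of docs.filter(query)) map.call(doc); // `this` is the current document
  } finally {
    delete globalThis.emit;
  }
  const results = [];
  for (const [key, values] of emitted) {
    let value = values.length === 1 ? values[0] : reduce(key, values); // single value: reduce is skipped
    if (finalize) value = finalize(key, value);
    results.push({ _id: key, value });
  }
  return results;
}

// The functions from the example above.
const map = function () {
  emit(this.category, { count: 1, totalPrice: this.price });
};
const reduce = function (key, values) {
  const result = { count: 0, totalPrice: 0 };
  values.forEach((value) => {
    result.count += value.count;
    result.totalPrice += value.totalPrice;
  });
  return result;
};
const finalize = function (key, reducedValue) {
  reducedValue.averagePrice = reducedValue.totalPrice / reducedValue.count;
  return reducedValue;
};

// Hypothetical sample products.
const productDocs = [
  { category: "phone", price: 500, stock: 3 },
  { category: "phone", price: 300, stock: 5 },
  { category: "laptop", price: 1200, stock: 1 },
  { category: "laptop", price: 900, stock: 0 },
];

const stats = localMapReduce(productDocs, map, reduce, {
  query: (p) => p.stock > 0, // stands in for query: { stock: { $gt: 0 } }
  finalize,
});
console.log(stats);
// phone: count 2, totalPrice 800, averagePrice 400
// laptop: count 1, totalPrice 1200, averagePrice 1200
```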
MapReduce vs. Aggregation Pipeline
| Aspect | Aggregation pipeline | MapReduce |
|---|---|---|
| Performance | Higher: stages are processed as a pipeline | Lower: intermediate results must be produced |
| Flexibility | High: many built-in operators | Highest: arbitrary custom JavaScript functions |
| Ease of use | Easier: declarative syntax | More complex: you must write JavaScript functions |
| Use cases | Most aggregation tasks | Complex custom aggregation logic |
Note: because MapReduce executes JavaScript, it is usually slower than the aggregation pipeline. Use the aggregation pipeline for most aggregation tasks; MapReduce has been deprecated since MongoDB 5.0.
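Aggregation Best Practices
1. Use indexes to speed up aggregations
- Filter on indexed fields in $match (an index can be used only when $match is at the beginning of the pipeline)
- Sort on indexed fields in $sort (likewise, only when $sort is not preceded by stages such as $project, $unwind, or $group)
- Consider compound indexes for queries on multiple fields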
2. Order the pipeline stages sensibly
// Recommended stage order
[ // 1. Filter first, to reduce the amount of data later stages process
{ "$match": { /* filter condition */ } },
// 2. Sort (if needed)
{ "$sort": { /* sort condition */ } },
// 3. Limit the data volume (note: a $limit before $group means only these documents are grouped)
{ "$limit": 100 },
// 4. Unwind arrays
{ "$unwind": "$arrayField" },
// 5. Group
{ "$group": { /* grouping */ } },
// 6. Project
{ "$project": { /* projection */ } }
]
3. Avoid processing large amounts of data
- Use $match and $limit to restrict the data being processed
- For large aggregations, consume the results through the returned cursor instead of loading them all at once
- Avoid overly complex JavaScript functions in aggregation pipelines
4. Best practices for $lookup
- Filter with $match before $lookup
- Join only the fields you need
- For large collections, index the foreignField to speed up the join
5. Monitor aggregations
- Use explain() to analyze an aggregation's execution plan
- Monitor execution time and resource usage
- For long-running aggregations, consider running them as background jobs
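Hands-on Case Study
Case: E-commerce Sales Analysis
Use aggregation to analyze the sales data of an e-commerce system, including the sales trend, popular products, and user purchasing behavior.
// Prepare test data (drop the orders from Example 1 first, so the two schemas do not mix)
db.orders.drop()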
db.orders.insertMany([
{
"orderId": "ORD001",
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
"orderDate": ISODate("2020-10-01T08:00:00Z"),
"totalAmount": 1500,
"status": "completed",
"items": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "name": "iPhone 12", "quantity": 1, "price": 999 },
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "name": "AirPods Pro", "quantity": 1, "price": 501 }
]
},
{
"orderId": "ORD002",
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"),
"orderDate": ISODate("2020-10-05T10:00:00Z"),
"totalAmount": 2999,
"status": "completed",
"items": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b90"), "name": "MacBook Pro", "quantity": 1, "price": 2999 }
]
},
{
"orderId": "ORD003",
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
"orderDate": ISODate("2020-11-01T09:00:00Z"),
"totalAmount": 799,
"status": "completed",
"items": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b91"), "name": "iPad Air", "quantity": 1, "price": 799 }
]
},
{
"orderId": "ORD004",
"userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b92"),
"orderDate": ISODate("2020-11-10T11:00:00Z"),
"totalAmount": 1299,
"status": "pending",
"items": [
{ "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "name": "iPhone 12", "quantity": 1, "price": 1299 }
]
}
])
// 1. Monthly sales trend (completed orders only)
db.orders.aggregate([
{
"$match": { "status": "completed" }
},
{
"$group": {
"_id": {
"year": { "$year": "$orderDate" },
"month": { "$month": "$orderDate" }
},
"totalSales": { "$sum": "$totalAmount" },
"orderCount": { "$sum": 1 }
}
},
{
"$sort": {
"_id.year": 1,
"_id.month": 1
}
}
])
// 2. Popular products (top 10 by revenue)
db.orders.aggregate([
{
"$match": { "status": "completed" }
},
{
"$unwind": "$items"
},
{
"$group": {
"_id": { "productId": "$items.productId", "name": "$items.name" },
"totalQuantity": { "$sum": "$items.quantity" },
"totalSales": { "$sum": { "$multiply": ["$items.quantity", "$items.price"] } },
"orderCount": { "$sum": 1 }
}
},
{
"$sort": {
"totalSales": -1
}
},
{
"$limit": 10
}
])
// 3. User purchasing behavior
db.orders.aggregate([
{
"$match": { "status": "completed" }
},
{
"$group": {
"_id": "$userId",
"totalOrders": { "$sum": 1 },
"totalSpent": { "$sum": "$totalAmount" },
"firstOrderDate": { "$min": "$orderDate" },
"lastOrderDate": { "$max": "$orderDate" }
}
},
{
"$addFields": {
"averageOrderValue": { "$divide": ["$totalSpent", "$totalOrders"] }
}
},
{
"$sort": {
"totalSpent": -1
}
}
])
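As a cross-check for query 3, the plain-JavaScript sketch below applies the same $match, $group ($sum, $min, $max), $addFields/$divide, and $sort steps to the case-study orders in memory (user IDs shortened to their last two hex digits). The pending order ORD004 is filtered out, so only two users remain; this is an illustration of the expected numbers, not MongoDB's execution.

```javascript
// Fields used by query 3, copied from the case-study orders (user IDs shortened).
const caseOrders = [
  { userId: "8c", totalAmount: 1500, status: "completed", orderDate: new Date("2020-10-01T08:00:00Z") },
  { userId: "8f", totalAmount: 2999, status: "completed", orderDate: new Date("2020-10-05T10:00:00Z") },
  { userId: "8c", totalAmount: 799, status: "completed", orderDate: new Date("2020-11-01T09:00:00Z") },
  { userId: "92", totalAmount: 1299, status: "pending", orderDate: new Date("2020-11-10T11:00:00Z") },
];

const perUser = new Map();
for (const o of caseOrders.filter((order) => order.status === "completed")) { // $match
  const u = perUser.get(o.userId) ?? {
    _id: o.userId, totalOrders: 0, totalSpent: 0, firstOrderDate: o.orderDate, lastOrderDate: o.orderDate,
  };
  u.totalOrders += 1;                                                 // $sum: 1
  u.totalSpent += o.totalAmount;                                      // $sum: "$totalAmount"
  if (o.orderDate < u.firstOrderDate) u.firstOrderDate = o.orderDate; // $min
  if (o.orderDate > u.lastOrderDate) u.lastOrderDate = o.orderDate;   // $max
  perUser.set(o.userId, u);
}

const behavior = [...perUser.values()]
  .map((u) => ({ ...u, averageOrderValue: u.totalSpent / u.totalOrders })) // $addFields + $divide
  .sort((a, b) => b.totalSpent - a.totalSpent);                            // $sort descending

console.log(behavior);
// user 8f: 1 order, 2999 spent; user 8c: 2 orders, 2299 spent, average 1149.5
```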
// 4. Order status breakdown
db.orders.aggregate([
{
"$group": {
"_id": "$status",
"orderCount": { "$sum": 1 },
"totalAmount": { "$sum": "$totalAmount" }
}
},
{
"$sort": {
"orderCount": -1
}
}
])
Hands-on Exercises
Exercise 1: Compute student scores with the aggregation pipeline
// scores collection (sample data)
db.scores.insertMany([
{"name": "张三", "subject": "Math", "score": 85},
{"name": "张三", "subject": "Chinese", "score": 90},
{"name": "张三", "subject": "English", "score": 75},
{"name": "李四", "subject": "Math", "score": 95},
{"name": "李四", "subject": "Chinese", "score": 80},
{"name": "李四", "subject": "English", "score": 85},
{"name": "王五", "subject": "Math", "score": 70},
{"name": "王五", "subject": "Chinese", "score": 75},
{"name": "王五", "subject": "English", "score": 80}
])
Use the aggregation pipeline to complete the following tasks:
1. Compute each student's total and average score
2. Compute each subject's average and highest score
3. Rank the students by total score
Reference implementation:
// 1. Compute each student's total and average score
db.scores.aggregate([
{
"$group": {
"_id": "$name",
"totalScore": { "$sum": "$score" },
"averageScore": { "$avg": "$score" },
"subjectCount": { "$sum": 1 }
}
}
])
// 2. Compute each subject's average, highest, and lowest score
db.scores.aggregate([
{
"$group": {
"_id": "$subject",
"averageScore": { "$avg": "$score" },
"highestScore": { "$max": "$score" },
"lowestScore": { "$min": "$score" }
}
}
])
// 3. Rank the students by total score
db.scores.aggregate([
{
"$group": {
"_id": "$name",
"totalScore": { "$sum": "$score" },
"averageScore": { "$avg": "$score" }
}
},
{
"$sort": {
"totalScore": -1
}
},
{
"$setWindowFields": { // requires MongoDB 5.0 or later
"sortBy": { "totalScore": -1 },
"output": { "rank": { "$rank": {} } } // 1 = highest total; tied totals share a rank
}
}
])
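To check your answers, the expected results can be computed directly from the nine score documents. The plain-JavaScript sketch below covers tasks 1 and 3 in memory; its rank rule mirrors $rank (tied totals share a rank and the following rank is skipped), which is an assumption about how you want ties handled. Task 2 follows the same pattern grouped by subject.

```javascript
// The nine score documents from the exercise.
const scoreDocs = [
  { name: "张三", subject: "Math", score: 85 },
  { name: "张三", subject: "Chinese", score: 90 },
  { name: "张三", subject: "English", score: 75 },
  { name: "李四", subject: "Math", score: 95 },
  { name: "李四", subject: "Chinese", score: 80 },
  { name: "李四", subject: "English", score: 85 },
  { name: "王五", subject: "Math", score: 70 },
  { name: "王五", subject: "Chinese", score: 75 },
  { name: "王五", subject: "English", score: 80 },
];

// Task 1: total and subject count per student (the $group by "$name").
const perStudent = new Map();
for (const { name, score } of scoreDocs) {
  const s = perStudent.get(name) ?? { _id: name, totalScore: 0, subjectCount: 0 };
  s.totalScore += score;
  s.subjectCount += 1;
  perStudent.set(name, s);
}

// Task 3: sort by totalScore descending, then rank like $rank:
// rank = 1 + number of students with a strictly higher total, so ties share a rank.
const ranking = [...perStudent.values()]
  .map((s) => ({ ...s, averageScore: s.totalScore / s.subjectCount }))
  .sort((a, b) => b.totalScore - a.totalScore)
  .map((s, _i, all) => ({ ...s, rank: 1 + all.filter((o) => o.totalScore > s.totalScore).length }));

console.log(ranking);
// 李四: 260 (rank 1), 张三: 250 (rank 2), 王五: 225 (rank 3, average 75)
```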
Exercise 2: Join data with $lookup
// users collection
[{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "name": "张三", "age": 30},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"), "name": "李四", "age": 25}]
// orders collection
[{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b93"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "amount": 1000, "date": ISODate("2020-10-01")},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b94"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "amount": 1500, "date": ISODate("2020-10-15")},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b95"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"), "amount": 800, "date": ISODate("2020-11-01")}]
Use $lookup to join the two collections and compute each user's order count and total spending.
Reference implementation:
db.users.aggregate([
{
"$lookup": {
"from": "orders",
"localField": "_id",
"foreignField": "userId",
"as": "orders"
}
},
{
"$addFields": {
"orderCount": { "$size": "$orders" },
"totalSpent": {
"$reduce": {
"input": "$orders",
"initialValue": 0,
"in": { "$add": ["$$value", "$$this.amount"] }
}
}
}
},
{
"$project": {
"_id": 1,
"name": 1,
"age": 1,
"orderCount": 1,
"totalSpent": 1
}
}
])