MongoDB Aggregation Operations

Master MongoDB aggregation pipelines, operators, and data processing techniques

Introduction to Aggregation Operations

MongoDB aggregation operations are a powerful data processing tool for transforming and analyzing the data in a collection. They can perform complex processing tasks such as grouping, filtering, sorting, and computation, similar to the aggregate functions and GROUP BY clause of SQL in relational databases.

Use Cases for Aggregation Operations

  • Data statistics: compute sums, averages, maximums, minimums, and so on
  • Data grouping: group by a field and compute statistics for each group
  • Data transformation: reshape the data structure, for example by unwinding arrays or merging fields
  • Data filtering: filter data during aggregation
  • Data joining: join data from multiple collections during aggregation

MongoDB Aggregation Methods

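Method | Description | Use case
aggregate() | Processes data with an aggregation pipeline | Complex data processing and analysis
mapReduce() | Processes data with MapReduce (deprecated since MongoDB 5.0) | Large-scale data processing and complex analysis
count() | Counts documents (deprecated in mongosh in favor of countDocuments() and estimatedDocumentCount()) | Simple counting
distinct() | Returns the distinct values of a field | Getting a list of unique values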

Tip: For most aggregation work, the aggregation pipeline (the aggregate() method) is recommended because it offers more flexible and more efficient data processing.

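Aggregation Pipeline

How the Aggregation Pipeline Works

The aggregation pipeline is MongoDB's most powerful aggregation tool. It consists of multiple stages; each stage receives the output of the previous stage and passes its own output to the next one. This pipelined approach lets aggregation operations handle complex data transformation and analysis tasks.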

// Basic aggregation pipeline syntax
db.collection.aggregate([
    { "$stage1": { /* stage 1 operation */ } },
    { "$stage2": { /* stage 2 operation */ } },
    { "$stage3": { /* stage 3 operation */ } }
])

// Example: count products and compute the average price for each category
db.products.aggregate([
    { "$match": { "stock": { "$gt": 0 } } },  // keep only products that are in stock
    { "$group": {                             // group by category
        "_id": "$category",
        "count": { "$sum": 1 },               // number of products
        "averagePrice": { "$avg": "$price" }  // average price
    } },
    { "$sort": { "count": -1 } }              // sort by product count, descending
])
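
To make the stage-by-stage flow concrete, the sketch below replays the same three stages on an in-memory array in plain JavaScript. It only illustrates how data moves from one stage to the next, not how MongoDB executes a pipeline internally; the sample documents are made up for the illustration.

```javascript
// Hypothetical sample documents standing in for db.products.
const products = [
    { name: "A", category: "phone", price: 100, stock: 5 },
    { name: "B", category: "phone", price: 300, stock: 0 },
    { name: "C", category: "phone", price: 200, stock: 2 },
    { name: "D", category: "laptop", price: 900, stock: 1 }
];

// Stage 1 ($match): keep only products that are in stock.
const matched = products.filter((p) => p.stock > 0);

// Stage 2 ($group): one accumulator per category.
const groups = new Map();
for (const p of matched) {
    const g = groups.get(p.category) || { _id: p.category, count: 0, total: 0 };
    g.count += 1;
    g.total += p.price;
    groups.set(p.category, g);
}
const grouped = [...groups.values()].map((g) => ({
    _id: g._id,
    count: g.count,
    averagePrice: g.total / g.count
}));

// Stage 3 ($sort): descending by count.
const pipelineResult = grouped.sort((a, b) => b.count - a.count);

console.log(pipelineResult);
```

Each intermediate array (matched, grouped) plays the role of the document stream that one stage hands to the next.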

Common Aggregation Pipeline Stages

Stage | Description | Example
$match | Filters documents, similar to the find() method | { "$match": { "age": { "$gt": 30 } } }
$group | Groups by the specified field and computes aggregate values | { "$group": { "_id": "$category", "count": { "$sum": 1 } } }
$sort | Sorts documents | { "$sort": { "count": -1, "name": 1 } }
$limit | Limits the number of output documents | { "$limit": 10 }
$skip | Skips the specified number of documents | { "$skip": 5 }
$project | Reshapes documents, similar to a projection | { "$project": { "name": 1, "age": 1, "_id": 0 } }
$unwind | Unwinds an array field, producing one document per element | { "$unwind": "$hobbies" }
$lookup | Joins another collection, similar to a SQL JOIN | { "$lookup": { "from": "orders", "localField": "_id", "foreignField": "userId", "as": "orders" } }
$addFields | Adds new fields to documents | { "$addFields": { "fullName": { "$concat": ["$firstName", " ", "$lastName"] } } }
$merge | Merges the aggregation results into the specified collection | { "$merge": { "into": "results", "on": "_id", "whenMatched": "replace" } }
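
$sort, $skip, and $limit are often combined for pagination. The helper below is a hypothetical sketch (the name pageStages and its parameters are not part of MongoDB) that builds those three stages for a 1-based page number.

```javascript
// Hypothetical helper: builds $sort/$skip/$limit stages for a 1-based page.
function pageStages(sortSpec, page, pageSize) {
    if (!Number.isInteger(page) || page < 1) {
        throw new RangeError("page must be an integer >= 1");
    }
    if (!Number.isInteger(pageSize) || pageSize < 1) {
        throw new RangeError("pageSize must be an integer >= 1");
    }
    return [
        { $sort: sortSpec },               // sort first so pages are deterministic
        { $skip: (page - 1) * pageSize },  // skip the earlier pages
        { $limit: pageSize }               // keep one page
    ];
}

// Sorting on _id as a tiebreaker keeps documents with equal names in a fixed order.
const thirdPage = pageStages({ name: 1, _id: 1 }, 3, 10);
console.log(JSON.stringify(thirdPage));

// Inside mongosh the stages can be appended to any pipeline, for example:
// db.products.aggregate([{ $match: { stock: { $gt: 0 } } }, ...thirdPage])
```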

Aggregation Operators

Arithmetic Operators

Operator | Description | Example
$add | Addition | { "$add": ["$price", "$tax"] }
$subtract | Subtraction | { "$subtract": ["$total", "$discount"] }
$multiply | Multiplication | { "$multiply": ["$price", "$quantity"] }
$divide | Division | { "$divide": ["$total", "$count"] }
$mod | Modulo (remainder) | { "$mod": ["$value", 2] }

String Operators

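Operator | Description | Example
$concat | Concatenates strings | { "$concat": ["$firstName", " ", "$lastName"] }
$substr | Extracts a substring (deprecated alias of $substrBytes; $substrCP works on code points) | { "$substr": ["$name", 0, 3] }
$toUpper | Converts to uppercase | { "$toUpper": "$name" }
$toLower | Converts to lowercase | { "$toLower": "$name" }
$strLenCP | Returns the string length in code points | { "$strLenCP": "$name" }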

Array Operators

Operator | Description | Example
$size | Returns the array length | { "$size": "$hobbies" }
$arrayElemAt | Returns the element at the given array index | { "$arrayElemAt": ["$hobbies", 0] }
$concatArrays | Concatenates arrays | { "$concatArrays": ["$array1", "$array2"] }
$filter | Filters array elements | { "$filter": { "input": "$scores", "as": "score", "cond": { "$gt": ["$$score", 80] } } }
$reduce | Reduces the array elements to a single value | { "$reduce": { "input": "$values", "initialValue": 0, "in": { "$add": ["$$value", "$$this"] } } }
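
$filter and $reduce behave much like JavaScript's Array.prototype.filter and Array.prototype.reduce, with $$this as the current element and $$value as the accumulator. The sketch below mirrors the two table examples in plain JavaScript on a made-up document.

```javascript
// Hypothetical document with the fields used in the table examples.
const doc = { scores: [95, 72, 80, 88], values: [1, 2, 3, 4] };

// { $filter: { input: "$scores", as: "score", cond: { $gt: ["$$score", 80] } } }
const highScores = doc.scores.filter((score) => score > 80);

// { $reduce: { input: "$values", initialValue: 0, in: { $add: ["$$value", "$$this"] } } }
const sumOfValues = doc.values.reduce((value, current) => value + current, 0);

console.log(highScores);  // [ 95, 88 ]
console.log(sumOfValues); // 10
```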

Date Operators

Operator | Description | Example
$year | Returns the year | { "$year": "$date" }
$month | Returns the month (1 to 12) | { "$month": "$date" }
$dayOfMonth | Returns the day of the month | { "$dayOfMonth": "$date" }
$hour | Returns the hour | { "$hour": "$date" }
$minute | Returns the minute | { "$minute": "$date" }
$second | Returns the second | { "$second": "$date" }
$dateToString | Formats a date as a string | { "$dateToString": { "format": "%Y-%m-%d", "date": "$date" } }
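
Unless a timezone is given, the date operators extract their parts in UTC, and $month counts from 1 (January). The plain JavaScript sketch below shows the equivalent calls on a Date; JavaScript's getUTCMonth() is zero-based, so 1 is added to match $month. The sample date is arbitrary.

```javascript
const date = new Date("2020-10-01T08:05:09Z");

const parts = {
    year: date.getUTCFullYear(),    // $year
    month: date.getUTCMonth() + 1,  // $month (1 to 12)
    dayOfMonth: date.getUTCDate(),  // $dayOfMonth
    hour: date.getUTCHours(),       // $hour
    minute: date.getUTCMinutes(),   // $minute
    second: date.getUTCSeconds()    // $second
};

// Equivalent of { $dateToString: { format: "%Y-%m-%d", date: "$date" } }
const formatted = date.toISOString().slice(0, 10);

console.log(parts, formatted);
```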

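Aggregation Pipeline Examples

Example 1: Sales Data Analysis

Suppose we have a sales orders collection with the following fields: _id, userId, products (an array of product IDs, quantities, and prices), totalAmount, orderDate, and so on. We want to analyze the sales data and compute monthly sales, the average order amount, best-selling products, and so on.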

// Prepare test data
db.orders.insertMany([
    {
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
        "products": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "quantity": 2, "price": 100 },
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "quantity": 1, "price": 200 }
        ],
        "totalAmount": 400,
        "orderDate": ISODate("2020-10-01T08:00:00Z")
    },
    {
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
        "products": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "quantity": 1, "price": 100 }
        ],
        "totalAmount": 100,
        "orderDate": ISODate("2020-10-15T09:00:00Z")
    },
    {
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"),
        "products": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "quantity": 2, "price": 200 },
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b90"), "quantity": 1, "price": 300 }
        ],
        "totalAmount": 700,
        "orderDate": ISODate("2020-11-05T10:00:00Z")
    }
])

// Example 1: group by month and compute total sales and order count
db.orders.aggregate([
    {
        "$group": {
            "_id": {
                "year": { "$year": "$orderDate" },
                "month": { "$month": "$orderDate" }
            },
            "totalSales": { "$sum": "$totalAmount" },
            "orderCount": { "$sum": 1 },
            "averageOrderAmount": { "$avg": "$totalAmount" }
        }
    },
    {
        "$sort": {
            "_id.year": 1,
            "_id.month": 1
        }
    }
])

// Example 2: compute the quantity sold and total sales for each product
db.orders.aggregate([
    {
        "$unwind": "$products"
    },
    {
        "$group": {
            "_id": "$products.productId",
            "totalQuantity": { "$sum": "$products.quantity" },
            "totalSales": { "$sum": { "$multiply": ["$products.quantity", "$products.price"] } }
        }
    },
    {
        "$sort": {
            "totalSales": -1
        }
    }
])

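// Example 3: compute the order count and total spending for each user
// (assumes a users collection whose _id matches orders.userId; $unwind drops users without a match)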
db.orders.aggregate([
    {
        "$group": {
            "_id": "$userId",
            "orderCount": { "$sum": 1 },
            "totalSpent": { "$sum": "$totalAmount" }
        }
    },
    {
        "$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "userInfo"
        }
    },
    {
        "$unwind": "$userInfo"
    },
    {
        "$project": {
            "userId": "$_id",
            "userName": "$userInfo.name",
            "orderCount": 1,
            "totalSpent": 1,
            "_id": 0
        }
    }
])

MapReduce

How MapReduce Works

MapReduce is a distributed computing model for processing large data sets. In MongoDB, a MapReduce operation consists of two main phases:

  1. Map phase: map the data to key-value pairs
  2. Reduce phase: reduce the values that share the same key
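
The two phases can be sketched in plain JavaScript on an in-memory array. This is only an illustration of the model under made-up data: the emit, mapFn, and reduceFn definitions here are local stand-ins, and MongoDB's own map function receives the document as this rather than as an argument.

```javascript
// Hypothetical input documents.
const docs = [
    { category: "phone", price: 100 },
    { category: "phone", price: 200 },
    { category: "laptop", price: 900 }
];

// Map phase: emit one (key, value) pair per document.
const emitted = new Map();
function emit(key, value) {
    if (!emitted.has(key)) emitted.set(key, []);
    emitted.get(key).push(value);
}
function mapFn(doc) {
    emit(doc.category, { count: 1, totalPrice: doc.price });
}
docs.forEach(mapFn);

// Reduce phase: fold all values that share a key into one value.
function reduceFn(key, values) {
    return values.reduce(
        (acc, v) => ({
            count: acc.count + v.count,
            totalPrice: acc.totalPrice + v.totalPrice
        }),
        { count: 0, totalPrice: 0 }
    );
}

const reduced = {};
for (const [key, values] of emitted) {
    reduced[key] = reduceFn(key, values);
}

console.log(reduced);
```

The reduce function returns a value with the same shape as the emitted values, so it stays correct if it is applied again to its own partial results.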

MapReduce Syntax

// Basic MapReduce syntax
db.collection.mapReduce(
    function() { /* map function */ },  // map function
    function(key, values) { /* reduce function */ },  // reduce function
    {
        out: "resultCollection",  // output collection
        query: { /* query conditions */ },  // optional, filters the input documents
        sort: { /* sort keys */ },  // optional, sorts the input documents
        limit: 100,  // optional, limits the number of input documents
        finalize: function(key, reducedValue) { /* finalize function */ }  // optional, post-processes each reduced value
    }
)

MapReduce Example

// Example: use MapReduce to count products and compute the average price for each category
db.products.mapReduce(
    // map function
    function() {
        emit(this.category, { count: 1, totalPrice: this.price });
    },
    // reduce function
    function(key, values) {
        let result = { count: 0, totalPrice: 0 };
        values.forEach(function(value) {
            result.count += value.count;
            result.totalPrice += value.totalPrice;
        });
        return result;
    },
    // options
    {
        out: "categoryStats",
        query: { stock: { $gt: 0 } },
        finalize: function(key, reducedValue) {
            reducedValue.averagePrice = reducedValue.totalPrice / reducedValue.count;
            return reducedValue;
        }
    }
)

// View the results
db.categoryStats.find()

MapReduce vs. the Aggregation Pipeline

Feature | Aggregation pipeline | MapReduce
Performance | Higher; pipelined processing | Lower; requires intermediate results
Flexibility | Fairly high; many built-in operators | Highest; custom JavaScript functions
Ease of use | Easier; declarative syntax | More complex; requires writing JavaScript functions
Use case | Most aggregation operations | Complex custom aggregation logic

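Note: Because MapReduce executes JavaScript, it is usually slower than the aggregation pipeline, and mapReduce has been deprecated since MongoDB 5.0. For most aggregation tasks, the aggregation pipeline is recommended.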

Aggregation Best Practices

1. Use indexes to optimize aggregation performance

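  • Filter on indexed fields in the $match stage, and place it at the start of the pipeline so it can use the index
  • Sort on indexed fields in the $sort stage
  • Consider creating compound indexes to optimize multi-field queries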
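
As a sketch of the points above, the snippet below defines a compound index that covers both the $match filter and the $sort key of a pipeline. The collection and field names (orders, status, orderDate) follow the order examples in this article and are otherwise assumptions; the database calls run only when a mongosh db object is available.

```javascript
// Compound index: the equality-filter field first, then the sort field.
const indexSpec = { status: 1, orderDate: -1 };

// $match and $sort sit at the start of the pipeline so they can use the index.
const recentCompleted = [
    { $match: { status: "completed" } },
    { $sort: { orderDate: -1 } },
    { $limit: 100 }
];

// Runs only inside mongosh, where the global `db` exists.
if (typeof db !== "undefined") {
    db.orders.createIndex(indexSpec);
    printjson(db.orders.aggregate(recentCompleted).toArray());
}
```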

2. Optimize the order of pipeline stages

// Recommended stage order
[
    // 1. Filter first to reduce the data handled by later stages
    { "$match": { /* filter conditions */ } },
    // 2. Sort (if needed)
    { "$sort": { /* sort keys */ } },
    // 3. Limit the number of documents
    { "$limit": 100 },
    // 4. Unwind arrays
    { "$unwind": "$arrayField" },
    // 5. Group
    { "$group": { /* grouping specification */ } },
    // 6. Project
    { "$project": { /* projection */ } }
]

3. Avoid processing large amounts of data

  • Use $match and $limit to restrict the amount of data processed
  • For large aggregation operations, consider consuming the results through a cursor
  • Avoid overly complex JavaScript functions in the aggregation pipeline
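
For aggregations over large collections, aggregate() returns a cursor that can be consumed in batches, and the allowDiskUse option lets memory-hungry stages spill to temporary files. A minimal sketch, assuming the orders collection from the examples in this article and a mongosh session; the batch size of 500 is an arbitrary choice.

```javascript
const salesByUser = [
    { $match: { status: "completed" } },
    { $group: { _id: "$userId", totalSpent: { $sum: "$totalAmount" } } },
    { $sort: { totalSpent: -1 } }
];

const aggregateOptions = {
    allowDiskUse: true,         // let large $group/$sort stages write to temporary files
    cursor: { batchSize: 500 }  // fetch results from the server 500 at a time
};

// Runs only inside mongosh, where the global `db` exists.
if (typeof db !== "undefined") {
    const cursor = db.orders.aggregate(salesByUser, aggregateOptions);
    // Iterate the cursor instead of materializing everything with toArray().
    while (cursor.hasNext()) {
        printjson(cursor.next());
    }
}
```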

4. Best practices for $lookup

  • Filter the data with $match before $lookup
  • Join only the fields you need
  • For large collections, consider using indexes to optimize the join
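
The three points can be combined in one pipeline. The sketch below assumes hypothetical users and orders collections with age, userId, and amount fields, and it uses the form of $lookup that combines localField/foreignField with a pipeline, which requires MongoDB 5.0 or later. The database calls run only inside mongosh.

```javascript
// Index the field that $lookup matches on in the joined collection.
const foreignIndex = { userId: 1 };

const usersWithOrders = [
    // Filter first so fewer documents reach the $lookup stage.
    { $match: { age: { $gte: 18 } } },
    {
        $lookup: {
            from: "orders",
            localField: "_id",
            foreignField: "userId",
            // Keep only the fields that later stages need (MongoDB 5.0+ syntax).
            pipeline: [{ $project: { _id: 0, amount: 1 } }],
            as: "orders"
        }
    }
];

// Runs only inside mongosh, where the global `db` exists.
if (typeof db !== "undefined") {
    db.orders.createIndex(foreignIndex);
    printjson(db.users.aggregate(usersWithOrders).toArray());
}
```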

5. Monitor aggregation operations

  • Use explain() to analyze the execution plan of an aggregation operation
  • Monitor the execution time and resource usage of aggregation operations
  • For long-running aggregation operations, consider running them as background jobs
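
A minimal sketch of the first two points, assuming the orders collection from the examples in this article and a mongosh session: explain("executionStats") returns the execution plan with runtime statistics, and the maxTimeMS option caps how long the server may spend on the operation. The 5000 ms limit is an arbitrary value chosen for illustration.

```javascript
const monthlySales = [
    { $match: { status: "completed" } },
    {
        $group: {
            _id: { year: { $year: "$orderDate" }, month: { $month: "$orderDate" } },
            totalSales: { $sum: "$totalAmount" }
        }
    }
];

// Runs only inside mongosh, where the global `db` exists.
if (typeof db !== "undefined") {
    // Execution plan with runtime statistics.
    const plan = db.orders.explain("executionStats").aggregate(monthlySales);
    printjson(plan);

    // The server aborts the operation if it runs longer than 5 seconds.
    printjson(db.orders.aggregate(monthlySales, { maxTimeMS: 5000 }).toArray());
}
```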

Hands-on Case Study

Case: E-commerce Sales Analysis

Use aggregation operations to analyze an e-commerce system's sales data, including sales trends, popular products, and user purchasing behavior.

// Prepare test data
db.orders.insertMany([
    {
        "orderId": "ORD001",
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
        "orderDate": ISODate("2020-10-01T08:00:00Z"),
        "totalAmount": 1500,
        "status": "completed",
        "items": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "name": "iPhone 12", "quantity": 1, "price": 999 },
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8e"), "name": "AirPods Pro", "quantity": 1, "price": 501 }
        ]
    },
    {
        "orderId": "ORD002",
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"),
        "orderDate": ISODate("2020-10-05T10:00:00Z"),
        "totalAmount": 2999,
        "status": "completed",
        "items": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b90"), "name": "MacBook Pro", "quantity": 1, "price": 2999 }
        ]
    },
    {
        "orderId": "ORD003",
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"),
        "orderDate": ISODate("2020-11-01T09:00:00Z"),
        "totalAmount": 799,
        "status": "completed",
        "items": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b91"), "name": "iPad Air", "quantity": 1, "price": 799 }
        ]
    },
    {
        "orderId": "ORD004",
        "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b92"),
        "orderDate": ISODate("2020-11-10T11:00:00Z"),
        "totalAmount": 1299,
        "status": "pending",
        "items": [
            { "productId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8d"), "name": "iPhone 12", "quantity": 1, "price": 1299 }
        ]
    }
])

// 1. Monthly sales trend analysis
db.orders.aggregate([
    {
        "$match": { "status": "completed" }
    },
    {
        "$group": {
            "_id": {
                "year": { "$year": "$orderDate" },
                "month": { "$month": "$orderDate" }
            },
            "totalSales": { "$sum": "$totalAmount" },
            "orderCount": { "$sum": 1 }
        }
    },
    {
        "$sort": {
            "_id.year": 1,
            "_id.month": 1
        }
    }
])

// 2. Popular product analysis
db.orders.aggregate([
    {
        "$match": { "status": "completed" }
    },
    {
        "$unwind": "$items"
    },
    {
        "$group": {
            "_id": { "productId": "$items.productId", "name": "$items.name" },
            "totalQuantity": { "$sum": "$items.quantity" },
            "totalSales": { "$sum": { "$multiply": ["$items.quantity", "$items.price"] } },
            "orderCount": { "$sum": 1 }
        }
    },
    {
        "$sort": {
            "totalSales": -1
        }
    },
    {
        "$limit": 10
    }
])

// 3. User purchasing behavior analysis
db.orders.aggregate([
    {
        "$match": { "status": "completed" }
    },
    {
        "$group": {
            "_id": "$userId",
            "totalOrders": { "$sum": 1 },
            "totalSpent": { "$sum": "$totalAmount" },
            "firstOrderDate": { "$min": "$orderDate" },
            "lastOrderDate": { "$max": "$orderDate" }
        }
    },
    {
        "$addFields": {
            "averageOrderValue": { "$divide": ["$totalSpent", "$totalOrders"] }
        }
    },
    {
        "$sort": {
            "totalSpent": -1
        }
    }
])

// 4. Order status analysis
db.orders.aggregate([
    {
        "$group": {
            "_id": "$status",
            "orderCount": { "$sum": 1 },
            "totalAmount": { "$sum": "$totalAmount" }
        }
    },
    {
        "$sort": {
            "orderCount": -1
        }
    }
])

Interactive Exercises

Exercise 1: Compute student scores with the aggregation pipeline

Given the following student score data:
[{"name": "张三", "subject": "Math", "score": 85},
{"name": "张三", "subject": "Chinese", "score": 90},
{"name": "张三", "subject": "English", "score": 75},
{"name": "李四", "subject": "Math", "score": 95},
{"name": "李四", "subject": "Chinese", "score": 80},
{"name": "李四", "subject": "English", "score": 85},
{"name": "王五", "subject": "Math", "score": 70},
{"name": "王五", "subject": "Chinese", "score": 75},
{"name": "王五", "subject": "English", "score": 80}]
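Use the aggregation pipeline to complete the following tasks:

  1. Compute each student's total and average score
  2. Compute the average and highest score for each subject
  3. Rank the students by total score

Reference implementation:

// 1. Compute each student's total and average score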
db.scores.aggregate([
    {
        "$group": {
            "_id": "$name",
            "totalScore": { "$sum": "$score" },
            "averageScore": { "$avg": "$score" },
            "subjectCount": { "$sum": 1 }
        }
    }
])

// 2. Compute the average and highest score for each subject
db.scores.aggregate([
    {
        "$group": {
            "_id": "$subject",
            "averageScore": { "$avg": "$score" },
            "highestScore": { "$max": "$score" },
            "lowestScore": { "$min": "$score" }
        }
    }
])

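// 3. Rank the students by total score
// ($setWindowFields and $rank require MongoDB 5.0 or later)
db.scores.aggregate([
    {
        "$group": {
            "_id": "$name",
            "totalScore": { "$sum": "$score" },
            "averageScore": { "$avg": "$score" }
        }
    },
    {
        "$setWindowFields": {
            "sortBy": { "totalScore": -1 },
            "output": {
                "rank": { "$rank": {} }   // 1 for the highest total; ties share a rank
            }
        }
    },
    {
        "$sort": {
            "totalScore": -1
        }
    }
])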
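
The expected ranking can be cross-checked in plain JavaScript. The sketch below totals the sample scores per student and assigns competition-style ranks (ties share a rank, and the following rank is skipped). It only illustrates the expected result and is not a MongoDB API.

```javascript
// The exercise data, reduced to the two fields the ranking needs.
const scores = [
    { name: "张三", score: 85 }, { name: "张三", score: 90 }, { name: "张三", score: 75 },
    { name: "李四", score: 95 }, { name: "李四", score: 80 }, { name: "李四", score: 85 },
    { name: "王五", score: 70 }, { name: "王五", score: 75 }, { name: "王五", score: 80 }
];

// Total score per student (the $group step).
const totals = new Map();
for (const { name, score } of scores) {
    totals.set(name, (totals.get(name) || 0) + score);
}

// Sort by total, descending (the $sort step).
const ranking = [...totals]
    .map(([name, totalScore]) => ({ name, totalScore }))
    .sort((a, b) => b.totalScore - a.totalScore);

// A tie reuses the previous rank; otherwise the rank is the 1-based position.
ranking.forEach((row, i) => {
    const prev = ranking[i - 1];
    row.rank = prev && prev.totalScore === row.totalScore ? prev.rank : i + 1;
});

console.log(ranking);
```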

Exercise 2: Join data with $lookup

Given the following two collections:
// users collection
[{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "name": "张三", "age": 30},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"), "name": "李四", "age": 25}]

// orders collection
[{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b93"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "amount": 1000, "date": ISODate("2020-10-01")},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b94"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8c"), "amount": 1500, "date": ISODate("2020-10-15")},
{"_id": ObjectId("5f8d0f9a9b2c3d4e5f6a7b95"), "userId": ObjectId("5f8d0f9a9b2c3d4e5f6a7b8f"), "amount": 800, "date": ISODate("2020-11-01")}]
Use $lookup to join the two collections and compute each user's order count and total spending.

Reference implementation:

db.users.aggregate([
    {
        "$lookup": {
            "from": "orders",
            "localField": "_id",
            "foreignField": "userId",
            "as": "orders"
        }
    },
    {
        "$addFields": {
            "orderCount": { "$size": "$orders" },
            "totalSpent": {
                "$reduce": {
                    "input": "$orders",
                    "initialValue": 0,
                    "in": { "$add": ["$$value", "$$this.amount"] }
                }
            }
        }
    },
    {
        "$project": {
            "_id": 1,
            "name": 1,
            "age": 1,
            "orderCount": 1,
            "totalSpent": 1
        }
    }
])