Mongodb aggregate 实践

这段时间公司的项目业务改动,原来用的数据库是 mysql,最近打算把部分业务相关的数据存在 mongodb,我需要做的工作,第一点是数据迁移,在这里就不说了,写段小程序就迁移过去了,然后还要把这块业务的数据访问层,重写一版 mongodb 的实现。

下面分别是 mysql 和 mongodb 不同实现的查询,统计某一天内,账单类型为 1010(业务含义就不说了),累积账单金额大于 10000 的用户

mysql:

SELECT pl.user_id, IFNULL(round(sum(pl.amount), 2), 0) sum_amount
FROM ys_profit_log pl
WHERE pl.profit_type = 1010
AND pl.is_profit = 0
AND DATE_FORMAT(pl.create_time, '%Y-%m-%d') = DATE_SUB('2017-02-05', interval 1 DAY)
GROUP BY pl.user_id
HAVING sum_amount >= 10000;

mongodb:

db.ys_profit_log.aggregate([
    {$match: {
        profit_type: 1010,
        is_profit: false,
        create_time: {$gte: new Date('2017-02-04'), $lt: new Date('2017-02-05')}
    }},
    {$group: {_id: '$user_id', sum_amount: {$sum: '$amount'}}},
    {$sort : {_id: 1}},
    {$match: {sum_amount: {$gte: 10000}}}
]);

需要说明的一点,mongodb 分组查询后,不会像 mysql 一样默认按分组的列进行升序排序,为了保证和之前 mysql 的实现一样,这里用 $sort 在分组后又进行了排序

Spring data mongodb 实现代码片段

package cn.yskj.timer.dao.impl;

import cn.yskj.common.constant.Constant;
import cn.yskj.common.constant.ProfitLogEnum;
import com.mongodb.BasicDBObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.*;
import org.springframework.data.mongodb.core.query.Criteria;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * 功能描述: 账单
 *
 * 作者: Yzw
 * 日期: 2017-04-06 10:27:28
 */
public class ProfitLogDaoImpl {

    private static final Logger log = LoggerFactory.getLogger(ProfitLogDaoImpl.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    public List<Long> getXLGBUserIds(Map<String, Date> dateLimit) {
        List<Long> userIdList = new ArrayList<>();

        Date beginDate = dateLimit.get("begin");
        Date endDate = dateLimit.get("end");

        Criteria where = new Criteria();
        where.and("profit_type").is(ProfitLogEnum.PROFIT_1010.getProfitType());
        where.and("is_profit").is(false);
        where.and("create_time").gte(beginDate).lt(endDate);

        Criteria having = new Criteria();
        having.and("sum_amount").gte(Constant.LGB_REQUIRE_NUM);

        Aggregation aggr = Aggregation.newAggregation(
                Aggregation.match(where),
                Aggregation.group("user_id").sum("amount").as("sum_amount"),
                Aggregation.sort(new Sort("_id")),
                Aggregation.match(having)
        );

        AggregationResults<BasicDBObject> aggrResults = mongoTemplate.aggregate(aggr, "ys_profit_log", BasicDBObject.class);
        List<BasicDBObject> mappedResults =  aggrResults.getMappedResults();
        if (mappedResults != null && mappedResults.size() > 0) {
            log.info("查询结果 => 用户数量: {}", mappedResults.size());
            mappedResults.forEach(item -> {
                userIdList.add(item.getLong("_id"));
            });
        }

        return userIdList;
    }

}