Published
January 19, 2022

Data Aggregation Using MongoDB

Ravi Kondepati
Principal Software Engineer
Image Credit: Kyle Langford

Cloud adoption has increased tremendously over the years as organizations migrate, refactor, or replatform their legacy applications and deploy new “cloud native,” microservices-based applications to different clouds for the flexibility and other benefits they offer. This is nothing new and has been happening for a while, but as organizations push more workloads outside the company firewall, being able to streamline and operationalize how they optimize for performance, efficiency, and cost becomes more important than ever.

But how is this relevant to our topic, which is data aggregation?

If you think about it, gathering cost or performance metrics (for example, CPU or memory utilization) is a process of analyzing data collected from the different clouds and platforms your workloads run on, so that you can take suitable actions to optimize. This is where data aggregation comes in handy, especially if you need to crunch data from multiple sources.

Data aggregation operations process data records or documents and return computed results. Aggregation consolidates values from multiple documents into groups, performs a variety of operations on the grouped data, and returns a single result per group.

But let’s go ahead and cover some of the basics of data aggregation using MongoDB.

MongoDB and data aggregation

First of all, why MongoDB? MongoDB is one of the most popular databases for data aggregation because it provides a comprehensive, high-performance aggregation pipeline framework. In MongoDB, data aggregation is achieved with the Aggregation Pipeline framework, and we will cover the most commonly used operators, filters, and stages with the help of a simple example.

In MongoDB, aggregation is performed with the aggregate() function:

db.collection_name.aggregate()
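The method takes an array of pipeline stages. As a minimal sketch (using the metrics collection introduced later in this post):

db.metrics.aggregate([
  // A single-stage pipeline that only filters documents by metric kind.
  { "$match": { "kind": "cpuUsage" } }
])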

MongoDB’s find operation can be used to filter documents based on given criteria, but it may not be an efficient solution for applications with large amounts of data.

For example, say the requirement is to fetch certain embedded documents within a given field. The find operation always returns the main document, so filtering the embedded documents has to be handled separately by iterating over all the subdocuments until matching results are found. If an application has millions of embedded documents, the processing time grows quickly and increases API response time, which results in a bad user experience. The aggregation framework overcomes this limitation: MongoDB aggregation works as a pipeline of stages, a list of operators and filters applied to the data.
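As a rough sketch of the difference (in mongosh, against the metrics collection shown below; the value threshold is purely illustrative), filtering embedded data points with find means iterating in the client, whereas the pipeline does the same work on the server:

// Client-side filtering: find returns whole documents,
// so the values array must be scanned in application code.
db.metrics.find({ "kind": "cpuUsage" }).forEach(function (doc) {
  var highReadings = doc.values.filter(function (v) { return v.value > 100; });
  printjson({ resourceUid: doc.resourceUid, values: highReadings });
});

// Server-side filtering: the pipeline unwinds and matches on the server.
db.metrics.aggregate([
  { "$match": { "kind": "cpuUsage" } },
  { "$unwind": "$values" },
  { "$match": { "values.value": { "$gt": 100 } } }
]);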

Aggregation Pipeline

The pipeline is a framework for data aggregation modeled on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms the documents into aggregated results. The pipeline consists of stages and each stage transforms the documents as they pass through the pipeline.

Example

 
[
  {
    "_id": "1",
    "kind": "cpuUsage",
    "resourceUid": "1",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 100
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 110
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 120
      }
    ]
  },
  {
    "_id": "2",
    "kind": "memoryUsage",
    "resourceUid": "2",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 1024
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 1024
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 1024
      }
    ]
  },
  {
    "_id": "3",
    "kind": "cpuUsage",
    "resourceUid": "3",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 200
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 210
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 220
      }
    ]
  },
  {
    "_id": "4",
    "kind": "memoryUsage",
    "resourceUid": "4",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 1024
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 1024
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 1024
      }
    ]
  }
]
	

Let’s take a simple example of CPU and memory usage metrics data from compute nodes and build an aggregation pipeline query to find the average, minimum, and maximum CPU usage of each node.

The input data contains cpuUsage and memoryUsage documents for compute nodes, identified by resourceUid; each document’s values array holds time-series data points collected at three consecutive one-minute intervals. In real-world systems, the number of documents can range from hundreds to thousands depending on how long data is retained in the database.

Let’s take a look at a simple aggregation pipeline query against a collection named metrics; the query is a sequence of stages: $match, $project, $unwind, $sort, and $group.

 
db.getCollection('metrics').aggregate([
   // Keep only CPU usage documents
   { "$match": { "kind": "cpuUsage" } },
   // Drop the kind and _id fields
   { "$project": { "kind": 0, "_id": 0 } },
   // Output one document per element of the values array
   { "$unwind": { "path": "$values" } },
   // Order the data points by timestamp, ascending
   { "$sort": { "values.timestamp": 1 } },
   // Group per node and compute the aggregates
   { "$group": { "_id": { "resourceUid": "$resourceUid" },
                 "max": { "$max": "$values.value" },
                 "min": { "$min": "$values.value" },
                 "avg": { "$avg": "$values.value" },
                 "count": { "$sum": 1 } } }
])
	

Aggregation Pipeline Stages

In the db.collection.aggregate method and db.aggregate method, pipeline stages appear in an array. Documents pass through the stages in sequence.


$match

This stage filters the document stream so that only matching documents pass, unmodified, into the next pipeline stage. The input data contains data points for two kinds of metrics, cpuUsage and memoryUsage, so we allow only documents whose metric kind is cpuUsage to pass to the rest of the stages in the pipeline.

Result:

 
[
  {
    "_id": "1",
    "kind": "cpuUsage",
    "resourceUid": "1",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 100
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 110
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 120
      }
    ]
  },
  {
    "_id": "3",
    "kind": "cpuUsage",
    "resourceUid": "3",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 200
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 210
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 220
      }
    ]
  }
]
	

In general, it is recommended to filter out unwanted documents in the initial stages so that only the desired data is passed along, which minimizes the amount of data each subsequent stage in the pipeline has to process.
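Because a $match at the start of a pipeline can use an index, it can also help to index the matched field. A minimal sketch (the index itself is an assumption, not part of the original example):

// Create an index on kind so an initial $match on kind can be served by it.
db.metrics.createIndex({ "kind": 1 })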

$project

This stage passes along only the requested fields to the next stage in the pipeline. The specified fields can be existing fields from the input documents or newly computed fields. Here the kind and _id fields are excluded from the documents because their values are set to 0; if a field’s value is set to 1, it is included instead. It is recommended to exclude unwanted fields so that the remaining stages process only the fields they need.
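As a sketch, the same shape of output could be produced either by exclusion or, assuming resourceUid and values are the only other fields of interest, by inclusion:

// Exclusion projection: drop kind and _id, keep everything else.
{ "$project": { "kind": 0, "_id": 0 } }

// Inclusion projection: keep only resourceUid and values
// (_id is included by default, so it must be excluded explicitly).
{ "$project": { "resourceUid": 1, "values": 1, "_id": 0 } }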

Result:

 
[
  {
    "resourceUid": "1",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 100
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 110
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 120
      }
    ]
  },
  {
    "resourceUid": "3",
    "values": [
      {
        "timestamp": "2020-08-27T00:00:00.265Z",
        "value": 200
      },
      {
        "timestamp": "2020-08-27T00:01:00.265Z",
        "value": 210
      },
      {
        "timestamp": "2020-08-27T00:02:00.265Z",
        "value": 220
      }
    ]
  }
]
	

$unwind

This stage deconstructs the array field values from the input documents and outputs one document per array element. Each output document is a copy of the input document with the array field replaced by a single element.
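A small sketch of the stage with its optional behavior spelled out (preserveNullAndEmptyArrays is not used in the example query; it is shown only to illustrate what is available):

{ "$unwind": {
    "path": "$values",
    // Keep documents whose values field is missing, null, or an empty array
    // instead of dropping them (defaults to false).
    "preserveNullAndEmptyArrays": true
} }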

Result:

 
[
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:00:00.265Z",
      "value": 100
    }
  },
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:01:00.265Z",
      "value": 110
    }
  },
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:02:00.265Z",
      "value": 120
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:00:00.265Z",
      "value": 200
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:01:00.265Z",
      "value": 210
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:02:00.265Z",
      "value": 220
    }
  }
]
	

$sort

This stage reorders the document stream by a specified sort key. Only the order changes; the documents themselves are unmodified, and each input document produces one output document. The query sorts the documents by the field values.timestamp in ascending order (a value of 1); a value of -1 would sort in descending order.
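For example, to see the most recent data points first, the stage would be written with -1 (a variation, not part of the example query):

{ "$sort": { "values.timestamp": -1 } }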

Result:

 
[
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:00:00.265Z",
      "value": 100
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:00:00.265Z",
      "value": 200
    }
  },
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:01:00.265Z",
      "value": 110
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:01:00.265Z",
      "value": 210
    }
  },
  {
    "resourceUid": "1",
    "values": {
      "timestamp": "2020-08-27T00:02:00.265Z",
      "value": 120
    }
  },
  {
    "resourceUid": "3",
    "values": {
      "timestamp": "2020-08-27T00:02:00.265Z",
      "value": 220
    }
  }
]
	

$group

This groups input documents by the specified _id expression and for each distinct grouping outputs a document. The _id field of each output document contains the unique group by value.

The output documents can also contain computed fields that hold the values of some accumulator expression.

In this example we want aggregated values for each node, so the field resourceUid is used as the grouping key. With the accumulator operators $avg, $min, and $max we compute the average, minimum, and maximum CPU usage, and { "$sum": 1 } counts the data points in each group.
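As a sketch of how the grouping could be extended (a variation, not part of the example query), the grouping key can combine multiple fields, and $push can collect the raw readings alongside the computed aggregates:

{ "$group": {
    // Group by node and metric kind together
    // (kind would have to survive the earlier $project stage).
    "_id": { "resourceUid": "$resourceUid", "kind": "$kind" },
    "max": { "$max": "$values.value" },
    "avg": { "$avg": "$values.value" },
    // Collect the individual readings of each group into an array.
    "readings": { "$push": "$values.value" }
} }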

Result:

 
[
  {
    "_id": {
      "resourceUid": "3"
    },
    "max": 220,
    "min": 200,
    "avg": 210.0,
    "count": 3.0
  },
  {
    "_id": {
      "resourceUid": "1"
    },
    "max": 120,
    "min": 100,
    "avg": 110.0,
    "count": 3.0
  }
]
	

Early Filtering

If your aggregation operation requires only a subset of the data in a collection, use the $match, $limit, and $skip stages to restrict the documents that enter at the beginning of the pipeline. When placed at the beginning of a pipeline, $match operations use suitable indexes to scan only the matching documents in a collection. Placing a $match pipeline stage followed by a $sort stage at the start of the pipeline is logically equivalent to a single query with a sort and can use an index. When possible, place $match operators at the beginning of the pipeline.
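As a minimal sketch, you can confirm whether an initial $match is served by an index by asking for the query plan (standard mongosh explain helper; the index on kind from earlier is assumed to exist):

// With an index on kind, the $match stage's plan should show an index scan (IXSCAN).
db.metrics.explain("executionStats").aggregate([
  { "$match": { "kind": "cpuUsage" } },
  { "$limit": 100 }
])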

Limitations

Aggregation operations with the aggregate command have the following limitations:

Result Size Restrictions

The aggregate command can return either a cursor or store the results in a collection. When returning a cursor or storing the results in a collection, each document in the result set is subject to the BSON Document Size limit, currently 16 megabytes; if any single document exceeds the BSON Document Size limit, the command will produce an error. The limit only applies to the returned documents; during the pipeline processing, the documents may exceed this size.
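If the result set is too large to return over a cursor, the pipeline can write it to a collection instead, for example with $out as the final stage (the target collection name here is an assumption for illustration):

db.metrics.aggregate([
  { "$match": { "kind": "cpuUsage" } },
  { "$unwind": "$values" },
  { "$group": { "_id": "$resourceUid", "avg": { "$avg": "$values.value" } } },
  // Write the aggregated output to a collection instead of returning a cursor.
  { "$out": "cpu_usage_summary" }
])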

Memory Restrictions

Pipeline stages have a limit of 100 MiB (100 * 1024 * 1024 bytes) of RAM. If a stage exceeds this limit, MongoDB will produce an error. To allow for the handling of large datasets, you can set the allowDiskUse option in the aggregate() method. The allowDiskUse option enables most aggregation pipeline operations to write data to a temporary file.
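A minimal sketch of passing that option as the second argument to aggregate():

db.metrics.aggregate(
  [
    { "$unwind": "$values" },
    { "$sort": { "values.value": 1 } },
    { "$group": { "_id": "$resourceUid", "avg": { "$avg": "$values.value" } } }
  ],
  // Allow stages that exceed the in-memory limit to spill to temporary files on disk.
  { "allowDiskUse": true }
)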

Conclusion

In this blog, we covered the basics of the MongoDB aggregation pipeline with a simple aggregation query to get the average, maximum, and minimum CPU usage for each node. The aggregation pipeline is a very useful tool when the data requires complex computations to derive the desired results, and it delivers good performance when the pipeline is designed properly.

Tags:
Operations
Networking