engineering

October 01, 2020   |   14min read

Using Cloud Monitoring API to Oversee Performance of GKE Cluster

Google Kubernetes Engine is a great tool for managing containerized applications. This managed Kubernetes service is easy to use, secure, highly available, and reduces operational overhead. But, as with any containerized environment, to use it efficiently you need to answer the question: how big should my cluster be to fulfill its purpose? Make it too big, and you end up wasting valuable processing power. Make it too small, and your processes start competing for insufficient resources.

One of the tools to help you diagnose your GKE cluster’s performance is Cloud Monitoring (previously known as Stackdriver Monitoring), designed to collect various metrics and metadata of cloud resources. In this article, I will show you how to use its Python API to gather time series data regarding the status of a GKE cluster and store them in a pandas DataFrame.

Setting up monitoring on GKE cluster

First of all, you have to make sure that the Monitoring API is enabled and that your cluster is monitored using Cloud Operations for GKE rather than Legacy Logging and Monitoring. Cloud Operations for GKE is available as of GKE version 1.12.7 and is enabled by default starting with version 1.14. You can check whether monitoring is enabled for your cluster, and in which configuration, by going to your cluster’s page in the Cloud Console and checking the Cloud Operations for GKE field.

Screenshot of the Cloud Console cluster's page

If it is set to Disabled or Legacy Logging and Monitoring, change it to System and workload logging and monitoring by clicking the Edit button.

Installing libraries

Stackdriver Monitoring API is one of the GA (General Availability) client libraries, meaning it is stable and should be developed in a backward-compatible manner. To install and use it, you simply need to execute the command below in your virtual environment:

pip install google-cloud-monitoring

You can also use the Discovery API if you wish, but it works a little differently, and this article focuses on using the GA API.

Since we will be storing the data in a pandas DataFrame, we need the pandas library as well:

pip install pandas

When using the API to collect time series data, make sure that the account you use for authentication has the monitoring.timeSeries.list permission. Granting your service account the Monitoring Viewer role (roles/monitoring.viewer) provides the necessary permission. You can read more about IAM roles regarding Cloud Monitoring.

Collecting time series data

For the purpose of this article, we will be monitoring a freshly created GKE cluster consisting of three nodes. To execute requests, we need an instance of a service client. To create one, just run the following code:

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()

Request parameters

To send a time series listing request to Monitoring API, we need the following input parameters explaining which data we want to collect:

  1. Name—defines the Cloud project on which the request should be executed. It must follow the format: projects/[PROJECT_ID_OR_NUMBER]. Given the project ID, the name can be constructed using the service client:

    project_id = "my-project-id"
    
    name = client.project_path(project_id)
  2. Monitoring filter—a string that specifies which time series should be returned. It consists of filters on the following fields:

metric.type: the name of the metric that should be collected. For example, kubernetes.io/pod/network/received_bytes_count is the metric type describing the cumulative number of bytes received by the pod over the network. Every monitoring filter must contain exactly one metric type.

resource.type: the kind of resource the data should be collected for. There are five resource types available for GKE, representing the master component, cluster, node, pod, and container.

metric.label.{label_name}: specifies additional details regarding the given metric type. Filters can be provided on multiple metric labels, but note that not every metric type has labels defined for it.

resource.label.{label_name}: specifies additional details regarding the given resource type. These filters can be provided on multiple resource labels as well.

For every field, you need to specify a value to compare against, as well as the type of comparison, using one of the following comparators:

  • = (equality)
  • != (inequality)
  • : (containment)
  • > (greater than)
  • < (less than)
  • >= (greater than or equal to)
  • <= (less than or equal to)

You can also use special functions on the values, like starts_with or ends_with, to specify a partial match.

The parts of the filter must be combined in a query-like format where negation, conjunction, and disjunction are written using NOT, AND, and OR keywords. For example, if you wanted to collect evictable memory usage for all containers with names starting with “kube” from cluster “my-cluster,” then your monitoring filter should look like this:

monitoring_filter = ' AND '.join([
    'metric.type = "kubernetes.io/container/memory/used_bytes"',
    'resource.type = "k8s_container"',
    'metric.label.memory_type = "evictable"',
    'resource.label.container_name = starts_with("kube")',
    'resource.label.cluster_name = "my-cluster"'
])

Note that values for types and labels must be enclosed in double quotes.

To see all metric and resource types (together with their description and corresponding labels) available for GKE, you can use the service client to execute the following requests:

metric_descriptors = [
    descriptor for descriptor in client.list_metric_descriptors(
        name=name, filter_='metric.type = starts_with("kubernetes.io")'
    )
]

resource_descriptors = [
    descriptor for descriptor in client.list_monitored_resource_descriptors(
        name=name, filter_='resource.type = starts_with("k8s")'
    )
]
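
Each descriptor includes, among other things, the type, a human-readable description, and the labels defined for it. Below is a quick sketch for inspecting what was returned; the exact output will vary with your project and GKE version:

# Print a few metric descriptors together with their descriptions.
for descriptor in metric_descriptors[:5]:
    print(descriptor.type, "-", descriptor.description)

# Print the GKE resource types together with their label keys.
for descriptor in resource_descriptors:
    print(descriptor.type, [label.key for label in descriptor.labels])
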
  3. Time interval—specifies the time frame for the data to be collected. It needs to be provided as an instance of the TimeInterval class, where the start and end times are given as timestamps (seconds passed since January 1, 1970). For example, to get a TimeInterval object covering the last hour, you can use the code below:

    from datetime import datetime, timedelta
    
    from google.cloud.monitoring_v3.types import TimeInterval
    
    end_date = datetime.now()
    start_date = end_date - timedelta(hours=1)
    
    end_date_timestamp = end_date.timestamp()
    start_date_timestamp = start_date.timestamp()
    
    time_interval = TimeInterval()
    
    time_interval.end_time.seconds = int(end_date_timestamp)
    time_interval.start_time.seconds = int(start_date_timestamp)
    
    time_interval.end_time.nanos = int(
        (end_date_timestamp - time_interval.end_time.seconds) * 10 ** 9
    )
    time_interval.start_time.nanos = int(
        (start_date_timestamp - time_interval.start_time.seconds) * 10 ** 9
    )
  4. View—specifies which information is returned about the time series. To collect all available information, use the FULL view:

    from google.cloud import monitoring_v3
    view = monitoring_v3.enums.ListTimeSeriesRequest.TimeSeriesView.FULL
  5. Optional arguments—the API also allows us to specify some optional arguments, like the page size, request timeout, or retry behavior. Page size controls the maximum number of time series points returned in one page of the list_time_series response. When creating pages, the API tries to keep a full time series on a single page, adding additional time series beyond the first one to the page only if they fully fit. Note that the API has a default configuration of retries and timeouts in place (a sketch of how to override them follows the request example below).

Executing a request

With the inputs explained above, we can execute the time series listing request as follows:

results = client.list_time_series(
    name=name,
    filter_=monitoring_filter,
    interval=time_interval,
    view=view,
    page_size=1440,
)
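
If you need to override the default retry or timeout behavior mentioned earlier, both can be passed directly to the call. The snippet below is only a sketch; the values are illustrative, not tuned recommendations:

from google.api_core.retry import Retry

results = client.list_time_series(
    name=name,
    filter_=monitoring_filter,
    interval=time_interval,
    view=view,
    page_size=1440,
    # Retry transient errors with exponential backoff for up to 60 seconds
    # overall, and fail any single attempt that takes longer than 30 seconds.
    retry=Retry(initial=1.0, maximum=10.0, deadline=60.0),
    timeout=30.0,
)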

Processing time series data

Executing the request returns a PageIterator object, which can be used to iterate over the collected TimeSeries instances matching the request’s arguments.

time_series_list = []
for page in results.pages:
    for time_series in page:
        time_series_list.append(time_series)

For a freshly created GKE cluster with three nodes, this request should return 5 TimeSeries objects, as the specified metric is collected for 5 containers whose names begin with “kube”, and the page_size is big enough to hold them all on one page.

TimeSeries object contents

Let’s take a look at the contents of a single TimeSeries object.

  • time_series_list[0].metric
labels {
  key: "memory_type"
  value: "evictable"
}
type: "kubernetes.io/container/memory/used_bytes"

This attribute contains information about the metric of the TimeSeries—its type and a dict with its labels. As we can see, their values match the monitoring filter.

  • time_series_list[0].resource
type: "k8s_container"
labels {
  key: "cluster_name"
  value: "my-cluster"
}
labels {
  key: "container_name"
  value: "kubedns"
}
labels {
  key: "location"
  value: "europe-west1-b"
}
labels {
  key: "namespace_name"
  value: "kube-system"
}
labels {
  key: "pod_name"
  value: "kube-dns-5c446b66bd-v4zkf"
}
labels {
  key: "project_id"
  value: "my-project-id"
}

The resource attribute contains the resource type represented by the given TimeSeries and the values of all its labels. This particular TimeSeries refers to a “kubedns” container.

  • time_series_list[0].metric_kind

metric_kind is an enum integer value describing the kind of metric. There are three kinds of metrics: CUMULATIVE, GAUGE, and DELTA (you can find more information about them in the Google documentation). To get the kind’s name based on the integer value, use the following function:

from google.cloud.monitoring_v3.types import MetricDescriptor
    
METRIC_KINDS = ["GAUGE", "DELTA", "CUMULATIVE"]
    
def get_metric_kind(metric_kind_value: int) -> str:
    for metric_kind in METRIC_KINDS:
        if metric_kind_value == getattr(MetricDescriptor, metric_kind):
            return metric_kind
    raise TypeError("Unknown kind of metric returned from Monitoring API.")
  • time_series_list[0].value_type

Another enum describing the type of value represented by TimeSeries points. There are five value types: BOOL, INT64, DOUBLE, STRING and DISTRIBUTION (see their description in Google documentation). We will deal with the value_type in the last component of the response.

  • time_series_list[0].points
[
  interval {
    start_time {
      seconds: 1598601360
    }
    end_time {
      seconds: 1598601360
    }
  }
  value {
    int64_value: 35725312
  },
  interval {
    start_time {
      seconds: 1598601240
    }
    end_time {
      seconds: 1598601240
    }
  }
  value {
    int64_value: 35717120
  },
...
]

points is a list containing the measured values. The time difference between points varies depending on the metric. Each element of the list is an instance of the Point class, which consists of two attributes:

  • interval—describes the time of measurement. It is provided as timestamps of the start and end time of the measurement, which are identical in the case of a single point. It is worth noting that the timestamps are rounded to full minutes.
  • value—contains the measured value. It is an instance of the TypedValue class and holds a value of the type indicated by the value_type attribute of the TimeSeries object. To extract the value itself, use the function below on TypedValue objects:
from typing import Union
    
from google.api.distribution_pb2 import Distribution
from google.cloud.monitoring_v3.types import TypedValue
    
VALUE_TYPES = ["bool_value", "distribution_value", "double_value", "int64_value", "string_value"]

def get_typed_value(
    typed_value: TypedValue
) -> Union[bool, Distribution, float, int, str]:
    for value_type in VALUE_TYPES:
        if typed_value.HasField(value_type):
            return getattr(typed_value, value_type)
    raise TypeError("Unknown type of value returned in Cloud Monitoring.")
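
For example, applying it to the first point of the sample time series shown above should return the plain integer value. A quick sketch, assuming the time_series_list collected earlier:

first_point = time_series_list[0].points[0]

# For the sample output shown above, this prints 35725312.
print(get_typed_value(first_point.value))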

Storing time series in a DataFrame

Knowing the contents of TimeSeries objects, we can proceed to transfer their data to pandas DataFrames (separate DataFrame for every resource type). The format of our DataFrames will resemble a fact table, where a single row holds the values of various metrics collected for a given resource—specific container, pod, or node—at a specific timestamp. As such, the same timestamp might appear in the DataFrame in multiple rows because we will be storing data for multiple instances of a given resource type—multiple containers, pods or nodes.

Now, let’s assume that multiple calls to the Monitoring API have been made, and various TimeSeries, regarding different metrics and resource types, have been collected and stored in time_series_list. We can transfer their data to DataFrames using the method below:

from typing import Dict, List

import numpy as np
import pandas as pd
from google.cloud.monitoring_v3.types import TimeSeries


def store_time_series_in_dfs(
    time_series_list: List[TimeSeries],
) -> Dict[str, pd.DataFrame]:
    resource_dataframes = {}
    for time_series in time_series_list:
        resource_type = time_series.resource.type

        metric_column_name = get_metric_column_name(time_series)

        single_time_series_df = transform_time_series_into_df(
            time_series, metric_column_name
        )

        if resource_type not in resource_dataframes:
            resource_dataframes[resource_type] = single_time_series_df
            continue

        columns_to_merge_on = ["timestamp"] + list(
            time_series.resource.labels.keys()
        )

        resource_dataframes[resource_type] = merge_single_ts_into_main_df(
            resource_dataframes[resource_type],
            single_time_series_df,
            columns_to_merge_on,
            metric_column_name,
        )

    return resource_dataframes

Let’s go through it step by step.

  1. resource_dataframes is a dictionary where we store our resulting DataFrames—one for every resource type.
  2. We start by preparing a name for the column that will store the metric values of each time series, using the following function:
def get_metric_column_name(time_series: TimeSeries) -> str:
    metric_kind = get_metric_kind(time_series.metric_kind)
    metric_labels = sorted(
        [
            "-".join([key, value])
            for key, value in time_series.metric.labels.items()
        ]
    )
    metric_column_name = "__".join(
        [time_series.metric.type, metric_kind] + metric_labels
    )
    return metric_column_name

get_metric_column_name first gets the kind of the metric from the time series using the previously presented get_metric_kind function. Then, it prepares a string from the metric’s labels, if there are any; sorting is done to avoid random ordering. These components are combined with the metric type to form the final metric column name.
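
For instance, for the evictable-memory time series collected earlier (kubernetes.io/container/memory/used_bytes is, to my knowledge, a GAUGE metric), the resulting column name would look like this:

metric_column_name = get_metric_column_name(time_series_list[0])

# Expected result:
# kubernetes.io/container/memory/used_bytes__GAUGE__memory_type-evictable
print(metric_column_name)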

  3. The contents of a single time series are then transformed into a DataFrame by the transform_time_series_into_df function:
def transform_time_series_into_df(
    time_series: TimeSeries,
    metric_column_name: str
) -> pd.DataFrame:

    points = [
        (point.interval.end_time.seconds, get_typed_value(point.value))
        for point in time_series.points
    ]

    df = pd.DataFrame(
        points, columns=["timestamp", metric_column_name]
    )

    for label, value in time_series.resource.labels.items():
        df[label] = value
    
    return df

transform_time_series_into_df transforms the time series’ points into a list of timestamp/metric-value pairs and then uses it as the base for the DataFrame. The points’ values are extracted using the get_typed_value function described above. As the last step, a column is added for every resource label—each one contains the value of its respective label, repeated in every row.
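
As a quick illustration, calling it on the sample time series from before should yield one row per point, with a timestamp column, the metric column, and one column per resource label (a sketch; the exact values depend on your cluster):

single_df = transform_time_series_into_df(
    time_series_list[0], get_metric_column_name(time_series_list[0])
)

# Columns: timestamp, the metric column, plus cluster_name, container_name,
# location, namespace_name, pod_name, and project_id for a k8s_container.
print(single_df.columns.tolist())
print(single_df.head())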

  4. If this is the first DataFrame of a given resource type, then we simply save it in the results dictionary under the resource_type key and continue to the next time series.
  5. Otherwise, we need to merge the new DataFrame into the main one, which holds all time series of the given resource type processed so far. Merging is done on all of the new DataFrame’s columns apart from the metric column:
def merge_single_ts_into_main_df(
    resource_df: pd.DataFrame,
    single_time_series_df: pd.DataFrame,
    columns_to_merge_on: List[str],
    metric_column_name: str,
) -> pd.DataFrame:
    
    join_metric_columns = metric_column_name in resource_df.columns

    resource_df = pd.merge(
        resource_df,
        single_time_series_df,
        on=columns_to_merge_on,
        how="outer",
        sort=False,
    )

    if join_metric_columns:
        resource_df[metric_column_name] = (
            resource_df[f"{metric_column_name}_x"].combine(
                resource_df[f"{metric_column_name}_y"],
                func=combine_metric_columns,
            )
        )

        resource_df.drop(
            [f"{metric_column_name}_x", f"{metric_column_name}_y"],
            axis=1,
            inplace=True,
        )
    
    return resource_df

merge_single_ts_into_main_df first checks whether the provided metric column is already present in resource_df:

  • If it is not, then just the merge operation is performed. We use an outer join because consecutive time series contain data for different containers/pods/nodes, so they might not match any existing row and should simply be appended to resource_df instead. Sorting is set to False to improve performance.
  • If it is, then merging duplicates the metric column, adding _x and _y suffixes. We need to combine them back together using the following function:
def combine_metric_columns(
    value_x: float, value_y: float
) -> float:
    if np.isnan(value_x):
        return value_y
    if np.isnan(value_y):
        return value_x
    if value_x != value_y:
        raise ValueError(
            "Failed to join metric columns. "
            "Metric columns have different non-nan values in the same row."
        )
    return value_x

combine_metric_columns joins the two columns only if, in every row, their values are equal or one of them is empty. This should be the case as long as all time series were collected from the same cluster. After combining, we just need to drop the duplicated columns.
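
Putting the pieces together, a minimal usage sketch (assuming time_series_list holds all of the collected TimeSeries) could look like this:

resource_dataframes = store_time_series_in_dfs(time_series_list)

# One DataFrame per resource type, e.g. "k8s_container", "k8s_pod", or "k8s_node",
# depending on which metrics were requested.
print(list(resource_dataframes.keys()))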

Let’s have a look at a fragment of a resulting DataFrame for nodes:

project_id     location        cluster_name  node_name  timestamp   kubernetes.io/node/cpu/core_usage_time__CUMULATIVE
my-project-id  europe-west1-b  my-cluster    node-1     1599560520  32693.0598
my-project-id  europe-west1-b  my-cluster    node-2     1599560520  31978.19344
my-project-id  europe-west1-b  my-cluster    node-3     1599560520  118933.8632
my-project-id  europe-west1-b  my-cluster    node-1     1599560640  32735.86465
my-project-id  europe-west1-b  my-cluster    node-2     1599560640  32021.55832
my-project-id  europe-west1-b  my-cluster    node-3     1599560640  119098.8055
my-project-id  europe-west1-b  my-cluster    node-1     1599560760  32784.97255
my-project-id  europe-west1-b  my-cluster    node-2     1599560760  32061.72853
my-project-id  europe-west1-b  my-cluster    node-3     1599560760  119266.1638

The first four columns are resource labels for the k8s_node resource. As we can see, every row contains values corresponding to a specific timestamp and node. The same timestamp occurs in multiple rows, but in each of those rows the values refer to a different node. In fact, values in some metric columns might be empty for some timestamps if you collect metrics with different intervals between their values.
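
Depending on how you plan to analyze the data, it may also be convenient to convert the Unix timestamps to datetimes and order the rows. A small sketch, assuming the nodes DataFrame produced above:

nodes_df = resource_dataframes["k8s_node"]

# Convert Unix timestamps (in seconds) to pandas datetimes and sort the rows
# per node and time.
nodes_df["timestamp"] = pd.to_datetime(nodes_df["timestamp"], unit="s")
nodes_df = nodes_df.sort_values(["node_name", "timestamp"]).reset_index(drop=True)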

Summary

After reading this article, you should be able to set up Cloud Monitoring on your own GKE cluster, collect time series regarding its performance, and store them in the readable and easily transferable format of pandas DataFrames. Thanks to that, you can easily store this data in the cloud and analyze it using dashboarding and data visualization tools to find choke points in your cluster.

Kamil Olszewski

Software Engineer
