Implementing MLOps pipeline in Vertex AI to adapt to the changes in data

Muthukumaraswamy
Searce
Published in
11 min readFeb 24, 2022

--

The goal of this post is to demonstrate the process of building a machine learning pipeline to detect objects.We will demonstrate how to use Vertex AI’s AI and Cloud Functions to prepare MLOps to prepare for data drift situations.

Our machine learning pipeline and trigger will be built using Vertex AI, Google Cloud Storage, and Cloud Function. The Vertex AI service is a collection of different AI services. This post utilizes everything that Vertex AI has to offer, including its Dataset, AutoML, Notebook, Pipeline, Model, and Endpoint features.

Img. 1 shows a step-by-step guide on how to build an entire pipeline. Think of the workflow as a set of components. This helps to understand a machine learning system’s blueprint. We will go through them one by one. It is important to remember that each component is not an independent task, but they connect to each other in order.

Img1

Data Preparation

We cannot do anything without data, and it is at the heart of machine learning. Preparing the dataset is clearly the first step. A managed dataset feature is available from Vertex AI. In addition to importing data from local storage, you can point to a GCS bucket if you already have the dataset stored within it.

As you know, data alone is not enough. Labels and annotations are required. The Vertex AI Dataset lets you import labels directly when importing raw data. Labels need only to be arranged according to the suggested format. Depending on the data types, you can make your own labeling file here.

A sample label for an image classification task is shown below in CSV format (you can use JSONL as well).

[ML_USE], GCS_FILE_PATH,[LABEL]

Here is another example of object detection in CSV format.

[ML_USE],GCS_FILE_PATH,[LABEL],[BOUNDING_BOX]*

ML_USE can be ignored in this case, but if you decide to manually split data into training/validation/test sets, you can use either training or validation. There are eight values in the BOUNDING_BOX, two of which are paired. As you might expect, it represents coordinates of edges within a bounding box. Basically, the order has to follow X_MIN, Y_MIN, X_MAX, Y_MIN, X_MAX, Y_MAX, X_MIN, Y_MAX.

Training

In order to train a model with your dataset, you have multiple options. Using Vertex AI AutoML features to realize MLOps is the focus of this post. I have chosen AutoML because of these three reasons. Modeling doesn’t need to be my concern. My only task is to prepare the dataset correctly so that AutoML can understand it. There is no additional workload for this since the Vertex AI Dataset matches the Vertex AI AutoML perfectly.

Second, the state-of-the-art model may not be adequate when the dataset evolves. Data engineering, modeling, and hyper-parameter tuning probably require a number of different versions of codes. Results from AutoML are generally excellent. Consequently, Google engineers are likely to modify and maintain the internal algorithms over time, which potentially ensures we almost always use state-of-the-art AutoML modeling software.

Vertex AI Pipeline can easily be integrated with Vertex AI AutoML. Essentially, Vertex AI Pipeline is Kubeflow Pipeline wrapped in Vertex AI, and Google has created a bunch of Kubeflow components integrated into the standard Kubeflow Pipeline seamlessly. Consequently, you can use Vertex AI AutoML to write Python code for custom components and connect them.

Deployment and Monitoring

Model export and serving an endpoint are two separate operations that can be considered as one operation during deployment. By leveraging Vertex AI Models and Endpoints, both of them can be utilized. All models at Vertex AI Model are managed in this central place, along with their versions. The trained results can be viewed as metrics and simple predictions can be made.

You can create an endpoint with your chosen model once you are ready to deploy the model for real-world users. In fact, the model’s endpoint is managed by Vertex AI Endpoint in Google Kubernetes Engine. You don’t need to worry about scalability. During the early stages of your business, only a few nodes can be served, but when you become too big to handle requests with only a few nodes, the number of nodes can grow smoothly.

Model monitoring capability in Vertex AI Endpoints

Monitoring functionality for the Vertex AI Endpoint includes prediction/second, request/second, latency, and prediction error percentage. Vertex AI provides additional monitoring features for tabular model and custom trained model to inspect the model behaviour in depth but AutoML is expected to be supported in near future. It’s hard to handle concept/data drift issues, but it is possible to see if there are errors in prediction requests, if the prediction delays more than expected, if the throughput is not enough with Vertex AI.

Pipeline and Trigger

A dataset can be created, a model can be trained, an endpoint can be instantiated, and a model can be deployed independently. The best solution is to build a pipeline that does all these jobs consistently. With AutoML you will likely have the best model. In other words, we just need to prepare more data and run the pipeline when we observe model performance degradation.

How can we trigger the pipeline to run to learn a new dataset? A system should be in place for detecting changes in the dataset via an event listening system. Cloud Function can be used for this purpose. Cloud Function can monitor changes to a designated GCS bucket when the bucket is modified. Using this capability, we can easily run the pipeline as more data is recorded.

Initial Operational Workflow

A base dataset is needed to demonstrate MLOps in its initial phase. Creating a dataset involves multiple steps, as shown in image 3. The first step is to choose a dataset type and a task type. As part of this project, I have selected “Image object detection” as part of the “IMAGE” category. Two, you can upload images straight from your local file system, or if you already have images uploaded to a GCS bucket, you can directly select them.

Img 3 Creating Vertex AI Dataset

You can also upload an extra label file using the same interface as in image 3. Here’s how: Because I did not have labels for the images, I just clicked “CONTINUE” after uploading them.

There are nice labeling tools available in your browser with Vertex AI Dataset. Through this functionality, I have labeled approximately 100 images by simply dragging and dropping the mouse position. You will have a complete dataset, meaning that the data and the labels associated with it will be stored in a GCS bucket.

We are finally ready to start building our pipeline with the dataset. You can write pipelining code directly in the terminal or favorite IDE, but it’s usually better to run it in a Jupyter Notebook. We can go back and forth between the code and edit it easily because it provides an interactive environment. As an added benefit, Vertex AI Notebook does not require you to handle any authorizing process for the GCP as it already operates in the GCP environment.

Img4 — Creating and running a pipeline for Vertex AI Pipeline

img 4 It demonstrates how the initial pipeline run proceeds within a notebook. Here’s some code showing how to import libraries and set variables in the first step.

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aipfrom kfp.dsl import pipeline
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClientPROJECT_ID = “YOUR_GCP_PROJECT_ID”
REGION = “GCP_REGION_TO_RUN_PIPELINE”
PIPELINE_ROOT = “LOCATION_RUN_METADATA_IS_GOING_TO_BE_STORED”
DATASET_META_PATH = “LOCATION_DATASET_METADATA_IS_STORED”

The three package must include three five libraries. Below are two blocks of code showing how they are used. In the first code block, we describe how to create a pipeline with three pipeline components. By using the @component decorator, you can indicate the entire pipeline is defined in the function pipeline. Using the @component decorator, we can break the three components up into separate functions. I have compiled everything in one place to make this sample as straightforward as possible.

The ImageDatasetCreateOp component imports datasets that have been defined with Vertex AI Dataset. This component requires three things in order to be instantiated: GCP Project ID, GCS path where the label file is kept, and task type.

@pipeline(name=”my-pipeline”)
def pipeline(project: str = PROJECT_ID):
ds_op = gcc_aip.ImageDatasetCreateOp(
project=project,
display_name=”DATASET_NAME_TO_APPEAR”,
gcs_source=DATASET_META_PATH,
import_schema_uri=\
aiplatform.schema.dataset.ioformat.image.bounding_box,
) training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
project=project,
display_name=”my-demomodel”,
prediction_type=”object_detection”,
model_type=”CLOUD”,
base_model=None,
dataset=ds_op.outputs[“dataset”],
model_display_name=”my-demomodel”,
training_fraction_split=0.6,
validation_fraction_split=0.2,
test_fraction_split=0.2,
budget_milli_node_hours=20000,
) endpoint_op = gcc_aip.ModelDeployOp(
project=project, model=training_job_run_op.outputs[“model”]
)

AutoMLImageTrainingJobRunOp is the next component. This component is used for all image processing operations. As you can see, the prediction_type argument allows you to specify the specific task type. Also note that model_type is “CLOUD”. AutoML uses this to determine the type of resulting model to produce. If you want a lighter model with low latency, you can set model_type differently to “CLOUD_LOW_LATENCY_1”. Please see the API document for more information about the available options. I just used the standard average model for this project.

In the AutoMLImageTrainingJobRunOp component, there are three more arguments to consider. First, you can specify directly the split ratio between training, validation, and test. During the dataset preparation stage, you can specify which images are associated with which datasets, but if you explicitly set them within this component, it will ignore that information and randomly assign data according to the ratios.This is a good way to get started if you’re not able to decide carefully the splits yourself. When to stop training, the budget_milli_node_hours serves as a constraint. If you train AutoML forever, the model will grow in size indefinitely, so you must decide when to stop training. In any other case, you will lose a lot of accuracy and end up paying lots of money. Last but not least, the dataset argument must be used to indicate which dataset the AutoML should be trained on. Since the training job must be executed after the dataset creation operation, there is a dependency between the dataset argument and ImageDatasetCreateOp.

ModelDeployOp is the final component. Although it has the name “Endpoint”, it can create an endpoint and deploy the trained model to it. It is convenient to do both operations simultaneously rather than doing them separately, but it is possible to do them separately. To deploy a model, all you need to do is specify it with a model argument. In addition to that, this argument establishes a connection and a dependency between AutoMLImageTrainingJobRunOp and ModelDeployOp.

compiler.Compiler().compile(
pipeline_func=pipeline, package_path=PIPELINE_SPEC_PATH
)api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)response = api_client.create_run_from_job_spec(
PIPELINE_SPEC_PATH,
pipeline_root=PIPELINE_ROOT,
parameter_values={“project”: PROJECT_ID},
)

When we have defined the pipeline in its entirety, we should compile it using the compiler.Compiler().compile method. During compilation, pipeline function definitions are used to create a pipeline specification. Many hidden pieces are recorded in the pipeline specification, including task dependencies, which cloud machine type and which container image is to be used, and many other details. As a result of specifying the package_path argument, the compiler creates a JSON file containing the pipeline specification. To run the pipeline with the JSON file, you simply need to pass it to the create_run_from_job_spec method of the Vertex AI Client. This is highly helpful for triggering and reusing pipelines automatically.

You will be able to run the initial pipeline in the Vertex AI Pipeline as soon as you send a request to Vertex AI with the JSON file. Using Vertex AI Model, Endpoint UI Panels, you can get an overview of the trained model, the endpoint, and the model which has been deployed to the endpoint at the end of the pipeline run.

Observing data drift

You can easily test your model with Vertex AI Model’s testing functionality after you have sent an image to the endpoint for prediction. This is a useful feature because the predicted result can be easily seen on the GCP console.I have given some of my images to the model so it could test whether it is able to detect my face at the age of three as expected. However, I got unexpected results that the model correctly identified me grown up faces.

Unexpected result (1). The trained model recognizes her face at the age of 3 even though it was trained on the pictures of infants

Based on the recent images, I have continued to test the model, and I have seen that the COVID-19 pandemic happened last year. She often wore masks in her photos, and there were some pictures of her wearing sunglasses. At the beginning of this project, when I brainstormed the idea, I was surprised by this situation, and I quickly identified it as the data drift problem.

Manually running the pipeline after collecting and labeling new data is possible. We have created the JSON specification file, so we should implement an automatic way to trigger pipeline runs whenever we have new data.

Below is an image showing a data drift workflow. Here you can see that the Vertex AI Notebook is no longer needed, and the pipeline run is not performed directly from the notebook. Instead, we can create and deploy a small function on Cloud Function that listens to an event of a change in a designated GCS bucket. The metadata of the final dataset is stored in a separate GCS bucket. During the process of labeling your dataset one by one, the metadata keeps changing, and you don’t want to run the pipeline every time it changes. The finalized metadata can instead be exported into a separate GCS bucket when we are finished.

Below is the code for the Cloud Function. This is so simple because we already have the JSON file for the pipeline specifications. The function vertex_ai_pipeline_trigger gets called whenever a file belonging to a designated GCS bucket changes. Therefore, we need to write a simple conditional statement for filtering. When exporting the Vertex AI dataset, the code below ensures the pipeline runs whenever a jsonl file with a supportable extension is modified.

from kfp.v2.google.client import AIPlatformClientPROJECT_ID = “YOUR_GCP_PROJECT_ID”
REGION = “GCP_REGION_TO_RUN_PIPELINE”
PIPELINE_ROOT = “LOCATION_RUN_METADATA_IS_GOING_TO_BE_STORED”
PIPELINE_SPEC_PATH = “LOCATION_PIPELINE_SPEC_IS_STORED”def vertex_ai_pipeline_trigger(event, context):
print(‘File: {}’.format(event[‘name’]))
print(‘Extension: {}’.format(event[‘name’].split(“.”)[-1])) if event[‘name’].split(“.”)[-1] == “jsonl”:
print(“target file extension”) api_client = AIPlatformClient(
project_id=PROJECT_ID,
region=REGION
)
print(“api_client is successfully instantiated”) response = api_client.create_run_from_job_spec(
PIPELINE_SPEC_PATH,
pipeline_root=PIPELINE_ROOT,
parameter_values={“project”: PROJECT_ID},
)
print(‘response: {}’.format(response))

We have seen in the notebook section that the code to run the pipeline follows the same conditional statement. If you’re using other GCP services, you might wonder if Vertex AI requires extra authentication. Due to the fact that Cloud Function and Vertex AI are both GCP services, it can be streamlined.

gcloud functions deploy YOUR_FUNCTION_NAME \
— trigger-resource YOUR_TRIGGER_BUCKET_NAME \
— trigger-event providers/cloud.storage/eventTypes/object.finzlize

We can deploy the python code above to Cloud Function by executing the shell command below. This command should be run in the same directory as the python file, and “YOUR_FUNCTION_NAME” should match the name of the function. For more information, please see the official guide. Ensure that the requirements.txt file is in the same directory as any necessary libraries. Google-cloud-aiplatform was used to access Vertex AI APIs in this project.

In this tutorial, we looked at how Vertex AI can be used to create a simple but extendable MLOps pipeline. Through Vertex AI, you can create your very own data set within your browser. The notebook service lets you interact with Vertex AI APIs through codes. When you have corrected your code base, you can run your initial pipeline and create the JSON specification file, which contains all the details of how to run the pipeline later without having to write any actual code. To conclude, Cloud Function and GCS can be integrated with Vertex AI to automatically run whenever your dataset changes.

--

--