The nuts and bolts of Vertex AI – introduction to pipelines

So far, we have worked with individual jobs. However, a typical machine learning flow is more complicated than this – you might have to import data, transform it, run a training, validate the outcomes or do a hyperparameter search, package a model and deploy it. Instead of putting all this into a single job, VertexAI allows you to define pipelines that model this sort of flow.

Before we start to discuss pipelines, let us describe the problem that they are trying to solve. Suppose you have a standard machine learning workflow consisting of preparing your input data, training a model, performing a validation and finally packaging or even deploying your model. During each of these steps, there are some artifacts that you create – a training data set, validation metrics, or a trained and packaged model.

Of course you could put all of this into a single custom job. However, if that job fails, all artifacts are lost unless you implement some sort of caching yourself. In addition, monitoring this job to understand where you are in the process also requires a custom implementation, and we have seen in the previous posts that while you can manage experiment runs and metadata in a custom job, this requires a bit of effort.

Pipelines are a different way to address this problem. Instead of modelling a workflow and dependencies explicitly in your code, you declare individual components, each of which has certain inputs and outputs, and declare a pipeline that contains these components. In the background, VertexAI will then build a dependency graph, run your components, trace and cache inputs and outputs and monitor the execution.

VertexAI pipelines are based on Kubeflow which, among other things, implements a DSL (domain specific language) to describe components and jobs and Python decorators that allow you to model all this in Python code. If you work with pipelines, you will first declare your components, assemble these components into pipelines, create a pipeline specification and finally submit this to the platform for execution.

Let us now go through all of these steps before diving deeper into some topics in upcoming posts.

Component creation

Even though Kubeflow offers several types of components, most components that we will use will be what is called a lightweight Python component. Essentially, this is simply an ordinary Python function annotated as a component, like this.

from kfp import dsl

@dsl.component(
    base_image = "python:3.9")
def do_something():
    # some Python code
    pass

When the pipeline is executed, each component will be run as a separate container. We will look in detail at the process of executing a pipeline component a bit later, but essentially the execution engine will spin up the container specified by the base_image parameter of the annotation and run the code within the function inside this container.

This is simple, but has an important implication – the code within the function needs to be fully standalone. As a consequence, no global variables can be referenced, imports have to be done within the body of the function and, most importantly, we cannot invoke any function or class which is not defined inline inside the component function. If, for instance, we want to instantiate a PyTorch model we either have to pull the corresponding class definition into the function or we have to place the file with the model code in the container.
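To make the "fully standalone" requirement more tangible, here is a toy model of what happens to a component function. It is only a sketch and not how KFP is actually implemented: we assume that the source of the function is shipped as text and executed in a fresh namespace, and the helper run_isolated as well as both source snippets are made up for this illustration.

```python
# Toy model: the function source travels as text and is executed in an
# empty namespace, so nothing from the defining module is available.
GOOD_SOURCE = '''
def scale(x: float) -> float:
    import math                      # import inside the body: works
    return math.sqrt(x) * 2
'''

BAD_SOURCE = '''
def scale(x: float) -> float:
    return math.sqrt(x) * FACTOR     # relies on module-level names
'''


def run_isolated(source, func_name, *args):
    """Execute the source in a fresh namespace and call the function."""
    namespace = {}
    exec(source, namespace)
    return namespace[func_name](*args)


good_result = run_isolated(GOOD_SOURCE, "scale", 16.0)

try:
    run_isolated(BAD_SOURCE, "scale", 16.0)
    bad_error = None
except NameError as exc:
    # The module-level import and constant never made it into the namespace
    bad_error = exc

print(good_result, type(bad_error).__name__)
```

The second variant fails with a NameError because neither the import nor the constant exist in the isolated namespace, which mirrors the restriction described above.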

To be useful, a component typically needs to process inputs and produce outputs. Let us see how this works for pipeline components. In VertexAI (and KFP) pipelines, there are two different types of inputs or outputs that your function can declare – parameters and artifacts.

A parameter is just an input to your function (i.e. a Python parameter) that is either defined at compile time when assembling the components into a pipeline or – more useful – when you actually submit the pipeline for execution (there are also output parameters, we get back to this). Behind the scenes, the execution engine serializes the value of a parameter into a JSON structure and (by some magic that we will look at in more detail in one of the upcoming posts) calls your component function passing this value as an ordinary Python parameter. KFP supports strings, integers, floats, boolean values and dictionaries and lists as parameters.
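As a rough illustration of passing parameters by value, the following sketch serializes a set of parameter values to JSON and restores them before calling a function. This only mimics the idea; the actual wire format used by the execution engine is different, and the function train_stub and all values are invented for this example.

```python
import json


def train_stub(epochs: int, lr: float, layers: list, options: dict) -> str:
    # Stand-in for a component function that receives plain Python values
    return f"{epochs} epochs, lr={lr}, layers={layers}, optimizer={options['optimizer']}"


# Values as you might provide them when submitting a pipeline
parameter_values = {
    "epochs": 5000,
    "lr": 0.05,
    "layers": [64, 32],
    "options": {"optimizer": "adam"},
}

# Serialize on the submitting side ...
wire_format = json.dumps(parameter_values)

# ... and restore the values on the executing side
restored = json.loads(wire_format)
summary = train_stub(**restored)
print(summary)
```

All supported parameter types survive this round trip unchanged, which is why they can be passed around as values.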

While parameters are passed by value, there is a second type of input a function can take and which I tend to think of as passing arguments by reference. This is mostly applied to large pieces of data like models or datasets that you do not want to pass around as a JSON object but as what is called an artifact. Inside the Python code, an artifact is represented by a Python object that has – in addition to its name – three major attributes. First, every artifact has a URI which is the name of a blob on GCS (KFP itself also supports other storage solutions like S3 and Minio). The idea is that within your Python code, the artifact represents the actual artifact stored under this URI. Second, there is a path which also points to the actual artifact but on local storage (we will explain this in a minute), and third there is a metadata property which is just a dictionary which you can use to attach metadata to an artifact.

Let us look at an example to see how this works in practice. Consider the following code snippet that defines a component called train.

from kfp import dsl
from kfp.dsl import Input, Output, Model, Dataset, Metrics


@dsl.component(
    base_image = ...,
)
def train(epochs : int, 
          lr : float,
          data : Input[Dataset],  
          trained_model : Output[Model],
          metrics: Output[Metrics]):
    # Train
    ... 

    # Log some metrics
    metrics.log_metric("final_loss", loss.item())
    #
    # Store trained model as state dict
    #
    torch.save(_model.state_dict(), trained_model.path)

First, we see that all Python function parameters are annotated – this is required for pipeline components, as the runtime needs the type information to do its job. In our example, epochs and lr are parameters. Their values will be injected at runtime, and in the body of your function, you can access them as usual.

The other arguments – data, trained_model and metrics – are artifacts of type dataset, model and metrics (there is also a type for generic artifacts). The annotations of these artifacts do not only specify the type, but also whether this is an input or an output. In your Python code, you can access attributes like metadata directly to read or modify metadata (though you should only do this for output artifacts). You can also call functions like log_metric on a metrics artifact to log the actual metrics which will later be displayed on the VertexAI console. In fact, Output is a Python type annotation (see PEP 593) so that the parameter trained_model can be treated as being of type Model inside the code.

To access the content of the actual artifact, for instance our model, you use the path property of the artifact. This is a string containing a path to the actual object on the local filesystem. When you run your pipeline, VertexAI will mount the GCS bucket where your objects are located (the so-called pipeline root) via gcsfuse into the local container file system and set up the value of path for you accordingly. You can now read from this path and write to it as needed, for instance to store model weights as in the example above. Via the file system mount, the changes will then be written back to the underlying GCS bucket. So while the actual data comprising the artifact lives on GCS, it can be accessed via the Python artifact object – the trained_model in our case – which is why I tend to think of this as a reference.
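The relation between URI and path can be sketched with a small stand-in class. This is not the KFP Artifact class itself; the class ToyArtifact, the bucket name and the mount point /gcs/ are assumptions made for the sake of the example.

```python
from dataclasses import dataclass, field

GCS_SCHEME = "gs://"
LOCAL_MOUNT = "/gcs/"   # assumed mount point of the gcsfuse file system


@dataclass
class ToyArtifact:
    name: str
    uri: str
    metadata: dict = field(default_factory=dict)

    @property
    def path(self) -> str:
        # Translate the blob URI into a path below the local mount point
        if self.uri.startswith(GCS_SCHEME):
            return LOCAL_MOUNT + self.uri[len(GCS_SCHEME):]
        return self.uri


trained_model = ToyArtifact(
    name="trained_model",
    uri="gs://my-bucket/pipeline_root/run-1/trained_model",  # hypothetical
)
trained_model.metadata["framework"] = "pytorch"
print(trained_model.path)
```

Code inside the component only ever touches the local path, while everything that tracks the artifact refers to its URI.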

Pipeline creation

We have now seen how individual components can be created by using decorators (these components are called Python components, there are also more general components called container components but we will not discuss those today). Next we will have to patch these components together into a pipeline. Not surprisingly, this is again done using decorators, and again not surprisingly, the decorator we use is called pipeline. Here is an example.

@dsl.pipeline(
    name = "my-pipeline"
)
def my_pipeline(epochs : int, lr : float, training_items : int):
    _create_data = create_data(training_items = training_items)
    _train = train(epochs = epochs,
                   lr = lr,
                   data = _create_data.outputs['data'])

This code snippet defines a pipeline with two steps, i.e. two components, called create_data and train. At the top, we again have an annotation that specifies the name of the pipeline. Within the body of the function carrying this annotation, we then simply call our pipeline components to add them to the pipeline. However, this call does not actually run the code defined for the individual components but is diverted (thanks to the @component annotation) to a method of the KFP framework which adds the component definition to the pipeline. This looks a bit like black magic – we will take a deeper look at how exactly this works in the next post. So bear with me and simply accept this magic for the time being.

We also see how parameters and artifacts are handled. In our case, the pipeline itself has three parameters that we will define when we actually submit the pipeline. Again these parameters are annotated so that the framework can derive their type and deal with them.

In our example, we also use the output of the first step as input for the second step. To do this, we capture the return value of the first step (again, this is not really the output of the annotated function but the output of the wrapper in which the framework wraps our original function). This is a Python object which has – among others – a property called outputs. This property is a dictionary whose keys are the names of the outputs of our step, and we can use these keys to feed the data as input artifacts into the next step.

By chaining steps in this way, we also inform VertexAI about the dependencies of our components. In our example, the second step can only start once the first step is complete, as it uses the output of the first step, so there is a dependency. Behind the scenes, this information will be assembled into a directed acyclic graph (DAG) in which the vertices are the components and edges represent dependencies. When running our pipeline, components that can run in parallel because there are no dependencies will also be scheduled to run in parallel, but where needed the execution is serialized.
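To see how a DAG determines what can run in parallel, here is a small sketch using graphlib from the Python standard library. This is not the scheduler that VertexAI uses; the step load_config is hypothetical and only added to have a second step without dependencies.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes
dependencies = {
    "create_data": set(),
    "load_config": set(),                  # hypothetical independent step
    "train": {"create_data"},
    "validate": {"create_data", "train"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

# Collect "waves" of steps; all steps within one wave could run in parallel
waves = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)

for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

The two steps without dependencies end up in the same wave, while training and validation are serialized because each of them waits for an output of an earlier step.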

The output of the component invocation that we capture in the variable _create_data (the underscore is really just an arbitrary naming convention that I use, any name will do) is also useful for a second purpose – specifying the execution environment. For that purpose, this output (which is actually of type PipelineTask) has a set of methods like set_cpu_limit or set_memory_request to define limits and requests for the later execution (designed to run on a Kubernetes platform with the usual meaning of requests and limits, but this is not guaranteed by the framework). In particular, you can use set_accelerator_type to request a GPU of a specific type. Here is an example.

_train.set_accelerator_type("NVIDIA_TESLA_T4")
_train.set_cpu_limit("4")

When playing with this, I experienced a strange problem – if I did not request at least four CPUs for a machine using a T4 GPU, the output of my Python script did not show up in the logs. There is even a ticket for this recommending a higher CPU limit (as I have set above) as a workaround, so until this is resolved you will probably have to live with that and the additional cost.

Compiling a pipeline

We now have our beautiful pipeline in Python. This is nice, but to submit a pipeline we actually need to convert this into something that the VertexAI engine can process – a pipeline definition in what is called IR YAML (intermediate representation YAML). This contains among other data the specification of the individual components of the pipeline, information on how to execute them (we will get deeper into this when we talk about the details of component execution in one of the next posts) and the DAG that we have constructed.

To translate the Python representation into an IR YAML file, we use a component called the compiler. In the most common case, this only requires two parameters – the pipeline that we want to compile and the name of a file to which we write the YAML output.

from kfp import compiler

compiler.Compiler().compile(pipeline_func = my_pipeline, 
                        package_path = "my-pipeline.yaml")

Depending on the file name extension, the compiler will either create JSON or (recommended by the documentation) YAML output. We will take a closer look at the generated output in the next post in this series. We also note that it is possible to compile individual components and to load components from YAML files, so you could build a library of reusable components in YAML format as well.

Submitting a pipeline

Let us now submit the pipeline. It is important to understand that the YAML file that we have just created is all that actually gets uploaded to the VertexAI platform, and that it contains all information that we need to run the pipeline. Our Python code is just a local artifact (but shows up again in different form in the YAML file). To submit a pipeline to run as what is called a pipeline job, we first create a pipeline job object from the compiled YAML and then submit it.

#
# Create a pipeline job from the 
# pipeline definition
#
pipeline_job = aip.PipelineJob(
    display_name = "my-pipeline",
    template_path = "my-pipeline.yaml",
    pipeline_root = f"gs://vertex-ai-{google_project_id}/pipeline_root",
    parameter_values = {
        "epochs" : 5000,
        "lr" : 0.05,
        "training_items" : 1000,
    },
    project = google_project_id,
    location = google_region
)

Here, the template_path argument specifies the path to the compiled YAML file. The pipeline_root parameter specifies a directory path in a GCS bucket under which all artifacts created during this pipeline run will be stored. You can also specify the values of parameters at runtime here (in fact, you will have to do this for all parameters which are not fixed at build time). In addition, you create a job in a specific project and region as usual.

Once created, you can submit the job using

pipeline_job.submit(
    service_account = service_account
)

Note the service account parameter, which has the same meaning as for a custom job: it is the service account under which all steps of this pipeline will execute.

Let us now try this with a real example. In my repository (that you should have cloned by now if you have followed the instructions in the introduction to this series), I have prepared a sample pipeline that uses all of this. To run this pipeline execute the following commands, starting from the root of the repository.

cd pipelines
# Will create the IR yaml
python3 pipeline_definition.py
# will submit the pipeline
python3 submit_pipeline.py 

Once the pipeline is submitted, it is instructive to take a look at the graphical representation of the pipeline in the VertexAI console in the pipeline tab. Your newly submitted pipeline should show up here, and as you click on it, you should see a screen similar to the following screenshot.

What we see here is a graphical representation of the DAG. We see, for instance, that the validation step depends – via the trained model – on the training step, but also on the data creation.

On this screen, you can also click on one of the artifacts or steps to see more information on it in an additional panel on the right. For a step, you can navigate to the job itself (VertexAI will in fact create one custom job for each step) and its logs. For an artifact, you can click on “View lineage” to jump to the metadata screen. There you will find that VertexAI has created metadata entries for our metrics, models and datasets but also for the steps, and has associated artifacts with executions, just as we did manually when working with custom jobs.

When you submit the exact same pipeline again, you will produce a second pipeline run, but this run will complete within seconds. This is the power of caching – VertexAI realizes that neither the jobs nor the input has changed and therefore skips all steps that would produce the same output again. This is very useful if you modify one of the later steps in the pipeline or if a step fails, as it avoids having to re-run the entire pipeline.
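The idea behind this kind of caching can be sketched in a few lines. This is a simplified model under the assumption that a step is identified by its definition and its inputs; how VertexAI actually computes its cache keys is not covered here, and the functions cache_key and run_step are invented for this illustration.

```python
import hashlib
import json

cache = {}        # maps cache keys to previously computed results
executed = []     # records which steps actually ran


def cache_key(component_spec: dict, inputs: dict) -> str:
    """Derive a stable fingerprint from step definition and inputs."""
    payload = json.dumps({"spec": component_spec, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_step(name, component_spec, inputs, func):
    key = cache_key(component_spec, inputs)
    if key in cache:
        return cache[key]          # same step, same inputs: skip execution
    executed.append(name)
    result = func(**inputs)
    cache[key] = result
    return result


def create_data(training_items):
    return list(range(training_items))


spec = {"image": "python:3.9", "function": "create_data"}
first = run_step("create_data", spec, {"training_items": 1000}, create_data)
second = run_step("create_data", spec, {"training_items": 1000}, create_data)
third = run_step("create_data", spec, {"training_items": 2000}, create_data)
print(executed)
```

Only the first and the third call actually execute the step; the second call is served from the cache because neither the definition nor the inputs have changed.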

This concludes our short introduction. In the next post, we will shed some light on the apparent magic behind all this and learn how exactly components and pipelines are built and executed.

The nuts and bolts of VertexAI – metadata, logging and metrics

One of the core functionalities of every machine learning platform is traceability – we want to be able to track artifacts like models, training jobs and input data and tie all this together so that given a model version, we can go back all the way to the training data that we have used to train that version of the model. On VertexAI, this is handled via metadata which we will discuss today.

Metadata items and their relations

Let us start with a short introduction into the data model behind metadata. First, all metadata is kept behind the scenes in a metadata store. However, this is not simply a set of key-value pairs. Instead, the actual metadata is attached to three types of entities that populate the metadata store – artifacts, executions and contexts. Each of these entities has a resource name, following the usual naming conventions (with the metadata store as the parent), and a metadata field which holds the actual metadata that you want to store. In addition, each item has a display name and refers to a schema – we will come back to this later – plus a few fields specific for the respective entity.

Why these three types of metadata? The idea is that artifacts are the primary objects of interest. An artifact can be used as an input for a training run, like a dataset, or can be an output, like a model. Executions are the processes that consume and produce artifacts. Finally, there might be a need to group several executions together because they are somehow related, and this is the purpose of a context (at first glance, this seems to be based on MLMetadata which is part of TFX).

To model these ideas, the different types of metadata entities have relations that can be set and queried via API calls – they form a graph, called the lineage graph. For instance, an execution has a method assign_input_artifacts to build a relation between an artifact and an execution, and a method get_input_artifacts to query for these artifacts. Other types of entities have different relations – we can add an execution to a context and we can also build context hierarchies as a context can have children. Here is a graphical overview of the various relations that our metadata entities have.

Schemata, experiments and experiment runs

So far, this has been an abstract discussion. To make this concrete, we need to map this general pattern to the objects we are usually working with. An artifact, for instance, could be a model or a dataset – how do we distinguish between these types?

Instead of introducing dedicated objects for these different types of artifacts, VertexAI uses schemas. A schema describes the layout of the metadata but also serves to identify what type of entity we are looking at. Here is an API request that you can submit to get a list of all supported schemas.

TOKEN=$(gcloud auth print-access-token)
ENDPOINT="https://$GOOGLE_REGION-aiplatform.googleapis.com"
PARENT="projects/$GOOGLE_PROJECT_ID/locations/$GOOGLE_REGION"
curl \
  -H "Authorization: Bearer $TOKEN" \
   $ENDPOINT/v1/$PARENT/metadataStores/default/metadataSchemas

Not every schema makes sense for every type of metadata entity. In the output of the statement above, you will find a schemaType field for each of the schemas. This field tells us whether the respective schema describes an artifact, an execution or a context.

In the SDK code, the schemas used per type are encoded in the modules in aiplatform.metadata.schema.system. There is, for instance, a module artifact_schema.py which contains all schemas that can be used for artifacts (there are some schemas, however, in the output of the REST API that we call above which do not appear in the source code). If we go through these modules and map artifact types to schemas, we get the following picture.

Let us quickly discuss the most important types of schemas. First, for the schema type Artifact, we see some familiar terms – there is the generic artifact type, there are models, datasets and metrics.

Let us now turn to executions. As already explained, an execution represents an actual job run. We see a custom job execution as well as a “Run” (which seems to be a legacy type from an earlier version of the platform) and a “ContainerExecution”. For the context, we see four different options. Today, we will talk about experiments and experiment runs and leave pipelines and pipeline runs to a later post.

An experiment is supposed to be a group of executions that somehow belong together. Suppose, for instance, you want to try out different model architectures. For that purpose, you build a job to train the model and a second job to evaluate the outcomes. Then each architecture that you test would correspond to an experiment run. For each run, you execute the two jobs so that each run consists of two executions. Ideally, you want to be able to compare results and parameters for the different runs (if you have ever taken a look at MLFlow, this will sound familiar).

This fits nicely into our general pattern. We can treat our trained model as artifact and associate this with the execution representing the training job. We can then associate the executions with the experiment run which is modelled as a context. The experiment is a context as well which is the parent of the experiment run, and everything is tied together by the relations between the different entities sketched above.
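To make this structure a bit more concrete, here is a purely local toy model of the entities and their relations. It does not talk to the metadata store and does not use the SDK; all class names and the function output_artifacts are made up, and only the schema titles are taken from the platform.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyArtifact:
    display_name: str
    schema_title: str


@dataclass
class ToyExecution:
    display_name: str
    schema_title: str
    inputs: List[ToyArtifact] = field(default_factory=list)
    outputs: List[ToyArtifact] = field(default_factory=list)


@dataclass
class ToyContext:
    display_name: str
    schema_title: str
    children: List["ToyContext"] = field(default_factory=list)
    executions: List[ToyExecution] = field(default_factory=list)


def output_artifacts(context: ToyContext) -> List[str]:
    """Collect the names of all artifacts produced anywhere below a context."""
    names = [a.display_name for e in context.executions for a in e.outputs]
    for child in context.children:
        names.extend(output_artifacts(child))
    return names


# One run with two executions: training produces the model, evaluation uses it
model = ToyArtifact("my-model", "system.Model")
training = ToyExecution("training-job", "system.CustomJobExecution", outputs=[model])
evaluation = ToyExecution("evaluation-job", "system.CustomJobExecution", inputs=[model])
run = ToyContext("my-experiment-run", "system.ExperimentRun",
                 executions=[training, evaluation])
experiment = ToyContext("my-experiment", "system.Experiment", children=[run])

print(output_artifacts(experiment))
```

Starting from the experiment, we can walk down via the run and its executions to the model, which is exactly the kind of traversal that the lineage graph is meant to support.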

Creating metadata entities

Time to try this out. Let us see how we can use the Python SDK to create metadata entities in our metadata store. We start with artifacts. An artifact is represented by an instance of the class aiplatform.metadata.artifact.Artifact. This class has a create method that we can simply invoke to instantiate an artifact.

artifact = aip.metadata.artifact.Artifact.create(
    schema_title = artifact_schema.Artifact.schema_title,
    uri = f"gs://vertex-ai-{google_project_id}/artifacts/my-artifact",
    display_name = "my-artifact",
    project = google_project_id,
    location = google_region
)

Note the URI which is specific to the artifact. The idea of this field is of course to store the location of the actual artifact, but this is really just a URI – the framework will not make an attempt to check whether the bucket even exists.

Similarly, we can create an execution which is an instance of execution.Execution in the metadata package. There is a little twist, however – this fails with a GRPC error unless you explicitly set the credentials to None (I believe that this happens because the credentials are optional in the create method but no default, not even None, is declared). So you need something like

execution = aip.metadata.execution.Execution.create(
    display_name = "my-execution",
    schema_title = execution_schema.CustomJobExecution.schema_title,
    project = google_project_id, 
    location = google_region,
    credentials = None
)

Once we have an execution and an artifact, we can now assign the artifact as either input or output. Let us declare our artifact as an output of the execution.

execution.assign_output_artifacts([artifact])    

Next, we create the experiment and the experiment run as a context. Again there is a subtle point – we want our experiment to appear on the console as well, and for this to work, we have to declare a metadata field experiment_deleted. So the experiment is created as follows.

parent_context = aip.metadata.context.Context.create(
    schema_title = context_schema.Experiment.schema_title,
    display_name = "my-experiment",
    project = google_project_id,
    location = google_region,
    metadata = {"experiment_deleted": False}
)

We create the experiment run similarly (here no metadata is required) and then call add_context_children on the experiment context to declare the run a subcontext of the experiment. Finally, we tie the execution to the context via add_artifacts_and_executions. The full code can be found in create_metadata.py in the metadata directory of my repository.

It is instructive to use the Vertex AI console to understand what happens when running this. So after executing the script, let us first navigate to the “Experiments” tab in the section “Model Development”. In the list of experiments, you should now see an entry called “my-experiment”. This entry reflects the context of type system.Experiment that we have created. If you click on this entry, you will be taken to a list of experiment runs where the subcontext of type system.ExperimentRun shows up. So the console nicely reflects the hierarchical structure that we have created. You could now take a look at the metrics created by this experiment run; we will see a bit later how this works.

Next navigate to the “Metadata” tab. On this page, you will find all metadata entries of type “Artifact”. In particular, you should see the entry “my-artifact” that we have created. Clicking on this yields a graphical representation of the relation between the artifact and the execution that has created it, exactly as we have modelled it in our code.

This graph is still very straightforward in our case, but will soon start to become useful if we have more complex dependencies between various jobs, their inputs and outputs.

To inspect the objects that we have just created in more detail, I have put together a script list_metadata.py that uses the list class methods of the various involved classes to get all artifacts, executions and contexts and also prints out the relations between them. This script also has a flag --verbose to produce more output as well as a flag --delete which will delete all entries. The latter is useful to clean up, as the metadata store grows quickly and needs to be purged on a regular basis (if you only want to clean up, you might also want to use the purge method to avoid too many API calls; remember that there is a quota on the API).

Tracking metadata lineage

In the previous section, we have seen how we can create generic artifacts, contexts and executions. Usually this is not the way the metadata lineage is actually built. Instead, the SDK offers a few convenience functions to track artifacts (or, as in the case of pipelines, does all this automatically).

Let us start with experiments. Typically, experiments are created by adding them as parameter when initializing the library.

aip.init(project = google_project_id, 
         location = google_region,
         experiment = "my-experiment")

Behind the scenes, this will result in a call to the set_experiment method of an object that is called the experiment tracker (this is initialized at module import time). This method will create an instance of a dedicated Experiment class defined in aiplatform.metadata.experiment_resources, create a corresponding context with the resource name being equal to the experiment name and tie the experiment and the context together (if the experiment already exists, it will use the existing context). So while the context exists on the server side and is accessed via the API, the experiment is a purely local object managed by the SDK (at the time of writing and with version 1.39 of the SDK, there are comments in the code that suggest that this is going to change).

Next, we will typically start an experiment run. For that purpose, the API offers the start_run function which calls the corresponding method of the experiment tracker. This method is prepared to act as a Python context manager so that the run is started and completed automatically.

This method will again create an SDK object which is now an instance of ExperimentRun. This experiment run will then be added as child to the experiment, and in addition the experiment run is associated directly with the experiment. So at this point, the involved entities are related as follows.

Note that if a run already exists, you can attach to this run by passing resume = True as additional parameter when you call start_run.

We now have an experiment and an experiment run. Next, we typically want to create an execution. Again there is a convenience function start_execution implemented by the experiment tracker which will create an execution object that can be used as a context manager. In addition, this wraps the assign_input_artifacts and assign_output_artifacts methods of this execution so that all artifacts which will be attached to this execution will automatically also be attached to the experiment run. Here is a piece of code that brings all of this together (see the script create_lineage.py in the repository).

aip.init(project = google_project_id, 
         location = google_region,
         experiment = "my-experiment")

#
# do some training
#
...
#
# Start run
#
with aip.start_run(run = "my-experiment-run") as experiment_run:
    with aip.start_execution(display_name = "my-execution",
                             schema_title = "system.CustomJobExecution") as execution:

        # Reflect model in metadata and assign to execution
        #
        model  = aip.metadata.artifact.Artifact.create(
            schema_title = artifact_schema.Model.schema_title,
            uri = f"gs://vertex-ai-{google_project_id}/models/my-models",
            display_name = "my-model",
            project = google_project_id,
            location = google_region
        )
        execution.assign_output_artifacts([model])

Note that we start the run only once the actual training is complete, in alignment with the recommendation in the MLFlow Quickstart, to avoid the creation of invalid runs due to errors during the training process.

Logging metrics, parameters and time series data

So far, we have seen how we can associate artifacts with executions and build a lineage graph connecting experiments, experiment runs, executions and artifacts. However, a typical machine learning job will of course log more data, specifically parameters, training metrics like evaluation results and time series data like a training loss per epoch. Let us see what the SDK can offer for these cases.

Similar to functions like start_run which are defined on the package level, the actual implementation of the various logging functions is again a part of the experiment tracker. The first logging function that we will look at is log_params. This allows you to log a dictionary of parameter names and values (which must be integers, floats or strings). Behind the scenes, this will simply look up the experiment run, navigate to its associated metadata context and add the parameters to the metadata of this context, using the key _params. Data logged in this way will show up on the console when you display the details of an experiment run and select the tab “Parameters”. In Python, you can access the parameters by reconstructing the experiment run from the experiment name and the experiment run name and calling get_params on it.

experiment_run = aip.ExperimentRun(
    run_name = "my-experiment-run",
    experiment = "my-experiment")
parameters = experiment_run.get_params()

Logging metrics with log_metrics is very similar, with the only difference that a different key in the metadata is used.
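As a minimal sketch of how the two calls could be combined, the following snippet first checks a parameter dictionary against the type restriction mentioned above and then hands it to log_params and log_metrics. The helper names check_loggable and log_run_data are made up for this illustration and are not part of the SDK. The SDK import is deferred so that the check itself can be tried without cloud access.

```python
def check_loggable(params):
    """Return the keys whose values are not int, float or str."""
    return [
        key for key, value in params.items()
        if not isinstance(value, (int, float, str))
    ]


def log_run_data(params, metrics):
    """Log parameters and metrics into the currently active experiment run."""
    invalid = check_loggable(params)
    if invalid:
        raise ValueError(f"Cannot log parameters {invalid}")
    # Deferred import: assumes that google-cloud-aiplatform is installed and
    # that aip.init and aip.start_run have already been called
    import google.cloud.aiplatform as aip
    aip.log_params(params)
    aip.log_metrics(metrics)


# The list-valued entry is flagged, the other two entries pass the check
print(check_loggable({"epochs": 10, "lr": 0.01, "layers": [16, 16]}))
```

Inside a run, we would then call something like log_run_data({"epochs": 10, "lr": 0.01}, {"accuracy": 0.93}).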

Logging time series data is a bit more interesting, as here a tensorboard instance comes into play. If you create an experiment in the aiplatform.init function, this will invoke the method set_experiment of the experiment tracker. This method will, among other things that we have already discussed, create a tensorboard instance and assign it as so-called backing tensorboard to the experiment. When an experiment run is created, an additional artifact representing what is called the tensorboard run is created as well and associated with the experiment run (you might already have detected this in the list of artifacts on the console). This tensorboard run is then used to log time series which then appear both on the console and in a dedicated tensorboard instance which you reach by clicking on “Open Tensorboard” on the console. Note that Google charges 10 USD per GB of data and month in this instance, so you will want to clean up from time to time.
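To make this a bit more concrete, here is a small sketch of how a loss per epoch could be written as time series data using the SDK function log_time_series_metrics. The wrapper log_losses and its log_fn parameter are assumptions made for this example; they only exist so that the loop can be tried with a stand-in logger instead of a real tensorboard.

```python
def log_losses(losses, log_fn=None):
    """Log one loss value per epoch as time series data."""
    if log_fn is None:
        # Deferred import: assumes an active experiment run that has a
        # backing tensorboard assigned
        import google.cloud.aiplatform as aip
        log_fn = aip.log_time_series_metrics
    for epoch, loss in enumerate(losses):
        log_fn({"loss": float(loss)}, step=epoch)


# Dry run with a stand-in logger that simply records its arguments
recorded = []
log_losses(
    [0.9, 0.5, 0.3],
    log_fn=lambda metrics, step: recorded.append((step, metrics)),
)
print(recorded)
```

In a real training loop, we would of course call the logging function once per epoch as the values become available instead of collecting them in a list first.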

Tensorboard experiments, tensorboard runs and logging to a tensorboard can also be used independently of the other types of metadata – we will cover this in a future post after having introduced pipelines.

Let us briefly touch upon two more advanced logging features. First, there is a function log_model which will store the model in a defined GCS bucket and create an artifact pointing to this model. This only works for a few explicitly supported ML frameworks. Similarly, there is a feature called autologging which automatically turns on MLflow autologging, but again this only works if you use a framework which supports this, like PyTorch Lightning.

Associating a custom job with an experiment run

Let us now put everything that we have learned so far together and write a custom job which logs parameters, metrics and time series data into an experiment run. As, in general, we could have more than one job under an experiment run, our general strategy is as follows.

First, still outside of the job, we add the experiment parameter when initializing the framework which will create the experiment (if it does not yet exist) and add it to the experiment tracker as seen above. Similarly, we start an experiment run – as the name of an experiment run needs to be unique (it is used as ID internally), we will create the experiment run name as a combination of the experiment name and a timestamp.
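A run name of this form could be built as follows. This is only a sketch, the function name make_run_name is an assumption, and the timestamp format simply mirrors the one used elsewhere in this series.

```python
import time


def make_run_name(experiment, now=None):
    """Combine the experiment name and a timestamp into a run name."""
    now = time.localtime() if now is None else now
    timestamp = time.strftime("%Y%m%d%H%M%S", now)
    return f"{experiment}-{timestamp}"


run_name = make_run_name("my-experiment")
print(run_name)
```

As the timestamp has a resolution of one second, two runs started within the same second would collide, which is acceptable for manual submissions but something to keep in mind for automated ones.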

Inside the actual training job, we can then call start_run again, this time passing resume = True so that the experiment tracker again points to our previously created run. We can then start an execution to which we can associate metadata, and we can log into the run as demonstrated above.

This approach is supported by the parameters experiment and experiment_run of the submit method (or run method) of a custom job. When we set these parameters, the entries AIP_EXPERIMENT_NAME and AIP_EXPERIMENT_RUN_NAME will be added to the environment of the job so that within the job, we can retrieve the name of the experiment and of the experiment run from there. In addition, the name of the custom job will be added to the context representing the run as a metadata item, using the key custom_jobs. So our code to prepare and then submit (or run) the job would be something like

EXPERIMENT = "my-experiment"
aip.init(project = google_project_id,
         location = google_region,
         experiment = EXPERIMENT)
...
with aip.start_run(run_name) as experiment_run:
    job = aip.CustomJob.from_local_script(
        display_name = "my-job",
        script_path = "train.py",
        container_uri = image,
        machine_type  = "n1-standard-4",
        base_output_dir = f"gs://{staging_bucket}/job_output/{timestamp}",
        project = google_project_id,
        location = google_region,
        staging_bucket = staging_bucket,
        environment_variables = {
            "GOOGLE_PROJECT_ID" : google_project_id,
            "GOOGLE_REGION" : google_region
        }
    )
    job.run(
        service_account = service_account,
        experiment = EXPERIMENT, 
        experiment_run = run_name
    )

Note that we also pass the Google project ID and the location to the training job as environment variables so that the job can call init with the same parameters. Within the training code, we would then do something like

import os
import google.cloud.aiplatform as aip

# Names of experiment and run are injected by the platform
experiment_name = os.environ.get("AIP_EXPERIMENT_NAME")
experiment_run_name = os.environ.get("AIP_EXPERIMENT_RUN_NAME")
# Project and region are passed by our own submission code
google_project_id = os.environ.get("GOOGLE_PROJECT_ID")
google_region = os.environ.get("GOOGLE_REGION")

aip.init(project = google_project_id,
         location = google_region,
         experiment = experiment_name)

with aip.start_run(experiment_run_name, resume = True):
    ...
    aip.log_params({
        "epochs" : epochs,
        "lr" : lr,
    })
    #
    # Store model on GCS
    #
    uri = upload_model(model)
    #
    # Log artifact, using the URI returned by the upload
    #
    with aip.start_execution(
        display_name = f"{experiment_run_name}-train",
        schema_title = "system.CustomJobExecution"
    ) as execution:
        model_artifact = aip.metadata.artifact.Artifact.create(
            schema_title = "system.Model",
            uri = uri,
            display_name = "my-model",
            project = google_project_id,
            location = google_region
        )
        execution.assign_output_artifacts([model_artifact])


Of course, you could also log time series data from within the job into your tensorboard instance. Also note that in this setup, it might make sense to use run instead of submit, as otherwise the status of the run in the metadata will be updated to “Complete” once we are done with the submission and leave the context, while the actual job is still running.

Let us try this. Make sure that you are in the directory metadata within the repository clone and run our test script by typing

python3 run_job.py

This should run a job (not just submit it as we have done previously) that in the background executes the training function in train.py and uses all of the various metadata features discussed so far. You should see our run in the console as soon as the job has started, but metrics will only be populated when it really executes (provisioning of the container might take a few minutes).

In the Vertex AI metadata console, we can now easily navigate from the experiment run to parameters and metrics and even to the artifact, and we see the time series data updated in near real time on the console or in our tensorboard instance. With that, we have reached the end of this blog post (which turned out to be a bit longer than expected) – in the next post, we will start to turn our attention to pipelines.

The nuts and bolts of VertexAI – custom training jobs

In the previous post, we have seen how to store trained models and how to spin up prediction endpoints. Now we will follow the ML lifecycle backwards and talk about how you actually train models on the platform. There are several options to do this and we start with the most basic one – custom jobs.

Defining custom jobs

Abstractly speaking, a job is a defined workload that you hand over to VertexAI for execution. The platform will then allocate the required compute resources, run the workload, monitor the output and inform you once the execution is complete. This mode of operations is perfect for ML training – instead of spinning up a virtual machine, preparing the environment, uploading and executing your code and then terminating the machine again, you only have to care about the actual code and everything else is handled for you, much in the spirit of serverless computing.

But how do we define our workload? The approach that VertexAI takes at this point is very similar to what we have seen when we discussed models. A job consists of a reference to a piece of code that we want to run and a container that specifies the environment in which our code will be executed. Here is a summary of the workflow that we will go through when assembling and running a custom job.

First, we will pick a custom container (or use one of the prebuilt containers which are part of the VertexAI platform). For our purposes, we will use our base container which is already stored in our repository, so we can skip the step to upload it.

Next, we need a training script. We will reuse the script that we have used in the last posts when working with models. A copy of this script is located in the jobs directory of the repository as train.py – please cd into this directory now.

To actually create a custom job as the third step, we now invoke the class method CustomJob.from_local_script. Among other things, this method receives the URI of our container and the local path to the training script. The script will then be packaged by the SDK into a distributable package which is then uploaded to a GCS bucket called the staging bucket that we need to specify. Here is a table showing the most important parameters of the CustomJob.from_local_script method that we will use.

| Parameter | Description |
| --- | --- |
| display_name | A human readable job name |
| script_path | The path to the local Python script that we want to execute |
| container_uri | The URI of the container that we will use |
| machine_type | The machine type for running our container |
| base_output_dir | A path to a GCS location used as output |
| project | The Google project ID |
| location | The Google region |
| staging_bucket | Bucket that VertexAI will use to stage the packaged script |

Table: Parameters to create a job

Let us quickly comment on the base_output_dir parameter. Essentially, this is a GCS location of your choice. The platform will not directly use this, but will instead assemble a few environment variables for your training script to use if you want to store artifacts on GCS (of course you could use any other custom location as well). Specifically, the following environment variables will be set.

| Environment variable | Value |
| --- | --- |
| AIP_MODEL_DIR | base_output_dir/model/ |
| AIP_CHECKPOINT_DIR | base_output_dir/checkpoints/ |
| AIP_TENSORBOARD_LOG_DIR | base_output_dir/logs |

Table: Usage of the base output dir
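Inside a training script, these variables could be read as sketched below. The function name output_locations and the local fallback paths are assumptions for this example; the fallbacks are only there so that the same script also works when it is started outside of the platform.

```python
import os


def output_locations(env=None):
    """Read the output locations that the platform passes to the job."""
    env = os.environ if env is None else env
    return {
        # The defaults are hypothetical local paths used outside the platform
        "model": env.get("AIP_MODEL_DIR", "./output/model/"),
        "checkpoints": env.get("AIP_CHECKPOINT_DIR", "./output/checkpoints/"),
        "logs": env.get("AIP_TENSORBOARD_LOG_DIR", "./output/logs"),
    }


# Simulate the environment of a job for illustration
locations = output_locations(
    {"AIP_MODEL_DIR": "gs://my-bucket/job_output/model/"}
)
print(locations)
```

Note that the values are GCS URIs when the job runs on the platform, so the training script needs a GCS client (or a mounted bucket) to actually write to them.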

It is important to understand that at this point, no Google API is really called and the job is not yet submitted – the SDK will only package your script, upload it to the staging bucket and assemble what is called a job specification. Among other things, this job specification contains the code that will be executed when the container is run. It is instructive to construct a job and take a look at the job specification.

staging_bucket = f"vertex-ai-{google_project_id}/"
registry = f"{google_region}-docker.pkg.dev/{google_project_id}"
repository = f"{registry}/vertex-ai-docker-repo"
image = f"{repository}/base"
timestamp = time.strftime("%Y%m%d%H%M%S",time.localtime())
job = aip.CustomJob.from_local_script(
        display_name = "my-job",
        script_path = "train.py",
        container_uri = image,
        machine_type  = "n1-standard-4",
        base_output_dir = f"gs://{staging_bucket}/job_output/{timestamp}",
        project = google_project_id,
        location = google_region,
        staging_bucket = staging_bucket
)

print(job.job_spec.worker_pool_specs[0].machine_spec)
print(job.job_spec.worker_pool_specs[0].container_spec)

To see this in action, let us now run the script create_job.py and observe its output which will consist of the machine spec and the container spec in addition to some debugging output.

python3 create_job.py

We can now run our job, either by calling submit or run. This will instruct VertexAI to pull our container image, run it and execute the command specified in the container spec above which will in turn simply run our training script. But before we try this out, let us see how we can test this locally.

Testing your jobs locally

Even a small job will take some time to execute on the VertexAI platform, simply because the provisioning of the underlying resources will take a few minutes. Therefore you should test your code locally before you submit your job because then debugging and error fixing is possible with much smaller turnaround cycles. Here is a way to do this.

First, we can create our job as above – this does not yet submit anything, but does already make the packaged Python code available on GCS. Then, we can extract the command string that the SDK has assembled from the container specification. We could then try to run our image, passing this command to execute it within the container, setting the required environment variables like AIP_MODEL_DIR manually.

However, there are a few challenges that we need to consider. First, the command that the SDK assembles assumes that our package is located on the local file system – in the background, it probably uses a FUSE file system to map the staging bucket into the container. This is not so easy to simulate locally. However, we can parse the command string, extract the URI of the package on GCS from there, download it to a local directory and replace the reference to the FUSE filesystem in the command by the reference to this local directory. This is a bit of a workaround to avoid having to install gcsfuse, but will work as long as later versions of the SDK do not use a significantly different command string.
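The rewriting step described above could look roughly like the following sketch. It assumes that the command references the package via a path starting with /gcs/ and ending with .tar.gz. This is an assumption about the format of the command, so inspect the container spec produced by your SDK version before relying on it. The function name localize_command and the sample command are made up for this illustration.

```python
import re


def localize_command(command, local_dir):
    """Replace the /gcs/... package path in a command by a local path.

    Returns the GCS URI that needs to be downloaded and the rewritten
    command string.
    """
    # Assumed format: /gcs/<bucket>/<object name>.tar.gz
    match = re.search(r"/gcs/(\S+\.tar\.gz)", command)
    if match is None:
        raise ValueError("No package path found in command")
    gcs_uri = f"gs://{match.group(1)}"
    file_name = match.group(1).rsplit("/", 1)[-1]
    local_path = f"{local_dir}/{file_name}"
    return gcs_uri, command.replace(match.group(0), local_path)


# Hypothetical command string, not copied from the SDK
sample = (
    "pip3 install -q --user /gcs/my-bucket/aiplatform-script.tar.gz "
    "&& python3 -m trainer.task"
)
uri, local_command = localize_command(sample, "/tmp/package")
print(uri)
print(local_command)
```

The package itself would then be downloaded from the returned URI into the local directory, for instance with gsutil cp, before the rewritten command is executed in the container.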

We also need to make sure that our code runs under a service account. The easiest way to do this is to place a JSON key file in our working directory, mount this into the docker container and use GOOGLE_APPLICATION_CREDENTIALS in the container to make sure that it is picked up.

Finally, it took me some time to figure out how to handle the command. Eventually, I decided to split off the first two parts (“sh -c”) so that I could escape the remaining string with single quotes, using the first part as entrypoint and the remaining part as arguments.
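Here is a sketch of how the docker invocation could be assembled in Python. It is a variation of the approach described above rather than the code in the repository: instead of quoting the command for a shell, it builds an argument list that can be passed to subprocess.run, which avoids the escaping problem altogether. The function name docker_run_args and the mount point /key.json are assumptions, and the command is assumed to be a list whose first element is the executable.

```python
def docker_run_args(command, image, key_file, env=None):
    """Assemble the arguments for docker run from a job command.

    The first element of the command becomes the entrypoint, the
    remaining elements are passed as arguments after the image name.
    """
    entrypoint, *arguments = command
    args = [
        "docker", "run", "--rm",
        "--entrypoint", entrypoint,
        # key_file should be an absolute path on the host
        "-v", f"{key_file}:/key.json:ro",
        "-e", "GOOGLE_APPLICATION_CREDENTIALS=/key.json",
    ]
    for name, value in (env or {}).items():
        args += ["-e", f"{name}={value}"]
    return args + [image] + arguments


docker_args = docker_run_args(
    ["sh", "-c", "echo hello"],
    "my-image",
    "/home/me/key.json",
    {"AIP_MODEL_DIR": "gs://my-bucket/job_output/model/"},
)
print(docker_args)
```

The resulting list could then be executed with subprocess.run(docker_args, check=True).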

If you want to try this, switch to the model directory, make sure that the directory contains a file key.json that contains the service account key for the vertex-ai-run service account that we have created previously, and run

python3 create_job.py --local_run

This will run the container and print messages to the console. If everything works fine, you should be able to locate the result of the training run – the uploaded model – within the job_output directory of the staging bucket (yes, I know, this is not really a directory, but let me stick to this terminology).

Running a job on VertexAI

After testing, let us now run our job on VertexAI. In the Python SDK, we can do this by simply invoking the submit method (or the run method) of the CustomJob that we have created. Both accept the service account that we want to use as a parameter (if we do not pass a service account, a default account is used).

service_account = f"vertex-ai-run@{google_project_id}"
job.submit(
    service_account = f"{service_account}.iam.gserviceaccount.com"
)

If we do this, the SDK will make the actual API call and pass the job specification to the VertexAI platform. As we have used submit and not run, this will return immediately and not wait for completion of the job.

Let us try this out. I have added this code snippet to the create_job.py script so that the job is submitted if you pass the --submit parameter. So let us do this and then use gcloud to check for the status of the job.

python3 create_job.py --submit
gcloud ai custom-jobs list

As we have just submitted the job, gcloud will show the job as pending. You should also be able to locate your job in the VertexAI console in the Custom Jobs tab (make sure to select the correct region). From here, you can also navigate to the log entries for this job which is your primary source of information if something goes wrong. Note that the SDK will also print a direct link to the job to the console.

For simplicity, we have used the same bucket that we also use for other artifacts as staging bucket. This is something that you would probably not do in reality, as you want to be able to clean up your staging bucket on a regular basis. For now, simply use the following command to remove all packaged training scripts created by the SDK (be careful if you have other blobs that match this pattern).

gsutil rm "gs://vertex-ai-$GOOGLE_PROJECT_ID/aiplatform-*.tar.gz"

From time to time, you might also want to delete the jobs on the console.

More on different types of jobs

So far, our jobs have executed exactly one Python script running on a CPU. The SDK contains several variations on this simple theme. Of course nothing prevents you from modifying the job specification that the SDK has created to execute arbitrary code in the container instead of the predefined code that installs and runs the code as package if you are not happy with the defaults (the default command is assembled by the SDK when the job is created).

First, we can use the parameters accelerator_type and accelerator_count of the job creation method to request GPUs (or TPUs) of a certain type to support training on specialized hardware.
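As an illustration, the two parameters could be prepared as a small dictionary of extra keyword arguments, as in the sketch below. The helper accelerator_args is an assumption made for this example. "NVIDIA_TESLA_T4" is one of the accelerator types known to the platform, but whether it is available depends on the region and on the machine type, so check this before using it.

```python
def accelerator_args(accelerator_type=None, accelerator_count=1):
    """Return extra keyword arguments for the job creation method."""
    if accelerator_type is None:
        # CPU only, do not pass any accelerator parameters
        return {}
    if accelerator_count < 1:
        raise ValueError("accelerator_count must be positive")
    return {
        "accelerator_type": accelerator_type,
        "accelerator_count": accelerator_count,
    }


extra = accelerator_args("NVIDIA_TESLA_T4")
print(extra)
# The dictionary would then be passed on when the job is created, as in
# aip.CustomJob.from_local_script(..., **extra)
```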

There is also a job type called CustomPythonPackageTrainingJob that accepts a package already uploaded to GCS as input and executes that within a job. This job is also prepared to download a dataset if needed and to upload the final model artifact automatically to the model registry.

However, adding more and more functionality into one job soon gets difficult to manage. If that job has a problem, all data that we have created in the meantime is lost, unless you implement some sort of caching and checkpointing mechanism. Instead of diving into these more advanced types of jobs, we will therefore discuss pipelines in a later post in this series. Pipelines allow you to split the work across several steps, each of which can have inputs and outputs that are cached automatically.

There is, however, a topic which we have not yet discussed – traceability. You might have seen that when running several jobs, you will easily flood your GCS buckets with artifacts that are all identified by timestamps and it will soon become tricky to figure out which job was executed with which version of the data. To automate this, VertexAI offers the concept of metadata which we will explore in the next post.

The nuts and bolts of VertexAI – prediction endpoints and model versions

In the last post, we have learned how to package a model and upload it into the model registry. Today, we will see how we can deploy a model from the registry to an endpoint and use that endpoint to make predictions. We will also learn how a model can be exported again and how we can create additional versions of a model.

Creating an endpoint

First, let us take a look at what VertexAI calls an endpoint. An endpoint is not a physical resource, but more a logical container into which we can deploy models all of which will share the same URL. For each model that we deploy, we will specify hardware resources during the deployment which VertexAI will then provision under the given endpoint by bringing up one or more instances. VertexAI will automatically scale up and down as needed and distribute the traffic among the available instances.

Note that it is also possible to deploy different models to the same endpoint. The URL for these two models will be the same, but a part of the traffic is routed to one model and another part routed to the second model. There is a parameter traffic split that allows you to influence that routing. This feature can for instance be used to realize a rolling upgrade of an existing model. We will get an idea of how exactly this works a bit later.

To get started, let us now see how we can create an endpoint. This is rather straightforward: we use the class method create on the Endpoint class.

endpoint = aip.Endpoint.create(
    display_name = "vertex-ai-endpoint",
    project = google_project_id,
    location = google_region
)

Note that we will specify all details about instances, machine types and so forth when we create the actual deployment. So far, our endpoint is just an empty hull that does not consume any resources.

Note that having the endpoint already allows us to derive the URL under which our models will be reachable. An endpoint again has a fully qualified resource name, and the URL under which we can reach the endpoint is

https://{google_region}-aiplatform.googleapis.com/v1/{endpoint_resource_name}:predict
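In Python, the URL could be assembled as in the following sketch. The function name prediction_url is an assumption, and the project number and endpoint ID in the sample resource name are made-up placeholders. With the SDK, the resource name would come from the resource_name property of the endpoint.

```python
def prediction_url(region, endpoint_resource_name, method="predict"):
    """Assemble the REST URL for a given endpoint resource name."""
    base_url = f"https://{region}-aiplatform.googleapis.com/v1"
    return f"{base_url}/{endpoint_resource_name}:{method}"


# Placeholder resource name for illustration only
name = "projects/123456789/locations/us-central1/endpoints/987654321"
url = prediction_url("us-central1", name)
print(url)
```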

If we already have created our endpoint previously, we can use the list method of the Endpoint class to get a list of all endpoints and search for our endpoint. We can even supply a filter expression to the list method to only get those endpoints with a given display name back.

endpoints = aip.Endpoint.list(
    filter='display_name="vertex-ai-endpoint"'
)

Deploying a model

Next, let us deploy a model. This is done by calling deploy on an existing endpoint. To do that, we first need a reference to the model, and for that purpose, we need the full resource name of the model.

Instead of listing all models and searching for matches, we will take a different approach this time. As we have defined the model ID to be “vertexaimodel”, we only need the location and project to assemble the fully qualified resource name. Unfortunately, things are not that simple, as we need the project number instead of the project ID. So we first use a different Google API – the resource manager API – to search all projects to find the one with our project ID, extract the project resource name, use that to assemble the model name and then get the model.

import google.cloud.resourcemanager_v3 as grm
projects_client = grm.ProjectsClient()
projects = projects_client.search_projects(
    query=f"id={google_project_id}"
)
project = [p for p in projects][0]
model_prefix = f"{project.name}/locations/{google_region}/models/"
model_name = f"{model_prefix}vertexaimodel"
model = aip.Model(
    model_name = model_name
)

We now have an endpoint and we have a model and can now call the deploy method on the endpoint. When we do this, the most important parameters to specify are the model that we want to deploy, the machine type, the minimum and maximum number of replicas that we want and the service account (in the usual format as an e-mail address) that we want to attach to the container.

endpoint.deploy(
    model = model,
    machine_type = "n1-standard-2",
    min_replica_count = 1,
    max_replica_count = 1,
    service_account = service_account
)

This again creates an LRO and only returns once the deployment is complete (which can take some time). 

To try this out, you can either run the commands above (and the obvious boilerplate code around them) manually or you can run the deploy.py script in the models directory of my repository. When the deployment is done, we can verify the result using curl. This is a bit tricky, because we first have to use gcloud to get the endpoint ID, then use gcloud once more to get the endpoint name and finally assemble that into a URL. We also need a bearer token which is included in the HTTP header.

ENDPOINT_ID=$(gcloud ai endpoints list \
  --filter="display_name=vertex-ai-endpoint" \
  --format="value(name)")
TOKEN=$(gcloud auth print-access-token)
BASE_URL="https://$GOOGLE_REGION-aiplatform.googleapis.com/v1"
ENDPOINT=$(gcloud ai endpoints describe \
  $ENDPOINT_ID \
  --format="value(name)")
URL=$BASE_URL/$ENDPOINT
echo "Using prediction endpoint $URL"
curl  \
    --header 'Content-Type: application/json'  \
    --header "Authorization: Bearer $TOKEN"  \
    --data '{ "instances" : [[0.5, 0.35]]}' \
    "${URL}:predict"

Of course the Endpoint class also has a predict method that you can call directly. Here is the equivalent to the invocation above in Python.

prediction = endpoint.predict(
    instances = [[0.5, 0.35]]
)

We nicely see the structure of the JSON envelope being reflected in the arguments of the predict method. Note that there is also a raw prediction that returns information like the model version in the response headers instead of the body. You can access this method either by replacing the URL “$URL:predict” in the curl command above with “$URL:rawPredict” or by using endpoint.raw_predict in the Python code, supplying the required headers like the content type yourself.

raw_prediction = endpoint.raw_predict(
    body = b'{"instances" : [[0.5, 0.35]]}',
    headers = {"Content-Type": "application/json"},
)

Creating model versions

At this point, it is helpful to pause for a moment and look at what we have done, using gcloud. Let us ask gcloud to describe our endpoint and see what we get (you might need some patience here, I have seen deployments taking 20 minutes and more).

ENDPOINT_ID=$(gcloud ai endpoints list \
  --filter="display_name=vertex-ai-endpoint" \
  --format="value(name)")
gcloud ai endpoints describe $ENDPOINT_ID

We see that our endpoint now contains a list of deployed models. Each of these deployed models reflects the parameters that we have used for the deployment and has its own ID. We also see a section trafficSplit in the output which shows that at the moment, all traffic goes to one (the only one) deployed model.

When you repeat the curl statement above and take a closer look at the output, you will also see that VertexAI adds some data in addition to the prediction results (I assume that the ability to do so is the reason why VertexAI uses the JSON envelope). Specifically, we also see the deployed model ID in the output as well as the model and the model version that has been used.
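To illustrate the structure of such a response, here is a small sketch that separates the predictions from the additional data. The response body is made up for this example (the IDs and values are placeholders), while the field names follow the JSON returned by the predict method.

```python
import json

# Made-up response body for illustration
body = """
{
  "predictions": [[0.1, 0.9]],
  "deployedModelId": "1234567890",
  "model": "projects/123456789/locations/us-central1/models/vertexaimodel",
  "modelDisplayName": "my-model",
  "modelVersionId": "1"
}
"""


def summarize(response_body):
    """Split a prediction response into predictions and additional data."""
    data = json.loads(response_body)
    predictions = data.get("predictions", [])
    metadata = {k: v for k, v in data.items() if k != "predictions"}
    return predictions, metadata


predictions, metadata = summarize(body)
print(predictions)
print(metadata["modelVersionId"])
```

Counting the values of modelVersionId over a number of requests is also a simple way to observe a traffic split between several versions.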

It is also interesting to take a look at the log files that our endpoint has created. Let us use the gcloud logging reader to do this.

ENDPOINT_ID=$(gcloud ai endpoints list \
  --filter="display_name=vertex-ai-endpoint" \
  --format="value(name)")
type_filter="resource.type=aiplatform.googleapis.com/Endpoint"
endpoint_filter="resource.labels.endpoint_id=$ENDPOINT_ID"
location_filter="resource.labels.location=$GOOGLE_REGION"
filter="$type_filter AND $location_filter AND $endpoint_filter"
gcloud logging read \
  "$filter" \
  --limit 200 \
  --order=asc \
  --format="value(jsonPayload)"

In the first few lines, we see that VertexAI points us to a model location that is not the bucket that we have initially used for the upload, again showing that the model registry does actually hold copies of our artifacts. We can also see the first ping requests coming in (and if we added more lines, we would also see the prediction requests in the access log).

It is nice to have a deployed model, but what happens if we update the model? This is where model versions come into play. When you upload a model, you can, instead of specifying a model ID, specify the ID of an existing model, called the parent. When you do this, the model registry will create a new version of the model. Some attributes like the display name or labels are the same for all versions, whereas some other attributes like (obviously) the version number are updated. So the code to do an upload would look as follows.

model = aip.Model.upload(
    serving_container_image_uri = image,
    artifact_uri = f"{bucket}/models",
    parent_model = "vertexaimodel",
    serving_container_predict_route = "/predictions/model",
    serving_container_health_route = "/ping",
    project = google_project_id,
    location = google_region
)

Let us try this out. The script upload_model.py does already contain some logic to check whether the model already exists and will create a new version if that is the case. So let us simply upload the same model again and check the result.

python3 upload_model.py
gcloud ai models describe vertexaimodel

We should see that the version number in the output has automatically been incremented to two. When we only use the model ID or the model name to refer to the model, this will automatically give us this version (this is called the version alias). We can, however, also access the previous versions by appending the version ID to the model ID, like this

gcloud ai models describe vertexaimodel@1

Let us now deploy the updated version of the model. We use the same code as before, but this time, we pass the traffic split as a parameter so that half of the requests will go to the old version and half of the requests will go to the new version.

python3 deploy.py --split=50

When you now run predictions again as above and look for the version number in the output, you should see that some requests are answered by the new version of the model and some requests are answered by the old version. Note that we have only specified the split for the newly deployed model and VertexAI has silently adjusted the traffic split for the existing instances as well.

This is also reflected in the output of gcloud ai endpoints describe – if you run this again as above, you will see that there are now two deployed models with different versions of the model and the traffic has been updated to be 50 / 50. Note that you can also read the traffic split using the traffic_split property of the Endpoint class and you can use its update method to change the traffic split, for instance to implement a canary rollout approach.
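For a canary rollout, we need a new traffic split in which one deployed model receives a given share and the other deployed models share the rest. The following sketch computes such a split. The function shift_traffic is an assumption made for this example, and the keys of the dictionary stand for deployed model IDs (the IDs used here are placeholders).

```python
def shift_traffic(split, target_id, target_percent):
    """Give target_id the requested share and scale all others to fit."""
    if not 0 <= target_percent <= 100:
        raise ValueError("target_percent must be between 0 and 100")
    others = {k: v for k, v in split.items() if k != target_id}
    remaining = 100 - target_percent
    total = sum(others.values())
    new_split = {}
    for key, value in others.items():
        # Keep the proportions between the other deployed models
        new_split[key] = remaining * value // total if total else 0
    # Integer division can leave a remainder, which we assign to the
    # target so that the percentages always add up to 100
    new_split[target_id] = 100 - sum(new_split.values())
    return new_split


# Reduce the new version "222" to ten percent of the traffic
new_split = shift_traffic({"111": 50, "222": 50}, "222", 10)
print(new_split)
```

The result could then be handed to the update method of the endpoint via its traffic_split parameter. Note that if no other deployed model currently receives traffic, this sketch assigns all traffic to the target, regardless of the requested share.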

Undeploying models and deleting endpoints

Suppose you want to undeploy a model, maybe the version one of our model that is now obsolete, or maybe all deployed models in an endpoint. There are several methods to do this. First, an endpoint provides the convenience method undeploy_all which undeploys all deployed models in this endpoint. Alternatively, let us see how we can iterate through all deployed models and undeploy them one by one. Assuming that we have access to the endpoint, this is surprisingly easy:

for m in endpoint.list_models():
    endpoint.undeploy(deployed_model_id = m.id)

There are some subtleties when manually adjusting the traffic split (an undeployment is not valid if all remaining models would have zero traffic afterwards, so we should start to undeploy all models with zero traffic first), which, however, are not relevant in our case. Once all models have been undeployed we can also delete the endpoint using endpoint.delete(). In the models directory of the repository, you will find a script called undeploy.py that deletes the deployed models and the endpoint. Do not forget to run this when you are done as the running instances create some cost.
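The ordering mentioned above, namely models without traffic first, could be computed as in the following sketch. The function undeploy_order is an assumption made for this example, and the IDs are placeholders. In a real script, the list of IDs would come from endpoint.list_models() and the split from the traffic_split property of the endpoint.

```python
def undeploy_order(deployed_model_ids, traffic_split):
    """Sort deployed model IDs so that those without traffic come first."""
    return sorted(
        deployed_model_ids,
        # Models that do not appear in the split do not receive traffic
        key=lambda model_id: traffic_split.get(model_id, 0),
    )


order = undeploy_order(["111", "222", "333"], {"111": 100, "222": 0})
print(order)
```

We could then loop over the result and call endpoint.undeploy for each ID. How the traffic of an undeployed model is redistributed is not covered by this sketch.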

Exporting models

To close this blog post, let us quickly discuss how we can export a model that is stored in the model registry. Not surprisingly, exporting means that VertexAI copies the model archive to a GCS bucket, the container image to an Artifact Registry repository path, or both, to locations that we can choose.

To export a model, we must specify a supported export format. The list of export formats that a model supports can be found using gcloud (or alternatively also the Python client by accessing the property supported_export_formats):

gcloud ai models describe \
  vertexaimodel \
  --format="json(supportedExportFormats)"

For our model, we should only see one entry corresponding to the format “custom-trained” that allows us to export both the image and the model. Let us do this. Run the script export.py which essentially consists of the following statement.

model.export_model(
    export_format_id = "custom-trained",
    artifact_destination = f"gs://vertex-ai-{google_project_id}/exports",
    image_destination = f"{google_region}-docker.pkg.dev/{google_project_id}/vertex-ai-docker-repo/export:latest"
)

As long as we stay within the same region (which I highly recommend as otherwise Google charges cross-regional fees for the data transfer), this should complete within one or two minutes. You should now see that in the provided GCS path, VertexAI has created a folder structure containing the model name and time stamp and placed our archive there.

gsutil ls -R gs://vertex-ai-$GOOGLE_PROJECT_ID/exports/*
gcloud artifacts docker images list \
    $GOOGLE_REGION-docker.pkg.dev/$GOOGLE_PROJECT_ID/vertex-ai-docker-repo

If you compare the digest of the exported image, you will find that it is identical to the prediction image that we used as part of the model. We can also download the model archive and verify that it really contains our artifacts (the simple command below only works if we have only done one export):

uri=$(gsutil ls -R \
  gs://vertex-ai-$GOOGLE_PROJECT_ID/exports/* \
  | grep "model.mar")
(cd /tmp ; gsutil cp $uri model.zip ; unzip -c model.zip -x model.bin)

This will download the archive to a temporary directory, unzip it and print the files (except the binary model.bin) contained in it. You should recognize our model, our handler and the manifest that the torch archiver has created.
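The same check can also be done in Python, as a model archive is an ordinary zip file. The sketch below uses the zipfile module from the standard library. The function name list_text_members is an assumption, and the archive built in memory is only a tiny stand-in for the real model.mar, which we would pass as a file path instead.

```python
import io
import zipfile


def list_text_members(archive, skip=("model.bin",)):
    """Return names and contents of all members except the skipped ones."""
    with zipfile.ZipFile(archive) as zf:
        return {
            name: zf.read(name).decode("utf-8", errors="replace")
            for name in zf.namelist()
            if name not in skip and not name.endswith("/")
        }


# Build a small stand-in archive in memory for illustration
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("handler.py", "# handler code")
    zf.writestr("model.bin", b"\x00\x01")
    zf.writestr("MAR-INF/MANIFEST.json", "{}")

members = list_text_members(buffer)
print(sorted(members))
```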

This concludes our post for today. We now have a fair understanding of how we can create and update models (I have not explained deletion, but that should be fairly obvious), how we can set up endpoints and how models are deployed to endpoints for prediction. In the next post in this series, we will turn to simple training jobs.