The nuts and bolts of Vertex AI – introduction to pipelines

So far, we have worked with individual jobs. However, a typical machine learning flow is more complicated than this – you might have to import data, transform it, run a training, validate the outcomes or do a hyperparameter search, package a model and deploy it. Instead of putting all this into a single job, VertexAI allows you to define pipelines that model this sort of flow.

Before we start to discuss pipelines, let us describe the problem that they are trying to solve. Suppose you have a standard machine learning workflow consisting of preparing your input data, training a model, performing a validation and finally packaging or even deploying your model. During each of these steps, there are some artifacts that you create – a training data set, validation metrics, or a trained and packaged model.

Of course, you could put all of this into a single custom job. However, if that job fails, all artifacts are lost unless you implement some sort of caching yourself. In addition, monitoring this job to understand where you are in the process also requires a custom implementation, and we have seen in the previous posts that while you can manage experiment runs and metadata in a custom job, this requires a bit of effort.

Pipelines are a different way to address this problem. Instead of modelling a workflow and dependencies explicitly in your code, you declare individual components, each of which has certain inputs and outputs, and declare a pipeline that contains these components. In the background, VertexAI will then build a dependency graph, run your components, trace and cache inputs and outputs and monitor the execution.

VertexAI pipelines are based on Kubeflow which, among other things, implements a DSL (domain specific language) to describe components and jobs and Python decorators that allow you to model all this in Python code. If you work with pipelines, you will first declare your components, assemble these components into pipelines, create a pipeline specification and finally submit this to the platform for execution.

Let us now go through all of these steps before diving deeper into some topics in upcoming posts.

Component creation

Even though Kubeflow offers several types of components, most components that we will use will be what is called a lightweight Python component. Essentially, this is simply an ordinary Python function annotated as a component, like this.

from kfp import dsl

@dsl.component(
    base_image = "python:3.9")
def do_something():
    # some Python code
    ...

When the pipeline is executed, each component will be run as a separate container. We will look in detail at the process of executing a pipeline component a bit later, but essentially the execution engine will spin up the container specified by the base_image parameter of the annotation and run the code within the function inside this container.

This is simple, but has an important implication – the code within the function needs to be fully standalone. As a consequence, no global variables can be referenced, imports have to be done within the body of the function and, most importantly, we cannot invoke any function or class which is not defined inline inside the component function. If, for instance, we want to instantiate a PyTorch model we either have to pull the corresponding class definition into the function or we have to place the file with the model code in the container.
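To make this concrete, here is a minimal sketch of what such a self-contained component could look like – the component name and the numpy dependency are made up for illustration, and packages_to_install simply tells KFP to pip-install additional packages into the base image before running the function.

from kfp import dsl

@dsl.component(
    base_image = "python:3.9",
    # additional packages are installed into the container before the function runs
    packages_to_install = ["numpy"])
def standardize():
    # the import has to live inside the function body, as only this body
    # is shipped to the container
    import numpy as np

    data = np.random.rand(10)
    print((data - data.mean()) / data.std())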

To be useful, a component typically needs to process inputs and produce outputs. Let us see how this works for pipeline components. In VertexAI (and KFP) pipelines, there are two different types of inputs or outputs that your function can declare – parameters and artifacts.

A parameter is just an input to your function (i.e. a Python parameter) whose value is either defined at compile time when assembling the components into a pipeline or – more usefully – when you actually submit the pipeline for execution (there are also output parameters, which we will get back to). Behind the scenes, the execution engine serializes the value of a parameter into a JSON structure and (by some magic that we will look at in more detail in one of the upcoming posts) calls your component function, passing this value as an ordinary Python parameter. KFP supports strings, integers, floats, booleans, dictionaries and lists as parameters.
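As a quick illustration (the component and its parameter names are made up), a component could declare parameters of all supported types like this – at runtime, each value arrives as an ordinary Python object, deserialized from its JSON representation.

from typing import Dict, List
from kfp import dsl

@dsl.component(base_image = "python:3.9")
def report(run_name : str,
           epochs : int,
           lr : float,
           dry_run : bool,
           tags : List[str],
           config : Dict[str, float]):
    # parameter values are injected as plain Python objects
    print(f"{run_name}: {epochs} epochs at lr = {lr}, dry run = {dry_run}")
    print(tags, config)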

While parameters are passed by value, there is a second type of input a function can take, which I tend to think of as passing arguments by reference. This is mostly used for large pieces of data like models or datasets that you do not want to pass around as a JSON object but as what is called an artifact. Inside the Python code, an artifact is represented by a Python object that has – in addition to its name – three major attributes. First, every artifact has a URI which is the name of a blob on GCS (KFP itself also supports other storage solutions like S3 and Minio). The idea is that within your Python code, this object represents the actual artifact stored under this URI. Second, there is a path which also points to the actual artifact but on local storage (we will explain this in a minute), and third there is a metadata property which is just a dictionary that you can use to attach metadata to an artifact.

Let us look at an example to see how this works in practice. Consider the following code snippet that defines a component called train.

from kfp import dsl
from kfp.dsl import Input, Output, Model, Dataset, Metrics


@dsl.component(
    base_image = ...,
)
def train(epochs : int, 
          lr : float,
          data : Input[Dataset],  
          trained_model : Output[Model],
          metrics: Output[Metrics]):
    # Train - the actual training loop (and imports like torch)
    # has to live inside the function body
    ...

    # Log some metrics
    metrics.log_metric("final_loss", loss.item())
    #
    # Store trained model as state dict
    #
    torch.save(_model.state_dict(), trained_model.path)

First, we see that all Python function parameters are annotated – this is required for pipeline components, as the runtime needs the type information to do its job. In our example, epochs and lr are parameters. Their values will be injected at runtime, and in the body of your function, you can access them as usual.

The remaining parameters – data, trained_model and metrics – are artifacts of type dataset, model and metrics (there is also a type for generic artifacts). The annotations of these artifacts do not only specify the type, but also whether the artifact is an input or an output. In your Python code, you can access attributes like metadata directly to read or modify metadata (though you should only modify it for output artifacts). You can also call functions like log_metric on a metrics artifact to log the actual metrics, which will later be displayed on the VertexAI console. In fact, Output is a Python type annotation (see PEP 593), so that the parameter trained_model can be treated as being of type Model inside the code.

To access the content of the actual artifact, for instance our model, you use the path property of the artifact. This is a string containing a path to the actual object on the local filesystem. When you run your pipeline, VertexAI will mount the GCS bucket where your objects are located (the so-called pipeline root) via gcsfuse into the local container file system and set up the value of path for you accordingly. You can now read from this path and write to it as needed, for instance to store model weights as in the example above. Via the file system mount, the changes will then be written back to the underlying GCS bucket. So while the actual data comprising the artifact lives on GCS, it can be accessed via the Python artifact object – the trained_model in our case – which is why I tend to think of this as a reference.
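To illustrate this, here is a sketch of a hypothetical validation component that consumes the trained model as an input artifact – the base image, the torch dependency installed via packages_to_install and the logged value are assumptions, and the actual validation logic is elided. We will come back to this component when we assemble the pipeline below.

from kfp import dsl
from kfp.dsl import Input, Output, Model, Metrics

@dsl.component(
    base_image = "python:3.9",
    packages_to_install = ["torch"])
def validate(trained_model : Input[Model],
             metrics : Output[Metrics]):
    import torch

    # trained_model.path points to the state dict written by the training step,
    # made available in the container via the gcsfuse mount of the pipeline root
    state_dict = torch.load(trained_model.path)
    print(f"Loaded state dict with {len(state_dict)} entries from {trained_model.uri}")
    #
    # Run a validation (elided) and log the result
    #
    metrics.log_metric("validation_loss", 0.0)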

Pipeline creation

We have now seen how individual components can be created by using decorators (these components are called Python components; there are also more general components called container components, but we will not discuss those today). Next, we will have to stitch these components together into a pipeline. Not surprisingly, this is again done using decorators, and again not surprisingly, the decorator we use is called pipeline. Here is an example.

@dsl.pipeline(
    name = "my-pipeline"
)
def my_pipeline(epochs : int, lr : float, training_items : int):
    _create_data = create_data(training_items = training_items)
    _train = train(epochs = epochs,
                   lr = lr,
                   data = _create_data.outputs['data'])

This code snippet defines a pipeline with two steps, i.e. two components, called create_data and train. At the top, we again have an annotation that specifies the name of the pipeline. Within the body of the function carrying this annotation, we then simply call our pipeline components to add them to the pipeline. However, this call does not actually run the code defined for the individual components but is diverted (thanks to the @component annotation) to a method of the KFP framework which adds the component definition to the pipeline. This looks a bit like black magic – we will take a deeper look at how exactly this works in the next post. So bear with me and simply accept this magic for the time being.

We also see how parameters and artifacts are handled. In our case, the pipeline itself has three parameters that we will define when we actually submit the pipeline. Again these parameters are annotated so that the framework can derive their type and deal with them.

In our example, we also use the output of the first step as input for the second step. To do this, we capture the output of the first step (again, this is not really the output of the annotated function but the output of the wrapper in which the framework wraps our original function), which is a Python object that has – among other things – a property called outputs. This is a dictionary whose keys are the names of the output parameters of our step, and we can use these keys to feed the data as input artifacts into the next step.

By chaining steps in this way, we also inform VertexAI about the dependencies of our components. In our example, the second step can only start once the first step is complete, as it uses the output of the first step, so there is a dependency. Behind the scenes, this information will be assembled into a directed acyclic graph (DAG) in which the vertices are the components and the edges represent dependencies. When running our pipeline, components that can run in parallel because there are no dependencies between them will also be scheduled to run in parallel, but where needed the execution is serialized.
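As a sketch, the pipeline above could be extended by the hypothetical validate component from the previous section – the trained model produced by the train step is simply passed on via the outputs dictionary, which adds another edge to the DAG.

@dsl.pipeline(
    name = "my-pipeline"
)
def my_pipeline(epochs : int, lr : float, training_items : int):
    _create_data = create_data(training_items = training_items)
    _train = train(epochs = epochs,
                   lr = lr,
                   data = _create_data.outputs['data'])
    # the validation step depends on the training step via the trained model
    validate(trained_model = _train.outputs['trained_model'])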

The output of the component invocation that we capture in the variable _create_data (the underscore is really just an arbitrary naming convention that I use, any name will do) is also useful for a second purpose – specifying the execution environment. For that purpose, this output (which is actually of type PipelineTask) has a set of methods like set_cpu_limit or set_memory_request to define limits and requests for the later execution (designed to run on a Kubernetes platform with the usual meaning of requests and limits, but this is not guaranteed by the framework). In particular, you can use set_accelerator_type to request a GPU of a specific type. Here is an example.

_train.set_accelerator_type("NVIDIA_TESLA_T4")
_train.set_cpu_limit("4")

When playing with this, I experienced a strange problem – if I did not request at least four CPUs for a machine using a T4 GPU, the output of my Python script did not show up in the logs. There is even a ticket for this recommending to define a higher CPU limit (as I have done above) as a workaround, so until this is resolved you will probably have to live with that and the additional cost.

Compiling a pipeline

We now have our beautiful pipeline in Python. This is nice, but to submit a pipeline we actually need to convert it into something that the VertexAI engine can process – a pipeline definition in what is called IR YAML (intermediate representation YAML). This contains, among other data, the specification of the individual components of the pipeline, information on how to execute them (we will get deeper into this when we talk about the details of component execution in one of the next posts) and the DAG that we have constructed.

To translate the Python representation into an IR YAML file, we use a component called the compiler. In the most common case, this only requires two parameters – the pipeline that we want to compile and the name of a file to which we write the YAML output.

from kfp import compiler

compiler.Compiler().compile(pipeline_func = my_pipeline,
                            package_path = "my-pipeline.yaml")

Depending on the file name extension, the compiler will either create JSON or (recommended by the documentation) YAML output. We will take a closer look at the generated output in the next post in this series. We also note that it is possible to compile individual components and to load components from YAML files, so you could build a library of reusable components in YAML format as well.
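As a sketch (the file name is arbitrary), compiling an individual component and loading it back could look like this.

from kfp import compiler, components

# compile a single component into its own YAML file ...
compiler.Compiler().compile(train, package_path = "train_component.yaml")

# ... and load it back later, for instance to reuse it in a different pipeline
train_from_yaml = components.load_component_from_file("train_component.yaml")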

Submitting a pipeline

Let us now submit the pipeline. It is important to understand that the YAML file we have just created is all that actually gets uploaded to the VertexAI platform – it contains all the information needed to run the pipeline, while our Python code remains a local artifact (though it shows up again, in a different form, inside the YAML file). To submit a pipeline to run as what is called a pipeline job, we first create a pipeline job object from the compiled YAML and then submit it.

import google.cloud.aiplatform as aip

#
# Create a pipeline job from the 
# pipeline definition
#
pipeline_job = aip.PipelineJob(
    display_name = "my-pipeline",
    template_path = "my-pipeline.yaml",
    pipeline_root = f"gs://vertex-ai-{google_project_id}/pipeline_root",
    parameter_values = {
        "epochs" : 5000,
        "lr" : 0.05,
        "size" : 1000,
    },
    project = google_project_id,
    location = google_region
)

Here, the template_path argument specifies the path to the compiled YAML file. The pipeline_root parameter specifies a directory path in a GCS bucket under which all artifacts created during this pipeline run will be stored. You can also specify the values of the pipeline parameters here (in fact, you will have to do this for all parameters which are not fixed at build time). In addition, you create the job in a specific project and region as usual.

Once created, you can submit the job using

pipeline_job.submit(
    service_account = service_account
)

Note the service account parameter, which has a meaning similar to that of the corresponding parameter of a custom job and is the service account under which all steps of this pipeline will execute.

Let us now try this with a real example. In my repository (that you should have cloned by now if you have followed the instructions in the introduction to this series), I have prepared a sample pipeline that uses all of this. To run this pipeline execute the following commands, starting from the root of the repository.

cd pipelines
# Will create the IR yaml
python3 pipeline_definition.py
# will submit the pipeline
python3 submit_pipeline.py 

Once the pipeline is submitted, it is instructive to take a look at the graphical representation of the pipeline in the VertexAI console in the pipeline tab. Your newly submitted pipeline should show up here, and as you click on it, you should see a screen similar to the following screenshot.

What we see here is a graphical representation of the DAG. We see, for instance, that the validation step depends – via the trained model – on the training step, but also on the data creation.

On this screen, you can also click on one of the artifacts or steps to see more information on it in an additional panel on the right. For a step, you can navigate to the job itself (VertexAI will in fact create one custom job for each step) and its logs. For an artifact, you can click on “View lineage” to jump to the metadata screen, where you will find that VertexAI has created metadata entries for our metrics, models and datasets, but also for the steps, and has associated artifacts with executions, as we did manually when working with custom jobs.

When you submit the exact same pipeline again, you will produce a second pipeline run, but this run will complete within seconds. This is the power of caching – VertexAI realizes that neither the jobs nor their inputs have changed and therefore skips all steps that would produce the same output again. This is very useful if you modify one of the later steps in the pipeline or if a step fails, as it avoids having to re-run the entire pipeline.
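If you do not want this behaviour, caching can be switched off – here is a sketch of the two options that I am aware of: disabling it for the entire run when creating the pipeline job, or for an individual step inside the pipeline definition (the parameter values are just the ones from our example above).

# disable caching for the entire run ...
pipeline_job = aip.PipelineJob(
    display_name = "my-pipeline",
    template_path = "my-pipeline.yaml",
    pipeline_root = f"gs://vertex-ai-{google_project_id}/pipeline_root",
    parameter_values = {"epochs" : 5000, "lr" : 0.05, "training_items" : 1000},
    enable_caching = False,
    project = google_project_id,
    location = google_region
)

# ... or for an individual step inside the pipeline definition
_train.set_caching_options(False)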

This concludes our short introduction. In the next post, we will shed some light on the apparent magic behind all this and learn how exactly components and pipelines are built and executed.
