The nuts and bolts of VertexAI – pipelines under the hood

In our previous post, we covered the process of creating pipeline components, defining a pipeline and running the pipeline in Vertex AI. However, parts of this might have appeared a bit mysterious. Today, we will take a closer look at what is going on behind the scenes when we create and run a pipeline.

Deep dive into component creation

To be able to easily and interactively inspect the various types of Python objects involved, an interactive Python shell is useful. You might want to use ipython3 for that purpose, or you can run the following commands to install a kernel into your environment and add it to the Jupyter configuration so that you can follow along in a Jupyter notebook.

pip3 install ipykernel
python -m ipykernel install --user --name=vertexai-venv

As a starting point for our discussion, it is helpful to recall what a Python decorator is doing. Essentially, a decorator is a device to wrap a function into a different function or – more precisely – a callable, which can be either a function or a Python object with a __call__ method. Therefore, a decorator is a function which accepts a function as input and returns a callable. This callable is then registered under the name of the original function in the namespace, effectively replacing the decorated function with the new callable.
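
As a quick illustration (plain Python, not KFP-specific, with made-up names), here is a minimal decorator that replaces a function with a callable object.

def my_decorator(func):
    class Wrapper:
        def __call__(self, *args, **kwargs):
            print(f"Calling {func.__name__}")
            return func(*args, **kwargs)
    return Wrapper()

@my_decorator
def greet(name):
    return f"Hello, {name}"

print(type(greet))       # greet is now a Wrapper instance, not a plain function
print(greet("world"))    # the wrapper forwards the call to the original function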

Let us take a closer look at this for the example of a pipeline component. Suppose we have a very simple pipeline component.

from kfp import dsl 
from kfp.dsl import Output, Artifact, Dataset

@dsl.component(base_image = "python:3.9")
def my_component(msg : str, data : Output[Dataset]):
    print(f"Message: {msg}")

When the Python interpreter hits upon this piece of code, it calls the decorator – i.e. the function kfp.dsl.component_decorator.component – with the first argument being your my_component function (and the remaining arguments being the parameters of the decorator, like base_image in our case). This decorator function uses inspection to analyze the parameters of your function and then returns an instance of kfp.dsl.python_component.PythonComponent, which then becomes known under the name my_component. Let us verify this in our interactive Python session and learn a bit about our newly created object.

type(my_component)
my_component.__dict__.keys()

We see that our new object has an attribute python_func. This is in fact the wrapped function which you can inspect and run.

import inspect
inspect.getsource(my_component.python_func)
my_component.python_func("Hello", data = None)

Another interesting attribute of our object is the component_spec. This contains the full specification of the container that will implement this component (you might want to take a look at it now, but we will get back to it later), including image, command, arguments and environment, as well as a description of the inputs and outputs of our component.
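
To get a feeling for what is in there, you can peek at a few of its attributes in the interactive session; the attribute names below follow the KFP 2.x structures module, so treat the exact paths as an assumption.

spec = my_component.component_spec
print(spec.inputs)                     # declared inputs: parameters and artifacts
print(spec.outputs)                    # declared outputs
print(spec.implementation.container)   # image, command, args and env used at runtime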

Assembling a pipeline

We now have a component. As a next step, we will assemble our components into a pipeline. As we have seen, this usually happens with a piece of code like this.

@dsl.pipeline()
def my_pipeline():
    _task = my_component(msg = "Hello")

So here we call our component – but since our component has been replaced by the PythonComponent, we do not actually call our component but end up in the __call__ method of PythonComponent. This method creates a representation of the component at runtime, i.e. an instance of a PipelineTask. In addition to a component specification, this object now also contains the inputs and outputs of our component (we see actual values, not just descriptions – note that artifacts are represented by placeholders).

_task = my_component(msg = "Hello")
print(type(_task))
print(_task.component_spec)
print(_task.inputs)
print(_task.outputs)

But how do the components actually end up in a pipeline? Why is there no need to explicitly call something along the lines of my_pipeline.add(my_component)? This is where the magic of decorators and the magic of context managers join forces…

To understand this, let us go back a step and think about what happens when the Python interpreter hits upon the line @dsl.pipeline. At this point, the pipeline decorator function defined here is invoked and receives our function my_pipeline as argument. After some intermediate calls, we end up constructing a GraphComponent which is the object actually representing a pipeline. In the constructor of this component, we find the following code snippet.

with pipeline_context.Pipeline(
                self.component_spec.name) as dsl_pipeline:
    pipeline_outputs = pipeline_func(*args_list)

So we eventually run the code inside the function body of our my_pipeline function where, as we already know, we create a PipelineTask for every component. But we do this within a pipeline context, i.e. an instance of the class Pipeline which implements the Python context manager interface. So the __enter__ method is called, and this method sets the variable pipeline_task.PipelineTask._register_task_handler.

Now the point is that the function pointed to by this variable is invoked in the constructor of a PipelineTask (this happens here)! Thus, for every component created inside a function that carries the pipeline decorator, a call into the pipeline context – more precisely into its add_task method – is made automatically, meaning that all components we create here are automatically registered with the pipeline. It's not magic, just some very clever Python code…
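
To make the pattern more tangible, here is a minimal, KFP-independent sketch of the mechanism; the class and method names are made up for illustration and this is not the actual KFP code. The context manager installs a class-level hook, and the constructor calls that hook, so every object created inside the with block registers itself automatically.

class Task:
    _register_task_handler = lambda task: None   # no-op outside a pipeline context

    def __init__(self, name):
        self.name = name
        Task._register_task_handler(self)        # invoked for every new task

class PipelineContext:
    def __init__(self):
        self.tasks = []

    def __enter__(self):
        Task._register_task_handler = self.add_task
        return self

    def __exit__(self, *exc):
        Task._register_task_handler = lambda task: None

    def add_task(self, task):
        self.tasks.append(task)

with PipelineContext() as ctx:
    Task("train")
    Task("evaluate")

print([t.name for t in ctx.tasks])               # ['train', 'evaluate']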

We can now also understand why a syntax like

_train = train(epochs = epochs,lr = lr)
evaluate(trained_model = _train.outputs['trained_model'])

for connecting inputs and outputs works. When this is executed, _train is an instance of a PipelineTask which does indeed have an attribute called outputs, and we have seen that this is a dictionary with one entry for each output that we declare. This explains why we can get the outputs from this dictionary and pass them to another component as input.

An important lesson from all this is that the code in the pipeline definition gets executed at build time, while the code in the component definition is never executed unless we submit the pipeline. This implies that even trivial errors in the component code are only detected once you actually submit the pipeline (and this takes some time, as every step provisions a new container on the Google platform). It is therefore vitally important to test your code locally before you submit it; we will see a bit later in this post how this can be done.

Pipeline execution

We have seen above that the component specification that is part of a Python component or a pipeline task already contains the command (i.e. the entrypoint) and the arguments which will be used to launch the container. Structurally (and much simplified; if you want to see the full code, take a look at the generated pipeline IR YAML file after compilation), this command, in combination with its arguments, looks as follows.

#
# First install KFP inside the container
#
python3 -m pip install kfp
#
# Create a Python module holding the code
# of the component
# 
printf "%s" <<code of your component>> > "ephemeral_component.py"
#
# Run KFP executor pointing to your component
#
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main \
  --component_module_path  "ephemeral_component.py" \
  --executor_input <<executor input>> \
  --function_to_execute <<name of your component function>>

Let us go through this step by step. First, we invoke pip to make sure that the KFP package is installed in the container (in the full version of the code, we can also see that apt-get is used to install pip if needed, which makes me wonder what happens if we use a base image that is not based on a Debian distribution – I do not think that this implicit requirement is mentioned anywhere in the documentation). We then store the code of the pipeline component in a file called ephemeral_component.py and run the KFP executor main entry point.

After loading your code as a module, this will create an instance of an executor and invoke its execute method. This executor is created with two arguments – the function to be executed (corresponding to the last argument in the command line sketched above) and the executor input (corresponding to the second to last line in the command above).

The executor input is a JSON structure that contains the information that the executor needs to construct input and output artifacts. Here is an example.

{
    "inputs": {
        "parameterValues": {
            "size": 1000
        }
    },
    "outputs": {
        "outputFile": "./gcs/create_data/execution_output.json",
        "artifacts": {
            "training_data": {
                "artifacts": [
                    {
                        "name": "training_data",
                        "metadata": {},
                        "type": {
                            "schemaTitle": "system.Dataset"
                        },
                        "uri": "gs://pipeline_root/create_data/training_data"
                    }
                ]
            },
            "validation_data": {
                "artifacts": [
                    {
                        "name": "validation_data",
                        "metadata": {},
                        "type": {
                            "schemaTitle": "system.Dataset"
                        },
                        "uri": "gs://pipeline_root/create_data/validation_data"
                    }
                ]
            }
        }
    }
}

In this example, the input only consists of parameters, but in general, the input section can also contain artifacts and would then look similar to the output section.

In the output section, we can find a description of each artifact created by our component. This description contains all the fields (name, metadata, schema and URI) that are needed to instantiate a Python representation of the artifact, i.e. an instance of the Artifact class. In fact, this is what the executor does – it uses the data in the executor input to instantiate parameters and input and output artifacts and then simply calls the code of the component with that data. Inside the component, we can then access the artifacts and parameters as usual.
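
Conceptually (and leaving aside many details of the real executor), this amounts to something along the following lines. The sketch assumes the create_data component from our previous post, to which the executor input shown above belongs, and assumes that this executor input has already been parsed into a Python dict called executor_input.

from kfp.dsl import Dataset

# collect the plain parameters from the input section
kwargs = dict(executor_input["inputs"].get("parameterValues", {}))
# build an Artifact (here: Dataset) instance for every declared output artifact
for name, entry in executor_input["outputs"]["artifacts"].items():
    art = entry["artifacts"][0]
    kwargs[name] = Dataset(name=art["name"], uri=art["uri"], metadata=art["metadata"])
# finally call the wrapped component code with parameters and artifacts
create_data.python_func(**kwargs)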

Note that the output section also contains an execution output file. This is a file to which the executor will log outputs. When you run a pipeline on Vertex AI, this executor output is stored in your GCS pipeline root directory along with the real artifacts. If your component returns any parameter values (a feature which we have mentioned but not discussed so far), the executor will add these output parameter values to the file as well so that the engine can pick them up from there if needed.

We can now understand what execution of a pipeline really means. The engine executing your pipeline needs to inspect the DAG derived from the pipeline to understand in what order the components need to run. For each component, it then assembles the executor input and runs the container defined in the component specification with the command and arguments sketched above. Finally, it looks at the execution output written by the executor and extracts the outputs of the component from there, so that they can be passed as input to other components if needed.

There is one little step that we have not discussed so far. We have learned that an artifact has a URI (pointing to GCS) and a path (pointing into a local file system inside the container). How is the URI mapped to the path, and why can the component code access the artifact?

Again, the source gives us a clue, this time the code of the Artifact class itself, in which we see that the path attribute is decorated as a Python property, turning it into what the Python documentation calls a managed attribute. If you look at the underlying function, you will see that it simply translates the URI into a local path by replacing the “gs://” piece of the GCS location with “/gcs” (or whatever is stored in _GCS_LOCAL_MOUNT_PREFIX). The executing engine needs to make sure that this directory exists and contains a copy of the artifact, and that changes to that copy are – at least for output artifacts – replicated back to the actual object on GCS. Vertex AI does this by mounting the entire bucket using gcsfuse, but the KFP internal executor seems to simply create a copy (at least if I read this code correctly).
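
A simplified version of this translation could look as follows; take this only as a sketch, as the real property also handles a few other URI prefixes.

_GCS_LOCAL_MOUNT_PREFIX = "/gcs/"

def uri_to_path(uri: str) -> str:
    # replace the gs:// scheme with the local mount prefix
    if uri.startswith("gs://"):
        return _GCS_LOCAL_MOUNT_PREFIX + uri[len("gs://"):]
    return uri

print(uri_to_path("gs://pipeline_root/create_data/training_data"))
# prints /gcs/pipeline_root/create_data/training_data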

Testing pipelines locally

Having a better understanding of what happens when a component is executed now puts us in a position to think about how we can execute a component locally for testing purposes.

First, let us discuss unit testing. If we have a function annotated as a component, we have learned that calling this function as part of a unit test is pointless, as the code of the component will not even be run. However, we can of course still access the original function as my_component.python_func and call it as part of a unit test (or, more conveniently, we can use the execute method of a PythonComponent, which does exactly this). So in order to run a unit test for a component defined like this

@dsl.component
def create_data(training_data : Output[Dataset],
                validation_data : Output[Dataset],
                size : int):
    ...

we could come up with a Python unit test like this (designed to work with pytest).

from unittest.mock import Mock
import os 
 
def test_create_data(tmp_path):
    output_dir = f"{tmp_path}/outputs/"
    os.mkdir(output_dir)
    #
    # Make sure that environment variables are set
    #
    os.environ["GOOGLE_PROJECT_ID"] = "my-project"
    os.environ["GOOGLE_REGION"] = "my-region"
    #
    # Import the file where all the components are defined
    #
    import pipeline_definition
    #
    # Prepare mocked artifacts
    #
    training_data = Mock(
        uri = "gs://test/training_data",
        path = f"{output_dir}/training_data"
    )
    validation_data = Mock(
        uri = "gs://test/training_data",
        path = f"{output_dir}/validation_data"
    )
    pipeline_definition.create_data.execute(size = 10,
                            training_data = training_data,
                            validation_data = validation_data)
    #
    # Do whatever validations have to be done
    #

So we first import the file containing the component definitions (note that environment variables that we refer to need to be defined before we do the import as they are evaluated when the import is processed) and then use execute to access the underlying function. Output artifacts are mocked using a local path inside a temporary directory provided by pytest. Inside the test function, we can then access the outputs and run whatever validations we want to run.

If your code invokes Google API calls, do not forget to mock those as well (maybe you even want to patch the entire module to be on the safe side) – you do not want to be caught by surprise by a high monthly bill because you accidentally spin up endpoints in unit tests that run several times a day as part of a CI/CD pipeline…

After unit testing, there is typically some sort of integration test. A possible approach to doing this without actually having to submit your pipeline is to use the magic of the KFP executor that we have seen above to your advantage, by invoking the executor with your own executor input in order to run containers locally. Roughly speaking, here is what you have to do to make this work.

First, we need to look at the component specification and extract input and output parameters and artifacts, which will be instances of either InputSpec or OutputSpec. For each input and output, we have to determine whether it represents a parameter or an artifact based on the value of the type field. For a parameter, we determine the value that the parameter should have. For an artifact, we need to assemble a URI.

For each input and output, we can then add a corresponding snippet to the executor input. We then pass this executor input, along with a command assembled according to the logic shown above, into a docker container in which we run our image. To be able to preserve artifacts across component runs, so that we can test an entire pipeline, we will also want to mount a local filesystem as “/gcs” into the container.
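
As a rough sketch of what assembling the executor input might look like for a component whose inputs are plain parameters and whose outputs are artifacts, consider the following; the helper name and the crude type check are my own, the attribute paths assume the KFP 2.x ComponentSpec structure, and the real script in the repository does considerably more.

import json

def build_executor_input(component_spec, parameter_values, pipeline_root):
    # Hypothetical helper: build a minimal executor input from a component spec
    name = component_spec.name
    outputs = {"outputFile": f"./gcs/{name}/execution_output.json", "artifacts": {}}
    for output_name, output_spec in (component_spec.outputs or {}).items():
        # crude check: artifact types look like "system.Dataset@0.0.1",
        # parameter types like "STRING" or "NUMBER_INTEGER"
        if "." in str(output_spec.type):
            outputs["artifacts"][output_name] = {"artifacts": [{
                "name": output_name,
                "metadata": {},
                "type": {"schemaTitle": str(output_spec.type).split("@")[0]},
                "uri": f"{pipeline_root}/{name}/{output_name}",
            }]}
    return json.dumps({"inputs": {"parameterValues": parameter_values},
                       "outputs": outputs})

# e.g. build_executor_input(create_data.component_spec, {"size": 10}, "gs://pipeline_root")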

Alternatively, we can also run our component outside of a container by using the KFP executor directly. When we do this, however, we need to apply an additional trick: by default, this will use a local path starting with “/gcs”, which is usually not accessible by an ordinary user. However, we have seen that this prefix is a variable that we can patch to point into a local file system of our choice before running the executor.
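
A minimal sketch of this patching approach could look as follows, assuming that the prefix lives in kfp.dsl.types.artifact_types (the exact module path may differ between KFP versions).

from unittest.mock import patch
import kfp.dsl.types.artifact_types as artifact_types

# Assumption: in KFP 2.x the prefix is defined in kfp.dsl.types.artifact_types
with patch.object(artifact_types, "_GCS_LOCAL_MOUNT_PREFIX", "/tmp/local_gcs/"):
    # run the KFP executor here - artifact.path now resolves to /tmp/local_gcs/...
    pass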

I have put together a script showing how to do all this in my repository. To try this out, switch into the pipeline subdirectory of the cloned repository and run

#
# Preload the Python images that we use
#
docker pull python:3.9
docker pull $GOOGLE_REGION-docker.pkg.dev/$GOOGLE_PROJECT_ID/vertex-ai-docker-repo/pipeline:latest
#
# Run the actual script
#
python3 run_locally.py --no_container

which will run our pipeline without using containers. If you drop the parameter, each component will execute inside a container, and you will see some additional output that the executor generates, including the full executor input that we assemble. This simple script still has a few limitations; it is, for instance, not able to handle artifacts which are a list or a dictionary, but it should work fine for our purposes.

One closing remark: looking at the documentation and the source code, it appears that newer versions of KFP like 2.6 come with functionality to run components or even entire pipelines locally. As, at the time of writing, the Google SDK has a dependency on KFP 2.4, which does not yet offer this functionality, I have not tried this, but I expect it to work in a similar fashion.

We have now built a solid understanding of what is going on when we assemble and run a pipeline. In the next post, we will take a closer look at how Vertex AI reflects pipeline runs in its metadata and how we can log metrics and parameters inside a pipeline.
