The nuts and bolts of VertexAI – custom training jobs

In the previous post, we saw how to store trained models and how to spin up prediction endpoints. Now we will follow the ML lifecycle backwards and talk about how you actually train models on the platform. There are several ways to do this, and we start with the most basic one – custom jobs.

Defining custom jobs

Abstractly speaking, a job is a defined workload that you hand over to VertexAI for execution. The platform will then allocate the required compute resources, run the workload, monitor the output and inform you once the execution is complete. This mode of operation is perfect for ML training – instead of spinning up a virtual machine, preparing the environment, uploading and executing your code and then terminating the machine again, you only have to care about the actual code; everything else is handled for you, much in the spirit of serverless computing.

But how do we define our workload? The approach that VertexAI takes at this point is very similar to what we have seen when we discussed models. A job consists of a reference to a piece of code that we want to run and a container that specifies the environment in which our code will be executed. Here is a summary of the workflow that we will go through when assembling and running a custom job.

First, we will pick a custom container (or use one of the prebuilt containers which are part of the VertexAI platform). For our purposes, we will use our base container which is already stored in our repository, so we can skip the step to upload it.

Next, we need a training script. We will reuse the script that we used in the previous posts when working with models. A copy of this script is located in the jobs directory of the repository as train.py – please cd into this directory now.

To actually create a custom job as the third step, we invoke the class method CustomJob.from_local_script. Among other things, this method receives the URI of our container and the local path to the training script. The SDK packages the script into a distributable Python package and uploads it to a GCS bucket – the staging bucket – that we need to specify. Here is a table showing the most important parameters of CustomJob.from_local_script that we will use.

Parameter – Description
display_name – A human-readable job name
script_path – The path to the local Python script that we want to execute
container_uri – The URI of the container that we will use
machine_type – The machine type for running our container
base_output_dir – A path to a GCS location used as output
project – The Google project ID
location – The Google region
staging_bucket – The bucket that VertexAI will use to stage the packaged script
Parameters to create a job

Let us quickly comment on the base_output_dir parameter. Essentially, this is a GCS location of your choice. The platform will not use this location directly, but will instead derive a few environment variables from it that your training script can use if you want to store artifacts on GCS (of course you could use any other custom location as well). Specifically, the following environment variables will be set – a short sketch of how a training script might use them follows after the table.

AIP_MODEL_DIR – base_output_dir/model/
AIP_CHECKPOINT_DIR – base_output_dir/checkpoints/
AIP_TENSORBOARD_LOG_DIR – base_output_dir/logs
Usage of the base output dir
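As an illustration, here is a minimal sketch of how a training script could pick up these variables – the fallback paths are assumptions for local runs and are not part of the train.py used in this series.

import os

# VertexAI derives these variables from base_output_dir; when the script runs
# outside of a custom job they are not set, so we fall back to local directories
model_dir = os.environ.get("AIP_MODEL_DIR", "local_model_output")
checkpoint_dir = os.environ.get("AIP_CHECKPOINT_DIR", "local_checkpoints")

print(f"Writing final model to {model_dir}")
print(f"Writing checkpoints to {checkpoint_dir}")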

It is important to understand that at this point, no Google API is actually called and the job is not yet submitted – the SDK will only package your script, upload it to the staging bucket and assemble what is called a job specification. Among other things, this job specification contains the command that will be executed when the container is run. It is instructive to construct a job and take a look at the job specification.

staging_bucket = f"vertex-ai-{google_project_id}/"
registry = f"{google_region}-docker.pkg.dev/{google_project_id}"
repository = f"{registry}/vertex-ai-docker-repo"
image = f"{repository}/base"
timestamp = time.strftime("%Y%m%d%H%M%S",time.localtime())
job = aip.CustomJob.from_local_script(
        display_name = "my-job",
        script_path = "train.py",
        container_uri = image,
        machine_type  = "n1-standard-4",
        base_output_dir = f"gs://{staging_bucket}/job_output/{timestamp}",
        project = google_project_id,
        location = google_region,
        staging_bucket = staging_bucket
)

print(job.job_spec.worker_pool_specs[0].machine_spec)
print(job.job_spec.worker_pool_specs[0].container_spec)

To see this in action, let us now run the script create_job.py and observe its output, which will consist of the machine spec and the container spec in addition to some debugging output.

python3 create_job.py

We can now run our job by calling either submit or run. This will instruct VertexAI to pull our container image, run it and execute the command specified in the container spec above, which will in turn simply run our training script. But before we try this out, let us see how we can test this locally.

Testing your jobs locally

Even a small job will take some time to execute on the VertexAI platform, simply because provisioning the underlying resources takes a few minutes. You should therefore test your code locally before submitting a job – debugging and fixing errors then happens with much shorter turnaround cycles. Here is a way to do this.

First, we can create our job as above – this does not yet submit anything, but already makes the packaged Python code available on GCS. Then, we can extract the command string that the SDK has assembled from the container specification. We could then try to run our image, passing this command to execute it within the container and setting the required environment variables like AIP_MODEL_DIR manually.

However, there are a few challenges that we need to consider. First, the command that the SDK assembles assumes that our package is located on the local file system – in the background, the platform probably uses a FUSE file system to map the staging bucket into the container. This is not so easy to simulate locally. However, we can parse the command string, extract the URI of the package on GCS, download the package to a local directory and replace the reference to the FUSE file system in the command with a reference to this local directory. This is a bit of a workaround to avoid having to install gcsfuse, but it will work as long as later versions of the SDK do not use a significantly different command string.

We also need to make sure that our code runs under a service account. The easiest way to do this is to place a JSON key file in our working directory, mount it into the Docker container and set GOOGLE_APPLICATION_CREDENTIALS inside the container so that the key is picked up.

Finally, it took me some time to figure out how to handle the command. Eventually, I decided to split off the first two parts ("sh" and "-c") so that I could escape the remaining string with single quotes, using the first part as entrypoint and the rest as arguments.
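To make this a bit more concrete, here is a rough sketch of how the resulting docker invocation could look – the local package directory and the AIP_MODEL_DIR value are assumptions, and the actual logic lives in create_job.py.

import os
import subprocess

# The command that the SDK has assembled, typically ["sh", "-c", "<shell command>"]
cmd = list(job.job_spec.worker_pool_specs[0].container_spec.command)
entrypoint, flag, shell_command = cmd[0], cmd[1], " ".join(cmd[2:])

# Assumption: the package referenced in the command has been downloaded to this
# directory and the FUSE path inside shell_command has been rewritten to /pkg
local_package_dir = os.path.abspath("package")

docker_command = [
    "docker", "run", "--rm",
    # mount the service account key and the downloaded package into the container
    "-v", f"{os.getcwd()}/key.json:/key.json",
    "-v", f"{local_package_dir}:/pkg",
    # make the key visible to the Google client libraries inside the container
    "-e", "GOOGLE_APPLICATION_CREDENTIALS=/key.json",
    # simulate the variable that VertexAI would derive from base_output_dir
    "-e", f"AIP_MODEL_DIR=gs://{staging_bucket}/job_output/local/model/",
    # use the first part of the command as entrypoint, the rest as arguments
    "--entrypoint", entrypoint,
    image,
    flag, shell_command,
]
subprocess.run(docker_command, check=True)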

If you want to try this, switch to the model directory, make sure that the directory contains a file key.json that contains the service account key for the vertex-ai-run service account that we have created previously, and run

python3 create_job.py --local_run

This will run the container and print messages to the console. If everything works fine, you should be able to locate the result of the training run – the uploaded model – within the job_output directory of the staging bucket (yes, I know, this is not really a directory, but let me stick to this terminology).
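If you prefer to do this check from Python rather than from the console, a small sketch like the following, which uses the google-cloud-storage client and assumes the bucket naming convention from above, lists everything below the job_output prefix.

from google.cloud import storage

# List all blobs below the job_output prefix of the staging bucket
client = storage.Client(project=google_project_id)
for blob in client.list_blobs(f"vertex-ai-{google_project_id}", prefix="job_output/"):
    print(blob.name)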

Running a job on VertexAI

After testing, let us now run our job on VertexAI. In the Python SDK, we can do this by invoking either the run or the submit method of the CustomJob that we have created. Both accept the service account that we want to use as a parameter (if we do not pass a service account, a default account is used).

service_account = f"vertex-ai-run@{google_project_id}"
job.submit(
    service_account = f"{service_account}.iam.gserviceaccount.com"
)

If we do this, the SDK will make the actual API call and pass the job specification to the VertexAI platform. As we have used submit and not run, this will return immediately and not wait for completion of the job.

Let us try this out. I have added this code snippet to the create_job.py script so that the job is submitted if you pass the --submit parameter. So let us do this and then use gcloud to check for the status of the job.

python3 create_job.py --submit
gcloud ai custom-jobs list

As we have just submitted the job, gcloud will show the job as pending. You should also be able to locate your job in the VertexAI console in the Custom Jobs tab (make sure to select the correct region). From there, you can also navigate to the log entries for this job, which are your primary source of information if something goes wrong. Note that the SDK will also print a direct link to the job to the console.
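You can also query the status from Python via the job object itself – here is a minimal sketch, assuming the job has been submitted in the same session; the polling interval is arbitrary.

import time

# Poll the job state until VertexAI reports a terminal state; state is a JobState
# enum such as JOB_STATE_PENDING, JOB_STATE_RUNNING or JOB_STATE_SUCCEEDED
while True:
    state = job.state
    print(f"Current job state: {state.name}")
    if state.name in ("JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"):
        break
    time.sleep(30)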

For simplicity, we have used the same bucket as staging bucket that we also use for other artifacts. This is something that you would probably not do in a real setup, as you want to be able to clean up your staging bucket on a regular basis. For now, simply use the following command to remove all packaged training scripts created by the SDK (be careful if you have other blobs that match this pattern).

gsutil rm "gs://vertex-ai-$GOOGLE_PROJECT_ID/aiplatform-*.tar.gz"

From time to time, you might also want to delete the jobs on the console.

More on different types of jobs

So far, our jobs have executed exactly one Python script running on a CPU. The SDK contains several variations on this simple theme. Of course nothing prevents you from modifying the job specification that the SDK has created so that the container executes arbitrary code instead of the predefined command that installs and runs the packaged script, if you are not happy with the defaults (the default command is assembled here).

First, we can use the parameters accelerator_type and accelerator_count of the job creation method to request GPUs (or TPUs) of a certain type to support training on specialized hardware.
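As a sketch, requesting a single GPU could look roughly like this – the accelerator type string is one of the values accepted by the platform, and availability depends on region and quota.

# Same call as before, but requesting a single NVIDIA T4 GPU for the worker
job = aip.CustomJob.from_local_script(
        display_name = "my-gpu-job",
        script_path = "train.py",
        container_uri = image,
        machine_type = "n1-standard-4",
        accelerator_type = "NVIDIA_TESLA_T4",
        accelerator_count = 1,
        base_output_dir = f"gs://{staging_bucket}/job_output/{timestamp}",
        project = google_project_id,
        location = google_region,
        staging_bucket = staging_bucket
)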

There is also a job type called CustomPythonPackageTrainingJob that accepts a package already uploaded to GCS as input and executes it within a job. This job type is also prepared to download a dataset if needed and to upload the final model artifact automatically to the model registry.
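A rough sketch of how such a job could be set up – the package URI and module name are placeholders, and a serving container would additionally be needed if you want the job to register the resulting model.

# Sketch only: the package and module names below are placeholders
package_job = aip.CustomPythonPackageTrainingJob(
    display_name = "my-package-job",
    python_package_gcs_uri = f"gs://{staging_bucket}/my_trainer-0.1.tar.gz",
    python_module_name = "trainer.task",
    container_uri = image,
    project = google_project_id,
    location = google_region,
)
package_job.run(
    machine_type = "n1-standard-4",
    service_account = f"vertex-ai-run@{google_project_id}.iam.gserviceaccount.com",
)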

However, adding more and more functionality to a single job soon becomes difficult to manage. If that job fails, all data that has been created in the meantime is lost, unless you implement some sort of caching and checkpointing mechanism. Instead of diving into these more advanced types of jobs, we will therefore discuss pipelines in a later post of this series – pipelines allow you to split the work across several steps, each of which can have inputs and outputs that are cached automatically.

There is, however, a topic which we have not yet discussed – traceability. You might have seen that when running several jobs, you can easily flood your GCS buckets with artifacts that are all identified by timestamps, and it soon becomes tricky to figure out which job was executed with which version of the data. To help with this, VertexAI offers the concept of metadata, which we will explore in the next post.
