Lazy Distributed Computing on GCP AI Platform Using NumPy for Model Prediction

Alim Hanif
4 min read · May 2, 2021


Hi folks, distributed computing sounds fancy nowadays. I think it's because some of us don't want our scripts running for a long time (a few of us because of project deadlines, perhaps :D). Time is money; if we can speed things up, hopefully we can generate more money. Haha.

My first impression of distributed computing was that it was difficult stuff and that I would need some time to learn it. It turned out that, once I understood the logic behind it, we can set up a simple, or let's call it lazy, way of doing distributed computing for a particular task. In this case, I want to give you an example of distributed model prediction on the GCP AI Platform. Given a huge amount of data (a single big file or multiple files), we can spawn several GCP AI Platform jobs to perform model prediction in parallel.

Requirements:

  • Access to GCP AI Platform
  • Data on Google Cloud Storage Bucket
  • Python packages: NumPy & argparse
  • Your prediction script, absolutely

Concept

Before we get to distributed computing, we have to know about data sharding first. Data sharding is quite similar to data partitioning: both break the data up into smaller subsets (chunks). The difference between the two is that with data sharding the partitions are spread across multiple machines, while with data partitioning they are not (Hazelcast [link]).

The basic concept of lazy distributed computing.

Let's get into the topic:

Step 1: Make sure your prediction script runs on the GCP AI Platform.
The prediction script should run as an AI Platform training job [link], with a Google Cloud Storage (GCS) bucket as the source of data. The command for submitting a job looks roughly like this:

gcloud ai-platform jobs submit training prediction_job <arguments>
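For concreteness, here is one way a full submission could look, written as an os.system call since that is the pattern used in the next step. Everything after the job name is a placeholder you would adapt: the region, the runtime/Python versions, and the hypothetical predictor package with a predictor.task entry module are my assumptions, not part of the original setup. The one important detail is that your script's own arguments go after the bare -- separator.

import os

# a sketch of a single job submission (module, package, region, and versions are placeholders)
os.system(
    "gcloud ai-platform jobs submit training prediction_job "
    "--region us-central1 "
    "--module-name predictor.task "
    "--package-path ./predictor "
    "--runtime-version 2.3 "
    "--python-version 3.7 "
    "-- --n_machines 10 --idx 0"  # custom script arguments (explained in Steps 2-3)
)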

Step 2: Set the number of machines and assign the machine index using arguments.
As you may know, a Python script can execute a shell command with the function os.system(command). Using that, we can write a for loop that submits several jobs and assigns an index to each machine at submission time. Here's the script:

import os

job_name_pattern = "prediction_job_{}"
# custom script arguments go after the bare "--" separator
# (the other gcloud flags from Step 1 are omitted here for brevity)
command_pattern = "gcloud ai-platform jobs submit training {} -- --n_machines {} --idx {}"
n_machines = 10

for idx in range(n_machines):
    job_name = job_name_pattern.format(idx)
    command = command_pattern.format(job_name, n_machines, idx)
    os.system(command)

The script above submits 10 jobs, each with a different idx (job/machine index).
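If you want to notice failed submissions right away, a variant using subprocess.run (a drop-in alternative to os.system, not something the original script uses) could look like this:

import subprocess

n_machines = 10
for idx in range(n_machines):
    job_name = "prediction_job_{}".format(idx)
    command = [
        "gcloud", "ai-platform", "jobs", "submit", "training", job_name,
        # the other required gcloud flags are omitted here, as above
        "--",  # custom script arguments follow the bare "--"
        "--n_machines", str(n_machines),
        "--idx", str(idx),
    ]
    # check=True raises CalledProcessError as soon as one submission fails
    subprocess.run(command, check=True)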

Step 3: Set up argparse in the prediction script.

In the previous step, we passed the number of machines (n_machines) and the machine index (idx) to each job. The prediction script must be able to read these values, and we can use argparse to collect them.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--n_machines", type=int,
                    help="the total number of machines/jobs")
parser.add_argument("--idx", type=int,
                    help="the index of this machine/job")
args = parser.parse_args()

n_machines = args.n_machines
idx = args.idx

At this point, each job/machine has a different idx value.
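As a quick sanity check (not part of the original flow), parse_args also accepts an explicit argument list, so you can simulate locally what a submitted job would receive:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--n_machines", type=int)
parser.add_argument("--idx", type=int)

# simulate the arguments of machine number 3 out of 10
args = parser.parse_args(["--n_machines", "10", "--idx", "3"])
print(args.n_machines, args.idx)  # prints: 10 3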

Step 4: Data sharding
Once each job/machine has its own idx value, we can do simple data sharding with NumPy's numpy.array_split(). But first of all, we need to know that there are 2 types of data sharding:

  1. Shard by files
    If you have multiple files in a single folder on a GCS bucket, you can shard by files, meaning you distribute the files across the jobs/machines. Let's say these are your files: ['data_1.csv', 'data_2.csv', 'data_3.csv', 'data_4.csv', 'data_5.csv', 'data_6.csv'] and you have 2 jobs/machines; the first job/machine will get data 1 until 3, and the second job will get the rest of them (data 4–6). See the short sketch right after this list.
  2. Shard by data
    Typically, this method is suitable for a single big file. The way we do it: simply load the data, split it into partitions, and use only the partition assigned to this job/machine. If loading the whole file is a memory issue, you can read it in chunks with pandas (the chunksize argument of read_csv).
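To make the shard-by-files case above concrete, this is what numpy.array_split does with those six files and two machines:

import numpy as np

files = ['data_1.csv', 'data_2.csv', 'data_3.csv',
         'data_4.csv', 'data_5.csv', 'data_6.csv']
n_machines = 2

shards = np.array_split(files, n_machines)
# shards[0] contains data_1.csv, data_2.csv, data_3.csv
# shards[1] contains data_4.csv, data_5.csv, data_6.csv
print(shards[0], shards[1])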

And here's the full example, reading from GCS:

import tensorflow as tf
import pandas as pd
import numpy as np

# <argparse script from Step 3>
n_machines = args.n_machines
idx = args.idx

# --- sharding by files ---
# path pattern of the data
data_path = "gs://alim/storage/data_*.csv"
# list all files matching the pattern
files = tf.io.gfile.glob(data_path)  # returns a list of file paths
# do the sharding by files with NumPy and keep this machine's part
files = np.array_split(files, n_machines)[idx]

# --- sharding by data ---
data_path = "gs://alim/storage/big_data.csv"
# load the data (reading gs:// paths with pandas requires the gcsfs package)
data = pd.read_csv(data_path)
# do the sharding by data with NumPy and keep this machine's part
data = np.array_split(data, n_machines)[idx]

The NumPy array_split function involves no randomness: as long as the input and the number of splits stay the same, the output partitions are always the same. Hence, each job can pick up a different share of the data/files simply by looking at its index.

Furthermore, you can also combine both methods (see the sketch below).
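One way to read "combine both methods" (my interpretation, not spelled out above): shard the files across machines first, then shard each file's rows into smaller chunks inside the job, so a big file never has to be processed in one piece. The n_chunks value is an assumption you would tune.

import numpy as np
import pandas as pd
import tensorflow as tf

# n_machines and idx come from the argparse setup in Step 3
# shard by files across machines
files = tf.io.gfile.glob("gs://alim/storage/data_*.csv")
files = np.array_split(files, n_machines)[idx]

n_chunks = 4  # assumption: tune this to your memory budget
for path in files:
    data = pd.read_csv(path)
    # then shard each file's rows inside this job
    for chunk in np.array_split(data, n_chunks):
        pass  # run your model prediction on `chunk` here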

P.S.: This idea comes from the TensorFlow data distribution system [link].

Step 5: Saving the result

To avoid each job/machine overwriting the others' results, we can simply add the index as a suffix to the output filename.

output_path = "gs://alim/storage/output_{}.csv"
output_path = output_path.format(idx)
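To round it out, here is a hedged sketch of writing this job's predictions and, later, merging all the shard outputs back into one file. The data variable stands for whatever DataFrame of predictions your script produced, and the merge step is my addition rather than part of the original flow.

import pandas as pd
import tensorflow as tf

# in each job: write this shard's predictions
# (writing to gs:// with pandas also relies on the gcsfs package)
data.to_csv(output_path, index=False)

# later, from anywhere: collect and merge all the shard outputs
parts = tf.io.gfile.glob("gs://alim/storage/output_*.csv")
merged = pd.concat([pd.read_csv(p) for p in parts], ignore_index=True)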

Voilà, that's the simple and lazy way to run a distributed task on the GCP AI Platform using NumPy and argparse. Thank you for reading!
