Recommender Systems using TensorFlow

Paul Bruffett
6 min read · Mar 4, 2023

I’ll be building an end-to-end MLOps pipeline in Azure using GitHub Actions and the newer APIs for orchestrating ML Workspace jobs. To exercise this I’ll use TensorFlow Recommenders (TFRS) to build a retrieval and ranking service that recommends products from the AliCCP dataset, a real-world dataset published by Alibaba that includes click and conversion data for product purchases.

The end-to-end solution will involve a two-stage pipeline in Azure Machine Learning to prepare the data and train the model. Model telemetry and artifacts will be stored in Azure ML using the MLflow API.

First, the AliCCP dataset can be found here. Downloading does require signing up for a free Alibaba Cloud account.

I provisioned an Azure Machine Learning Workspace and embedded the credentials in GitHub following this guide.

Additionally, you must register your Azure subscription ID.

The GitHub repo with the code I’ll be referring to can be found here.

After creating an Azure ML Workspace I uploaded the AliCCP train and test data to the workspaceblobstore, which can be found in the ML Workspace UI under “Data” and “Datastores”. I created “aliccp_train” and “aliccp_test” folders containing the training and test data respectively; these files can be uploaded using Azure Storage Explorer.
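If you prefer to script the upload instead of using Storage Explorer, the azure-storage-blob SDK can do the same thing. This is only a sketch: the container name backing workspaceblobstore (typically azureml-blobstore-<guid>) and the local folder layout are placeholders you would need to adjust.

import os
from azure.storage.blob import BlobServiceClient

# Connection string for the storage account behind the workspace datastore.
service = BlobServiceClient.from_connection_string(os.environ["CONNECTION_STRING"])
# Placeholder: the container backing workspaceblobstore is usually named azureml-blobstore-<guid>.
container = service.get_container_client("azureml-blobstore-<guid>")

for folder in ("aliccp_train", "aliccp_test"):
    for name in os.listdir(folder):
        with open(os.path.join(folder, name), "rb") as data:
            container.upload_blob(name=f"{folder}/{name}", data=data, overwrite=True)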

The first component is the .github/workflows file that runs our data preparation and training tasks on check-in to main:

jobs:
  prep_and_train:
    name: "DataPrep"
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: azure login
        uses: azure/login@v1
        with:
          creds: ${{secrets.AZURE_CREDENTIALS}}
      - name: setup-cli
        run: |
          source "${{ github.workspace }}/infra/sdk_helpers.sh";
          bash setup.sh
        working-directory: cli
        continue-on-error: true
      - name: run job
        run: |
          source "${{ github.workspace }}/infra/sdk_helpers.sh";
          export CONNECTION_STRING=${{secrets.BLOB_CONNECTION}}
          bash -x run-job.sh dataprep.yml
        working-directory: code/train
      - name: train
        run: |
          source "${{ github.workspace }}/infra/sdk_helpers.sh";
          export CONNECTION_STRING=${{secrets.BLOB_CONNECTION}}
          bash -x run-job.sh algo_training.yml
        working-directory: code/train

The key takeaway is the use of sdk_helpers.sh, a file included verbatim from Microsoft’s reference implementation; it provides a number of convenience functions that are invoked when executing the run-job.sh command. run-job.sh, also provided by Microsoft, wraps our Python jobs and submits them to the ML Workspace for execution.
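If you would rather skip the shell wrapper, the same submission can be done from Python with the v2 azure-ai-ml SDK. A minimal sketch; the subscription, resource group and workspace names are placeholders:

from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, load_job

# Placeholders: fill in your own subscription, resource group and workspace.
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)

# Load the same job definition the CLI wrapper submits and send it to the workspace.
job = load_job("dataprep.yml")
submitted = ml_client.jobs.create_or_update(job)
print(submitted.studio_url)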

There are three tasks in the job: setting up the CLI, preparing the raw AliCCP data, and training on the prepared dataset.

Context on the Data

The AliCCP dataset contains almost 42 million records, nearly 300,000 customers and over 3 million products. This is a very large dataset, and we can evaluate on both click and conversion metrics.

From the dataset we can also see the number of unique values in each field.

For this recommender we’ll focus on building embeddings from the item_id and user_id to retrieve items the user is most likely to click on.

Data Preparation

For each of our jobs, data preparation and training, we have a .yml file that submits the run command. These .yml files are invoked by the workflow file above.

Dataprep.yml invokes the data preparation job:

$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
command: |
  ls ${{inputs.data_dir}}
  python dataprep.py --file-location ${{inputs.data_dir}} --output-location ${{outputs.output}}
code: src
environment:
  name: "prepare_data_environment"
  version: 1
  image: mcr.microsoft.com/azureml/curated/tensorflow-2.7-ubuntu20.04-py38-cuda11-gpu:25
  conda_file: ./environment/environment.yml
inputs:
  data_dir:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/
outputs:
  output:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/
    mode: rw_mount
compute: azureml:dataprepcpu

The key pieces of information are the image, which determines the container our job runs in, the conda_file with our dependencies, and the inputs and outputs.

In this case the input and output are the same path, with the output mounted read/write, but you are free to specify different locations. These locations are passed as parameters to the job, which expects to find the raw data in the aliccp_train and aliccp_test folders.
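Inside dataprep.py those parameters arrive as ordinary command-line arguments. A minimal sketch of that argument parsing; the exact handling in the repo may differ:

import argparse

parser = argparse.ArgumentParser()
# These flags match the command line in dataprep.yml.
parser.add_argument("--file-location", type=str, help="Folder containing aliccp_train and aliccp_test")
parser.add_argument("--output-location", type=str, help="Folder to write the processed parquet files to")
args = parser.parse_args()

path = args.file_location
output_path = args.output_location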

The data preparation script takes quite some time to complete, reading a common_features file and mapping these values to the row level telemetry.

Reading the common_features file and returning a dictionary of lookup values:

import pickle
import re

import numpy as np
from tqdm import tqdm


def _convert_common_features(common_path, pickle_path=None):
    common = {}

    with open(common_path, "r") as common_features:
        for csv_line in tqdm(common_features, desc="Reading common features..."):
            line = csv_line.strip().split(",")
            # The feature string is delimited by the ASCII control characters
            # \x01, \x02 and \x03, which render invisibly in the original listing.
            kv = np.array(re.split("[\x01\x02\x03]", line[2]))
            keys = kv[range(0, len(kv), 3)]
            values = kv[range(1, len(kv), 3)]
            common[line[0]] = dict(zip(keys, values))

    if pickle_path:
        with open(pickle_path, "wb") as handle:
            pickle.dump(common, handle, protocol=pickle.HIGHEST_PROTOCOL)

    return common

We then parse the skeleton rows, rename the columns to friendly names and save parquet files; their size is determined by file_size, which sets the number of rows written per file:

with open(path + "/aliccp_%s/sample_skeleton_%s.csv" % (train_type, train_type), "r") as skeleton:
    for i, csv_line in tqdm(enumerate(skeleton), desc="Processing data..."):

        line = csv_line.strip().split(",")
        # Skip rows flagged as a conversion without a click.
        if line[1] == "0" and line[2] == "1":
            continue
        kv = np.array(re.split("[\x01\x02\x03]", line[5]))
        key = kv[range(0, len(kv), 3)]
        value = kv[range(1, len(kv), 3)]
        feat_dict = dict(zip(key, value))
        feat_dict.update(common[line[3]])
        feat_dict["click"] = int(line[1])
        feat_dict["conversion"] = int(line[2])

        current.append(feat_dict)

        if i > 0 and i % file_size == 0:
            df = pd.DataFrame(current)

            df = df.rename(columns={
                '109_14': "user_categories", "110_14": "user_shops", "127_14": "user_brands",
                "150_14": "user_intentions", '121': 'user_profile', '122': 'user_group',
                "124": "user_gender", "125": "user_age", "126": "user_consumption",
                "128": "user_is_occupied", "129": "user_geography",
                "205": "item_id", "206": "item_category", "210": "item_intention",
                "216": "item_brand", "508": "user_item_categories",
                "509": "user_item_shops", "702": "user_item_brands",
                "853": "user_item_intentions", "301": "position", "207": "item_shop",
                "127": "user_consumption_2", "101": "user_id"})

            index = int((i / file_size) - 1)
            file_name = f"%s_{index}.parquet" % train_type
            df.to_parquet(
                os.path.join(output_path, "%s_processed" % train_type, file_name)
            )
            current = []

Once run, we have a set of parquet files ready for training. This job runs automatically each time code is checked in, but it skips processing if the output directories already exist, so any modifications require clearing the processed data folders before rerunning.
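A sketch of the kind of guard described above; the actual check in dataprep.py may be implemented differently, and process_split here is a hypothetical wrapper around the loop shown earlier:

import os


def needs_processing(output_path, train_type):
    # Skip the expensive pass if the processed folder already exists.
    return not os.path.isdir(os.path.join(output_path, "%s_processed" % train_type))


for train_type in ("train", "test"):
    if needs_processing(output_path, train_type):
        process_split(path, output_path, train_type)  # hypothetical wrapper around the processing loop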

Training

Invoked via algo_training.yml, the training job runs algo_training.py, which reads our processed parquet files:

$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
command: |
  ls ${{inputs.data_dir}}
  python algo_training.py --file-location ${{inputs.data_dir}} --output-location ${{outputs.output}}
code: src
environment:
  name: "prepare_data_environment"
  version: 1
  image: mcr.microsoft.com/azureml/curated/tensorflow-2.7-ubuntu20.04-py38-cuda11-gpu:25
  conda_file: ./environment/mlenvironment.yml
inputs:
  data_dir:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/
outputs:
  output:
    type: uri_folder
    path: azureml://datastores/workspaceblobstore/paths/
    mode: rw_mount
compute: azureml:traininggpu

The input directory contains both the processed train and test files; we output saved weights and TensorBoard logs.

Our conda environment file is different this time (mlenvironment.yml), adding the ML dependencies.

The training file begins by reading the data and filtering it:

train = pd.read_parquet(path+'/train_processed/')


#filter out long tail products and reduce size of dataset
train['counter'] = 1
df_agg = train[['item_id','counter']].groupby('item_id').count()
df_agg = df_agg[df_agg['counter']>500]
train = pd.merge(df_agg, train, on=['item_id', 'item_id'], how='left')
print(len(train))
train['counter'] = 1
cust_agg = train[['user_id','counter']].groupby('user_id').count()
cust_agg = cust_agg[cust_agg['counter']>50]
train = pd.merge(cust_agg, train, on=['user_id', 'user_id'], how='left')
print(len(train))


train, df_features = dataset_to_tensor(train)

This selects the products and customers that appear frequently in the dataset; as we can see from the data exploration notebook, a lot of products are ordered very infrequently.

The dataset contains 294,865 unique users and 3,156,553 unique products. Cutting this down to the most common users and products will let us demonstrate an algorithm.

This also makes our training runtime more manageable.
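The dataset_to_tensor helper isn’t shown in the article; it converts the filtered DataFrame into a tf.data.Dataset for TFRS. A minimal sketch of what such a conversion might look like, assuming only the two ID columns the retrieval model consumes are needed (the version in the repo likely carries more features):

import tensorflow as tf


def dataset_to_tensor(df):
    # Keep the two ID columns the retrieval model consumes and cast them to integers.
    features = df[["user_id", "item_id"]].astype("int64")
    dataset = tf.data.Dataset.from_tensor_slices(
        {
            "user_id": features["user_id"].values,
            "item_id": features["item_id"].values,
        }
    )
    return dataset, features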

Next, we build embedding layers for both users and products:

user_model = tf.keras.Sequential([
    tf.keras.layers.IntegerLookup(
        vocabulary=unique_users, mask_token=None),
    # We add an additional embedding to account for unknown tokens.
    tf.keras.layers.Embedding(len(unique_users) + 1, embedding_dimension)
])

item_model = tf.keras.Sequential([
    tf.keras.layers.IntegerLookup(
        vocabulary=unique_items, mask_token=None),
    tf.keras.layers.Embedding(len(unique_items) + 1, embedding_dimension)
])

We then set our metrics and build the model that calls the embeddings:

metrics = tfrs.metrics.FactorizedTopK(candidates=item_ids.batch(128).map(item_model))

task = tfrs.tasks.Retrieval(metrics=metrics)


class RetrievalModel(tfrs.Model):
    def __init__(self, user_model, item_model):
        super().__init__()
        self.item_model: tf.keras.Model = item_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = task

    def compute_loss(self, features, training=False) -> tf.Tensor:
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features['user_id'])
        # And pick out the item features and pass them into the item model,
        # getting embeddings back.
        positive_item_embeddings = self.item_model(features['item_id'])

        # The task computes the loss and the metrics.
        return self.task(user_embeddings, positive_item_embeddings)


model = RetrievalModel(user_model, item_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

Then we fit and train. We can see that MLflow autologging makes the metrics available in the experiments in Azure.
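The fit call itself isn’t shown in the excerpt above; a sketch of what that step might look like with MLflow autologging and the outputs described earlier (batch size, epoch count and paths are illustrative, not the repo’s exact values):

import os

import mlflow
import tensorflow as tf

# output_location is the mounted output folder passed via --output-location (see the yml above).
mlflow.tensorflow.autolog()  # surfaces the Keras metrics in the Azure ML experiment UI

cached_train = train.shuffle(100_000).batch(8192).cache()

tensorboard_cb = tf.keras.callbacks.TensorBoard(
    log_dir=os.path.join(output_location, "logs")
)

model.fit(cached_train, epochs=3, callbacks=[tensorboard_cb])

# Persist the trained weights alongside the TensorBoard logs in the mounted output.
model.save_weights(os.path.join(output_location, "weights", "retrieval"))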

In particular, we can see the top-100 accuracy.

There is obviously some overfitting, but let’s see how it evaluates…

It scores a factorized_top_k/top_100_categorical_accuracy of 0.0362, so very badly.

Using the same architecture on MovieLens 1M we get about 14%, which at least demonstrates that the architecture is functional and not simply memorizing the input data.

One note: MovieLens 1M contains only 6,040 users and 3,883 movie titles, while our reduced dataset has tens of thousands of each. So, using a notebook, let’s strip this down to the top few thousand products and users and retest.

Still terrible.

Perhaps a DCN model using additional features about both products and users would perform better.
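TFRS ships a cross layer that could be used for that. A rough sketch of the kind of DCN-style tower hinted at here, with the feature preprocessing omitted and layer sizes chosen purely for illustration:

import tensorflow as tf
import tensorflow_recommenders as tfrs

# Cross the concatenated user and item feature embeddings, then score with a deep tower.
dcn_tower = tf.keras.Sequential([
    tfrs.layers.dcn.Cross(),
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(1),
])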
