Vertex AI Feature Store: Streaming ingestion SDK


NOTE: This notebook has been tested in the following environment:

  • Python version = 3.9

Overview

This notebook demonstrates how to use Vertex AI Feature Store’s streaming ingestion at the SDK layer.

Learn more about Vertex AI Feature Store.

Objective

In this tutorial, you learn how to ingest features from a pandas DataFrame into your Vertex AI Feature Store using the write_feature_values method from the Vertex AI SDK.

This tutorial uses the following Google Cloud ML services and resources:

  • Vertex AI Feature Store

The steps performed include:

  • Create a Feature Store
  • Create a new entity type in your Feature Store
  • Ingest feature values from a pandas DataFrame into the Feature Store's entity type

Dataset

The dataset used for this notebook is the penguins dataset from BigQuery public datasets. This dataset has the following features: species, island, culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g, and sex.

Costs

This tutorial uses billable components of Google Cloud:

  • Vertex AI

Learn about Vertex AI pricing and use the Pricing Calculator to generate a cost estimate based on your projected usage.

Installation

Install the following packages required to execute this notebook.

# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
                         google-cloud-bigquery \
                         numpy \
                         pandas \
                         db-dtypes \
                         pyarrow -q \
                         --user

Colab only: Uncomment the following cell to restart the kernel.

# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

Before you begin

Set up your Google Cloud project

The following steps are required, regardless of your notebook environment.

  1. Select or create a Google Cloud project. When you first create an account, you get a $300 free credit towards your compute/storage costs.

  2. Make sure that billing is enabled for your project.

  3. Enable the Vertex AI API.

  4. If you are running this notebook locally, you need to install the Cloud SDK.

Set your project ID

If you don't know your project ID, you can look it up with the gcloud CLI.
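
These standard gcloud commands list the projects you can access and print the currently configured one:

# List accessible projects and print the active one
! gcloud projects list
! gcloud config get-value project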

PROJECT_ID = "qwiklabs-gcp-03-f1cbfa92af09"  # @param {type:"string"}

# Set the project id
! gcloud config set project {PROJECT_ID}
Updated property [core/project].

Region

You can also change the REGION variable used by Vertex AI. Learn more about Vertex AI regions.

REGION = "us-central1"  # @param {type: "string"}

Authenticate your Google Cloud account

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

1. Vertex AI Workbench

  • Do nothing as you are already authenticated.

2. Local JupyterLab instance, uncomment and run:

# ! gcloud auth login

3. Colab, uncomment and run:

# from google.colab import auth
# auth.authenticate_user()

4. Service account or other: see the Google Cloud documentation on authenticating as a service account to set up Application Default Credentials.

UUID

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources you create, you generate a UUID for each session and append it to the names of the resources created in this tutorial.

import random
import string


# Generate a UUID of a specified length (default: 8)
def generate_uuid(length: int = 8) -> str:
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


UUID = generate_uuid()

Import libraries

import numpy as np
import pandas as pd
from google.cloud import aiplatform, bigquery

Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project.

aiplatform.init(project=PROJECT_ID, location=REGION)

Download and prepare the data

def download_bq_table(bq_table_uri: str) -> pd.DataFrame:
    # Remove bq:// prefix if present
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix) :]

    table = bigquery.TableReference.from_string(bq_table_uri)

    # Create a BigQuery client
    bqclient = bigquery.Client(project=PROJECT_ID)

    # Download the table rows
    rows = bqclient.list_rows(
        table,
    )
    return rows.to_dataframe()

BQ_SOURCE = "bq://bigquery-public-data.ml_datasets.penguins"

# Download penguins BigQuery table
penguins_df = download_bq_table(BQ_SOURCE)

Prepare the data

Feature values to be written to the Feature Store can take the form of a list of WriteFeatureValuesPayload objects, a Python dict of the form

{entity_id : {feature_id : feature_value}, ...},

or a pandas DataFrame, where the index holds the unique entity ID strings and each remaining column represents a feature. In this notebook, since you use a pandas DataFrame to ingest features, you convert the index values to strings so they can serve as entity IDs. A sketch of the dict form follows.
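
For illustration, here is a minimal sketch of the dict form with made-up entity IDs and values (the variable name instances_example is hypothetical and isn't used elsewhere in this notebook):

# Sketch only: the dict form maps entity IDs to {feature_id: feature_value} dicts
instances_example = {
    "0": {"species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": 3475.0},
    "1": {"species": "Chinstrap penguin (Pygoscelis antarctica)", "body_mass_g": 3500.0},
}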

# Prepare the data
penguins_df.index = penguins_df.index.map(str)
# Replace missing-value sentinels with NaN, then drop incomplete rows
NA_VALUES = ["NA", "."]
penguins_df = penguins_df.replace(to_replace=NA_VALUES, value=np.nan).dropna()

Create Feature Store and define schemas

Vertex AI Feature Store organizes resources hierarchically in the following order:

Featurestore -> EntityType -> Feature

You must create these resources before you can ingest data into Vertex AI Feature Store.

Learn more about Vertex AI Feature Store
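
Each resource's fully qualified name reflects this hierarchy, as you can see in the verification outputs later in this notebook:

projects/{project_number}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}/features/{feature_id}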

Create a Feature Store

You create a Feature Store using aiplatform.Featurestore.create with the following parameters:

  • featurestore_id (str): The ID to use for this Featurestore, which will become the final component of the Featurestore’s resource name. The value must be unique within the project and location.
  • online_store_fixed_node_count: The number of fixed nodes to provision for online serving resources.
  • project: Project to create EntityType in. If not set, project set in aiplatform.init is used.
  • location: Location to create EntityType in. If not set, location set in aiplatform.init is used.
  • sync: Whether to execute this creation synchronously.

FEATURESTORE_ID = f"penguins_{UUID}"

penguins_feature_store = aiplatform.Featurestore.create(
    featurestore_id=FEATURESTORE_ID,
    online_store_fixed_node_count=1,
    project=PROJECT_ID,
    location=REGION,
    sync=True,
)

Verify that the Feature Store is created

Check if the Feature Store was successfully created by running the following code block.

fs = aiplatform.Featurestore(
    featurestore_name=FEATURESTORE_ID,
    project=PROJECT_ID,
    location=REGION,
)
print(fs.gca_resource)
name: "projects/372586753296/locations/us-central1/featurestores/penguins_st0ifgkm"
create_time {
  seconds: 1694322479
  nanos: 288085000
}
update_time {
  seconds: 1694322479
  nanos: 418288000
}
etag: "AMEw9yNxFYMxQ2EHY88N9g0BevsWqyh-iScp0Z-Td0CyHd2sivNFCm70ax77A8pcdDQg"
online_serving_config {
  fixed_node_count: 1
}
state: STABLE
online_storage_ttl_days: 4000

Create an EntityType

An entity type is a collection of semantically related features. You define your own entity types, based on the concepts that are relevant to your use case. For example, a movie service might have the entity types movie and user, which group related features that correspond to movies or users.

Here, you create an entity type named penguin_entity_type using create_entity_type with the following parameters:

  • entity_type_id (str): The ID to use for the EntityType, which will become the final component of the EntityType’s resource name. The value must be unique within a Feature Store.
  • description: Description of the EntityType.

ENTITY_TYPE_ID = f"penguin_entity_type_{UUID}"

# Create penguin entity type
penguins_entity_type = penguins_feature_store.create_entity_type(
    entity_type_id=ENTITY_TYPE_ID,
    description="Penguins entity type",
)

Verify that the EntityType is created

Check if the Entity Type was successfully created by running the following code block.

entity_type = penguins_feature_store.get_entity_type(entity_type_id=ENTITY_TYPE_ID)

print(entity_type.gca_resource)
name: "projects/372586753296/locations/us-central1/featurestores/penguins_st0ifgkm/entityTypes/penguin_entity_type_st0ifgkm"
description: "Penguins entity type"
create_time {
  seconds: 1694322581
  nanos: 699343000
}
update_time {
  seconds: 1694322581
  nanos: 699343000
}
etag: "AMEw9yM3PY6wORp6__ikvhpAWutVvKmc6hSPC9TfejfgUmIVpDfGdybouCQ5Rkq3I4ud"
monitoring_config {
}

Create Features

A feature is a measurable property or attribute of an entity type. For example, the penguin entity type has features such as flipper_length_mm and body_mass_g. Features can be created within each entity type.

When you create a feature, you specify its value type, such as DOUBLE or STRING. The value type determines what kind of values you can ingest for a particular feature.

Learn more about Feature Value Types

penguins_feature_configs = {
    "species": {
        "value_type": "STRING",
    },
    "island": {
        "value_type": "STRING",
    },
    "culmen_length_mm": {
        "value_type": "DOUBLE",
    },
    "culmen_depth_mm": {
        "value_type": "DOUBLE",
    },
    "flipper_length_mm": {
        "value_type": "DOUBLE",
    },
    "body_mass_g": {"value_type": "DOUBLE"},
    "sex": {"value_type": "STRING"},
}

You can create features using either create_feature or batch_create_features. Here, for convenience, you have added all the feature configs to one variable, so you use batch_create_features.

penguin_features = penguins_entity_type.batch_create_features(
    feature_configs=penguins_feature_configs,
)
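
Alternatively, you can create features one at a time with create_feature. A minimal sketch, equivalent to a single entry of the batch above (left commented out because the batch call already created this feature):

# penguins_entity_type.create_feature(
#     feature_id="species",
#     value_type="STRING",
# )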

Write features to the Feature Store

Use the write_feature_values API to write feature values to the Feature Store, with the following parameter:

  • instances: Feature values to be written to the Feature Store, which can take the form of a list of WriteFeatureValuesPayload objects, a Python dict, or a pandas DataFrame.

This streaming ingestion feature has been introduced to the Vertex AI SDK under the preview namespace. Here, you pass the pandas DataFrame you created from the penguins dataset as the instances parameter.

Learn more about Streaming ingestion API

penguins_df.head()

|   | species                                   | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex    |
|---|-------------------------------------------|--------|------------------|-----------------|-------------------|-------------|--------|
| 0 | Adelie Penguin (Pygoscelis adeliae)       | Dream  | 36.6             | 18.4            | 184.0             | 3475.0      | FEMALE |
| 1 | Adelie Penguin (Pygoscelis adeliae)       | Dream  | 39.8             | 19.1            | 184.0             | 4650.0      | MALE   |
| 2 | Adelie Penguin (Pygoscelis adeliae)       | Dream  | 40.9             | 18.9            | 184.0             | 3900.0      | MALE   |
| 3 | Chinstrap penguin (Pygoscelis antarctica) | Dream  | 46.5             | 17.9            | 192.0             | 3500.0      | FEMALE |
| 4 | Adelie Penguin (Pygoscelis adeliae)       | Dream  | 37.3             | 16.8            | 192.0             | 3000.0      | FEMALE |

penguins_entity_type.preview.write_feature_values(instances=penguins_df)

Read back written features

Wait a few seconds for the write to propagate, then do an online read to confirm the write was successful.

ENTITY_IDS = [str(x) for x in range(100)]
penguins_entity_type.read(entity_ids=ENTITY_IDS)
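
If you only need a subset of features back, read also accepts a feature_ids argument. A small sketch:

# Read selected features for a few entities
penguins_entity_type.read(
    entity_ids=["0", "1", "2"],
    feature_ids=["species", "body_mass_g"],
)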

Cleaning up

To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

penguins_feature_store.delete(force=True)