Vertex AI Feature Store: Streaming ingestion SDK
Run in Colab | View on GitHub
NOTE: This notebook has been tested in the following environment:
- Python version = 3.9
Overview
This notebook demonstrates how to use Vertex AI Feature Store’s streaming ingestion at the SDK layer.
Learn more about Vertex AI Feature Store.
Objective
In this tutorial, you learn how to ingest features from a Pandas DataFrame into your Vertex AI Feature Store using the write_feature_values method from the Vertex AI SDK.
This tutorial uses the following Google Cloud ML services and resources:
- Vertex AI Feature Store
The steps performed include:
- Create a Feature Store.
- Create a new Entity Type for your Feature Store.
- Ingest feature values from a Pandas DataFrame into the Feature Store's Entity Type.
Dataset
The dataset used for this notebook is the penguins dataset from BigQuery public datasets. This dataset has the following features: culmen_length_mm, culmen_depth_mm, flipper_length_mm, body_mass_g, species, island, and sex.
Costs
This tutorial uses billable components of Google Cloud:
- Vertex AI
Learn about Vertex AI pricing and use the Pricing Calculator to generate a cost estimate based on your projected usage.
Installation
Install the following packages required to execute this notebook.
# Install the packages
! pip3 install --upgrade google-cloud-aiplatform \
    google-cloud-bigquery \
    numpy \
    pandas \
    db-dtypes \
    pyarrow \
    -q --user
WARNING: The scripts f2py, f2py3 and f2py3.9 are installed in '/home/jupyter/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
WARNING: The script tb-gcp-uploader is installed in '/home/jupyter/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
apache-beam 2.46.0 requires numpy<1.25.0,>=1.14.3, but you have numpy 1.25.2 which is incompatible.
apache-beam 2.46.0 requires pyarrow<10.0.0,>=3.0.0, but you have pyarrow 13.0.0 which is incompatible.
numba 0.56.4 requires numpy<1.24,>=1.18, but you have numpy 1.25.2 which is incompatible.
tensorflow 2.6.5 requires numpy~=1.19.2, but you have numpy 1.25.2 which is incompatible.
ydata-profiling 4.5.1 requires numpy<1.24,>=1.16.0, but you have numpy 1.25.2 which is incompatible.
ydata-profiling 4.5.1 requires pandas!=1.4.0,<2.1,>1.1, but you have pandas 2.1.0 which is incompatible.
Colab only: Uncomment the following cell to restart the kernel.
# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython
# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)
Before you begin
Set up your Google Cloud project
The following steps are required, regardless of your notebook environment.
- Select or create a Google Cloud project. When you first create an account, you get a $300 free credit towards your compute/storage costs.
- If you are running this notebook locally, you need to install the Cloud SDK.
Set your project ID
If you don’t know your project ID, try the following:
- Run gcloud config list.
- Run gcloud projects list.
- See the support page: Locate the project ID.
PROJECT_ID = "qwiklabs-gcp-03-f1cbfa92af09" # @param {type:"string"}
# Set the project id
! gcloud config set project {PROJECT_ID}
Updated property [core/project].
Region
You can also change the REGION variable used by Vertex AI. Learn more about Vertex AI regions.
REGION = "us-central1" # @param {type: "string"}
Authenticate your Google Cloud account
Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.
1. Vertex AI Workbench
- Do nothing as you are already authenticated.
2. Local JupyterLab instance, uncomment and run:
# ! gcloud auth login
3. Colab, uncomment and run:
# from google.colab import auth
# auth.authenticate_user()
4. Service account or other
- See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.
UUID
If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources created, you generate a UUID for each session and append it to the names of the resources you create in this tutorial.
import random
import string
# Generate a UUID of a specified length (default: 8)
def generate_uuid(length: int = 8) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
UUID = generate_uuid()
Import libraries
import numpy as np
import pandas as pd
from google.cloud import aiplatform, bigquery
Initialize Vertex AI SDK for Python
Initialize the Vertex AI SDK for Python for your project.
aiplatform.init(project=PROJECT_ID, location=REGION)
Download and prepare the data
def download_bq_table(bq_table_uri: str) -> pd.DataFrame:
# Remove bq:// prefix if present
prefix = "bq://"
if bq_table_uri.startswith(prefix):
bq_table_uri = bq_table_uri[len(prefix) :]
table = bigquery.TableReference.from_string(bq_table_uri)
# Create a BigQuery client
bqclient = bigquery.Client(project=PROJECT_ID)
# Download the table rows
rows = bqclient.list_rows(
table,
)
return rows.to_dataframe()
BQ_SOURCE = "bq://bigquery-public-data.ml_datasets.penguins"
# Download penguins BigQuery table
penguins_df = download_bq_table(BQ_SOURCE)
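As a quick, optional sanity check, you can inspect the shape and column names of the downloaded DataFrame:
# Optional: confirm the download by checking row/column counts and column names
print(penguins_df.shape)
print(penguins_df.columns.tolist())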
Prepare the data
Feature values to be written to the Feature Store can take the form of a list of WriteFeatureValuesPayload objects, a Python dict of the form
{entity_id: {feature_id: feature_value}, ...},
or a pandas DataFrame, where the index column holds the unique entity ID strings and each remaining column represents a feature. In this notebook, since you use a pandas DataFrame to ingest features, you convert the index column's data type to string so it can serve as the entity ID.
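For illustration only, here is a minimal sketch of the dict form, with values drawn from the penguins dataset (this cell is not needed for the rest of the tutorial):
# Illustration only: dict-form payload mapping entity IDs to {feature_id: feature_value}
example_instances = {
    "0": {"species": "Adelie Penguin (Pygoscelis adeliae)", "body_mass_g": 3475.0},
    "3": {"species": "Chinstrap penguin (Pygoscelis antarctica)", "body_mass_g": 3500.0},
}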
# Prepare the data
penguins_df.index = penguins_df.index.map(str)
# Remove null values
NA_VALUES = ["NA", "."]
penguins_df = penguins_df.replace(to_replace=NA_VALUES, value=np.NaN).dropna()
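Optionally, you can assert that no null values remain after cleaning:
# Optional check: every remaining cell should be non-null after dropna()
assert penguins_df.notna().all().all()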
Create Feature Store and define schemas
Vertex AI Feature Store organizes resources hierarchically in the following order:
Featurestore -> EntityType -> Feature
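Each level of this hierarchy is addressed by a fully qualified resource name, which you'll see in the verification outputs later in this notebook:
projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}/features/{feature_id}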
You must create these resources before you can ingest data into Vertex AI Feature Store.
Learn more about Vertex AI Feature Store
Create a Feature Store
You create a Feature Store using aiplatform.Featurestore.create with the following parameters:
- featurestore_id (str): The ID to use for this Feature Store, which will become the final component of the Feature Store's resource name. The value must be unique within the project and location.
- online_store_fixed_node_count: Configuration for online serving resources.
- project: Project to create the Feature Store in. If not set, the project set in aiplatform.init is used.
- location: Location to create the Feature Store in. If not set, the location set in aiplatform.init is used.
- sync: Whether to execute this creation synchronously.
FEATURESTORE_ID = f"penguins_{UUID}"
penguins_feature_store = aiplatform.Featurestore.create(
featurestore_id=FEATURESTORE_ID,
online_store_fixed_node_count=1,
project=PROJECT_ID,
location=REGION,
sync=True,
)
Verify that the Feature Store is created
Check if the Feature Store was successfully created by running the following code block.
fs = aiplatform.Featurestore(
featurestore_name=FEATURESTORE_ID,
project=PROJECT_ID,
location=REGION,
)
print(fs.gca_resource)
name: "projects/372586753296/locations/us-central1/featurestores/penguins_st0ifgkm"
create_time {
seconds: 1694322479
nanos: 288085000
}
update_time {
seconds: 1694322479
nanos: 418288000
}
etag: "AMEw9yNxFYMxQ2EHY88N9g0BevsWqyh-iScp0Z-Td0CyHd2sivNFCm70ax77A8pcdDQg"
online_serving_config {
fixed_node_count: 1
}
state: STABLE
online_storage_ttl_days: 4000
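You can also list the Feature Stores in your project to confirm that the new one appears; a minimal sketch using the SDK's list method:
# List all Feature Stores visible to this project/location
for existing_fs in aiplatform.Featurestore.list():
    print(existing_fs.resource_name)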
Create an EntityType
An entity type is a collection of semantically related features. You define your own entity types, based on the concepts that are relevant to your use case. For example, a movie service might have the entity types movie and user, which group related features that correspond to movies or users.
Here, you create an entity type named penguin_entity_type using create_entity_type with the following parameters:
- entity_type_id (str): The ID to use for the EntityType, which will become the final component of the EntityType's resource name. The value must be unique within a Feature Store.
- description: Description of the EntityType.
ENTITY_TYPE_ID = f"penguin_entity_type_{UUID}"
# Create penguin entity type
penguins_entity_type = penguins_feature_store.create_entity_type(
entity_type_id=ENTITY_TYPE_ID,
description="Penguins entity type",
)
Verify that the EntityType is created
Check if the Entity Type was successfully created by running the following code block.
entity_type = penguins_feature_store.get_entity_type(entity_type_id=ENTITY_TYPE_ID)
print(entity_type.gca_resource)
name: "projects/372586753296/locations/us-central1/featurestores/penguins_st0ifgkm/entityTypes/penguin_entity_type_st0ifgkm"
description: "Penguins entity type"
create_time {
seconds: 1694322581
nanos: 699343000
}
update_time {
seconds: 1694322581
nanos: 699343000
}
etag: "AMEw9yM3PY6wORp6__ikvhpAWutVvKmc6hSPC9TfejfgUmIVpDfGdybouCQ5Rkq3I4ud"
monitoring_config {
}
Create Features
A feature is a measurable property or attribute of an entity type. For example, the penguin entity type has features such as flipper_length_mm and body_mass_g. Features are created within each entity type.
When you create a feature, you specify its value type, such as DOUBLE or STRING. The value type determines what kinds of values you can ingest for that feature.
Learn more about Feature Value Types
penguins_feature_configs = {
"species": {
"value_type": "STRING",
},
"island": {
"value_type": "STRING",
},
"culmen_length_mm": {
"value_type": "DOUBLE",
},
"culmen_depth_mm": {
"value_type": "DOUBLE",
},
"flipper_length_mm": {
"value_type": "DOUBLE",
},
"body_mass_g": {"value_type": "DOUBLE"},
"sex": {"value_type": "STRING"},
}
You can create features using either create_feature or batch_create_features. Here, for convenience, all the feature configs are collected in one variable, so you use batch_create_features.
penguin_features = penguins_entity_type.batch_create_features(
feature_configs=penguins_feature_configs,
)
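To confirm that the features were created, you can list the Feature resources registered under the entity type:
# Verify feature creation by listing the features under the penguin entity type
for feature in penguins_entity_type.list_features():
    print(feature.resource_name)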
Write features to the Feature Store
Use the write_feature_values API to write feature values to the Feature Store. It takes the following parameter:
instances: Feature values to be written to the Feature Store, which can take the form of a list of WriteFeatureValuesPayload objects, a Python dict, or a pandas DataFrame.
This streaming ingestion feature is exposed in the Vertex AI SDK under the preview namespace. Here, you pass the pandas DataFrame you created from the penguins dataset as the instances parameter.
Learn more about Streaming ingestion API
penguins_df.head()
| | species | island | culmen_length_mm | culmen_depth_mm | flipper_length_mm | body_mass_g | sex |
|---|---|---|---|---|---|---|---|
| 0 | Adelie Penguin (Pygoscelis adeliae) | Dream | 36.6 | 18.4 | 184.0 | 3475.0 | FEMALE |
| 1 | Adelie Penguin (Pygoscelis adeliae) | Dream | 39.8 | 19.1 | 184.0 | 4650.0 | MALE |
| 2 | Adelie Penguin (Pygoscelis adeliae) | Dream | 40.9 | 18.9 | 184.0 | 3900.0 | MALE |
| 3 | Chinstrap penguin (Pygoscelis antarctica) | Dream | 46.5 | 17.9 | 192.0 | 3500.0 | FEMALE |
| 4 | Adelie Penguin (Pygoscelis adeliae) | Dream | 37.3 | 16.8 | 192.0 | 3000.0 | FEMALE |
penguins_entity_type.preview.write_feature_values(instances=penguins_df)
Read back written features
Wait a few seconds for the write to propagate, then do an online read to confirm the write was successful.
ENTITY_IDS = [str(x) for x in range(100)]
penguins_entity_type.read(entity_ids=ENTITY_IDS)
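The read method also accepts an optional feature_ids parameter (by default, all features are read). For example, to read back just two features for the first five entities:
# Read a subset of features for a few entities from the online store
penguins_entity_type.read(
    entity_ids=[str(x) for x in range(5)],
    feature_ids=["species", "body_mass_g"],
)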
Cleaning up
To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project you used for the tutorial.
Otherwise, you can delete the individual resources you created in this tutorial:
penguins_feature_store.delete(force=True)