Calculate from warehouse
You can use existing attributes that are already in your warehouse, or use the Signals batch engine to calculate new attributes in a new table.
To use historical, warehouse attributes in your real-time use cases, you will need to sync the data to the Profiles Store. Signals includes a sync engine to do this.
Only Snowflake and BigQuery are supported currently.
Signals is configured slightly differently depending on whether you're using existing tables or creating new ones.
`BatchAttributeGroup`
: create a new warehouse table with new attributes, calculated from your atomic events table

`ExternalBatchAttributeGroup`
: sync pre-calculated values to Signals from an existing warehouse table
Sync engine
The sync engine is a cron job that sends warehouse attributes to the Profiles Store.
The engine will be enabled when you either:
- Apply an `ExternalBatchAttributeGroup` for an existing table
- Run the batch engine `sync` command after creating new attribute tables
Once enabled, syncs begin at a fixed interval. By default, this is every 1 hour. Only the records that have changed since the last sync are sent to the Profiles Store.
Using existing attributes
Using existing tables in your warehouse is the more straightforward approach, as it doesn't require any additional modeling. You'll need to define an `ExternalBatchAttributeGroup`, including a `BatchSource` warehouse configuration object.
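As a minimal sketch, the definition might look something like the example below, assuming an initialized `sp_signals` client. The constructor fields shown (`database`, `schema`, `table`, the group `name` and `version`) are illustrative assumptions rather than the definitive API; check the SDK reference for the exact arguments. The `timestamp_field` is what the sync engine uses to detect changed records.

```python
from snowplow_signals import BatchSource, ExternalBatchAttributeGroup  # import path assumed

# Describe the existing warehouse table that already holds the attribute values.
# Field names below are illustrative, not the definitive API.
source = BatchSource(
    database="ANALYTICS",
    schema="DERIVED",
    table="user_purchase_attributes",
    timestamp_field="updated_at",  # used by the sync engine to find changed records
)

# Wrap the table in an attribute group so Signals knows how to read it.
attribute_group = ExternalBatchAttributeGroup(
    name="user_purchase_attributes",
    version=1,
    source=source,
)
```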
To start syncing existing tables, publish your `ExternalBatchAttributeGroup` to Signals:

```python
sp_signals.publish([attribute_group])
```
The sync will begin: the sync engine will look for new records at a given interval, based on the `timestamp_field` and the last time it ran. The default time interval is 1 hour.
Creating new attribute tables
To create new batch attributes, you'll need to define a `BatchAttributeGroup` and publish it to Signals. However, further steps are necessary to create the required dbt models and tables in your warehouse, and register them with Signals.
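As a rough sketch only, and assuming the same `sp_signals` client used above, a `BatchAttributeGroup` definition might look like this. The attribute fields shown (`name`, `type`, `aggregation`) are hypothetical placeholders; the actual definition API is covered in the batch engine tutorial and SDK reference.

```python
from snowplow_signals import Attribute, BatchAttributeGroup  # import path assumed

# Illustrative only: an attribute to be calculated from the atomic events table.
attribute_group = BatchAttributeGroup(
    name="user_engagement",
    version=1,
    attributes=[
        Attribute(
            name="page_view_count",  # hypothetical attribute
            type="int32",
            aggregation="counter",
        ),
    ],
)

# Publishing registers the group definition with Signals.
sp_signals.publish([attribute_group])
```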
The included batch engine CLI tool will help you with this process. Check out the batch engine tutorial for a step-by-step guide.
Installing the CLI tool
The Signals Python SDK includes an optional CLI tool called the batch engine. It will help you create the required dbt models and tables in your warehouse, and register them with Signals.
The batch engine is installed separately from the main Python SDK.
Choose where your new Signals dbt projects will live. Install the CLI tool there with:
```bash
pip install 'snowplow-signals[batch-engine]'
```

This adds the `snowplow-batch-engine` tool to your environment.
CLI commands
The available commands are:

```bash
init             # Initialize dbt project structure and base configuration
generate         # Generate dbt project assets
sync             # Register the attribute table as a data source with Signals and publish the Attribute Group so that syncing can begin
test_connection  # Test the connection to the authentication and API services
```
A `--verbose` flag is available for every command.
Here's an example of using the CLI:
```bash
snowplow-batch-engine init --verbose
```
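In practice, the commands are typically chained in order: initialize once, then generate and sync whenever you add or change an attribute group. The sequence below is a sketch based on the command descriptions above; the batch engine tutorial covers the full workflow, including running the generated dbt models.

```bash
# Optional: verify connectivity to the authentication and API services
snowplow-batch-engine test_connection

# Scaffold the dbt project structure and base configuration
snowplow-batch-engine init

# Generate the dbt project assets for your published attribute groups
snowplow-batch-engine generate

# (Run the generated dbt models to build the attribute tables — see the tutorial)

# Register the attribute table with Signals and publish the Attribute Group
snowplow-batch-engine sync
```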
Creating and registering tables
Check out the batch engine tutorial for a walkthrough of the required steps.
The dbt models generated by the batch engine process events incrementally. This avoids unnecessary reprocessing, and along with the pre-aggregation logic, minimizes computational costs.
Model variables
The model created for each attribute group has configurable variables. The most important one to update before running is the `snowplow__start_date` variable. This will depend on how far back you have data, and how much of it you want to process.

You will need to update the variables for each attribute group individually, by editing the `dbt_project.yml` files. The table below lists the configurable variables for each model:
| Variable | Description | Default Value |
|---|---|---|
| `snowplow__start_date` | Date from which the model starts looking for events, based on both `load_tstamp` and `derived_tstamp` | `'2025-01-01'` |
| `snowplow__app_id` | Filter the data on specific `app_id`s | `[]` |
| `snowplow__backfill_limit_days` | Limit backfill increments for the `filtered_events_table` | `1` |
| `snowplow__late_event_lookback_days` | The number of days to allow for late-arriving data to be reprocessed during daily aggregation | `5` |
| `snowplow__min_late_events_to_process` | The threshold number of skipped daily events to process during daily aggregation | `1` |
| `snowplow__atomic_schema` | Change this if you aren't using the `atomic` schema for Snowplow event data | `'atomic'` |
| `snowplow__database` | Change this if you aren't using `target.database` for Snowplow event data | |
| `snowplow__events_table` | Change this if you aren't using the `events` table for Snowplow event data | `'events'` |
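For example, a `vars` block in one of the generated `dbt_project.yml` files might be adjusted like this. The values shown are placeholders, and the surrounding project configuration is whatever the batch engine generated for your attribute group.

```yaml
vars:
  snowplow__start_date: '2024-06-01'        # process events from this date onwards
  snowplow__app_id: ['website', 'mobile']   # optional: only include these app_ids
  snowplow__backfill_limit_days: 7          # larger increments for the initial backfill
  snowplow__late_event_lookback_days: 5
  snowplow__atomic_schema: 'atomic'
  snowplow__events_table: 'events'
```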