Calculate from warehouse

You can use existing attributes that are already in your warehouse, or use the Signals batch engine to calculate new attributes in a new table.

To use historical, warehouse attributes in your real-time use cases, you will need to sync the data to the Profiles Store. Signals includes a sync engine to do this.

Warehouse support

Only Snowflake and BigQuery are supported currently.

Signals is configured slightly differently depending on whether you're using existing tables or creating new ones.

  • BatchAttributeGroup: create a new warehouse table with new attributes, calculated from your atomic events table
  • ExternalBatchAttributeGroup: sync pre-calculated values to Signals from an existing warehouse table

Sync engine

The sync engine is a cron job that sends warehouse attributes to the Profiles Store.

The engine will be enabled when you either:

  • Apply an ExternalBatchAttributeGroup for an existing table
  • Run the batch engine sync command after creating new attribute tables

Once enabled, syncs run at a fixed interval, every hour by default. Only records that have changed since the last sync are sent to the Profiles Store.
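Conceptually, each run only needs the rows whose timestamp is newer than the previous run. The sketch below is purely illustrative of that idea (the table and column names are made up) and is not the sync engine's actual implementation:

from datetime import datetime, timedelta, timezone

def build_incremental_query(table: str, timestamp_field: str, last_sync: datetime) -> str:
    """Select only the rows that changed since the previous sync run."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE {timestamp_field} > '{last_sync.isoformat()}'"
    )

# With the default 1-hour interval, each run picks up roughly the last hour of changes.
last_run = datetime.now(timezone.utc) - timedelta(hours=1)
print(build_incremental_query("derived.user_attributes", "updated_at", last_run))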

Using existing attributes

Using existing tables in your warehouse is the more straightforward approach, as it doesn't require any additional modeling. You'll need to define an ExternalBatchAttributeGroup, including a BatchSource warehouse configuration object.
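For illustration, the definition pairs a BatchSource that points at your warehouse table with the attributes that table contains. This is a minimal sketch: the class and field names are illustrative assumptions and may not match the SDK's exact interface, but the timestamp_field is what the sync engine uses to find new records.

from snowplow_signals import Attribute, BatchSource, ExternalBatchAttributeGroup

# Minimal sketch; names are illustrative assumptions, not the exact SDK interface.
source = BatchSource(
    name="user_attributes_source",
    database="ANALYTICS",             # warehouse database holding the table
    schema="DERIVED",                 # schema containing the table
    table="USER_ATTRIBUTES",          # pre-calculated attributes table
    timestamp_field="UPDATED_AT",     # column the sync engine checks for new records
)

attribute_group = ExternalBatchAttributeGroup(
    name="user_attributes",
    version=1,
    attributes=[
        Attribute(name="lifetime_page_views", type="int32"),
        Attribute(name="last_purchase_value", type="double"),
    ],
    batch_source=source,
)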

To start syncing existing tables, publish your ExternalBatchAttributeGroup to Signals.

sp_signals.publish([attribute_group])

The sync will then begin: at each interval, the sync engine looks for new records based on the timestamp_field and the last time it ran. The default interval is 1 hour.

Creating new attribute tables

To create new batch attributes, you'll need to define a BatchAttributeGroup, and publish it to Signals. However, further steps are necessary to create the required dbt models and tables in your warehouse, and register them with Signals.
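For illustration, a BatchAttributeGroup describes the attributes to calculate and the events they're derived from. The field names and aggregation values below are assumptions rather than the SDK's exact interface; follow the tutorial for the canonical definition.

from snowplow_signals import Attribute, BatchAttributeGroup, Event

# Minimal sketch; field names and values are illustrative assumptions.
attribute_group = BatchAttributeGroup(
    name="engagement_batch",
    version=1,
    attributes=[
        Attribute(
            name="page_view_count",
            type="int32",
            events=[Event(vendor="com.snowplowanalytics.snowplow", name="page_view", version="1-0-0")],
            aggregation="counter",
        ),
    ],
)

Publishing uses the same sp_signals.publish([attribute_group]) call shown earlier.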

The included batch engine CLI tool will help you with this process. Check out the batch engine tutorial for a step-by-step guide.

Installing the CLI tool

The Signals Python SDK includes an optional CLI tool called the batch engine. It will help you create the required dbt models and tables in your warehouse, and register them with Signals.

The batch engine is installed separately from the main Python SDK.

Choose where your new Signals dbt projects will live. Install the CLI tool there with:

pip install 'snowplow-signals[batch-engine]'

This adds the snowplow-batch-engine tool to your environment.

CLI commands

The available commands are:

init              # Initialize dbt project structure and base configuration
generate          # Generate dbt project assets
sync              # Register the attribute table as a data source with Signals and publish the Attribute Group so that syncing can begin
test_connection   # Test the connection to the authentication and API services

A --verbose flag is available for every command.

Here's an example of using the CLI:

snowplow-batch-engine init --verbose
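A full run typically chains the commands in the order listed above. The options each step needs, and the point at which you run the generated dbt models, depend on your project, so treat this as a sketch and follow the batch engine tutorial:

snowplow-batch-engine init        # set up the dbt project structure
snowplow-batch-engine generate    # generate the dbt project assets
# ...run the generated dbt models in your warehouse, then:
snowplow-batch-engine sync        # register the table and publish the Attribute Group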

Creating and registering tables

Check out the batch engine tutorial for a walkthrough of the required steps.

The dbt models generated by the batch engine process events incrementally. This avoids unnecessary reprocessing and, together with the pre-aggregation logic, minimizes computational cost.

Model variables

The model created for each attribute group has configurable variables. The most important one to update before running the models is snowplow__start_date: the right value depends on how far back your data goes and how much of it you want to process.

You will need to update the variables for each attribute group individually, by editing the dbt_project.yml files. The table below lists the configurable variables for each model:

| Variable | Description | Default value |
|----------|-------------|---------------|
| snowplow__start_date | Date from where the model starts looking for events, based on both load_tstamp and derived_tstamp | '2025-01-01' |
| snowplow__app_id | Filter the data on specific app_ids | [] |
| snowplow__backfill_limit_days | Limit backfill increments for the filtered_events_table | 1 |
| snowplow__late_event_lookback_days | The number of days to allow for late arriving data to be reprocessed during daily aggregation | 5 |
| snowplow__min_late_events_to_process | The threshold number of skipped daily events to process during daily aggregation | 1 |
| snowplow__atomic_schema | Change this if you aren't using the atomic schema for Snowplow event data | 'atomic' |
| snowplow__database | Change this if you aren't using target.database for Snowplow event data | |
| snowplow__events_table | Change this if you aren't using the events table for Snowplow event data | 'events' |
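For example, to start processing from a later date and restrict the model to a single app_id, you might set the vars in the generated dbt_project.yml like this (the exact nesting and values depend on your generated project, so treat this as a sketch):

vars:
  snowplow__start_date: '2025-06-01'
  snowplow__app_id: ['website']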