The general principle behind an incremental model is to identify new events/rows since the previous run of the model, and only process these new events. This minimizes cost and reduces run times. This works well for basic event streams, but you begin to encounter issues with any aggregations you wish to perform. To resolve this we treat a session as the atomic unit of the majority of our packages: when we have a new event for a previously processed session, we have to reprocess all historic events for that session as well as the new events - otherwise our metrics would be incorrect. Note that because we allow custom session identifiers, this does not have to be a true web-type session.
The logic we have implemented to achieve this is:

1. Identify new events since the previous run of the package
2. Identify all `session_identifier`s associated with the new events
3. Look back over the events table to find all events associated with these session identifiers
4. Use only this set of events in all downstream processing
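The four steps above can be sketched in simplified Python. The field names (`session_identifier`, `collector_tstamp`) and the in-memory event list are illustrative stand-ins for the warehouse tables, not the package's actual implementation:

```python
def events_to_process(events, last_processed_tstamp):
    """Return all events for any session that has at least one new event.

    `events` is a list of dicts with 'session_identifier' and
    'collector_tstamp' keys (illustrative names only).
    """
    # Step 1: events that arrived since the previous run
    new_events = [e for e in events if e["collector_tstamp"] > last_processed_tstamp]
    # Step 2: the sessions those new events belong to
    touched_sessions = {e["session_identifier"] for e in new_events}
    # Step 3: look back over the full table for *all* events in those
    # sessions, so session-level aggregations use complete data
    return [e for e in events if e["session_identifier"] in touched_sessions]
```

Step 4 is then simply that downstream models select only from this returned set.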
Given the large size of event tables, step 3 can be an expensive operation. We solve this in three ways:
| Problem | Solution |
| --- | --- |
| We have to scan the whole events table to find events for a subset of sessions | Record when any given session started. We use a lifecycle manifest table to record the start and end timestamp of all sessions. |
| Sessions generated by bots can persist for years, which would mean scanning years of data on every run of the package | Limit the maximum allowed session length. We use a quarantine table to store the `session_identifier` of any sessions that have exceeded the max allowed session length (`snowplow__max_session_days`). For such sessions, all events are processed up until the max allowed length; moving forward, no more data is processed for that session. |
| We don't know how up-to-date each model is, so we have to scan each derived table | Record the max processed timestamp for each model. We use an incremental manifest table to record the max processed timestamp of all derived tables. |
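As a rough illustration of the quarantine rule, the check below mirrors the behaviour described for `snowplow__max_session_days`. The data structures and field names here are assumptions for the sketch, not the package's internals:

```python
from datetime import datetime, timedelta

MAX_SESSION_DAYS = 3  # stands in for snowplow__max_session_days

def filter_quarantined(events, session_start, quarantine):
    """Drop events beyond the max allowed session length.

    `session_start` maps session_identifier -> session start timestamp
    (the lifecycle manifest); `quarantine` is a set of quarantined
    session identifiers (the quarantine table).
    """
    kept = []
    for e in events:
        sid = e["session_identifier"]
        cutoff = session_start[sid] + timedelta(days=MAX_SESSION_DAYS)
        if e["collector_tstamp"] <= cutoff:
            kept.append(e)       # within the allowed session length
        else:
            quarantine.add(sid)  # no further data processed for this session
    return kept
```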
Your Snowplow dbt package can be considered to be in one of 4 states at any given time:
- First run of the package
- New model introduced
- Models out of sync
- Standard run
This state determines the range of events to process in the next run of the package (note that events earlier than this range may be processed as part of a complete session, as detailed above). We use the `max_last_success` for models enabled in the current run, as well as the number of models in the manifest, as the base for our calculation.
The identification of which state the package is in is computed by the `get_run_limits` macro, which is called in the `snowplow_<package>_base_new_event_limits` model. This macro uses the metadata recorded in `snowplow_<package>_incremental_manifest` to determine the `upper_limit` for the run of the package. This limit is based on the calculations shown below and on the field defined in your configuration.
To get the min and max last success, we run the following query against the manifest:

```sql
select
    min(last_success) as min_last_success,
    max(last_success) as max_last_success,
    coalesce(count(*), 0) as models
from snowplow_<package>_incremental_manifest
where model in (array_of_snowplow_tagged_enabled_models)
```
Based on these values, the package is in one of the four states.
In all states the `upper_limit` is capped by the `snowplow__backfill_limit_days` variable. This protects against back-fills with many rows causing very long run times.
State 1: First run of the package
If there are no enabled models already in the manifest, then we process from the start date up to the backfill limit or the current timestamp, whichever is earlier:

Upper limit: `least(current_tstamp, snowplow__start_date + snowplow__backfill_limit_days)`
State 2: New model introduced
If there are enabled models that aren't in the manifest table, then a new model tagged with `snowplow_<package>_incremental` has been added since the last run; this can happen with a new custom model, or when you have enabled some previously disabled custom modules. In this case the package will replay all previously processed events in order to back-fill the new model.
Upper limit: `least(max_last_success, snowplow__start_date + snowplow__backfill_limit_days)`
State 3: Models out of sync
If the `min_last_success` is less than the `max_last_success`, the tagged models are out of sync, for example due to a particular model failing to execute successfully during the previous run, or as part of catching up on a new model. The package will attempt to sync all models as far as your backfill limit will allow.
Lower limit: `min_last_success - snowplow__lookback_window_hours`
Upper limit: `least(max_last_success, min_last_success + snowplow__backfill_limit_days)`
State 4: Standard run
If none of the above criteria are met, then we consider it a 'standard run' where all models are in sync and we carry on from the last processed event.
Lower limit: `max_last_success - snowplow__lookback_window_hours`
Upper limit: `least(current_tstamp, max_last_success + snowplow__backfill_limit_days)`
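Putting the four states together, the limit calculation can be sketched as follows. This is a simplified re-implementation of the logic described above, not the `get_run_limits` macro's actual code; the function signature is invented for the sketch:

```python
from datetime import datetime, timedelta

def get_run_limits(min_last_success, max_last_success, models_in_manifest,
                   enabled_models, start_date, now,
                   backfill_limit_days, lookback_window_hours):
    """Return the (lower, upper) event range for the next run."""
    backfill = timedelta(days=backfill_limit_days)
    lookback = timedelta(hours=lookback_window_hours)
    if models_in_manifest == 0:
        # State 1: first run of the package
        lower = start_date
        upper = min(now, start_date + backfill)
    elif models_in_manifest < enabled_models:
        # State 2: new model introduced - replay from the start date
        lower = start_date
        upper = min(max_last_success, start_date + backfill)
    elif min_last_success < max_last_success:
        # State 3: models out of sync - catch the lagging model up
        lower = min_last_success - lookback
        upper = min(max_last_success, min_last_success + backfill)
    else:
        # State 4: standard run - carry on from the last processed event
        lower = max_last_success - lookback
        upper = min(now, max_last_success + backfill)
    return lower, upper
```

In every branch the upper limit is the `least(...)` of a target timestamp and the backfill cap, matching the formulas above.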
Remember, these upper and lower limits are only used to identify which events are considered when collecting the session identifiers for the run; additional events outside this date range may be included to ensure that full sessions are processed. See late arriving data for more information.
How to identify the current state
If you want to check the current state of a model, run the `snowplow_<package>_base_new_event_limits` model. This will log the current state to the CLI while causing no disruption to the incremental processing of events. You will also see this in the logs of any incremental run.
```bash
dbt run --select snowplow_<package>_base_new_event_limits
```

```text
00:26:29 + Snowplow: Standard incremental run
00:26:29 + Snowplow: Processing data between 2021-01-05 17:59:32 and 2021-01-07 23:59:32
```
- State 1: `Snowplow: No data in manifest. Processing data from start_date`
- State 2: `Snowplow: New Snowplow incremental model. Backfilling`
- State 3: `Snowplow: Snowplow incremental models out of sync. Syncing`
- State 4: `Snowplow: Standard incremental run`
In all states, although it is much more likely in state 1, it is possible that a run finds no incremental data to process. In this case you will see a warning in the logs of the form `Snowplow Warning: No data in <table> for date range from variables, please modify your run variables to include data if this is not expected.` If this happens, you should increase your start date if no runs have completed, or increase your backfill limit days if there is a gap in your data before the next records.