BigQuery Loader

Under the umbrella of Snowplow BigQuery Loader, we have a family of applications that can be used to load enriched Snowplow data into BigQuery.

At a high level, BigQuery Loader reads enriched Snowplow events in real time and loads them into BigQuery using the Storage Write API.

BigQuery Loader consists of three applications: Loader, Mutator and Repeater. The sections below describe how they interact with one another and with BigQuery.

Schemas in BigQuery

For more information on how events are stored in BigQuery, check the mapping between Snowplow schemas and the corresponding BigQuery column types.
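
As a hedged illustration of that mapping (the column name follows the loader's convention of deriving it from the Iglu schema URI; the nested field shown is hypothetical), an entity attached to events with the schema iglu:com.acme/app_context/jsonschema/1-0-0 would typically be loaded into a repeated RECORD column along these lines:

{
  "name": "contexts_com_acme_app_context_1_0_0",
  "type": "RECORD",
  "mode": "REPEATED",
  "fields": [
    {"name": "page_type", "type": "STRING", "mode": "NULLABLE"}
  ]
}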

Technical Architecture

The available tools are:

  1. Snowplow BigQuery StreamLoader, a standalone Scala app that can be deployed on Google Kubernetes Engine.
  2. Snowplow BigQuery Loader, an alternative to StreamLoader, in the form of a Google Cloud Dataflow job.
  3. Snowplow BigQuery Mutator, a Scala app that performs table updates to add new columns as required.
  4. Snowplow BigQuery Repeater, a Scala app that reads failed inserts (caused by table update lag) and retries inserting them into BigQuery after a delay, sinking failures into a dead-letter bucket.

Snowplow BigQuery StreamLoader

  • Reads Snowplow enriched events from a dedicated Pub/Sub subscription.
  • Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSON.
  • Uses Iglu Client to fetch JSON schemas for self-describing events and entities.
  • Uses Iglu Schema DDL to transform self-describing events and entities into BigQuery format.
  • Writes transformed data into BigQuery.
  • Writes all encountered Iglu types into a dedicated Pub/Sub topic (the types topic).
  • Writes all data that failed to be validated against its schema into a dedicated badRows Pub/Sub topic.
  • Writes all data that was successfully transformed but could not be loaded into BigQuery to a dedicated failedInserts topic.

Snowplow BigQuery Loader

An Apache Beam job intended to run on Google Cloud Dataflow. It is an alternative to the StreamLoader application and implements the same algorithm.

Snowplow BigQuery Mutator

The Mutator app is in charge of performing automatic table updates, which means you do not have to pause loading and manually update the table every time you add a new custom self-describing event or entity.

  • Reads messages from a dedicated subscription to the types topic.
  • Checks whether a message contains a type that has not been encountered yet (by consulting an internal cache).
  • If a message contains a new type, double-checks it against the connected BigQuery table.
  • If the type is not in the table, fetches its JSON schema from an Iglu registry.
  • Transforms the JSON schema into a BigQuery column definition.
  • Adds the column to the connected BigQuery table.

Snowplow BigQuery Repeater

The Repeater app is in charge of handling failed inserts. It reads ready-to-load events from a dedicated subscription on the failedInserts topic and retries inserting them into BigQuery, to overcome 'table update lag'.

Table update lag

The loader app inserts data into BigQuery in near real time. At the same time, it sinks messages containing information about an event's fields into the types topic. It can take up to 10-15 seconds for Mutator to fetch and parse such a message and to execute an ALTER TABLE statement against the table. Additionally, the new column takes some time to propagate and become visible to all workers trying to write to it.

If a new type arrives from the input subscription during this window, BigQuery might reject the row containing it, and that row will be sent to the failedInserts topic. This topic contains JSON objects ready to be loaded into BigQuery (i.e. not in the canonical Snowplow enriched event format).

To load this data from failedInserts into BigQuery you can use Repeater, which reads a subscription on failedInserts and performs INSERT statements.

Repeater has several important behavior aspects:

  • If a pulled record is not a valid Snowplow event, it will result in a loader_recovery_error bad row.
  • If a pulled record is a valid event, Repeater will wait some time (15 minutes by default) after the etl_tstamp before attempting to re-insert it, in order to let Mutator do its job.
  • If the database responds with an error, the row will get transformed into a loader_recovery_error bad row.
  • All entities in the dead-letter bucket are valid Snowplow bad rows.

Topics, subscriptions and message formats

The Snowplow BigQuery Loader apps use Pub/Sub topics and subscriptions to store intermediate data and communicate with each other.

Kind | Populated by | Consumed by | Data format
Input subscription | Enriched events topic | Loader / StreamLoader | canonical TSV + JSON enriched format
Types topic | Loader / StreamLoader | Types subscription | iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0
Types subscription | Types topic | Mutator | iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0
Bad row topic | Loader / StreamLoader | GCS Loader | iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0, iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0, iglu:com.snowplowanalytics.snowplow.badrows/loader_runtime_error/jsonschema/1-0-1
Failed insert topic | Loader / StreamLoader | Failed insert subscription | BigQuery JSON
Failed insert subscription | Failed insert topic | Repeater | BigQuery JSON

Setup guide

Configuration file

Loader / StreamLoader, Mutator and Repeater accept the same configuration file in HOCON format. An example of a minimal configuration file can look like this:

{
  "projectId": "com-acme"

  "loader": {
    "input": {
      "subscription": "enriched-sub"
    }

    "output": {
      "good": {
        "datasetId": "snowplow"
        "tableId": "events"
      }

      "bad": {
        "topic": "bad-topic"
      }

      "types": {
        "topic": "types-topic"
      }

      "failedInserts": {
        "topic": "failed-inserts-topic"
      }
    }
  }

  "mutator": {
    "input": {
      "subscription": "types-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred
    }
  }

  "repeater": {
    "input": {
      "subscription": "failed-inserts-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred

      "deadLetters": {
        "bucket": "gs://dead-letter-bucket"
      }
    }
  }

  "monitoring": {} # disabled
}

The loader takes the command line arguments --config, with a path to the HOCON configuration file, and --resolver, with a path to the Iglu resolver file. If you are running the Docker image, you should mount the configuration files into the container:

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.7.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json

Or you can pass the whole config as a base64-encoded string using the --config option, like so:

docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-streamloader:1.7.0 \
--config=ewogICJwcm9qZWN0SWQiOiAiY29tLWFjbWUiCgogICJsb2FkZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZW5yaWNoZWQtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogewogICAgICAgICJkYXRhc2V0SWQiOiAic25vd3Bsb3ciCiAgICAgICAgInRhYmxlSWQiOiAiZXZlbnRzIgogICAgICB9CgogICAgICAiYmFkIjogewogICAgICAgICJ0b3BpYyI6ICJiYWQtdG9waWMiCiAgICAgIH0KCiAgICAgICJ0eXBlcyI6IHsKICAgICAgICAidG9waWMiOiAidHlwZXMtdG9waWMiCiAgICAgIH0KCiAgICAgICJmYWlsZWRJbnNlcnRzIjogewogICAgICAgICJ0b3BpYyI6ICJmYWlsZWQtaW5zZXJ0cy10b3BpYyIKICAgICAgfQogICAgfQogIH0KCiAgIm11dGF0b3IiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAidHlwZXMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCiAgICB9CiAgfQoKICAicmVwZWF0ZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZmFpbGVkLWluc2VydHMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCgogICAgICAiZGVhZExldHRlcnMiOiB7CiAgICAgICAgImJ1Y2tldCI6ICJnczovL2RlYWQtbGV0dGVyLWJ1Y2tldCIKICAgICAgfQogICAgfQogIH0KCiAgIm1vbml0b3JpbmciOiB7fSAjIGRpc2FibGVkCn0= \
--resolver=/resolver.json

The --config option is actually optional. For some setups it is more convenient to provide configuration parameters using JVM system properties or environment variables, as documented in the Lightbend Config readme.

For example, to override the repeater.input.subscription setting using system properties:

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.7.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Drepeater.input.subscription="failed-inserts-sub"

Or to use environment variables for every setting:

docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-repeater:1.7.0 \
--resolver=/resolver.json \
-Dconfig.override_with_env_vars=true
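
With config.override_with_env_vars=true, Lightbend Config takes overrides from environment variables prefixed with CONFIG_FORCE_, where dots in the configuration path become underscores (see the Lightbend Config readme for the exact mapping rules). As a sketch, assuming that convention, overriding repeater.input.subscription with an environment variable could look like this:

docker run \
-v /path/to/resolver.json:/resolver.json \
-e CONFIG_FORCE_repeater_input_subscription="failed-inserts-sub" \
snowplow/snowplow-bigquery-repeater:1.7.0 \
--resolver=/resolver.json \
-Dconfig.override_with_env_vars=true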

See the configuration reference for more details and advanced settings.

Command line options

All apps accept a config HOCON as specified above, and an Iglu resolver config passed via the --resolver option. The latter must conform to the iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3 schema.
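
A minimal resolver file could look like the following sketch, assuming only Iglu Central is needed (registry names, priorities and cache settings will vary per setup):

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3",
  "data": {
    "cacheSize": 500,
    "cacheTtl": 600,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": ["com.snowplowanalytics"],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}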

StreamLoader

StreamLoader accepts --config and --resolver arguments, as well as any JVM system properties that can be used to override the configuration.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.7.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Dconfig.override_with_env_vars=true

The --config flag is optional, but if missing, all configuration options must be specified in some other way (system properties or environment variables).

The Dataflow Loader

The Dataflow Loader accepts the same two arguments as StreamLoader, plus any other arguments supported by Google Cloud Dataflow.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-loader:1.7.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--labels='{"key1":"val1","key2":"val2"}' # optional Dataflow argument

The optional labels argument is an example of an argument natively supported by Dataflow. It accepts a JSON object with key-value pairs that will be applied as labels to the Cloud Dataflow job.

This can be launched from any machine authenticated to submit Dataflow jobs.

Mutator

Mutator has three subcommands: listen, create and add-column.

listen

listen is the primary command and is used to automate table migrations.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.7.0 \
listen \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--verbose # optional, for debugging only

add-column

add-column can be used as a one-off to add a column to the table specified via the loader.output.good setting. Doing so ahead of time eliminates the risk of table update lag and the need to run Repeater, but requires manual intervention.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.7.0 \
add-column \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--shred-property=CONTEXTS \
--schema="iglu:com.acme/app_context/jsonschema/1-0-0"

The specified schema must be present in one of the Iglu registries in the resolver configuration.
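
For reference, the registry entry for the schema used above would be a standard self-describing JSON Schema along these lines (a sketch; the properties shown are hypothetical):

{
  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "self": {
    "vendor": "com.acme",
    "name": "app_context",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    "page_type": {"type": "string"}
  },
  "additionalProperties": false
}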

create

create creates an empty table with atomic structure. It can optionally be partitioned by a TIMESTAMP field.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.7.0 \
create \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--partitionColumn=load_tstamp \
--requirePartitionFilter

The optional --partitionColumn argument names a TIMESTAMP column by which to partition the table, and the optional --requirePartitionFilter flag requires a filter on the partition column in all queries.

See the Google documentation for more information about partitioned tables.

Repeater

We recommend running Repeater constantly, on a small / cheap node or in a Docker container.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-repeater:1.7.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--bufferSize=20 \
--timeout=20 \
--backoffPeriod=900 \
--verbose

bufferSize, timeout and backoffPeriod are optional parameters: bufferSize is the size of the batch sent to the dead-letter bucket, timeout is the duration after which bad rows are sunk into the dead-letter bucket, and backoffPeriod is the number of seconds to wait (calculated against etl_tstamp) before attempting an insert. The --verbose flag is optional and intended for debugging only.

Docker support

All applications are available as Docker images on Docker Hub, based on Ubuntu Focal and OpenJDK 11:

$ docker pull snowplow/snowplow-bigquery-streamloader:1.7.0
$ docker pull snowplow/snowplow-bigquery-loader:1.7.0
$ docker pull snowplow/snowplow-bigquery-mutator:1.7.0
$ docker pull snowplow/snowplow-bigquery-repeater:1.7.0

We also provide an alternative, lightweight set of images based on Google's "distroless" base image, which may provide security advantages because they carry fewer dependencies. These images are distinguished by the 1.7.0-distroless tag:

$ docker pull snowplow/snowplow-bigquery-streamloader:1.7.0-distroless
$ docker pull snowplow/snowplow-bigquery-loader:1.7.0-distroless
$ docker pull snowplow/snowplow-bigquery-mutator:1.7.0-distroless
$ docker pull snowplow/snowplow-bigquery-repeater:1.7.0-distroless

Mutator, Repeater and StreamLoader are also available as fat JAR files attached to releases in the project's GitHub repository.
