BigQuery Loader

Under the umbrella of Snowplow BigQuery Loader, we have a family of applications that can be used to load enriched Snowplow data into BigQuery.

There are currently four applications, which are described in detail below. A typical deployment would consist of three of them:

  • a loader app, which consumes the enriched stream and loads the data into BigQuery. It comes in two flavours (Loader and StreamLoader), which are alternatives to each other and should not both be used at the same time;
  • a Mutator app, which keeps track of the fields present in the enriched data and updates the BigQuery table accordingly;
  • a Repeater app, which handles so-called failed inserts.

Technical Architecture

The available tools are:

  1. Snowplow BigQuery StreamLoader, a standalone Scala app that can be deployed on Google Kubernetes Engine.
  2. Snowplow BigQuery Loader, an alternative to StreamLoader, in the form of a Google Cloud Dataflow job.
  3. Snowplow BigQuery Mutator, a Scala app that performs table updates to add new columns as required.
  4. Snowplow BigQuery Repeater, a Scala app that reads failed inserts (caused by table update lag) and re-tries inserting them into BigQuery after some delay, sinking failures into a dead-letter bucket.

Snowplow BigQuery StreamLoader

  • Reads Snowplow enriched events from a dedicated Pub/Sub subscription.
  • Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSON.
  • Uses Iglu Client to fetch JSON schemas for self-describing events and entities.
  • Uses Iglu Schema DDL to transform self-describing events and entities into BigQuery format.
  • Writes transformed data into BigQuery.
  • Writes all encountered Iglu types into a dedicated Pub/Sub topic (the types topic).
  • Writes all data that failed to be validated against its schema into a dedicated badRows Pub/Sub topic.
  • Writes all data that was successfully transformed but could not be loaded into BigQuery to a dedicated failedInserts topic.
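The routing in the list above can be sketched as follows. This is a minimal, hypothetical model, not StreamLoader's code: the `Sinks` class, the `process` function, the event shape (a dict with an `entities` list of Iglu URIs) and the `table_accepts` callback are all illustrative stand-ins for Pub/Sub, Iglu and BigQuery.

```python
from dataclasses import dataclass, field


@dataclass
class Sinks:
    """Hypothetical in-memory stand-ins for BigQuery and the Pub/Sub topics."""
    good: list = field(default_factory=list)            # rows loaded into BigQuery
    bad: list = field(default_factory=list)             # badRows topic
    types: set = field(default_factory=set)             # types topic
    failed_inserts: list = field(default_factory=list)  # failedInserts topic


def process(event, known_schemas, table_accepts, sinks):
    """Route one enriched event following the steps listed above.

    event: dict with an "entities" list of Iglu schema URIs (assumed shape)
    known_schemas: schema URIs the Iglu client can resolve (stand-in for Iglu)
    table_accepts: callable(row) -> bool standing in for a BigQuery insert
    """
    missing = [uri for uri in event["entities"] if uri not in known_schemas]
    if missing:
        # Schema lookup / validation failed: emit a bad row instead of loading.
        sinks.bad.append({"event": event, "error": f"unresolvable schemas: {missing}"})
        return
    row = dict(event)                      # stand-in for the BigQuery-format row
    sinks.types.update(event["entities"])  # report every encountered Iglu type
    if table_accepts(row):
        sinks.good.append(row)
    else:
        # Transformed fine but rejected by BigQuery, e.g. the column is missing.
        sinks.failed_inserts.append(row)
```

The point of the sketch is the three-way split: schema problems become bad rows, while rows that are valid but rejected by BigQuery stay in loadable form for Repeater.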

Snowplow BigQuery Loader

An Apache Beam job intended to run on Google Cloud Dataflow. It is an alternative to the StreamLoader application and follows the same algorithm.

Snowplow BigQuery Mutator

The Mutator app is in charge of performing automatic table updates, which means you do not have to pause loading and manually update the table every time you add a new custom self-describing event or entity.

  • Reads messages from a dedicated subscription to the types topic.
  • Finds out whether a message contains a type that has not been encountered yet (by checking an internal cache).
  • If a message contains a new type, double-checks it against the connected BigQuery table.
  • If the type is not in the table, fetches its JSON schema from an Iglu registry.
  • Transforms the JSON schema into a BigQuery column definition.
  • Adds the column to the connected BigQuery table.
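The Mutator steps above can be sketched like this. Everything here is hypothetical: the `MutatorSketch` class, the in-memory table and registry, the `column_name` rule (e.g. `contexts_com_acme_app_context_1_0_0`) and the simplified JSON-type-to-BigQuery-type mapping are illustrations, not the app's real naming or DDL logic.

```python
BQ_TYPES = {"string": "STRING", "integer": "INTEGER",
            "number": "FLOAT", "boolean": "BOOLEAN"}


def column_name(schema_uri):
    # Hypothetical naming rule, e.g.
    # "iglu:com.acme/app_context/jsonschema/1-0-0" -> "contexts_com_acme_app_context_1_0_0"
    vendor, name, _fmt, version = schema_uri[len("iglu:"):].split("/")
    return "_".join(["contexts", vendor.replace(".", "_"), name, version.replace("-", "_")])


def bq_type(json_type):
    if isinstance(json_type, list):  # e.g. ["string", "null"]
        json_type = next((t for t in json_type if t != "null"), None)
    return BQ_TYPES.get(json_type, "STRING")  # simplified fallback


def to_column(schema):
    # Simplified: flat properties only, mapped to a RECORD column.
    props = schema.get("properties", {})
    return {"type": "RECORD", "fields": {k: bq_type(v.get("type")) for k, v in props.items()}}


class MutatorSketch:
    """Hypothetical model of the Mutator steps; not the real implementation."""

    def __init__(self, table_columns, registry):
        self.cache = set()                  # types this process has already seen
        self.table_columns = table_columns  # stand-in for the live table schema
        self.registry = registry            # stand-in for Iglu: uri -> JSON schema
        self.added = []

    def handle(self, schema_uri):
        if schema_uri in self.cache:            # seen before: nothing to do
            return False
        column = column_name(schema_uri)
        added = False
        if column not in self.table_columns:    # double-check the live table
            schema = self.registry[schema_uri]  # fetch the JSON schema from Iglu
            self.table_columns[column] = to_column(schema)  # add the column
            self.added.append(column)
            added = True
        self.cache.add(schema_uri)  # cache only once the table has the column
        return added
```

Checking the cache first keeps the common case cheap, while the table check covers columns added by someone else (for example with the add-column subcommand).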

Snowplow BigQuery Repeater

The Repeater app is in charge of handling failed inserts. It reads ready-to-load events from a dedicated subscription on the failedInserts topic and re-tries inserting them into BigQuery to overcome 'table update lag'.

Table update lag

The loader app inserts data into BigQuery in near real-time. At the same time, it sinks messages containing information about the fields of an event into the types topic. It can take up to 10-15 seconds for Mutator to fetch and parse such a message and execute an ALTER TABLE statement against the table. Additionally, the new column takes some time to propagate and become visible to all workers trying to write to it.

If a row containing a new type arrives from the input subscription during this period, BigQuery might reject it, in which case the row is sent to the failedInserts topic. This topic contains JSON objects ready to be loaded into BigQuery (i.e. not the canonical Snowplow enriched event format).

To load this data from failedInserts into BigQuery again, you can use Repeater, which reads a subscription on failedInserts and performs INSERT statements.

Repeater has several important behaviours:

  • If a pulled record is not a valid Snowplow event, it results in a loader_recovery_error bad row.
  • If a pulled record is a valid event, Repeater waits some time (15 minutes by default) after the etl_tstamp before attempting to re-insert it, in order to let Mutator do its job.
  • If the database responds with an error, the row is transformed into a loader_recovery_error bad row.
  • All entities in the dead-letter bucket are valid Snowplow bad rows.
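The per-record decision described above can be sketched as a small function. This is a simplified, hypothetical model: `repeater_step`, the `insert` callback (assumed to raise on a database error) and the rule that a record without `etl_tstamp` counts as invalid are illustrative assumptions, not Repeater's actual validation.

```python
from datetime import datetime, timedelta, timezone

BACKOFF = timedelta(minutes=15)  # the default wait after etl_tstamp described above


def repeater_step(record, now, insert, backoff=BACKOFF):
    """Return "bad_row", "wait" or "inserted" for one pulled failed insert.

    record: dict with an ISO-8601 "etl_tstamp" (assumed shape)
    insert: stand-in for a BigQuery insert; assumed to raise on a database error
    """
    etl = record.get("etl_tstamp")
    if etl is None:
        return "bad_row"  # simplified validity check: not a Snowplow event
    ready_at = datetime.fromisoformat(etl) + backoff
    if now < ready_at:
        return "wait"     # too early: give Mutator time to add the column
    try:
        insert(record)
    except Exception:
        return "bad_row"  # database rejected it again: loader_recovery_error
    return "inserted"
```

Measuring the wait from `etl_tstamp` rather than from the time the record was pulled means old records are retried immediately, while fresh ones get the full backoff.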

Topics, subscriptions and message formats

The Snowplow BigQuery Loader apps use Pub/Sub topics and subscriptions to store intermediate data and communicate with each other.

| Kind | Populated by | Consumed by | Data format |
|---|---|---|---|
| Input subscription | Enriched events topic | Loader / StreamLoader | canonical TSV + JSON enriched format |
| Types topic | Loader / StreamLoader | Types subscription | iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 |
| Types subscription | Types topic | Mutator | iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0 |
| Bad row topic | Loader / StreamLoader | GCS Loader | iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0, iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0, iglu:com.snowplowanalytics.snowplow.badrows/loader_runtime_error/jsonschema/1-0-1 |
| Failed insert topic | Loader / StreamLoader | Failed insert subscription | BigQuery JSON |
| Failed insert subscription | Failed insert topic | Repeater | BigQuery JSON |
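The topology in this table can be written down as data and turned into `gcloud pubsub` commands for creating the resources. This is a sketch under assumptions: the topic and subscription names come from the minimal configuration shown in the setup guide, except `enriched-topic` and `bad-sub`, which are hypothetical because the configuration does not name them (the enriched topic is normally created by your enrichment deployment).

```python
# Topic -> subscriptions. "enriched-topic" and "bad-sub" are hypothetical names.
TOPOLOGY = {
    "enriched-topic": ["enriched-sub"],              # read by Loader / StreamLoader
    "types-topic": ["types-sub"],                    # read by Mutator
    "failed-inserts-topic": ["failed-inserts-sub"],  # read by Repeater
    "bad-topic": ["bad-sub"],                        # read by e.g. GCS Loader
}


def gcloud_commands(topology):
    """Return the gcloud commands that create each topic and its subscriptions."""
    cmds = []
    for topic, subs in topology.items():
        cmds.append(f"gcloud pubsub topics create {topic}")
        for sub in subs:
            cmds.append(f"gcloud pubsub subscriptions create {sub} --topic={topic}")
    return cmds


if __name__ == "__main__":
    for cmd in gcloud_commands(TOPOLOGY):
        print(cmd)
```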

Setup guide

Configuration file

Loader / StreamLoader, Mutator and Repeater accept the same configuration file in HOCON format. A minimal configuration file can look like this:

{
  "projectId": "com-acme"

  "loader": {
    "input": {
      "subscription": "enriched-sub"
    }

    "output": {
      "good": {
        "datasetId": "snowplow"
        "tableId": "events"
      }

      "bad": {
        "topic": "bad-topic"
      }

      "types": {
        "topic": "types-topic"
      }

      "failedInserts": {
        "topic": "failed-inserts-topic"
      }
    }
  }

  "mutator": {
    "input": {
      "subscription": "types-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred
    }
  }

  "repeater": {
    "input": {
      "subscription": "failed-inserts-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred

      "deadLetters": {
        "bucket": "gs://dead-letter-bucket"
      }
    }
  }

  "monitoring": {} # disabled
}

The loader takes two command line arguments: --config, with the path to the HOCON configuration file, and --resolver, with the path to the Iglu resolver file. If you are running the Docker image, mount the configuration files into the container:

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json

Or you can pass the whole config as a base64-encoded string using the --config option, like so:

docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-streamloader:1.6.0 \
--config=ewogICJwcm9qZWN0SWQiOiAiY29tLWFjbWUiCgogICJsb2FkZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZW5yaWNoZWQtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogewogICAgICAgICJkYXRhc2V0SWQiOiAic25vd3Bsb3ciCiAgICAgICAgInRhYmxlSWQiOiAiZXZlbnRzIgogICAgICB9CgogICAgICAiYmFkIjogewogICAgICAgICJ0b3BpYyI6ICJiYWQtdG9waWMiCiAgICAgIH0KCiAgICAgICJ0eXBlcyI6IHsKICAgICAgICAidG9waWMiOiAidHlwZXMtdG9waWMiCiAgICAgIH0KCiAgICAgICJmYWlsZWRJbnNlcnRzIjogewogICAgICAgICJ0b3BpYyI6ICJmYWlsZWQtaW5zZXJ0cy10b3BpYyIKICAgICAgfQogICAgfQogIH0KCiAgIm11dGF0b3IiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAidHlwZXMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCiAgICB9CiAgfQoKICAicmVwZWF0ZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZmFpbGVkLWluc2VydHMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCgogICAgICAiZGVhZExldHRlcnMiOiB7CiAgICAgICAgImJ1Y2tldCI6ICJnczovL2RlYWQtbGV0dGVyLWJ1Y2tldCIKICAgICAgfQogICAgfQogIH0KCiAgIm1vbml0b3JpbmciOiB7fSAjIGRpc2FibGVkCn0= \
--resolver=/resolver.json
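One way to produce the base64 string for --config is sketched below; `encode_config` is a hypothetical helper name. It assumes the loader expects standard (not URL-safe) base64 on a single line, which matches the padded example above. On Linux, GNU coreutils `base64 -w 0 bigquery.hocon` should give the same single-line output.

```python
import base64
from pathlib import Path


def encode_config(path):
    """Return the HOCON file at `path` as one line of standard base64."""
    return base64.b64encode(Path(path).read_bytes()).decode("ascii")


if __name__ == "__main__":
    # Prints the value to pass as --config=<value>
    print(encode_config("bigquery.hocon"))
```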

The --config option is in fact optional. For some setups it is more convenient to provide configuration parameters using JVM system properties or environment variables, as documented in the Lightbend config readme.

For example, to override the repeater.input.subscription setting using system properties:

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Drepeater.input.subscription="failed-inserts-sub"

Or to use environment variables for every setting:

docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-repeater:1.6.0 \
--resolver=/resolver.json \
-Dconfig.override_with_env_vars=true
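With -Dconfig.override_with_env_vars=true, the Lightbend config library reads only variables prefixed with CONFIG_FORCE_ and decodes their names: a single underscore becomes a dot, a double underscore a dash, and a triple underscore a literal underscore. The helper below (`env_var_for` is a hypothetical name) applies those rules in reverse to find the variable for a given setting.

```python
def env_var_for(path):
    """Map a HOCON path such as "repeater.input.subscription" to its
    CONFIG_FORCE_ environment variable name (Lightbend config rules)."""
    mangled = (
        path.replace("_", "___")  # literal underscore -> triple underscore
            .replace("-", "__")   # dash -> double underscore
            .replace(".", "_")    # path separator -> single underscore
    )
    return "CONFIG_FORCE_" + mangled
```

For example, `env_var_for("repeater.input.subscription")` gives `CONFIG_FORCE_repeater_input_subscription`, which you could pass to the container with `docker run -e CONFIG_FORCE_repeater_input_subscription=failed-inserts-sub ...`. The replacement order matters: underscores are expanded first so that the underscores introduced by the later steps are not expanded again.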

See the configuration reference for more details and advanced settings.

Command line options

All apps accept a HOCON configuration file as specified above, and an Iglu resolver config passed via the --resolver option. The latter must conform to the iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3 schema.

StreamLoader

StreamLoader accepts --config and --resolver arguments, as well as any JVM system properties that can be used to override the configuration.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Dconfig.override_with_env_vars=true

The --config flag is optional, but if missing, all configuration options must be specified in some other way (system properties or environment variables).

The Dataflow Loader

The Dataflow Loader accepts the same two arguments as StreamLoader, as well as any other arguments supported by Google Cloud Dataflow.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-loader:1.6.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--labels='{"key1":"val1","key2":"val2"}' # optional Dataflow args

The optional labels argument is an example of a natively supported Dataflow argument. It accepts a JSON object of key-value pairs that are applied as labels to the Cloud Dataflow job. The single quotes stop the shell from rewriting the JSON.

This can be launched from any machine authenticated to submit Dataflow jobs.
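If the labels come from a script, it helps to build the argument programmatically and quote it for the shell: in bash, an unquoted `{...,...}` is subject to brace expansion, which would split the JSON into several malformed arguments. A minimal sketch, with `labels_arg` as a hypothetical helper name:

```python
import json
import shlex


def labels_arg(labels):
    """Build a shell-safe --labels=... argument from a dict of label pairs."""
    value = json.dumps(labels, separators=(",", ":"))  # compact JSON, no spaces
    return shlex.quote("--labels=" + value)           # wrap in single quotes
```

For example, `labels_arg({"key1": "val1", "key2": "val2"})` returns `'--labels={"key1":"val1","key2":"val2"}'`, single quotes included, ready to paste into the docker run command.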

Mutator

Mutator has three subcommands: listen, create and add-column.

listen

listen is the primary command and is used to automate table migrations.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.0 \
listen \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--verbose # optional, for debugging only

add-column

add-column can be used once to add a column to the table specified via the loader.output.good setting. Adding the column ahead of time should eliminate the risk of table update lag and the need to run Repeater, but requires manual intervention.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.0 \
add-column \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--shred-property=CONTEXTS \
--schema="iglu:com.acme/app_context/jsonschema/1-0-0"

The specified schema must be present in one of the Iglu registries in the resolver configuration.
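To check in advance that a schema is reachable, it can help to list the URLs the resolver's HTTP registries would be asked for. The sketch below assumes the standard registry path layout `<uri>/schemas/<vendor>/<name>/<format>/<version>` and a resolver config with `data.repositories` entries; `candidate_urls` is a hypothetical helper, and its ordering (vendor-prefix matches first, then ascending priority) is a simplification of the real resolver's rules.

```python
import json


def candidate_urls(resolver_json, schema_uri):
    """List registry URLs that could serve `schema_uri` (simplified sketch).

    Only HTTP repositories are considered; embedded ones are skipped.
    """
    if not schema_uri.startswith("iglu:"):
        raise ValueError(f"not an Iglu URI: {schema_uri}")
    vendor, name, fmt, version = schema_uri[len("iglu:"):].split("/")
    repos = json.loads(resolver_json)["data"]["repositories"]
    http = [r for r in repos if "http" in r["connection"]]

    def rank(repo):
        prefixed = any(vendor.startswith(p) for p in repo.get("vendorPrefixes", []))
        return (not prefixed, repo.get("priority", 0))  # prefix matches first

    return [
        f'{r["connection"]["http"]["uri"].rstrip("/")}/schemas/{vendor}/{name}/{fmt}/{version}'
        for r in sorted(http, key=rank)
    ]
```

Fetching the first URL (for example with curl) before running add-column is a quick way to catch a typo in the schema URI.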

create

create creates an empty table with the atomic structure. It can optionally be partitioned by a TIMESTAMP field.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.0 \
create \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--partitionColumn=load_tstamp \
--requirePartitionFilter

Both flags are optional: --partitionColumn names the TIMESTAMP column by which to partition the table, and --requirePartitionFilter requires a filter on the partition column in all queries. See the Google documentation for more information about partitioned tables.

Repeater

We recommend running Repeater continuously on a small, inexpensive node or Docker container.

docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-repeater:1.6.0 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--bufferSize=20 \
--timeout=20 \
--backoffPeriod=900 \
--verbose # optional, for debugging only

bufferSize, timeout and backoffPeriod are optional parameters. bufferSize is the size of the batch sent to the dead-letter bucket, timeout is the duration after which bad rows are sunk into the dead-letter bucket, and backoffPeriod is the number of seconds to wait before attempting an insert, calculated against etl_tstamp (900 seconds is 15 minutes).
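The interplay of bufferSize and timeout is the usual "flush when full or when too old" batching pattern, sketched below. The `DeadLetterBuffer` class and its `flush` callback (standing in for a write to the dead-letter bucket) are hypothetical, and measuring the timeout from the oldest buffered row is an assumption about how Repeater interprets it.

```python
class DeadLetterBuffer:
    """Size-or-timeout batching sketch; `flush` stands in for a bucket write."""

    def __init__(self, buffer_size, timeout, flush):
        self.buffer_size = buffer_size  # flush once this many rows are buffered
        self.timeout = timeout          # or once the oldest row is this old
        self.flush = flush
        self.items = []
        self.first_at = None            # arrival time of the oldest buffered row

    def add(self, item, now):
        if not self.items:
            self.first_at = now
        self.items.append(item)
        self.maybe_flush(now)

    def maybe_flush(self, now):
        full = len(self.items) >= self.buffer_size
        expired = bool(self.items) and now - self.first_at >= self.timeout
        if full or expired:
            self.flush(self.items)
            self.items = []             # new list, so flushed batches stay intact
            self.first_at = None
```

Time is passed in explicitly so the logic stays deterministic; a real process would call `maybe_flush` from a periodic timer so that a trickle of bad rows still reaches the bucket.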

Docker support

All applications are available as Docker images on Docker Hub, based on Ubuntu Focal and OpenJDK 11:

$ docker pull snowplow/snowplow-bigquery-streamloader:1.6.0
$ docker pull snowplow/snowplow-bigquery-loader:1.6.0
$ docker pull snowplow/snowplow-bigquery-mutator:1.6.0
$ docker pull snowplow/snowplow-bigquery-repeater:1.6.0

We also provide an alternative, lightweight set of images based on Google's "distroless" base image, which may offer some security advantages because they carry fewer dependencies. These images are distinguished by the 1.6.0-distroless tag:

$ docker pull snowplow/snowplow-bigquery-streamloader:1.6.0-distroless
$ docker pull snowplow/snowplow-bigquery-loader:1.6.0-distroless
$ docker pull snowplow/snowplow-bigquery-mutator:1.6.0-distroless
$ docker pull snowplow/snowplow-bigquery-repeater:1.6.0-distroless

Mutator, Repeater and StreamLoader are also available as fat JAR files attached to releases in the project's GitHub repository.