
Google Cloud Storage Loader

Overview

Cloud Storage Loader is a Dataflow job which dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.

Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper, SCIO.

Setup guide

Running

Cloud Storage Loader comes both as a Docker image and a ZIP archive.

Docker image

The Docker image can be found on Docker Hub.
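
For example, to pull the image locally:

docker pull snowplow/snowplow-google-cloud-storage-loader:0.5.6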

A container can be run as follows:

docker run \
-v $PWD/config:/snowplow/config \ # if running outside GCP
-e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \ # if running outside GCP
snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--runner=DataFlowRunner \
--jobName=[JOB-NAME] \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
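
The subscription and bucket referenced above must exist before the job starts. Below is a minimal sketch for creating them with gcloud and gsutil, assuming placeholder names (my-project, enriched-topic, enriched-sub, my-loader-bucket) and that the Pub/Sub topic itself already exists:

gcloud pubsub subscriptions create enriched-sub \
--topic=enriched-topic \
--project=my-project

gsutil mb -p my-project gs://my-loader-bucket/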

To display the help message:

docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help

To display documentation about Cloud Storage Loader-specific options:

docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

ZIP archive

The archive is hosted on GitHub at this URI:

https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
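
For example, to download and unzip the archive:

wget https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
unzip snowplow-google-cloud-storage-loader-0.5.6.zip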

Once unzipped, the artifact can be run as follows:

./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional

To display the help message:

./bin/snowplow-google-cloud-storage-loader --help

To display documentation about Cloud Storage Loader-specific options:

./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

Configuration

Cloud Storage Loader-specific options

  • --inputSubscription=String The Cloud Pub/Sub subscription to read from, formatted like projects/[PROJECT]/subscriptions/[SUB]. Required.
  • --outputDirectory=gs://[BUCKET]/ The Cloud Storage directory to output files to, ending in /. Required.
  • --outputFilenamePrefix=String The prefix for output files. Default: output. Optional.
  • --shardTemplate=String A valid Apache Beam shard template, which will be part of the filenames. Default: -W-P-SSSSS-of-NNNNN. Optional.
  • --outputFilenameSuffix=String The suffix for output files. Default: .txt. Optional.
  • --windowDuration=Int The window duration in minutes. Default: 5. Optional.
  • --compression=String The compression used (gzip, bz2 or none). Note that bz2 can't be loaded into BigQuery. Default: no compression. Optional.
  • --numShards=Int The maximum number of output shards produced when writing. Default: 1. Optional.
  • --dateFormat=YYYY/MM/dd/HH/ A date format string used for date-based partitioning in outputDirectory and partitionedOutputDirectory. Default: YYYY/MM/dd/HH/. Optional. For example, the date format YYYY/MM/dd/HH/ would produce a directory structure like this:
    gs://bucket/
    └── 2022
        └── 12
            └── 15
                ├── ...
                ├── 18
                ├── 19
                ├── 20
                └── ...
  • --partitionedOutputDirectory=gs://[BUCKET]/ The Cloud Storage directory to output files to, partitioned by schema, ending with /. Unpartitioned data will be sent to outputDirectory. Optional.
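
Neither --dateFormat nor --partitionedOutputDirectory appears in the run commands above. As a sketch only (the job, project, zone, subscription and bucket names are placeholders, and the dateFormat value shown is simply the default made explicit), they could be added to an invocation like this:

docker run \
snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--runner=DataFlowRunner \
--jobName=cloud-storage-loader \
--project=my-project \
--streaming=true \
--workerZone=europe-west2-a \
--inputSubscription=projects/my-project/subscriptions/enriched-sub \
--outputDirectory=gs://my-loader-bucket/raw/YYYY/MM/dd/HH/ \
--partitionedOutputDirectory=gs://my-loader-bucket/partitioned/ \
--dateFormat=YYYY/MM/dd/HH/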

Dataflow options

To run Cloud Storage Loader on Dataflow, you also need to specify the following configuration options. None of these options have default values, and all of them are required. A complete example invocation is shown at the end of this section.

  • --runner=DataFlowRunner Passing the string DataFlowRunner specifies that we want to run on Dataflow.
  • --jobName=[NAME] A name for the Dataflow job that will be created.
  • --project=[PROJECT] The name of your GCP project.
  • --streaming=true Pass true to notify Dataflow that we're running a streaming application.
  • --workerZone=[ZONE] The zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched.
  • --region=[REGION] The region where the Dataflow job will be launched.
  • --gcpTempLocation=gs://[BUCKET]/ The GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored.

The full list of pipeline options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
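
Putting the loader options and the Dataflow options together, a complete invocation of the ZIP artifact might look like the following. This is only a sketch: the job, project, region, zone, subscription and bucket names are placeholders.

./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--jobName=cloud-storage-loader \
--project=my-project \
--region=europe-west2 \
--workerZone=europe-west2-a \
--gcpTempLocation=gs://my-temp-bucket/tmp/ \
--streaming=true \
--inputSubscription=projects/my-project/subscriptions/enriched-sub \
--outputDirectory=gs://my-loader-bucket/YYYY/MM/dd/HH/ \
--windowDuration=5 \
--compression=gzip \
--numShards=1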