Google Cloud Storage Loader

Overview

Cloud Storage Loader is a Dataflow job that dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.

Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper, Scio.

Setup guide

Running

Cloud Storage Loader is available both as a Docker image and as a ZIP archive.

Docker image

The Docker image can be found on Docker Hub.

A container can be run as follows:

docker run \
-v $PWD/config:/snowplow/config \ # if running outside GCP
-e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \ # if running outside GCP
snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--runner=DataFlowRunner \
--jobName=[JOB-NAME] \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
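
Note that the Pub/Sub subscription and the Cloud Storage bucket referenced by --inputSubscription and --outputDirectory generally need to exist before the job is launched. As a rough sketch (the topic, subscription, bucket and region names here are placeholders you choose yourself; the loader does not create these resources), they could be provisioned with:

gcloud pubsub subscriptions create [SUBSCRIPTION] --topic=[TOPIC] # subscription the loader reads from
gsutil mb -l [REGION] gs://[BUCKET]/ # bucket the loader writes to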

To display the help message:

docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help

To display documentation about Cloud Storage Loader-specific options:

docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

ZIP archive

The archive is hosted on GitHub at this URI:

https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
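
For example, on a machine with wget and unzip available, the archive can be downloaded and unpacked as follows:

wget https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
unzip snowplow-google-cloud-storage-loader-0.5.6.zip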

Once unzipped, the artifact can be run as follows:

./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional

To display the help message:

./bin/snowplow-google-cloud-storage-loader --help

To display documentation about Cloud Storage Loader-specific options:

./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options

Configuration

Cloud Storage Loader-specific options

  • --inputSubscription=String The Cloud Pub/Sub subscription to read from, formatted like projects/[PROJECT]/subscriptions/[SUB]. Required.
  • --outputDirectory=gs://[BUCKET]/ The Cloud Storage directory to output files to, ending in /. Required.
  • --outputFilenamePrefix=String The prefix for output files. Default: output. Optional.
  • --shardTemplate=String A valid shard template as described in the Apache Beam documentation, which will be part of the filenames. Default: -W-P-SSSSS-of-NNNNN. Optional.
  • --outputFilenameSuffix=String The suffix for output files. Default: .txt. Optional.
  • --windowDuration=Int The window duration in minutes. Default: 5. Optional.
  • --compression=String The compression used (gzip, bz2 or none). Note that bz2 can't be loaded into BigQuery. Default: no compression. Optional.
  • --numShards=Int The maximum number of output shards produced when writing. Default: 1. Optional.
  • --dateFormat=YYYY/MM/dd/HH/ A date format string used for partitioning via date in outputDirectory and partitionedOutputDirectory. Default: YYYY/MM/dd/HH/. Optional. For example, the date format YYYY/MM/dd/HH/ would produce a directory structure like this (an example listing of the resulting objects is sketched after this list):
    gs://bucket/
    └── 2022
        └── 12
            └── 15
                ├── ...
                ├── 18
                ├── 19
                ├── 20
                └── ...
  • --partitionedOutputDirectory=gs://[BUCKET]/ The Cloud Storage directory to output files to, partitioned by schema, ending with /. Unpartitioned data will be sent to outputDirectory. Optional.
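
As an illustration of how the prefix, shard template, suffix and date format combine, the objects written for a single five-minute window can be listed with gsutil. The bucket name, timestamps and exact expansion of the shard template below are only indicative, since the precise filenames depend on Beam's filename policy:

gsutil ls gs://[BUCKET]/2022/12/15/18/
# e.g. gs://[BUCKET]/2022/12/15/18/output-2022-12-15T18:00:00.000Z-2022-12-15T18:05:00.000Z-pane-0-last-00000-of-00001.txt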

Dataflow options

To run the Cloud Storage Loader on Dataflow, it is also necessary to specify additional configuration options. None of these options have default values, and they are all required. A complete example command combining them with the loader-specific options is sketched at the end of this section.

  • --runner=DataFlowRunner Passing the string DataFlowRunner specifies that we want to run on Dataflow.
  • --jobName=[NAME] The name of the Dataflow job that will be created.
  • --project=[PROJECT] The name of your GCP project.
  • --streaming=true Pass true to notify Dataflow that we're running a streaming application.
  • --workerZone=[ZONE] The zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched.
  • --region=[REGION] The region where the Dataflow job will be launched.
  • --gcpTempLocation=gs://[BUCKET]/ The GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored.

The full list of options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
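
Putting the two groups of options together, a complete launch command might look like the sketch below (here the ZIP artifact is used; all bracketed values, as well as the job name, are placeholders to replace with your own):

./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--jobName=cloud-storage-loader \
--project=[PROJECT] \
--region=[REGION] \
--workerZone=[ZONE] \
--gcpTempLocation=gs://[BUCKET]/tmp/ \
--streaming=true \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \
--windowDuration=5 \
--numShards=1

Once submitted, the job should appear in the output of gcloud dataflow jobs list --region=[REGION] and in the Dataflow section of the GCP console.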