Google Cloud Storage Loader
Overview
Cloud Storage Loader is a Dataflow job which dumps events from an input Pub/Sub subscription into a Cloud Storage bucket.
Cloud Storage Loader is built on top of Apache Beam and its Scala wrapper SCIO.
Setup guide
Running
Cloud Storage Loader comes both as a Docker image and a ZIP archive.
Docker image
The Docker image can be found on Docker Hub.
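To fetch the image ahead of time, it can be pulled directly from Docker Hub (the tag matches the example below):
docker pull snowplow/snowplow-google-cloud-storage-loader:0.5.6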
A container can be run as follows:
docker run \
-v $PWD/config:/snowplow/config \ # if running outside GCP
-e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/config/credentials.json \ # if running outside GCP
snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--runner=DataFlowRunner \
--jobName=[JOB-NAME] \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
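The volume mount and GOOGLE_APPLICATION_CREDENTIALS variable above assume that a service account key file is available locally when running outside GCP. A minimal sketch of preparing one with the gcloud CLI, assuming a service account that already has the necessary Dataflow, Pub/Sub and Cloud Storage permissions (the account name is a placeholder):
mkdir -p config
# create a JSON key for the (placeholder) service account and store it where the container expects it
gcloud iam service-accounts keys create config/credentials.json \
  --iam-account=[SERVICE-ACCOUNT]@[PROJECT].iam.gserviceaccount.com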
To display the help message:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help
To display documentation about Cloud Storage Loader-specific options:
docker run snowplow/snowplow-google-cloud-storage-loader:0.5.6 \
--help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
ZIP archive
The archive is hosted on GitHub at this URI:
https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
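For example, to download and unpack the archive:
wget https://github.com/snowplow-incubator/snowplow-google-cloud-storage-loader/releases/download/0.5.6/snowplow-google-cloud-storage-loader-0.5.6.zip
unzip snowplow-google-cloud-storage-loader-0.5.6.zip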
Once unzipped, the artifact can be run as follows:
./bin/snowplow-google-cloud-storage-loader \
--runner=DataFlowRunner \
--project=[PROJECT] \
--streaming=true \
--workerZone=[ZONE] \
--inputSubscription=projects/[PROJECT]/subscriptions/[SUBSCRIPTION] \
--outputDirectory=gs://[BUCKET]/YYYY/MM/dd/HH/ \ # partitions by date
--outputFilenamePrefix=output \ # optional
--shardTemplate=-W-P-SSSSS-of-NNNNN \ # optional
--outputFilenameSuffix=.txt \ # optional
--windowDuration=5 \ # optional, in minutes
--compression=none \ # optional, gzip, bz2 or none
--numShards=1 # optional
To display the help message:
./bin/snowplow-google-cloud-storage-loader --help
To display documentation about Cloud Storage Loader-specific options:
./bin/snowplow-google-cloud-storage-loader --help=com.snowplowanalytics.storage.googlecloudstorage.loader.Options
Configuration
Cloud Storage Loader-specific options
--inputSubscription=String
The Cloud Pub/Sub subscription to read from, formatted like projects/[PROJECT]/subscriptions/[SUB]. Required.

--outputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, ending in /. Required.

--outputFilenamePrefix=String
The prefix for output files. Default: output. Optional.

--shardTemplate=String
A valid shard template (as described in the Apache Beam documentation), which will be part of the filenames. Default: -W-P-SSSSS-of-NNNNN. Optional.

--outputFilenameSuffix=String
The suffix for output files. Default: .txt. Optional.

--windowDuration=Int
The window duration in minutes. Default: 5. Optional.

--compression=String
The compression used (gzip, bz2 or none). Note that bz2-compressed output can't be loaded into BigQuery. Default: no compression. Optional.

--numShards=Int
The maximum number of output shards produced when writing. Default: 1. Optional.

--dateFormat=YYYY/MM/dd/HH/
A date format string used for date partitioning in outputDirectory and partitionedOutputDirectory. Default: YYYY/MM/dd/HH/. Optional. For example, the date format YYYY/MM/dd/HH/ would produce a directory structure like this:

gs://bucket/
└── 2022
    └── 12
        └── 15
            ├── ...
            ├── 18
            ├── 19
            ├── 20
            └── ...

--partitionedOutputDirectory=gs://[BUCKET]/
The Cloud Storage directory to output files to, partitioned by schema, ending with /. Unpartitioned data will be sent to outputDirectory. Optional.
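To make the filename-related options concrete, here is an illustrative sketch; the bucket name and timestamps are hypothetical, and the exact window and pane tokens in the final object names are filled in by Beam's filename policy:
# with --outputDirectory=gs://my-bucket/raw/, the default dateFormat, prefix, suffix,
# shard template and numShards=1, a window closing during hour 18 on 2022-12-15 UTC
# would be written to an object shaped roughly like:
#   gs://my-bucket/raw/2022/12/15/18/output-<window>-<pane>-00000-of-00001.txt
# which can be listed with:
gsutil ls gs://my-bucket/raw/2022/12/15/18/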
Dataflow options
To run the Cloud Storage Loader on Dataflow, it is also necessary to specify additional configuration options. None of these options have default values, and they are all required.
--runner=DataFlowRunner
Passing the string DataFlowRunner specifies that we want to run on Dataflow.

--jobName=[NAME]
A name for the Dataflow job that will be created.

--project=[PROJECT]
The name of your GCP project.

--streaming=true
Pass true to notify Dataflow that we're running a streaming application.

--workerZone=[ZONE]
The zone where the Dataflow nodes (effectively GCP Compute Engine nodes) will be launched.

--region=[REGION]
The region where the Dataflow job will be launched.

--gcpTempLocation=gs://[BUCKET]/
The GCS bucket where temporary files necessary to run the job (e.g. JARs) will be stored.
The full list of options can be found at https://cloud.google.com/dataflow/pipelines/specifying-exec-params#setting-other-cloud-pipeline-options.
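After submitting the job, one way to confirm it is running is the gcloud CLI (the region is a placeholder):
gcloud dataflow jobs list --region=[REGION]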