Spark transformer configuration reference

The configuration reference on this page is written for Spark Transformer 6.0.0.

An example of the minimal required config for the Spark transformer can be found here and a more detailed one here.

License

Since version 6.0.0, RDB Loader is released under the Snowplow Limited Use License (FAQ).

To accept the terms of the license and run RDB Loader, set the ACCEPT_LIMITED_USE_LICENSE=yes environment variable. Alternatively, you can configure the license.accept option, like this:

license {
  accept = true
}

| Parameter | Description |
| --- | --- |
| input | Required. S3 URI of the enriched archive. It must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories. |
| runInterval.* | Specifies the interval to process. |
| runInterval.sinceAge | Optional. A duration that specifies the maximum age of folders that should get processed. If sinceAge and sinceTimestamp are both specified, then the latest value of the two determines the earliest folder that will be processed. |
| runInterval.until | Optional. Process until this timestamp. |
| monitoring.sentry.dsn | Optional. For tracking runtime exceptions. |
| monitoring.metrics.cloudwatch (since 5.5.0) | Optional. For sending metrics to CloudWatch. If not set, metrics are not sent. |
| monitoring.metrics.cloudwatch.namespace (since 5.5.0) | Namespace that will contain the metrics in CloudWatch. Example: snowplow/transformer_batch |
| monitoring.metrics.cloudwatch.transformDuration (since 5.5.0) | Name of the metric that contains the number of milliseconds needed to transform a folder. Example: transform_duration |
| monitoring.metrics.cloudwatch.dimensions (since 5.5.0) | Any key-value pairs to be added as dimensions in CloudWatch metrics. Example: {"app_version": "x.y.z", "env": "prod"} |
| deduplication.* | Configures the way in-batch deduplication is performed. |
| deduplication.synthetic.type | Optional. One of NONE (disables synthetic deduplication), BROADCAST (default), or JOIN. BROADCAST and JOIN are different low-level implementations. |
| deduplication.synthetic.cardinality | Optional. The default is 1. Pairs with a cardinality less than or equal to this value are not deduplicated. |
| deduplication.natural | Optional. The default is true. Enables or disables natural deduplication. Available since 5.1.0. |
| featureFlags.enableMaxRecordsPerFile (since 5.4.0) | Optional, default = false. When enabled, the output.maxRecordsPerFile configuration parameter is used. |
| skipSchemas (since 5.7.1) | Optional, default = none. Supply a list of Iglu URIs and the transformer's output files will omit any columns using those schemas. This can help when recovering from edge-case schemas that for some reason cannot be loaded to the table. |
| output.path | Required. S3 URI of the transformed output. |
| output.compression | Optional. One of NONE or GZIP. The default is GZIP. |
| output.region | AWS region of the S3 bucket. Optional if it can be resolved with the AWS region provider chain. |
| output.maxRecordsPerFile (since 5.4.0) | Optional. Default = 10000. Maximum number of events per Parquet partition. |
| output.bad.type (since 5.4.0) | Optional. Either kinesis or file; the default is file. Type of the bad output sink. When file, bad rows are written as files under the URI configured in output.path. |
| output.bad.streamName (since 5.4.0) | Required if the bad output type is kinesis. Name of the Kinesis stream to write to. |
| output.bad.region (since 5.4.0) | AWS region of the Kinesis stream. Optional if it can be resolved with the AWS region provider chain. |
| output.bad.recordLimit (since 5.4.0) | Optional, default = 500. Limits the number of events in a single PutRecords Kinesis request. |
| output.bad.byteLimit (since 5.4.0) | Optional, default = 5242880. Limits the number of bytes in a single PutRecords Kinesis request. |
| output.bad.backoffPolicy.minBackoff (since 5.4.0) | Optional, default = 100 milliseconds. Minimum backoff before retrying when writing to Kinesis fails with internal errors. |
| output.bad.backoffPolicy.maxBackoff (since 5.4.0) | Optional, default = 10 seconds. Maximum backoff before retrying when writing to Kinesis fails with internal errors. |
| output.bad.backoffPolicy.maxRetries (since 5.4.0) | Optional, default = 10. Maximum number of retries for internal Kinesis errors. |
| output.bad.throttledBackoffPolicy.minBackoff (since 5.4.0) | Optional, default = 100 milliseconds. Minimum backoff before retrying when writing to Kinesis fails because throughput was exceeded. |
| output.bad.throttledBackoffPolicy.maxBackoff (since 5.4.0) | Optional, default = 10 seconds. Maximum backoff before retrying when writing to Kinesis fails because throughput was exceeded. Writing is retried forever. |
| queue.type | Required. Type of the message queue. Can be either sqs or sns. |
| queue.queueName | Required if the queue type is sqs. Name of the SQS queue. The SQS queue needs to be FIFO. |
| queue.topicArn | Required if the queue type is sns. ARN of the SNS topic. |
| queue.region | AWS region of the SQS queue or SNS topic. Optional if it can be resolved with the AWS region provider chain. |
| formats.* | Schema-specific format settings. |
| formats.transformationType | Required. Type of transformation, either `shred` or `widerow`. See Shredded data and Wide row format. |
| formats.fileFormat | Optional. The default is JSON. Output file format produced when the transformation is widerow. Either JSON or PARQUET. |
| formats.default | Optional. The default is TSV. Data format produced by default when the transformation is shred. Either TSV or JSON. TSV is recommended as it enables table autocreation, but requires an Iglu Server to be available with known schemas (including Snowplow schemas). JSON does not require an Iglu Server, but requires Redshift JSONPaths to be configured and does not support table autocreation. |
| formats.tsv | Optional. List of Iglu URIs; the default is the empty list []. If default is set to JSON, this list of schemas will still be shredded into TSV. |
| formats.json | Optional. List of Iglu URIs; the default is the empty list []. If default is set to TSV, this list of schemas will still be shredded into JSON. |
| formats.skip | Optional. List of Iglu URIs; the default is the empty list []. Schemas for which loading can be skipped. |
| validations.* | Optional. Criteria to validate events against. |
| validations.minimumTimestamp | This is currently the only validation criterion. It checks that all timestamps in the event are older than a specific point in time, e.g. 2021-11-18T11:00:00.00Z. |
| featureFlags.* | Optional. Enable features that are still in beta, or which aim to enable smoother upgrades. |
| featureFlags.legacyMessageFormat | Setting this to true allows you to use a new version of the transformer with an older version of the loader. |
| featureFlags.truncateAtomicFields (since 5.4.0) | Optional, default false. When enabled, the event's atomic fields are truncated (based on the length limits from the atomic JSON schema) before transformation. |
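
To show how the dotted parameter names above map onto nested configuration blocks, here is a rough sketch of a config file in the same style as the license snippet. It is not the official minimal example. The bucket names, stream name, queue name, and region are hypothetical placeholders, and the "14 days" duration literal is an assumption about the accepted format. Only parameters listed in the table are used.

```hocon
# Illustrative sketch only: all S3 URIs, names, and regions are placeholders.
license {
  accept = true
}

# Required. Enriched archive containing run=YYYY-MM-DD-hh-mm-ss directories.
input = "s3://example-bucket/enriched/archive/"

runInterval {
  # Assumed duration syntax: skip folders older than 14 days.
  sinceAge = "14 days"
}

output {
  # Required. Where the transformed output is written.
  path = "s3://example-bucket/transformed/"
  compression = "GZIP"
  region = "eu-central-1"

  bad {
    # Send bad rows to Kinesis instead of files under output.path.
    type = "kinesis"
    streamName = "example-bad-stream"
    region = "eu-central-1"
  }
}

queue {
  # With type = "sqs", queueName is required and the queue must be FIFO.
  type = "sqs"
  queueName = "example-queue.fifo"
  region = "eu-central-1"
}

formats {
  # Required: either "shred" or "widerow".
  transformationType = "widerow"
  # Only applies to widerow; JSON is the default.
  fileFormat = "PARQUET"
}

deduplication {
  synthetic {
    type = "BROADCAST"
    cardinality = 1
  }
  natural = true
}

monitoring {
  metrics {
    cloudwatch {
      namespace = "snowplow/transformer_batch"
      transformDuration = "transform_duration"
      dimensions = { "app_version": "x.y.z", "env": "prod" }
    }
  }
}
```

With queue.type set to sns, the queue block would carry topicArn in place of queueName. The deduplication and output.compression values shown simply restate the documented defaults and could be omitted.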