Snowflake Streaming Loader configuration reference
The configuration reference on this page is written for Snowflake Streaming Loader 0.3.0.
Snowflake configuration
Parameter | Description |
---|---|
output.good.url | Required, e.g. https://orgname.accountname.snowflakecomputing.com . URI of the Snowflake account. |
output.good.user | Required. Snowflake user who has the necessary privileges. |
output.good.privateKey | Required. Snowflake private key, used to connect to the account. |
output.good.privateKeyPassphrase | Optional. Passphrase for the private key. |
output.good.role | Optional. Snowflake role which the Snowflake user should assume. |
output.good.database | Required. Name of the Snowflake database containing the events table. |
output.good.schema | Required. Name of the Snowflake schema containing the events table. |
output.good.table | Optional. Default value events . Name to use for the events table. |
output.good.channel | Optional. Default value snowplow . Prefix to use for the Snowflake channels. The full channel name is this prefix suffixed with a number, e.g. snowplow-1 . If you run multiple loaders in parallel, each loader must be configured with a unique channel prefix. |
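As a rough illustration of how the parameters above fit together, the following HOCON fragment is a minimal sketch of the Snowflake output settings. The user, database and schema names are hypothetical placeholders, and reading the private key from an environment variable named SNOWFLAKE_PRIVATE_KEY is an assumption made for this example, not something this reference prescribes.

```
{
  "output": {
    "good": {
      # Required settings
      "url": "https://orgname.accountname.snowflakecomputing.com"
      "user": "snowplow_loader"                # hypothetical user name
      "privateKey": ${SNOWFLAKE_PRIVATE_KEY}   # assumed environment variable
      "database": "snowplow"                   # hypothetical database name
      "schema": "atomic"                       # hypothetical schema name

      # Optional settings, shown with their default values
      "table": "events"
      "channel": "snowplow"
    }
  }
}
```

If a second loader runs in parallel against the same table, it would need a different channel value, for example a hypothetical "snowplow-b", so that the two channel prefixes do not collide.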
Streams configuration
- AWS
- GCP
- Azure
AWS
Parameter | Description |
---|---|
input.streamName | Required. Name of the Kinesis stream with the enriched events |
input.appName | Optional, default snowplow-snowflake-loader . Name to use for the DynamoDB table, used by the underlying Kinesis Client Library (KCL) for managing leases. |
input.initialPosition | Optional, default LATEST . Allowed values are LATEST , TRIM_HORIZON , AT_TIMESTAMP . When the loader is deployed for the first time, this controls where in the Kinesis stream it starts consuming events. On all subsequent deployments, the loader resumes from the offsets stored in the DynamoDB table. |
input.initialPosition.timestamp | Required if input.initialPosition is AT_TIMESTAMP . A timestamp in ISO8601 format from where the loader should start consuming events. |
input.retrievalMode | Optional, default Polling. Change to FanOut to enable the enhanced fan-out feature of Kinesis. |
input.retrievalMode.maxRecords | Optional. Default value 1000. How many events the Kinesis client may fetch in a single poll. Only used when `input.retrievalMode` is Polling. |
input.workerIdentifier | Optional. Defaults to the HOSTNAME environment variable. The name of this KCL worker, used in the DynamoDB lease table. |
input.leaseDuration | Optional. Default value 10 seconds . The duration of shard leases. KCL workers must periodically refresh leases in the DynamoDB table before this duration expires. |
input.maxLeasesToStealAtOneTimeFactor | Optional. Default value 2.0 . Controls how to pick the max number of shard-leases to steal at one time. E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor is 2.0, then allow the KCL to steal up to 8 leases. Allows bigger instances to more quickly acquire the shard-leases they need to combat latency. |
output.bad.streamName | Required. Name of the Kinesis stream that will receive failed events. |
output.bad.throttledBackoffPolicy.minBackoff | Optional. Default value 100 milliseconds . Initial backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.throttledBackoffPolicy.maxBackoff | Optional. Default value 1 second . Maximum backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.recordLimit | Optional. Default value 500. The maximum number of records we are allowed to send to Kinesis in 1 PutRecords request. |
output.bad.byteLimit | Optional. Default value 5242880. The maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request. |
output.bad.maxRecordSize | Optional. Default value 1000000. Any single failed event sent to Kinesis should not exceed this size in bytes. |
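The fragment below is a sketch of how the Kinesis settings above might be combined in a HOCON file. The stream names are hypothetical placeholders and every optional value shown is simply the documented default, so in practice those lines could be omitted. The initialPosition and retrievalMode settings are left out because this table does not show how their nested values are laid out.

```
{
  "input": {
    "streamName": "snowplow-enriched"         # hypothetical stream name
    "appName": "snowplow-snowflake-loader"    # default; also the DynamoDB table name
    "leaseDuration": "10 seconds"             # default
    "maxLeasesToStealAtOneTimeFactor": 2.0    # default
  }
  "output": {
    "bad": {
      "streamName": "snowplow-bad"            # hypothetical stream name
      "throttledBackoffPolicy": {
        "minBackoff": "100 milliseconds"      # default
        "maxBackoff": "1 second"              # default
      }
      "recordLimit": 500                      # default
      "byteLimit": 5242880                    # default
      "maxRecordSize": 1000000                # default
    }
  }
}
```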
GCP
Parameter | Description |
---|---|
input.subscription | Required, e.g. projects/myproject/subscriptions/snowplow-enriched . Name of the Pub/Sub subscription with the enriched events |
input.parallelPullFactor | Optional. Default value 0.5. parallelPullFactor * cpu count determines the number of threads used internally by the Pub/Sub client library for fetching events. |
input.bufferMaxBytes | Optional. Default value 10000000. How many bytes can be buffered by the loader app before blocking the Pub/Sub client library from fetching more events. This is a balance between memory usage and how efficiently the app can operate. The default value works well. |
input.maxAckExtensionPeriod | Optional. Default value 1 hour. For how long the Pub/Sub client library will continue to re-extend the ack deadline of an unprocessed event. |
input.minDurationPerAckExtension | Optional. Default value 60 seconds. Sets the minimum boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the Pub/Sub client library. |
input.maxDurationPerAckExtension | Optional. Default value 600 seconds. Sets the maximum boundary on the value by which an ack deadline is extended. The actual value used is guided by runtime statistics collected by the Pub/Sub client library. |
output.bad.topic | Required, e.g. projects/myproject/topics/snowplow-bad . Name of the Pub/Sub topic that will receive failed events. |
output.bad.batchSize | Optional. Default value 1000. Bad events are sent to Pub/Sub in batches not exceeding this count. |
output.bad.requestByteThreshold | Optional. Default value 1000000. Bad events are sent to Pub/Sub in batches with a total size not exceeding this byte threshold. |
output.bad.maxRecordSize | Optional. Default value 10000000. Any single failed event sent to Pub/Sub should not exceed this size in bytes. |
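A possible Pub/Sub configuration, assembled from the parameters above, could look like the following HOCON sketch. The subscription and topic names reuse the examples from the table, and all optional values are the documented defaults, included only to show where each key sits.

```
{
  "input": {
    "subscription": "projects/myproject/subscriptions/snowplow-enriched"
    "parallelPullFactor": 0.5                  # default
    "bufferMaxBytes": 10000000                 # default
    "maxAckExtensionPeriod": "1 hour"          # default
    "minDurationPerAckExtension": "60 seconds" # default
    "maxDurationPerAckExtension": "600 seconds" # default
  }
  "output": {
    "bad": {
      "topic": "projects/myproject/topics/snowplow-bad"
      "batchSize": 1000                        # default
      "requestByteThreshold": 1000000          # default
      "maxRecordSize": 10000000                # default
    }
  }
}
```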
Azure
Parameter | Description |
---|---|
input.topicName | Required. Name of the Kafka topic for the source of enriched events. |
input.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the source of enriched events. |
input.consumerConf.* | Optional. A map of key/value pairs for any standard Kafka consumer configuration option. |
output.bad.topicName | Required. Name of the Kafka topic that will receive failed events. |
output.bad.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the bad topic. |
output.bad.producerConf.* | Optional. A map of key/value pairs for any standard Kafka producer configuration option. |
output.bad.maxRecordSize | Optional. Default value 1000000. Any single failed event sent to Kafka should not exceed this size in bytes. |
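The Kafka settings above might be laid out as in this HOCON sketch. The topic names and the bootstrap server address are hypothetical placeholders. The entries under consumerConf and producerConf are standard Kafka client options chosen purely for illustration; they are not defaults of the loader.

```
{
  "input": {
    "topicName": "snowplow-enriched"       # hypothetical topic name
    "bootstrapServers": "localhost:9092"   # hypothetical host and port
    "consumerConf": {
      # Any standard Kafka consumer option can go here (illustrative choice)
      "auto.offset.reset": "earliest"
    }
  }
  "output": {
    "bad": {
      "topicName": "snowplow-bad"          # hypothetical topic name
      "bootstrapServers": "localhost:9092" # hypothetical host and port
      "producerConf": {
        # Any standard Kafka producer option can go here (illustrative choice)
        "acks": "all"
      }
      "maxRecordSize": 1000000             # default
    }
  }
}
```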
Event Hubs Authentication
You can use the input.consumerConf and output.bad.producerConf options to configure authentication to Azure Event Hubs using SASL. For example:
"input.consumerConf": {
"security.protocol": "SASL_SSL"
"sasl.mechanism": "PLAIN"
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=<PASSWORD>;"
}
Other configuration options
Parameter | Description |
---|---|
batching.maxBytes | Optional. Default value 16000000 . Events are emitted to Snowflake when the batch reaches this size in bytes |
batching.maxDelay | Optional. Default value 1 second . Events are emitted to Snowflake after a maximum of this duration, even if the maxBytes size has not been reached |
batching.uploadParallelismFactor | Optional. Default value 2.5. Controls how many batches the loader can send simultaneously over the network to Snowflake. E.g. if there are 4 available processors, and uploadParallelismFactor is 2.5, then the loader sends up to 10 batches in parallel. Adjusting this value can cause the app to use more or less of the available CPU. |
cpuParallelismFactor | Optional. Default value 0.75. Controls how the loader splits the workload into concurrent batches which can be run in parallel. E.g. if there are 4 available processors, and cpuParallelismFactor is 0.75, then the loader processes 3 batches concurrently. Adjusting this value can cause the app to use more or less of the available CPU. |
retries.setupErrors.delay | Optional. Default value 30 seconds . Configures exponential backoff on errors related to how Snowflake is set up for this loader. Examples include authentication errors and permissions errors. Errors of this class are reported periodically to the monitoring webhook. |
retries.transientErrors.delay | Optional. Default value 1 second . Configures exponential backoff on errors that are likely to be transient. Examples include server errors and network errors. |
retries.transientErrors.attempts | Optional. Default value 5. Maximum number of attempts to make before giving up on a transient error. |
skipSchemas | Optional, e.g. ["iglu:com.example/skipped1/jsonschema/1-0-0"] or with wildcards ["iglu:com.example/skipped2/jsonschema/1-*-*"] . A list of schemas that won't be loaded to Snowflake. This feature could be helpful when recovering from edge-case schemas which for some reason cannot be loaded to the table. |
monitoring.metrics.statsd.hostname | Optional. If set, the loader sends statsd metrics over UDP to a server on this host name. |
monitoring.metrics.statsd.port | Optional. Default value 8125. If the statsd server is configured, this UDP port is used for sending metrics. |
monitoring.metrics.statsd.tags.* | Optional. A map of key/value pairs to be sent along with the statsd metric. |
monitoring.metrics.statsd.period | Optional. Default 1 minute . How often to report metrics to statsd. |
monitoring.metrics.statsd.prefix | Optional. Default snowplow.snowflake-loader . Prefix used for the metric name when sending to statsd. |
monitoring.webhook.endpoint | Optional, e.g. https://webhook.example.com . The loader will send to the webhook a payload containing details of any error related to how Snowflake is set up for this loader. |
monitoring.webhook.tags.* | Optional. A map of key/value strings to be included in the payload content sent to the webhook. |
monitoring.webhook.heartbeat.* | Optional. Default value 5.minutes . How often to send a heartbeat event to the webhook when healthy. |
monitoring.sentry.dsn | Optional. Set to a Sentry URI to report unexpected runtime exceptions. |
monitoring.sentry.tags.* | Optional. A map of key/value strings which are passed as tags when reporting exceptions to Sentry. |
telemetry.disable | Optional. Set to true to disable telemetry. |
telemetry.userProvidedId | Optional. See here for more information. |
output.good.jdbcLoginTimeout | Optional. Sets the login timeout on the JDBC driver which connects to Snowflake. |
output.good.jdbcNetworkTimeout | Optional. Sets the network timeout on the JDBC driver which connects to Snowflake. |
output.good.jdbcQueryTimeout | Optional. Sets the query timeout on the JDBC driver which connects to Snowflake. |
http.client.maxConnectionsPerServer | Optional. Default value 4. Configures the internal HTTP client used for alerts and telemetry. The maximum number of open HTTP requests to any single server at any one time. |
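To show where the cross-platform options above sit in a HOCON file, here is a minimal sketch. Values are the documented defaults or the examples from the table, except the statsd hostname, which is a hypothetical placeholder. None of these keys is required, so a real configuration would usually list only the ones it changes.

```
{
  "batching": {
    "maxBytes": 16000000              # default
    "maxDelay": "1 second"            # default
    "uploadParallelismFactor": 2.5    # default; 4 processors -> up to 10 parallel uploads
  }
  "cpuParallelismFactor": 0.75        # default; 4 processors -> 3 concurrent batches

  "retries": {
    "setupErrors": {
      "delay": "30 seconds"           # default
    }
    "transientErrors": {
      "delay": "1 second"             # default
      "attempts": 5                   # default
    }
  }

  # Example value from the table above
  "skipSchemas": ["iglu:com.example/skipped1/jsonschema/1-0-0"]

  "monitoring": {
    "metrics": {
      "statsd": {
        "hostname": "127.0.0.1"       # hypothetical statsd host
        "port": 8125                  # default
        "period": "1 minute"          # default
        "prefix": "snowplow.snowflake-loader"   # default
      }
    }
    "webhook": {
      "endpoint": "https://webhook.example.com"
    }
  }
}
```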