Lake Loader configuration reference
The configuration reference on this page is written for Lake Loader 0.6.0.
Table configuration
- Delta Lake
- Iceberg / Glue
- Hudi
Parameter | Description |
---|---|
output.good.location | Required, e.g. gs://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Delta format. The URI should start with the following prefix:
|
output.good.deltaTableProperties.* | Optional. A map of key/value strings corresponding to Delta's table properties. These can be anything from the Delta table properties documentation. The default properties include configuring Delta's data skipping feature for the important Snowplow timestamp columns: load_tstamp , collector_tstamp , derived_tstamp , dvce_created_tstamp . |
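As a rough sketch, the two Delta parameters above might be combined as follows. This assumes the loader reads a HOCON configuration file in which each dotted parameter name expands into nested keys. The bucket name is a placeholder, and `delta.logRetentionDuration` is only an illustrative entry from Delta's table properties, not a recommendation from this reference.

```hocon
{
  "output": {
    "good": {
      # Placeholder bucket; use the prefix appropriate to your cloud
      "location": "gs://mybucket/events",
      "deltaTableProperties": {
        # Illustrative property only (assumption, not a loader default)
        "delta.logRetentionDuration": "interval 30 days"
      }
    }
  }
}
```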
Parameter | Description |
---|---|
output.good.type | Required, set this to Iceberg |
output.good.catalog.type | Required, set this to Glue |
output.good.location | Required, e.g. s3a://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Iceberg format. The URI should start with s3a:// |
output.good.database | Required. Name of the database in the Glue catalog |
output.good.table | Required. The name of the table in the Glue database |
output.good.icebergTableProperties.* | Optional. A map of key/value strings corresponding to Iceberg's table properties. These can be anything from the Iceberg table properties documentation. The default properties include configuring Iceberg's column-level statistics for the important Snowplow timestamp columns: load_tstamp , collector_tstamp , derived_tstamp , dvce_created_tstamp . |
output.good.catalog.options.* | Optional. A map of key/value strings which are passed to the catalog configuration. These can be anything from the Iceberg catalog documentation e.g. "glue.id": "1234567" |
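The Iceberg / Glue parameters above could be put together along these lines. This is a sketch that assumes a HOCON configuration file with nested keys. The database and table names are hypothetical, `glue.id` is the example value from the table, and `write.metadata.delete-after-commit.enabled` is just one illustrative Iceberg table property.

```hocon
{
  "output": {
    "good": {
      "type": "Iceberg",
      "catalog": {
        "type": "Glue",
        "options": {
          "glue.id": "1234567"
        }
      },
      "location": "s3a://mybucket/events",
      "database": "snowplow",  # hypothetical Glue database name
      "table": "events",       # hypothetical table name
      "icebergTableProperties": {
        # Illustrative property only (assumption, not a loader default)
        "write.metadata.delete-after-commit.enabled": "true"
      }
    }
  }
}
```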
Alternative Docker image
To use the Lake Loader with Hudi support, pull the appropriate alternative image from Docker Hub, e.g. snowplow/lake-loader-aws:0.6.0-hudi.
Parameter | Description |
---|---|
output.good.location | Required, e.g. gs://mybucket/events . URI of the bucket location to which to write Snowplow enriched events in Hudi format. The URI should start with the following prefix:
|
output.good.hudiWriteOptions.* | Optional. A map of key/value strings corresponding to Hudi's configuration options for writing into a table. The default options configure load_tstamp as the table's partition field. |
output.good.hudiTableProperties.* | Optional. A map of key/value strings corresponding to Hudi's configuration options for creating a table. The default options configure load_tstamp as the table's partition field. |
Streams configuration
- AWS
- GCP
- Azure
Parameter | Description |
---|---|
input.streamName | Required. Name of the Kinesis stream with the enriched events |
input.appName | Optional, default snowplow-lake-loader . Name to use for the DynamoDB table, used by the underlying Kinesis Client Library (KCL) for managing leases. |
input.initialPosition | Optional, default LATEST . Allowed values are LATEST , TRIM_HORIZON , AT_TIMESTAMP . When the loader is deployed for the first time, this controls where in the Kinesis stream it should start consuming events. On all subsequent deployments, the loader resumes from the offsets stored in the DynamoDB table. |
input.initialPosition.timestamp | Required if input.initialPosition is AT_TIMESTAMP . A timestamp in ISO8601 format from where the loader should start consuming events. |
input.retrievalMode | Optional, default Polling. Change to FanOut to enable the enhanced fan-out feature of Kinesis. |
input.retrievalMode.maxRecords | Optional. Default value 1000. How many events the Kinesis client may fetch in a single poll. Only used when `input.retrievalMode` is Polling. |
input.workerIdentifier | Optional. Defaults to the HOSTNAME environment variable. The name of this KCL worker used in the DynamoDB lease table. |
input.leaseDuration | Optional. Default value 10 seconds . The duration of shard leases. KCL workers must periodically refresh leases in the DynamoDB table before this duration expires. |
input.maxLeasesToStealAtOneTimeFactor | Optional. Default value 2.0 . Controls how to pick the max number of shard leases to steal at one time. E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0 , then allow the loader to steal up to 8 leases. Allows bigger instances to more quickly acquire the shard-leases they need to combat latency. |
input.checkpointThrottledBackoffPolicy.minBackoff | Optional. Default value 100 milliseconds . Initial backoff used to retry checkpointing if we exceed the DynamoDB provisioned write limits. |
input.checkpointThrottledBackoffPolicy.maxBackoff | Optional. Default value 1 second . Maximum backoff used to retry checkpointing if we exceed the DynamoDB provisioned write limits. |
output.bad.streamName | Required. Name of the Kinesis stream that will receive failed events. |
output.bad.throttledBackoffPolicy.minBackoff | Optional. Default value 100 milliseconds . Initial backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.throttledBackoffPolicy.maxBackoff | Optional. Default value 1 second . Maximum backoff used to retry sending failed events if we exceed the Kinesis write throughput limits. |
output.bad.recordLimit | Optional. Default value 500. The maximum number of records we are allowed to send to Kinesis in 1 PutRecords request. |
output.bad.byteLimit | Optional. Default value 5242880. The maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request. |
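A sketch of a Kinesis setup using the parameters above, assuming a HOCON configuration file with nested keys. The stream names are hypothetical, and every other value is the default listed in the table. `input.initialPosition` and `input.retrievalMode` are left out because the table does not show how their values and sub-keys are nested.

```hocon
{
  "input": {
    "streamName": "snowplow-enriched",  # hypothetical stream name
    "appName": "snowplow-lake-loader",
    "leaseDuration": "10 seconds",
    # With 4 available processors, 2.0 allows stealing up to 8 leases at once
    "maxLeasesToStealAtOneTimeFactor": 2.0,
    "checkpointThrottledBackoffPolicy": {
      "minBackoff": "100 milliseconds",
      "maxBackoff": "1 second"
    }
  },
  "output": {
    "bad": {
      "streamName": "snowplow-bad",  # hypothetical stream name
      "throttledBackoffPolicy": {
        "minBackoff": "100 milliseconds",
        "maxBackoff": "1 second"
      },
      "recordLimit": 500,
      "byteLimit": 5242880
    }
  }
}
```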
Parameter | Description |
---|---|
input.subscription | Required, e.g. projects/myproject/subscriptions/snowplow-enriched . Name of the Pub/Sub subscription with the enriched events |
input.parallelPullCount | Optional. Default value 3. Number of threads used internally by the Pub/Sub client library for fetching events. |
input.maxAckExtensionPeriod | Optional. Default value 1 hour . For how long the Pub/Sub client library will continue to re-extend the ack deadline of an unprocessed event. |
input.durationPerAckExtension | Optional. Default value 600 seconds . Pub/Sub ack deadlines are extended for this duration when needed. A sensible value is double the size of the windowing config parameter, but no higher than 10 minutes. |
input.minRemainingAckDeadline | Optional. Default value 0.1 . Controls when ack deadlines are re-extended for a message that is close to exceeding its ack deadline. For example, if durationPerAckExtension is 600 seconds and minRemainingAckDeadline is 0.1 , then the loader waits until 60 seconds of the deadline remain before re-extending the message deadline. |
input.maxMessagesPerPull | Optional. Default value 1000 . How many Pub/Sub messages to pull from the server in a single request. |
output.bad.topic | Required, e.g. projects/myproject/topics/snowplow-bad . Name of the Pub/Sub topic that will receive failed events. |
output.bad.batchSize | Optional. Default value 100. Bad events are sent to Pub/Sub in batches not exceeding this count. |
output.bad.requestByteThreshold | Optional. Default value 1000000. Bad events are sent to Pub/Sub in batches with a total size not exceeding this byte threshold |
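The relationship between the windowing setting and the ack deadline parameters above can be sketched as follows, assuming a HOCON configuration file with nested keys. The subscription and topic names are the examples from the table, and the remaining values are the listed defaults.

```hocon
{
  "windowing": "5 minutes",
  "input": {
    "subscription": "projects/myproject/subscriptions/snowplow-enriched",
    # Double the 5 minute window, which is also the 10 minute ceiling
    "durationPerAckExtension": "600 seconds",
    # Re-extend once 600 seconds * 0.1 = 60 seconds of the deadline remain
    "minRemainingAckDeadline": 0.1
  },
  "output": {
    "bad": {
      "topic": "projects/myproject/topics/snowplow-bad"
    }
  }
}
```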
Parameter | Description |
---|---|
input.topicName | Required. Name of the Kafka topic for the source of enriched events. |
input.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the source of enriched events. |
input.consumerConf.* | Optional. A map of key/value pairs for any standard Kafka consumer configuration option. |
output.bad.topicName | Required. Name of the Kafka topic that will receive failed events. |
output.bad.bootstrapServers | Required. Hostname and port of Kafka bootstrap servers hosting the bad topic |
output.bad.producerConf.* | Optional. A map of key/value pairs for any standard Kafka producer configuration option. |
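A minimal sketch of the Kafka parameters above, assuming a HOCON configuration file with nested keys. The topic names and the Event Hubs namespace `mynamespace` are hypothetical, and `client.id` is shown only as an example of a standard Kafka client option.

```hocon
{
  "input": {
    "topicName": "snowplow-enriched",  # hypothetical topic name
    # Hypothetical namespace; Event Hubs serves its Kafka endpoint on port 9093
    "bootstrapServers": "mynamespace.servicebus.windows.net:9093",
    "consumerConf": {
      "client.id": "snowplow-lake-loader"  # illustrative option only
    }
  },
  "output": {
    "bad": {
      "topicName": "snowplow-bad",  # hypothetical topic name
      "bootstrapServers": "mynamespace.servicebus.windows.net:9093",
      "producerConf": {
        "client.id": "snowplow-lake-loader"  # illustrative option only
      }
    }
  }
}
```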
Event Hubs Authentication
You can use the input.consumerConf and output.bad.producerConf options to configure authentication to Azure Event Hubs using SASL. For example:
"input.consumerConf": {
"security.protocol": "SASL_SSL"
"sasl.mechanism": "PLAIN"
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=<PASSWORD>;"
}
Other configuration options
Parameter | Description |
---|---|
windowing | Optional. Default value 5 minutes . Controls how often the loader writes/commits pending events to the lake. |
exitOnMissingIgluSchema | Optional. Default value true . Whether the loader should crash and exit if it fails to resolve an Iglu Schema. We recommend true because Snowplow enriched events have already passed validation, so a missing schema normally indicates an error that needs addressing. Change to false so that events go to the failed events stream instead of crashing the loader. |
respectIgluNullability | Optional. Default value true . Whether the output parquet files should declare nested fields as non-nullable according to the Iglu schema. When true , nested fields are nullable only if they are not required fields according to the Iglu schema. When false , all nested fields are defined as nullable in the output table's schemas. Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct. |
spark.conf.* | Optional. A map of key/value strings which are passed to the internal spark context. |
spark.taskRetries | Optional. Default value 3. How many times the internal spark context should retry a task in case of failure. |
retries.setupErrors.delay | Optional. Default value 30 seconds . Configures exponential backoff on errors related to how the lake is set up for this loader. Examples include authentication errors and permissions errors. This class of errors is reported periodically to the monitoring webhook. |
retries.transientErrors.delay | Optional. Default value 1 second . Configures exponential backoff on errors that are likely to be transient. Examples include server errors and network errors. |
retries.transientErrors.attempts | Optional. Default value 5. Maximum number of attempts to make before giving up on a transient error. |
monitoring.metrics.statsd.hostname | Optional. If set, the loader sends statsd metrics over UDP to a server on this host name. |
monitoring.metrics.statsd.port | Optional. Default value 8125. If the statsd server is configured, this UDP port is used for sending metrics. |
monitoring.metrics.statsd.tags.* | Optional. A map of key/value pairs to be sent along with the statsd metric. |
monitoring.metrics.statsd.period | Optional. Default 1 minute . How often to report metrics to statsd. |
monitoring.metrics.statsd.prefix | Optional. Default snowplow.lakeloader . Prefix used for the metric name when sending to statsd. |
monitoring.webhook.endpoint | Optional, e.g. https://webhook.example.com . The loader will send to the webhook a payload containing details of any error related to how the lake is set up for this loader. |
monitoring.webhook.tags.* | Optional. A map of key/value strings to be included in the payload content sent to the webhook. |
monitoring.webhook.heartbeat.* | Optional. Default value 5.minutes . How often to send a heartbeat event to the webhook when healthy. |
monitoring.sentry.dsn | Optional. Set to a Sentry URI to report unexpected runtime exceptions. |
monitoring.sentry.tags.* | Optional. A map of key/value strings which are passed as tags when reporting exceptions to Sentry. |
telemetry.disable | Optional. Set to true to disable telemetry. |
telemetry.userProvidedId | Optional. See here for more information. |
inMemBatchBytes | Optional. Default value 50000000. Controls how many bytes of events are buffered in memory before saving the batch to local disk. The default value works well for reasonably sized VMs. For smaller VMs (e.g. fewer than 2 CPU cores, 8 GB memory) consider decreasing this value. |
cpuParallelismFactor | Optional. Default value 0.75. Controls how the app splits the workload into concurrent batches which can be run in parallel. E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently. The default value works well for most workloads. |
numEagerWindows | Optional. Default value 1. Controls how eagerly the loader starts processing the next timed window even when the previous timed window is still finalizing (committing into the lake). By default, we start processing a timed window if the previous 1 window is still finalizing, but we do not start processing a timed window if any older windows are still finalizing. The default value works well for most workloads. |
http.client.maxConnectionsPerServer | Optional. Default value 4. Configures the internal HTTP client used for Iglu resolver, alerts and telemetry. The maximum number of open HTTP requests to any single server at any one time. For Iglu Server in particular, this avoids overwhelming the server with multiple concurrent requests. |
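A sketch showing how several of the options above might sit together, assuming a HOCON configuration file with nested keys. The statsd hostname is hypothetical, the webhook endpoint is the example from the table, and all other values are the listed defaults, so in practice most of these lines could be omitted.

```hocon
{
  "windowing": "5 minutes",
  "exitOnMissingIgluSchema": true,
  "respectIgluNullability": true,
  "spark": {
    "taskRetries": 3
  },
  "retries": {
    "setupErrors": { "delay": "30 seconds" },
    "transientErrors": { "delay": "1 second", "attempts": 5 }
  },
  "monitoring": {
    "metrics": {
      "statsd": {
        "hostname": "statsd.example.com",  # hypothetical host
        "port": 8125,
        "period": "1 minute",
        "prefix": "snowplow.lakeloader"
      }
    },
    "webhook": {
      "endpoint": "https://webhook.example.com"
    }
  },
  # With 4 available processors, 0.75 means 3 batches are processed concurrently
  "cpuParallelismFactor": 0.75,
  "numEagerWindows": 1
}
```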