Skip to main content

Configuration

License

Enrich is released under the Snowplow Limited Use License (FAQ).

To accept the terms of license and run Enrich, set the ACCEPT_LIMITED_USE_LICENSE=yes environment variable. Alternatively, you can configure the license.accept option, like this:

license {
accept = true
}

Common parameters

parameterdescription
concurrency.enrichOptional. Default: 256. Number of events that can get enriched at the same time within a chunk (events are processed by chunks in the app).
concurrency.sinkOptional. Default for enrich-pubsub: 3. Default for enrich-kinesis: 1. Number of chunks that can get sunk at the same time. WARNING for enrich-kinesis: if greater than 1, records can get checkpointed before they are sunk.
assetsUpdatePeriodOptional. E.g. 7 days. Period after which enrich assets (e.g. the maxmind database for the IpLookups enrichment) should be checked for udpates. Assets will never be updated if this key is missing.
monitoring.sentry.dsnOptional. E.g. http://sentry.acme.com. To track uncaught runtime exceptions in Sentry.
monitoring.metrics.statsd.hostnameOptional. E.g. localhost. Hostname of the StatsD server to send enrichment metrics (latency and event counts) to.
monitoring.metrics.statsd.portOptional. E.g. 8125. Port of the StatsD server.
monitoring.metrics.statsd.periodOptional. E.g. 10 seconds. How frequently to send metrics to StatsD server.
monitoring.metrics.statsd.tagsOptional. E.g. { "env": "prod" }. Key-value pairs attached to each metric sent to StatsD to provide contextual information.
monitoring.metrics.statsd.prefixOptional. Default: snowplow.enrich. Pefix of StatsD metric names.
monitoring.metrics.stdout.periodOptional. E.g. 10 seconds. If set, metrics will be printed in the logs with this frequency.
monitoring.metrics.stdout.prefixOptional. Default: snowplow.enrich. Prefix for the metrics appearing in the logs.
telemetry.disableOptional. Set to true to disable telemetry.
telemetry.userProvidedIdOptional. See here for more information.
featureFlags.acceptInvalidOptional. Default: false. Enrich 3.0.0 introduces the validation of the enriched events against atomic schema before emitting. If set to false, a failed event will be emitted instead of the enriched event if validation fails. If set to true, invalid enriched events will be emitted, as before.
featureFlags.legacyEnrichmentOrderOptional. Default: false. In early versions of enrich-kinesis and enrich-pubsub (>= 3.1.5), the Javascript enrichment incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true to keep the erroneous behavior of those previous versions.
validation.atomicFieldsLimits (since 4.0.0)Optional. For the defaults, see here. Configuration for custom maximum atomic fields (strings) length. It's a map-like structure with keys being atomic field names and values being their max allowed length.

Instead of a message queue, it's also possible to read collector payloads from files on disk. This can be used for instance for testing purposes. In this case the configuration needs to be as below.

parameterdescription
input.typeRequired. Must be FileSystem.
input.dirRequired. E.g. /input/collectorPayloads/. Directory containing collector payloads encoded with Thrift.

Likewise, it's possible to write enriched events, pii events and failed events to files instead of PubSub or Kinesis.

To write enriched events to files:

parameterdescription
output.good.typeRequired. Must be FileSystem.
output.good.fileRequired. E.g. /output/enriched. File where enriched events will be written.
output.good.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

To write failed events to files:

parameterdescription
output.bad.typeRequired. Must be FileSystem.
output.bad.fileRequired. E.g. /output/bad. File where failed events will be written.
output.bad.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

To write pii events to files:

parameterdescription
output.pii.typeRequired. Must be FileSystem.
output.pii.fileRequired. E.g. /output/pii. File where pii events will be written.
output.pii.maxBytesOptional. E.g. 1048576. Maximum size of a file in bytes. Triggers file rotation.

enrich-pubsub

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.subscriptionRequired. E.g. projects/example-project/subscriptions/collectorPayloads. PubSub subscription identifier for the collector payloads.
input.parallelPullCountOptional. Default: 1. Number of threads used internally by permutive library to handle incoming messages. These threads do very little "work" apart from writing the message to a concurrent queue.
input.maxQueueSizeOptional. Default: 3000. Configures the "max outstanding element count" of PubSub. This is the principal way we control concurrency in the app; it puts an upper bound on the number of events in memory at once. An event counts towards this limit starting from when it received by the permutive library, until we ack it (after publishing to output). The value must be large enough that it does not cause the sink to block whilst it is waiting for a batch to be completed. The first of maxQueueSize and maxRequestBytes being reached will pause the consumption.
input.maxRequestBytesOptional. Default: 50000000 (50MB). Configures the "maximum outstanding request bytes" of PubSub subscriber. It puts an upper bound on the events' bytes that can be hold in memory at once before getting acked. The value must be large enough to not cause the sink to block whilst it is waiting for a batch to be completed. The first of maxQueueSize and maxRequestBytes being reached will pause the consumption.
input.maxAckExtensionPeriodOptional. Default: 1 hour. Maximum period a message ack deadline can be extended. A zero duration disables auto deadline extension.
output.good.topicRequired. E.g. projects/example-project/topics/enriched. Name of the PubSub topic that will receive the enriched events.
output.good.attributesOptional. Enriched event fields to add as PubSub message attributes. For example, if this is [ "app_id" ] then the enriched event's app_id field will be an attribute of the PubSub message, as well as being a field within the enriched event.
output.good.delayThresholdOptional. Default: 200 milliseconds. Delay threshold to use for batching. After this amount of time has elapsed, before maxBatchSize and maxBatchBytes have been reached, messages from the buffer will be sent.
output.good.maxBatchSizeOptional. Default: 1000 (PubSub maximum). Maximum number of messages sent within a batch. When the buffer reaches this number of messages they are sent.
output.good.maxBatchBytesOptional. Default: 8000000 (PubSub maximum is 10MB). Maximum number of bytes sent within a batch. When the buffer reaches this size messages are sent.
output.incomplete.topicRequired. E.g. projects/example-project/topics/incomplete. Name of the PubSub topic that will receive the failed events (same format as the enriched events).
output.incomplete.delayThresholdSame as output.good.delayThreshold for failed events.
output.incomplete.maxBatchSizeSame as output.good.maxBatchSize for failed events.
output.incomplete.maxBatchBytesSame as output.good.maxBatchBytes for failed events.
output.bad.topicRequired. E.g. projects/example-project/topics/bad. Name of the PubSub topic that will receive the failed events in the "bad row" format (JSON).
output.bad.delayThresholdSame as output.good.delayThreshold for failed events in the "bad row" format (JSON).
output.bad.maxBatchSizeSame as output.good.maxBatchSize for failed events in the "bad row" format (JSON).
output.bad.maxBatchBytesSame as output.good.maxBatchBytes for failed events in the "bad row" format (JSON).
output.pii.topicOptional. Example: projects/test-project/topics/pii. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output topic for writing a pii_transformation event.
output.pii.attributesSame as output.good.attributes for pii events.
output.pii.delayThresholdSame as output.good.delayThreshold for pii events.
output.pii.maxBatchSizeSame as output.good.maxBatchSize for pii events.
output.pii.maxBatchBytesSame as output.good.maxBatchBytes for pii events.

enrich-kinesis

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.appNameOptional. Default: snowplow-enrich-kinesis. Name of the application which the KCL daemon should assume. A DynamoDB table with this name will be created.
input.streamNameRequired. E.g. raw. Name of the Kinesis stream with the collector payloads to read from.
input.regionOptional. E.g. eu-central-1. Region where the Kinesis stream is located. This field is optional if it can be resolved with AWS region provider chain. It checks places like env variables, system properties, AWS profile file.
input.initialPosition.typeOptional. Default: TRIM_HORIZON. Set the initial position to consume the Kinesis stream. Possible values: LATEST (most recent data), TRIM_HORIZON (oldest available data), AT_TIMESTAMP (start from the record at or after the specified timestamp).
input.initialPosition.timestampRequired for AT_TIMESTAMP. E.g. 2020-07-17T10:00:00Z.
input.retrievalMode.typeOptional. Default: Polling. Set the mode for retrieving records. Possible values: Polling or FanOut.
input.retrievalMode.maxRecordsRequired for Polling. Default: 10000. Maximum size of a batch returned by a call to getRecords. Records are checkpointed after a batch has been fully processed, thus the smaller maxRecords, the more often records can be checkpointed into DynamoDb, but possibly reducing the throughput.
input.bufferSizeOptional. Default: 3. Size of the internal buffer used when reading messages from Kinesis, each buffer holding up to maxRecords from above.
input.customEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws kinesis endpoints. Can be used to specify local endpoint when using localstack.
input.dynamodbCustomEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws dyanomdb endpoint for Kinesis checkpoints lease table. Can be used to specify local endpoint when using localstack.
input.cloudwatchCustomEndpointOptional. E.g. http://localhost:4566. Endpoint url configuration to override aws cloudwatch endpoint for metrics. Can be used to specify local endpoint when using localstack.
output.good.streamNameRequired. E.g. enriched. Name of the Kinesis stream to write to the enriched events.
output.good.regionSame as input.region for enriched events stream.
output.good.partitionKeyOptional. How the output stream will be partitioned in Kinesis. Events with the same partition key value will go to the same shard. Possible values: event_id, event_fingerprint, domain_userid, network_userid, user_ipaddress, domain_sessionid, user_fingerprint. If not specified, the partition key will be a random UUID.
output.good.backoffPolicy.minBackoffOptional. Default: 100 milliseconds. Minimum backoff before retrying when writing fails with internal errors.
output.good.backoffPolicy.maxBackoffOptional. Default: 10 seconds. Maximum backoff before retrying when writing fails with internal errors.
output.good.backoffPolicy.maxRetriesOptional. Default: 10. Maximum number of retries for internal errors.
output.good.throttledBackoffPolicy.minBackoff (since 3.4.1)Optional. Default: 100 milliseconds. Minimum backoff before retrying when writing fails in case of throughput exceeded.
output.good.throttledBackoffPolicy.maxBackoff (since 3.4.1)Optional. Default: 1 second. Maximum backoff before retrying when writing fails in case of throughput exceeded. Writing is retried forever.
output.good.recordLimitOptional. Default: 500 (maximum allowed). Limits the number of events in a single PutRecords request. Several requests are made in parallel.
output.good.customEndpointOptional. E.g. http://localhost:4566. To use a custom Kinesis endpoint.
output.incomplete.streamNameRequired. E.g. incomplete. Name of the Kinesis stream that will receive the failed events (same format as the enriched events).
output.incomplete.regionSame as output.good.region for failed events.
output.incomplete.backoffPolicy.minBackoffSame as output.good.backoffPolicy.minBackoff for failed events.
output.incomplete.backoffPolicy.maxBackoffSame as output.good.backoffPolicy.maxBackoff for failed events.
output.incomplete.backoffPolicy.maxRetriesSame as output.good.backoffPolicy.maxRetries for failed events.
output.incomplete.throttledBackoffPolicy.minBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.minBackoff for failed events.
output.incomplete.throttledBackoffPolicy.maxBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.maxBackoff for failed events.
output.incomplete.recordLimitSame as output.good.recordLimit for failed events.
output.incomplete.customEndpointSame as output.good.customEndpoint for failed events.
output.bad.streamNameRequired. E.g. bad. Name of the Kinesis stream that will receive the failed events in the "bad row" format (JSON).
output.bad.regionSame as output.good.region for failed events in the "bad row" format (JSON).
output.bad.backoffPolicy.minBackoffSame as output.good.backoffPolicy.minBackoff for failed events in the "bad row" format (JSON).
output.bad.backoffPolicy.maxBackoffSame as output.good.backoffPolicy.maxBackoff for failed events in the "bad row" format (JSON).
output.bad.backoffPolicy.maxRetriesSame as output.good.backoffPolicy.maxRetries for failed events in the "bad row" format (JSON).
output.bad.throttledBackoffPolicy.minBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.minBackoff for failed events in the "bad row" format (JSON).
output.bad.throttledBackoffPolicy.maxBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.maxBackoff for failed events in the "bad row" format (JSON).
output.bad.recordLimitSame as output.good.recordLimit for failed events in the "bad row" format (JSON).
output.bad.customEndpointSame as output.good.customEndpoint for failed events in the "bad row" format (JSON).
output.pii.streamNameOptional. E.g. pii. Should be used in conjunction with the PII pseudonymization enrichment. When configured, enables an extra output stream for writing a pii_transformation event.
output.pii.regionSame as output.good.region for pii events.
output.pii.partitionKeySame as output.good.partitionKey for pii events.
output.pii.backoffPolicy.minBackoffSame as output.good.backoffPolicy.minBackoff for pii events.
output.pii.backoffPolicy.maxBackoffSame as output.good.backoffPolicy.maxBackoff for pii events.
output.pii.backoffPolicy.maxRetriesSame as output.good.backoffPolicy.maxRetries for pii events.
output.pii.throttledBackoffPolicy.minBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.minBackoff for pii events.
output.pii.throttledBackoffPolicy.maxBackoff (since 3.4.1)Same as output.good.throttledBackoffPolicy.maxBackoff for pii events.
output.pii.recordLimitSame as output.good.recordLimit for pii events.
output.pii.customEndpointSame as output.good.customEndpoint for pii events.

enrich-kafka

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.topicNameRequired. Name of the Kafka topic to read collector payloads from.
input.bootstrapServersRequired. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
input.consumerConfOptional. Kafka consumer configuration. See the docs for all properties.
output.good.topicNameRequired. Name of the Kafka topic to write to
output.good.bootstrapServersRequired. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.good.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.good.partitionKeyOptional. Enriched event field to use as Kafka partition key
output.good.headersOptional. Enriched event fields to add as Kafka record headers
output.incomplete.topicNameOptional. Name of the Kafka topic that will receive the failed events (same format as the enriched events)
output.incomplete.bootstrapServersOptional. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.incomplete.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.bad.topicNameOptional. Name of the Kafka topic that will receive the failed events in the “bad row” format (JSON
output.bad.bootstrapServersOptional. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.bad.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.pii.topicNameOptional. Name of the Kafka topic to write to
output.pii.bootstrapServersOptional. A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
output.pii.producerConfOptional. Kafka producer configuration. See the docs for all properties
output.pii.partitionKeyOptional. Enriched event field to use as Kafka partition key
output.pii.headersOptional. Enriched event fields to add as Kafka record headers
blobStorage.s3 (since 4.0.0)Optional. Set to true if S3 client should be initialized to download enrichments assets.
blobStorage.gcs (since 4.0.0)Optional. Set to true if GCS client should be initialized to download enrichments assets.
blobStorage.azureStorage (since 4.0.0)Optional. Azure Blob Storage client configuration. ABS client won't be enabled if it isn't given.
blobStorage.azureStorage.accounts (since 4.0.0)Array of accounts to download from Azure Blob Storage.

Example values for the Azure storage accounts :

  • { "name": "storageAccount1"}: public account with no auth
  • { "name": "storageAccount2", "auth": { "type": "default"} }: private account using default auth chain
  • { "name": "storageAccount3", "auth": { "type": "sas", "value": "tokenValue"}}: private account using SAS token auth

enrich-nsq

A minimal configuration file can be found on the Github repo, as well as a comprehensive one.

parameterdescription
input.topicRequired. Name of the NSQ topic with the collector payloads.
input.lookupHostRequired. The host name of NSQ lookup application.
input.lookupPortRequired. The port number of NSQ lookup application.
input.channelOptional. Default: collector-payloads-channel. Name of the NSQ channel used to retrieve collector payloads.
output.good.topicRequired. Name of the NSQ topic that will receive the enriched events.
output.good.nsqdHostRequired. The host name of nsqd application.
output.good.nsqdPortRequired. The port number of nsqd application.
output.incomplete.topicRequired. Name of the NSQ topic that will receive the failed events (same format as the enriched events).
output.incomplete.nsqdHostRequired. The host name of nsqd application.
output.incomplete.nsqdPortRequired. The port number of nsqd application.
output.bad.topicRequired. Name of the NSQ topic that will receive the failed events in the "bad row" format (JSON).
output.bad.nsqdHostRequired. The host name of nsqd application.
output.bad.nsqdPortRequired. The port number of nsqd application.
output.pii.topicOptional. Name of the NSQ topic that will receive the pii events.
output.pii.nsqdHostOptional. The host name of nsqd application.
output.pii.nsqdPortOptional. The port number of nsqd application.

Enriched events validation against atomic schema

Enriched events are expected to match atomic schema. However, until 3.0.0, it was never checked that the enriched events emitted by enrich were valid. If an event is not valid against atomic schema, a failed event should be emitted instead of the enriched event. However, this is a breaking change, and we want to give some time to users to adapt, in case today they are working downstream with enriched events that are not valid against atomic. For this reason, this new validation was added as a feature that can be deactivated like that:

"featureFlags": {
"acceptInvalid": true
}

In this case, enriched events that are not valid against atomic schema will still be emitted as before, so that Enrich 3.0.0 can be fully backward compatible. It will be possible to know if the new validation would have had an impact by 2 ways:

  1. A new metric invalid_enriched has been introduced. It reports the number of enriched events that were not valid against atomic schema. As the other metrics, it can be seen on stdout and/or StatsD.
  2. Each time there is an enriched event invalid against atomic schema, a line will be logged with the failed event (add -Dorg.slf4j.simpleLogger.log.InvalidEnriched=debug to the JAVA_OPTS to see it).

If acceptInvalid is set to false, a failed event will be emitted instead of the enriched event in case it's not valid against atomic schema.

When we'll know that all our customers don't have any invalid enriched events any more, we'll remove the feature flags and it will be impossible to emit invalid enriched events.

Since 4.0.0, it is possible to configure the lengths of the atomic fields, below is an example:

{
...
# Optional. Configuration section for various validation-oriented settings.
"validation": {
# Optional. Configuration for custom maximum atomic fields (strings) length.
# Map-like structure with keys being field names and values being their max allowed length
"atomicFieldsLimits": {
"app_id": 5
"mkt_clickid": 100000
# ...and any other 'atomic' field with custom limit
}
}
}

Enrichments

The list of the enrichments that can be configured can be found on this page.