Elasticsearch Loader

If you are using Enrich to write enriched Snowplow events to one stream and bad events to another, you can use the Elasticsearch Loader to read events from a Kinesis stream and write them to Elasticsearch or OpenSearch.

note

We only offer this loader on AWS or as part of Snowplow Mini.

What the data looks like

There are a few changes compared to the standard structure of Snowplow data.

Boolean fields reformatted

Boolean fields such as br_features_java are represented in enriched events as "0" or "1". In Elasticsearch, these values are converted to false and true.
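
For example, an event with br_features_java set to "1" in the enriched event would contain a field like this in Elasticsearch (illustrative snippet):

json
{
  "br_features_java": true
}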

New geo_location field

The geo_latitude and geo_longitude fields are combined into a single geo_location field of Elasticsearch's "geo_point" type.
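For illustration, an event with geo_latitude 37.44 and geo_longitude -122.41 would produce a field along these lines (geo_point values can be expressed as a "lat,lon" string; the exact representation may vary):

json
{
  "geo_location": "37.44,-122.41"
}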

Self-describing events

Each self-describing event gets its own field (same naming rules as for Snowflake). For example:

json
{
  "unstruct_com_snowplowanalytics_snowplow_link_click_1": {
    "targetUrl": "http://snowplow.io",
    "elementId": "action",
    "elementClasses": [],
    "elementTarget": ""
  }
}

Entities

Each entity type attached to the event gets its own field (same naming rules as for Snowflake). The field contains an array with the data for all entities of the given type. For example:

json
{
  "contexts_com_acme_user_1": [
    {
      "name": "Alice"
    }
  ],
  "contexts_com_acme_product_1": [
    {
      "name": "Apple"
    },
    {
      "name": "Orange"
    }
  ]
}

Setup guide

Configuring Elasticsearch

Getting started

First, install and set up Elasticsearch. See the Elasticsearch installation guide for setup instructions, and see Supported versions of OpenSearch and Elasticsearch for the Elasticsearch and OpenSearch versions currently supported by AWS.

Supported versions

The Elasticsearch Loader supports Elasticsearch 6.x, 7.x, 8.x, and 9.x. It also supports OpenSearch 1.x, 2.x, and 3.x.

Raising the file limit

Elasticsearch keeps a lot of files open simultaneously, so you will need to increase the maximum number of files a user can have open. To do this:

bash
sudo vim /etc/security/limits.conf

Append the following lines to the file:

bash
{{USERNAME}} soft nofile 32000
{{USERNAME}} hard nofile 32000

Where {{USERNAME}} is the name of the user running Elasticsearch. You will need to log out and back in, and restart Elasticsearch, before the new file limit takes effect.

To check that this new limit has taken effect, you can run the following command from the terminal:

bash
curl localhost:9200/_nodes/process?pretty

If max_file_descriptors equals 32000, Elasticsearch is running with the new limit.
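
If you only want the relevant line, you can filter the output, for example:

bash
curl -s 'localhost:9200/_nodes/process?pretty' | grep max_file_descriptors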

Defining the mapping

Use the following request to create the mapping with Elasticsearch 7+:

bash
curl -XPUT 'http://localhost:9200/snowplow' -H 'Content-Type: application/json' -d '{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "keyword"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "geo_location": {
        "type": "geo_point"
      }
    }
  }
}'

Note that Elasticsearch 7+ no longer uses mapping types. If you have an older version, you might need to include mapping types in the above snippet.

This initialization sets the default analyzer to "keyword". This means that string fields will not be split into separate tokens for the purposes of searching. This saves space and ensures that URL fields are handled correctly.

If you want to tokenize specific string fields, you can change the "properties" field in the mapping like this:

bash
curl -XPUT 'http://localhost:9200/snowplow' -H 'Content-Type: application/json' -d '{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "keyword"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "geo_location": {
        "type": "geo_point"
      },
      "field_to_tokenize": {
        "type": "text",
        "analyzer": "english"
      }
    }
  }
}'
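
To verify that the mapping was applied, you can retrieve it back from the cluster, for example:

bash
curl 'http://localhost:9200/snowplow/_mapping?pretty'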

Installing the Elasticsearch Loader

The Elasticsearch Loader is published on Docker Hub:

bash
docker pull snowplow/elasticsearch-loader:3.0.1

The container can be run with the following command:

bash
docker run \
  -v /path/to/config.hocon:/snowplow/config.hocon \
  snowplow/elasticsearch-loader:3.0.1 \
  --config /snowplow/config.hocon
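
The loader needs AWS credentials to read from Kinesis and to maintain its KCL lease table in DynamoDB. One way to provide them (a sketch, assuming you use static credentials rather than an instance profile) is via the standard AWS SDK environment variables:

bash
docker run \
  -e AWS_ACCESS_KEY_ID \
  -e AWS_SECRET_ACCESS_KEY \
  -e AWS_REGION \
  -v /path/to/config.hocon:/snowplow/config.hocon \
  snowplow/elasticsearch-loader:3.0.1 \
  --config /snowplow/config.hocon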

Configuring the Elasticsearch Loader

The loader is configured using a HOCON file. You can find a minimal example and a full reference example in the config directory.
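
As an illustration, a minimal configuration might look something like the following sketch, based on the fields documented below (the stream names, index name, and URL are placeholders):

hocon
{
  "license": {
    "accept": true
  },
  "input": {
    "streamName": "enriched-events"
  },
  "output": {
    "good": {
      "url": "http://localhost:9200",
      "index": "snowplow",
      "auth": {
        "type": "NoAuth"
      }
    },
    "bad": {
      "streamName": "es-loader-bad"
    }
  },
  "purpose": "ENRICHED_EVENTS"
}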

License acceptance

The loader requires explicit acceptance of the Snowplow Limited Use License Agreement:

hocon
"license": {
"accept": true
}

Input

The loader reads from a Kinesis stream. The following fields configure the input:

input.streamName: Required. Name of the Kinesis stream to read from.
input.appName: Optional. Name used for the KCL DynamoDB lease table. Default: "snowplow-elasticsearch-loader".
input.initialPosition.type: Optional. Where to start reading the first time the app runs. Options: "TRIM_HORIZON" (oldest), "LATEST" (newest), or "AT_TIMESTAMP". Default: "TRIM_HORIZON". On subsequent runs, the app always resumes from the last checkpoint.
input.initialPosition.timestamp: Required when initialPosition.type is "AT_TIMESTAMP". Timestamp to start reading from, e.g. "2023-01-01T00:00:00Z".
input.retrievalMode.type: Optional. How the KCL fetches events. Options: "Polling" or "FanOut" (Kinesis Enhanced Fan-Out). Default: "Polling".
input.retrievalMode.maxRecords: Optional. Used when retrievalMode.type is "Polling". Maximum number of events per poll request. Default: 750.
input.retrievalMode.idleTimeBetweenReads: Optional. Used when retrievalMode.type is "Polling". Idle time between GetRecords requests. Default: "1500 millis".
input.workerIdentifier: Optional. Name of this KCL worker in the DynamoDB lease table. Default: ${HOSTNAME}.
input.leaseDuration: Optional. Duration of shard leases. Workers must refresh leases in DynamoDB before this expires. Default: "10 seconds".
input.maxLeasesToStealAtOneTimeFactor: Optional. Controls how many leases can be stolen at once, as a multiple of available processors. Default: 2.0.
input.checkpointThrottledBackoffPolicy.minBackoff: Optional. Minimum backoff when DynamoDB provisioned throughput limits are hit. Default: "100 millis".
input.checkpointThrottledBackoffPolicy.maxBackoff: Optional. Maximum backoff when DynamoDB provisioned throughput limits are hit. Default: "1 second".
input.debounceCheckpoints: Optional. How often to checkpoint progress to DynamoDB. Default: "10 seconds".
input.maxRetries: Optional. Maximum number of retries for Kinesis API operations. Default: 10.
input.apiCallAttemptTimeout: Optional. Maximum time for a single AWS API call attempt. Default: "15 seconds".
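
For illustration, an input block that uses enhanced fan-out and an explicit app name might look like this (the stream and app names are placeholders, and the nesting follows the dotted paths above):

hocon
"input": {
  "streamName": "enriched-events",
  "appName": "es-loader-app",
  "retrievalMode": {
    "type": "FanOut"
  }
}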

Output: good events

Good events are written to Elasticsearch. The following fields configure the Elasticsearch output:

output.good.url: Required. URL of the Elasticsearch cluster, including scheme and port. Example: "http://localhost:9200".
output.good.index: Required. Name of the Elasticsearch index to write events into.
output.good.auth.type: Required. Authentication method. Options: "NoAuth", "Basic", or "AWSSigning".
output.good.auth.username: Required when auth.type is "Basic". HTTP Basic Auth username.
output.good.auth.password: Required when auth.type is "Basic". HTTP Basic Auth password.
output.good.auth.region: Required when auth.type is "AWSSigning". AWS region of the OpenSearch Service domain.
output.good.auth.serviceSigningName: Required when auth.type is "AWSSigning". AWS service name for SigV4 signing. Use "es" for OpenSearch Service or "aoss" for OpenSearch Serverless.
output.good.documentType: Optional. Elasticsearch document type. Only required for Elasticsearch 6.x compatibility.
output.good.sharding.dateFormat: Optional. Date format to append to the index name for time-partitioned indices, e.g. "yyyy-MM-dd".
output.good.sharding.dateField: Required when sharding is set. Timestamp field to extract the date from for index sharding. Must be one of: collector_tstamp, derived_tstamp, dvce_created_tstamp, dvce_sent_tstamp, etl_tstamp, refr_dvce_tstamp, true_tstamp.
output.good.indexTimeout: Optional. Timeout passed to Elasticsearch for each bulk request. Default: "1 minute".
output.good.additionalBadRowErrorTypes: Optional. Additional Elasticsearch error types to treat as permanent bad rows instead of retrying. By default, mapper_parsing_exception and document_parsing_exception are treated as bad rows.
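
For example, writing to an Amazon OpenSearch Service domain with SigV4 signing might look like this (the URL, index, and region are placeholders):

hocon
"output": {
  "good": {
    "url": "https://my-domain.eu-central-1.es.amazonaws.com",
    "index": "snowplow",
    "auth": {
      "type": "AWSSigning",
      "region": "eu-central-1",
      "serviceSigningName": "es"
    }
  }
}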

Output: bad rows

Events that cannot be written to Elasticsearch are sent to a Kinesis stream as bad rows:

output.bad.streamName: Required. Name of the Kinesis stream for bad rows.
output.bad.maxRecordSize: Optional. Maximum record size in bytes. Records exceeding this are replaced with a SizeViolation bad row. Default: 1000000.
output.bad.throttledBackoffPolicy.minBackoff: Optional. Minimum backoff when Kinesis write throughput limits are exceeded. Default: "100 milliseconds".
output.bad.throttledBackoffPolicy.maxBackoff: Optional. Maximum backoff when Kinesis write throughput limits are exceeded. Default: "1 second".
output.bad.recordLimit: Optional. Maximum number of records per PutRecords request. Default: 500.
output.bad.byteLimit: Optional. Maximum number of bytes per PutRecords request. Default: 5242880.
output.bad.maxRetries: Optional. Maximum number of retries for Kinesis write operations. Default: 10.
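
A minimal bad rows block only needs the stream name (placeholder shown); the other settings fall back to their documented defaults:

hocon
"output": {
  "bad": {
    "streamName": "es-loader-bad"
  }
}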

Purpose

purpose: Required. Type of events to process. Options: "ENRICHED_EVENTS" for Snowplow enriched events, "BAD_ROWS" for Snowplow bad rows, or "JSON" for arbitrary JSON.

Batching

The loader accumulates events into batches before sending them to Elasticsearch. A batch is flushed as soon as either of the following conditions is met:

batching.maxBytes: Optional. Flush after this many source bytes have been accumulated. Default: 10000000.
batching.maxDelay: Optional. Flush after this delay has elapsed. Default: "1 second".

Retries

retries.transientErrors.delay: Optional. Delay between retry attempts for transient Elasticsearch errors. Default: "1 second".
retries.transientErrors.attempts: Optional. Maximum number of retry attempts before treating the batch as failed. Default: 5.

Decompression

The loader automatically detects and decompresses zstd- or gzip-compressed Kinesis messages. Uncompressed messages are unaffected.

decompression.maxBytesInBatch: Optional. Maximum total decompressed bytes per batch. Protects memory when a single compressed message expands into many large records. Default: 5242880.
decompression.maxBytesSinglePayload: Optional. Maximum size of a single decompressed record in bytes. Records exceeding this limit are dropped and emitted as bad rows. Default: 10000000.

Parallelism

cpuParallelismFactor: Optional. Controls how the app splits the workload into concurrent batches for parsing and transformation, as a multiple of available processors. Default: 1.
uploadParallelismFactor: Optional. Controls how many Elasticsearch bulk upload jobs can run in parallel, as a multiple of available processors. Default: 4.
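
As an illustration, the following fragment sets the batching, retry, and parallelism options explicitly to their documented defaults (a sketch; in practice you would only override the values you need to change):

hocon
"batching": {
  "maxBytes": 10000000,
  "maxDelay": "1 second"
},
"retries": {
  "transientErrors": {
    "delay": "1 second",
    "attempts": 5
  }
},
"cpuParallelismFactor": 1,
"uploadParallelismFactor": 4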

Monitoring

The loader exposes runtime metrics and health information through several optional integrations:

monitoring.metrics.statsd.hostname: Optional. Hostname of the StatsD server to send metrics to.
monitoring.metrics.statsd.port: Optional. Port of the StatsD server. Default: 8125.
monitoring.metrics.statsd.tags: Optional. Map of key/value pairs sent along with each metric.
monitoring.metrics.statsd.period: Optional. How often to report metrics. Default: "1 minute".
monitoring.metrics.statsd.prefix: Optional. Prefix for metric names.
monitoring.metrics.prometheus.tags: Optional. Map of key/value pairs used as common labels on all Prometheus metrics. The loader exposes these metrics at the /metrics endpoint on the health probe port (see monitoring.healthProbe.port).
monitoring.sentry.dsn: Optional. Sentry DSN for reporting unexpected runtime exceptions.
monitoring.sentry.tags: Optional. Map of key/value pairs included as tags on Sentry events.
monitoring.healthProbe.port: Optional. Port for the HTTP health probe server. Returns 200 OK when healthy.
monitoring.healthProbe.unhealthyLatency: Optional. The health probe becomes unhealthy if any received event has not been fully processed before this cutoff time. Default: "2 minutes".
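
For example, to send metrics to a local StatsD agent and expose the health probe, you might use something like this (the hostname, port, and tag values are placeholders):

hocon
"monitoring": {
  "metrics": {
    "statsd": {
      "hostname": "localhost",
      "port": 8125,
      "tags": {
        "app": "elasticsearch-loader"
      }
    }
  },
  "healthProbe": {
    "port": 8000
  }
}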

Telemetry

telemetry.disable: Optional. Set to true to disable telemetry. Default: false.
telemetry.userProvidedId: Optional. Identifier to tie events together across modules and infrastructure.

Check document count

To check the number of documents in an Elasticsearch or OpenSearch cluster, use the Count API. For example, to get the total number of documents in the cluster, use GET _count.
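
For example, with curl (replace the host and index name to match your deployment):

bash
# Total number of documents across the cluster
curl 'http://localhost:9200/_count?pretty'

# Number of documents in the snowplow index only
curl 'http://localhost:9200/snowplow/_count?pretty'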