
Elasticsearch Loader

If you are using Enrich to write enriched Snowplow events to one stream and bad events to another, you can use the Elasticsearch Loader to read events from either of those streams and write them to Elasticsearch. It works with either Kinesis or NSQ streams.

Warning: We only offer this loader on AWS or as part of Snowplow Mini.

What the data looks like

There are a few changes compared to the standard structure of Snowplow data.

Boolean fields reformatted

All boolean fields like br_features_java are normally either "0" or "1". In Elasticsearch, these values are converted to false and true.
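As an illustration of this conversion, here is a minimal Python sketch. It is not the loader's actual code; the function name `convert_booleans` and the short `BOOLEAN_FIELDS` list are assumptions made for the example (the real set of boolean fields is longer).

```python
from typing import Any, Dict, Iterable

# Assumed, incomplete list of boolean fields, used only for illustration.
BOOLEAN_FIELDS = ("br_features_java", "br_features_pdf", "br_cookies")


def convert_booleans(
    event: Dict[str, Any], fields: Iterable[str] = BOOLEAN_FIELDS
) -> Dict[str, Any]:
    """Return a copy of the event with "0"/"1" strings replaced by False/True."""
    mapping = {"0": False, "1": True}
    converted = dict(event)
    for field in fields:
        value = converted.get(field)
        # Only the listed fields are touched; other "0"/"1" strings stay as-is.
        if isinstance(value, str) and value in mapping:
            converted[field] = mapping[value]
    return converted
```

Fields that are not in the list, such as a hypothetical `app_id` of "1", are left unchanged.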

New geo_location field

The geo_latitude and geo_longitude fields are combined into a single geo_location field of Elasticsearch's "geo_point" type.
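The sketch below shows one way the two fields could be combined. It assumes the "lat,lon" string form, which is one of the representations Elasticsearch accepts for a geo_point; the exact representation the loader emits is not stated here, and the function name `add_geo_location` is hypothetical.

```python
from typing import Any, Dict


def add_geo_location(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the event with a geo_location field when both coordinates exist."""
    enriched = dict(event)
    lat = event.get("geo_latitude")
    lon = event.get("geo_longitude")
    if lat is not None and lon is not None:
        # Assumption: "lat,lon" string form of a geo_point.
        enriched["geo_location"] = f"{lat},{lon}"
    return enriched
```

Events missing either coordinate are returned without a geo_location field.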

Self-describing events

Each self-describing event gets its own field (same naming rules as for Snowflake). For example:

{
  "unstruct_com_snowplowanalytics_snowplow_link_click_1": {
    "targetUrl": "http://snowplow.io",
    "elementId": "action",
    "elementClasses": [],
    "elementTarget": ""
  }
}

Entities

Each entity type attached to the event gets its own field (same naming rules as for Snowflake). The field contains an array with the data for all entities of the given type. For example:

{
  "contexts_com_acme_user_1": [
    {
      "name": "Alice"
    }
  ],
  "contexts_com_acme_product_1": [
    {
      "name": "Apple"
    },
    {
      "name": "Orange"
    }
  ]
}
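The grouping of entities into one array per type can be sketched as follows. This is a simplified illustration, not the loader's implementation: it assumes each entity is a self-describing JSON with "schema" and "data" keys, and it derives the field name by replacing non-alphanumeric characters with underscores and keeping only the major version. The real naming rules may handle more cases (for example camelCase schema names). The names `entity_field_name` and `group_entities` are hypothetical.

```python
import re
from collections import defaultdict
from typing import Any, Dict, List

# iglu:<vendor>/<name>/jsonschema/<major>-<minor>-<patch>
SCHEMA_URI = re.compile(r"^iglu:([^/]+)/([^/]+)/jsonschema/(\d+)-\d+-\d+$")


def entity_field_name(schema_uri: str) -> str:
    """Build a field name such as contexts_com_acme_user_1 from a schema URI."""
    match = SCHEMA_URI.match(schema_uri)
    if match is None:
        raise ValueError(f"Not a valid Iglu schema URI: {schema_uri}")
    vendor, name, major = match.groups()
    vendor_part = re.sub(r"[^A-Za-z0-9]", "_", vendor).lower()
    name_part = re.sub(r"[^A-Za-z0-9]", "_", name).lower()
    return f"contexts_{vendor_part}_{name_part}_{major}"


def group_entities(entities: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Collect the data of all entities of the same type into one array per field."""
    grouped: Dict[str, List[Any]] = defaultdict(list)
    for entity in entities:
        grouped[entity_field_name(entity["schema"])].append(entity["data"])
    return dict(grouped)
```

With one com.acme user entity and two com.acme product entities as input, this yields the two arrays shown in the example above.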

Setup guide

Configuring Elasticsearch

Getting started

First off, install and set up Elasticsearch. We have tested the loader with versions 6.x, 7.x and 8.x of Elasticsearch. For more information, check out the installation guide.

OpenSearch

We have also tested this loader with OpenSearch 2.2.x, 2.4.x and 2.5.

Raising the file limit

Elasticsearch keeps a lot of files open simultaneously, so you will need to increase the maximum number of files a user can have open. To do this, edit the limits file:

sudo vim /etc/security/limits.conf

Append the following lines to the file:

{{USERNAME}} soft nofile 32000
{{USERNAME}} hard nofile 32000

Here {{USERNAME}} is the name of the user running Elasticsearch. You will need to log out, log back in, and restart Elasticsearch before the new file limit takes effect.

To check that the new limit has taken effect, query the nodes stats API from the terminal:

curl 'localhost:9200/_nodes/stats/process?filter_path=**.max_file_descriptors&pretty'

If max_file_descriptors equals 32000 for every node, Elasticsearch is running with the new limit.
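If you have several nodes, the check can be automated. The sketch below takes the parsed JSON response and returns the IDs of nodes whose limit is too low. It assumes a response shaped like `{"nodes": {"<node id>": {"process": {"max_file_descriptors": N}}}}`, which is how the nodes stats API reports this value; the function name `nodes_below_limit` is hypothetical.

```python
from typing import Any, Dict, List


def nodes_below_limit(stats: Dict[str, Any], required: int = 32000) -> List[str]:
    """Return the sorted IDs of nodes whose max_file_descriptors is missing or too low."""
    low = []
    for node_id, node in stats.get("nodes", {}).items():
        limit = node.get("process", {}).get("max_file_descriptors")
        # A missing value is treated as a failure so it is not silently ignored.
        if limit is None or limit < required:
            low.append(node_id)
    return sorted(low)
```

An empty result means every node in the response reports at least the required limit.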

Defining the mapping

Use the following request to create the mapping with Elasticsearch 7+:

curl -XPUT 'http://localhost:9200/snowplow' -H 'Content-Type: application/json' -d '{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "keyword"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "geo_location": {
        "type": "geo_point"
      }
    }
  }
}'

Note that Elasticsearch 7+ no longer uses mapping types. If you have an older version, you might need to include mapping types in the above snippet.

This initialization sets the default analyzer to "keyword". This means that string fields will not be split into separate tokens for the purposes of searching. This saves space and ensures that URL fields are handled correctly.

If you want to tokenize specific string fields, you can change the "properties" field in the mapping like this:

curl -XPUT 'http://localhost:9200/snowplow' -H 'Content-Type: application/json' -d '{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "keyword"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "geo_location": {
        "type": "geo_point"
      },
      "field_to_tokenize": {
        "type": "text",
        "analyzer": "english"
      }
    }
  }
}'
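If you need to tokenize several fields, it can be easier to generate the request body than to edit it by hand. The following sketch builds the same settings and mappings as the requests above. The helper name `build_index_body` and the `page_title` field in the usage note are assumptions for illustration; the sketch uses the "text" type, which is the analyzed string type in Elasticsearch 7+.

```python
import json
from typing import Any, Dict, Optional


def build_index_body(tokenized_fields: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Build the index creation body: keyword default analyzer plus a geo_point mapping.

    tokenized_fields maps a field name to the analyzer it should use.
    """
    properties: Dict[str, Any] = {"geo_location": {"type": "geo_point"}}
    for field, analyzer in (tokenized_fields or {}).items():
        properties[field] = {"type": "text", "analyzer": analyzer}
    return {
        "settings": {"analysis": {"analyzer": {"default": {"type": "keyword"}}}},
        "mappings": {"properties": properties},
    }


if __name__ == "__main__":
    # Print a body that can be passed to curl with -d.
    print(json.dumps(build_index_body({"page_title": "english"}), indent=2))
```

The printed JSON can be saved to a file and sent with `curl -XPUT ... -d @file.json`.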

Installing the Elasticsearch Loader

The Elasticsearch Loader is published on Docker Hub:

docker pull snowplow/snowplow-elasticsearch-loader:2.1.2

The container can be run with the following command:

docker run \
  -v /path/to/config.hocon:/snowplow/config.hocon \
  snowplow/snowplow-elasticsearch-loader:2.1.2 \
  --config /snowplow/config.hocon

Alternatively, you can download the JAR file from the GitHub release and run it:

java -jar snowplow-elasticsearch-loader-2.1.2.jar --config /path/to/config.hocon

Using the Elasticsearch Loader

Configuration

The sink is configured using a HOCON file, for which you can find examples here. These are the fields:

| Name | Description |
| --- | --- |
| purpose | Required. "ENRICHED_EVENTS" for a stream of successfully enriched events, "BAD_ROWS" for a stream of bad events, "JSON" for writing plain JSON. |
| input.type | Required. Configures where input events will be read from. Can be "kinesis", "stdin" or "nsq". |
| input.streamName | Required when input.type is kinesis or nsq. Name of the stream to read from. |
| input.initialPosition | Required when input.type is kinesis. Specifies where to start reading from the stream the first time the app is run: "TRIM_HORIZON" for as far back as possible, "LATEST" for as recent as possible, "AT_TIMESTAMP" for after the specified timestamp. |
| input.initialTimestamp | Used when input.type is kinesis. Required when input.initialPosition is "AT_TIMESTAMP". Specifies the timestamp to start reading from. |
| input.maxRecords | Used when input.type is kinesis. Optional. Maximum number of records fetched in a single request. Default value 10000. |
| input.region | Used when input.type is kinesis. Optional if it can be resolved with the AWS region provider chain. Region where the Kinesis stream is located. |
| input.customEndpoint | Used when input.type is kinesis. Optional. Custom endpoint to override AWS Kinesis endpoints. This can be used to specify local endpoints when using LocalStack. |
| input.dynamodbCustomEndpoint | Used when input.type is kinesis. Optional. Custom endpoint to override AWS DynamoDB endpoints for the Kinesis checkpoints lease table. This can be used to specify local endpoints when using LocalStack. |
| input.appName | Used when input.type is kinesis. Optional. Used by a DynamoDB table to maintain stream state. Default value "snowplow-elasticsearch-loader". |
| input.buffer.byteLimit | Used when input.type is kinesis. Optional. The limit of the buffer in bytes. When this value is exceeded, events will be sent to Elasticsearch. Default value 1000000. |
| input.buffer.recordLimit | Used when input.type is kinesis. Optional. The limit of the buffer in number of records. When this value is exceeded, events will be sent to Elasticsearch. Default value 500. |
| input.buffer.timeLimit | Used when input.type is kinesis. Optional. The time limit in milliseconds to wait before sending the buffer to Elasticsearch. Default value 500. |
| input.channelName | Required when input.type is nsq. Channel name for the NSQ source stream. If more than one application reads from the same NSQ topic at the same time, each must use a unique channel name to receive all the data from that topic. |
| input.nsqlookupdHost | Required when input.type is nsq. Host name for nsqlookupd. |
| input.nsqlookupdPort | Required when input.type is nsq. HTTP port for nsqlookupd. |
| output.good.type | Required. Configures where to write good events. Can be "elasticsearch" or "stdout". |
| output.good.client.endpoint | Required. The Elasticsearch cluster endpoint. |
| output.good.client.port | Optional. The port the Elasticsearch cluster can be accessed on. Default value 9200. |
| output.good.client.username | Optional. HTTP Basic Auth username. Can be removed if not active. |
| output.good.client.password | Optional. HTTP Basic Auth password. Can be removed if not active. |
| output.good.client.shardDateFormat | Optional. Formatting used for sharding the good stream, e.g. _yyyy-MM-dd. Can be removed if not needed. |
| output.good.client.shardDateField | Optional. Timestamp field for sharding the good stream. If not specified, derived_tstamp is used. |
| output.good.client.maxRetries | Optional. The maximum number of request attempts before giving up. Default value 6. |
| output.good.client.ssl | Optional. Whether to use SSL or not. Default value false. |
| output.good.aws.signing | Optional. Whether to activate AWS signing or not. It should be activated if the AWS OpenSearch service is used. Default value false. |
| output.good.aws.region | Optional. Region where the AWS OpenSearch service is located. |
| output.good.cluster.index | Required. The Elasticsearch index name. |
| output.good.cluster.documentType | Optional. The Elasticsearch index type. Index types are deprecated in ES >=7.x, so it shouldn't be set with ES >=7.x. |
| output.good.chunk.byteLimit | Optional. Bulk requests to Elasticsearch will be split into chunks according to the given byte limit. Default value 1000000. |
| output.good.chunk.recordLimit | Optional. Bulk requests to Elasticsearch will be split into chunks according to the given record limit. Default value 500. |
| output.bad.type | Required. Configures where to write failed events. Can be "kinesis", "nsq", "stderr" or "none". |
| output.bad.streamName | Required. Stream name for events which are rejected by Elasticsearch. |
| output.bad.region | Used when output.bad.type is kinesis. Optional if it can be resolved with the AWS region provider chain. Region where the bad Kinesis stream is located. |
| output.bad.customEndpoint | Used when output.bad.type is kinesis. Optional. Custom endpoint to override AWS Kinesis endpoints. This can be used to specify local endpoints when using LocalStack. |
| output.bad.nsqdHost | Required when output.bad.type is nsq. Host name for nsqd. |
| output.bad.nsqdPort | Required when output.bad.type is nsq. HTTP port for nsqd. |
| monitoring.snowplow.collector | Optional. Snowplow collector URI for monitoring. Can be removed together with the monitoring section. |
| monitoring.snowplow.appId | Optional. The app ID used in decorating the events sent for monitoring. Can be removed together with the monitoring section. |
| monitoring.metrics.cloudWatch | Optional. Whether to enable CloudWatch metrics or not. Default value true. |
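To make the "Required" and "Required when" rules concrete, here is a rough sketch of a pre-flight check over a configuration that has already been parsed into nested dictionaries. It is not part of the loader; the names `get_path` and `missing_fields` are hypothetical, and the rules mirror the table literally, treating input.initialTimestamp as required when input.initialPosition is "AT_TIMESTAMP".

```python
from typing import Any, Dict, List

ALWAYS_REQUIRED = [
    "purpose",
    "input.type",
    "output.good.type",
    "output.good.client.endpoint",
    "output.good.cluster.index",
    "output.bad.type",
    "output.bad.streamName",
]

# Extra required fields, keyed by the value of input.type.
REQUIRED_BY_INPUT = {
    "kinesis": ["input.streamName", "input.initialPosition"],
    "nsq": [
        "input.streamName",
        "input.channelName",
        "input.nsqlookupdHost",
        "input.nsqlookupdPort",
    ],
}

# Extra required fields, keyed by the value of output.bad.type.
REQUIRED_BY_BAD_OUTPUT = {"nsq": ["output.bad.nsqdHost", "output.bad.nsqdPort"]}


def get_path(config: Dict[str, Any], dotted: str) -> Any:
    """Look up a dotted path such as "input.type"; return None if any part is absent."""
    node: Any = config
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def missing_fields(config: Dict[str, Any]) -> List[str]:
    """Return the required fields that are absent from the parsed configuration."""
    required = list(ALWAYS_REQUIRED)
    input_type = get_path(config, "input.type")
    bad_type = get_path(config, "output.bad.type")
    if isinstance(input_type, str):
        required += REQUIRED_BY_INPUT.get(input_type, [])
    if isinstance(bad_type, str):
        required += REQUIRED_BY_BAD_OUTPUT.get(bad_type, [])
    if get_path(config, "input.initialPosition") == "AT_TIMESTAMP":
        required.append("input.initialTimestamp")
    return [name for name in required if get_path(config, name) is None]
```

An empty list means all fields marked as required in the table are present; it does not validate the values themselves.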