Spark transformer
For a high-level overview of the Transform process, see Transforming enriched data. For guidance on picking the right transformer
app, see How to pick a transformer.
The Spark-based transformer is a batch job designed to be deployed in an EMR cluster and process a bounded data set stored on S3.
In order to run it, you will need:
- the
snowplow-transformer-batch
jar file (from version 3.0.0 this replaces thesnowplow-rdb-shredder
asset) - configuration files for the jar file
- an EMR cluster specification
- a way to spin up an EMR cluster and submit a job to it.
You can use any suitable tool to periodically submit the transformer job to an EMR cluster. We recommend you use our purpose-built Dataflow Runner tool. All the examples below assume that Dataflow Runner is being used. Refer to the app's documentation for more details.
Downloading the artifact
The asset is published as a jar file attached to the Github release notes for each version.
It's also available in several S3 buckets that are accessible to an EMR cluster:
s3://snowplow-hosted-assets/4-storage/transformer-batch/snowplow-transformer-batch-6.1.2.jar
-- or --
s3://snowplow-hosted-{{ region }}/4-storage/transformer-batch/snowplow-transformer-batch-6.1.2.jar
where region
is one of us-east-1
, us-east-2
, us-west-1
, us-west-2
, eu-central-1
, eu-west-2
, ca-central-1
, sa-east-1
, ap-southeast-1
, ap-southeast-2
, ap-northeast-1
, ap-northeast-2
, or ap-south-1
. Pick the region of your EMR cluster.
Configuring the EMR cluster
Starting from version 5.5.0
, batch transformer requires to use Java 11 on EMR (default is Java 8). See the bootstrapActionConfigs
section in the configuration below.
Here's an example of an EMR cluster config file that can be used with Dataflow Runner:
{
"schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
"data": {
"name": "RDB Transformer",
"region": "eu-central-1",
"logUri": "s3://com-acme/logs/",
"credentials": {
"accessKeyId": "env",
"secretAccessKey": "env"
},
"roles": {
"jobflow": "EMR_EC2_DefaultRole",
"service": "EMR_DefaultRole"
},
"ec2": {
"amiVersion": "6.2.0",
"keyName": "ec2-key-name",
"location": {
"vpc": {
"subnetId": "subnet-id"
}
},
"instances": {
"master": {
"type": "m4.large",
"ebsConfiguration": {
"ebsOptimized": true,
"ebsBlockDeviceConfigs": []
}
},
"core": {
"type": "r4.xlarge",
"count": 1
},
"task": {
"type": "m4.large",
"count": 0,
"bid": "0.015"
}
}
},
"tags": [],
"bootstrapActionConfigs": [
{
"name": "Use Java 11",
"scriptBootstrapAction": {
"path": "s3://<bucket>/<path>/emr-bootstrap-java-11.sh",
"args": []
}
}
],
"configurations": [
{
"classification": "core-site",
"properties": {
"Io.file.buffer.size": "65536"
},
"configurations": []
},
{
"classification": "yarn-site",
"properties": {
"yarn.nodemanager.resource.memory-mb": "57344",
"yarn.scheduler.maximum-allocation-mb": "57344",
"yarn.nodemanager.vmem-check-enabled": "false"
},
"configurations": []
},
{
"classification": "spark",
"properties": {
"maximizeResourceAllocation": "false"
},
"configurations": []
},
{
"classification": "spark-defaults",
"properties": {
"spark.executor.memory": "7G",
"spark.driver.memory": "7G",
"spark.driver.cores": "3",
"spark.yarn.driver.memoryOverhead": "1024",
"spark.default.parallelism": "24",
"spark.executor.cores": "1",
"spark.executor.instances": "6",
"spark.yarn.executor.memoryOverhead": "1024",
"spark.dynamicAllocation.enabled": "false"
},
"configurations": []
}
],
"applications": [
"Hadoop",
"Spark"
]
}
}
You will need to replace <bucket>
and <path>
(in the bootstrapActionConfigs
section) according to where you placed emr-bootstrap-java-11.sh
. The content of this file should be as follows:
#!/bin/bash
set -e
sudo update-alternatives --set java /usr/lib/jvm/java-11-amazon-corretto.x86_64/bin/java
exit 0
This is a typical cluster configuration for processing ~1.5GB of uncompressed enriched data.
You need to change the following settings with your own values:
region
: the AWS region of your EMR clusterlogUri
: the location of an S3 bucket where EMR logs will be writtenec2.keyName
(optional): The name of an EC2 key pair that you’ll use to shh into the EMR clusterec2.location.vpc.subnetId
: your VPN subnet ID.
Configuring snowplow-transformer-batch
The transformer takes two configuration files:
- a
config.hocon
file with application settings - an
iglu_resolver.json
file with the resolver configuration for your Iglu schema registry.
An example of the minimal required config for the Spark transformer can be found here and a more detailed one here. For details about each setting, see the configuration reference.
See here for details on how to prepare the Iglu resolver file.
All self-describing schemas for events processed by the transformer must be hosted on Iglu Server 0.6.0 or above. Iglu Central is a registry containing Snowplow-authored schemas. If you want to use them alongside your own, you will need to add it to your resolver file. Keep it mind that it could override your own private schemas if you give it higher priority. For details on this see here.
Running the Spark transformer
To run the transformer on EMR with Dataflow Runner, you need:
- the EMR cluster config (see Configuring the EMR cluster above)
- a Dataflow Runner playbook (a DAG with steps to be submitted to the EMR cluster).
Preparing the Dataflow Runner playbook
A typical playbook can look like:
{
"schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
"data": {
"region": "eu-central-1",
"credentials": {
"accessKeyId": "env",
"secretAccessKey": "env"
},
"steps": [
{
"type": "CUSTOM_JAR",
"name": "S3DistCp enriched data archiving",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
"arguments": [
"--src", "s3://com-acme/enriched/sink/",
"--dest", "s3://com-acme/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
"--s3Endpoint", "s3-eu-central-1.amazonaws.com",
"--srcPattern", ".*",
"--outputCodec", "gz",
"--deleteOnSuccess"
]
},
{
"type": "CUSTOM_JAR",
"name": "RDB Transformer",
"actionOnFailure": "CANCEL_AND_WAIT",
"jar": "command-runner.jar",
"arguments": [
"spark-submit",
"--class", "com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Main",
"--master", "yarn",
"--deploy-mode", "cluster",
"s3://snowplow-hosted-assets-eu-central-1/4-storage/transformer-batch/snowplow-transformer-batch-6.1.2.jar",
"--iglu-config", "{{base64File "/home/snowplow/configs/snowplow/iglu_resolver.json"}}",
"--config", "{{base64File "/home/snowplow/configs/snowplow/config.hocon"}}"
]
}
],
"tags": []
}
}
This playbook consists of two steps. The first one copies the enriched data to a dedicated directory, from which the transformer will read it. The second step is the transformer Spark job that transforms the data.
You need to change the following settings with your own values:
region
: the AWS region of the EMR cluster"--src"
: the bucket in which your enriched data is sunk by Enrich"--dest"
: the bucket in which the data for your enriched data lake lives.
NOTE: The "--src"
and "--dest"
settings above apply only to the s3DistCp
step of the playbook. The source and destination buckets for the transformer step are configured via the config.hocon
file.
Submitting the job to EMR with Dataflow Runner
Here's an example of putting all of the above together on a transient EMR cluster:
$ ./dataflow-runner run-transient \
--emr-config path/to/cluster.conig \
--emr-playbook path/to/playbook
This will spin up the cluster with the above configuration, submit the steps from the playbook, and terminate the cluster once all steps are completed.
For more examples on running EMR jobs with Dataflow Runner, as well as details on cluster configurations and playbooks, see the app's documentation. It also details how you can submit steps to a persistent EMR cluster.