Flink
The Flink job reads bad rows from an S3 location, writes the recovered payloads to Kinesis, and writes unrecovered and unrecoverable events to separate S3 buckets.
Building
To build the fat jar, run:
sbt flink/assembly
Running
Event recovery jobs are usually run using our dataflow-runner application. Configuration templates for running the job are available in the repository.
To start a transient EMR cluster and execute the job using the templates, download the Flink dataflow-runner templates from the repository and run:
dataflow-runner run-transient \
--emr-playbook flink-playbook.json.tmpl \
--emr-config flink-cluster.json.tmpl \
--vars bucket,$BUCKET_INPUT,region,$AWS_REGION,subnet,$AWS_SUBNET,role,$AWS_IAM_ROLE,keypair,$AWS_KEYPAIR,client,$JOB_OWNER,version,$RECOVERY_VERSION,config,$RECOVERY_CONFIG,resolver,$IGLU_RESOLVER,output,$KINESIS_OUTPUT,inputdir,$BUCKET_INPUT_DIRECTORY,interval,$INTERVAL
Where:
- BUCKET_INPUT: S3 bucket containing the bad events to be recovered (e.g. geoffs-bad-events)
- BUCKET_INPUT_DIRECTORY: directory in BUCKET_INPUT to use as the source of bad events
- AWS_REGION: region in which the job is run (e.g. eu-central-1)
- AWS_SUBNET: network subnet to run the job in (e.g. subnet-435010347a21886ab)
- AWS_IAM_ROLE: AWS IAM role to assume while running the job (e.g. geoffs-recovery-role)
- AWS_KEYPAIR: AWS EC2 key pair to use for the instances (e.g. geoffs-keypair)
- JOB_OWNER: tag marking the owner of the job (e.g. geoff)
- RECOVERY_VERSION: application version (e.g. 0.6.0)
- RECOVERY_CONFIG: recovery job configuration, as described in Configuration
- IGLU_RESOLVER: Iglu resolver configuration, as described in Configuration
- KINESIS_OUTPUT: Kinesis stream to write recovered events to
- INTERVAL: job checkpointing interval (we recommend starting with 10 minutes and tuning it to your job's characteristics)
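Because every placeholder in the --vars list must be filled in, it can help to check the environment before launching a cluster. The snippet below is a minimal POSIX sh sketch of such a pre-flight check; require_vars and RECOVERY_VARS are hypothetical names, not part of the project or of dataflow-runner.

```shell
# Sketch (POSIX sh): fail fast if any variable used in the --vars list is
# unset or empty. require_vars and RECOVERY_VARS are hypothetical names.

# Every variable referenced by the dataflow-runner --vars argument.
RECOVERY_VARS="BUCKET_INPUT BUCKET_INPUT_DIRECTORY AWS_REGION AWS_SUBNET
AWS_IAM_ROLE AWS_KEYPAIR JOB_OWNER RECOVERY_VERSION RECOVERY_CONFIG
IGLU_RESOLVER KINESIS_OUTPUT INTERVAL"

# Print each missing variable name to stderr; return 1 if any is missing.
require_vars() {
  missing=0
  for name in "$@"; do
    # eval reads the variable whose name is stored in $name; the names come
    # from the fixed list above, so no untrusted input reaches eval.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, run require_vars $RECOVERY_VARS (unquoted, so the list splits into names) immediately before the dataflow-runner run-transient command and only proceed if it succeeds.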
The job uses checkpointing so that a restarted job can resume from its last checkpoint. The Kinesis connector, which is part of the checkpointing mechanism, guarantees at-least-once delivery semantics and applies backpressure to the job.
Monitoring
The Flink job on EMR emits a wide range of metrics through the CloudWatch agent, which is installed during cluster bootstrap by a provided script (see the dataflow-runner directory in the project repository). Custom metrics include:
- number of unrecoverable events in the run: taskmanager_container_XXXX_XXX__Process_0_events_unrecoverable
- number of failed recovery attempts: taskmanager_container_XXXX_XXX__Process_0_events_failed
- number of recovered events: taskmanager_container_XXXX_XXX__Process_0_events_recovered
- total number of events submitted for processing: taskmanager_container_XXXX_XXX__Process_0_numRecordsIn
The metrics are delivered to the snowplow/event-recovery CloudWatch namespace.
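Since the XXXX_XXX part of each metric name depends on the YARN container, the actual names have to be looked up in the namespace. The sketch below uses the AWS CLI's cloudwatch list-metrics command to list them, plus a helper that strips the container prefix. It assumes every name follows the __Process_0_ pattern shown above; list_recovery_metrics and metric_kind are hypothetical helper names.

```shell
# Sketch (POSIX sh + AWS CLI): find the container-specific metric names and
# map them back to the counters listed above. Function names are hypothetical.

# List distinct metric names in the event-recovery namespace, one per line.
list_recovery_metrics() {
  aws cloudwatch list-metrics \
    --namespace snowplow/event-recovery \
    --region "$AWS_REGION" \
    --query 'Metrics[].MetricName' \
    --output text | tr '\t' '\n' | sort -u
}

# Strip the taskmanager/container prefix (assumes the __Process_0_ pattern):
# taskmanager_container_XXXX_XXX__Process_0_events_recovered -> events_recovered
metric_kind() {
  printf '%s\n' "${1##*__Process_0_}"
}
```

For example, list_recovery_metrics | while read -r m; do echo "$(metric_kind "$m") $m"; done pairs each counter with its full metric name.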
With a one-second aggregation period in CloudWatch, the graph should show monotonically increasing metrics, and the recovered, failed and unrecoverable counts should add up to the total number of events submitted for processing (numRecordsIn).
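The balance described above can be checked with simple arithmetic. The following POSIX sh sketch assumes that every event counted in numRecordsIn ends up in exactly one of the three outcome counters and that you pass in the latest value of each metric; check_balance is a hypothetical helper. While the job is still processing, in-flight events may make the sums differ temporarily.

```shell
# Sketch (POSIX sh): check that the outcome counters add up to numRecordsIn.
# check_balance is hypothetical; pass the latest value of each metric.
# Usage: check_balance RECORDS_IN RECOVERED FAILED UNRECOVERABLE
check_balance() {
  accounted=$(( $2 + $3 + $4 ))
  if [ "$accounted" -eq "$1" ]; then
    echo "balanced: $accounted of $1 events accounted for"
  else
    echo "unbalanced: $accounted of $1 events accounted for" >&2
    return 1
  fi
}
```

For example, check_balance 1000 950 30 20 prints "balanced: 1000 of 1000 events accounted for".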
Note that Flink's numRecordsOut metric for sinks does not reflect the actual number of records saved in the sink.