Running
Now that we've discussed configuring the recovery, let's dive in to running it on your pipeline.
First we'll need to define configuration:
Define configuration​
The configuration consists of the resolver-config.json
that your pipeline uses to resolve events against corresponding schema, for example:
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 0,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [
"com.snowplowanalytics"
],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
}
]
}
}
Also, thejob-config.json
which describes the recovery job to be done, for example:
{
"schema": "iglu:com.snowplowanalytics.snowplow/recoveries/jsonschema/4-0-0",
"data": {
"iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/1-0-0": [
{
"name": "passthrough",
"conditions": [],
"steps": []
}
]
}
}
Encode configuration​
Configuration is supplied to recovery jobs as a Base64-encoded string. You can use your encoding plugin of choice, but here is an example ammonite script you can use to encode your configuration:
import java.util.Base64
import java.nio.charset.StandardCharsets
import ammonite.ops._
import $ivy.`io.circe::circe-parser:0.13.0`, io.circe._, io.circe.syntax._, io.circe.parser._
val load = (path: String) => read! os.Path(path, base = os.pwd)
val encode = (str: String) => Base64.getEncoder.encodeToString(str.getBytes(StandardCharsets.UTF_8))
@main def run(config: String): Unit =
parse(load(config)) match {
case Right(data) => println(encode(data.asJson.noSpaces))
case Left(err) => println(s"Invalid JSON input in: ${err.getMessage}")
}
And then just run (assuming ammonite is on your path and above script is called encode.sc
:
amm ./encode.sc resolver-config.json
amm ./encode.sc job-config.json
Once the configuration files are encrypted it's time to deploy the recovery job on your pipeline.
Deploy the job​
There are several runtimes that recovery process can be deployed to: