Initialization
Assuming you have completed the Scala Tracker Setup, you are ready to initialize the Scala Tracker.
Emitters
Each tracker instance must be initialized with an Emitter, which is responsible for firing events to a Collector. We offer two emitter modules: http4s-emitters and id-emitters.
Http4s Emitter
Backed by an Http4s client, this emitter captures actions in the context of a functional effect type, such as a cats IO, ZIO or Monix Task. This is the recommended emitter if you are familiar with functional programming and the cats ecosystem of type classes.
import org.http4s.client.blaze.BlazeClientBuilder
import com.snowplowanalytics.snowplow.scalatracker.Tracker
import com.snowplowanalytics.snowplow.scalatracker.Emitter.EndpointParams
import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.Http4sEmitter
import scala.concurrent.ExecutionContext
import cats.data.NonEmptyList
import cats.implicits._
import cats.effect.{ContextShift, IO, Timer}
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val resource = for {
  client <- BlazeClientBuilder[IO](ExecutionContext.global).resource
  emitter1 <- Http4sEmitter.build[IO](EndpointParams("mycollector.com"), client)
  emitter2 <- Http4sEmitter.build[IO](EndpointParams("myothercollector.com", port = Some(8080)), client)
} yield new Tracker(NonEmptyList.of(emitter1, emitter2), "mytrackername", "myapplicationid")
resource.use { tracker =>
  // Use the tracker inside this block to initialize and run your application
  MyApp.run(tracker)
}
The above code:
- Creates an implicit Timer and a ContextShift, which are required for bringing a Concurrent[IO] into implicit scope
- Creates an Http4s client from the BlazeClientBuilder. Http4s offers other variants of the client, but Blaze is the recommended default choice.
- Creates an emitter emitter1 which sends events to "mycollector.com" on the default port 80
- Creates a second emitter emitter2 which sends events to "myothercollector.com" on port 8080
- Creates a tracker which can be used to send events to all emitters
Your application might then use the tracker by flatMapping over the functional effect:
for {
  _ <- tracker.trackPageView("http://example.com")
  _ <- tracker.trackTransaction("order1234", 42.0)
} yield ()
ID Emitters
The emitters in the "id" module have a simpler developer interface, because return types are not wrapped in functional effects. Use these emitters if your application code is not using any other type classes from the cats ecosystem.
The code below:
- Creates a non-blocking emitter emitter1 which sends events to "mycollector.com" on the default port 80
- Creates a blocking emitter emitter2 which sends events to "myothercollector.com" on port 8080
- Creates a tracker which can be used to send events to all emitters
import scala.concurrent.ExecutionContext.Implicits.global
import com.snowplowanalytics.snowplow.scalatracker.idimplicits._
import com.snowplowanalytics.snowplow.scalatracker.Tracker
import com.snowplowanalytics.snowplow.scalatracker.Emitter.EndpointParams
import com.snowplowanalytics.snowplow.scalatracker.emitters.id.{AsyncEmitter, SyncEmitter}
import cats.data.NonEmptyList
val emitter1 = AsyncEmitter.createAndStart(EndpointParams("mycollector.com"))
val emitter2 = SyncEmitter(EndpointParams("myothercollector.com", port = Some(8080)))
val tracker = new Tracker(NonEmptyList.of(emitter1, emitter2), "mytrackername", "myapplicationid")
When using this emitter, the methods on the tracker all return Unit, so there is no requirement to flatMap over effect types.
tracker.trackPageView("http://example.com") // returns Unit
tracker.trackTransaction("order1234", 42.0) // returns Unit
The SyncEmitter blocks the whole thread when it sends events to the collector. The AsyncEmitter sends requests asynchronously from the tracker's main thread of execution, but in doing so it blocks a thread on the provided execution context for each HTTP request. The blocking calls are wrapped in Scala's blocking construct, which is respected by the global execution context.
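As a rough illustration of the blocking construct mentioned above, the following standalone sketch uses only the Scala standard library. It is not the tracker's internal code, and slowSend is a hypothetical stand-in for a blocking HTTP request to a collector.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Hypothetical stand-in for a blocking HTTP request to a collector
  def slowSend(payload: String): Int = {
    Thread.sleep(50) // simulate network latency
    payload.length
  }

  // Wrapping the call in `blocking` hints to the global execution context
  // that this thread is about to block, so it may start a compensating thread
  def sendAsync(payload: String): Future[Int] =
    Future(blocking(slowSend(payload)))

  // Send several payloads concurrently and wait for all of the results
  def sendAll(payloads: List[String]): List[Int] =
    Await.result(Future.sequence(payloads.map(sendAsync)), 10.seconds)
}
```

An execution context other than the global one may ignore the blocking hint, in which case each pending request simply occupies one of its threads.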
Subject
You can configure a subject with extra data and attach it to the tracker so that the data will be attached to every event:
val subject = new Subject()
  .setUserId("user-00035")
  .setPlatform(Desktop)
val tracker = Tracker(emitters, ns, appId).setSubject(subject)
Alternatively, you can set the subject for each event individually.
Buffer Configuration
Emitters use a buffer, so they can send larger payloads comprising several events, instead of sending each event immediately. You can choose the behavior of your emitter's buffering by passing in a BufferConfig during initialization. For example:
val emitter = Http4sEmitter.build(endpoint, bufferConfig = PayloadSize(40000))
The available configs are:
- EventsCardinality(size: Int): Configures the emitter to buffer events and send batched payloads comprising a fixed number of events
- PayloadSize(bytes: Int): Configures the emitter to buffer events and send batched payloads of a minimum size in bytes
- NoBuffering: Configures the emitter to send events immediately to the collector, without buffering for larger batches
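To make the difference between the three configs concrete, here is a minimal standalone model of the flush decision. The types below are hypothetical re-declarations for illustration only, not the tracker's own classes, and shouldFlush is an assumed helper name; the tracker's real flushing logic may differ in detail.

```scala
object BufferSketch {
  // Hypothetical model of the three buffer configurations described above
  sealed trait BufferConfig
  final case class EventsCardinality(size: Int) extends BufferConfig
  final case class PayloadSize(bytes: Int) extends BufferConfig
  case object NoBuffering extends BufferConfig

  // Decide whether the buffered events should be sent to the collector.
  // Each pending event is represented by its serialized payload.
  def shouldFlush(config: BufferConfig, pending: List[String]): Boolean =
    config match {
      // flush once a fixed number of events has accumulated
      case EventsCardinality(size) => pending.size >= size
      // flush once the combined payload reaches a minimum size in bytes
      case PayloadSize(bytes) => pending.map(_.getBytes("UTF-8").length).sum >= bytes
      // flush as soon as there is anything to send
      case NoBuffering => pending.nonEmpty
    }
}
```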
Retry Policies
Emitters can be configured to retry sending events to the collector if the initial attempt fails. For example:
val emitter = Http4sEmitter.build(endpoint, retryPolicy = MaxAttempts(10))
The available policies are:
- RetryForever: A RetryPolicy with no cap on the maximum number of attempts to send an event to the collector. This policy might appear attractive where it is critical to avoid data loss because it never deliberately drops events. However, it could cause a backlog of events in the buffered queue if the collector is unavailable for too long. This RetryPolicy could be paired with an EventQueuePolicy that manages the behavior of a large backlog.
- MaxAttempts(max: Int): A RetryPolicy which drops events after failing to contact the collector within a fixed number of attempts. This policy can smooth over short outages of connection to the collector. Events will be dropped only if the collector is unreachable for a relatively long span of time. Dropping events can be a safety mechanism against a growing backlog of unsent events.
- NoRetry: A RetryPolicy that drops events immediately after a failed attempt to send to the collector.
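The behavior of the three policies can be sketched with a small standalone model. The types are hypothetical re-declarations rather than the tracker's own classes, deliver and shouldRetry are assumed helper names, and the sketch assumes that MaxAttempts(max) counts the total number of attempts including the first one.

```scala
import scala.annotation.tailrec

object RetrySketch {
  // Hypothetical model of the retry policies described above
  sealed trait RetryPolicy
  case object RetryForever extends RetryPolicy
  final case class MaxAttempts(max: Int) extends RetryPolicy
  case object NoRetry extends RetryPolicy

  // True if another attempt should be made after `attemptsSoFar` failed attempts
  def shouldRetry(policy: RetryPolicy, attemptsSoFar: Int): Boolean =
    policy match {
      case RetryForever     => true
      case MaxAttempts(max) => attemptsSoFar < max
      case NoRetry          => false
    }

  // Call `send` (given the 1-based attempt number) until it succeeds or the
  // policy gives up. Returns Some(attempts used) on success, None if dropped.
  def deliver(policy: RetryPolicy)(send: Int => Boolean): Option[Int] = {
    @tailrec
    def loop(attempt: Int): Option[Int] =
      if (send(attempt)) Some(attempt)
      else if (shouldRetry(policy, attempt)) loop(attempt + 1)
      else None
    loop(1)
  }
}
```

A real emitter would typically also wait between attempts; the model leaves delays out to keep the decision logic visible.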
Event Queue Policy
An EventQueuePolicy becomes important when the queue of pending events grows to an unexpectedly large number; for example, if the collector is unreachable for a long period of time.
Picking an EventQueuePolicy is an opportunity to protect against excessive heap usage by limiting the maximum size of the queue.
An EventQueuePolicy can be paired with an appropriate RetryPolicy, which controls dropping events when they cannot be sent.
For example:
val emitter = Http4sEmitter.build(endpoint, queuePolicy = IgnoreWhenFull(100))
The available policies are:
- UnboundedQueue: An EventQueuePolicy with no upper bound on the number of pending events in the emitter's queue. This policy never deliberately drops events, but it comes at the expense of potentially high heap usage if the collector is unavailable for a long period of time.
- BlockWhenFull(limit: Int): An EventQueuePolicy that blocks the tracker's thread until the queue of pending events falls below a threshold. This policy never deliberately drops events, but it comes at the expense of blocking threads if the collector is unavailable for a long period of time.
- IgnoreWhenFull(limit: Int): An EventQueuePolicy that silently drops new events when the queue of pending events exceeds a threshold.
- ErrorWhenFull(limit: Int): An EventQueuePolicy that raises an exception when the queue of pending events exceeds a threshold.
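The non-blocking policies can be illustrated with a small standalone model. The types are hypothetical re-declarations rather than the tracker's own classes, offer is an assumed helper name, and the sketch assumes the queue counts as full once it holds limit events. BlockWhenFull is left out because modelling it properly needs a second thread to drain the queue.

```scala
import scala.collection.mutable

object QueueSketch {
  // Hypothetical model of three of the queue policies described above
  sealed trait EventQueuePolicy
  case object UnboundedQueue extends EventQueuePolicy
  final case class IgnoreWhenFull(limit: Int) extends EventQueuePolicy
  final case class ErrorWhenFull(limit: Int) extends EventQueuePolicy

  // Try to enqueue an event. Returns true if it was accepted and
  // false if it was silently dropped; ErrorWhenFull throws instead.
  def offer(policy: EventQueuePolicy, queue: mutable.Queue[String], event: String): Boolean =
    policy match {
      case UnboundedQueue =>
        queue.enqueue(event) // always accepted, the queue may grow without bound
        true
      case IgnoreWhenFull(limit) =>
        if (queue.size >= limit) false // drop the new event without signalling
        else { queue.enqueue(event); true }
      case ErrorWhenFull(limit) =>
        if (queue.size >= limit)
          throw new IllegalStateException(s"Event queue is full (limit $limit)")
        else { queue.enqueue(event); true }
    }
}
```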
Global contexts
You can configure your tracker to add a context to every event sent:
val tracker = Tracker(emitters, namespace, appId).addContext(selfDescribingJson)
There is also syntax for adding EC2 or GCE contexts automatically:
EC2 Context
Amazon Elastic Compute Cloud (EC2) can provide basic information about the instance running your app. You can add this information as an additional custom context to all sent events by enabling it after initialization of your tracker:
import com.snowplowanalytics.snowplow.scalatracker.metadata._
val tracker = Tracker(emitters, ns, appId).enableEc2Context()
Google Compute Engine Metadata context
Google Cloud Compute Engine can provide basic information about the instance running your app. You can add this information as an additional custom context to all sent events by enabling it after initialization of your tracker:
import com.snowplowanalytics.snowplow.scalatracker.metadata._
val tracker = Tracker(emitters, ns, appId).enableGceContext()
This will add the iglu:com.google.cloud.gce/instance_metadata/jsonschema/1-0-0 context to all your events.
Callbacks
All emitters supplied with the Scala Tracker support callbacks, which are invoked after every sent event (or batch of events), whether it was successful or not. This feature is particularly useful for detecting collector unavailability and for debugging the tracker.
Callbacks should have the following signature:
type Callback = (Emitter.EndpointParams, Emitter.Request, Emitter.Result) => Unit
- EndpointParams is the collector configuration attached to the emitter
- Request is the raw collector payload, which can be either GET or POST, and holds the number of attempts undertaken
- Result is the processed collector response or failure reason. You'll want to pattern-match on it to either do nothing or notify DevOps about a non-working collector
To add a callback to an AsyncEmitter, you can use the following approach:
import com.snowplowanalytics.snowplow.scalatracker.Emitter._
def emitterCallback(params: EndpointParams, req: Request, res: Result): Unit = {
  res match {
    case Result.Success(_) => ()
    case Result.Failure(code) =>
      devopsIncident(s"Scala Tracker got unexpected HTTP code $code from ${params.getUri}")
    case Result.TrackerFailure(exception) =>
      devopsIncident(s"Scala Tracker failed to reach ${params.getUri} with following exception $exception after ${req.attempt} attempt")
    case Result.RetriesExceeded(failure) =>
      devopsIncident(s"Scala Tracker has stopped trying to deliver payload after following failure: $failure")
      savePayload(req) // can be investigated and sent afterwards
  }
}
val emitter = AsyncEmitter.createAndStart(endpointParams, callback = Some(emitterCallback))
The async emitter will perform callbacks asynchronously in its ExecutionContext.