Class: SnowplowTracker::AsyncEmitter

Inherits:
Emitter
  • Object
Defined in:
lib/snowplow-tracker/emitters.rb

Overview

This Emitter subclass provides asynchronous event sending. Whenever the buffer is flushed, the AsyncEmitter places the flushed events in a work queue. The AsyncEmitter asynchronously sends events in this queue using a thread pool of a fixed size. The size of the thread pool is 1 by default, but can be configured as part of the options hash during initialization.
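
The work-queue pattern described above can be illustrated with a minimal sketch built only on Ruby's core Queue and Thread. The class TinyAsyncSender and its methods are hypothetical stand-ins, not part of the tracker; "sending" is simulated by recording each batch.

```ruby
# Hypothetical stand-in for an asynchronous emitter: flushed batches go
# onto a work queue, and a fixed number of worker threads drain it.
class TinyAsyncSender
  attr_reader :sent

  def initialize(thread_count: 1)
    @queue = Queue.new
    @sent = Queue.new # thread-safe record of the batches that were "sent"
    @workers = Array.new(thread_count) do
      Thread.new do
        # Queue#pop blocks until a batch arrives; nil is used as a stop signal.
        while (batch = @queue.pop)
          @sent << batch
        end
      end
    end
  end

  # Hand a flushed batch of events to the worker threads.
  def flush(batch)
    @queue << batch
  end

  # Send one stop signal per worker, then wait for all of them to finish.
  def shutdown
    @workers.size.times { @queue << nil }
    @workers.each(&:join)
  end
end

sender = TinyAsyncSender.new(thread_count: 2)
sender.flush(%w[event_a event_b])
sender.flush(%w[event_c])
sender.shutdown
puts sender.sent.size # prints 2 (two batches were processed)
```

Unlike this sketch, the AsyncEmitter shown on this page starts its workers without a shutdown step, which is why short scripts can exit before events are sent.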

Constant Summary

Constants inherited from Emitter

Emitter::DEFAULT_CONFIG

Instance Method Summary

Methods inherited from Emitter

#input

Constructor Details

#initialize(endpoint:, options: {}) ⇒ AsyncEmitter

Note:

If you test the AsyncEmitter with a short script that sends a single event, you may find that the event fails to send. This happens because the process exits before the flushing thread has finished. You can work around this either by adding a sleep(10) to the end of your script or by using the synchronous flush.

Create a new AsyncEmitter object. The endpoint is required.

The options hash can have any of these optional parameters:

Parameter    | Description                                             | Type
path         | Override the default path for appending to the endpoint | String
protocol     | 'http' or 'https'                                       | String
port         | The port for the connection                             | Integer
method       | 'get' or 'post'                                         | String
buffer_size  | Number of events to send at once                        | Integer
on_success   | A function to call if events were sent successfully     | Function
on_failure   | A function to call if events did not send               | Function
thread_count | Number of threads to use                                | Integer
logger       | Log somewhere other than STDERR                         | Logger

The thread_count determines the number of worker threads which will be used to send events.

If you choose to use HTTPS, we recommend using port 443.

Only 2xx and 3xx status codes are considered successes.
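
As a rough illustration of that rule, the check amounts to a range test on the status code. The helper name success_status? is hypothetical and is not the library's own method.

```ruby
# Hypothetical helper: treats any 2xx or 3xx HTTP status code as a success.
# Accepts an Integer or a numeric String.
def success_status?(status_code)
  status_code.to_i.between?(200, 399)
end

puts success_status?(200)   # true
puts success_status?('302') # true
puts success_status?(404)   # false
```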

The on_success callback should accept one argument: the number of events that were sent successfully. The on_failure callback should accept two arguments: the number of successfully sent events, and an array containing the unsuccessful events.

Examples:

Initializing an AsyncEmitter with all the possible extra configuration.

success_callback = ->(success_count) { puts "#{success_count} events sent successfully" }
failure_callback = ->(success_count, failures) do
  puts "#{success_count} events sent successfully, #{failures.size} sent unsuccessfully"
end

SnowplowTracker::AsyncEmitter.new(endpoint: 'collector.example.com',
                                  options: { path: '/my-pipeline/1',
                                             protocol: 'https',
                                             port: 443,
                                             method: 'post',
                                             buffer_size: 5,
                                             on_success: success_callback,
                                             on_failure: failure_callback,
                                             logger: Logger.new(STDOUT),
                                             thread_count: 5 })

Parameters:

  • endpoint (String)

    the endpoint to send the events to

  • options (Hash) (defaults to: {})

    allowed configuration options

# File 'lib/snowplow-tracker/emitters.rb', line 411

def initialize(endpoint:, options: {})
  @queue = Queue.new
  # @all_processed_condition and @results_unprocessed are used to emulate Python's Queue.task_done()
  @queue.extend(MonitorMixin)
  @all_processed_condition = @queue.new_cond
  @results_unprocessed = 0
  (options[:thread_count] || 1).times { Thread.new { consume } }
  super(endpoint: endpoint, options: options)
end
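
The comment in the constructor mentions emulating Python's Queue.task_done(). A minimal sketch of how a counter plus a MonitorMixin condition variable can provide that behaviour is shown below. The class TrackedWorkQueue and its methods push and wait_until_processed are hypothetical; only the instance variable names are borrowed from the constructor above, and the rest of the emitter's logic is assumed rather than reproduced.

```ruby
require 'monitor'

# Hypothetical illustration: a work queue whose callers can block until
# every pushed work unit has been handled (similar to Python's Queue.join()).
class TrackedWorkQueue
  def initialize(thread_count: 1, &handler)
    @handler = handler
    @queue = Queue.new
    # Extending the queue gives it #synchronize and #new_cond.
    @queue.extend(MonitorMixin)
    @all_processed_condition = @queue.new_cond
    @results_unprocessed = 0
    thread_count.times { Thread.new { consume } }
  end

  # Count the work unit as outstanding before handing it to the workers.
  def push(work_unit)
    @queue.synchronize { @results_unprocessed += 1 }
    @queue << work_unit
  end

  # Block until the outstanding counter drops back to zero.
  def wait_until_processed
    @queue.synchronize do
      @all_processed_condition.wait_while { @results_unprocessed > 0 }
    end
  end

  private

  # Worker loop: handle a unit, then mark it done and wake any waiters.
  def consume
    loop do
      work_unit = @queue.pop
      @handler.call(work_unit)
      @queue.synchronize do
        @results_unprocessed -= 1
        @all_processed_condition.broadcast
      end
    end
  end
end

results = Queue.new
work = TrackedWorkQueue.new(thread_count: 2) { |unit| results << unit * 2 }
[1, 2, 3].each { |n| work.push(n) }
work.wait_until_processed
puts results.size # prints 3
```

In this sketch a handler that raises would leave the counter above zero, so a real implementation would need to guard against that.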