Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
28ea204
feat(eventProcessor): Adds EventProcessor and BatchEventProcessor
rashidsp Aug 1, 2019
bcfca16
feat(forwarding-EP): Implements forwarding event processor
rashidsp Aug 2, 2019
ccbea99
feat(notification-center): Adds LogEvent notification
rashidsp Jul 31, 2019
f7a5b97
log event in forward-EP
rashidsp Aug 2, 2019
f5db35b
Addressing feedback
rashidsp Aug 2, 2019
32a566e
Addressing feedback
rashidsp Aug 2, 2019
c8ed3f7
Addressing feedback
rashidsp Aug 2, 2019
c0946ea
fixes log event issue
rashidsp Aug 2, 2019
cfcf5a8
Addresses review
rashidsp Aug 5, 2019
717adb0
resolves test issue
rashidsp Aug 5, 2019
6eec6bd
fixes logger issue
rashidsp Aug 5, 2019
7ac5896
feat(eventprocess): Integrate Event Processor with Optimizely
rashidsp Aug 6, 2019
899f3a6
fixes: failing test
rashidsp Aug 6, 2019
a31a9da
tests: adds SizedQueue
rashidsp Aug 7, 2019
93562a1
resolves conflicts
rashidsp Aug 7, 2019
0b7e53d
resolves conflicts
rashidsp Aug 7, 2019
e1f6715
Merge branch 'rashid/log-event' into rashid/optly-EP-integration
rashidsp Aug 7, 2019
309ee3a
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 8, 2019
096ae95
Addresses feedback
rashidsp Aug 8, 2019
1341177
Addresses feedback
rashidsp Aug 9, 2019
ededaa7
reverts log-event changes
rashidsp Aug 9, 2019
2512412
resolves conflicts
rashidsp Aug 9, 2019
61e3426
config manager update
rashidsp Aug 9, 2019
b409edf
Removed verbose log.
msohailhussain Aug 15, 2019
9c019c7
Merge branch 'master' into rashid/batch-event-processor
rashidsp Aug 16, 2019
2fd378f
Resolves unit test
rashidsp Aug 16, 2019
efab9f4
removes log
rashidsp Aug 16, 2019
3c4a980
adds positive number conditions
rashidsp Aug 16, 2019
29fc092
resolves commits
rashidsp Aug 18, 2019
cbdce5e
fixes visitor_attributes bug
rashidsp Aug 19, 2019
6021aef
fixes visitor_attributes bug
rashidsp Aug 19, 2019
184fd06
Addressed review
rashidsp Aug 20, 2019
432933c
Merge branch 'rashid/batch-event-processor' into rashid/optly-EP-inte…
rashidsp Aug 20, 2019
0e12040
resolves conflicts
rashidsp Aug 23, 2019
d3a7495
Fixes event bug
rashidsp Aug 21, 2019
e802860
review changes
rashidsp Aug 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ Style/RescueStandardError:

Style/SignalException:
Enabled: false

Lint/RescueException:
Enabled: false
9 changes: 5 additions & 4 deletions .rubocop_todo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ Lint/LiteralAsCondition:
Metrics/ParameterLists:
Max: 6
Exclude:
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
- 'lib/optimizely.rb'
- 'lib/optimizely/optimizely_factory.rb'
- 'lib/optimizely/event/entity/impression_event.rb'
- 'lib/optimizely/event/entity/snapshot_event.rb'
- 'lib/optimizely/config_manager/http_project_config_manager.rb'
- 'lib/optimizely/event/batch_event_processor.rb'
- 'lib/optimizely/event/entity/conversion_event.rb'
- 'lib/optimizely/event/entity/event_context.rb'
- 'lib/optimizely/event/entity/impression_event.rb'
- 'lib/optimizely/event/entity/snapshot_event.rb'
- 'lib/optimizely/optimizely_factory.rb'

Naming/AccessorMethodName:
Exclude:
Expand Down
63 changes: 38 additions & 25 deletions lib/optimizely.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
require_relative 'optimizely/decision_service'
require_relative 'optimizely/error_handler'
require_relative 'optimizely/event_builder'
require_relative 'optimizely/event/forwarding_event_processor'
require_relative 'optimizely/event/event_factory'
require_relative 'optimizely/event/user_event_factory'
require_relative 'optimizely/event_dispatcher'
require_relative 'optimizely/exceptions'
require_relative 'optimizely/helpers/constants'
Expand All @@ -30,13 +33,12 @@
require_relative 'optimizely/helpers/variable_type'
require_relative 'optimizely/logger'
require_relative 'optimizely/notification_center'

module Optimizely
class Project
attr_reader :notification_center
# @api no-doc
attr_reader :config_manager, :decision_service, :error_handler,
:event_builder, :event_dispatcher, :logger
attr_reader :config_manager, :decision_service, :error_handler, :event_dispatcher,
:event_processor, :logger, :stopped

# Constructor for Projects.
#
Expand All @@ -61,7 +63,8 @@ def initialize(
user_profile_service = nil,
sdk_key = nil,
config_manager = nil,
notification_center = nil
notification_center = nil,
event_processor = nil
)
@logger = logger || NoOpLogger.new
@error_handler = error_handler || NoOpErrorHandler.new
Expand Down Expand Up @@ -92,7 +95,13 @@ def initialize(
StaticProjectConfigManager.new(datafile, @logger, @error_handler, skip_json_validation)
end
@decision_service = DecisionService.new(@logger, @user_profile_service)
@event_builder = EventBuilder.new(@logger)

@event_processor = if event_processor.respond_to?(:process)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does it mean respond_to ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checks instance method process exist, similarly we are checking for config_manager; config_manager.respond_to?(:get_config)

event_processor
else
ForwardingEventProcessor.new(@event_dispatcher, @logger, @notification_center)
end
# @event_builder = EventBuilder.new(@logger)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it.

end

# Buckets visitor and sends impression event to Optimizely.
Expand Down Expand Up @@ -243,20 +252,17 @@ def track(event_key, user_id, attributes = nil, event_tags = nil)
return nil
end

conversion_event = @event_builder.create_conversion_event(config, event, user_id, attributes, event_tags)
user_event = UserEventFactory.create_conversion_event(config, event, user_id, attributes, event_tags)
@event_processor.process(user_event)
@logger.log(Logger::INFO, "Tracking event '#{event_key}' for user '#{user_id}'.")
@logger.log(Logger::INFO,
"Dispatching conversion event to URL #{conversion_event.url} with params #{conversion_event.params}.")
begin
@event_dispatcher.dispatch_event(conversion_event)
rescue => e
@logger.log(Logger::ERROR, "Unable to dispatch conversion event. Error: #{e}")
end

@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, conversion_event
)
if @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:TRACK]).positive?
conversion_event = EventFactory.create_log_event(user_event, @logger)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name it log_event

@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:TRACK],
event_key, user_id, attributes, event_tags, conversion_event
)
end
nil
end

Expand Down Expand Up @@ -507,6 +513,14 @@ def is_valid
config.is_a?(Optimizely::ProjectConfig)
end

def close
return if @stopped

@stopped = true
@config_manager.stop! if @config_manager.respond_to?(:stop!)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how you are making sure, that stop property exists for both event_processor and config_manager

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest stop should be treated as required for any event_processor or config_manager passed in. We could validate them in the constructor. When stop doesn't exist, we can log a message and ignore it.

@event_processor.stop! if @event_processor.respond_to?(:stop!)
end

private

def get_variation_with_config(experiment_key, user_id, attributes, config)
Expand Down Expand Up @@ -692,15 +706,14 @@ def validate_instantiation_options
def send_impression(config, experiment, variation_key, user_id, attributes = nil)
experiment_key = experiment['key']
variation_id = config.get_variation_id_from_key(experiment_key, variation_key)
impression_event = @event_builder.create_impression_event(config, experiment, variation_id, user_id, attributes)
@logger.log(Logger::INFO,
"Dispatching impression event to URL #{impression_event.url} with params #{impression_event.params}.")
begin
@event_dispatcher.dispatch_event(impression_event)
rescue => e
@logger.log(Logger::ERROR, "Unable to dispatch impression event. Error: #{e}")
end
user_event = UserEventFactory.create_impression_event(config, experiment, variation_id, user_id, attributes)
@event_processor.process(user_event)

return unless @notification_center.notification_count(NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE]).positive?

@logger.log(Logger::INFO, "Activating user '#{user_id}' in experiment '#{experiment_key}'.")
variation = config.get_variation_from_id(experiment_key, variation_id)
impression_event = EventFactory.create_log_event(user_event, @logger)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log_event

@notification_center.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:ACTIVATE],
experiment, user_id, attributes, variation, impression_event

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log_event change will be reflected here.

Expand Down
5 changes: 4 additions & 1 deletion lib/optimizely/config_manager/http_project_config_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Optimizely
class HTTPProjectConfigManager < ProjectConfigManager
# Config manager that polls for the datafile and updated ProjectConfig based on an update interval.

attr_reader :config
attr_reader :config, :stopped

# Initialize config manager. One of sdk_key or url has to be set to be able to use.
#
Expand Down Expand Up @@ -85,10 +85,13 @@ def ready?

def start!
@async_scheduler.start!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must check if it is stopped, then no need to start again.

@stopped = false
end

def stop!
@async_scheduler.stop!
@config = nil
@stopped = true
end

def get_config
Expand Down
199 changes: 199 additions & 0 deletions lib/optimizely/event/batch_event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# frozen_string_literal: true

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require_relative 'event_processor'
module Optimizely
class BatchEventProcessor < EventProcessor
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
# Events passed to the BatchEventProcessor are immediately added to a EventQueue.
# The BatchEventProcessor maintains a single consumer thread that pulls events off of

attr_reader :event_queue

DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000
DEFAULT_QUEUE_CAPACITY = 1000

FLUSH_SIGNAL = 'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'

def initialize(
event_queue:,
event_dispatcher:,
batch_size:,
flush_interval:,
logger: nil,
notification_center: nil
)
@event_queue = event_queue || SizedQueue.new(DEFAULT_QUEUE_CAPACITY)
@event_dispatcher = event_dispatcher
@batch_size = batch_size || DEFAULT_BATCH_SIZE
@flush_interval = flush_interval || DEFAULT_BATCH_INTERVAL
@logger = logger || NoOpLogger.new
@notification_center = notification_center
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
@is_started = false
start!
end

def start!
if @is_started == true
@logger.log(Logger::WARN, 'Service already started.')
return
end
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@thread = Thread.new { run }
@is_started = true
end

def flush
@mutex.synchronize do
@event_queue << FLUSH_SIGNAL
@received.signal
end
end

def process(user_event)
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

unless @thread.alive?
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
return
end

@mutex.synchronize do
begin
@event_queue << user_event
@received.signal
rescue Exception
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
return
end
end
end

def stop!
return unless @thread.alive?

@mutex.synchronize do
@event_queue << SHUTDOWN_SIGNAL
@received.signal
end

@is_started = false
@logger.log(Logger::WARN, 'Stopping scheduler.')
@thread.exit
end

private

def run
loop do
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
@logger.log(
Logger::DEBUG,
'Deadline exceeded flushing current batch.'
)
flush_queue!
end

item = nil

@mutex.synchronize do
@received.wait(@mutex, 0.05)
item = @event_queue.pop if @event_queue.length.positive?
end

if item.nil?
@logger.log(Logger::DEBUG, 'Empty item, sleeping for 50ms.')
sleep(0.05)
next
end

if item == SHUTDOWN_SIGNAL
@logger.log(Logger::INFO, 'Received shutdown signal.')
break
end

if item == FLUSH_SIGNAL
@logger.log(Logger::DEBUG, 'Received flush signal.')
flush_queue!
next
end

if item.is_a? Optimizely::UserEvent
@logger.log(Logger::DEBUG, "Received add to batch signal. with event: #{item.event['key']}.")
add_to_batch(item)
end
end
end

def flush_queue!
return if @current_batch.empty?

log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
@current_batch = []
end

def add_to_batch(user_event)
if should_split?(user_event)
flush_queue!
@current_batch = []
end

# Reset the deadline if starting a new batch.
@flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?

@logger.log(Logger::DEBUG, "Adding user event: #{user_event.event['key']} to batch.")
@current_batch << user_event
return unless @current_batch.length >= @batch_size

@logger.log(Logger::DEBUG, 'Flushing on max batch size!')
flush_queue!
end

def should_split?(user_event)
return false if @current_batch.empty?

current_context = @current_batch.last.event_context

new_context = user_event.event_context
# Revisions should match
unless current_context[:revision] == new_context[:revision]
@logger.log(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.')
return true
end
# Projects should match
unless current_context[:project_id] == new_context[:project_id]
@logger.log(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.')
return true
end
false
end
end
end
25 changes: 25 additions & 0 deletions lib/optimizely/event/event_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

#
# Copyright 2019, Optimizely and contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Optimizely
class EventProcessor
# EventProcessor interface is used to provide an intermediary processing stage within
# event production. It's assumed that the EventProcessor dispatches events via a provided
# EventDispatcher.
def process(user_event); end
end
end
Loading