Skip to content

Commit

Permalink
Feature event on receive (#3411)
Browse files Browse the repository at this point in the history
* Implemented on_receive mode for event registration

* Removed scope from object

* Fixed various issues

- fixed JSDoc
- changed on start and on receive event names to be more precise
- consolidated long .filter change to one to improve performance
- refactored code to use eventBus.on properly with priorities
- ignoring not set event mode
- skipping logic specific to event start triggers if event was just
  receive

* Use explicit mode in EventController.js and adjust dispatching logic in EventBus accordingly.

* Add additional unit tests for the EventBus

* Minor adjustment to the EMSG sample

Co-authored-by: dsilhavy <[email protected]>
  • Loading branch information
FritzHeiden and dsilhavy authored Nov 17, 2020
1 parent 5be3377 commit a6006a6
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 41 deletions.
38 changes: 29 additions & 9 deletions samples/advanced/listening-to-SCTE-EMSG-events.html
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,46 @@
var player;
const URL = "https://livesim.dashif.org/livesim/scte35_2/testpic_2s/Manifest.mpd";
const SCHEMEIDURI = "urn:scte:scte35:2013:xml";
const EVENT_MODE_ON_START = dashjs.MediaPlayer.events.EVENT_MODE_ON_START;
const EVENT_MODE_ON_RECEIVE = dashjs.MediaPlayer.events.EVENT_MODE_ON_RECEIVE;

function init() {
player = dashjs.MediaPlayer().create();
player.updateSettings({ 'debug': { 'logLevel': dashjs.Debug.LOG_LEVEL_NONE }});
player.on(SCHEMEIDURI, showEvent);
player.on(SCHEMEIDURI, showStartEvent, null); /* Default mode is onStart, no need to specify a mode */
player.on(SCHEMEIDURI, showReceiveEvent, null, { mode: EVENT_MODE_ON_RECEIVE });
player.initialize(document.querySelector("video"), URL, true);
}

function showEvent(e) {
log("EVENT RECEIVED: " + e.type);
function showStartEvent(e) {
showEvent(e, "start");
}

function showReceiveEvent(e) {
showEvent(e, "receive");
}

function showEvent(e, output) {
/* We double process in order to pretty-print. Only two levels of object properties are exposed. */
for (var name in e) {
if (typeof e[name] != 'object') {
log(" " + name + ": " + e[name]);
log(" " + name + ": " + e[name], output);
}
}
for (name in e) {
if (typeof e[name] == 'object' ) {
log(" " + name + ":");
log(" " + name + ":", output);
for (name2 in e[name]) {
log(" " + name2 + ": " + JSON.stringify(e[name][name2]));
log(" " + name2 + ": " + JSON.stringify(e[name][name2]), output);
}
}
}
log("", output);
}

function log(msg) {
function log(msg, output) {
msg = msg.length > 200 ? msg.substring(0, 200) + "..." : msg; /* to avoid repeated wrapping with large objects */
var tracePanel = document.getElementById("trace");
var tracePanel = document.getElementById(output);
tracePanel.innerHTML += msg + "\n";
tracePanel.scrollTop = tracePanel.scrollHeight;
console.log(msg);
Expand All @@ -68,7 +79,16 @@
<div>
<video controls="true">
</video>
<textarea id="trace" placeholder="Trapped events will be displayed here"></textarea>
</div>
<div style="display:flex">
<div>
<p>Received Events</p>
<textarea id="receive" placeholder="Trapped on_receive events will be displayed here"></textarea>
</div>
<div>
<p>Started Events</p>
<textarea id="start" placeholder="Trapped on_start events will be displayed here"></textarea>
</div>
</div>
<script>
document.addEventListener("DOMContentLoaded", function () {
Expand Down
43 changes: 30 additions & 13 deletions src/core/EventBus.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* POSSIBILITY OF SUCH DAMAGE.
*/
import FactoryMaker from './FactoryMaker';
import {EVENT_MODE_ON_RECEIVE} from '../streaming/MediaPlayerEvents';

const EVENT_PRIORITY_LOW = 0;
const EVENT_PRIORITY_HIGH = 5000;
Expand All @@ -37,7 +38,7 @@ function EventBus() {

let handlers = {};

function on(type, listener, scope, priority = EVENT_PRIORITY_LOW) {
function on(type, listener, scope, options = {}) {

if (!type) {
throw new Error('event type cannot be null or undefined');
Expand All @@ -46,14 +47,16 @@ function EventBus() {
throw new Error('listener must be a function: ' + listener);
}

let priority = options.priority || EVENT_PRIORITY_LOW;

if (getHandlerIdx(type, listener, scope) >= 0) return;

handlers[type] = handlers[type] || [];

const handler = {
callback: listener,
scope: scope,
priority: priority
scope,
priority
};

if (scope && scope.getStreamId) {
Expand All @@ -62,9 +65,12 @@ function EventBus() {
if (scope && scope.getType) {
handler.mediaType = scope.getType();
}
if (options && options.mode) {
handler.mode = options.mode;
}

const inserted = handlers[type].some((item , idx) => {
if (item && priority > item.priority ) {
const inserted = handlers[type].some((item, idx) => {
if (item && priority > item.priority) {
handlers[type].splice(idx, 0, handler);
return true;
}
Expand Down Expand Up @@ -98,13 +104,24 @@ function EventBus() {
payload.mediaType = filters.mediaType;
}

handlers[type] = handlers[type].filter((item) => item);
handlers[type].forEach(handler => {
if (!handler) return;
if (filters.streamId && handler.streamId && handler.streamId !== filters.streamId) return;
if (filters.mediaType && handler.mediaType && handler.mediaType !== filters.mediaType) return;
handler.callback.call(handler.scope, payload);
});
handlers[type]
.filter((handler) => {
if (!handler) {
return false;
}
if (filters.streamId && handler.streamId && handler.streamId !== filters.streamId) {
return false;
}
if (filters.mediaType && handler.mediaType && handler.mediaType !== filters.mediaType) {
return false;
}
// This is used for dispatching DASH events. By default we use the onStart mode. Consequently we filter everything that has a non matching mode and the onReceive events for handlers that did not specify a mode.
if ((filters.mode && handler.mode && handler.mode !== filters.mode) || (!handler.mode && filters.mode && filters.mode === EVENT_MODE_ON_RECEIVE)) {
return false;
}
return true;
})
.forEach(handler => handler && handler.callback.call(handler.scope, payload));
}

function getHandlerIdx(type, listener, scope) {
Expand All @@ -113,7 +130,7 @@ function EventBus() {

if (!handlers[type]) return idx;

handlers[type].some( (item, index) => {
handlers[type].some((item, index) => {
if (item && item.callback === listener && (!scope || scope === item.scope)) {
idx = index;
return true;
Expand Down
8 changes: 4 additions & 4 deletions src/mss/MssHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,10 @@ function MssHandler(config) {
}

function registerEvents() {
eventBus.on(events.INIT_FRAGMENT_NEEDED, onInitFragmentNeeded, instance, dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH); /* jshint ignore:line */
eventBus.on(events.PLAYBACK_PAUSED, onPlaybackPaused, instance, dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH); /* jshint ignore:line */
eventBus.on(events.PLAYBACK_SEEK_ASKED, onPlaybackSeekAsked, instance, dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH); /* jshint ignore:line */
eventBus.on(events.FRAGMENT_LOADING_COMPLETED, onSegmentMediaLoaded, instance, dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH); /* jshint ignore:line */
eventBus.on(events.INIT_FRAGMENT_NEEDED, onInitFragmentNeeded, instance, { priority: dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH }); /* jshint ignore:line */
eventBus.on(events.PLAYBACK_PAUSED, onPlaybackPaused, instance, { priority: dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH }); /* jshint ignore:line */
eventBus.on(events.PLAYBACK_SEEK_ASKED, onPlaybackSeekAsked, instance, { priority: dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH }); /* jshint ignore:line */
eventBus.on(events.FRAGMENT_LOADING_COMPLETED, onSegmentMediaLoaded, instance, { priority: dashjs.FactoryMaker.getSingletonFactoryByName(eventBus.getClassName()).EVENT_PRIORITY_HIGH }); /* jshint ignore:line */
eventBus.on(events.TTML_TO_PARSE, onTTMLPreProcess, instance);
}

Expand Down
5 changes: 3 additions & 2 deletions src/streaming/MediaPlayer.js
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,12 @@ function MediaPlayer() {
* @param {string} type - {@link MediaPlayerEvents}
* @param {Function} listener - callback method when the event fires.
* @param {Object} scope - context of the listener so it can be removed properly.
* @param {Object} options - object to define various options such as priority and mode
* @memberof module:MediaPlayer
* @instance
*/
function on(type, listener, scope) {
eventBus.on(type, listener, scope);
function on(type, listener, scope, options) {
eventBus.on(type, listener, scope, options);
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/streaming/MediaPlayerEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,18 @@ class MediaPlayerEvents extends EventsBase {
* @event MediaPlayerEvents#GAP_CAUSED_SEEK_TO_PERIOD_END
*/
this.GAP_CAUSED_SEEK_TO_PERIOD_END = 'gapCausedSeekToPeriodEnd';

/**
* Dash events are triggered at their respective start points on the timeline.
* @event MediaPlayerEvents#EVENT_MODE_ON_START
*/
this.EVENT_MODE_ON_START = 'eventModeOnStart';

/**
* Dash events are triggered as soon as they were parsed.
* @event MediaPlayerEvents#EVENT_MODE_ON_RECEIVE
*/
this.EVENT_MODE_ON_RECEIVE = 'eventModeOnReceive';
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/streaming/StreamProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ function StreamProcessor(config) {
logger = Debug(context).getInstance().getLogger(instance);
resetInitialSettings();

eventBus.on(Events.DATA_UPDATE_COMPLETED, onDataUpdateCompleted, instance, EventBus.EVENT_PRIORITY_HIGH); // High priority to be notified before Stream
eventBus.on(Events.DATA_UPDATE_COMPLETED, onDataUpdateCompleted, instance, { priority: EventBus.EVENT_PRIORITY_HIGH }); // High priority to be notified before Stream
eventBus.on(Events.QUALITY_CHANGE_REQUESTED, onQualityChanged, instance);
eventBus.on(Events.INIT_FRAGMENT_NEEDED, onInitFragmentNeeded, instance);
eventBus.on(Events.MEDIA_FRAGMENT_NEEDED, onMediaFragmentNeeded, instance);
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/controllers/BufferController.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ function BufferController(config) {
eventBus.on(Events.PLAYBACK_SEEKED, onPlaybackSeeked, this);
eventBus.on(Events.PLAYBACK_STALLED, onPlaybackStalled, this);
eventBus.on(Events.WALLCLOCK_TIME_UPDATED, onWallclockTimeUpdated, this);
eventBus.on(Events.CURRENT_TRACK_CHANGED, onCurrentTrackChanged, this, EventBus.EVENT_PRIORITY_HIGH);
eventBus.on(Events.CURRENT_TRACK_CHANGED, onCurrentTrackChanged, this, { priority: EventBus.EVENT_PRIORITY_HIGH });
eventBus.on(Events.SOURCEBUFFER_REMOVE_COMPLETED, onRemoved, this);
}

Expand Down
27 changes: 20 additions & 7 deletions src/streaming/controllers/EventController.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import Debug from '../../core/Debug';
import EventBus from '../../core/EventBus';
import Events from '../../core/events/Events';
import XHRLoader from '../net/XHRLoader';
import {EVENT_MODE_ON_START, EVENT_MODE_ON_RECEIVE} from '../MediaPlayerEvents';

function EventController() {

Expand Down Expand Up @@ -121,6 +122,7 @@ function EventController() {
let event = values[i];
inlineEvents[event.id] = event;
logger.debug('Add inline event with id ' + event.id);
_startEvent(event.id, event, values, EVENT_MODE_ON_RECEIVE);
}
}
logger.debug(`Added ${values.length} inline events`);
Expand All @@ -145,6 +147,7 @@ function EventController() {
}
inbandEvents[event.id] = event;
logger.debug('Add inband event with id ' + event.id);
_startEvent(event.id, event, values, EVENT_MODE_ON_RECEIVE);
} else {
logger.debug('Repeated event with id ' + event.id);
}
Expand Down Expand Up @@ -172,6 +175,8 @@ function EventController() {
validUntil: validUntil,
newDuration: newDuration,
newManifestValidAfter: NaN //event.message_data - this is an arraybuffer with a timestring in it, but not used yet
}, {
mode: EVENT_MODE_ON_START
});
}
} catch (e) {
Expand Down Expand Up @@ -207,6 +212,7 @@ function EventController() {
function _onEventTimer() {
try {
if (!eventHandlingInProgress) {
eventHandlingInProgress = true;
const currentVideoTime = playbackController.getTime();
let presentationTimeThreshold = (currentVideoTime - lastEventTimerCall);

Expand All @@ -218,8 +224,8 @@ function EventController() {
_removeEvents();

lastEventTimerCall = currentVideoTime;
eventHandlingInProgress = false;
}
eventHandlingInProgress = false;
} catch (e) {
eventHandlingInProgress = false;
}
Expand Down Expand Up @@ -248,7 +254,7 @@ function EventController() {
const calculatedPresentationTimeInSeconds = event.calculatedPresentationTime / event.eventStream.timescale;

if (calculatedPresentationTimeInSeconds <= currentVideoTime && calculatedPresentationTimeInSeconds + presentationTimeThreshold >= currentVideoTime) {
_startEvent(eventId, event, events);
_startEvent(eventId, event, events, EVENT_MODE_ON_START);
} else if (_eventHasExpired(currentVideoTime, presentationTimeThreshold, calculatedPresentationTimeInSeconds) || _eventIsInvalid(event)) {
logger.debug(`Deleting event ${eventId} as it is expired or invalid`);
delete events[eventId];
Expand Down Expand Up @@ -299,36 +305,43 @@ function EventController() {
const calculatedPresentationTimeInSeconds = event.calculatedPresentationTime / event.eventStream.timescale;

if (Math.abs(calculatedPresentationTimeInSeconds - currentTime) < REMAINING_EVENTS_THRESHOLD) {
_startEvent(eventId, event, events);
_startEvent(eventId, event, events, EVENT_MODE_ON_START);
}
});
} catch (e) {

}
}

function _startEvent(eventId, event, events) {
function _startEvent(eventId, event, events, mode) {
try {
const currentVideoTime = playbackController.getTime();

if (mode === EVENT_MODE_ON_RECEIVE) {
logger.debug(`Received event ${eventId}`);
eventBus.trigger(event.eventStream.schemeIdUri, { event: event }, { mode });
return;
}

if (event.duration > 0) {
activeEvents[eventId] = event;
}

if (event.eventStream.schemeIdUri === MPD_RELOAD_SCHEME && event.eventStream.value == MPD_RELOAD_VALUE) {
if (event.eventStream.schemeIdUri === MPD_RELOAD_SCHEME && event.eventStream.value === MPD_RELOAD_VALUE) {
if (event.duration !== 0 || event.presentationTimeDelta !== 0) { //If both are set to zero, it indicates the media is over at this point. Don't reload the manifest.
logger.debug(`Starting manifest refresh event ${eventId} at ${currentVideoTime}`);
_refreshManifest();
}
} else if (event.eventStream.schemeIdUri === MPD_CALLBACK_SCHEME && event.eventStream.value == MPD_CALLBACK_VALUE) {
} else if (event.eventStream.schemeIdUri === MPD_CALLBACK_SCHEME && event.eventStream.value === MPD_CALLBACK_VALUE) {
logger.debug(`Starting callback event ${eventId} at ${currentVideoTime}`);
_sendCallbackRequest(event.messageData);
} else {
logger.debug(`Starting event ${eventId} at ${currentVideoTime}`);
eventBus.trigger(event.eventStream.schemeIdUri, { event: event });
eventBus.trigger(event.eventStream.schemeIdUri, { event: event }, { mode });
}

delete events[eventId];

} catch (e) {
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/controllers/PlaybackController.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ function PlaybackController() {
eventBus.on(Events.BUFFER_LEVEL_STATE_CHANGED, onBufferLevelStateChanged, this);
eventBus.on(Events.PLAYBACK_PROGRESS, onPlaybackProgression, this);
eventBus.on(Events.PLAYBACK_TIME_UPDATED, onPlaybackProgression, this);
eventBus.on(Events.PLAYBACK_ENDED, onPlaybackEnded, this, EventBus.EVENT_PRIORITY_HIGH);
eventBus.on(Events.PLAYBACK_ENDED, onPlaybackEnded, this, { priority: EventBus.EVENT_PRIORITY_HIGH });
eventBus.on(Events.STREAM_INITIALIZING, onStreamInitializing, this);

if (playOnceInitialized) {
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/controllers/StreamController.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ function StreamController() {
eventBus.on(Events.PLAYBACK_ERROR, onPlaybackError, instance);
eventBus.on(Events.PLAYBACK_STARTED, onPlaybackStarted, instance);
eventBus.on(Events.PLAYBACK_PAUSED, onPlaybackPaused, instance);
eventBus.on(Events.PLAYBACK_ENDED, onEnded, instance, EventBus.EVENT_PRIORITY_HIGH);
eventBus.on(Events.PLAYBACK_ENDED, onEnded, instance, { priority: EventBus.EVENT_PRIORITY_HIGH });
eventBus.on(Events.MANIFEST_UPDATED, onManifestUpdated, instance);
eventBus.on(Events.STREAM_BUFFERING_COMPLETED, onStreamBufferingCompleted, instance);
eventBus.on(Events.MANIFEST_VALIDITY_CHANGED, onManifestValidityChanged, instance);
Expand Down
Loading

0 comments on commit a6006a6

Please sign in to comment.