This repository was archived by the owner on Dec 21, 2018. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathDataflow.js
149 lines (129 loc) · 4.24 KB
/
Dataflow.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import add from './add';
import connect from './connect';
import events from './events';
import {ingest, request} from './load';
import on from './on';
import {rank, rerank} from './rank';
import {run, runAsync, runAfter, enqueue, getPulse} from './run';
import {pulse, touch, update} from './update';
import changeset from '../ChangeSet';
import Heap from '../util/Heap';
import UniqueList from '../util/UniqueList';
import {loader} from 'vega-loader';
import {id, logger, Error} from 'vega-util';
/**
* A dataflow graph for reactive processing of data streams.
* @constructor
*/
export default function Dataflow() {
this._log = logger();
this.logLevel(Error);
this._clock = 0;
this._rank = 0;
try {
this._loader = loader();
} catch (e) {
// do nothing if loader module is unavailable
}
this._touched = UniqueList(id);
this._pulses = {};
this._pulse = null;
this._heap = new Heap(function(a, b) { return a.qrank - b.qrank; });
this._postrun = [];
}
var prototype = Dataflow.prototype;
/**
* The current timestamp of this dataflow. This value reflects the
* timestamp of the previous dataflow run. The dataflow is initialized
* with a stamp value of 0. The initial run of the dataflow will have
* a timestap of 1, and so on. This value will match the
* {@link Pulse.stamp} property.
* @return {number} - The current timestamp value.
*/
prototype.stamp = function() {
return this._clock;
};
/**
* Gets or sets the loader instance to use for data file loading. A
* loader object must provide a "load" method for loading files and a
* "sanitize" method for checking URL/filename validity. Both methods
* should accept a URI and options hash as arguments, and return a Promise
* that resolves to the loaded file contents (load) or a hash containing
* sanitized URI data with the sanitized url assigned to the "href" property
* (sanitize).
* @param {object} _ - The loader instance to use.
* @return {object|Dataflow} - If no arguments are provided, returns
* the current loader instance. Otherwise returns this Dataflow instance.
*/
prototype.loader = function(_) {
if (arguments.length) {
this._loader = _;
return this;
} else {
return this._loader;
}
};
/**
* Empty entry threshold for garbage cleaning. Map data structures will
* perform cleaning once the number of empty entries exceeds this value.
*/
prototype.cleanThreshold = 1e4;
// OPERATOR REGISTRATION
prototype.add = add;
prototype.connect = connect;
prototype.rank = rank;
prototype.rerank = rerank;
// OPERATOR UPDATES
prototype.pulse = pulse;
prototype.touch = touch;
prototype.update = update;
prototype.changeset = changeset;
// DATA LOADING
prototype.ingest = ingest;
prototype.request = request;
// EVENT HANDLING
prototype.events = events;
prototype.on = on;
// PULSE PROPAGATION
prototype.run = run;
prototype.runAsync = runAsync;
prototype.runAfter = runAfter;
prototype._enqueue = enqueue;
prototype._getPulse = getPulse;
// LOGGING AND ERROR HANDLING
function logMethod(method) {
return function() {
return this._log[method].apply(this, arguments);
};
}
/**
* Logs an error message. By default, logged messages are written to console
* output. The message will only be logged if the current log level is high
* enough to permit error messages.
*/
prototype.error = logMethod('error');
/**
* Logs a warning message. By default, logged messages are written to console
* output. The message will only be logged if the current log level is high
* enough to permit warning messages.
*/
prototype.warn = logMethod('warn');
/**
* Logs a information message. By default, logged messages are written to
* console output. The message will only be logged if the current log level is
* high enough to permit information messages.
*/
prototype.info = logMethod('info');
/**
* Logs a debug message. By default, logged messages are written to console
* output. The message will only be logged if the current log level is high
* enough to permit debug messages.
*/
prototype.debug = logMethod('debug');
/**
* Get or set the current log level. If an argument is provided, it
* will be used as the new log level.
* @param {number} [level] - Should be one of None, Warn, Info
* @return {number} - The current log level.
*/
prototype.logLevel = logMethod('level');