diff --git a/pydevices/HtsDevices/acq2106_423st.py b/pydevices/HtsDevices/acq2106_423st.py index 1359385ddd..4d05ab03b1 100755 --- a/pydevices/HtsDevices/acq2106_423st.py +++ b/pydevices/HtsDevices/acq2106_423st.py @@ -222,6 +222,10 @@ def lcma(arr): except Empty: continue + if self.dev.trig_time.getDataNoRaise() is None: + self.dev.trig_time.record = self.device_thread.trig_time - \ + ((self.device_thread.io_buffer_size / np.int16(0).nbytes) * dt) + buffer = np.frombuffer(buf, dtype='int16') i = 0 for c in self.chans: @@ -239,8 +243,6 @@ def lcma(arr): self.empty_buffers.put(buf) - self.dev.trig_time.record = self.device_thread.trig_time - \ - ((self.device_thread.io_buffer_size / np.int16(0).nbytes) * dt) self.device_thread.stop() class DeviceWorker(threading.Thread): diff --git a/pydevices/HtsDevices/acq2106_435st.py b/pydevices/HtsDevices/acq2106_435st.py index 7ef382f63f..f8dfd16988 100755 --- a/pydevices/HtsDevices/acq2106_435st.py +++ b/pydevices/HtsDevices/acq2106_435st.py @@ -301,9 +301,7 @@ def lcma(arr): MDSplus.Event.setevent(event_name) self.empty_buffers.put(buf) - - self.dev.trig_time.record = self.device_thread.trig_time - \ - ((self.device_thread.io_buffer_size / np.int32(0).nbytes) * dt) + self.device_thread.stop() class DeviceWorker(threading.Thread): diff --git a/tdi/python/influxSignal.py b/tdi/python/influxSignal.py index 71c802e9b6..e90f54ce19 100644 --- a/tdi/python/influxSignal.py +++ b/tdi/python/influxSignal.py @@ -23,8 +23,11 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # +import re import MDSplus +from os import environ + try: from influxdb import InfluxDBClient except: @@ -32,9 +35,13 @@ "You must install the `influxdb` python package.") exit(1) -def influxSignal(fieldKey, where, series=None, database=None, config=None, shotStartTime=None, shotEndTime=None): +def influxSignal(fieldKey, aggregation, where, series=None, database=None, config=None, shotStartTime=None, shotEndTime=None): tree = MDSplus.Tree() + debug = False + if "DEBUG_DEVICES" in environ: + debug = True + if series is None: series = tree.getNode('\INFLUX_SERIES') @@ -49,23 +56,26 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS if shotEndTime is None: shotEndTime = tree.getNode('\INFLUX_END_TIME') + + if not isinstance(config, str): + config = config.data() """Instantiate a connection to the InfluxDB.""" username = '' password = '' try: - with open(config.data()) as file: + with open(config) as file: lines = file.readlines() if len(lines) < 2: - print("Failed to read influx config from file %s" %(config.data(),)) + print("Failed to read influx config from file %s" %(config,)) host = lines[0].strip('\n') username = lines[1].strip('\n') password = lines[2].strip('\n') except IOError as e: - print("Failed to open credentials file %s" %(config.data(),)) + print("Failed to open credentials file %s" %(config,)) port = 8086 if ':' in host: @@ -73,36 +83,59 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS host = parts[0] port = int(parts[1]) - database = database.data() - series = series.data() - fieldKey = fieldKey.data() - shotStartTime = shotStartTime.data() - shotEndTime = shotEndTime.getDataNoRaise() + if not isinstance(database, str): + database = database.data() + + if not isinstance(series, str): + series = series.data() + + if not isinstance(fieldKey, str): + fieldKey = fieldKey.data() + + if not isinstance(aggregation, str): + aggregation = aggregation.data() + + if isinstance(shotStartTime, str): + shotStartTime = int(shotStartTime) + elif not isinstance(shotStartTime, int): + shotStartTime = shotStartTime.data() + + if isinstance(shotEndTime, str): + shotEndTime = int(shotEndTime) + elif not isinstance(shotEndTime, int): + shotEndTime = shotEndTime.getDataNoRaise() + + if not isinstance(where, str): + where = where.data() if where == '': return - whereList = [where.data()] + whereList = [where] timeContext = MDSplus.Tree.getTimeContext() startTime = shotStartTime endTime = shotEndTime + deltaTime = None #print('Getting time context from the tree: %s '%(timeContext,)) # The time context is in seconds relative to the start of the shot. if timeContext[0] is not None: - startTime = shotStartTime + (int(timeContext[0]) * 1000) + startTime = shotStartTime + (float(timeContext[0]) * 1000) + if timeContext[1] is not None: - endTime = shotStartTime + (int(timeContext[1]) * 1000) + endTime = shotStartTime + (float(timeContext[1]) * 1000) - # TODO: timeContext[2] is the interval which influx supports, therefore we should support it too. + if timeContext[2] is not None: + deltaTime = float(timeContext[2]) # Clamp the computed start/end time within the time bounds of the shot if startTime < shotStartTime: startTime = shotStartTime - if endTime is not None and endTime > shotEndTime: + + if endTime is not None and shotEndTime is not None and endTime > shotEndTime: endTime = shotEndTime startTimeQuery = '' @@ -125,10 +158,26 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS if len(whereList) > 0: where = 'WHERE %s' % (' AND '.join(whereList),) + groupBy = '' + if deltaTime is not None: + # Convert deltaTime from seconds to milliseconds, if deltaTime is less than 1ms this will break + deltaTimeMS = int(deltaTime * 1000) + groupBy = 'GROUP BY time(%sms)' % (deltaTimeMS,) # The trailing 'ms' is for milliseconds + # If the fieldKey does not have an aggregation function, Influx will error out + # Attempt to wrap the field key with an aggregation function, but don't affect + # any equations stored in it + # For example: + # 10*fVal will become 10*MEAN(fVal) + fieldKey = re.sub(r'([a-zA-Z]*)', '%s(\\1)' % (aggregation,), fieldKey) + if debug: + print(aggregation, fieldKey) + + """Instantiate a connection to the InfluxDB.""" client = InfluxDBClient(host, port, username, password, database) - query = 'SELECT "%s" AS value FROM "%s" %s;' % (fieldKey, series, where) - #print('Query: %s' % query) + query = 'SELECT %s AS value FROM "%s" %s %s;' % (fieldKey, series, where, groupBy) + if debug: + print(query) result = client.query(query, params={'epoch': 'ms'}) @@ -137,14 +186,23 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS valueData = [None] * len(data) timeData = [None] * len(data) - i = 0 + count = 0 for row in data: - valueData[i] = float(row['value']) - timeData[i] = (row['time'] - shotStartTime) / 1000 - i += 1 + if debug: + print(row) + + # If there are no data points within the time specified by deltaTime, influx + # returns a None for the value + if row['value'] is None: + continue + + valueData[count] = float(row['value']) + timeData[count] = (row['time'] - shotStartTime) / 1000 + count += 1 - values = MDSplus.Float32Array(valueData) - times = MDSplus.Float32Array(timeData) + # If any data points were skipped, shrink the arrays to the actual number of points + values = MDSplus.Float32Array(valueData[:count]) + times = MDSplus.Float32Array(timeData[:count]) return MDSplus.Signal(values, None, times)