Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pydevices/HtsDevices/acq2106_423st.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ def lcma(arr):
except Empty:
continue

if self.dev.trig_time.getDataNoRaise() is None:
self.dev.trig_time.record = self.device_thread.trig_time - \
((self.device_thread.io_buffer_size / np.int16(0).nbytes) * dt)

buffer = np.frombuffer(buf, dtype='int16')
i = 0
for c in self.chans:
Expand All @@ -239,8 +243,6 @@ def lcma(arr):

self.empty_buffers.put(buf)

self.dev.trig_time.record = self.device_thread.trig_time - \
((self.device_thread.io_buffer_size / np.int16(0).nbytes) * dt)
self.device_thread.stop()

class DeviceWorker(threading.Thread):
Expand Down
4 changes: 1 addition & 3 deletions pydevices/HtsDevices/acq2106_435st.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,7 @@ def lcma(arr):
MDSplus.Event.setevent(event_name)

self.empty_buffers.put(buf)

self.dev.trig_time.record = self.device_thread.trig_time - \
((self.device_thread.io_buffer_size / np.int32(0).nbytes) * dt)

self.device_thread.stop()

class DeviceWorker(threading.Thread):
Expand Down
102 changes: 80 additions & 22 deletions tdi/python/influxSignal.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,25 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

import re
import MDSplus

from os import environ

try:
from influxdb import InfluxDBClient
except:
print(
"You must install the `influxdb` python package.")
exit(1)

def influxSignal(fieldKey, where, series=None, database=None, config=None, shotStartTime=None, shotEndTime=None):
def influxSignal(fieldKey, aggregation, where, series=None, database=None, config=None, shotStartTime=None, shotEndTime=None):
tree = MDSplus.Tree()

debug = False
if "DEBUG_DEVICES" in environ:
debug = True

if series is None:
series = tree.getNode('\INFLUX_SERIES')

Expand All @@ -49,60 +56,86 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS

if shotEndTime is None:
shotEndTime = tree.getNode('\INFLUX_END_TIME')

if not isinstance(config, str):
config = config.data()

"""Instantiate a connection to the InfluxDB."""
username = ''
password = ''
try:
with open(config.data()) as file:
with open(config) as file:
lines = file.readlines()

if len(lines) < 2:
print("Failed to read influx config from file %s" %(config.data(),))
print("Failed to read influx config from file %s" %(config,))

host = lines[0].strip('\n')
username = lines[1].strip('\n')
password = lines[2].strip('\n')

except IOError as e:
print("Failed to open credentials file %s" %(config.data(),))
print("Failed to open credentials file %s" %(config,))

port = 8086
if ':' in host:
parts = host.split(':')
host = parts[0]
port = int(parts[1])

database = database.data()
series = series.data()
fieldKey = fieldKey.data()
shotStartTime = shotStartTime.data()
shotEndTime = shotEndTime.getDataNoRaise()
if not isinstance(database, str):
database = database.data()

if not isinstance(series, str):
series = series.data()

if not isinstance(fieldKey, str):
fieldKey = fieldKey.data()

if not isinstance(aggregation, str):
aggregation = aggregation.data()

if isinstance(shotStartTime, str):
shotStartTime = int(shotStartTime)
elif not isinstance(shotStartTime, int):
shotStartTime = shotStartTime.data()

if isinstance(shotEndTime, str):
shotEndTime = int(shotEndTime)
elif not isinstance(shotEndTime, int):
shotEndTime = shotEndTime.getDataNoRaise()

if not isinstance(where, str):
where = where.data()

if where == '':
return

whereList = [where.data()]
whereList = [where]

timeContext = MDSplus.Tree.getTimeContext()

startTime = shotStartTime
endTime = shotEndTime
deltaTime = None

#print('Getting time context from the tree: %s '%(timeContext,))

# The time context is in seconds relative to the start of the shot.
if timeContext[0] is not None:
startTime = shotStartTime + (int(timeContext[0]) * 1000)
startTime = shotStartTime + (float(timeContext[0]) * 1000)

if timeContext[1] is not None:
endTime = shotStartTime + (int(timeContext[1]) * 1000)
endTime = shotStartTime + (float(timeContext[1]) * 1000)

# TODO: timeContext[2] is the interval which influx supports, therefore we should support it too.
if timeContext[2] is not None:
deltaTime = float(timeContext[2])

# Clamp the computed start/end time within the time bounds of the shot
if startTime < shotStartTime:
startTime = shotStartTime
if endTime is not None and endTime > shotEndTime:

if endTime is not None and shotEndTime is not None and endTime > shotEndTime:
endTime = shotEndTime

startTimeQuery = ''
Expand All @@ -125,10 +158,26 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS
if len(whereList) > 0:
where = 'WHERE %s' % (' AND '.join(whereList),)

groupBy = ''
if deltaTime is not None:
# Convert deltaTime from seconds to milliseconds, if deltaTime is less than 1ms this will break
deltaTimeMS = int(deltaTime * 1000)
groupBy = 'GROUP BY time(%sms)' % (deltaTimeMS,) # The trailing 'ms' is for milliseconds
# If the fieldKey does not have an aggregation function, Influx will error out
# Attempt to wrap the field key with an aggregation function, but don't affect
# any equations stored in it
# For example:
# 10*fVal will become 10*MEAN(fVal)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that you can't do math unless there's an aggregation (I believe), so we could potentially simplify this section and make it just wrap the fieldKey without the regex

fieldKey = re.sub(r'([a-zA-Z]*)', '%s(\\1)' % (aggregation,), fieldKey)
if debug:
print(aggregation, fieldKey)

"""Instantiate a connection to the InfluxDB."""
client = InfluxDBClient(host, port, username, password, database)

query = 'SELECT "%s" AS value FROM "%s" %s;' % (fieldKey, series, where)
#print('Query: %s' % query)
query = 'SELECT %s AS value FROM "%s" %s %s;' % (fieldKey, series, where, groupBy)
if debug:
print(query)

result = client.query(query, params={'epoch': 'ms'})

Expand All @@ -137,14 +186,23 @@ def influxSignal(fieldKey, where, series=None, database=None, config=None, shotS
valueData = [None] * len(data)
timeData = [None] * len(data)

i = 0
count = 0
for row in data:
valueData[i] = float(row['value'])
timeData[i] = (row['time'] - shotStartTime) / 1000
i += 1
if debug:
print(row)

# If there are no data points within the time specified by deltaTime, influx
# returns a None for the value
if row['value'] is None:
continue

valueData[count] = float(row['value'])
timeData[count] = (row['time'] - shotStartTime) / 1000
count += 1

values = MDSplus.Float32Array(valueData)
times = MDSplus.Float32Array(timeData)
# If any data points were skipped, shrink the arrays to the actual number of points
values = MDSplus.Float32Array(valueData[:count])
times = MDSplus.Float32Array(timeData[:count])

return MDSplus.Signal(values, None, times)