Skip to content

Commit

Permalink
Merge pull request #4 from CDRV/dev
Browse files Browse the repository at this point in the history
Main merge for 1.0.3 release
  • Loading branch information
SBriere authored Sep 22, 2022
2 parents 36842ca + 8251d93 commit 8e312f8
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 32 deletions.
2 changes: 1 addition & 1 deletion Globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

config_man = ConfigManager()

version_string = '1.0.1'
version_string = '1.0.3'
13 changes: 7 additions & 6 deletions config/PiHub.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"General": {
"enable_logging": false,
"enable_logging": true,
"enable_bed_server": false,
"enable_watch_server": false,
"enable_folderWatcher_server": true,
"enable_watch_server": true,
"enable_folderWatcher_server": false,
"logs_path": "logs"
},
"WatchServer": {
Expand All @@ -13,7 +13,8 @@
"sftp_transfer": true,
"opentera_transfer": false,
"server_base_folder": "Watch",
"send_logs_only": false
"send_logs_only": false,
"minimal_dataset_duration": 10
},
"BedServer": {
"hostname": "0.0.0.0",
Expand All @@ -29,10 +30,10 @@
"sensor_ID": "Sensor_0"
},
"SFTP": {
"hostname": "telesante.cdrv.ca",
"hostname": "",
"port": 40091,
"username": "dev",
"password": "tr3cr100"
"password": ""
},
"OpenTera": {
"hostname": "localhost",
Expand Down
79 changes: 55 additions & 24 deletions libs/servers/WatchServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@ def __init__(self, server_config: dict, sftp_config: dict):
self.sftp_transfer = server_config['sftp_transfer']
self.opentera_transfer = server_config['opentera_transfer']
self.send_logs_only = server_config['send_logs_only']
self.minimal_dataset_duration = server_config['minimal_dataset_duration']

self.synching_files = False

# Set file synching after a few seconds without receiving any data
self.file_syncher_timer = threading.Timer(20, self.sync_files)

def run(self):
logging.info('Apple Watch Server starting...')

Expand Down Expand Up @@ -59,7 +63,7 @@ def sync_files(self, check_internet: bool = True):
logging.info("WatchServer: Synchronizing files with server...")

if self.synching_files:
logging_info("*** WatchServer: Already synching files. Will wait for next time.")
logging.info("*** WatchServer: Already synching files. Will wait for next time.")
return

self.synching_files = True
Expand All @@ -70,13 +74,39 @@ def sync_files(self, check_internet: bool = True):
file_folders = []
for (dp, dn, f) in os.walk(base_folder):
if f:
dp = dp.replace('/', os.sep)
if self.send_logs_only:
# Filter list of files to keep only log files
folder_files = [file for file in f if file.lower().endswith("txt") or file.lower().endswith("oimi")]
else:
if self.minimal_dataset_duration > 0:
# Filter dataset that are too small (<10 seconds)
if 'watch_logs.txt' in f:
import csv
try:
with open(os.path.join(dp, 'watch_logs.txt'), newline='') as csvfile:
log_reader = csv.reader(csvfile, delimiter='\t')
first_timestamp = None
for row in log_reader:
if len(row) == 0:
continue
if not first_timestamp:
first_timestamp = row[0]
last_timestamp = row[0]
duration = float(last_timestamp) - float(first_timestamp)
if duration <= self.minimal_dataset_duration:
# Must reject! Too short!
self.move_files([os.path.join(dp, file) for file in f], 'Rejected')
logging.info('Rejected folder ' + dp + ': dataset too small.')
continue # Move to next folder
except IOError:
pass # Ignore error and move on!
except AssertionError:
pass

folder_files = f
files.extend(folder_files)
full_files.extend([os.path.join(dp.replace('/', os.sep), file) for file in folder_files])
full_files.extend([os.path.join(dp, file) for file in folder_files])
file_folder = dp.replace(base_folder, '')
file_folders.extend("/" + self.server_base_folder + "/" + file_folder.replace(os.sep, '/')
for _ in folder_files)
Expand All @@ -95,8 +125,9 @@ def sync_files(self, check_internet: bool = True):
file_transferred_callback=self.file_was_processed,
check_internet=check_internet)

# Set files as processed
self.move_processed_files()
# Set files as processed
if success:
self.move_processed_files()
# for file in full_files:
# WatchServer.file_was_processed(file)
else:
Expand All @@ -110,26 +141,29 @@ def file_was_processed(self, full_filepath: str):
# Mark file as processed - will be moved later on to prevent conflicts
self.processed_files.append(full_filepath)

def move_processed_files(self):
for full_filepath in self.processed_files:
# Move file to the "Processed" folder
target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + 'Processed' + os.sep)
def move_files(self, source_files, target_folder):
for full_filepath in source_files:
# Move file from "ToProcess" to the target folder
target_file = full_filepath.replace(os.sep + 'ToProcess' + os.sep, os.sep + target_folder + os.sep)

# Create directory, if needed
target_dir = os.path.dirname(target_file)
try:
os.makedirs(name=target_dir, exist_ok=True)
except OSError as exc:
logging.error('Error creating ' + target_dir + ': ' + exc.strerror)
raise
continue
# raise

try:
os.replace(full_filepath, target_file)
except (OSError, IOError) as exc:
logging.error('Error moving ' + full_filepath + ' to ' + target_file + ': ' + exc.strerror)
raise
# logging.info("Processed file: " + full_filepath)
continue
# raise

def move_processed_files(self):
self.move_files(self.processed_files, 'Processed')
self.processed_files.clear()

@staticmethod
Expand Down Expand Up @@ -166,7 +200,7 @@ def do_GET(self):
self.send_response(202)
self.send_header('Content-type', 'cdrv-cmd/Disconnect')
self.end_headers()
self.base_server.sync_files()
# self.base_server.sync_files()
return

self.send_response(200)
Expand Down Expand Up @@ -196,6 +230,11 @@ def do_POST(self):
self.end_headers()
return

# Stop timer to send data, since we received new data
if self.base_server.file_syncher_timer.is_alive():
self.base_server.file_syncher_timer.cancel()
self.base_server.file_syncher_timer = None

destination_dir = (self.base_server.data_path + '/ToProcess/' + device_name + '/' + file_path + '/')\
.replace('//', '/').replace('/', os.sep)
destination_path = destination_dir + file_name
Expand Down Expand Up @@ -281,19 +320,11 @@ def do_POST(self):
# All is good!
logging.info(device_name + " - " + file_name + ": transfer complete.")

# # Need to transfer using SFTP?
# if self.base_server.sftp_transfer:
#
# # Check if we need to transfer only log files and if it's a log file
# if not self.base_server.send_logs_only or \
# (self.base_server.send_logs_only and file_type.lower() in ['txt', 'oimi']):
# file_name = Path(file_name).absolute() # Get full path
# file_server_location = "/" + self.base_server.server_base_folder + "/" + device_name + "/" + file_path
# sftp = threading.Thread(target=SFTPUploader.sftp_send, args=(self.base_server.sftp_config,
# file_server_location, file_name))
# sftp.start()

self.send_response(200)
self.send_header('Content-type', 'file-transfer/ack')
self.end_headers()

# Start timer to sync data, if no other transfer occurs until timeout
self.base_server.file_syncher_timer = threading.Timer(20, self.base_server.sync_files)
self.base_server.file_syncher_timer.start()

17 changes: 17 additions & 0 deletions libs/uploaders/SFTPUploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ def sftp_send(sftp_config: dict, files_directory_on_server: [str], files_to_tran
if not (s.isdir(file_server_location)):
s.mkdir(file_server_location)
with s.cd(file_server_location):
file_name = os.path.basename(file_to_transfer)
# Query file size from server and compare to local file size
try:
remote_attr = s.stat(remotepath=file_server_location + '/' + file_name)
remote_size = remote_attr.st_size
local_attr = os.stat(file_to_transfer)
local_size = local_attr.st_size
if local_size == remote_size:
# Same size on server as local file, skip!
logging.info('Skipping ' + file_to_transfer + ': already present on server.')
if file_transferred_callback:
file_transferred_callback(file_to_transfer)
continue
except IOError:
# File not on server = ok, continue!
pass
# TODO: Use pysftp.Connection.stat() to find file size and only send if is different
s.put(localpath=file_to_transfer, preserve_mtime=True,
callback=lambda current, total:
SFTPUploader.file_upload_progress(current, total, file_to_transfer,
Expand Down
3 changes: 2 additions & 1 deletion piHub.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def handle_exception(exc_type, exc_value, exc_traceback):
folderWatcher_server.start()
servers.append(folderWatcher_server)

logging.info("PiHub started.")
logging.info("PiHub " + version_string + " started.")

try:
# Main loop on main thread
while True:
Expand Down

0 comments on commit 8e312f8

Please sign in to comment.