6
6
from cf_units import Unit
7
7
from netCDF4 import num2date , Dataset
8
8
import datetime
9
- from ioos_qc .qartod import aggregate
9
+ from ioos_qc .stores import PandasStore
10
10
from ioos_qc .streams import PandasStream
11
11
from ioos_qc .results import collect_results , CollectedResult
12
12
from ioos_qc .config import Config
@@ -430,35 +430,29 @@ def apply_qc(self, df, varname, configset):
430
430
431
431
# Step 3: Run the QC tests
432
432
try :
433
- runner = list ( qc_x .run (c_x ) )
433
+ results = qc_x .run (c_x )
434
434
except Exception as e :
435
435
log .error (f"Error running QC tests on { varname } : { e } " )
436
436
return []
437
437
438
- # Step 4: Collect the results from the QC run
438
+ # Step 4: Store the results in another DataFrame
439
439
try :
440
- results = collect_results ( runner , how = 'list' )
440
+ store = PandasStore ( results )
441
441
except Exception as e :
442
442
log .error (f"Error collecting QC results for { varname } : { e } " )
443
443
return []
444
444
445
- # Step 5: Add the qc_rollup results
445
+ # Step 5: Compute any aggregations
446
446
try :
447
- agg = CollectedResult (
448
- stream_id = varname ,
449
- package = 'qartod' ,
450
- test = 'qc_rollup' ,
451
- function = aggregate ,
452
- results = aggregate (results ),
453
- tinp = qc_x .time (),
454
- data = qc_x .data (varname )
455
- )
456
- results .append (agg )
447
+ store .compute_aggregate (name = 'rollup_qc' ) # Appends to the results internally
457
448
except Exception as e :
458
- log .error (f"Error adding qc_rollup for { varname } : { e } " )
449
+ log .error (f"Error computing any aggregations for { varname } : { e } " )
459
450
return []
460
451
461
- return results
452
+ # Step 6: Write only the test results to the store
453
+ results_store = store .save (write_data = False , write_axes = False )
454
+
455
+ return results_store
462
456
463
457
def check_geophysical_variables (self , var_name ):
464
458
'''
@@ -647,16 +641,17 @@ def check_time(self, tnp, nc_path):
647
641
return ' ' .join (report_list )
648
642
649
643
# the main function
650
- def run_qc (config , ncfile , nc_path ):
644
+ def run_qc (config , ncfile , ncfile_path ):
651
645
'''
652
646
Runs IOOS QARTOD tests on a netCDF file
653
647
654
648
:param config: string defining path to the configuration file
655
- :param nc_path : string defining path to the netCDF file
649
+ :param ncfile_path : string defining path to the netCDF file
656
650
:param ncfile: netCDF4._netCDF4.Dataset
657
651
'''
658
652
report_list = []
659
653
xyz = GliderQC (ncfile , config )
654
+ deployment_name = ncfile_path .split ('/' )[- 2 ]
660
655
file_name = ncfile_path .split ('/' )[- 1 ]
661
656
662
657
times = ncfile .variables ['time' ]
@@ -672,7 +667,7 @@ def run_qc(config, ncfile, nc_path):
672
667
# log time array issues
673
668
report = ' ' .join (report_list ).strip ()
674
669
if len (report .strip ()) != 0 :
675
- ncfile .dac_qc_comment = file_name + ': ' + report
670
+ ncfile .dac_qc_comment = str ( deployment_name ) + ' (' + str ( file_name ) + ': ' + report + ')'
676
671
else :
677
672
log .info (" Running IOOS QARTOD tests on %s" , file_name )
678
673
@@ -686,7 +681,7 @@ def run_qc(config, ncfile, nc_path):
686
681
report_list .append (f"{ location_err } : { str (e )} " )
687
682
688
683
# Find geophysical variables
689
- legacy_variables , note = xyz .find_geophysical_variables () #ncfile
684
+ legacy_variables , note = xyz .find_geophysical_variables ()
690
685
if not legacy_variables :
691
686
log .info ("No variables found." )
692
687
report_list .append ("No variables found." )
@@ -706,7 +701,7 @@ def run_qc(config, ncfile, nc_path):
706
701
707
702
# Check the Data Array
708
703
if xyz .check_geophysical_variables (var_name ): #cfile,
709
- report_list .append (var_name + ': ' + xyz .check_geophysical_variables (var_name ))
704
+ report_list .append (xyz .check_geophysical_variables (var_name ))
710
705
continue
711
706
712
707
# Check the mapping of standard names with units
@@ -739,31 +734,23 @@ def run_qc(config, ncfile, nc_path):
739
734
results = xyz .apply_qc (df ,var_name , config_set )
740
735
log .info ("Generated QC test results for %s" , var_name )
741
736
742
- for testname in ['gross_range_test' , 'spike_test' , 'rate_of_change_test' , 'flat_line_test' ,
743
- 'qc_rollup' ]:
744
- try :
745
- qc_test = next (r for r in results if r .stream_id == var_name and r .test == testname )
746
- except Exception as e :
747
- test_err = "Unable to read qc test results"
748
- log .exception (f"{ test_err } : { str (e )} " )
749
- report_list .append (f"{ test_err } : { str (e )} " )
750
- continue
737
+ for testname in results .columns :
751
738
752
739
# create the qartod variable name and get the config specs
753
- if testname == 'qc_rollup ' :
740
+ if testname == 'qartod_rollup_qc ' :
754
741
qartodname = 'qartod_' + var_name + '_primary_flag'
755
742
# Pass the config specs to a variable
756
743
testconfig = config_set ['contexts' ][0 ]['streams' ][var_name ]['qartod' ]
757
744
else :
758
- qartodname = 'qartod_' + var_name + '_' + testname .split ('_test' )[0 ]+ '_flag'
745
+ qartodname = 'qartod_' + var_name + '_' + testname .split ('qartod_' )[ - 1 ]. split ( ' _test' )[0 ]+ '_flag'
759
746
# Pass the config specs to a variable
760
- testconfig = config_set ['contexts' ][0 ]['streams' ][var_name ]['qartod' ][testname ]
747
+ testconfig = config_set ['contexts' ][0 ]['streams' ][var_name ]['qartod' ][testname . split ( 'qartod_' )[ - 1 ] ]
761
748
762
749
# Update the qartod variable
763
750
log .info ("Updating %s" , qartodname )
764
751
qartod_var = ncfile .variables [qartodname ]
765
- qartod_var [:] = np .array (qc_test . results )
766
- qartod_var .qartod_test = f"{ testname } "
752
+ qartod_var [:] = np .array (results [ testname ]. values )
753
+ qartod_var .qartod_test = f"{ testname . split ( 'qartod_' )[ - 1 ] } "
767
754
768
755
# Set the dictionary as a string attribute to the variable
769
756
qartod_var .setncattr ('qartod_config' , json .dumps (testconfig ))
@@ -772,10 +759,10 @@ def run_qc(config, ncfile, nc_path):
772
759
apply_qc_err = "apply_qc failed: could not calculate QC flags."
773
760
log .exception (f"{ apply_qc_err } : " )
774
761
report_list .append (f"{ apply_qc_err } : { str (e )} " )
775
- continu
762
+ continue
776
763
# log issues qc
777
764
report = ' ' .join (report_list ).strip ()
778
- ncfile .dac_qc_comment = ' (' + file_name + ': ' + str (report ) + ') '
765
+ ncfile .dac_qc_comment = str ( deployment_name ) + ' (' + str ( file_name ) + ': ' + str (report ) + ')'
779
766
780
767
def qc_task (nc_path , config ):
781
768
'''
@@ -856,6 +843,17 @@ def check_needs_qc(nc_path):
856
843
# if this section was reached, QC has been run, but xattr remains unset
857
844
try :
858
845
os .setxattr (nc_path , "user.qc_run" , b"true" )
846
+
847
+ # TODO: Set time as the extended file attribute
848
+ # Get the current date-time in ISO format
849
+ #iso_date = datetime.datetime.utcnow().isoformat()
850
+
851
+ # Convert it to bytes
852
+ #iso_date_bytes = iso_date.encode("utf-8")
853
+
854
+ # Set the extended attribute
855
+ #os.setxattr(nc_path, "user.qc_run", iso_date_bytes)
856
+
859
857
except OSError :
860
858
log .exception (f"Exception occurred trying to set xattr on already QCed file at { nc_path } :" )
861
859
return False
0 commit comments