
Commit

Added check for result size at the start of a simulation (modelon-community#270)

* WIP: Early abort

* Fixed early estimate of result size

* Minor

* Review fixes

* Added changelog

---------

Co-authored-by: Christian Winther <[email protected]>
chria authored Oct 23, 2024
1 parent 8c86d9b commit 68bf34c
Showing 3 changed files with 119 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG
@@ -1,5 +1,6 @@
--- CHANGELOG ---
--- Future ---
* Added prediction of the result size to give early feedback on whether it will reach the limit.
* Added option to limit the size of the result ("result_max_size"), default set to 2GB.
* Added method ResultDymolaBinary.get_variables_data. Included some minor refactorization.
The new method allows for retrieving partial trajectories, and multiple trajectories at once.
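
For orientation, a minimal usage sketch of the size limit and the new early prediction; this is not from the commit itself, and "MyModel.fmu" is a hypothetical file:

from pyfmi import load_fmu
from pyfmi.common.io import ResultSizeError

model = load_fmu("MyModel.fmu")   # hypothetical FMU
opts = model.simulate_options()
opts["result_max_size"] = 1e9     # limit the stored result to ~1 GB (default: 2 GB)
opts["ncp"] = 10000000            # a huge point count should trip the early estimate

try:
    res = model.simulate(final_time=1.0, options=opts)
except ResultSizeError as err:
    print(err)                    # with the early check, raised already at the first stored point

And a hedged sketch of ResultDymolaBinary.get_variables_data, assuming the signature introduced with the refactoring (a list of names in; a dict of trajectories plus a continuation index out):

from pyfmi.common.io import ResultDymolaBinary

res = ResultDymolaBinary("MyModel_result.mat")   # hypothetical result file
# Retrieve two trajectories at once, starting from the first stored point.
data, next_index = res.get_variables_data(["time", "x"], start_index=0)
# A later call can resume from next_index to pick up newly appended points.
more, next_index = res.get_variables_data(["time", "x"], start_index=next_index)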
77 changes: 50 additions & 27 deletions src/common/io.py
@@ -41,6 +41,7 @@
from pyfmi.common.diagnostics import DIAGNOSTICS_PREFIX, DiagnosticsBase

SYS_LITTLE_ENDIAN = sys.byteorder == 'little'
NCP_LARGE = 5000

class Trajectory:
"""
@@ -1757,6 +1758,7 @@ class ResultHandlerMemory(ResultHandler):
    def __init__(self, model, delimiter=";"):
        super().__init__(model)
        self.supports['result_max_size'] = True
        self._first_point = True

    def simulation_start(self):
        """
@@ -1794,6 +1796,12 @@ def integration_point(self, solver = None):
"""
model = self.model

        previous_size = 0
        if self._first_point:
            previous_size = sys.getsizeof(self.time_sol) + sys.getsizeof(self.real_sol) + \
                            sys.getsizeof(self.int_sol) + sys.getsizeof(self.bool_sol) + \
                            sys.getsizeof(self.param_sol)

        #Retrieves the time-point
        self.time_sol += [model.time]
        self.real_sol += [model.get_real(self.real_var_ref)]
@@ -1807,14 +1815,11 @@ def integration_point(self, solver = None):
        max_size = self.options.get("result_max_size", None)
        if max_size is not None:
            current_size = sys.getsizeof(self.time_sol) + sys.getsizeof(self.real_sol) + \
                           sys.getsizeof(self.int_sol) + sys.getsizeof(self.bool_sol) + \
                           sys.getsizeof(self.param_sol)

            verify_result_size(self._first_point, current_size, previous_size, max_size, self.options["ncp"], self.model.time)
        self._first_point = False

    def get_result(self):
        """
@@ -1841,6 +1846,7 @@ def __init__(self, model, delimiter=";"):
        self.supports['result_max_size'] = True
        self.delimiter = delimiter
        self._current_file_size = 0
        self._first_point = True

    def _write(self, msg):
        self._current_file_size = self._current_file_size+len(msg)
@@ -1972,6 +1978,7 @@ def integration_point(self, solver = None):
"""
model = self.model
delimiter = self.delimiter
previous_size = self._current_file_size

        #Retrieves the time-point
        t = model.time
@@ -1997,14 +2004,9 @@ def integration_point(self, solver = None):
        self._write(cont_str[:-1]+"\n")

        max_size = self.options.get("result_max_size", None)
        if max_size is not None:
            verify_result_size(self._first_point, self._current_file_size, previous_size, max_size, self.options["ncp"], self.model.time)
        self._first_point = False

    def simulation_end(self):
        """
@@ -2032,6 +2034,7 @@ def __init__(self, model):
        super().__init__(model)
        self.supports['result_max_size'] = True
        self._current_file_size = 0
        self._first_point = True

    def initialize_complete(self):
        pass
@@ -2407,6 +2410,7 @@ def integration_point(self, solver = None):#parameter_data=[]):
"""
data_order = self._data_order
model = self.model
previous_size = self._current_file_size

        #Retrieves the time-point
        r = model.get_real(self.real_var_ref)
@@ -2430,12 +2434,9 @@ def integration_point(self, solver = None):#parameter_data=[]):
        self.nbr_points+=1

        max_size = self.options.get("result_max_size", None)
        if max_size is not None:
            verify_result_size(self._first_point, self._current_file_size, previous_size, max_size, self.options["ncp"], self.model.time)
        self._first_point = False

    def simulation_end(self):
        """
@@ -2552,6 +2553,8 @@ def __init__(self, model):
        self.supports['dynamic_diagnostics'] = True
        self.supports['result_max_size'] = True
        self.data_2_header_end_position = 0
        self._size_point = -1
        self._first_point = True

    def _data_header(self, name, nbr_rows, nbr_cols, data_type):
        if data_type == "int":
@@ -2750,7 +2753,11 @@ def integration_point(self, solver = None):
"""
Writes the current status of the model to file.
"""
if self._size_point == -1:
pos = self._file.tell()
self.dump_data_internal.save_point()
if self._size_point == -1:
self._size_point = self._file.tell() - pos

        #Increment number of points
        self.nbr_points += 1
@@ -2760,7 +2767,7 @@

    def diagnostics_point(self, diag_data):
        """ Generates a data point for diagnostics data by invoking the util function save_diagnostics_point. """
        self.dump_data_internal.save_diagnostics_point(diag_data)
        self.nbr_diag_points += 1
        self._make_consistent(diag=True)

@@ -2795,11 +2802,11 @@ def _make_consistent(self, diag=False):
        f.seek(file_pos)

        max_size = self.options.get("result_max_size", None)
        if max_size is not None:
            verify_result_size(self._first_point, file_pos, file_pos-self._size_point, max_size, self.options["ncp"], self.model.time)
        #We can go in here before we've stored a full result point (due to storing diagnostic points). So check that a point has been fully stored
        if self._first_point and self._size_point > 0:
            self._first_point = False

    def simulation_end(self):
        """
@@ -2823,6 +2830,22 @@ def get_result(self):
"""
return ResultDymolaBinary(self.file_name)

def verify_result_size(first_point, current_size, previous_size, max_size, ncp, time):
    if first_point:
        point_size = current_size - previous_size
        estimate = ncp*point_size + previous_size
        if estimate > max_size:
            msg = "The result is estimated to exceed the allowed maximum size (limit: %g GB, estimate: %g GB). "%(max_size/1024**3, estimate/1024**3)
            if ncp > NCP_LARGE:
                msg = msg + "The number of result points is large (%d), consider reducing the number of points. "%ncp
            raise ResultSizeError(msg + "To change the maximum allowed result file size, please use the option 'result_max_size'.")

    if current_size > max_size:
        raise ResultSizeError("Maximum size of the result reached (limit: %g GB) at time t=%g. "
                              "To change the maximum allowed result size, please use the option "
                              "'result_max_size', or consider reducing the number of communication "
                              "points or the number of variables stored."%(max_size/1024**3, time))

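For intuition, the early check above is a linear extrapolation from the first stored point: estimate = ncp*point_size + previous_size. A rough numeric sketch, with purely hypothetical sizes:

# Hypothetical numbers: a first point adding ~100 kB with 10000 requested
# points projects to roughly 1 GB, so a 2 GB limit passes while a 0.5 GB
# limit aborts before the simulation has produced any real data.
previous_size = 5000              # bytes written before the first point (headers, parameters)
point_size = 100 * 1024           # bytes added by the first stored point
estimate = 10000 * point_size + previous_size
print(estimate / 1024**3)         # ~0.95 GB
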
def get_result_handler(model, opts):
    result_handler = None

87 changes: 68 additions & 19 deletions tests/test_io.py
@@ -27,6 +27,7 @@
from pyfmi.fmi import FMUException, FMUModelME2
from pyfmi.common.io import (ResultHandler, ResultDymolaTextual, ResultDymolaBinary, JIOError, ResultSizeError,
                             ResultHandlerCSV, ResultCSVTextual, ResultHandlerBinaryFile, ResultHandlerFile)
from pyfmi.common.io import get_result_handler
from pyfmi.common.diagnostics import DIAGNOSTICS_PREFIX, setup_diagnostics_variables

import pyfmi.fmi as fmi
@@ -1931,51 +1932,87 @@ def _test_result_exception(self, result_type, result_file_name="", fmi_type="me"
        with nose.tools.assert_raises(ResultSizeError):
            res = model.simulate(options=opts)

    def _test_result_size_verification(self, result_type, result_file_name="", dynamic_diagnostics=False):
        """
        Verifies that the ResultSizeError exception is triggered (due to too large result) and also verifies
        that the resulting file is within bounds of the set maximum size.
        """
        model, opts = self._setup(result_type, result_file_name)
        model.setup_experiment()
        model.initialize()

        max_size = 1e6
        opts["result_max_size"] = max_size
        opts["dynamic_diagnostics"] = dynamic_diagnostics
        opts["logging"] = dynamic_diagnostics
        opts["ncp"] = 0 #Set to zero to circumvent the early size check
        ncp = 10000

        result_handler = get_result_handler(model, opts)

        result_handler.set_options(opts)
        result_handler.initialize_complete()

        if opts["dynamic_diagnostics"]:
            opts['CVode_options']['rtol'] = 1e-6
            opts['CVode_options']['atol'] = model.nominal_continuous_states * opts['CVode_options']['rtol']
            diag_params, diag_vars = setup_diagnostics_variables(model, 0, opts, opts['CVode_options'])
            result_handler.simulation_start(diag_params, diag_vars)
        else:
            result_handler.simulation_start()

        with nose.tools.assert_raises(ResultSizeError):
            for i in range(ncp):
                result_handler.integration_point()

                if opts["dynamic_diagnostics"]:
                    result_handler.diagnostics_point(np.array([val[0] for val in diag_vars.values()], dtype=float))

        result_file = model.get_last_result_file()

        file_size = os.path.getsize(result_file)

        assert file_size > max_size*0.9 and file_size < max_size*1.1, \
            "The file size is not within 10% of the given max size"

    def _test_result_size_early_abort(self, result_type, result_file_name=""):
        """
        Verifies that the ResultSizeError is triggered, and that it was triggered because the
        ESTIMATE of the result size was too large.
        """
        model, opts = self._setup(result_type, result_file_name)

        max_size = 1e6
        opts["result_max_size"] = max_size
        opts["ncp"] = 10000000

        with nose.tools.assert_raises(ResultSizeError):
            res = model.simulate(options=opts)

        result_file = model.get_last_result_file()

        if result_file:
            file_size = os.path.getsize(result_file)

            assert file_size < max_size*0.1, \
                "The file size is not small, no early abort"

    # TODO: Pytest parametrization
    """
    Binary
    """
    @testattr(stddist = True)
    def test_binary_file_size_verification_diagnostics(self):
        """
        Make sure that the diagnostics variables are also taken into account.
        """
        self._test_result_size_verification("binary", dynamic_diagnostics=True)

    @testattr(stddist = True)
    def test_binary_file_size_verification(self):
        self._test_result_size_verification("binary")

    @testattr(stddist = True)
    def test_binary_file_size_early_abort(self):
        self._test_result_size_early_abort("binary")

    @testattr(stddist = True)
    def test_small_size_binary_file(self):
@@ -2003,6 +2040,10 @@ def test_large_size_binary_file_stream(self):
    @testattr(stddist = True)
    def test_text_file_size_verification(self):
        self._test_result_size_verification("file")

    @testattr(stddist = True)
    def test_text_file_size_early_abort(self):
        self._test_result_size_early_abort("file")

    @testattr(stddist = True)
    def test_small_size_text_file(self):
@@ -2026,6 +2067,10 @@ def test_large_size_text_file_stream(self):
    @testattr(stddist = True)
    def test_csv_file_size_verification(self):
        self._test_result_size_verification("csv")

    @testattr(stddist = True)
    def test_csv_file_size_early_abort(self):
        self._test_result_size_early_abort("csv")

    @testattr(stddist = True)
    def test_small_size_csv_file(self):
@@ -2050,6 +2095,10 @@ def test_large_size_csv_file_stream(self):
    def test_small_size_memory(self):
        self._test_result_exception("memory")

    @testattr(stddist = True)
    def test_memory_size_early_abort(self):
        self._test_result_size_early_abort("memory")

    @testattr(stddist = True)
    def test_small_size_memory_stream(self):
        self._test_result_exception("memory", StringIO())

