import bz2
import io
import os
import zlib
from collections import OrderedDict
from datetime import datetime
from typing import IO, Any, Dict, Optional, Union

import numpy as np
import numpy.typing as npt

from nrrd.errors import NRRDError
from nrrd.formatters import *
from nrrd.reader import _get_field_type
from nrrd.types import IndexOrder, NRRDFieldMap, NRRDFieldType, NRRDHeader

# Older versions of Python had issues when uncompressed data was larger than 4 GB (2^32 bytes). This is fixed in the
# latest version of Python 2.7 and in all versions of Python 3. The fix for this issue is to write the data in smaller
# chunks. The chunk size is set small here, at 1 MB, since performance did not vary much with the chunk size. A smaller
# chunk size has the benefit of using less RAM at once.
_WRITE_CHUNKSIZE: int = 2 ** 20

_NRRD_FIELD_ORDER = [
    'type',
    'dimension',
    'space dimension',
    'space',
    'sizes',
    'space directions',
    'kinds',
    'endian',
    'encoding',
    'min',
    'max',
    'oldmin',
    'old min',
    'oldmax',
    'old max',
    'content',
    'sample units',
    'spacings',
    'thicknesses',
    'axis mins',
    'axismins',
    'axis maxs',
    'axismaxs',
    'centerings',
    'labels',
    'units',
    'space units',
    'space origin',
    'measurement frame',
    'data file'
]

_TYPEMAP_NUMPY2NRRD = {
    'i1': 'int8',
    'u1': 'uint8',
    'i2': 'int16',
    'u2': 'uint16',
    'i4': 'int32',
    'u4': 'uint32',
    'i8': 'int64',
    'u8': 'uint64',
    'f4': 'float',
    'f8': 'double',
    'V': 'block'
}

_NUMPY2NRRD_ENDIAN_MAP = {
    '<': 'little',
    'L': 'little',
    '>': 'big',
    'B': 'big'
}
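# Note: NumPy dtype strings combine these two maps. For example, np.dtype(np.float32).str is '<f4' on a little-endian
# machine: the first character ('<') encodes the endianness and the remainder ('f4') encodes the type, which is why
# the code below indexes dtype.str[:1] and dtype.str[1:].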


def _format_field_value(value: Any, field_type: NRRDFieldType) -> str:
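    # A few illustrative sketches of the expected output (assuming the usual space-separated NRRD list syntax):
    #   _format_field_value(5, 'int')                            -> '5'
    #   _format_field_value([1, 2, 3], 'int list')               -> '1 2 3'
    #   _format_field_value(['a b', 'c'], 'quoted string list')  -> '"a b" "c"'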
    if field_type == 'int':
        return format_number(value)
    elif field_type == 'double':
        return format_number(value)
    elif field_type == 'string':
        return str(value)
    elif field_type == 'int list':
        return format_number_list(value)
    elif field_type == 'double list':
        return format_number_list(value)
    elif field_type == 'string list':
        return ' '.join(value)
    elif field_type == 'quoted string list':
        return ' '.join(f'"{x}"' for x in value)
    elif field_type == 'int vector':
        return format_vector(value)
    elif field_type == 'double vector':
        return format_optional_vector(value)
    elif field_type == 'int matrix':
        return format_matrix(value)
    elif field_type == 'double matrix':
        return format_optional_matrix(value)
    else:
        raise NRRDError(f'Invalid field type given: {field_type}')


def _handle_header(data: npt.NDArray, header: Optional[NRRDHeader] = None, index_order: IndexOrder = 'F') -> NRRDHeader:
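    # Illustrative sketch (assuming a little-endian machine): _handle_header(np.zeros((3, 4), dtype=np.int16))
    # returns {'type': 'int16', 'endian': 'little', 'dimension': 2, 'sizes': [3, 4], 'encoding': 'gzip'}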
    if header is None:
        header = {}

    # Infer a number of fields from the NumPy array and overwrite values in the header dictionary.
    # Get the type string identifier from the NumPy datatype
    header['type'] = _TYPEMAP_NUMPY2NRRD[data.dtype.str[1:]]

    # If the datatype contains more than one byte and the encoding is not ASCII, then set the endian header value
    # based on the datatype's endianness. Otherwise, delete the endian field from the header if present
    if data.dtype.itemsize > 1 and header.get('encoding', '').lower() not in ['ascii', 'text', 'txt']:
        header['endian'] = _NUMPY2NRRD_ENDIAN_MAP[data.dtype.str[:1]]
    elif 'endian' in header:
        del header['endian']

    # If 'space' is specified in the header, then 'space dimension' cannot also be specified. See
    # http://teem.sourceforge.net/nrrd/format.html#space
    if 'space' in header.keys() and 'space dimension' in header.keys():
        del header['space dimension']

    # Update the dimension and sizes fields in the header based on the data. Since NRRD expects metadata to be in
    # Fortran order, the shape must be reversed when the array is in C order, e.g. when the data was read using
    # index_order='C'.
    header['dimension'] = data.ndim
    header['sizes'] = list(data.shape) if index_order == 'F' else list(data.shape[::-1])

    # The default encoding is 'gzip'
    if 'encoding' not in header:
        header['encoding'] = 'gzip'

    # Remove any detached data filename from the header
    header.pop('datafile', None)
    header.pop('data file', None)

    return header


def _write_header(file: IO, header: Dict[str, Any], custom_field_map: Optional[NRRDFieldMap] = None):
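    # The written header looks roughly like the sketch below (illustrative; the fields and values depend on the data).
    # Regular fields use a 'field: value' line, custom fields use 'key:=value', and a blank line terminates the header:
    #   NRRD0005
    #   # This NRRD file was generated by pynrrd
    #   ...
    #   type: int16
    #   dimension: 2
    #   sizes: 3 4
    #   encoding: gzip
    #   my custom field:=some value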
    file.write(b'NRRD0005\n')
    file.write(b'# This NRRD file was generated by pynrrd\n')
    file.write(b'# on ' + datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S').encode('ascii') + b' (GMT).\n')
    file.write(b'# Complete NRRD file format specification at:\n')
    file.write(b'# http://teem.sourceforge.net/nrrd/format.html\n')

    # Copy the options since dictionaries are mutable when passed as an argument.
    # Thus, to prevent changes to the caller's header, a copy is made.
    # An empty ordered_options list is created (it will be converted into an OrderedDict below).
    local_options = header.copy()
    ordered_options = []

    # Loop through the field order and add the key/value if present.
    # Remove the key/value from the local options so that we know not to add it again.
    for field in _NRRD_FIELD_ORDER:
        if field in local_options:
            ordered_options.append((field, local_options[field]))
            del local_options[field]

    # Leftover items are assumed to be custom field/value options,
    # so get the current size; any items past this index will be a custom value
    custom_field_start_index = len(ordered_options)

    # Add the leftover items to the end of the list and convert the options into a dictionary
    ordered_options.extend(local_options.items())
    ordered_options = OrderedDict(ordered_options)

    for x, (field, value) in enumerate(ordered_options.items()):
        # Get the field_type based on the field and then get the corresponding
        # value as a str using _format_field_value
        field_type = _get_field_type(field, custom_field_map)
        value_str = _format_field_value(value, field_type)

        # Custom fields are written as key/value pairs with a := instead of : delimiter
        if x >= custom_field_start_index:
            file.write((f'{field}:={value_str}\n').encode('ascii'))
        else:
            file.write((f'{field}: {value_str}\n').encode('ascii'))

    # Write the closing extra newline that terminates the header
    file.write(b'\n')


def _write_data(data: npt.NDArray, fh: IO, header: NRRDHeader, compression_level: Optional[int] = None,
                index_order: IndexOrder = 'F'):
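    # Note: for gzip output, the zlib.MAX_WBITS | 16 argument used below asks zlib for a gzip container (header and
    # trailing checksum) rather than a raw zlib stream, so the written bytes form a valid .gz member.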
    if index_order not in ['F', 'C']:
        raise NRRDError('Invalid index order')

    if header['encoding'] == 'raw':
        # Convert the data into bytes
        raw_data = data.tobytes(order=index_order)

        # Write the data in chunks (see the _WRITE_CHUNKSIZE declaration for why)
        # Cache the length of the data since it is used repeatedly
        start_index = 0
        raw_data_len = len(raw_data)

        # Loop through the data and write it chunk by chunk
        while start_index < raw_data_len:
            # End index is start index plus the chunk size, capped at the data length so the final,
            # possibly partial chunk is written too
            end_index = min(start_index + _WRITE_CHUNKSIZE, raw_data_len)

            # Write the chunk of raw data
            fh.write(raw_data[start_index:end_index])
            start_index = end_index

        # Finish writing the data
        fh.flush()
    elif header['encoding'].lower() in ['ascii', 'text', 'txt']:
        # savetxt only works for 1D and 2D arrays, so reshape any array with more than 2 dimensions into one long
        # 1D array
        if data.ndim > 2:
            np.savetxt(fh, data.ravel(order=index_order), '%.17g')
        else:
            np.savetxt(fh, data if index_order == 'C' else data.T, '%.17g')
    else:
        # Convert the data into bytes
        raw_data = data.tobytes(order=index_order)

        # Construct the compressor object based on the encoding
        if header['encoding'] in ['gzip', 'gz']:
            compressobj = zlib.compressobj(compression_level, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        elif header['encoding'] in ['bzip2', 'bz2']:
            compressobj = bz2.BZ2Compressor(compression_level)
        else:
            raise NRRDError(f'Unsupported encoding: {header["encoding"]}')

        # Write the data in chunks (see the _WRITE_CHUNKSIZE declaration for why)
        # Cache the length of the data since it is used repeatedly
        start_index = 0
        raw_data_len = len(raw_data)

        # Loop through the data, compressing and writing it chunk by chunk
        while start_index < raw_data_len:
            # End index is start index plus the chunk size, capped at the data length so the final,
            # possibly partial chunk is written too
            end_index = min(start_index + _WRITE_CHUNKSIZE, raw_data_len)

            # Write the compressed data
            fh.write(compressobj.compress(raw_data[start_index:end_index]))
            start_index = end_index

        # Flush the remaining data in the compressor and finish writing
        fh.write(compressobj.flush())
        fh.flush()


def write(file: Union[str, IO], data: npt.NDArray, header: Optional[NRRDHeader] = None,
          detached_header: bool = False, relative_data_path: bool = True,
          custom_field_map: Optional[NRRDFieldMap] = None, compression_level: int = 9, index_order: IndexOrder = 'F'):
"""Write :class:`numpy.ndarray` to NRRD file
The :obj:`file` parameter specifies the absolute or relative filename to write the NRRD file to or an
io.BytesIO object.
If :obj:`file` is a filename and the extension is .nhdr, then the :obj:`detached_header` parameter is set to true
automatically. If the :obj:`detached_header` parameter is set to :obj:`True` and the :obj:`filename` ends in .nrrd,
then the header file will have the same path and base name as the :obj:`file` but with an extension of .nhdr.
In all other cases, the header and data are saved in the same file.
:obj:`header` is an optional parameter containing the fields and values to be added to the NRRD header.
.. note::
:obj:`detached_header` is ignored if :obj:`file` is io.BytesIO and not a str filename
.. note::
The following fields are automatically generated based on the :obj:`data` parameter ignoring these values
in the :obj:`header`: 'type', 'endian', 'dimension', 'sizes', and 'data file'. In addition, the generated
fields will be added to the given :obj:`header`. Thus, one can check the generated fields by viewing the
passed :obj:`header`.
.. note::
The default encoding field used if not specified in :obj:`header` is 'gzip'.
.. note::
The :obj:`index_order` parameter must be consistent with the index order specified in :meth:`read`.
Reading an NRRD file in C-order and then writing as Fortran-order or vice versa will result in the data
being transposed in the NRRD file.
See :ref:`reference/writing:writing nrrd files` for more information on writing NRRD files.
Parameters
----------
file : :class:`str` or :class:`io.IOBase`
Filename or file object of the NRRD file
data : :class:`numpy.ndarray`
Data to save to the NRRD file
header : :class:`dict` (:class:`str`, :obj:`Object`), optional
NRRD headers
detached_header : :obj:`bool` or :obj:`str`, optional
Whether the header and data should be saved in separate files. Defaults to :obj:`False`. If a :obj:`str` is
given this specifies the path to the datafile. This path will ONLY be used if the given filename ends with nhdr
(i.e. the file is a header)
relative_data_path : :class:`bool`
Whether the data filename in detached header is saved with a relative path or absolute path.
This parameter is ignored if there is no detached header. Defaults to :obj:`True`
custom_field_map : :class:`dict` (:class:`str`, :class:`str`), optional
Dictionary used for parsing custom field types where the key is the custom field name and the value is a
string identifying datatype for the custom field.
compression_level : :class:`int`
Integer between 1 and 9 specifying the compression level when using a compressed encoding (gzip or bzip).
A value of :obj:`1` compresses the data the least amount and is the fastest, while a value of :obj:`9`
compresses the data the most and is the slowest.
index_order : {'C', 'F'}, optional
Specifies the index order used for writing. Either 'C' (C-order) where the dimensions are ordered from
slowest-varying to fastest-varying (e.g. (z, y, x)), or 'F' (Fortran-order) where the dimensions are ordered
from fastest-varying to slowest-varying (e.g. (x, y, z)).
See Also
--------
:meth:`read`, :meth:`read_header`, :meth:`read_data`
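
    Examples
    --------
    A minimal sketch of typical usage (the filename and array here are hypothetical)::

        >>> import numpy as np
        >>> import nrrd
        >>> data = np.zeros((5, 4, 3))
        >>> nrrd.write('output.nrrd', data, header={'encoding': 'gzip'})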
"""
    header = _handle_header(data, header, index_order)

    # If the file is a file handle, then write to the file directly
    if isinstance(file, io.IOBase):
        _write_header(file, header, custom_field_map)
        _write_data(data, file, header, compression_level=compression_level, index_order=index_order)
        return

    # A bit of magic in handling the options here:
    # If a *.nhdr filename is provided, this overrides `detached_header=False`.
    # If a *.nrrd filename is provided AND detached_header=True, separate header and data files are written.
    # For all other cases, the header & data are written to the same file. See the summary sketch below.
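    # For example (illustrative sketch with a hypothetical base name 'img' and the default gzip encoding):
    #   write('img.nhdr', data)                        -> img.nhdr (header) + img.raw.gz (data)
    #   write('img.nrrd', data, detached_header=True)  -> img.nhdr (header) + img.nrrd (data)
    #   write('img.nrrd', data)                        -> img.nrrd (header & data in one file)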
    if file.endswith('.nhdr'):
        if isinstance(detached_header, str):
            # Utilize the detached_header as the data filename if a string was given as the path.
            # Note: An absolute path is obtained and assumed to be relative to the current path of the running Python
            # program
            data_filename = os.path.abspath(detached_header)
        else:
            # Get the base filename without the extension
            base_filename = os.path.splitext(file)[0]

            # Get the appropriate data filename based on the encoding; see here for information on the standard
            # detached filename: http://teem.sourceforge.net/nrrd/format.html#encoding
            if header['encoding'] == 'raw':
                data_filename = f'{base_filename}.raw'
            elif header['encoding'].lower() in ['ascii', 'text', 'txt']:
                data_filename = f'{base_filename}.txt'
            elif header['encoding'] in ['gzip', 'gz']:
                data_filename = f'{base_filename}.raw.gz'
            elif header['encoding'] in ['bzip2', 'bz2']:
                data_filename = f'{base_filename}.raw.bz2'
            else:
                raise NRRDError(f'Invalid encoding specification while writing NRRD file: {header["encoding"]}')

        # Update the data file field in the header with the path of the detached data
        # TODO This will cause problems when the user specifies a relative data path and gives a custom path OUTSIDE
        # of the current directory.
        header['data file'] = os.path.basename(data_filename) \
            if relative_data_path else os.path.abspath(data_filename)
        detached_header = True
    elif file.endswith('.nrrd') and detached_header:
        data_filename = file
        file = f'{os.path.splitext(file)[0]}.nhdr'
        header['data file'] = os.path.basename(data_filename) \
            if relative_data_path else os.path.abspath(data_filename)
    else:
        # Write the header & data as one file
        data_filename = file
        detached_header = False

    with open(file, 'wb') as fh:
        _write_header(fh, header, custom_field_map)

        # If the header & data in the same file is desired, write the data to the same file
        if not detached_header:
            _write_data(data, fh, header, compression_level=compression_level, index_order=index_order)

    # If a detached header is desired, write the data to a separate file
    if detached_header:
        with open(data_filename, 'wb') as data_fh:
            _write_data(data, data_fh, header, compression_level=compression_level, index_order=index_order)