# This file is almost entirely copied from "parquet_example.py" from the psr.graf
# package. Changes include a slightly different API for the command-line script,
# removal of the CSV functionality, and a work-around for the inflow files, which
# cause graf_to_parquet to fail due to their stage numbering starting from 0.
# Converts an SDDP result binary file to the Apache Parquet file format.
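#
# A minimal example invocation (the input/output paths are hypothetical):
#   python bin2parquet.py /path/to/sddp/outputs cmgbus gerhid --output-dir ./parquet
# If no variables are listed, the DEFAULT_VARS below are converted.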
#from __future__ import print_function
import argparse
from contextlib import contextmanager
import logging
import os
import queue
import struct
import threading
import numpy as np
from psr.graf import BinReader, load_as_dataframe
import pyarrow as pa
import pyarrow.parquet as pq
_WORD = 4
# these variables are needed for TPM clause 51 and 52
DEFAULT_VARS = [
    'cmgbus', 'defbus', 'gerter', 'gergnd', 'gerbat', 'gerhid', 'coster',
    'cosco2', 'demxba', 'usedcl'
]
# Change this number to optimize the number of stages read/written
# in parquet files. Not seeing much motivation to change this for now.
_stage_chunk_size = 10
@contextmanager
def my_open_bin(file_path: str, **kwargs):
    reader = BinReader()
    reader.open(file_path, **kwargs)
    try:
        yield reader
    finally:
        # close the reader even if the body of the with-block raises
        reader.close()
# copied from the parquet_example.py file from the psr.graf package
# used to break the list of stages into chunks
# chunk_size would be better named num_chunks
def chunkfy(data: list, chunk_size: int) -> list:
    n = len(data)
    l = data
    k = chunk_size
    return [l[i * (n // k) + min(i, n % k):
              (i + 1) * (n // k) + min(i + 1, n % k)] for i in range(k)]
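# For example, splitting 7 stages into 3 chunks yields roughly equal,
# front-loaded pieces:
#   chunkfy(list(range(7)), 3) == [[0, 1, 2], [3, 4], [5, 6]]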
def inflow_to_parquet(graf_file_path: str, parquet_file_path: str) -> None:
    """This is a special case for inflow bin files, which cause the graf_to_parquet
    function to fail due to the stage numbering starting from 0."""
    # inflow tables are relatively small, so we can get away with loading the
    # entire table into memory at once.
    df = load_as_dataframe(graf_file_path)
    df.reset_index().to_parquet(parquet_file_path)
# this started as a copy of the graf_to_parquet function from the parquet_example.py
# file from the psr.graf package, but has been modified to use the my_open_bin
# context manager, gain speed and memory improvements by using numpy instead of
# for loops and lists, and to use a queue to write the parquet file in the background.
def graf_to_parquet(graf_file_path: str, parquet_file_path: str) -> None:
    # parquet will be written to this temporary file before moving to the final
    # destination
    part_parquet_file_path = parquet_file_path + '.part'
    logging.info(f"writing to temporary file {part_parquet_file_path}")
    with my_open_bin(graf_file_path) as graf_file:
        # The code below specifies the table layout.
        fields = [
            pa.field('stage', pa.int64()),
            pa.field('scenario', pa.int64()),
            pa.field('block', pa.int64())
        ]
        fields.extend([pa.field(agent, pa.float32())
                       for agent in graf_file.agents])
        first_chunk = True
        for stage_chunk in chunkfy(
                list(range(graf_file.min_stage, graf_file.max_stage + 1)),
                _stage_chunk_size):
            stages = []     # Stage number column.
            scenarios = []  # Scenario number column.
            blocks = []     # Block number column.
            agents = []     # Stores a 2D numpy array for each chunk.
            for stage in stage_chunk:
                for scenario in range(1, graf_file.scenarios + 1):
                    data = graf_file.read_blocks_as_array(stage, scenario)
                    total_blocks = data.shape[0]
                    current_blocks = list(range(1, total_blocks + 1))
                    stages.extend([stage] * total_blocks)
                    scenarios.extend([scenario] * total_blocks)
                    blocks.extend(current_blocks)
                    agents.append(data)
            agents = np.concatenate(agents, axis=0)
            # create a pyarrow table from the data
            arrays = [
                pa.array(stages),
                pa.array(scenarios),
                pa.array(blocks)
            ]
            arrays.extend([pa.array(agents[:, i]) for i in range(agents.shape[1])])
            table = pa.Table.from_arrays(arrays=arrays, schema=pa.schema(fields))
            # if this is the first chunk, initialize the parquet writer and the
            # write queue
            if first_chunk:
                parquet_writer = pq.ParquetWriter(part_parquet_file_path, table.schema)
                # setting maxsize to 1 to guard against memory use climbing if
                # the writing thread falls behind.
                write_queue = queue.Queue(maxsize=1)
                def writer():
                    while True:
                        this_table = write_queue.get()
                        parquet_writer.write_table(this_table)
                        write_queue.task_done()
                threading.Thread(target=writer, daemon=True).start()
                first_chunk = False
            write_queue.put(table)
        # wait for the write queue to finish
        write_queue.join()
        parquet_writer.close()
        logging.info(f"successfully finished writing to {part_parquet_file_path}")
    # Move the temporary file to the final destination.
    logging.info(f"moving {part_parquet_file_path} to {parquet_file_path}")
    os.rename(part_parquet_file_path, parquet_file_path)
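# A minimal sketch of calling the converter directly (file names are
# hypothetical); main() below builds these paths from each variable name:
#   graf_to_parquet("gerhid.hdr", "gerhid.parquet")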
def main():
    # Parse the input directory, the variables to convert and the output
    # directory from the command line.
    parser = argparse.ArgumentParser(
        description='Converts an SDDP result binary file to the Apache Parquet '
        'file format.')
    parser.add_argument(
        'input_dir', type=str, help='directory of SDDP binary outputs')
    parser.add_argument(
        "vars", nargs="*", help="variables to convert", default=DEFAULT_VARS)
    parser.add_argument(
        '--output-dir', type=str, help='directory for output Parquet files',
        default="./")
    args = parser.parse_args()
    log_path = 'bin2parquet.log'
    print(f"Log messages going to {log_path}")
    logging.basicConfig(
        level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s',
        filename=log_path, filemode='w')
    if not os.path.exists(args.output_dir):
        print(f"creating output directory: {args.output_dir}")
        os.makedirs(args.output_dir)
    for var in args.vars:
        bin_path = os.path.join(args.input_dir, f"{var}.hdr")
        parquet_path = os.path.join(args.output_dir, f'{var}.parquet')
        if os.path.exists(parquet_path):
            msg = f"output file {parquet_path} already exists. Skipping."
            print(msg)
            logging.info(msg)
            continue
        try:
            msg = f"converting {bin_path} to {parquet_path}"
            print(msg)
            logging.info(msg)
            if var == "inflow":
                inflow_to_parquet(bin_path, parquet_path)
            else:
                graf_to_parquet(bin_path, parquet_path)
        except Exception as err:
            msg = (f"conversion for variable {var} failed: "
                   f"{err.__class__.__name__}, {err}")
            print(msg)
            logging.warning(msg)
    logging.info("Done")
if __name__ == "__main__":
    main()