Skip to content

Commit a200cfb

Browse files
ashahabi-ony
authored andcommitted
Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches
Added AVRO_PARSER_NUM_MINIBATCH to override num_minibatches. This is recommended to be set equal to the vcore request.
1 parent 171b826 commit a200cfb

File tree

2 files changed

+44
-10
lines changed

2 files changed

+44
-10
lines changed

tensorflow_io/core/kernels/avro/parse_avro_kernels.cc

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,9 @@ Status ParseAvro(const AvroParserConfig& config,
172172
const gtl::ArraySlice<tstring>& serialized,
173173
thread::ThreadPool* thread_pool, AvroResult* result) {
174174
DCHECK(result != nullptr);
175-
175+
using clock = std::chrono::system_clock;
176+
using ms = std::chrono::duration<double, std::milli>;
177+
const auto before = clock::now();
176178
// Allocate dense output for fixed length dense values
177179
// (variable-length dense and sparse and ragged have to be buffered).
178180
/* std::vector<Tensor> fixed_len_dense_values(config.dense.size());
@@ -204,8 +206,14 @@ Status ParseAvro(const AvroParserConfig& config,
204206
minibatch_bytes = 0;
205207
}
206208
}
207-
// 'special logic'
208-
const size_t min_minibatches = std::min<size_t>(8, serialized.size());
209+
if (const char* n_minibatches =
210+
std::getenv("AVRO_PARSER_NUM_MINIBATCHES")) {
211+
VLOG(5) << "Overriding num_minibatches with " << n_minibatches;
212+
result = std::stoi(n_minibatches);
213+
}
214+
// This is to ensure users can control the num minibatches all the way down
215+
// to size of 1(no parallelism).
216+
const size_t min_minibatches = std::min<size_t>(1, serialized.size());
209217
const size_t max_minibatches = 64;
210218
return std::max<size_t>(min_minibatches,
211219
std::min<size_t>(max_minibatches, result));
@@ -245,13 +253,16 @@ Status ParseAvro(const AvroParserConfig& config,
245253
auto read_value = [&](avro::GenericDatum& d) {
246254
return range_reader.read(d);
247255
};
248-
256+
VLOG(5) << "Processing minibatch " << minibatch;
249257
status_of_minibatch[minibatch] = parser_tree.ParseValues(
250258
&buffers[minibatch], read_value, reader_schema, defaults);
251259
};
252-
260+
const auto before_parse = clock::now();
253261
ParallelFor(ProcessMiniBatch, num_minibatches, thread_pool);
254-
262+
const auto after_parse = clock::now();
263+
const ms parse_read_duration = after_parse - before_parse;
264+
VLOG(5) << "PARSER_TIMING: Time spend reading and parsing "
265+
<< parse_read_duration.count() << " ms ";
255266
for (Status& status : status_of_minibatch) {
256267
TF_RETURN_IF_ERROR(status);
257268
}
@@ -367,15 +378,22 @@ Status ParseAvro(const AvroParserConfig& config,
367378

368379
return Status::OK();
369380
};
370-
381+
const auto before_sparse_merge = clock::now();
371382
for (size_t d = 0; d < config.sparse.size(); ++d) {
372383
TF_RETURN_IF_ERROR(MergeSparseMinibatches(d));
373384
}
374-
385+
const auto after_sparse_merge = clock::now();
386+
const ms s_merge_duration = after_sparse_merge - before_sparse_merge;
375387
for (size_t d = 0; d < config.dense.size(); ++d) {
376388
TF_RETURN_IF_ERROR(MergeDenseMinibatches(d));
377389
}
390+
const auto after_dense_merge = clock::now();
391+
const ms d_merge_duration = after_dense_merge - after_sparse_merge;
392+
VLOG(5) << "PARSER_TIMING: Sparse merge duration" << s_merge_duration.count()
393+
<< " ms ";
378394

395+
VLOG(5) << "PARSER_TIMING: Dense merge duration" << d_merge_duration.count()
396+
<< " ms ";
379397
return Status::OK();
380398
}
381399

tensorflow_io/core/kernels/avro/utils/avro_parser_tree.cc

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ Status AvroParserTree::ParseValues(
8181
const std::function<bool(avro::GenericDatum&)> read_value,
8282
const avro::ValidSchema& reader_schema,
8383
const std::map<string, Tensor>& defaults) const {
84+
using clock = std::chrono::system_clock;
85+
using ms = std::chrono::duration<double, std::milli>;
86+
8487
// new assignment of all buffers
8588
TF_RETURN_IF_ERROR(InitializeValueBuffers(key_to_value));
8689

@@ -90,11 +93,24 @@ Status AvroParserTree::ParseValues(
9093
avro::GenericDatum datum(reader_schema);
9194

9295
bool has_value = false;
93-
94-
while ((has_value = read_value(datum))) {
96+
ms parse_duration;
97+
ms read_duration;
98+
while (true) {
99+
const auto before_read = clock::now();
100+
if (!(has_value = read_value(datum))) {
101+
break;
102+
}
103+
const auto after_read = clock::now();
95104
TF_RETURN_IF_ERROR((*root_).Parse(key_to_value, datum, defaults));
105+
const auto after_parse = clock::now();
106+
parse_duration += after_parse - after_read;
107+
read_duration += after_read - before_read;
96108
}
97109

110+
VLOG(5) << "PARSER_TIMING: Avro Read times " << read_duration.count()
111+
<< " ms ";
112+
VLOG(5) << "PARSER_TIMING: Avro Parse times " << parse_duration.count()
113+
<< " ms ";
98114
// add end marks to all buffers for batch
99115
TF_RETURN_IF_ERROR(AddFinishMarks(key_to_value));
100116

0 commit comments

Comments
 (0)