@@ -52,6 +52,7 @@ Builder::Builder(
52
52
bool verbose)
53
53
: endpoints(endpoints)
54
54
, metadata(metadata)
55
+ , io(Io::create(this ->metadata, this ->endpoints))
55
56
, manifest(manifest)
56
57
, hierarchy(hierarchy)
57
58
, verbose(verbose)
@@ -64,13 +65,34 @@ uint64_t Builder::run(
64
65
{
65
66
Pool pool (2 );
66
67
68
+ std::string fatalError;
69
+
67
70
std::atomic_uint64_t counter (0 );
68
71
std::atomic_bool done (false );
69
72
pool.add ([&]() { monitor (progressInterval, counter, done); });
70
- pool.add ([&]() { runInserts (threads, limit, counter); done = true ; });
73
+ pool.add ([&]()
74
+ {
75
+ try
76
+ {
77
+ runInserts (threads, limit, counter);
78
+ done = true ;
79
+ }
80
+ catch (std::exception & e)
81
+ {
82
+ fatalError = e.what ();
83
+ done = true ;
84
+ }
85
+ catch (...)
86
+ {
87
+ fatalError = " Fatal error: unknown error" ;
88
+ done = true ;
89
+ }
90
+ });
71
91
72
92
pool.join ();
73
93
94
+ if (fatalError.size ()) throw new std::runtime_error (fatalError);
95
+
74
96
return counter;
75
97
}
76
98
@@ -91,7 +113,7 @@ void Builder::runInserts(
91
113
const uint64_t stolenThreads = threads.work - actualWorkThreads;
92
114
const uint64_t actualClipThreads = threads.clip + stolenThreads;
93
115
94
- ChunkCache cache (endpoints, metadata, hierarchy, actualClipThreads);
116
+ ChunkCache cache (endpoints, metadata, *io, hierarchy, actualClipThreads);
95
117
Pool pool (std::min<uint64_t >(actualWorkThreads, manifest.size ()));
96
118
97
119
uint64_t filesInserted = 0 ;
@@ -101,6 +123,8 @@ void Builder::runInserts(
101
123
origin < manifest.size () && (!limit || filesInserted < limit);
102
124
++origin)
103
125
{
126
+ if (cache.fatalErrors ().size ()) break ;
127
+
104
128
const auto & item = manifest.at (origin);
105
129
const auto & info = item.source .info ;
106
130
if (!item.inserted && info.points && active.overlaps (info.bounds ))
@@ -126,6 +150,24 @@ void Builder::runInserts(
126
150
pool.join ();
127
151
cache.join ();
128
152
153
+ // While pool errors from *input* are not fatal and just get stored and
154
+ // logged as errors to note that an input file failed to be inserted,
155
+ // errors reading/writing from the *output* are irrecoverably fatal. In
156
+ // this case, log the error and throw, no need to save metadata since
157
+ // the build is busted.
158
+ const auto errors = cache.fatalErrors ();
159
+ if (errors.size ())
160
+ {
161
+ if (verbose)
162
+ {
163
+ std::cout << " Fatal error: failed to read or write output data\n " ;
164
+ for (const auto & e : errors) std::cout << " \t " << e << std::endl;
165
+ std::cout << " Terminating fatally corrupted build..." << std::endl;
166
+ }
167
+
168
+ throw std::runtime_error (errors.front ());
169
+ }
170
+
129
171
save (getTotal (threads));
130
172
}
131
173
@@ -616,7 +658,12 @@ void merge(
616
658
Manifest manifest = base.manifest ;
617
659
618
660
Builder builder (endpoints, metadata, manifest, Hierarchy (), verbose);
619
- ChunkCache cache (endpoints, builder.metadata , builder.hierarchy , threads);
661
+ ChunkCache cache (
662
+ endpoints,
663
+ builder.metadata ,
664
+ *builder.io ,
665
+ builder.hierarchy ,
666
+ threads);
620
667
621
668
if (verbose) std::cout << " Merging" << std::endl;
622
669
@@ -670,8 +717,9 @@ void mergeOne(Builder& dst, const Builder& src, ChunkCache& cache)
670
717
{
671
718
// TODO: Should make sure that the src/dst metadata match. For now we're
672
719
// relying on the user not to have done anything weird.
673
- const auto & endpoints = dst.endpoints ;
674
720
const auto & metadata = dst.metadata ;
721
+ const auto & endpoints = dst.endpoints ;
722
+ auto io = Io::create (metadata, endpoints);
675
723
676
724
Clipper clipper (cache);
677
725
const auto sharedDepth = getSharedDepth (src.metadata );
@@ -710,7 +758,7 @@ void mergeOne(Builder& dst, const Builder& src, ChunkCache& cache)
710
758
});
711
759
712
760
const auto stem = key.toString () + getPostfix (src.metadata );
713
- io:: read (metadata. dataType , metadata, endpoints, stem, table);
761
+ io-> read (stem, table);
714
762
}
715
763
}
716
764
}
0 commit comments