@@ -7,13 +7,15 @@ import (
7
7
8
8
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
9
9
"github.com/buildbarn/bb-storage/pkg/blobstore"
10
+ "github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
10
11
"github.com/buildbarn/bb-storage/pkg/digest"
11
12
"github.com/buildbarn/bb-storage/pkg/filesystem"
12
13
"github.com/buildbarn/bb-storage/pkg/filesystem/path"
13
14
"github.com/buildbarn/bb-storage/pkg/util"
14
15
15
16
"google.golang.org/grpc/codes"
16
17
"google.golang.org/grpc/status"
18
+ "google.golang.org/protobuf/encoding/protowire"
17
19
"google.golang.org/protobuf/proto"
18
20
)
19
21
@@ -201,97 +203,57 @@ func (s *uploadOutputsState) saveError(err error) {
201
203
}
202
204
}
203
205
204
- // UploadDirectory is called to upload a single directory. Elements in
205
- // the directory are stored in a remoteexecution.Directory, so that they
206
- // can be placed in a remoteexecution.Tree.
207
- func (s * uploadOutputsState ) uploadDirectory (d UploadableDirectory , dPath * path.Trace , children map [digest.Digest ]* remoteexecution.Directory ) * remoteexecution.Directory {
208
- files , err := d .ReadDir ()
209
- if err != nil {
210
- s .saveError (util .StatusWrapf (err , "Failed to read output directory %#v" , dPath .String ()))
211
- return & remoteexecution.Directory {}
212
- }
213
-
214
- var directory remoteexecution.Directory
215
- for _ , file := range files {
216
- name := file .Name ()
217
- childPath := dPath .Append (name )
218
- switch fileType := file .Type (); fileType {
219
- case filesystem .FileTypeRegularFile :
220
- if childDigest , err := d .UploadFile (s .context , name , s .digestFunction ); err == nil {
221
- directory .Files = append (directory .Files , & remoteexecution.FileNode {
222
- Name : name .String (),
223
- Digest : childDigest .GetProto (),
224
- IsExecutable : file .IsExecutable (),
225
- })
226
- } else {
227
- s .saveError (util .StatusWrapf (err , "Failed to store output file %#v" , childPath .String ()))
228
- }
229
- case filesystem .FileTypeDirectory :
230
- if childDirectory , err := d .EnterUploadableDirectory (name ); err == nil {
231
- child := s .uploadDirectory (childDirectory , dPath , children )
232
- childDirectory .Close ()
233
-
234
- // Compute digest of the child directory. This requires serializing it.
235
- if data , err := proto .Marshal (child ); err == nil {
236
- digestGenerator := s .digestFunction .NewGenerator ()
237
- if _ , err := digestGenerator .Write (data ); err == nil {
238
- childDigest := digestGenerator .Sum ()
239
- children [childDigest ] = child
240
- directory .Directories = append (directory .Directories , & remoteexecution.DirectoryNode {
241
- Name : name .String (),
242
- Digest : childDigest .GetProto (),
243
- })
244
- } else {
245
- s .saveError (util .StatusWrapf (err , "Failed to compute digest of output directory %#v" , childPath .String ()))
246
- }
247
- } else {
248
- s .saveError (util .StatusWrapf (err , "Failed to marshal output directory %#v" , childPath .String ()))
249
- }
250
- } else {
251
- s .saveError (util .StatusWrapf (err , "Failed to enter output directory %#v" , childPath .String ()))
252
- }
253
- case filesystem .FileTypeSymlink :
254
- if target , err := d .Readlink (name ); err == nil {
255
- directory .Symlinks = append (directory .Symlinks , & remoteexecution.SymlinkNode {
256
- Name : name .String (),
257
- Target : target ,
258
- })
259
- } else {
260
- s .saveError (util .StatusWrapf (err , "Failed to read output symlink %#v" , childPath .String ()))
261
- }
262
- }
263
- }
264
- return & directory
265
- }
266
-
267
206
// UploadOutputDirectoryEntered is called to upload a single output
268
207
// directory as a remoteexecution.Tree. The root directory is assumed to
269
208
// already be opened.
270
209
func (s * uploadOutputsState ) uploadOutputDirectoryEntered (d UploadableDirectory , dPath * path.Trace , paths []string ) {
271
- children := map [digest.Digest ]* remoteexecution.Directory {}
272
- tree := & remoteexecution.Tree {
273
- Root : s .uploadDirectory (d , dPath , children ),
274
- }
275
-
276
- childDigests := digest .NewSetBuilder ()
277
- for childDigest := range children {
278
- childDigests .Add (childDigest )
279
- }
280
- for _ , childDigest := range childDigests .Build ().Items () {
281
- tree .Children = append (tree .Children , children [childDigest ])
210
+ dState := uploadOutputDirectoryState {
211
+ uploadOutputsState : s ,
212
+ directoriesSeen : map [digest.Digest ]struct {}{},
282
213
}
214
+ if rootDirectory , err := dState .uploadDirectory (d , dPath ); err == nil {
215
+ // Approximate the size of the resulting Tree object, so
216
+ // that we may allocate all space at once.
217
+ directories := append (dState .directories , rootDirectory )
218
+ maximumTreeSizeBytes := 0
219
+ for _ , directory := range directories {
220
+ maximumTreeSizeBytes += len (directory )
221
+ }
222
+ maximumTreeSizeBytes += len (directories ) * (1 + protowire .SizeVarint (uint64 (maximumTreeSizeBytes )))
223
+
224
+ // Construct the Tree object. We don't want to use
225
+ // proto.Marshal() for this, as it would require us to
226
+ // marshal all of the directories a second time.
227
+ treeData := make ([]byte , 0 , maximumTreeSizeBytes )
228
+ tag := byte (blobstore .TreeRootFieldNumber << 3 ) | byte (protowire .BytesType )
229
+ for i := len (directories ); i > 0 ; i -- {
230
+ directory := directories [i - 1 ]
231
+ treeData = append (treeData , tag )
232
+ treeData = protowire .AppendVarint (treeData , uint64 (len (directory )))
233
+ treeData = append (treeData , directory ... )
234
+ tag = byte (blobstore .TreeChildrenFieldNumber << 3 ) | byte (protowire .BytesType )
235
+ }
283
236
284
- if treeDigest , err := blobstore .CASPutProto (s .context , s .contentAddressableStorage , tree , s .digestFunction ); err == nil {
285
- for _ , path := range paths {
286
- s .actionResult .OutputDirectories = append (
287
- s .actionResult .OutputDirectories ,
288
- & remoteexecution.OutputDirectory {
289
- Path : path ,
290
- TreeDigest : treeDigest .GetProto (),
291
- })
237
+ digestGenerator := s .digestFunction .NewGenerator ()
238
+ if _ , err := digestGenerator .Write (treeData ); err != nil {
239
+ panic (err )
240
+ }
241
+ treeDigest := digestGenerator .Sum ()
242
+
243
+ if err := s .contentAddressableStorage .Put (s .context , treeDigest , buffer .NewValidatedBufferFromByteSlice (treeData )); err == nil {
244
+ for _ , path := range paths {
245
+ s .actionResult .OutputDirectories = append (
246
+ s .actionResult .OutputDirectories ,
247
+ & remoteexecution.OutputDirectory {
248
+ Path : path ,
249
+ TreeDigest : treeDigest .GetProto (),
250
+ })
251
+ }
252
+ } else {
253
+ s .saveError (util .StatusWrapf (err , "Failed to store output directory %#v" , dPath .String ()))
292
254
}
293
255
} else {
294
- s .saveError (util . StatusWrapf ( err , "Failed to store output directory %#v" , dPath . String ()) )
256
+ s .saveError (err )
295
257
}
296
258
}
297
259
@@ -341,6 +303,91 @@ func (s *uploadOutputsState) uploadOutputSymlink(d UploadableDirectory, name pat
341
303
}
342
304
}
343
305
306
+ // UploadOutputDirectoryState is used by OutputHierarchy.UploadOutputs()
307
+ // to track state specific to uploading a single output directory.
308
+ type uploadOutputDirectoryState struct {
309
+ * uploadOutputsState
310
+
311
+ directories [][]byte
312
+ directoriesSeen map [digest.Digest ]struct {}
313
+ }
314
+
315
+ // UploadDirectory is called to upload a single directory. Elements in
316
+ // the directory are stored in a remoteexecution.Directory, so that they
317
+ // can be placed in a remoteexecution.Tree.
318
+ func (s * uploadOutputDirectoryState ) uploadDirectory (d UploadableDirectory , dPath * path.Trace ) ([]byte , error ) {
319
+ files , err := d .ReadDir ()
320
+ if err != nil {
321
+ return nil , util .StatusWrapf (err , "Failed to read output directory %#v" , dPath .String ())
322
+ }
323
+
324
+ var directory remoteexecution.Directory
325
+ for _ , file := range files {
326
+ name := file .Name ()
327
+ childPath := dPath .Append (name )
328
+ switch fileType := file .Type (); fileType {
329
+ case filesystem .FileTypeRegularFile :
330
+ if childDigest , err := d .UploadFile (s .context , name , s .digestFunction ); err == nil {
331
+ directory .Files = append (directory .Files , & remoteexecution.FileNode {
332
+ Name : name .String (),
333
+ Digest : childDigest .GetProto (),
334
+ IsExecutable : file .IsExecutable (),
335
+ })
336
+ } else {
337
+ s .saveError (util .StatusWrapf (err , "Failed to store output file %#v" , childPath .String ()))
338
+ }
339
+ case filesystem .FileTypeDirectory :
340
+ if childDirectory , err := d .EnterUploadableDirectory (name ); err == nil {
341
+ childData , err := s .uploadDirectory (childDirectory , dPath )
342
+ childDirectory .Close ()
343
+ if err == nil {
344
+ // Compute the digest of the child
345
+ // directory, so that it may be
346
+ // referenced by the parent.
347
+ digestGenerator := s .digestFunction .NewGenerator ()
348
+ if _ , err := digestGenerator .Write (childData ); err != nil {
349
+ panic (err )
350
+ }
351
+ childDigest := digestGenerator .Sum ()
352
+
353
+ // There is no need to make the
354
+ // directory part of the Tree if we
355
+ // have seen an identical directory
356
+ // previously.
357
+ if _ , ok := s .directoriesSeen [childDigest ]; ! ok {
358
+ s .directories = append (s .directories , childData )
359
+ s .directoriesSeen [childDigest ] = struct {}{}
360
+ }
361
+
362
+ directory .Directories = append (directory .Directories , & remoteexecution.DirectoryNode {
363
+ Name : name .String (),
364
+ Digest : childDigest .GetProto (),
365
+ })
366
+ } else {
367
+ s .saveError (err )
368
+ }
369
+ } else {
370
+ s .saveError (util .StatusWrapf (err , "Failed to enter output directory %#v" , childPath .String ()))
371
+ }
372
+ case filesystem .FileTypeSymlink :
373
+ if target , err := d .Readlink (name ); err == nil {
374
+ directory .Symlinks = append (directory .Symlinks , & remoteexecution.SymlinkNode {
375
+ Name : name .String (),
376
+ Target : target ,
377
+ })
378
+ } else {
379
+ s .saveError (util .StatusWrapf (err , "Failed to read output symlink %#v" , childPath .String ()))
380
+ }
381
+ }
382
+ }
383
+
384
+ data , err := proto .Marshal (& directory )
385
+ if err != nil {
386
+ return nil , util .StatusWrapf (err , "Failed to marshal output directory %#v" , dPath .String ())
387
+ }
388
+ return data , nil
389
+ }
390
+
344
391
// outputNodePath is an implementation of path.ComponentWalker that is
345
392
// used by NewOutputHierarchy() to compute normalized paths of outputs
346
393
// of a build action.
0 commit comments