Skip to content

Commit

Permalink
RemoveData and Decode
Browse files Browse the repository at this point in the history
- Unsealing replica update with sector key works and tested
- Sector key generation added and tested
  • Loading branch information
ZenGround0 committed Dec 3, 2021
1 parent 91f6d4b commit a5be808
Show file tree
Hide file tree
Showing 21 changed files with 281 additions and 25 deletions.
1 change: 1 addition & 0 deletions api/api_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type StorageMiner interface {
ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateOut, err *storiface.CallError) error //perm:admin retry:true
ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
Expand Down
1 change: 1 addition & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Worker interface {
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Expand Down
26 changes: 26 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
25 changes: 25 additions & 0 deletions documentation/en/api-v0-methods-miner.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
* [ReturnGenerateSectorKeyFromData](#ReturnGenerateSectorKeyFromData)
* [ReturnMoveStorage](#ReturnMoveStorage)
* [ReturnProveReplicaUpdate1](#ReturnProveReplicaUpdate1)
* [ReturnProveReplicaUpdate2](#ReturnProveReplicaUpdate2)
Expand Down Expand Up @@ -1443,6 +1444,30 @@ Response: `{}`
### ReturnFinalizeSector


Perms: admin

Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```

Response: `{}`

### ReturnGenerateSectorKeyFromData


Perms: admin

Inputs:
Expand Down
37 changes: 37 additions & 0 deletions documentation/en/api-v0-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* [AddPiece](#AddPiece)
* [Finalize](#Finalize)
* [FinalizeSector](#FinalizeSector)
* [Generate](#Generate)
* [GenerateSectorKeyFromData](#GenerateSectorKeyFromData)
* [Move](#Move)
* [MoveStorage](#MoveStorage)
* [Process](#Process)
Expand Down Expand Up @@ -220,6 +222,41 @@ Response:
}
```

## Generate


### GenerateSectorKeyFromData


Perms: admin

Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
]
```

Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```

## Move


Expand Down
2 changes: 1 addition & 1 deletion extern/filecoin-ffi
Submodule filecoin-ffi updated 1 files
+21 −12 rust/Cargo.lock
60 changes: 54 additions & 6 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,23 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
return pieceCID, werr()
}

func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
return false, nil
} else if err != nil {
return false, xerrors.Errorf("reading updated replica: %w", err)
}
defer done()

// Sector data stored in replica update
updateProof, err := sector.ProofType.RegisteredUpdateProof()
if err != nil {
return false, err
}
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, paths.Update, paths.Sealed, paths.Cache, commD)
}

func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
Expand Down Expand Up @@ -301,6 +318,16 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
return nil
}

// If piece data stored in updated replica decode whole sector
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed)
if err != nil {
return xerrors.Errorf("decoding sector from replica: %w", err)
}
if decoded {
return pf.MarkAllocated(0, maxPieceSize)
}

// Piece data sealed in sector
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err)
Expand Down Expand Up @@ -626,12 +653,6 @@ func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, p
return empty, err
}
}

// XXX: we want to keep the stuff at the end
if err := os.Truncate(paths.Unsealed, sealedSize); err != nil {
return empty, xerrors.Errorf("failed to truncate unsealed data file: %w", err)
}

sealed, unsealed, err := ffi.SectorUpdate.EncodeInto(updateProofType, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache, paths.Unsealed, pieces)
if err != nil {
return empty, xerrors.Errorf("failed to update replica %d with new deal data: %w", sector.ID.Number, err)
Expand Down Expand Up @@ -661,6 +682,33 @@ func (sb *Sealer) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector
return ffi.SectorUpdate.GenerateUpdateProofWithVanilla(updateProofType, sectorKey, newSealed, newUnsealed, vanillaProofs)
}

func (sb *Sealer) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed, storiface.PathSealing)
defer done()
if err != nil {
return xerrors.Errorf("failed to acquire sector paths: %w", err)
}

s, err := os.Stat(paths.Update)
if err != nil {
return xerrors.Errorf("measuring update file size: %w", err)
}
sealedSize := s.Size()
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
if err != nil {
return xerrors.Errorf("ensuring sector key file exists: %w", err)
}
if err := fallocate.Fallocate(e, 0, sealedSize); err != nil {
return xerrors.Errorf("allocating space for sector key file: %w", err)
}
if err := e.Close(); err != nil {
return err
}

updateProofType := abi.SealProofInfos[sector.ProofType].UpdateProof
return ffi.SectorUpdate.RemoveData(updateProofType, paths.Sealed, paths.Cache, paths.Update, paths.UpdateCache, paths.Unsealed, commD)
}

func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
return xerrors.Errorf("not supported at this layer")
}
Expand Down
73 changes: 70 additions & 3 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,15 +578,72 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef,
return nil
}

func (m *Manager) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
return nil
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storage.SectorRef) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}

return m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true, nil)
}

func (m *Manager) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTRegenSectorKey, sector, commD)
if err != nil {
return xerrors.Errorf("getWork: %w", err)
}
defer cancel()

var waitErr error
waitRes := func() {
_, werr := m.waitWork(ctx, wk)
if werr != nil {
waitErr = werr
return
}
}

if wait { // already in progress
waitRes()
return waitErr
}

if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed|storiface.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}

// NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTCache, true)

err = m.sched.Schedule(ctx, sector, sealtasks.TTRegenSectorKey, selector, m.schedFetch(sector, storiface.FTUpdate|storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
err := m.startWork(ctx, w, wk)(w.GenerateSectorKeyFromData(ctx, sector, commD))
if err != nil {
return err
}

waitRes()
return nil
})
if err != nil {
return err
}

return waitErr
}

func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}

Expand All @@ -601,6 +658,12 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdate, true, nil); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdateCache, true, nil); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
}

return err
}
Expand Down Expand Up @@ -790,6 +853,10 @@ func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storifac
return m.returnResult(ctx, callID, proof, err)
}

func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
Expand Down
Loading

0 comments on commit a5be808

Please sign in to comment.