Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: persistant availabilties #798

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion codex/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ logScope:
declareCounter(codex_api_uploads, "codex API uploads")
declareCounter(codex_api_downloads, "codex API downloads")

proc getLongestRequestEnd(node: CodexNodeRef, availabilityId: AvailabilityId): ?!SecondsSince1970 =
without contracts =? node.contracts.host:
return failure("Sales unavailable")

let
reservations = contracts.sales.context.reservations
market = contracts.sales.context.market
requestEndFutures = reservations.all(Reservation, availabilityId).mapIt(market.getRequestEnd(it.requestId))

if len(requestEndFutures) == 0:
return success(0)

try:
let requestEnds = await allFutures(requestEndFutures)

return success(requestEnds.reduce(max))
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)

proc validate(
pattern: string,
value: string): int
Expand Down Expand Up @@ -276,6 +297,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
if restAv.totalSize == 0:
return RestApiResponse.error(Http400, "Total size must be larger then zero")

if restAv.until < 0:
return RestApiResponse.error(Http400, "Until parameter has to be positive integer")

if not reservations.hasAvailable(restAv.totalSize.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")

Expand All @@ -284,7 +308,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
restAv.totalSize,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
restAv.maxCollateral,
restAv.until,
restAv.enabled |? true)
), error:
return RestApiResponse.error(Http500, error.msg)

Expand Down Expand Up @@ -350,6 +376,19 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
if maxCollateral =? restAv.maxCollateral:
availability.maxCollateral = maxCollateral

if enabled =? restAv.enabled:
availability.enabled = enabled

if until =? restAv.until:
if until < 0:
return RestApiResponse.error(Http400, "Until parameter must be greater or equal 0. Got: " & $until)

let longestRequestEnd = node.getLongestRequestEnd(id)
if until != 0 && until < longestRequestEnd:
return RestApiResponse.error(Http400, "Until parameter must be greater or equal the current longest request. Longest request ends at: " & $longestRequestEnd)

availability.until = until

if err =? (await reservations.update(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)

Expand Down
4 changes: 3 additions & 1 deletion codex/rest/json.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type

RestAvailability* = object
totalSize* {.serialize.}: UInt256
freeSize* {.serialize.}: ?UInt256
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
freeSize* {.serialize.}: ?UInt256
until* {.serialize.}: int64
enabled* {.serialize.}: ?bool

RestSalesAgent* = object
state* {.serialize.}: string
Expand Down
25 changes: 5 additions & 20 deletions codex/sales.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func new*(_: type Sales,
repo: RepoStore,
simulateProofFailures: int): Sales =

let reservations = Reservations.new(repo)
let reservations = Reservations.new(repo, clock)
Sales(
context: SalesContext(
market: market,
Expand All @@ -110,7 +110,6 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =

proc cleanUp(sales: Sales,
agent: SalesAgent,
returnBytes: bool,
processing: Future[void]) {.async.} =

let data = agent.data
Expand All @@ -121,20 +120,6 @@ proc cleanUp(sales: Sales,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default

# if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned
if returnBytes and request =? data.request and reservation =? data.reservation:
if returnErr =? (await sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId,
reservation.id,
request.ask.slotSize
)).errorOption:
error "failure returning bytes",
error = returnErr.msg,
availabilityId = reservation.availabilityId,
bytes = request.ask.slotSize

# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
Expand Down Expand Up @@ -176,8 +161,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)

agent.onCleanUp = proc (returnBytes = false) {.async.} =
await sales.cleanUp(agent, returnBytes, done)
agent.onCleanUp = proc () {.async.} =
await sales.cleanUp(agent, done)

agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)
Expand Down Expand Up @@ -241,9 +226,9 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex,
some slot.request)

agent.onCleanUp = proc(returnBytes = false) {.async.} =
agent.onCleanUp = proc() {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, returnBytes, done)
await sales.cleanUp(agent, done)
await done # completed in sales.cleanUp

agent.start(SaleUnknown())
Expand Down
59 changes: 36 additions & 23 deletions codex/sales/reservations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,32 @@ type
ReservationId* = distinct array[32, byte]
SomeStorableObject = Availability | Reservation
SomeStorableId = AvailabilityId | ReservationId

Availability* = ref object
id* {.serialize.}: AvailabilityId
totalSize* {.serialize.}: UInt256
freeSize* {.serialize.}: UInt256
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
# 0 means non-restricted, otherwise contains timestamp until the Availability will be renewed
until* {.serialize.}: SecondsSince1970
# false means that the availability won't be immidiatelly considered for sale
enabled* {.serialize.}: bool

Reservation* = ref object
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
reservedSize* {.serialize.}: UInt256
totalSize* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256

Reservations* = ref object
repo: RepoStore
clock: Clock
onAvailabilityAdded: ?OnAvailabilityAdded

GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
StorableIter* = ref object
Expand All @@ -91,9 +101,10 @@ const
ReservationsKey = (SalesKey / "reservations").tryGet

proc new*(T: type Reservations,
repo: RepoStore): Reservations =
repo: RepoStore,
clock: Clock): Reservations =

T(repo: repo)
T(repo: repo, clock: clock)

proc init*(
_: type Availability,
Expand All @@ -110,14 +121,15 @@ proc init*(
proc init*(
_: type Reservation,
availabilityId: AvailabilityId,
size: UInt256,
totalSize: UInt256,
reservedSize: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Reservation =

var id: array[32, byte]
doAssert randomBytes(id) == 32
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex)
Reservation(id: ReservationId(id), availabilityId: availabilityId, totalSize: totalSize, reservedSize: reservedSize, requestId: requestId, slotIndex: slotIndex)

func toArray(id: SomeStorableId): array[32, byte] =
array[32, byte](id)
Expand Down Expand Up @@ -168,8 +180,7 @@ proc exists*(
self: Reservations,
key: Key): Future[bool] {.async.} =

let exists = await self.repo.metaDs.contains(key)
return exists
return await self.repo.metaDs.contains(key)

proc getImpl(
self: Reservations,
Expand Down Expand Up @@ -280,17 +291,17 @@ proc deleteReservation*(
else:
return failure(error)

if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)

without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)

without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
if reservation.reservedSize > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.reservedSize

availability.freeSize += reservation.size
availability.freeSize += reservation.reservedSize

if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
Expand All @@ -305,12 +316,14 @@ proc createAvailability*(
size: UInt256,
duration: UInt256,
minPrice: UInt256,
maxCollateral: UInt256): Future[?!Availability] {.async.} =
maxCollateral: UInt256,
until: SecondsSince1970 = 0,
enabled = true): Future[?!Availability] {.async.} =

trace "creating availability", size, duration, minPrice, maxCollateral

let availability = Availability.init(
size, size, duration, minPrice, maxCollateral
size, size, duration, minPrice, maxCollateral, until, enabled
)
let bytes = availability.freeSize.truncate(uint)

Expand All @@ -327,7 +340,8 @@ proc createAvailability*(

return failure(updateErr)

if onAvailabilityAdded =? self.onAvailabilityAdded:
# we won't trigger the callback if the availability is not enabled
if enabled and onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onAvailabilityAdded(availability)
except CatchableError as e:
Expand All @@ -348,7 +362,7 @@ proc createReservation*(

trace "creating reservation", availabilityId, slotSize, requestId, slotIndex

let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
let reservation = Reservation.init(availabilityId, slotSize, slotSize, requestId, slotIndex)

without availabilityKey =? availabilityId.key, error:
return failure(error)
Expand Down Expand Up @@ -397,7 +411,6 @@ proc returnBytesToAvailability*(
reservationId
availabilityId


without key =? key(reservationId, availabilityId), error:
return failure(error)

Expand All @@ -406,7 +419,7 @@ proc returnBytesToAvailability*(

# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
let bytesToBeReturned = bytes - reservation.reservedSize

if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
Expand Down Expand Up @@ -459,7 +472,7 @@ proc release*(
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)

if reservation.size < bytes.u256:
if reservation.reservedSize < bytes.u256:
let error = newException(
BytesOutOfBoundsError,
"trying to release an amount of bytes that is greater than the total size of the Reservation")
Expand All @@ -468,7 +481,7 @@ proc release*(
if releaseErr =? (await self.repo.release(bytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError))

reservation.size -= bytes.u256
reservation.reservedSize -= bytes.u256

# persist partially used Reservation with updated size
if err =? (await self.update(reservation)).errorOption:
Expand Down
2 changes: 1 addition & 1 deletion codex/sales/salesagent.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled

OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
OnCleanUp* = proc (): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}

Expand Down
2 changes: 1 addition & 1 deletion codex/sales/states/cancelled.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)

if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp()

warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex
7 changes: 7 additions & 0 deletions codex/sales/states/downloading.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,11 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
return some State(SaleErrored(error: err))

trace "Download complete"

if updatedReservation =? await reservations.get(reservation.id, Reservation):
if updatedReservation.size != 0:
error "After downloading the data there is unused capacity in Reservation"
else:
error "Couldn't get updated reservation"

return some State(SaleInitialProving())
2 changes: 1 addition & 1 deletion codex/sales/states/errored.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)

if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp()

Loading