Skip to content

Commit

Permalink
Measure used disk storage (#6685)
Browse files Browse the repository at this point in the history
* [WIP] measure disk space usage

* [WIP] store storage report

* measure per mag or other directory on that level

* use apache file utils instead of calling du

* access methods

* pipe reports to db

* fix sql error logging for composed queries

* wip: logic of when to refresh what

* fetch instead of report

* evolutions, refresh on ds actions, cleanup

* snapshots

* changelog + migration guide

* use Instant in storage service

* fix evolution numbers

* connect measured used storage to frontend organization page
  • Loading branch information
fm3 authored Jan 11, 2023
1 parent 7f53158 commit 873cc6d
Show file tree
Hide file tree
Showing 31 changed files with 469 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released

### Added
- The target folder of a dataset can now be specified during upload. Also, clicking "Add Dataset" from an active folder will upload the dataset to that folder by default. [#6704](https://github.com/scalableminds/webknossos/pull/6704)
- The storage used by an organization’s datasets can now be measured. [#6685](https://github.com/scalableminds/webknossos/pull/6685)

### Changed
- Improved performance of opening a dataset or annotation. [#6711](https://github.com/scalableminds/webknossos/pull/6711)
Expand Down
1 change: 1 addition & 0 deletions MIGRATIONS.unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md).

- [094-pricing-plans.sql](conf/evolutions/reversions/094-pricing-plans.sql)
- [095-constraint-naming.sql](conf/evolutions/reversions/095-constraint-naming.sql)
- [096-storage.sql](conf/evolutions/096-storage.sql)
2 changes: 2 additions & 0 deletions app/WebKnossosModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import models.analytics.AnalyticsSessionService
import models.annotation.AnnotationStore
import models.binary.DataSetService
import models.job.{JobService, WorkerLivenessService}
import models.storage.UsedStorageService
import models.task.TaskService
import models.user.time.TimeSpanService
import models.user._
Expand Down Expand Up @@ -34,5 +35,6 @@ class WebKnossosModule extends AbstractModule {
bind(classOf[AnalyticsSessionService]).asEagerSingleton()
bind(classOf[WorkerLivenessService]).asEagerSingleton()
bind(classOf[ElasticsearchClient]).asEagerSingleton()
bind(classOf[UsedStorageService]).asEagerSingleton()
}
}
3 changes: 2 additions & 1 deletion app/controllers/DataSetController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class DataSetController @Inject()(userService: UserService,
.clientFor(dataSet)(GlobalAccessContext)
.flatMap(
_.requestDataLayerThumbnail(organizationName,
dataSet,
dataLayerName,
width,
height,
Expand Down Expand Up @@ -320,7 +321,7 @@ class DataSetController @Inject()(userService: UserService,
datalayer <- usableDataSource.dataLayers.headOption.toFox ?~> "dataSet.noLayers"
_ <- dataSetService
.clientFor(dataSet)(GlobalAccessContext)
.flatMap(_.findPositionWithData(organizationName, datalayer.name).flatMap(posWithData =>
.flatMap(_.findPositionWithData(organizationName, dataSet, datalayer.name).flatMap(posWithData =>
bool2Fox(posWithData.value("position") != JsNull))) ?~> "dataSet.loadingDataFailed"
} yield {
Ok("Ok")
Expand Down
17 changes: 14 additions & 3 deletions app/controllers/WKRemoteDataStoreController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import models.binary._
import models.folder.FolderDAO
import models.job.JobDAO
import models.organization.OrganizationDAO
import models.storage.UsedStorageService
import models.user.{User, UserDAO, UserService}
import net.liftweb.common.Full
import oxalis.mail.{MailchimpClient, MailchimpTag}
Expand All @@ -35,6 +36,7 @@ class WKRemoteDataStoreController @Inject()(
analyticsService: AnalyticsService,
userService: UserService,
organizationDAO: OrganizationDAO,
usedStorageService: UsedStorageService,
dataSetDAO: DataSetDAO,
userDAO: UserDAO,
folderDAO: FolderDAO,
Expand Down Expand Up @@ -95,6 +97,7 @@ class WKRemoteDataStoreController @Inject()(
dataSet <- dataSetDAO.findOneByNameAndOrganization(dataSetName, user._organization)(GlobalAccessContext) ?~> Messages(
"dataSet.notFound",
dataSetName) ~> NOT_FOUND
_ <- usedStorageService.refreshStorageReportForDataset(dataSet)
_ = analyticsService.track(UploadDatasetEvent(user, dataSet, dataStore, dataSetSizeBytes))
_ = mailchimpClient.tagUser(user, MailchimpTag.HasUploadedOwnDataset)
} yield Ok
Expand All @@ -106,7 +109,11 @@ class WKRemoteDataStoreController @Inject()(
request.body.validate[DataStoreStatus] match {
case JsSuccess(status, _) =>
logger.debug(s"Status update from data store '$name'. Status: " + status.ok)
dataStoreDAO.updateUrlByName(name, status.url).map(_ => Ok)
for {
_ <- dataStoreDAO.updateUrlByName(name, status.url)
_ <- dataStoreDAO.updateReportUsedStorageEnabledByName(name,
status.reportUsedStorageEnabled.getOrElse(false))
} yield Ok
case e: JsError =>
logger.error("Data store '$name' sent invalid update. Error: " + e)
Future.successful(JsonBadRequest(JsError.toJson(e)))
Expand Down Expand Up @@ -159,8 +166,12 @@ class WKRemoteDataStoreController @Inject()(
.findOneByNameAndOrganizationName(datasourceId.name, datasourceId.team)(GlobalAccessContext)
.futureBox
_ <- existingDataset.flatMap {
case Full(dataset) => dataSetDAO.deleteDataset(dataset._id)
case _ => Fox.successful(())
case Full(dataset) => {
dataSetDAO
.deleteDataset(dataset._id)
.flatMap(_ => usedStorageService.refreshStorageReportForDataset(dataset))
}
case _ => Fox.successful(())
}
} yield Ok
}
Expand Down
2 changes: 1 addition & 1 deletion app/models/binary/DataSetService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class DataSetService @Inject()(organizationDAO: OrganizationDAO,
def clientFor(dataSet: DataSet)(implicit ctx: DBAccessContext): Fox[WKRemoteDataStoreClient] =
for {
dataStore <- dataStoreFor(dataSet)
} yield new WKRemoteDataStoreClient(dataStore, dataSet, rpc)
} yield new WKRemoteDataStoreClient(dataStore, rpc)

def lastUsedTimeFor(_dataSet: ObjectId, userOpt: Option[User]): Fox[Instant] =
userOpt match {
Expand Down
19 changes: 17 additions & 2 deletions app/models/binary/DataStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ case class DataStore(
isDeleted: Boolean = false,
isConnector: Boolean = false,
allowsUpload: Boolean = true,
reportUsedStorageEnabled: Boolean = false,
onlyAllowedOrganization: Option[ObjectId] = None
)

Expand All @@ -45,6 +46,7 @@ object DataStore {
isDeleted = false,
isConnector.getOrElse(false),
allowsUpload.getOrElse(true),
reportUsedStorageEnabled = false,
None
)

Expand Down Expand Up @@ -102,6 +104,7 @@ class DataStoreDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext
r.isdeleted,
r.isconnector,
r.allowsupload,
r.reportusedstorageenabled,
r.onlyallowedorganization.map(ObjectId(_))
))

Expand All @@ -126,16 +129,28 @@ class DataStoreDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionContext
parsed <- parseAll(r)
} yield parsed

def findAllWithStorageReporting: Fox[List[DataStore]] =
for {
r <- run(sql"select #$columns from webknossos.datastores_ where reportUsedStorageEnabled".as[DatastoresRow])
parsed <- parseAll(r)
} yield parsed

def updateUrlByName(name: String, url: String): Fox[Unit] = {
val q = for { row <- Datastores if notdel(row) && row.name === name } yield row.url
for { _ <- run(q.update(url)) } yield ()
}

def updateReportUsedStorageEnabledByName(name: String, reportUsedStorageEnabled: Boolean): Fox[Unit] =
for {
_ <- run(
sqlu"UPDATE webknossos.dataStores SET reportUsedStorageEnabled = $reportUsedStorageEnabled WHERE name = $name")
} yield ()

def insertOne(d: DataStore): Fox[Unit] =
for {
_ <- run(
sqlu"""insert into webknossos.dataStores(name, url, publicUrl, key, isScratch, isDeleted, isConnector, allowsUpload)
values(${d.name}, ${d.url}, ${d.publicUrl}, ${d.key}, ${d.isScratch}, ${d.isDeleted}, ${d.isConnector}, ${d.allowsUpload})""")
sqlu"""insert into webknossos.dataStores(name, url, publicUrl, key, isScratch, isDeleted, isConnector, allowsUpload, reportUsedStorageEnabled)
values(${d.name}, ${d.url}, ${d.publicUrl}, ${d.key}, ${d.isScratch}, ${d.isDeleted}, ${d.isConnector}, ${d.allowsUpload}, ${d.reportUsedStorageEnabled})""")
} yield ()

def deleteOneByName(name: String): Fox[Unit] =
Expand Down
15 changes: 11 additions & 4 deletions app/models/binary/WKRemoteDataStoreClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package models.binary
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.rpc.RPC
import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport
import com.typesafe.scalalogging.LazyLogging
import controllers.RpcTokenHolder
import play.api.libs.json.JsObject
import play.utils.UriEncoding

class WKRemoteDataStoreClient(dataStore: DataStore, dataSet: DataSet, rpc: RPC) extends LazyLogging {

def baseInfo = s"Dataset: ${dataSet.name} Datastore: ${dataStore.url}"
class WKRemoteDataStoreClient(dataStore: DataStore, rpc: RPC) extends LazyLogging {

def requestDataLayerThumbnail(organizationName: String,
dataSet: DataSet,
dataLayerName: String,
width: Int,
height: Int,
Expand All @@ -29,12 +29,19 @@ class WKRemoteDataStoreClient(dataStore: DataStore, dataSet: DataSet, rpc: RPC)
.getWithBytesResponse
}

def findPositionWithData(organizationName: String, dataLayerName: String): Fox[JsObject] =
def findPositionWithData(organizationName: String, dataSet: DataSet, dataLayerName: String): Fox[JsObject] =
rpc(
s"${dataStore.url}/data/datasets/${urlEncode(organizationName)}/${dataSet.urlEncodedName}/layers/$dataLayerName/findData")
.addQueryString("token" -> RpcTokenHolder.webKnossosToken)
.getWithJsonResponse[JsObject]

private def urlEncode(text: String) = UriEncoding.encodePathSegment(text, "UTF-8")

def fetchStorageReport(organizationName: String, datasetName: Option[String]): Fox[List[DirectoryStorageReport]] =
rpc(s"${dataStore.url}/data/datasets/measureUsedStorage/${urlEncode(organizationName)}")
.addQueryString("token" -> RpcTokenHolder.webKnossosToken)
.addQueryStringOptional("dataSetName", datasetName)
.silent
.getWithJsonResponse[List[DirectoryStorageReport]]

}
71 changes: 67 additions & 4 deletions app/models/organization/Organization.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package models.organization
import com.scalableminds.util.accesscontext.DBAccessContext
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport
import com.scalableminds.webknossos.schema.Tables._

import javax.inject.Inject
import models.team.PricingPlan
import models.team.PricingPlan.PricingPlan
import slick.jdbc.PostgresProfile.api._
import slick.lifted.Rep
import utils.sql.{SQLClient, SQLDAO}
import utils.ObjectId

import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

case class Organization(
_id: ObjectId,
Expand All @@ -24,7 +25,7 @@ case class Organization(
pricingPlan: PricingPlan,
paidUntil: Option[Instant],
includedUsers: Option[Int], // None means unlimited
includedStorage: Option[Long], // None means unlimited
includedStorageBytes: Option[Long], // None means unlimited
_rootFolder: ObjectId,
newUserMailingList: String = "",
overTimeMailingList: String = "",
Expand Down Expand Up @@ -101,7 +102,7 @@ class OrganizationDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionCont
VALUES
(${o._id.id}, ${o.name}, ${o.additionalInformation}, ${o.logoUrl}, ${o.displayName}, ${o._rootFolder},
${o.newUserMailingList}, ${o.overTimeMailingList}, ${o.enableAutoVerify},
'#${o.pricingPlan}', ${o.paidUntil}, ${o.includedUsers}, ${o.includedStorage}, ${o.lastTermsOfServiceAcceptanceTime},
'#${o.pricingPlan}', ${o.paidUntil}, ${o.includedUsers}, ${o.includedStorageBytes}, ${o.lastTermsOfServiceAcceptanceTime},
${o.lastTermsOfServiceAcceptanceVersion}, ${o.created}, ${o.isDeleted})
""")
} yield ()
Expand Down Expand Up @@ -132,6 +133,68 @@ class OrganizationDAO @Inject()(sqlClient: SQLClient)(implicit ec: ExecutionCont
where _id = $organizationId""")
} yield ()

def deleteUsedStorage(organizationId: ObjectId): Fox[Unit] =
for {
_ <- run(sqlu"DELETE FROM webknossos.organization_usedStorage WHERE _organization = $organizationId")
} yield ()

def deleteUsedStorageForDataset(datasetId: ObjectId): Fox[Unit] =
for {
_ <- run(sqlu"DELETE FROM webknossos.organization_usedStorage WHERE _dataSet = $datasetId")
} yield ()

def updateLastStorageScanTime(organizationId: ObjectId, time: Instant): Fox[Unit] =
for {
_ <- run(sqlu"UPDATE webknossos.organizations SET lastStorageScanTime = $time WHERE _id = $organizationId")
} yield ()

def upsertUsedStorage(organizationId: ObjectId,
dataStoreName: String,
usedStorageEntries: List[DirectoryStorageReport]): Fox[Unit] = {
val queries = usedStorageEntries.map(entry => sqlu"""
WITH ds AS (
SELECT _id
FROM webknossos.datasets_
WHERE _organization = $organizationId
AND name = ${entry.dataSetName}
LIMIT 1
)
INSERT INTO webknossos.organization_usedStorage(
_organization, _dataStore, _dataSet, layerName,
magOrDirectoryName, usedStorageBytes, lastUpdated)
SELECT
$organizationId, $dataStoreName, ds._id, ${entry.layerName},
${entry.magOrDirectoryName}, ${entry.usedStorageBytes}, NOW()
FROM ds
ON CONFLICT (_organization, _dataStore, _dataSet, layerName, magOrDirectoryName)
DO UPDATE
SET usedStorageBytes = ${entry.usedStorageBytes}, lastUpdated = NOW()
""")
for {
_ <- Fox.serialCombined(queries)(q => run(q))
} yield ()
}

def getUsedStorage(organizationId: ObjectId): Fox[Long] =
for {
rows <- run(
sql"SELECT SUM(usedStorageBytes) FROM webknossos.organization_usedStorage WHERE _organization = $organizationId"
.as[Long])
firstRow <- rows.headOption
} yield firstRow

def findNotRecentlyScanned(scanInterval: FiniteDuration, limit: Int): Fox[List[Organization]] =
for {
rows <- run(sql"""
SELECT #$columns
FROM #$existingCollectionName
WHERE lastStorageScanTime < ${Instant.now - scanInterval}
ORDER BY lastStorageScanTime
LIMIT $limit
""".as[OrganizationsRow])
parsed <- parseAll(rows)
} yield parsed

def acceptTermsOfService(organizationId: ObjectId, version: Int, timestamp: Long)(
implicit ctx: DBAccessContext): Fox[Unit] =
for {
Expand Down
8 changes: 5 additions & 3 deletions app/models/organization/OrganizationService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO,
"lastTermsOfServiceAcceptanceVersion" -> organization.lastTermsOfServiceAcceptanceVersion
)
} else Json.obj()
Fox.successful(
for {
usedStorageBytes <- organizationDAO.getUsedStorage(organization._id)
} yield
Json.obj(
"id" -> organization._id.toString,
"name" -> organization.name,
Expand All @@ -46,9 +48,9 @@ class OrganizationService @Inject()(organizationDAO: OrganizationDAO,
"pricingPlan" -> organization.pricingPlan,
"paidUntil" -> organization.paidUntil,
"includedUsers" -> organization.includedUsers,
"includedStorage" -> organization.includedStorage.map(bytes => bytes / 1000000)
"includedStorageBytes" -> organization.includedStorageBytes,
"usedStorageBytes" -> usedStorageBytes
) ++ adminOnlyInfo
)
}

def findOneByInviteByNameOrDefault(inviteOpt: Option[Invite], organizationNameOpt: Option[String])(
Expand Down
Loading

0 comments on commit 873cc6d

Please sign in to comment.