Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions superset/commands/dashboard/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand
from superset.commands.key_value.upsert import UpsertKeyValueCommand
from superset.daos.dashboard import DashboardDAO
from superset.daos.key_value import KeyValueDAO
from superset.dashboards.permalink.exceptions import DashboardPermalinkCreateFailedError
from superset.dashboards.permalink.types import DashboardPermalinkState
from superset.key_value.exceptions import (
Expand Down Expand Up @@ -70,14 +71,15 @@ def run(self) -> str:
"state": self.state,
}
user_id = get_user_id()
key = UpsertKeyValueCommand(
entry = KeyValueDAO.upsert_entry(
resource=self.resource,
key=get_deterministic_uuid(self.salt, (user_id, value)),
value=value,
codec=self.codec,
).run()
assert key.id # for type checks
return encode_permalink_key(key=key.id, salt=self.salt)
)
db.session.flush()
assert entry.id # for type checks
return encode_permalink_key(key=entry.id, salt=self.salt)

def validate(self) -> None:
pass
9 changes: 2 additions & 7 deletions superset/commands/dashboard/permalink/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from superset.commands.dashboard.exceptions import DashboardNotFoundError
from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand
from superset.commands.key_value.get import GetKeyValueCommand
from superset.daos.dashboard import DashboardDAO
from superset.daos.key_value import KeyValueDAO
from superset.dashboards.permalink.exceptions import DashboardPermalinkGetFailedError
from superset.dashboards.permalink.types import DashboardPermalinkValue
from superset.key_value.exceptions import (
Expand All @@ -43,12 +43,7 @@ def run(self) -> Optional[DashboardPermalinkValue]:
self.validate()
try:
key = decode_permalink_id(self.key, salt=self.salt)
command = GetKeyValueCommand(
resource=self.resource,
key=key,
codec=self.codec,
)
value: Optional[DashboardPermalinkValue] = command.run()
value = KeyValueDAO.get_value(self.resource, key, self.codec)
if value:
DashboardDAO.get_by_id_or_slug(value["dashboardId"])
return value
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,28 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import uuid
from typing import Any

from flask import current_app

from superset.commands.base import BaseCommand
from superset.distributed_lock.utils import get_key
from superset.key_value.types import JsonKeyValueCodec, KeyValueResource

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class BaseDistributedLockCommand(BaseCommand):
key: uuid.UUID
codec = JsonKeyValueCodec()
resource = KeyValueResource.LOCK

def __init__(self, namespace: str, params: dict[str, Any] | None = None):
self.key = get_key(namespace, **(params or {}))

def validate(self) -> None:
pass
64 changes: 64 additions & 0 deletions superset/commands/distributed_lock/create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from datetime import datetime, timedelta
from functools import partial

from flask import current_app
from sqlalchemy.exc import SQLAlchemyError

from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.exceptions import CreateKeyValueDistributedLockFailedException
from superset.key_value.exceptions import (
KeyValueCodecEncodeException,
KeyValueUpsertFailedError,
)
from superset.key_value.types import KeyValueResource
from superset.utils.decorators import on_error, transaction

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class CreateDistributedLock(BaseDistributedLockCommand):
lock_expiration = timedelta(seconds=30)

def validate(self) -> None:
pass

@transaction(
on_error=partial(
on_error,
catches=(
KeyValueCodecEncodeException,
KeyValueUpsertFailedError,
SQLAlchemyError,
),
reraise=CreateKeyValueDistributedLockFailedException,
),
)
def run(self) -> None:
KeyValueDAO.delete_expired_entries(self.resource)
KeyValueDAO.create_entry(
resource=KeyValueResource.LOCK,
value={"value": True},
codec=self.codec,
key=self.key,
expires_on=datetime.now() + self.lock_expiration,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,36 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
from datetime import datetime
from functools import partial

from sqlalchemy import and_
from flask import current_app
from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.base import BaseCommand
from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.exceptions import DeleteKeyValueDistributedLockFailedException
from superset.key_value.exceptions import KeyValueDeleteFailedError
from superset.key_value.models import KeyValueEntry
from superset.key_value.types import KeyValueResource
from superset.utils.decorators import on_error, transaction

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class DeleteExpiredKeyValueCommand(BaseCommand):
resource: KeyValueResource

def __init__(self, resource: KeyValueResource):
"""
Delete all expired key-value pairs

:param resource: the resource (dashboard, chart etc)
:return: was the entry deleted or not
"""
self.resource = resource

@transaction(on_error=partial(on_error, reraise=KeyValueDeleteFailedError))
def run(self) -> None:
self.delete_expired()

class DeleteDistributedLock(BaseDistributedLockCommand):
def validate(self) -> None:
pass

def delete_expired(self) -> None:
(
db.session.query(KeyValueEntry)
.filter(
and_(
KeyValueEntry.resource == self.resource.value,
KeyValueEntry.expires_on <= datetime.now(),
)
)
.delete()
)
@transaction(
on_error=partial(
on_error,
catches=(
KeyValueDeleteFailedError,
SQLAlchemyError,
),
reraise=DeleteKeyValueDistributedLockFailedException,
),
)
def run(self) -> None:
KeyValueDAO.delete_entry(self.resource, self.key)
45 changes: 45 additions & 0 deletions superset/commands/distributed_lock/get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import logging
from typing import cast

from flask import current_app

from superset.commands.distributed_lock.base import BaseDistributedLockCommand
from superset.daos.key_value import KeyValueDAO
from superset.distributed_lock.types import LockValue

logger = logging.getLogger(__name__)
stats_logger = current_app.config["STATS_LOGGER"]


class GetDistributedLock(BaseDistributedLockCommand):
def validate(self) -> None:
pass

def run(self) -> LockValue | None:
entry = KeyValueDAO.get_entry(
resource=self.resource,
key=self.key,
)
if not entry or entry.is_expired():
return None

return cast(LockValue, self.codec.decode(entry.value))
16 changes: 7 additions & 9 deletions superset/commands/explore/permalink/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand
from superset.commands.key_value.create import CreateKeyValueCommand
from superset.daos.key_value import KeyValueDAO
from superset.explore.permalink.exceptions import ExplorePermalinkCreateFailedError
from superset.explore.utils import check_access as check_chart_access
from superset.key_value.exceptions import (
Expand Down Expand Up @@ -65,15 +66,12 @@ def run(self) -> str:
"datasource": self.datasource,
"state": self.state,
}
command = CreateKeyValueCommand(
resource=self.resource,
value=value,
codec=self.codec,
)
key = command.run()
if key.id is None:
entry = KeyValueDAO.create_entry(self.resource, value, self.codec)
db.session.flush()
key = entry.id
if key is None:
raise ExplorePermalinkCreateFailedError("Unexpected missing key id")
return encode_permalink_key(key=key.id, salt=self.salt)
return encode_permalink_key(key=key, salt=self.salt)

def validate(self) -> None:
pass
8 changes: 2 additions & 6 deletions superset/commands/explore/permalink/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from superset.commands.dataset.exceptions import DatasetNotFoundError
from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand
from superset.commands.key_value.get import GetKeyValueCommand
from superset.daos.key_value import KeyValueDAO
from superset.explore.permalink.exceptions import ExplorePermalinkGetFailedError
from superset.explore.permalink.types import ExplorePermalinkValue
from superset.explore.utils import check_access as check_chart_access
Expand All @@ -44,11 +44,7 @@ def run(self) -> Optional[ExplorePermalinkValue]:
self.validate()
try:
key = decode_permalink_id(self.key, salt=self.salt)
value: Optional[ExplorePermalinkValue] = GetKeyValueCommand(
resource=self.resource,
key=key,
codec=self.codec,
).run()
value = KeyValueDAO.get_value(self.resource, key, self.codec)
if value:
chart_id: Optional[int] = value.get("chartId")
# keep this backward compatible for old permalinks
Expand Down
Loading