Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disallow deletion AdminFlag #6518

Merged
merged 5 commits into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
TokenMissing,
TooManyFailedLogins,
)
from warehouse.admin.flags import AdminFlag
from warehouse.admin.flags import AdminFlag, AdminFlagValue

from ...common.db.accounts import EmailFactory, UserFactory

Expand Down Expand Up @@ -903,7 +903,9 @@ def test_register_redirect(self, db_request, monkeypatch):

def test_register_fails_with_admin_flag_set(self, db_request):
# This flag was already set via migration, just need to enable it
flag = db_request.db.query(AdminFlag).get("disallow-new-user-registration")
flag = db_request.db.query(AdminFlag).get(
AdminFlagValue.DISALLOW_NEW_USER_REGISTRATION
)
flag.enabled = True

db_request.method = "POST"
Expand Down
23 changes: 21 additions & 2 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from wtforms.form import Form
from wtforms.validators import ValidationError

from warehouse.admin.flags import AdminFlag
from warehouse.admin.flags import AdminFlag, AdminFlagValue
from warehouse.admin.squats import Squat
from warehouse.classifiers.models import Classifier
from warehouse.forklift import legacy
Expand Down Expand Up @@ -753,6 +753,25 @@ def test_is_duplicate_false(self, pyramid_config, db_request):


class TestFileUpload:
def test_fails_disallow_new_upload(self, pyramid_config, pyramid_request):
pyramid_config.testing_securitypolicy(userid=1)
pyramid_request.flags = pretend.stub(
enabled=lambda value: value == AdminFlagValue.DISALLOW_NEW_UPLOAD
)
pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")
pyramid_request.user = pretend.stub(primary_email=pretend.stub(verified=True))

with pytest.raises(HTTPForbidden) as excinfo:
legacy.file_upload(pyramid_request)

resp = excinfo.value

assert resp.status_code == 403
assert resp.status == (
"403 New uploads are temporarily disabled. "
"See /the/help/url/ for details"
)

@pytest.mark.parametrize("version", ["2", "3", "-1", "0", "dog", "cat"])
def test_fails_invalid_version(self, pyramid_config, pyramid_request, version):
pyramid_config.testing_securitypolicy(userid=1)
Expand Down Expand Up @@ -1118,7 +1137,7 @@ def test_fails_with_stdlib_names(self, pyramid_config, db_request, name):
def test_fails_with_admin_flag_set(self, pyramid_config, db_request):
admin_flag = (
db_request.db.query(AdminFlag)
.filter(AdminFlag.id == "disallow-new-project-registration")
.filter(AdminFlag.id == AdminFlagValue.DISALLOW_NEW_PROJECT_REGISTRATION)
.first()
)
admin_flag.enabled = True
Expand Down
135 changes: 135 additions & 0 deletions tests/unit/manage/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import warehouse.utils.otp as otp

from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService
from warehouse.admin.flags import AdminFlagValue
from warehouse.macaroons.interfaces import IMacaroonService
from warehouse.manage import views
from warehouse.packaging.models import (
Expand Down Expand Up @@ -2014,6 +2015,7 @@ def test_delete_project_no_confirm(self):
project = pretend.stub(normalized_name="foo")
request = pretend.stub(
POST={},
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
route_path=lambda *a, **kw: "/foo/bar/",
)
Expand All @@ -2023,6 +2025,9 @@ def test_delete_project_no_confirm(self):
assert exc.value.status_code == 303
assert exc.value.headers["Location"] == "/foo/bar/"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
Expand All @@ -2031,6 +2036,7 @@ def test_delete_project_wrong_confirm(self):
project = pretend.stub(normalized_name="foo")
request = pretend.stub(
POST={"confirm_project_name": "bar"},
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
route_path=lambda *a, **kw: "/foo/bar/",
)
Expand All @@ -2040,13 +2046,46 @@ def test_delete_project_wrong_confirm(self):
assert exc.value.status_code == 303
assert exc.value.headers["Location"] == "/foo/bar/"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call(
"Could not delete project - 'bar' is not the same as 'foo'",
queue="error",
)
]

def test_delete_project_disallow_deletion(self):
project = pretend.stub(name="foo", normalized_name="foo")
request = pretend.stub(
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)

result = views.delete_project(project, request)
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]

assert request.route_path.calls == [
pretend.call("manage.project.settings", project_name="foo")
]

def test_delete_project(self, db_request):
project = ProjectFactory.create(name="foo")

Expand Down Expand Up @@ -2159,6 +2198,7 @@ def test_manage_project_releases(self, db_request):
filename=f"foobar-{release.version}.tar.gz",
packagetype="sdist",
)
db_request.flags = pretend.stub(enabled=pretend.call_recorder(lambda *a: False))

assert views.manage_project_releases(project, db_request) == {
"project": project,
Expand All @@ -2182,6 +2222,48 @@ def test_manage_project_release(self):
"files": files,
}

def test_delete_project_release_disallow_deletion(self, monkeypatch):
release = pretend.stub(
version="1.2.3",
canonical_version="1.2.3",
project=pretend.stub(
name="foobar", record_event=pretend.call_recorder(lambda *a, **kw: None)
),
)
request = pretend.stub(
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
method="POST",
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
view = views.ManageProjectRelease(release, request)

result = view.delete_project_release()
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]

assert request.route_path.calls == [
pretend.call(
"manage.project.release",
project_name=release.project.name,
version=release.version,
)
]

def test_delete_project_release(self, monkeypatch):
release = pretend.stub(
version="1.2.3",
Expand All @@ -2197,6 +2279,7 @@ def test_delete_project_release(self, monkeypatch):
delete=pretend.call_recorder(lambda a: None),
add=pretend.call_recorder(lambda a: None),
),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
user=pretend.stub(username=pretend.stub()),
Expand All @@ -2215,6 +2298,9 @@ def test_delete_project_release(self, monkeypatch):

assert request.db.delete.calls == [pretend.call(release)]
assert request.db.add.calls == [pretend.call(journal_obj)]
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert journal_cls.calls == [
pretend.call(
name=release.project.name,
Expand Down Expand Up @@ -2247,6 +2333,7 @@ def test_delete_project_release_no_confirm(self):
POST={"confirm_version": ""},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2261,6 +2348,9 @@ def test_delete_project_release_no_confirm(self):
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.route_path.calls == [
pretend.call(
"manage.project.release",
Expand All @@ -2275,6 +2365,7 @@ def test_delete_project_release_bad_confirm(self):
POST={"confirm_version": "invalid"},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2301,6 +2392,42 @@ def test_delete_project_release_bad_confirm(self):
)
]

def test_delete_project_release_file_disallow_deletion(self):
release = pretend.stub(version="1.2.3", project=pretend.stub(name="foobar"))
request = pretend.stub(
method="POST",
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
view = views.ManageProjectRelease(release, request)

result = view.delete_project_release_file()

assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]
assert request.route_path.calls == [
pretend.call(
"manage.project.release",
project_name=release.project.name,
version=release.version,
)
]

def test_delete_project_release_file(self, db_request):
user = UserFactory.create()

Expand Down Expand Up @@ -2359,6 +2486,7 @@ def test_delete_project_release_file_no_confirm(self):
POST={"confirm_project_name": ""},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2370,6 +2498,9 @@ def test_delete_project_release_file_no_confirm(self):
assert result.headers["Location"] == "/the-redirect"

assert request.db.delete.calls == []
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
Expand All @@ -2396,6 +2527,7 @@ def no_result_found():
filter=lambda *a: pretend.stub(one=no_result_found)
),
)
db_request.flags = pretend.stub(enabled=pretend.call_recorder(lambda *a: False))
db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/the-redirect")
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
Expand All @@ -2409,6 +2541,9 @@ def no_result_found():
assert result.headers["Location"] == "/the-redirect"

assert db_request.db.delete.calls == []
assert db_request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert db_request.session.flash.calls == [
pretend.call("Could not find file", queue="error")
]
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sqlalchemy.exc import OperationalError

from warehouse import db
from warehouse.admin.flags import AdminFlagValue
from warehouse.db import (
DEFAULT_ISOLATION,
DatabaseNotAvailable,
Expand Down Expand Up @@ -273,7 +274,7 @@ def test_create_session_read_only_mode(
)

assert _create_session(request) is session_obj
assert get.calls == [pretend.call("read-only")]
assert get.calls == [pretend.call(AdminFlagValue.READ_ONLY)]
assert request.tm.doom.calls == doom_calls


Expand Down
3 changes: 2 additions & 1 deletion warehouse/accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
TooManyFailedLogins,
)
from warehouse.accounts.models import Email, User
from warehouse.admin.flags import AdminFlagValue
from warehouse.cache.origin import origin_cache
from warehouse.email import send_email_verification_email, send_password_reset_email
from warehouse.packaging.models import Project, Release
Expand Down Expand Up @@ -377,7 +378,7 @@ def register(request, _form_class=RegistrationForm):
if request.method == "POST" and request.POST.get("confirm_form"):
return HTTPSeeOther(request.route_path("index"))

if request.flags.enabled("disallow-new-user-registration"):
if request.flags.enabled(AdminFlagValue.DISALLOW_NEW_USER_REGISTRATION):
request.session.flash(
(
"New user registration temporarily disabled. "
Expand Down
8 changes: 8 additions & 0 deletions warehouse/admin/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
from warehouse import db


class AdminFlagValue:
calvin marked this conversation as resolved.
Show resolved Hide resolved
DISALLOW_DELETION = "disallow-deletion"
DISALLOW_NEW_PROJECT_REGISTRATION = "disallow-new-project-registration"
DISALLOW_NEW_UPLOAD = "disallow-new-upload"
DISALLOW_NEW_USER_REGISTRATION = "disallow-new-user-registration"
READ_ONLY = "read-only"


class AdminFlag(db.ModelBase):

__tablename__ = "admin_flags"
Expand Down
4 changes: 2 additions & 2 deletions warehouse/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ def cleanup(request):
connection.close()

# Check if we're in read-only mode
from warehouse.admin.flags import AdminFlag
from warehouse.admin.flags import AdminFlag, AdminFlagValue

flag = session.query(AdminFlag).get("read-only")
flag = session.query(AdminFlag).get(AdminFlagValue.READ_ONLY)
if flag and flag.enabled and not request.user.is_superuser:
request.tm.doom()

Expand Down
Loading