Skip to content

Commit

Permalink
Fix CVE-XXXX-XXXX -- Fix Path Traversal security vulnerability
Browse files Browse the repository at this point in the history
  • Loading branch information
codingjoe committed Jun 6, 2022
1 parent 03f6ff8 commit 68ccd2c
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 58 deletions.
16 changes: 13 additions & 3 deletions s3file/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid

from django.conf import settings
from django.core import signing
from django.utils.functional import cached_property
from storages.utils import safe_join

Expand All @@ -16,10 +17,14 @@ class S3FileInputMixin:
"""FileInput that uses JavaScript to directly upload to Amazon S3."""

needs_multipart_form = False
upload_path = str(
getattr(settings, "S3FILE_UPLOAD_PATH", pathlib.PurePosixPath("tmp", "s3file"))
upload_path = safe_join(
str(storage.aws_location),
str(
getattr(
settings, "S3FILE_UPLOAD_PATH", pathlib.PurePosixPath("tmp", "s3file")
)
),
)
upload_path = safe_join(str(storage.location), upload_path)
expires = settings.SESSION_COOKIE_AGE

@property
Expand All @@ -45,6 +50,11 @@ def build_attrs(self, *args, **kwargs):
"data-fields-%s" % key: value for key, value in response["fields"].items()
}
defaults["data-url"] = response["url"]
signer = signing.Signer(
salt=f"{S3FileInputMixin.__module__}.{S3FileInputMixin.__name__}"
)
print(self.upload_folder)
defaults["data-s3f-signature"] = signer.signature(self.upload_folder)
defaults.update(attrs)

try:
Expand Down
43 changes: 36 additions & 7 deletions s3file/middleware.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import logging
import pathlib

from s3file.storages import local_dev, storage
from django.core import signing
from django.core.exceptions import PermissionDenied, SuspiciousFileOperation
from django.utils.crypto import constant_time_compare

from . import views
from .forms import S3FileInputMixin
from .storages import local_dev, storage

logger = logging.getLogger("s3file")

Expand All @@ -15,25 +19,50 @@ def __init__(self, get_response):
def __call__(self, request):
file_fields = request.POST.getlist("s3file")
for field_name in file_fields:

paths = request.POST.getlist(field_name)
request.FILES.setlist(field_name, list(self.get_files_from_storage(paths)))
if paths:
try:
signature = request.POST[f"{field_name}-s3f-signature"]
except KeyError:
raise PermissionDenied("No signature provided.")
try:
request.FILES.setlist(
field_name, list(self.get_files_from_storage(paths, signature))
)
except SuspiciousFileOperation as e:
raise PermissionDenied("Illegal file name!") from e

if local_dev and request.path == "/__s3_mock__/":
return views.S3MockView.as_view()(request)

return self.get_response(request)

@staticmethod
def get_files_from_storage(paths):
def get_files_from_storage(paths, signature):
"""Return S3 file where the name does not include the path."""
try:
location = storage.aws_location
except AttributeError:
location = storage.location
signer = signing.Signer(
salt=f"{S3FileInputMixin.__module__}.{S3FileInputMixin.__name__}"
)
for path in paths:
path = pathlib.PurePosixPath(path)
print(path)
print(signer.signature(path.parent), signature)
if not constant_time_compare(signer.signature(path.parent), signature):
raise PermissionDenied("Illegal signature!")
try:
location = storage.aws_location
except AttributeError:
location = storage.location
relative_path = str(path.relative_to(location))
except ValueError as e:
raise SuspiciousFileOperation(
f"Path is not inside the designated upload location: {path}"
) from e

try:
f = storage.open(str(path.relative_to(location)))
f = storage.open(relative_path)
f.name = path.name
yield f
except (OSError, ValueError):
Expand Down
6 changes: 6 additions & 0 deletions s3file/static/s3file/js/s3file.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@
hiddenFileInput.name = name
hiddenFileInput.value = parseURL(result)
form.appendChild(hiddenFileInput)
var hiddenSignatureInput = document.createElement('input')
hiddenSignatureInput.type = 'hidden'
hiddenSignatureInput.name = name + '-s3f-signature'
console.log(fileInput.dataset.s3fSignature)
hiddenSignatureInput.value = fileInput.dataset.s3fSignature
form.appendChild(hiddenSignatureInput)
})
fileInput.name = ''
window.uploading -= 1
Expand Down
1 change: 1 addition & 0 deletions s3file/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import hmac
import logging
from pathlib import Path

from django import http
from django.conf import settings
Expand Down
53 changes: 37 additions & 16 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import tempfile
from pathlib import Path

import pytest
from django.core.files.base import ContentFile
from django.utils.encoding import force_str
from selenium import webdriver
from selenium.common.exceptions import WebDriverException

from s3file.storages import storage


@pytest.fixture(scope="session")
def driver():
Expand All @@ -22,30 +24,49 @@ def driver():


@pytest.fixture
def upload_file(request):
path = tempfile.mkdtemp()
file_name = os.path.join(path, "%s.txt" % request.node.name)
with open(file_name, "w") as f:
def freeze_upload_folder(monkeypatch):
"""Freeze datetime and UUID."""
upload_folder = Path(storage.aws_location) / "tmp" / "s3file"
monkeypatch.setattr(
"s3file.forms.S3FileInputMixin.upload_folder",
str(upload_folder),
)
return upload_folder


@pytest.fixture
def upload_file(request, freeze_upload_folder):
path = Path(tempfile.mkdtemp()) / freeze_upload_folder / f"{request.node.name}.txt"
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
f.write(request.node.name)
return file_name
return str(path.absolute())


@pytest.fixture
def another_upload_file(request):
path = tempfile.mkdtemp()
file_name = os.path.join(path, "another_%s.txt" % request.node.name)
with open(file_name, "w") as f:
def another_upload_file(request, freeze_upload_folder):
path = (
Path(tempfile.mkdtemp())
/ freeze_upload_folder
/ f"another_{request.node.name}.txt"
)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
f.write(request.node.name)
return file_name
return str(path.absolute())


@pytest.fixture
def yet_another_upload_file(request):
path = tempfile.mkdtemp()
file_name = os.path.join(path, "yet_another_%s.txt" % request.node.name)
with open(file_name, "w") as f:
def yet_another_upload_file(request, freeze_upload_folder):
path = (
Path(tempfile.mkdtemp())
/ freeze_upload_folder
/ f"yet_another_{request.node.name}.txt"
)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
f.write(request.node.name)
return file_name
return str(path.absolute())


@pytest.fixture
Expand Down
54 changes: 31 additions & 23 deletions tests/test_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,15 @@ class TestS3FileInput:
def url(self):
return reverse("upload")

@pytest.fixture
def freeze(self, monkeypatch):
"""Freeze datetime and UUID."""
monkeypatch.setattr(
"s3file.forms.S3FileInputMixin.upload_folder",
os.path.join(storage.aws_location, "tmp"),
)

def test_value_from_datadict(self, client, upload_file):
print(storage.location)
def test_value_from_datadict(self, freeze_upload_folder, client, upload_file):
with open(upload_file) as f:
uploaded_file = storage.save("test.jpg", f)
uploaded_file = storage.save(freeze_upload_folder / "test.jpg", f)
response = client.post(
reverse("upload"),
{
"file": json.dumps([uploaded_file]),
"s3file": '["file"]',
"file": f"custom/location/{uploaded_file}",
"file-s3f-signature": "m94qBxBsnMIuIICiY133kX18KkllSPMVbhGAdAwNn1A",
"s3file": "file",
},
)

Expand Down Expand Up @@ -82,7 +74,7 @@ def test_clear(self, filemodel):
assert form.is_valid()
assert not form.cleaned_data["file"]

def test_build_attr(self):
def test_build_attr(self, freeze_upload_folder):
assert set(ClearableFileInput().build_attrs({}).keys()) == {
"class",
"data-url",
Expand All @@ -92,21 +84,26 @@ def test_build_attr(self):
"data-fields-x-amz-credential",
"data-fields-policy",
"data-fields-key",
"data-s3f-signature",
}
assert (
ClearableFileInput().build_attrs({})["data-s3f-signature"]
== "tFV9nGZlq9WX1I5Sotit18z1f4C_3lPnj33_zo4LZRc"
)
assert ClearableFileInput().build_attrs({})["class"] == "s3file"
assert (
ClearableFileInput().build_attrs({"class": "my-class"})["class"]
== "my-class s3file"
)

def test_get_conditions(self, freeze):
def test_get_conditions(self, freeze_upload_folder):
conditions = ClearableFileInput().get_conditions(None)
assert all(
condition in conditions
for condition in [
{"bucket": "test-bucket"},
{"success_action_status": "201"},
["starts-with", "$key", "custom/location/tmp"],
["starts-with", "$key", "custom/location/tmp/s3file"],
["starts-with", "$Content-Type", ""],
]
), conditions
Expand Down Expand Up @@ -145,20 +142,24 @@ def test_no_js_error(self, driver, live_server):
error = driver.find_element(By.XPATH, "//body[@JSError]")
pytest.fail(error.get_attribute("JSError"))

def test_file_insert(self, request, driver, live_server, upload_file, freeze):
def test_file_insert(
self, request, driver, live_server, upload_file, freeze_upload_folder
):
driver.get(live_server + self.url)
file_input = driver.find_element(By.XPATH, "//input[@name='file']")
file_input.send_keys(upload_file)
assert file_input.get_attribute("name") == "file"
with wait_for_page_load(driver, timeout=10):
file_input.submit()
assert storage.exists("tmp/%s.txt" % request.node.name)
assert storage.exists("tmp/s3file/%s.txt" % request.node.name)

with pytest.raises(NoSuchElementException):
error = driver.find_element(By.XPATH, "//body[@JSError]")
pytest.fail(error.get_attribute("JSError"))

def test_file_insert_submit_value(self, driver, live_server, upload_file, freeze):
def test_file_insert_submit_value(
self, driver, live_server, upload_file, freeze_upload_folder
):
driver.get(live_server + self.url)
file_input = driver.find_element(By.XPATH, "//input[@name='file']")
file_input.send_keys(upload_file)
Expand All @@ -178,7 +179,7 @@ def test_file_insert_submit_value(self, driver, live_server, upload_file, freeze
assert "save_continue" in driver.page_source
assert "continue_value" in driver.page_source

def test_progress(self, driver, live_server, upload_file, freeze):
def test_progress(self, driver, live_server, upload_file, freeze_upload_folder):
driver.get(live_server + self.url)
file_input = driver.find_element(By.XPATH, "//input[@name='file']")
file_input.send_keys(upload_file)
Expand All @@ -202,16 +203,23 @@ def test_multi_file(
self,
driver,
live_server,
freeze,
freeze_upload_folder,
upload_file,
another_upload_file,
yet_another_upload_file,
):
driver.get(live_server + self.url)
file_input = driver.find_element(By.XPATH, "//input[@name='file']")
file_input.send_keys(" \n ".join([upload_file, another_upload_file]))
file_input.send_keys(
" \n ".join(
[
str(freeze_upload_folder / upload_file),
str(freeze_upload_folder / another_upload_file),
]
)
)
file_input = driver.find_element(By.XPATH, "//input[@name='other_file']")
file_input.send_keys(yet_another_upload_file)
file_input.send_keys(str(freeze_upload_folder / yet_another_upload_file))
save_button = driver.find_element(By.XPATH, "//input[@name='save']")
with wait_for_page_load(driver, timeout=10):
save_button.click()
Expand Down
Loading

0 comments on commit 68ccd2c

Please sign in to comment.