Skip to content

Commit

Permalink
Hard break if found data after last boundary on MultipartParser (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kludex authored Nov 28, 2024
1 parent 170e604 commit 9205a0e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 5 deletions.
8 changes: 4 additions & 4 deletions python_multipart/multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,6 @@ def data_callback(name: CallbackName, end_i: int, remaining: bool = False) -> No
# Skip leading newlines
if c == CR or c == LF:
i += 1
self.logger.debug("Skipping leading CR/LF at %d", i)
continue

# index is used as in index into our boundary. Set to 0.
Expand Down Expand Up @@ -1398,9 +1397,10 @@ def data_callback(name: CallbackName, end_i: int, remaining: bool = False) -> No
i -= 1

elif state == MultipartState.END:
# Do nothing and just consume a byte in the end state.
if c not in (CR, LF):
self.logger.warning("Consuming a byte '0x%x' in the end state", c) # pragma: no cover
# Skip data after the last boundary.
self.logger.warning("Skipping data after last boundary")
i = length
break

else: # pragma: no cover (error case)
# We got into a strange state somehow! Just stop processing.
Expand Down
40 changes: 39 additions & 1 deletion tests/test_multipart.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ def test_http(self, param: TestParams) -> None:
return

# No error!
self.assertEqual(processed, len(param["test"]))
self.assertEqual(processed, len(param["test"]), param["name"])

# Assert that the parser gave us the appropriate fields/files.
for e in param["result"]["expected"]:
Expand Down Expand Up @@ -1210,6 +1210,44 @@ def on_field(f: FieldProtocol) -> None:
self.assertEqual(fields[2].field_name, b"baz")
self.assertEqual(fields[2].value, b"asdf")

def test_multipart_parser_newlines_before_first_boundary(self) -> None:
"""This test makes sure that the parser does not handle when there is junk data after the last boundary."""
num = 5_000_000
data = (
"\r\n" * num + "--boundary\r\n"
'Content-Disposition: form-data; name="file"; filename="filename.txt"\r\n'
"Content-Type: text/plain\r\n\r\n"
"hello\r\n"
"--boundary--"
)

files: list[File] = []

def on_file(f: FileProtocol) -> None:
files.append(cast(File, f))

f = FormParser("multipart/form-data", on_field=Mock(), on_file=on_file, boundary="boundary")
f.write(data.encode("latin-1"))

def test_multipart_parser_data_after_last_boundary(self) -> None:
"""This test makes sure that the parser does not handle when there is junk data after the last boundary."""
num = 50_000_000
data = (
"--boundary\r\n"
'Content-Disposition: form-data; name="file"; filename="filename.txt"\r\n'
"Content-Type: text/plain\r\n\r\n"
"hello\r\n"
"--boundary--" + "-" * num + "\r\n"
)

files: list[File] = []

def on_file(f: FileProtocol) -> None:
files.append(cast(File, f))

f = FormParser("multipart/form-data", on_field=Mock(), on_file=on_file, boundary="boundary")
f.write(data.encode("latin-1"))

def test_max_size_multipart(self) -> None:
# Load test data.
test_file = "single_field_single_file.http"
Expand Down

0 comments on commit 9205a0e

Please sign in to comment.