Skip to content

Commit

Permalink
Revert "chore(internal): streaming refactors (#2012)"
Browse files Browse the repository at this point in the history
This reverts commit d76a748.
  • Loading branch information
RobertCraigie authored and stainless-app[bot] committed Jan 17, 2025
1 parent 82ccc98 commit e081d99
Showing 1 changed file with 72 additions and 32 deletions.
104 changes: 72 additions & 32 deletions src/openai/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,42 @@ def __stream__(self) -> Iterator[_T]:
if sse.data.startswith("[DONE]"):
break

data = sse.json()
if is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data=data, cast_to=cast_to, response=response)
if sse.event is None:
data = sse.json()
if is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data=data, cast_to=cast_to, response=response)

else:
data = sse.json()

if sse.event == "error" and is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

# Ensure the entire stream is consumed
for _sse in iterator:
Expand Down Expand Up @@ -141,22 +161,42 @@ async def __stream__(self) -> AsyncIterator[_T]:
if sse.data.startswith("[DONE]"):
break

data = sse.json()
if is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data=data, cast_to=cast_to, response=response)
if sse.event is None:
data = sse.json()
if is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data=data, cast_to=cast_to, response=response)

else:
data = sse.json()

if sse.event == "error" and is_mapping(data) and data.get("error"):
message = None
error = data.get("error")
if is_mapping(error):
message = error.get("message")
if not message or not isinstance(message, str):
message = "An error occurred during streaming"

raise APIError(
message=message,
request=self.response.request,
body=data["error"],
)

yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

# Ensure the entire stream is consumed
async for _sse in iterator:
Expand Down

0 comments on commit e081d99

Please sign in to comment.