Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,23 +468,32 @@ def _watch_service(
time.sleep(configuration.service_watch_poll_interval)

try:
service_status = service.get_service_status()
container_status = service_status[0]
# There should only be one container in the service.
container = next(service.get_containers())
service_status = container.service_status

# If status == PENDING or similar, we'll get an exception if we try to retrieve logs.
if configuration.stream_output and container_status["status"] in (
"ACTIVE",
if configuration.stream_output and service_status["status"] in (
"RUNNING",
"DONE",
"FAILED",
):
last_log_time = self._get_and_stream_output(
service=service,
container_status=container_status,
service_status=service_status,
last_log_time=last_log_time,
)

# If status in ("DONE", "FAILED"), the container is no longer running.
if container_status["status"] in ("DONE", "FAILED"):
# If status in one of these, the container is no longer running.
if service_status["status"] in (
"DONE",
"FAILED",
"SUSPENDING",
"SUSPENDED",
"DELETING",
"DELETED",
"INTERNAL_ERROR",
):
break
except NotFoundError:
self._logger.info(
Expand Down Expand Up @@ -516,43 +525,43 @@ def _slugify_service_name(service_name: str, flow_run_id: UUID) -> Optional[str]
def _get_and_stream_output(
self,
service: ServiceResource,
container_status: dict[str, Any],
service_status: dict[str, Any],
last_log_time: datetime,
) -> datetime:
"""Fetches logs output from the job container and writes all entries after a given time to stderr.

Args:
service: The service we want to retrieve logs for.
container_status: The status object containing the instanceId and containerName of the container.
service_status: The status object containing the instance_id and container_name of the service.
last_log_time: The timestamp of the last output line already streamed.

Returns:
The time of the most recent output line written by this call.

"""
logs = self._get_logs(service=service, container_status=container_status)
logs = self._get_logs(service=service, service_status=service_status)

return self._stream_output(logs, last_log_time)

def _get_logs(
self,
service: ServiceResource,
container_status: dict[str, Any],
service_status: dict[str, Any],
max_lines: int = 100,
) -> str:
"""Gets the most container logs up to a given maximum.
"""Gets the most recent service logs up to a given maximum.

Args:
service: The service we want to retrieve logs for.
container_status: The status object containing the instanceId and containerName of the container.
service_status: The status object containing the instance_id and container_name of the service.
max_lines: The number of log lines to pull. Defaults to 100.

Returns:
A string containing the requested log entries, one per line.

"""
instance_id = container_status["instanceId"]
container_name = container_status["containerName"]
instance_id = service_status["instance_id"]
container_name = service_status["container_name"]

return service.get_service_logs(
instance_id=instance_id, container_name=container_name, num_lines=max_lines
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/prefect-snowflake/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "prefect-snowflake"
dependencies = [
"snowflake>=1.2.0",
"snowflake>=1.5.0",
"snowflake-connector-python>=3.0.4",
"prefect>=3.0.0",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,5 +438,5 @@ def test_snowflake_credentials_encrypted_private_key_parses_correctly():
# We expect _compose_pem to succeed (no InvalidPemFormat),
# but load_pem_private_key to fail because the key/passphrase
# are dummies.
with pytest.raises(ValueError, match="Could not deserialize key data"):
with pytest.raises(ValueError, match="Unable to load PEM file"):
creds.resolve_private_key()
Loading