Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .strict-typing
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ homeassistant.components.automation.*
homeassistant.components.awair.*
homeassistant.components.axis.*
homeassistant.components.azure_storage.*
homeassistant.components.backblaze.*
homeassistant.components.backup.*
homeassistant.components.baf.*
homeassistant.components.bang_olufsen.*
Expand Down
2 changes: 2 additions & 0 deletions CODEOWNERS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 81 additions & 0 deletions homeassistant/components/backblaze/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""The Backblaze integration."""

from __future__ import annotations

from b2sdk.v2 import B2Api, Bucket, InMemoryAccountInfo, exception

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady

from .const import (
CONF_APPLICATION_KEY,
CONF_BUCKET,
CONF_KEY_ID,
DATA_BACKUP_AGENT_LISTENERS,
DOMAIN,
)

# Config entry type whose ``runtime_data`` is the B2 ``Bucket`` that
# ``async_setup_entry`` stores after a successful authorization.
type BackblazeConfigEntry = ConfigEntry[Bucket]


async def async_setup_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
    """Set up Backblaze from a config entry.

    Authorizes against the B2 "production" realm with the key ID and
    application key stored in the entry, looks up the configured bucket by
    name and stores it as ``entry.runtime_data``.

    Raises:
        ConfigEntryError: b2sdk reported rejected credentials, missing
            account data, a restricted bucket or a non-existent bucket.
        ConfigEntryNotReady: b2sdk reported a transient connection problem
            (reset, connection error or request timeout).
    """

    info = InMemoryAccountInfo()
    b2_api = B2Api(info)

    def _authorize_and_get_bucket() -> Bucket:
        """Authorize and resolve the bucket; blocking, runs in the executor."""
        b2_api.authorize_account(
            "production",
            entry.data[CONF_KEY_ID],
            entry.data[CONF_APPLICATION_KEY],
        )
        return b2_api.get_bucket_by_name(entry.data[CONF_BUCKET])

    try:
        bucket = await hass.async_add_executor_job(_authorize_and_get_bucket)

    except exception.Unauthorized as err:
        raise ConfigEntryError(
            translation_domain=DOMAIN,
            translation_key="invalid_credentials",
        ) from err
    except exception.RestrictedBucket as err:
        raise ConfigEntryError(
            translation_domain=DOMAIN,
            translation_key="restricted_bucket",
            translation_placeholders={
                "restricted_bucket_name": err.bucket_name,
            },
        ) from err
    except exception.NonExistentBucket as err:
        raise ConfigEntryError(
            translation_domain=DOMAIN,
            translation_key="invalid_bucket_name",
        ) from err
    except (
        exception.ConnectionReset,
        exception.B2ConnectionError,
        exception.B2RequestTimeout,
    ) as err:
        # All of these are transient network failures: signal "not ready"
        # so setup is retried instead of failing permanently.
        raise ConfigEntryNotReady(
            translation_domain=DOMAIN,
            translation_key="cannot_connect",
        ) from err
    except exception.MissingAccountData as err:
        raise ConfigEntryError(
            translation_domain=DOMAIN,
            translation_key="invalid_auth",
        ) from err

    entry.runtime_data = bucket

    def _async_notify_backup_listeners() -> None:
        """Invoke every listener registered under DATA_BACKUP_AGENT_LISTENERS."""
        for listener in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
            listener()

    # Notify on every state change of this entry; the subscription is
    # dropped again when the entry unloads.
    entry.async_on_unload(entry.async_on_state_change(_async_notify_backup_listeners))

    return True


async def async_unload_entry(hass: HomeAssistant, entry: BackblazeConfigEntry) -> bool:
    """Unload a Backblaze config entry.

    This function performs no teardown of its own and always reports success.
    """
    return True
231 changes: 231 additions & 0 deletions homeassistant/components/backblaze/backup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""Backup platform for the Backblaze integration."""

from collections.abc import AsyncIterator, Callable, Coroutine
import functools
import json
import logging
import os
from typing import Any

from b2sdk.v2 import FileVersion
from b2sdk.v2.exception import B2Error

from homeassistant.components.backup import (
AgentBackup,
BackupAgent,
BackupAgentError,
BackupNotFound,
suggested_filename,
)
from homeassistant.core import HomeAssistant, callback

from . import BackblazeConfigEntry
from .const import CONF_PREFIX, DATA_BACKUP_AGENT_LISTENERS, DOMAIN

_LOGGER = logging.getLogger(__name__)
# Written into each uploaded file's info as "metadata_version"; listing and
# lookup only consider files whose stored value equals this one.
METADATA_VERSION = "1"


def handle_b2_errors[T](
func: Callable[..., Coroutine[Any, Any, T]],
) -> Callable[..., Coroutine[Any, Any, T]]:
"""Handle B2Errors by converting them to BackupAgentError."""

@functools.wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> T:
"""Catch B2Error and raise BackupAgentError."""
try:
return await func(*args, **kwargs)
except B2Error as err:
error_msg = f"Failed during {func.__name__}"
raise BackupAgentError(error_msg) from err

return wrapper


async def async_get_backup_agents(
    hass: HomeAssistant,
) -> list[BackupAgent]:
    """Build one Backblaze backup agent for each loaded config entry."""
    return [
        BackblazeBackupAgent(hass, loaded_entry)
        for loaded_entry in hass.config_entries.async_loaded_entries(DOMAIN)
    ]


@callback
def async_register_backup_agents_listener(
    hass: HomeAssistant,
    *,
    listener: Callable[[], None],
    **kwargs: Any,
) -> Callable[[], None]:
    """Add ``listener`` to the list kept under DATA_BACKUP_AGENT_LISTENERS.

    :return: A function that unregisters the listener again.
    """
    registered = hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, [])
    registered.append(listener)

    @callback
    def remove_listener() -> None:
        """Drop the listener; delete the list once it is empty."""
        remaining = hass.data[DATA_BACKUP_AGENT_LISTENERS]
        remaining.remove(listener)
        if not remaining:
            del hass.data[DATA_BACKUP_AGENT_LISTENERS]

    return remove_listener


class BackblazeBackupAgent(BackupAgent):
"""Backup agent for Backblaze."""

domain = DOMAIN

def __init__(self, hass: HomeAssistant, entry: BackblazeConfigEntry) -> None:
    """Bind the agent to ``hass`` and to the given config entry."""
    super().__init__()

    # Identity, taken from the config entry.
    self.name = entry.title
    self.unique_id = entry.entry_id

    # Storage target: the bucket placed in runtime_data plus the file prefix.
    self._bucket = entry.runtime_data
    self._prefix = entry.data[CONF_PREFIX]

    self._hass = hass
    # Tasks are created through the entry's own task factory.
    self.async_create_task = entry.async_create_task

@handle_b2_errors
async def async_download_backup(
self, backup_id: str, **kwargs: Any
) -> AsyncIterator[bytes]:
"""Download a backup."""

file = await self._find_file_by_id(backup_id)
if file is None:
raise BackupNotFound(f"Backup {backup_id} not found")

_LOGGER.debug("Downloading %s", file.file_name)

downloaded_file = await self._hass.async_add_executor_job(file.download)
response = downloaded_file.response
Comment on lines +107 to +108
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct me if I'm wrong, but this sounds like you now have the entire file in memory?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is the HTTP response object, not the body of the response, which we start streaming after this.


iterator = response.iter_content(chunk_size=8192)

async def stream_buffer() -> AsyncIterator[bytes]:
    """Yield chunks from the blocking iterator, fetching each in the executor."""
    # ``next(iterator, None)`` returns None once the iterator is exhausted,
    # which ends the loop.
    while (
        chunk := await self._hass.async_add_executor_job(next, iterator, None)
    ) is not None:
        yield chunk

return stream_buffer()

@handle_b2_errors
async def async_upload_backup(
self,
*,
open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
backup: AgentBackup,
**kwargs: Any,
) -> None:
"""Upload a backup."""

file_info = {
"metadata_version": METADATA_VERSION,
"backup_id": backup.backup_id,
"backup_metadata": json.dumps(backup.as_dict()),
}

filename = self._prefix + suggested_filename(backup)

_LOGGER.debug("Uploading %s", filename)

stream: AsyncIterator[bytes] = await open_stream()

# Create a pipe (file descriptors) to bridge async writes and sync reads
r_fd, w_fd = os.pipe()

async def writer() -> None:
    """Copy the async backup stream into the write end of the pipe.

    A pipe only has a small, fixed-size kernel buffer: once it is full,
    ``write`` blocks until the reading side drains it. Each write (and the
    final flush) therefore runs in the executor so the event loop is never
    stalled while waiting for the uploader.
    """
    with os.fdopen(w_fd, "wb") as w:
        async for chunk in stream:
            await self._hass.async_add_executor_job(w.write, chunk)
        # Push any buffered bytes out off-loop so the implicit close below
        # has nothing left to block on.
        await self._hass.async_add_executor_job(w.flush)
    # Leaving the ``with`` block closes the write end, which signals EOF
    # to the reader; no explicit close() is needed.

# Schedule the writer coroutine
writer_task = self.async_create_task(self._hass, writer())
Comment on lines +145 to +155
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also sounds dangerous to me? Who guarantees we're not writing the entire file to the pipe, before the other task is able to upload (enough) chunks?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think you might be right. I'm not sure how the streams work internally, but if they are push-based that is possible (but also an issue with any other implementation?)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, none of the other implementations use a separate task for this; they read and write a single chunk together.


def upload() -> None:
with os.fdopen(r_fd, "rb") as r:
self._bucket.upload_unbound_stream(
r,
filename,
file_info=file_info,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the maximum metadata size that backblaze allows?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the docs:

Each key is a UTF-8 string up to 50 bytes. There is an overall 7000-byte limit on the headers that are needed for file name and file information, unless the file is uploaded with server-side encryption in which case the limit is 2048 bytes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that'll be a problem then. Not 100% sure what a key is in that context, but aren't you dropping the entire serialized json into one key? In any case even 2kB are not safe enough, which means we'd need to use metadata files in those cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the docs might be wrong (or I misunderstood). The backup I have been uploading has 370 bytes of metadata and that's been working fine. I don't know how big the metadata can get

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially bigger than 2kB. I think with anything bigger than 4 we should be safe, but 2 is not enough.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 kB is the limit for files when server-side encryption is off (including the filename). I can add a note to the docs that server-side encryption is not recommended for larger instances because of this limit.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather switch to file-based metadata (as AWS S3 does as well) instead of discouraging users from using a security feature.

)

await self._hass.async_add_executor_job(upload)
await writer_task

@handle_b2_errors
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
    """Delete the stored file that belongs to ``backup_id``.

    Raises BackupNotFound when no matching file is found.
    """
    if (target := await self._find_file_by_id(backup_id)) is None:
        raise BackupNotFound(f"Backup {backup_id} not found")

    _LOGGER.debug("Deleting %s", backup_id)
    # ``delete`` is a blocking call, so it runs in the executor.
    await self._hass.async_add_executor_job(target.delete)

@handle_b2_errors
async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
    """Return every backup found under the configured prefix.

    Files without file info, with a different metadata version, or without
    a ``backup_metadata`` entry are skipped.
    """

    def _collect() -> list[AgentBackup]:
        """Walk the bucket listing; blocking, so it runs in the executor."""
        found: list[AgentBackup] = []
        for b2_file, _ in self._bucket.ls(self._prefix):
            info = b2_file.file_info
            if info is None:
                continue
            if info.get("metadata_version") != METADATA_VERSION:
                continue
            if info.get("backup_metadata") is None:
                continue
            found.append(self._backup_from_b2_file(b2_file))
        return found

    backups = await self._hass.async_add_executor_job(_collect)
    _LOGGER.debug("Found %d backups", len(backups))
    return backups

@handle_b2_errors
async def async_get_backup(self, backup_id: str, **kwargs: Any) -> AgentBackup:
    """Return the backup with the given ID via ``_find_backup_by_id``."""
    backup = await self._find_backup_by_id(backup_id)
    return backup

async def _find_file_by_id(self, backup_id: str) -> FileVersion | None:
    """Return the first listed file whose info matches ``backup_id``, or None."""

    def _scan() -> FileVersion | None:
        """Search the bucket listing; blocking, so it runs in the executor."""
        for candidate, _ in self._bucket.ls(self._prefix):
            info = candidate.file_info
            if info is None:
                continue
            if info.get("metadata_version") != METADATA_VERSION:
                continue
            if info.get("backup_id") != backup_id:
                continue
            if info.get("backup_metadata") is None:
                continue
            _LOGGER.debug(
                "Found file %s from id %s", candidate.file_name, backup_id
            )
            return candidate

        _LOGGER.debug("File %s not found", backup_id)
        return None

    return await self._hass.async_add_executor_job(_scan)

async def _find_backup_by_id(self, backup_id: str) -> AgentBackup:
    """Resolve ``backup_id`` to an AgentBackup.

    Raises BackupNotFound when no matching file is found.
    """
    if (match := await self._find_file_by_id(backup_id)) is None:
        raise BackupNotFound(f"Backup {backup_id} not found")
    return self._backup_from_b2_file(match)

def _backup_from_b2_file(self, file: FileVersion) -> AgentBackup:
    """Build an AgentBackup from the JSON kept in the file's info.

    The decoded metadata's ``size`` and ``name`` are overwritten with the
    size and file name of the stored file.
    """
    metadata = json.loads(file.file_info["backup_metadata"])
    metadata.update(size=file.size, name=file.file_name)
    return AgentBackup.from_dict(metadata)
Loading
Loading