This repository has been archived by the owner on Aug 27, 2023. It is now read-only.

Stream files to storage backend chunk by chunk #304

Merged 7 commits on Aug 23, 2022

Changes from all commits
7 changes: 7 additions & 0 deletions doc/topics/storage.rst
@@ -356,3 +356,10 @@ A valid access key, either key1 or key2. If not present, will look for the
 **Argument:** string
 
 Name of the container you wish to store packages in.
+
+``storage.storage_account_url``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+**Argument:** string, optional
+
+Storage data service endpoint. If not present, will look for the
+``AZURE_STORAGE_SERVICE_ENDPOINT`` environment variable.
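
For context, a minimal sketch of an ini config using the new option; the endpoint value is hypothetical and points at a local Azurite emulator rather than a real storage account:

    [app:main]
    pypi.storage = azure-blob
    storage.storage_account_name = devstoreaccount1
    storage.storage_container_name = packages
    storage.storage_account_url = http://127.0.0.1:10000/devstoreaccount1
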
7 changes: 7 additions & 0 deletions docker-compose.yml
@@ -28,6 +28,7 @@ services:
       - postgres
       - mysql
       - ldap
+      - azurite
       - redis-cluster-node-2
   redis:
     image: "redis"
@@ -86,3 +87,9 @@ services:
     volumes:
       - ./ldap:/container/service/slapd/assets/config/bootstrap/ldif/custom
     command: ['--loglevel', 'debug', '--copy-service']
+  azurite:
+    image: 'mcr.microsoft.com/azure-storage/azurite'
+    hostname: "devstoreaccount1.azurite"
+    ports:
+      - "10000:10000"
+    command: ["azurite-blob", "--blobHost=0.0.0.0", "--blobPort=10000"]
12 changes: 2 additions & 10 deletions docker/Dockerfile
@@ -1,12 +1,4 @@
-FROM ubuntu:20.04
+FROM snakepacker/python:all
 MAINTAINER Steven Arcangeli <[email protected]>
 
-RUN apt-get update -qq \
-    && DEBIAN_FRONTEND=noninteractive apt-get install -yqq \
-    python3-pip python3-dev libldap2-dev libsasl2-dev \
-    libmysqlclient-dev libffi-dev libssl-dev default-jre curl git \
-    && pip3 install --upgrade pip \
-    && pip3 install --upgrade setuptools tox
-RUN curl https://raw.githubusercontent.com/fkrull/docker-multi-python/master/setup.sh -o /setup.sh \
-    && bash setup.sh \
-    && rm /setup.sh
+RUN apt-install libldap2-dev libsasl2-dev default-jre
Comment from the PR author on this change:

ref https://github.com/snakepacker/python#apt-install

This image contains git, tox, all the Python versions, etc. It shaves 1+ minute off CI for each env.

16 changes: 9 additions & 7 deletions pypicloud/cache/base.py
@@ -2,15 +2,14 @@
 import hashlib
 import logging
 import posixpath
-from io import BytesIO
-from typing import Any, BinaryIO, Callable, Dict, List, Optional, Tuple
+from typing import Any, BinaryIO, Dict, List, Optional, Tuple
 
 from pyramid.settings import asbool
 
 from pypicloud.dateutil import utcfromtimestamp
 from pypicloud.models import Package
 from pypicloud.storage import get_storage_impl
-from pypicloud.util import create_matcher, normalize_name, parse_filename
+from pypicloud.util import create_matcher, normalize_name, parse_filename, stream_file
 
 LOG = logging.getLogger(__name__)

@@ -140,10 +139,13 @@ def upload(
         if old_pkg is not None and not self.allow_overwrite:
             raise ValueError("Package '%s' already exists!" % filename)
         if self.calculate_hashes:
-            file_data = data.read()
-            metadata["hash_sha256"] = hashlib.sha256(file_data).hexdigest()
-            metadata["hash_md5"] = hashlib.md5(file_data).hexdigest()
-            data = BytesIO(file_data)
+            sha256, md5 = hashlib.sha256(), hashlib.md5()
+            for chunk in stream_file(data):
+                sha256.update(chunk)
+                md5.update(chunk)
+            data.seek(0)
+            metadata["hash_sha256"] = sha256.hexdigest()
+            metadata["hash_md5"] = md5.hexdigest()
 
         new_pkg = self.new_package(name, version, filename, summary=summary, **metadata)
         self.storage.upload(new_pkg, data)
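
The point of the cache change above: the old code buffered the entire upload in memory with data.read() before hashing, while the new loop hashes in 1MB chunks, so memory use stays flat regardless of package size. A minimal standalone sketch of the same pattern (the file name is hypothetical):

    import hashlib

    from pypicloud.util import stream_file

    sha256, md5 = hashlib.sha256(), hashlib.md5()
    with open("mypkg-1.0.tar.gz", "rb") as fp:  # hypothetical package file
        for chunk in stream_file(fp):  # 1MB chunks by default
            sha256.update(chunk)
            md5.update(chunk)
        fp.seek(0)  # rewind so the stream can be re-read for the actual upload
    print(sha256.hexdigest(), md5.hexdigest())
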
44 changes: 34 additions & 10 deletions pypicloud/storage/azure_blob.py
@@ -12,6 +12,7 @@
 
 from pypicloud.dateutil import utcnow
 from pypicloud.models import Package
+from pypicloud.util import stream_file
 
 from .base import IStorage
 
@@ -31,6 +32,7 @@ def __init__(
         redirect_urls=None,
         storage_account_name=None,
         storage_account_key=None,
+        storage_account_url=None,
         storage_container_name=None,
     ):
         super(AzureBlobStorage, self).__init__(request)
@@ -41,12 +43,16 @@ def __init__(
         self.storage_account_name = storage_account_name
         self.storage_account_key = storage_account_key
         self.storage_container_name = storage_container_name
-        self.azure_storage_account_url = "https://{}.blob.core.windows.net".format(
-            storage_account_name
+        self.azure_storage_account_url = (
+            storage_account_url
+            or "https://{}.blob.core.windows.net".format(storage_account_name)
         )
         self.blob_service_client = BlobServiceClient(
             account_url=self.azure_storage_account_url,
-            credential=self.storage_account_key,
+            credential={  # https://github.com/Azure/azure-sdk-for-python/issues/24957#issuecomment-1164786540
+                "account_name": self.storage_account_name,
+                "account_key": self.storage_account_key,
+            },
         )
         self.container_client = self.blob_service_client.get_container_client(
             self.storage_container_name
@@ -58,6 +64,9 @@ def configure(cls, settings):
         kwargs["expire_after"] = int(settings.get("storage.expire_after", 60 * 60 * 24))
         kwargs["path_prefix"] = settings.get("storage.prefix", "")
         kwargs["redirect_urls"] = asbool(settings.get("storage.redirect_urls", True))
+        kwargs["storage_account_url"] = settings.get(
+            "storage.storage_account_url", os.getenv("AZURE_STORAGE_SERVICE_ENDPOINT")
+        )
         kwargs["storage_account_name"] = settings.get(
             "storage.storage_account_name", os.getenv("AZURE_STORAGE_ACCOUNT")
         )
@@ -118,7 +127,7 @@ def list(self, factory=Package):
                 posixpath.basename(blob_properties.name),
                 blob_properties.last_modified,
                 path=blob_properties.name,
-                **Package.read_metadata(metadata.metadata)
+                **Package.read_metadata(metadata.metadata),
             )
 
     def get_path(self, package):
@@ -129,15 +138,25 @@ def get_path(self, package):
         )
         return package.data["path"]
 
-    def upload(self, package, datastream):
-        path = self.get_path(package)
+    def get_uri(self, package):
+        return f"azure://{self.storage_container_name}/{self.get_path(package)}"
 
+    def upload(self, package, datastream):
         metadata = package.get_metadata()
         metadata["name"] = package.name
         metadata["version"] = package.version
 
-        blob_client = self.container_client.get_blob_client(blob=path)
-        blob_client.upload_blob(data=datastream, metadata=metadata)
+        with _open(
+            self.get_uri(package),
+            "wb",
+            compression="disable",
+            transport_params={
+                "client": self.container_client,
+                "blob_kwargs": {"metadata": metadata},
+            },
+        ) as fp:
+            for chunk in stream_file(datastream):
+                fp.write(chunk)  # multipart upload
 
     def delete(self, package):
         path = self.get_path(package)
@@ -156,5 +175,10 @@ def check_health(self):
         return True, ""
 
     def open(self, package):
-        url = self._generate_url(package)
-        return _open(url, "rb", compression="disable")
+        """Overwrite open method to re-use client instead of using signed url."""
+        return _open(
+            self.get_uri(package),
+            "rb",
+            compression="disable",
+            transport_params={"client": self.container_client},
+        )
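
One detail worth flagging in all of these _open calls: smart_open infers compression from the file extension by default, so without compression="disable" a .tar.gz sdist would be transparently decompressed on read and recompressed on write, corrupting the stored bytes. A minimal read sketch mirroring the call above (connection string, container, and blob path are hypothetical):

    from azure.storage.blob import ContainerClient
    from smart_open import open as _open

    container_client = ContainerClient.from_connection_string(
        "<connection string>", "packages"  # hypothetical container name
    )
    with _open(
        "azure://packages/mypkg-1.0.tar.gz",  # hypothetical blob URI
        "rb",
        compression="disable",  # keep the archive bytes exactly as stored
        transport_params={"client": container_client},
    ) as fp:
        first_chunk = fp.read(1024)
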
3 changes: 2 additions & 1 deletion pypicloud/storage/files.py
@@ -7,6 +7,7 @@
 
 from pypicloud.dateutil import utcfromtimestamp
 from pypicloud.models import Package
+from pypicloud.util import stream_file
 
 from .base import IStorage
 
@@ -90,7 +91,7 @@ def upload(self, package, datastream):
         # Write to a temporary file
         tempfile = os.path.join(destdir, "." + package.filename + "." + uid)
         with open(tempfile, "wb") as ofile:
-            for chunk in iter(lambda: datastream.read(16 * 1024), b""):
+            for chunk in stream_file(datastream):
                 ofile.write(chunk)
 
         os.rename(tempfile, destfile)
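
The replaced idiom and the new helper are equivalent; iter(callable, sentinel) keeps calling read() until it returns b"". The change just centralizes the loop and bumps the chunk size from 16KB to 1MB:

    # old idiom, shown for reference
    for chunk in iter(lambda: datastream.read(16 * 1024), b""):
        ofile.write(chunk)

    # new helper, identical behavior with a larger default chunk
    for chunk in stream_file(datastream):
        ofile.write(chunk)
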
42 changes: 31 additions & 11 deletions pypicloud/storage/gcs.py
@@ -1,5 +1,4 @@
 """ Store packages in GCS """
-import io
 import json
 import logging
 import os
@@ -10,8 +9,10 @@
 from google.auth.transport import requests
 from google.cloud import storage
 from pyramid.settings import asbool
+from smart_open import open as _open
 
 from pypicloud.models import Package
+from pypicloud.util import stream_file
 
 from .object_store import ObjectStoreStorage
 
@@ -32,7 +33,7 @@ def __init__(
         project_id=None,
         use_iam_signer=False,
         iam_signer_service_account_email=None,
-        **kwargs
+        **kwargs,
     ):
         super(GoogleCloudStorage, self).__init__(request=request, **kwargs)
 
@@ -81,7 +82,7 @@ def _subclass_specific_config(cls, settings, common_config):
             "storage.iam_signer_service_account_email"
         )
         if iam_signer_service_account_email is None and service_account_json_filename:
-            with io.open(service_account_json_filename, "r", encoding="utf-8") as ifile:
+            with open(service_account_json_filename, "r", encoding="utf-8") as ifile:
                 credentials = json.load(ifile)
             iam_signer_service_account_email = credentials.get("client_email")
 
@@ -195,19 +196,38 @@ def _get_gcs_blob(self, package):
         """Get a GCS blob object for the specified package"""
         return self.bucket.blob(self.get_path(package))
 
+    def get_uri(self, package):
+        return f"gs://{self.bucket.name}/{self.get_path(package)}"
+
     def upload(self, package, datastream):
         """Upload the package to GCS"""
         metadata = {"name": package.name, "version": package.version}
         metadata.update(package.get_metadata())
 
-        blob = self._get_gcs_blob(package)
-
-        blob.metadata = metadata
-
-        blob.upload_from_file(datastream, predefined_acl=self.object_acl)
-
-        if self.storage_class is not None:
-            blob.update_storage_class(self.storage_class)
+        with _open(
+            self.get_uri(package),
+            "wb",
+            compression="disable",
+            transport_params={
+                "client": self.bucket.client,
+                "blob_properties": {
+                    "metadata": metadata,
+                    "acl": self.object_acl,
+                    "storage_class": self.storage_class,
+                },
+            },
+        ) as fp:
+            for chunk in stream_file(datastream):
+                fp.write(chunk)  # multipart upload
+
+    def open(self, package):
+        """Overwrite open method to re-use client instead of using signed url."""
+        return _open(
+            self.get_uri(package),
+            "rb",
+            compression="disable",
+            transport_params={"client": self.bucket.client},
+        )
 
     def delete(self, package):
         """Delete the package"""
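
A side effect of the GCS rewrite worth noting: previously storage_class was applied after the upload via blob.update_storage_class(), which rewrites the object a second time, whereas passing it through blob_properties sets it when the object is first written. A rough direct-API equivalent, under the assumption that smart_open simply assigns these properties on the Blob before writing (bucket and object names are hypothetical):

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("packages/mypkg-1.0.tar.gz")
    blob.metadata = {"name": "mypkg", "version": "1.0"}
    blob.storage_class = "NEARLINE"  # example storage class
    with blob.open("wb") as fp:  # streams to GCS in chunks, like smart_open
        fp.write(b"...")
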
2 changes: 1 addition & 1 deletion pypicloud/storage/object_store.py
@@ -75,7 +75,7 @@ def configure(cls, settings):
         kwargs["bucket_prefix"] = settings.get("storage.prefix", "")
         kwargs["prepend_hash"] = asbool(settings.get("storage.prepend_hash", True))
         kwargs["object_acl"] = settings.get("storage.object_acl", None)
-        kwargs["storage_class"] = storage_class = settings.get("storage.storage_class")
+        kwargs["storage_class"] = settings.get("storage.storage_class")
         kwargs["redirect_urls"] = asbool(settings.get("storage.redirect_urls", True))
         kwargs["region_name"] = settings.get("storage.region_name")
         kwargs["public_url"] = asbool(settings.get("storage.public_url"))
33 changes: 29 additions & 4 deletions pypicloud/storage/s3.py
@@ -13,6 +13,7 @@
 from cryptography.hazmat.primitives.asymmetric import padding
 from pyramid.settings import asbool, falsey
 from pyramid_duh.settings import asdict
+from smart_open import open as _open
 
 from pypicloud.dateutil import utcnow
 from pypicloud.models import Package
@@ -21,6 +22,7 @@
     PackageParseError,
     normalize_metadata,
     parse_filename,
+    stream_file,
 )
 
 from .object_store import ObjectStoreStorage
@@ -101,12 +103,12 @@ def verify_value(val):
                 aws_access_key_id=str,
                 aws_secret_access_key=str,
                 aws_session_token=str,
-            )
+            ),
         )
 
         bucket = s3conn.Bucket(bucket_name)
         try:
-            head = s3conn.meta.client.head_bucket(Bucket=bucket_name)
+            s3conn.meta.client.head_bucket(Bucket=bucket_name)
         except ClientError as e:
             if e.response["Error"]["Code"] == "404":
                 LOG.info("Creating S3 bucket %s", bucket_name)
@@ -195,8 +197,10 @@ def _log_region_warning(self):
             "without any dots ('.') in the name."
         )
 
+    def get_uri(self, package):
+        return f"s3://{self.bucket.name}/{self.get_path(package)}"
+
     def upload(self, package, datastream):
-        key = self.bucket.Object(self.get_path(package))
         kwargs = {}
         if self.sse is not None:
             kwargs["ServerSideEncryption"] = self.sse
@@ -208,7 +212,28 @@ def upload(self, package, datastream):
         metadata["name"] = package.name
         metadata["version"] = package.version
         metadata = normalize_metadata(metadata)
-        key.put(Metadata=metadata, Body=datastream, **kwargs)
+        kwargs["Metadata"] = metadata
+
+        with _open(
+            self.get_uri(package),
+            "wb",
+            compression="disable",
+            transport_params={
+                "client": self.bucket.meta.client,
+                "client_kwargs": {"S3.Client.create_multipart_upload": kwargs},
+            },
+        ) as fp:
+            for chunk in stream_file(datastream):
+                fp.write(chunk)  # multipart upload
+
+    def open(self, package):
+        """Overwrite open method to re-use client instead of using signed url."""
+        return _open(
+            self.get_uri(package),
+            "rb",
+            compression="disable",
+            transport_params={"client": self.bucket.meta.client},
+        )
 
     def delete(self, package):
         self.bucket.delete_objects(
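
The client_kwargs entry deserves a note: smart_open's S3 writer performs a multipart upload under the hood, and keys of the form "S3.Client.create_multipart_upload" route extra keyword arguments to that specific boto3 call. That is how Metadata and the SSE settings survive the move away from key.put(). A sketch of what ends up forwarded (values hypothetical):

    import boto3

    kwargs = {
        "ServerSideEncryption": "AES256",  # when storage.server_side_encryption is set
        "Metadata": {"name": "mypkg", "version": "1.0"},
    }
    transport_params = {
        "client": boto3.client("s3"),
        "client_kwargs": {"S3.Client.create_multipart_upload": kwargs},
    }
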
14 changes: 14 additions & 0 deletions pypicloud/util.py
@@ -5,7 +5,9 @@
 import time
 import unicodedata
 from typing import (
+    IO,
     Any,
+    AnyStr,
     Callable,
     Dict,
     ItemsView,
@@ -24,6 +26,7 @@
 LOG = logging.getLogger(__name__)
 ALL_EXTENSIONS = Locator.source_extensions + Locator.binary_extensions
 SENTINEL = object()
+CHUNK_SIZE = 1 << 20  # read 1MB chunks
 
 
 class PackageParseError(ValueError):
@@ -115,6 +118,17 @@ def create_matcher(queries: List[str], query_type: str) -> Callable[[str], bool]
     return lambda x: all((q in x.lower() for q in queries))
 
 
+def stream_file(fp: IO[AnyStr], chunk_size: int = CHUNK_SIZE) -> Iterator[AnyStr]:
+    """
+    Read an (opened) file in chunks of chunk_size bytes
+    """
+    while True:
+        chunk = fp.read(chunk_size)
+        if not chunk:
+            break
+        yield chunk
+
+
 class EnvironSettings:
     def __init__(self, settings: Dict[str, Any], env: Dict[str, str] = None):
         self._settings = settings
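
A quick behavioral check of the helper (pure illustration): a buffer of 1.5x CHUNK_SIZE comes back as one full chunk plus the remainder, and the AnyStr signature means the same loop also works for text-mode handles:

    from io import BytesIO

    from pypicloud.util import CHUNK_SIZE, stream_file

    buf = BytesIO(b"x" * (3 * CHUNK_SIZE // 2))
    sizes = [len(chunk) for chunk in stream_file(buf)]
    assert sizes == [CHUNK_SIZE, CHUNK_SIZE // 2]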