adlfs/spec.py: 311 changes (146 additions, 165 deletions)
@@ -497,21 +497,6 @@ def split_path(
             version_id if self.version_aware and version_id else None,
         )
 
-    def info(self, path, refresh=False, **kwargs):
-        try:
-            fetch_from_azure = (path and self._ls_from_cache(path) is None) or refresh
-        except Exception:
-            fetch_from_azure = True
-        if fetch_from_azure:
-            return sync(
-                self.loop,
-                self._info,
-                path,
-                refresh,
-                version_id=kwargs.get("version_id"),
-            )
-        return super().info(path)
-
     def modified(self, path: str) -> datetime:
         return self.info(path)["last_modified"]
 
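An illustrative aside: dropping this override appears safe because fsspec's `AsyncFileSystem` already mirrors each async `_method` into a blocking `method` on the filesystem's event loop, and the cache check moves into `_info` itself (next hunk). A condensed sketch of that mirroring, using fsspec's `sync` helper; this is a reimplementation for illustration, not fsspec's exact source:

```python
from fsspec.asyn import sync


def sync_wrapper(func):
    # Run the async method on the filesystem's dedicated event loop
    # and block until it completes (illustrative reimplementation).
    def wrapper(self, *args, **kwargs):
        return sync(self.loop, func, self, *args, **kwargs)

    return wrapper
```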
@@ -531,44 +516,60 @@ async def _info(self, path, refresh=False, **kwargs):
         dict with keys: name (full path in the FS), size (in bytes), type (file,
         directory, or something else) and other FS-specific keys.
         """
-        if refresh:
-            invalidate_cache = True
-        else:
-            invalidate_cache = False
         container, path, path_version_id = self.split_path(path)
         fullpath = "/".join([container, path]) if path else container
         version_id = _coalesce_version_id(path_version_id, kwargs.get("version_id"))
         kwargs["version_id"] = version_id
 
-        out = await self._ls(
-            fullpath, detail=True, invalidate_cache=invalidate_cache, **kwargs
-        )
-        fullpath = fullpath.rstrip("/")
-        out1 = [
-            o
-            for o in out
-            if o["name"].rstrip("/") == fullpath
-            and (version_id is None or o["version_id"] == version_id)
-        ]
-        if len(out1) == 1:
-            if "size" not in out1[0]:
-                out1[0]["size"] = None
-            return out1[0]
-        elif len(out1) > 1 or out:
-            return {"name": fullpath, "size": None, "type": "directory"}
-        else:
-            # Check the directory listing as the path may have been deleted
-            out = await self._ls(
-                self._parent(path),
-                detail=True,
-                invalidate_cache=invalidate_cache,
-                **kwargs,
-            )
-            out = [o for o in out if o["name"].rstrip("/") == path]
-            if out:
-                return out[0]
-            else:
-                raise FileNotFoundError
+        if fullpath == "":
+            return {"name": "", "size": None, "type": "directory"}
+        elif path == "":
+            if not refresh and _ROOT_PATH in self.dircache:
+                out = [o for o in self.dircache[_ROOT_PATH] if o["name"] == container]
+                if out:
+                    return out[0]
+            try:
+                async with self.service_client.get_container_client(
+                    container=container
+                ) as cc:
+                    properties = await cc.get_container_properties()
+            except ResourceNotFoundError as exc:
+                raise FileNotFoundError from exc
+            info = (await self._details([properties]))[0]
+            # Make result consistent with _ls_containers()
+            if not info.get("metadata"):
+                info["metadata"] = None
+            return info
+
+        if not refresh:
+            out = self._ls_from_cache(fullpath)
+            if out is not None:
+                if self.version_aware and version_id is not None:
+                    out = [
+                        o
+                        for o in out
+                        if o["name"] == fullpath and match_blob_version(o, version_id)
+                    ]
+                    if out:
+                        return out[0]
+                else:
+                    out = [o for o in out if o["name"] == fullpath]
+                    if out:
+                        return out[0]
+                    return {"name": fullpath, "size": None, "type": "directory"}
+
+        try:
+            async with self.service_client.get_blob_client(container, path) as bc:
+                props = await bc.get_blob_properties(version_id=version_id)
+            return (await self._details([props]))[0]
+        except ResourceNotFoundError:
+            pass
+
+        if not version_id:
+            if await self._dir_exists(container, path):
+                return {"name": fullpath, "size": None, "type": "directory"}
+
+        raise FileNotFoundError
 
     def glob(self, path, **kwargs):
         return sync(self.loop, self._glob, path)
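A hypothetical usage sketch of the three path shapes the reworked `_info` distinguishes; the account and container names are made up, and `anon=True` merely avoids credentials for the illustration:

```python
import adlfs

fs = adlfs.AzureBlobFileSystem(account_name="myaccount", anon=True)

fs.info("")                      # account root -> {"name": "", "size": None, "type": "directory"}
fs.info("mycontainer")           # container -> properties via get_container_properties()
fs.info("mycontainer/data.csv")  # blob -> get_blob_properties(), falling back to _dir_exists()
```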
@@ -679,87 +680,74 @@ async def _ls_blobs(
         )
 
         if (
-            target_path not in self.dircache
-            or return_glob
-            or versions
-            or not any(
+            target_path in self.dircache
+            and not return_glob
+            and not versions
+            and all(
                 match_blob_version(b, version_id) for b in self.dircache[target_path]
             )
         ):
-            if container not in ["", delimiter]:
-                # This is the case where the container name is passed
-                async with self.service_client.get_container_client(
-                    container=container
-                ) as cc:
-                    path = path.strip("/")
-                    include = ["metadata"]
-                    if version_id is not None or versions:
-                        assert self.version_aware
-                        include.append("versions")
-                    blobs = cc.walk_blobs(include=include, name_starts_with=path)
-
-                # Check the depth that needs to be screened
-                depth = target_path.count("/")
-                outblobs = []
-                try:
-                    async for next_blob in blobs:
-                        if depth in [0, 1] and path == "":
-                            outblobs.append(next_blob)
-                        elif isinstance(next_blob, BlobProperties):
-                            if next_blob["name"].count("/") == depth:
-                                outblobs.append(next_blob)
-                            elif not next_blob["name"].endswith("/") and (
-                                next_blob["name"].count("/") == (depth - 1)
-                            ):
-                                outblobs.append(next_blob)
-                        else:
-                            async for blob_ in next_blob:
-                                if isinstance(blob_, BlobProperties) or isinstance(
-                                    blob_, BlobPrefix
-                                ):
-                                    if blob_["name"].endswith("/"):
-                                        if (
-                                            blob_["name"].rstrip("/").count("/")
-                                            == depth
-                                        ):
-                                            outblobs.append(blob_)
-                                    elif blob_["name"].count("/") == depth and (
-                                        hasattr(blob_, "size")
-                                        and blob_["size"] == 0
-                                    ):
-                                        outblobs.append(blob_)
-                                    else:
-                                        pass
-                                elif blob_["name"].count("/") == (depth):
-                                    outblobs.append(blob_)
-                                else:
-                                    pass
-                except ResourceNotFoundError:
-                    raise FileNotFoundError
-                finalblobs = await self._details(
-                    outblobs,
-                    target_path=target_path,
-                    return_glob=return_glob,
-                    version_id=version_id,
-                    versions=versions,
-                )
-                if return_glob:
-                    return finalblobs
-                finalblobs = await self._details(
-                    outblobs,
-                    target_path=target_path,
-                    version_id=version_id,
-                    versions=versions,
-                )
-                if not finalblobs:
-                    if not await self._exists(target_path):
-                        raise FileNotFoundError
-                    return []
-                if not self.version_aware or finalblobs[0].get("is_current_version"):
-                    self.dircache[target_path] = finalblobs
-                return finalblobs
-
-        return self.dircache[target_path]
+            return self.dircache[target_path]
+
+        assert container not in ["", delimiter]
+        async with self.service_client.get_container_client(container=container) as cc:
+            path = path.strip("/")
+            include = ["metadata"]
+            if version_id is not None or versions:
+                assert self.version_aware
+                include.append("versions")
+            blobs = cc.walk_blobs(include=include, name_starts_with=path)
+
+        # Check the depth that needs to be screened
+        depth = target_path.count("/")
+        outblobs = []
+        try:
+            async for next_blob in blobs:
+                if depth in [0, 1] and path == "":
+                    outblobs.append(next_blob)
+                elif isinstance(next_blob, BlobProperties):
+                    if next_blob["name"].count("/") == depth:
+                        outblobs.append(next_blob)
+                    elif not next_blob["name"].endswith("/") and (
+                        next_blob["name"].count("/") == (depth - 1)
+                    ):
+                        outblobs.append(next_blob)
+                else:
+                    async for blob_ in next_blob:
+                        if isinstance(blob_, BlobProperties) or isinstance(
+                            blob_, BlobPrefix
+                        ):
+                            if blob_["name"].endswith("/"):
+                                if blob_["name"].rstrip("/").count("/") == depth:
+                                    outblobs.append(blob_)
+                            elif blob_["name"].count("/") == depth and (
+                                hasattr(blob_, "size") and blob_["size"] == 0
+                            ):
+                                outblobs.append(blob_)
+                            else:
+                                pass
+                        elif blob_["name"].count("/") == (depth):
+                            outblobs.append(blob_)
+                        else:
+                            pass
+        except ResourceNotFoundError:
+            raise FileNotFoundError
+        finalblobs = await self._details(
+            outblobs,
+            target_path=target_path,
+            return_glob=return_glob,
+            version_id=version_id,
+            versions=versions,
+        )
+        if return_glob:
+            return finalblobs
+        if not finalblobs:
+            if not await self._exists(target_path):
+                raise FileNotFoundError
+            return []
+        if not self.version_aware or finalblobs[0].get("is_current_version"):
+            self.dircache[target_path] = finalblobs
+        return finalblobs
 
     async def _ls(
         self,
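The depth screening above keeps only entries exactly one level below the listing target. A standalone sketch of that rule in plain Python (no Azure involved; the names are invented):

```python
def keep(name: str, depth: int) -> bool:
    # A trailing "/" marks a virtual directory; strip it before counting
    # separators, so "a/b/" counts the same as the file "a/x.bin".
    return name.rstrip("/").count("/") == depth

# For target_path "container/a", depth == 1 (blob names omit the container):
names = ["a/x.bin", "a/b/", "a/b/y.bin"]
print([n for n in names if keep(n, 1)])  # ['a/x.bin', 'a/b/']
```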
@@ -880,41 +868,29 @@ async def _details(
                 fname = f"{content.container}{delimiter}{content.name}"
                 fname = fname.rstrip(delimiter)
                 if content.has_key("size"):  # NOQA
-                    data.update({"name": fname})
-                    data.update({"size": content.size})
-                    data.update({"type": "file"})
+                    data["name"] = fname
+                    data["size"] = content.size
+                    data["type"] = "file"
                 else:
-                    data.update({"name": fname})
-                    data.update({"size": None})
-                    data.update({"type": "directory"})
+                    data["name"] = fname
+                    data["size"] = None
+                    data["type"] = "directory"
             else:
                 fname = f"{content.name}"
-                data.update({"name": fname})
-                data.update({"size": None})
-                data.update({"type": "directory"})
-            if "metadata" in data.keys():
-                if data["metadata"] is not None:
-                    if (
-                        "is_directory" in data["metadata"].keys()
-                        and data["metadata"]["is_directory"] == "true"
-                    ):
-                        data.update({"type": "directory"})
-                        data.update({"size": None})
-                    elif (
-                        "is_directory" in data["metadata"].keys()
-                        and data["metadata"]["is_directory"] == "false"
-                    ):
-                        data.update({"type": "file"})
-                    elif (
-                        "hdi_isfolder" in data["metadata"].keys()
-                        and data["metadata"]["hdi_isfolder"] == "true"
-                    ):
-                        data.update({"type": "directory"})
-                        data.update({"size": None})
-                    else:
-                        pass
+                data["name"] = fname
+                data["size"] = None
+                data["type"] = "directory"
+            if data.get("metadata"):
+                if data["metadata"].get("is_directory") == "true":
+                    data["type"] = "directory"
+                    data["size"] = None
+                elif data["metadata"].get("is_directory") == "false":
+                    data["type"] = "file"
+                elif data["metadata"].get("hdi_isfolder") == "true":
+                    data["type"] = "directory"
+                    data["size"] = None
             if return_glob:
-                data.update({"name": data["name"].rstrip("/")})
+                data["name"] = data["name"].rstrip("/")
             output.append(data)
         if target_path:
             if (
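The simplified branch above normalizes ADLS Gen2 directory markers: zero-byte placeholder blobs carry metadata such as `hdi_isfolder` (or `is_directory` on some endpoints), which forces the entry to type "directory" with no size. A small self-contained illustration with made-up values:

```python
# Hypothetical listing entry for a placeholder blob marking a directory.
data = {"name": "container/dir", "size": 0, "type": "file",
        "metadata": {"hdi_isfolder": "true"}}

meta = data["metadata"]
if meta.get("is_directory") == "true" or meta.get("hdi_isfolder") == "true":
    # Either flag reclassifies the entry as a directory without a size.
    data["type"] = "directory"
    data["size"] = None

print(data["type"], data["size"])  # directory None
```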
@@ -1413,17 +1389,22 @@ async def _exists(self, path):
             if version_id is not None:
                 return False
             raise
+        return await self._dir_exists(container_name, path)
 
+    async def _dir_exists(self, container, path):
         dir_path = path.rstrip("/") + "/"
-        async with self.service_client.get_container_client(
-            container=container_name
-        ) as container_client:
-            async for blob in container_client.list_blobs(
-                results_per_page=1, name_starts_with=dir_path
-            ):
-                return True
-            else:
-                return False
+        try:
+            async with self.service_client.get_container_client(
+                container=container
+            ) as container_client:
+                async for blob in container_client.list_blobs(
+                    results_per_page=1, name_starts_with=dir_path
+                ):
+                    return True
+                else:
+                    return False
+        except ResourceNotFoundError:
+            return False
 
     async def _pipe_file(self, path, value, overwrite=True, **kwargs):
         """Set the bytes of given file"""