What happened?
Note: I opened this issue first on the LakeFS spec repository aai-institute/lakefs-spec#284
I have been trying to use lakeFS in conjunction with Xarray and Zarr, but I get the following error when I try to write a Zarr store with many chunks:
OSError: [Errno 5] 500 request size exceeded, max paths is set to 1000.
I would like to know how I can remove that limitation; I need to be able to write Zarr stores with many chunks (every chunk is stored as an individual file).
Additionally, I would like to know whether you think lakeFS is a good option to use with Zarr. I'm asking because this format can create many files to represent a single array. In my particular case, I have more than 300 data fields, and each of them has more than 10K chunks, which translates to more than 10K files per field, so I'm not sure whether that will affect lakeFS's performance.
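For a sense of scale, here is a small back-of-the-envelope snippet (added for illustration, not part of the original report) showing that even the minimal reproduction script below already creates well over 1000 objects for a single variable, which is why the delete request hits the 1000-path limit when the store is rewritten:

import math

# Chunk-count arithmetic for the reproduction script below:
# a (100, 30) array stored with (2, 1) chunks.
shape = (100, 30)
chunks = (2, 1)

# Each chunk is written as a separate object in the Zarr store.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
print(n_chunks)  # 1500 chunk objects for one variable, plus metadata files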
import xarray as xr
import dask.array as da
import fsspec  # needed for fsspec.FSMap below
from lakefs_spec import LakeFSFileSystem

lfs = LakeFSFileSystem(
    host="127.0.0.1:8000",
    username="AKIAIOSFOLQUICKSTART",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    access_key_id="AKIAIOSFOLQUICKSTART",
    secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    # endpoint_url="http://127.0.0.1:8000",
    use_listings_cache=False,
)

for folder in ["test-zarr", "test-zarr"]:
    # The first execution is going to work
    path = f"lakefs://quickstart/main/{folder}"
    print(path)

    arr = da.zeros(shape=(100, 30), chunks=(2, 1))
    arr = xr.DataArray(
        arr,
        dims=["a", "b"],
        coords={
            "a": list(range(arr.shape[0])),
            "b": list(range(arr.shape[1])),
        },
    ).to_dataset(name="data")

    fs_map = fsspec.FSMap(root=path, fs=lfs)

    # The error comes when it tries to clean the whole directory to rewrite the data
    arr.to_zarr(fs_map, mode="w")

    print(xr.open_zarr(fs_map).compute())
I deployed LakeFS using the quickstart docker command:
docker run --name lakefs --pull always --rm --publish 8000:8000 treeverse/lakefs:latest run --quickstart
Expected behavior
I would expect an environment variable that allows modifying max_paths, so that the limit on the number of paths per delete request can be raised or removed.
lakeFS version
1.28.2
How lakeFS is installed
docker run --name lakefs --pull always --rm --publish 8000:8000 treeverse/lakefs:latest run --quickstart
Affected clients
No response
Relevant log output
---------------------------------------------------------------------------
ServiceException Traceback (most recent call last)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs\exceptions.py:141, in api_exception_handler(custom_handler)
140 try:
--> 141 yield
142 except lakefs_sdk.ApiException as e:
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs\branch.py:90, in _BaseBranch.delete_objects(self, object_paths)
89 with api_exception_handler():
---> 90 return self._client.sdk_client.objects_api.delete_objects(
91 self._repo_id,
92 self._id,
93 lakefs_sdk.PathList(paths=object_paths)
94 )
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:40, in validate_arguments.<locals>.validate.<locals>.wrapper_function(*args, **kwargs)
38 @wraps(_func)
39 def wrapper_function(*args: Any, **kwargs: Any) -> Any:
---> 40 return vd.call(*args, **kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:134, in ValidatedFunction.call(self, *args, **kwargs)
133 m = self.init_model_instance(*args, **kwargs)
--> 134 return self.execute(m)
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:206, in ValidatedFunction.execute(self, m)
205 else:
--> 206 return self.raw_function(**d, **var_kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api\objects_api.py:424, in ObjectsApi.delete_objects(self, repository, branch, path_list, force, **kwargs)
423 raise ValueError("Error! Please call the delete_objects_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data")
--> 424 return self.delete_objects_with_http_info(repository, branch, path_list, force, **kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:40, in validate_arguments.<locals>.validate.<locals>.wrapper_function(*args, **kwargs)
38 @wraps(_func)
39 def wrapper_function(*args: Any, **kwargs: Any) -> Any:
---> 40 return vd.call(*args, **kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:134, in ValidatedFunction.call(self, *args, **kwargs)
133 m = self.init_model_instance(*args, **kwargs)
--> 134 return self.execute(m)
File ~\.conda\envs\tensordb\Lib\site-packages\pydantic\v1\decorator.py:206, in ValidatedFunction.execute(self, m)
205 else:
--> 206 return self.raw_function(**d, **var_kwargs)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api\objects_api.py:547, in ObjectsApi.delete_objects_with_http_info(self, repository, branch, path_list, force, **kwargs)
539 _response_types_map = {
540 '200': "ObjectErrorList",
541 '401': "Error",
(...)
544 '420': None,
545 }
--> 547 return self.api_client.call_api(
548 '/repositories/{repository}/branches/{branch}/objects/delete', 'POST',
549 _path_params,
550 _query_params,
551 _header_params,
552 body=_body_params,
553 post_params=_form_params,
554 files=_files,
555 response_types_map=_response_types_map,
556 auth_settings=_auth_settings,
557 async_req=_params.get('async_req'),
558 _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
559 _preload_content=_params.get('_preload_content', True),
560 _request_timeout=_params.get('_request_timeout'),
561 collection_formats=_collection_formats,
562 _request_auth=_params.get('_request_auth'))
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api_client.py:407, in ApiClient.call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_types_map, auth_settings, async_req, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host, _request_auth)
406 if not async_req:
--> 407 return self.__call_api(resource_path, method,
408 path_params, query_params, header_params,
409 body, post_params, files,
410 response_types_map, auth_settings,
411 _return_http_data_only, collection_formats,
412 _preload_content, _request_timeout, _host,
413 _request_auth)
415 return self.pool.apply_async(self.__call_api, (resource_path,
416 method, path_params,
417 query_params,
(...)
425 _request_timeout,
426 _host, _request_auth))
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api_client.py:222, in ApiClient.__call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_types_map, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host, _request_auth)
221 e.body = e.body.decode('utf-8')
--> 222 raise e
224 self.last_response = response_data
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api_client.py:212, in ApiClient.__call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_types_map, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout, _host, _request_auth)
210 try:
211 # perform request and return response
--> 212 response_data = self.request(
213 method, url,
214 query_params=query_params,
215 headers=header_params,
216 post_params=post_params, body=body,
217 _preload_content=_preload_content,
218 _request_timeout=_request_timeout)
219 except ApiException as e:
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\api_client.py:451, in ApiClient.request(self, method, url, query_params, headers, post_params, body, _preload_content, _request_timeout)
450 elif method == "POST":
--> 451 return self.rest_client.post_request(url,
452 query_params=query_params,
453 headers=headers,
454 post_params=post_params,
455 _preload_content=_preload_content,
456 _request_timeout=_request_timeout,
457 body=body)
458 elif method == "PUT":
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\rest.py:278, in RESTClientObject.post_request(self, url, headers, query_params, post_params, body, _preload_content, _request_timeout)
276 def post_request(self, url, headers=None, query_params=None, post_params=None,
277 body=None, _preload_content=True, _request_timeout=None):
--> 278 return self.request("POST", url,
279 headers=headers,
280 query_params=query_params,
281 post_params=post_params,
282 _preload_content=_preload_content,
283 _request_timeout=_request_timeout,
284 body=body)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_sdk\rest.py:235, in RESTClientObject.request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
234 if 500 <= r.status <= 599:
--> 235 raise ServiceException(http_resp=r)
237 raise ApiException(http_resp=r)
ServiceException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Request-Id': '586c5cf7-a698-4d84-97af-d4e6de8e6eb2', 'Date': 'Sun, 14 Jul 2024 17:56:29 GMT', 'Content-Length': '62'})
HTTP response body: {"message":"request size exceeded, max paths is set to 1000"}
The above exception was the direct cause of the following exception:
ServerException Traceback (most recent call last)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_spec\spec.py:168, in LakeFSFileSystem.wrapped_api_call(self, rpath, message, set_cause)
167 try:
--> 168 yield
169 except ServerException as e:
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_spec\spec.py:718, in LakeFSFileSystem.rm(self, path, recursive, maxdepth)
717 if maxdepth is None:
--> 718 branch.delete_objects(obj.path for obj in objgen)
719 else:
720 # nesting level is just the amount of "/"s in the path, no leading "/".
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs\branch.py:89, in _BaseBranch.delete_objects(self, object_paths)
88 object_paths = [o.path if isinstance(o, StoredObject) else o for o in object_paths]
---> 89 with api_exception_handler():
90 return self._client.sdk_client.objects_api.delete_objects(
91 self._repo_id,
92 self._id,
93 lakefs_sdk.PathList(paths=object_paths)
94 )
File ~\.conda\envs\tensordb\Lib\contextlib.py:155, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
154 try:
--> 155 self.gen.throw(typ, value, traceback)
156 except StopIteration as exc:
157 # Suppress StopIteration *unless* it's the same exception that
158 # was passed to throw(). This prevents a StopIteration
159 # raised inside the "with" statement from being suppressed.
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs\exceptions.py:148, in api_exception_handler(custom_handler)
147 if lakefs_ex is not None:
--> 148 raise lakefs_ex from e
ServerException: code: 500, reason: Internal Server Error, body: {'message': 'request size exceeded, max paths is set to 1000'}
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
Cell In[56], line 23
20 fs_map = fsspec.FSMap(root=path, fs=lfs)
22 # The error comes when it tries to clean the whole directory to rewrite the data
---> 23 arr.to_zarr(fs_map, mode="w")
25 print(xr.open_zarr(fs_map).compute())
27 time.sleep(5)
File ~\.conda\envs\tensordb\Lib\site-packages\xarray\core\dataset.py:2549, in Dataset.to_zarr(self, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, write_empty_chunks, chunkmanager_store_kwargs)
2404 """Write dataset contents to a zarr group.
2405
2406 Zarr chunks are determined in the following way:
(...)
2545 The I/O user guide, with more details and examples.
2546 """
2547 from xarray.backends.api import to_zarr
-> 2549 return to_zarr( # type: ignore[call-overload,misc]
2550 self,
2551 store=store,
2552 chunk_store=chunk_store,
2553 storage_options=storage_options,
2554 mode=mode,
2555 synchronizer=synchronizer,
2556 group=group,
2557 encoding=encoding,
2558 compute=compute,
2559 consolidated=consolidated,
2560 append_dim=append_dim,
2561 region=region,
2562 safe_chunks=safe_chunks,
2563 zarr_version=zarr_version,
2564 write_empty_chunks=write_empty_chunks,
2565 chunkmanager_store_kwargs=chunkmanager_store_kwargs,
2566 )
File ~\.conda\envs\tensordb\Lib\site-packages\xarray\backends\api.py:1661, in to_zarr(dataset, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, write_empty_chunks, chunkmanager_store_kwargs)
1659 already_consolidated = False
1660 consolidate_on_close = consolidated or consolidated is None
-> 1661 zstore = backends.ZarrStore.open_group(
1662 store=mapper,
1663 mode=mode,
1664 synchronizer=synchronizer,
1665 group=group,
1666 consolidated=already_consolidated,
1667 consolidate_on_close=consolidate_on_close,
1668 chunk_store=chunk_mapper,
1669 append_dim=append_dim,
1670 write_region=region,
1671 safe_chunks=safe_chunks,
1672 stacklevel=4, # for Dataset.to_zarr()
1673 zarr_version=zarr_version,
1674 write_empty=write_empty_chunks,
1675 )
1677 if region is not None:
1678 zstore._validate_and_autodetect_region(dataset)
File ~\.conda\envs\tensordb\Lib\site-packages\xarray\backends\zarr.py:483, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, stacklevel, zarr_version, write_empty)
464 @classmethod
465 def open_group(
466 cls,
(...)
480 write_empty: bool | None = None,
481 ):
--> 483 zarr_group, consolidate_on_close, close_store_on_close = _get_open_params(
484 store=store,
485 mode=mode,
486 synchronizer=synchronizer,
487 group=group,
488 consolidated=consolidated,
489 consolidate_on_close=consolidate_on_close,
490 chunk_store=chunk_store,
491 storage_options=storage_options,
492 stacklevel=stacklevel,
493 zarr_version=zarr_version,
494 )
496 return cls(
497 zarr_group,
498 mode,
(...)
504 close_store_on_close,
505 )
File ~\.conda\envs\tensordb\Lib\site-packages\xarray\backends\zarr.py:1332, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, stacklevel, zarr_version)
1330 zarr_group = zarr.open_consolidated(store, **open_kwargs)
1331 else:
-> 1332 zarr_group = zarr.open_group(store, **open_kwargs)
1333 close_store_on_close = zarr_group.store is not store
1334 return zarr_group, consolidate_on_close, close_store_on_close
File ~\.conda\envs\tensordb\Lib\site-packages\zarr\hierarchy.py:1581, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, meta_array)
1578 raise GroupNotFoundError(path)
1580 elif mode == "w":
-> 1581 init_group(store, overwrite=True, path=path, chunk_store=chunk_store)
1583 elif mode == "a":
1584 if not contains_group(store, path=path):
File ~\.conda\envs\tensordb\Lib\site-packages\zarr\storage.py:682, in init_group(store, overwrite, path, chunk_store)
679 store["zarr.json"] = store._metadata_class.encode_hierarchy_metadata(None) # type: ignore
681 # initialise metadata
--> 682 _init_group_metadata(store=store, overwrite=overwrite, path=path, chunk_store=chunk_store)
684 if store_version == 3:
685 # TODO: Should initializing a v3 group also create a corresponding
686 # empty folder under data/root/? I think probably not until there
687 # is actual data written there.
688 pass
File ~\.conda\envs\tensordb\Lib\site-packages\zarr\storage.py:704, in _init_group_metadata(store, overwrite, path, chunk_store)
701 if overwrite:
702 if store_version == 2:
703 # attempt to delete any pre-existing items in store
--> 704 rmdir(store, path)
705 if chunk_store is not None:
706 rmdir(chunk_store, path)
File ~\.conda\envs\tensordb\Lib\site-packages\zarr\storage.py:212, in rmdir(store, path)
209 store_version = getattr(store, "_store_version", 2)
210 if hasattr(store, "rmdir") and store.is_erasable(): # type: ignore
211 # pass through
--> 212 store.rmdir(path)
213 else:
214 # slow version, delete one key at a time
215 if store_version == 2:
File ~\.conda\envs\tensordb\Lib\site-packages\zarr\storage.py:1549, in FSStore.rmdir(self, path)
1547 store_path = self.dir_path(path)
1548 if self.fs.isdir(store_path):
-> 1549 self.fs.rm(store_path, recursive=True)
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_spec\spec.py:714, in LakeFSFileSystem.rm(self, path, recursive, maxdepth)
711 path = stringify_path(path)
712 repository, ref, prefix = parse(path)
--> 714 with self.wrapped_api_call(rpath=path):
715 branch = lakefs.Branch(repository, ref, client=self.client)
716 objgen = branch.objects(prefix=prefix, delimiter="" if recursive else "/")
File ~\.conda\envs\tensordb\Lib\contextlib.py:155, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
153 value = typ()
154 try:
--> 155 self.gen.throw(typ, value, traceback)
156 except StopIteration as exc:
157 # Suppress StopIteration *unless* it's the same exception that
158 # was passed to throw(). This prevents a StopIteration
159 # raised inside the "with" statement from being suppressed.
160 return exc is not value
File ~\.conda\envs\tensordb\Lib\site-packages\lakefs_spec\spec.py:170, in LakeFSFileSystem.wrapped_api_call(self, rpath, message, set_cause)
168 yield
169 except ServerException as e:
--> 170 raise translate_lakefs_error(e, rpath=rpath, message=message, set_cause=set_cause)
OSError: [Errno 5] 500 request size exceeded, max paths is set to 1000: 'quickstart/main/test-zarr'
Thanks for your very detailed report! As you can probably tell from reading both issues, we'll have to figure this out in conjunction with the lakefs-spec people.
Until we do, is there any way to control how zarr deletes objects? (Apologies, I am unfamiliar with zarr...)
Hi @arielshaqed,
Thanks for your prompt reply. I think that Zarr calls the following methods to erase multiple files:
In particular, I think it invokes the "rm" method (line 1481) of the "fs" attribute, which is a file system compliant with the fsspec standard (the LakeFSFileSystem in my case).
I have not seen this error with Zarr before because I have only used it with the s3fs package to connect to S3. I checked their code and they do a bulk delete in batches of size 1000 (you can see the logic in the screenshot), so I think the solution being proposed in lakefs-spec is the correct one. As a workaround, though, I think I could override the rm method of lakefs-spec to delete in batches.
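A rough, untested sketch of what I mean by that workaround, mirroring the rm logic visible in the traceback above but splitting the delete into batches of at most 1000 paths. BatchedLakeFSFileSystem and BATCH_SIZE are just illustrative names, the import path of the parse helper is an assumption on my part, and maxdepth handling is omitted for brevity:

import lakefs
from lakefs_spec import LakeFSFileSystem
from lakefs_spec.util import parse  # assumed module; adjust to wherever lakefs-spec defines parse()

BATCH_SIZE = 1000  # matches the server-side max_paths limit

class BatchedLakeFSFileSystem(LakeFSFileSystem):
    def rm(self, path, recursive=False, maxdepth=None):
        # Same path handling as LakeFSFileSystem.rm in the traceback above,
        # but delete_objects is called on bounded batches instead of one big list.
        repository, ref, prefix = parse(path)
        branch = lakefs.Branch(repository, ref, client=self.client)
        objgen = branch.objects(prefix=prefix, delimiter="" if recursive else "/")

        batch = []
        for obj in objgen:
            batch.append(obj.path)
            if len(batch) >= BATCH_SIZE:
                branch.delete_objects(batch)
                batch = []
        if batch:
            branch.delete_objects(batch)

Passing an instance of such a subclass as fs to fsspec.FSMap in my reproduction script should let the mode="w" rewrite go through, at the cost of one delete request per 1000 objects.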
Given that aai-institute/lakefs-spec#284 was successfully resolved, I'm closing this issue with a hat-tip to the AAI-Institute people, who are great! Please do re-open if you feel that this is a mistake!