[DEVX-288] Dataset Upload Status #218

Merged 7 commits on Nov 22, 2023
73 changes: 71 additions & 2 deletions clarifai/client/dataset.py
@@ -1,4 +1,6 @@
import os
+import time
+import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import Generator, List, Tuple, Type, TypeVar, Union
@@ -20,10 +22,11 @@
from clarifai.datasets.upload.image import (VisualClassificationDataset, VisualDetectionDataset,
VisualSegmentationDataset)
from clarifai.datasets.upload.text import TextClassificationDataset
+from clarifai.datasets.upload.utils import DisplayUploadStatus
from clarifai.errors import UserError
from clarifai.urls.helper import ClarifaiUrlHelper
from clarifai.utils.logging import get_logger
-from clarifai.utils.misc import Chunker
+from clarifai.utils.misc import BackoffIterator, Chunker

ClarifaiDatasetType = TypeVar('ClarifaiDatasetType', VisualClassificationDataset,
VisualDetectionDataset, VisualSegmentationDataset,
@@ -284,12 +287,16 @@ def _data_upload(self, dataset_obj: ClarifaiDatasetType) -> None:
self._retry_uploads(retry_input_ids, retry_annot_protos, dataset_obj)
progress.update()

-  def upload_dataset(self, dataloader: Type[ClarifaiDataLoader], batch_size: int = 32) -> None:
+  def upload_dataset(self,
+                     dataloader: Type[ClarifaiDataLoader],
+                     batch_size: int = 32,
+                     get_upload_status: bool = False) -> None:
"""Uploads a dataset to the app.

Args:
dataloader (Type[ClarifaiDataLoader]): ClarifaiDataLoader object
batch_size (int): batch size for concurrent upload of inputs and annotations (max: 128)
+      get_upload_status (bool): True if you want to get the upload status of the dataset
"""
self.batch_size = min(self.batch_size, batch_size)
self.task = dataloader.task
@@ -312,6 +319,9 @@ def upload_dataset(self, dataloader: Type[ClarifaiDataLoader], batch_size: int =

self._data_upload(dataset_obj)

+    if get_upload_status:
+      self.get_upload_status(dataloader)

def upload_from_csv(self,
csv_path: str,
input_type: str = 'text',
@@ -384,6 +394,65 @@ def upload_from_folder(self,
folder_path=folder_path, dataset_id=self.id, labels=labels)
self.input_object._bulk_upload(inputs=input_protos, batch_size=batch_size)

+  def get_upload_status(self,
+                        dataloader: Type[ClarifaiDataLoader],
+                        delete_version: bool = False,
+                        timeout: int = 600) -> None:
+    """Creates a new dataset version and displays the upload status of the dataset.
+
+    Args:
+      dataloader (Type[ClarifaiDataLoader]): ClarifaiDataLoader object
+      delete_version (bool): True if you want to delete the version after getting the upload status
+      timeout (int): Timeout in seconds for getting the upload status. Default is 600 seconds.
+
+    Example:
+      >>> from clarifai.client.dataset import Dataset
+      >>> dataset = Dataset(dataset_id='dataset_id', user_id='user_id', app_id='app_id')
+      >>> dataset.get_upload_status(dataloader)
+
+    Note:
+      This is a beta feature and is subject to change.
+    """
+    self.logger.info("Getting dataset upload status...")
+    dataset_version_id = uuid.uuid4().hex
+    _ = self.create_version(id=dataset_version_id, description="SDK Upload Status")
+
+    request_data = dict(
+        user_app_id=self.user_app_id,
+        dataset_id=self.id,
+        dataset_version_id=dataset_version_id,
+    )
+
+    start_time = time.time()
+    backoff_iterator = BackoffIterator()
+    while (True):
+      dataset_metrics_response = self._grpc_request(
+          self.STUB.ListDatasetVersionMetricsGroups,
+          service_pb2.ListDatasetVersionMetricsGroupsRequest(**request_data),
+      )
+
+      if dataset_metrics_response.status.code != status_code_pb2.SUCCESS:
+        self.delete_version(dataset_version_id)
+        raise Exception("Failed to get dataset metrics {}".format(dataset_metrics_response.status))
+
+      dict_response = MessageToDict(dataset_metrics_response)
+      if len(dict_response.keys()) == 1 and time.time() - start_time < timeout:
+        self.logger.info("Crunching the dataset metrics. Please wait...")
+        time.sleep(next(backoff_iterator))
+        continue
+      else:
+        if time.time() - start_time > timeout:
+          self.delete_version(dataset_version_id)
+          raise UserError(
+              "Dataset metrics are taking too long to process. Please try again later.")
+        break
+
+    dataset_info_dict = dict(user_id=self.user_id, app_id=self.app_id, dataset_id=self.id)
+    DisplayUploadStatus(dataloader, dataset_metrics_response, dataset_info_dict)
+
+    if delete_version:
+      self.delete_version(dataset_version_id)

def export(self,
save_path: str,
archive_url: str = None,
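Taken together, the changes above let one call upload the data and then report per-annotation-type counts. A minimal usage sketch (the IDs below are placeholders, and `dataloader` stands for any `ClarifaiDataLoader` subclass instance):

```python
from clarifai.client.dataset import Dataset

# Placeholder IDs; substitute your own user/app/dataset.
dataset = Dataset(user_id="user_id", app_id="app_id", dataset_id="dataset_id")

# Upload, then create a bookkeeping version and print the metrics summary.
dataset.upload_dataset(dataloader, batch_size=32, get_upload_status=True)

# Or poll on demand, discarding the bookkeeping version afterwards.
dataset.get_upload_status(dataloader, delete_version=True, timeout=600)
```

The polling loop sleeps for `next(backoff_iterator)` seconds between `ListDatasetVersionMetricsGroups` calls. `BackoffIterator` itself is not shown in this diff; a plausible sketch of such an iterator follows, where the class shape and defaults are assumptions rather than the actual `clarifai.utils.misc` implementation:

```python
# Hypothetical stand-in for clarifai.utils.misc.BackoffIterator;
# the real class may use different defaults or add jitter.
class BackoffIterator:
  """Yields sleep intervals that grow exponentially up to a cap."""

  def __init__(self, start: float = 1.0, factor: float = 2.0, cap: float = 60.0):
    self.delay = start
    self.factor = factor
    self.cap = cap

  def __iter__(self):
    return self

  def __next__(self) -> float:
    current = self.delay
    self.delay = min(self.delay * self.factor, self.cap)  # never exceed the cap
    return current
```

With the assumed defaults this yields 1, 2, 4, 8, ... seconds, so repeated metrics requests thin out while the platform is still computing them.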
3 changes: 1 addition & 2 deletions clarifai/client/input.py
@@ -653,8 +653,7 @@ def upload_annotations(self, batch_annot: List[resources_pb2.Annotation], show_l
response = self._grpc_request(self.STUB.PostAnnotations, request)
if response.status.code != status_code_pb2.SUCCESS:
try:
-        self.logger.warning(
-            f"Post annotations failed, status: {response.annotations[0].status.details}")
+        self.logger.warning(f"Post annotations failed, status: {response.annotations[0].status}")
except Exception:
self.logger.warning(f"Post annotations failed, status: {response.status.details}")
finally:
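The warning now logs the whole per-annotation `status` proto rather than only its `details` field, so the status code and description also survive in the logs; the request-level status remains the fallback when the response carries no annotations. A generic sketch of that graceful-degradation pattern (the logger name and function are ours, not SDK API):

```python
import logging

logger = logging.getLogger("clarifai")

def warn_post_annotations_failed(response):
  # Prefer the richer per-item status (code + description + details);
  # fall back to the request-level status if annotations are absent.
  try:
    logger.warning(f"Post annotations failed, status: {response.annotations[0].status}")
  except Exception:
    logger.warning(f"Post annotations failed, status: {response.status.details}")
```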
18 changes: 18 additions & 0 deletions clarifai/constants/dataset.py
@@ -2,3 +2,21 @@
"visual_classification", "text_classification", "visual_detection", "visual_segmentation",
"visual_captioning"
]

+TASK_TO_ANNOTATION_TYPE = {
+    "visual_classification": {
+        "concepts": "labels"
+    },
+    "text_classification": {
+        "concepts": "labels"
+    },
+    "visual_captioning": {
+        "concepts": "labels"
+    },
+    "visual_detection": {
+        "bboxes": "bboxes"
+    },
+    "visual_segmentation": {
+        "polygons": "polygons"
+    },
+}
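The new map ties each upload task to the annotation field it produces and the label under which the status is reported. How `DisplayUploadStatus` consumes it is not shown in this diff, so the lookup below is only illustrative:

```python
from clarifai.constants.dataset import TASK_TO_ANNOTATION_TYPE

task = "visual_detection"
for proto_field, display_name in TASK_TO_ANNOTATION_TYPE[task].items():
  # e.g. for detection: read "bboxes" metrics and display them as "bboxes"
  print(f"task={task}: read '{proto_field}' metrics, display as '{display_name}'")
```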
13 changes: 8 additions & 5 deletions clarifai/datasets/upload/loaders/coco_segmentation.py
@@ -38,7 +38,7 @@ def load_data(self) -> None:
self.map_ids = {i: img_id for i, img_id in enumerate(list(self.coco.imgs.keys()))}

def __len__(self):
-    return len(self.img_ids)
+    return len(self.coco.imgs)

def __getitem__(self, index):
"""Get image and annotations for a given index."""
@@ -59,9 +59,11 @@ def __getitem__(self, index):

# get polygons
if isinstance(ann['segmentation'], list):
-        poly = np.array(ann['segmentation']).reshape((int(len(ann['segmentation'][0]) / 2), 2))
-        annots.append(poly.tolist())  #[[x=col, y=row],...]
+        poly = np.array(ann['segmentation']).reshape((int(len(ann['segmentation'][0]) / 2),
+                                                      2)).astype(float)
+        poly[:, 0], poly[:, 1] = poly[:, 0] / value['width'], poly[:, 1] / value['height']
+        poly = np.clip(poly, 0, 1)
+        annots.append(poly.tolist())  #[[x=col, y=row],...]
concept_ids.append(concept_id)
else: # seg: {"counts":[...]}
if isinstance(ann['segmentation']['counts'], list):
@@ -82,10 +84,11 @@ def __getitem__(self, index):
del mask
gc.collect()

-      polygons = np.array(polygons_flattened).reshape((int(len(polygons_flattened) / 2), 2))
+      polygons = np.array(polygons_flattened).reshape((int(len(polygons_flattened) / 2),
+                                                       2)).astype(float)
      polygons[:, 0] = polygons[:, 0] / value['width']
      polygons[:, 1] = polygons[:, 1] / value['height']
+      polygons = np.clip(polygons, 0, 1)
annots.append(polygons.tolist()) #[[x=col, y=row],...,[x=col, y=row]]
concept_ids.append(concept_id)

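Both branches now reduce to the same post-processing: reshape the flat COCO coordinate list into (N, 2) points, scale x by image width and y by image height, and clip into [0, 1]. A standalone sketch of that normalization (the function name and sample numbers are ours):

```python
import numpy as np

def normalize_polygon(flat_coords, width, height):
  """Turn a flat COCO [x1, y1, x2, y2, ...] list into (N, 2) points in [0, 1]."""
  poly = np.array(flat_coords, dtype=float).reshape((len(flat_coords) // 2, 2))
  poly[:, 0] /= width   # x = column / image width
  poly[:, 1] /= height  # y = row / image height
  return np.clip(poly, 0, 1)  # guard against annotations that spill past the border

# A triangle on a 200x100 image; the x=220 vertex gets clipped to 1.0.
print(normalize_polygon([10, 20, 150, 80, 220, 40], width=200, height=100))
```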
4 changes: 1 addition & 3 deletions clarifai/datasets/upload/text.py
@@ -33,11 +33,9 @@ def process_data_item(id):
text = data_item.text
labels = data_item.labels if isinstance(data_item.labels,
list) else [data_item.labels] # clarifai concept
input_id = f"{self.dataset_id}-{self.split}-{id}" if data_item.id is None else f"{self.dataset_id}-{self.split}-{str(data_item.id)}"
input_id = f"{self.dataset_id}-{id}" if data_item.id is None else f"{self.dataset_id}-{str(data_item.id)}"
if data_item.metadata is not None:
metadata.update(data_item.metadata)
-      else:
-        metadata.update({"split": self.split})

self.all_input_ids[id] = input_id
input_protos.append(
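The net effect on text inputs: the split name is dropped from generated input IDs and is no longer injected as fallback metadata. For example (dataset and item IDs are made up):

```python
dataset_id, split, item_id = "imdb_reviews", "train", 42

old_input_id = f"{dataset_id}-{split}-{item_id}"  # imdb_reviews-train-42
new_input_id = f"{dataset_id}-{item_id}"          # imdb_reviews-42
print(old_input_id, "->", new_input_id)
```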