Skip to content

Commit

Permalink
feat: supports exporting "csv" format files
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-Shen committed Nov 5, 2024
1 parent b3d07b4 commit d35a93a
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 0 deletions.
2 changes: 2 additions & 0 deletions labelu/internal/application/command/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class ExportType(str, Enum):
MASK = "MASK"
COCO = "COCO"
YOLO = "YOLO"
CSV = "CSV"
XML = "XML"
LABEL_ME = "LABEL_ME"


Expand Down
148 changes: 148 additions & 0 deletions labelu/internal/common/converter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import csv
import json
import os
from zipfile import ZipFile
Expand All @@ -17,6 +18,10 @@ class Format(str, Enum):
COCO = "COCO"
MASK = "MASK"
YOLO = "YOLO"
CSV = "CSV"
XML = "XML"
VOC = "VOC"
TFRECORD = "TFRECORD"
LABEL_ME = "LABEL_ME"


Expand Down Expand Up @@ -62,6 +67,13 @@ def convert(
out_data_file_name_prefix=out_data_file_name_prefix,
out_data_dir=out_data_dir,
)
elif format == Format.CSV.value:
return self.convert_to_csv(
config=config,
input_data=input_data,
out_data_file_name_prefix=out_data_file_name_prefix,
out_data_dir=out_data_dir,
)

def convert_to_json(
self,
Expand Down Expand Up @@ -561,6 +573,142 @@ def convert_to_yolo(self, config: dict, input_data: List[dict], out_data_file_na
zipf.write(str(f), arcname=f.name)
logger.info("Export file path: {}", file_full_path_zip)
return file_full_path_zip

def convert_to_csv(self, config: dict, input_data: List[dict], out_data_file_name_prefix: str, out_data_dir: str):
out_data_dir.mkdir(parents=True, exist_ok=True)
export_files = []

label_text_dict = {}
common_attributes = { attr.get("value"): attr.get("key") for attr in config.get("attributes", [])}

def get_label(_tool: str, _input_label: str):
_label = label_text_dict.get(_tool, {}).get(_input_label, "")

if not _label:
_label = common_attributes.get(_input_label, "")

return _label

def get_attributes(attributes: dict):
result = []

for value in attributes.values():
result.append(", ".join(value) if isinstance(value, list) else value)

return ', '.join(result)

def get_points(direction: dict):
return [
(
direction.get("tl").get("x"),
direction.get("tl").get("y"),
),
(
direction.get("tr").get("x"),
direction.get("tr").get("y"),
),
(
direction.get("br").get("x"),
direction.get("br").get("y"),
),
(
direction.get("bl").get("x"),
direction.get("bl").get("y"),
),
]


for tool in config.get("tools", []):
label_text_dict[tool.get("tool")] = { attr.get("value"): attr.get("key") for attr in tool.get("config", {}).get("attributes", [])}

for sample in input_data:
data = json.loads(sample.get("data"))
file = sample.get("file", {})
# tool_name, label, x, y, width, height etc.
rows = []

# skip invalid data
annotated_result = json.loads(data.get("result"))
if sample.get("state") == "SKIPPED" or not annotated_result:
continue

for tool in annotated_result.copy().keys():
if tool == 'rectTool':
tool_results = annotated_result.pop(tool)
rows.append(["tool_name", "label", "label_text", "x", "y", "width", "height", "attributes", "order"])
for tool_result in tool_results.get("result", []):
x = tool_result.get("x", 0)
y = tool_result.get("y", 0)
width = tool_result.get("width", 0)
height = tool_result.get("height", 0)
label = tool_result.get("label", "")
order = tool_result.get("order", 0)
label_text = get_label(tool, label)
rows.append([tool, label, label_text, x, y, width, height, get_attributes(tool_result.get('attributes', {})), order])

if tool == 'lineTool':
tool_results = annotated_result.pop(tool)
rows.append(["tool_name", "label", "label_text", "points", "control_points", "attributes", "order"])
for tool_result in tool_results.get("result", []):
points = tool_result.get("points", [])
control_points = tool_result.get("controlPoints", [])
label = tool_result.get("label", "")
order = tool_result.get("order", 0)
label_text = get_label(tool, label)
rows.append([tool, label, label_text, points, control_points, get_attributes(tool_result.get('attributes', {})), order])

if tool == 'pointTool':
tool_results = annotated_result.pop(tool)
rows.append(["tool_name", "label", "label_text", "x", "y", "order"])
for tool_result in tool_results.get("result", []):
x = tool_result.get("x", 0)
y = tool_result.get("y", 0)
label = tool_result.get("label", "")
order = tool_result.get("order", 0)
label_text = get_label(tool, label)
rows.append([tool, label, label_text, x, y, order])

if tool == 'polygonTool':
tool_results = annotated_result.pop(tool)
rows.append(["tool_name", "label", "label_text", "points", "attributes", "order"])
for tool_result in tool_results.get("result", []):
points = tool_result.get("points", [])
control_points = tool_result.get("controlPoints", [])
label = tool_result.get("label", "")
order = tool_result.get("order", 0)
label_text = get_label(tool, label)
rows.append([tool, label, label_text, points, control_points, get_attributes(tool_result.get('attributes', {})), order])

if tool == 'cuboidTool':
tool_results = annotated_result.pop(tool)
rows.append(["tool_name", "label", "label_text", "direction", "front", "back", "attributes", "order"])
for tool_result in tool_results.get("result", []):
direction = tool_result.get("direction")
# [[x,y], ...]
front = get_points(tool_result.get("front"))
back = get_points(tool_result.get("back"))
width = tool_result.get("width", 0)
height = tool_result.get("height", 0)
label = tool_result.get("label", "")
order = tool_result.get("order", 0)
label_text = get_label(tool, label)
rows.append([tool, label, label_text, direction, front, back, get_attributes(tool_result.get('attributes', {})), order])

file_basename = os.path.splitext(file.get("filename", "")[9:])[0]
file_name = out_data_dir.joinpath(f"{file_basename}.csv")
with file_name.open("w") as outfile:
writer = csv.writer(outfile)
writer.writerows(rows)
export_files.append(file_name)

file_relative_path_zip = f"task-{out_data_file_name_prefix}-csv.zip"
file_full_path_zip = out_data_dir.joinpath(file_relative_path_zip)
with ZipFile(file_full_path_zip, "w") as zipf:
for f in export_files:
zipf.write(str(f), arcname=f.name)
logger.info("Export file path: {}", file_full_path_zip)
return file_full_path_zip

def _polygonArea(X, Y):

# Initialize area
Expand Down

0 comments on commit d35a93a

Please sign in to comment.