Skip to content

Commit

Permalink
feat: supports exporting "labelme" format files
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-Shen committed Nov 5, 2024
1 parent 652c003 commit 8d4ea8e
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 1 deletion.
1 change: 1 addition & 0 deletions labelu/internal/application/command/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ExportType(str, Enum):
JSON = "JSON"
MASK = "MASK"
COCO = "COCO"
LABEL_ME = "LABEL_ME"


class CreateSampleCommand(BaseModel):
Expand Down
157 changes: 156 additions & 1 deletion labelu/internal/common/converter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import base64
import json
import os
from zipfile import ZipFile
from PIL import Image, ImageDraw
from enum import Enum
Expand All @@ -7,12 +9,14 @@
from loguru import logger

from labelu.internal.common.color import colors
from labelu.internal.common.config import settings


class Format(str, Enum):
JSON = "JSON"
COCO = "COCO"
MASK = "MASK"
LABEL_ME = "LABEL_ME"


class Converter:
Expand Down Expand Up @@ -43,6 +47,13 @@ def convert(
out_data_dir=out_data_dir,
out_data_file_name_prefix=out_data_file_name_prefix,
)
elif format == Format.LABEL_ME.value:
return self.convert_to_labelme(
config=config,
input_data=input_data,
out_data_file_name_prefix=out_data_file_name_prefix,
out_data_dir=out_data_dir,
)

def convert_to_json(
self,
Expand Down Expand Up @@ -336,7 +347,151 @@ def convert_to_mask(
logger.info("Export file path: {}", file_full_path_zip)
return file_full_path_zip


def convert_to_labelme(self, config: dict, input_data: List[dict], out_data_file_name_prefix: str, out_data_dir: str):
out_data_dir.mkdir(parents=True, exist_ok=True)
export_files = []
result = []
# does not support cuboid / spline
shape_dict = {
"polygonTool": "polygon",
"rectTool": "rectangle",
"lineTool": "linestrip",
"pointTool": "point",
}

label_text_dict = {}
common_attributes = { attr.get("value"): attr.get("key") for attr in config.get("attributes", [])}

for tool in config.get("tools", []):
label_text_dict[tool.get("tool")] = { attr.get("value"): attr.get("key") for attr in tool.get("config", {}).get("attributes", [])}

def get_label(_tool: str, _input_label: str):
_label = label_text_dict.get(_tool, {}).get(_input_label, "")

if not _label:
_label = common_attributes.get(_input_label, "")

return _label

def convert_points(points: List[dict]):
return [[point.get("x"), point.get("y")] for point in points]

def image_to_base64(file_path: str):
file_full_path = settings.MEDIA_ROOT.joinpath(file_path.lstrip("/"))
with open(file_full_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')

for sample in input_data:
labelme_item = {
"version": "5.5.0",
"flags": {},
"shapes": [],
"imagePath": "",
"imageData": "",
"imageHeight": 0,
"imageWidth": 0,
}
data = json.loads(sample.get("data"))
file = sample.get("file", {})

# skip invalid data
annotated_result = json.loads(data.get("result"))
if sample.get("state") == "SKIPPED":
continue

labelme_item["imagePath"] = file.get("filename", "")[9:]
labelme_item["imageData"] = image_to_base64(file.get("path"))

if annotated_result:
labelme_item["imageWidth"] = annotated_result.get("width", 0)
labelme_item["imageHeight"] = annotated_result.get("height", 0)

for tool in annotated_result.copy().keys():
if tool.endswith("Tool") and tool in shape_dict:
# polygon
if tool == "polygonTool":
tool_results = annotated_result.pop(tool)
for tool_result in tool_results.get("result", []):
attributes = tool_result.get("attributes", {})
shape = {
"label": get_label(tool, tool_result.get("label", "")),
"points": convert_points(tool_result.get("points", [])),
"group_id": "",
# Get description from attributes
"description": attributes.get("description", ""),
"shape_type": shape_dict.get(tool, "polygon"),
"flags": {},
"mask": "",
}
labelme_item["shapes"].append(shape)

# rect
if tool == "rectTool":
tool_results = annotated_result.pop(tool)
for tool_result in tool_results.get("result", []):
attributes = tool_result.get("attributes", {})
x = tool_result.get("x", 0)
y = tool_result.get("y", 0)
width = tool_result.get("width", 0)
height = tool_result.get("height", 0)
shape = {
"label": get_label(tool, tool_result.get("label", "")),
"points": [[x, y], [x + width, y + height]],
"group_id": "",
"description": attributes.get("description", ""),
"shape_type": shape_dict.get(tool, "rectangle"),
"flags": {},
"mask": "",
}
labelme_item["shapes"].append(shape)

if tool == "lineTool":
tool_results = annotated_result.pop(tool)
for tool_result in tool_results.get("result", []):
attributes = tool_result.get("attributes", {})
shape = {
"label": get_label(tool, tool_result.get("label", "")),
"points": convert_points(tool_result.get("points", [])),
"group_id": "",
"description": attributes.get("description", ""),
"shape_type": shape_dict.get(tool, "linestrip"),
"flags": {},
"mask": "",
}
labelme_item["shapes"].append(shape)

if tool == "pointTool":
tool_results = annotated_result.pop(tool)
for tool_result in tool_results.get("result", []):
attributes = tool_result.get("attributes", {})
shape = {
"label": get_label(tool, tool_result.get("label", "")),
"points": [[tool_result.get("x", 0), tool_result.get("y", 0)]],
"group_id": "",
"description": attributes.get("description", ""),
"shape_type": shape_dict.get(tool, "point"),
"flags": {},
"mask": "",
}
labelme_item["shapes"].append(shape)
result.append(labelme_item)
file_basename = os.path.splitext(file.get("filename", "")[9:])[0]
file_name = out_data_dir.joinpath(f"{file_basename}.json")
with file_name.open("w") as outfile:
# 格式化json,两个空格缩进
json.dump(labelme_item, outfile, indent=2, ensure_ascii=False)

export_files.append(file_name)


file_relative_path_zip = f"task-{out_data_file_name_prefix}-label-me.zip"
file_full_path_zip = out_data_dir.joinpath(file_relative_path_zip)
with ZipFile(file_full_path_zip, "w") as zipf:
for f in export_files:
zipf.write(str(f), arcname=f.name)
logger.info("Export file path: {}", file_full_path_zip)
return file_full_path_zip

def _polygonArea(X, Y):

# Initialize area
Expand Down

0 comments on commit 8d4ea8e

Please sign in to comment.