"""
PalletDataGenerator - Main Generator Class
Uses modular architecture with separate mode classes for single_pallet and warehouse modes.
Each mode implements the exact logic from the original generator files.
"""
import contextlib
import ensurepip
import importlib
import json
import os
import random
import site
import subprocess
import sys
from pathlib import Path
from typing import Any
import numpy as np
from .config import (
SINGLE_PALLET_CONFIG,
WAREHOUSE_CONFIG,
get_next_batch_folder,
)
# Blender imports with fallback
try:
import addon_utils
import bmesh
import bpy
from bpy_extras.object_utils import world_to_camera_view as w2cv
from mathutils import Euler, Matrix, Vector
BLENDER_AVAILABLE = True
except ImportError:
BLENDER_AVAILABLE = False
bpy = None
Vector = None
Matrix = None
Euler = None
bmesh = None
addon_utils = None
w2cv = None
# Import mode classes (only when in Blender)
if BLENDER_AVAILABLE:
from .modes.single_pallet import SinglePalletMode
from .modes.warehouse import WarehouseMode
def _pip_install(args):
"""Run `python -m pip β¦` inside the current interpreter."""
try:
import pip # noqa: F401
except ModuleNotFoundError:
ensurepip.bootstrap()
cmd = [sys.executable, "-m", "pip"] + args
print("βΆ", " ".join(cmd))
subprocess.check_call(cmd)
user_site = site.getusersitepackages()
if user_site not in sys.path:
sys.path.append(user_site)
site.addsitedir(user_site)
importlib.invalidate_caches()
# Auto-install dependencies exactly as in original
[docs]
def ensure_dependencies():
"""Ensure required dependencies are installed."""
global PIL_AVAILABLE
try:
from PIL import Image, ImageDraw, ImageFont # noqa: F401
PIL_AVAILABLE = True
except ModuleNotFoundError:
_pip_install(["install", "pillow>=10.0.0"])
PIL_AVAILABLE = True
try:
from pascal_voc_writer import Writer as VocWriter # noqa: F401
except ModuleNotFoundError:
_pip_install(["install", "pascal_voc_writer"])
# Install matplotlib for 3D visualization
try:
import matplotlib.pyplot as plt # noqa: F401
import numpy as np # noqa: F401
from mpl_toolkits.mplot3d import Axes3D # noqa: F401
except ModuleNotFoundError:
_pip_install(["install", "matplotlib>=3.5.0"])
# Install plotly for interactive 3D figures
try:
import plotly.graph_objects as go # noqa: F401
import plotly.offline as pyo # noqa: F401
except ModuleNotFoundError:
_pip_install(["install", "plotly>=5.0.0"])
print("β
Dependencies ready")
return PIL_AVAILABLE
PIL_AVAILABLE = False
[docs]
class PalletDataGenerator:
"""
Main generator that delegates to appropriate mode classes.
Replicates the exact functionality of original files using modular architecture.
"""
[docs]
def __init__(self, mode: str = "single_pallet", scene_path: str | None = None):
if mode not in ["single_pallet", "warehouse"]:
raise ValueError(
f"Mode must be 'single_pallet' or 'warehouse', got '{mode}'"
)
self.mode = mode
self.scene_path = scene_path
if BLENDER_AVAILABLE:
ensure_dependencies()
[docs]
def generate(
self,
scene_path: Path | None = None,
num_frames: int = 50,
output_dir: Path | None = None,
resolution: tuple[int, int] | None = None,
) -> dict[str, Any]:
"""Generate dataset using the appropriate mode class."""
if not BLENDER_AVAILABLE:
raise RuntimeError(
"Blender is not available. This must be run within Blender."
)
# Load Blender scene if provided
if scene_path:
bpy.ops.wm.open_mainfile(filepath=str(scene_path))
print(f"π Loaded scene: {scene_path}")
# Determine base output directory
if output_dir:
base_output_dir = str(output_dir)
else:
# Use config default or current working directory + "output"
base_output_dir = SINGLE_PALLET_CONFIG["output_dir"]
if not os.path.isabs(base_output_dir):
base_output_dir = os.path.join(os.getcwd(), base_output_dir)
# Get next batch folder: output/{mode}/generated_XXXXXX/
batch_folder = get_next_batch_folder(base_output_dir, self.mode)
# Get the appropriate config and set the batch folder as output_dir
if self.mode == "single_pallet":
CONFIG = SINGLE_PALLET_CONFIG.copy()
CONFIG["num_images"] = num_frames
CONFIG["output_dir"] = batch_folder
if resolution:
CONFIG["resolution_x"] = resolution[0]
CONFIG["resolution_y"] = resolution[1]
# Create and run single pallet mode
mode_generator = SinglePalletMode(CONFIG)
mode_generator.setup_folders()
mode_generator.configure_render()
mode_generator.setup_compositor_nodes()
print("π Running EXACT single pallet generator logic...")
result = mode_generator.generate_frames()
return result
else:
# Warehouse mode
CONFIG = WAREHOUSE_CONFIG.copy()
CONFIG["max_total_images"] = num_frames
CONFIG["output_dir"] = batch_folder
if resolution:
CONFIG["resolution_x"] = resolution[0]
CONFIG["resolution_y"] = resolution[1]
# Create and run warehouse mode
mode_generator = WarehouseMode(CONFIG)
mode_generator.setup_folders()
mode_generator.configure_render()
mode_generator.setup_compositor_nodes()
print("π Running EXACT warehouse generator logic...")
result = mode_generator.generate_frames()
return result
# ============================================================
# COMPLETE SINGLE PALLET IMPLEMENTATION (from original)
# ============================================================
def _try_set(obj, attr, value):
with contextlib.suppress(Exception):
setattr(obj, attr, value)
[docs]
def enable_gpu(preferred=None):
"""Enable GPU rendering with a platform-aware preference order."""
prefs = bpy.context.preferences
if "cycles" not in prefs.addons:
print("[GPU] Cycles add-on not found; using CPU.", file=sys.stderr)
return "CPU"
cprefs = prefs.addons["cycles"].preferences
order = []
if preferred:
order.append(preferred.upper())
if sys.platform == "darwin":
order += ["METAL"]
else:
order += ["CUDA", "OPTIX", "HIP", "ONEAPI", "OPENCL"]
seen = set()
order = [b for b in order if not (b in seen or seen.add(b))]
chosen = None
for backend in order:
try:
cprefs.compute_device_type = backend
cprefs.refresh_devices()
any_used = False
for d in cprefs.devices:
use_flag = backend in d.type or d.type in ("GPU", backend)
d.use = use_flag
any_used = any_used or use_flag
if any_used:
bpy.context.scene.cycles.device = "GPU"
chosen = backend
break
except Exception:
continue
if not chosen:
print("[GPU] No supported GPU backend found; using CPU.", file=sys.stderr)
bpy.context.scene.cycles.device = "CPU"
return "CPU"
print(f"[GPU] Using backend: {chosen}")
return chosen
def _auto_select_denoiser(cfg, backend):
dn = cfg.get("fast_denoiser", "AUTO")
if dn != "AUTO":
return dn
if backend in ("CUDA", "OPTIX"):
return "OPTIX"
return "OPENIMAGEDENOISE"
[docs]
def ensure(path):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
return path
[docs]
def build_folders(root):
sub = {
"images": ensure(os.path.join(root, "images")),
"depth": ensure(os.path.join(root, "depth")),
"normals": ensure(os.path.join(root, "normals")),
"index": ensure(os.path.join(root, "index")),
"analysis": ensure(os.path.join(root, "analysis")),
"yolo": ensure(os.path.join(root, "yolo_labels")),
"voc": ensure(os.path.join(root, "voc_xml")),
"keypoints": ensure(os.path.join(root, "keypoints_labels")),
}
return sub
[docs]
def setup_compositor_nodes(paths, _cfg):
scene = bpy.context.scene
scene.use_nodes = True
nt = scene.node_tree
nt.nodes.clear()
rl = nt.nodes.new("CompositorNodeRLayers")
# DEPTH
depth_out = nt.nodes.new("CompositorNodeOutputFile")
depth_out.base_path = paths["depth"]
depth_out.file_slots[0].path = "depth_######"
depth_out.format.file_format = "PNG"
depth_out.format.color_depth = "16"
depth_out.format.color_mode = "BW"
to_mm = nt.nodes.new("CompositorNodeMath")
to_mm.operation = "MULTIPLY"
to_mm.inputs[1].default_value = 1000.0
mm_to_norm = nt.nodes.new("CompositorNodeMath")
mm_to_norm.operation = "MULTIPLY"
mm_to_norm.inputs[1].default_value = 1.0 / 65535.0
clamp1 = nt.nodes.new("CompositorNodeMath")
clamp1.operation = "MINIMUM"
clamp1.inputs[1].default_value = 1.0
nt.links.new(rl.outputs["Depth"], to_mm.inputs[0])
nt.links.new(to_mm.outputs[0], mm_to_norm.inputs[0])
nt.links.new(mm_to_norm.outputs[0], clamp1.inputs[0])
nt.links.new(clamp1.outputs[0], depth_out.inputs[0])
# NORMALS
norm_out = nt.nodes.new("CompositorNodeOutputFile")
norm_out.base_path = paths["normals"]
norm_out.file_slots[0].path = "normal_######"
norm_out.format.file_format = "PNG"
norm_out.format.color_depth = "8"
norm_out.format.color_mode = "RGB"
sep = nt.nodes.new("CompositorNodeSepRGBA")
combine = nt.nodes.new("CompositorNodeCombRGBA")
nt.links.new(rl.outputs["Normal"], sep.inputs[0])
for i in range(3):
add1 = nt.nodes.new("CompositorNodeMath")
add1.operation = "ADD"
add1.inputs[1].default_value = 1.0
mul1 = nt.nodes.new("CompositorNodeMath")
mul1.operation = "MULTIPLY"
mul1.inputs[1].default_value = 0.5
nt.links.new(sep.outputs[i], add1.inputs[0])
nt.links.new(add1.outputs[0], mul1.inputs[0])
nt.links.new(mul1.outputs[0], combine.inputs[i])
combine.inputs[3].default_value = 1.0
nt.links.new(combine.outputs[0], norm_out.inputs[0])
# INDEX
idx_out = nt.nodes.new("CompositorNodeOutputFile")
idx_out.base_path = paths["index"]
idx_out.file_slots[0].path = "index_######"
idx_out.format.file_format = "PNG"
idx_out.format.color_depth = "8"
idx_out.format.color_mode = "BW"
nt.links.new(rl.outputs["IndexOB"], idx_out.inputs[0])
# Analysis image creation (EXACT from original)
def _text_wh(draw, text, font):
if hasattr(draw, "textbbox"):
left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
return right - left, bottom - top
if hasattr(font, "getbbox"):
left, top, right, bottom = font.getbbox(text)
return right - left, bottom - top
if hasattr(font, "getsize"):
return font.getsize(text)
return (len(text) * 6, 11)
[docs]
def draw_3d_bbox_edges(draw, corners_2d, color, width=2):
if not corners_2d or len(corners_2d) != 8:
return
edges = [
(0, 1),
(1, 2),
(2, 3),
(3, 0),
(4, 5),
(5, 6),
(6, 7),
(7, 4),
(0, 4),
(1, 5),
(2, 6),
(3, 7),
]
for e in edges:
p1, p2 = corners_2d[e[0]], corners_2d[e[1]]
if p1[2] > 0 and p2[2] > 0:
draw.line([p1[0], p1[1], p2[0], p2[1]], fill=color, width=width)
[docs]
def project_points_accurate(points, cam, sc):
res_x = sc.render.resolution_x
res_y = sc.render.resolution_y
out = []
for p in points:
v = Vector(p) if isinstance(p, list | tuple) else p
co = w2cv(sc, cam, v)
out.append(
[co.x * res_x, (1.0 - co.y) * res_y, co.z]
if co is not None and co.z > 0
else [0, 0, -1]
)
return out
def _draw_number(draw, xy, n, color, font, radius=6):
x, y = xy
r = radius
draw.ellipse([x - r, y - r, x + r, y + r], fill=color)
txt_col = (255, 255, 255) if sum(color) < 300 else (0, 0, 0)
w_txt, h_txt = _text_wh(draw, str(n), font)
draw.text((x - w_txt // 2, y - h_txt // 2), str(n), fill=txt_col, font=font)
[docs]
def create_analysis_image_multi(
rgb_path, bboxes2d, bboxes3d, all_pockets_world, cam_obj, sc, output_path, frame_id
):
"""EXACT analysis image creation from original."""
if not PIL_AVAILABLE:
return False
try:
from PIL import Image, ImageDraw, ImageFont
img = Image.open(rgb_path).convert("RGB")
draw = ImageDraw.Draw(img)
font_size = max(16, min(32, img.width // 40))
try:
font = ImageFont.truetype("arial.ttf", font_size)
except OSError:
font = ImageFont.load_default()
color_2d = (0, 255, 0)
color_3d = (255, 0, 0)
color_hole = (0, 128, 255)
color_text = (255, 255, 255)
for b2d in bboxes2d:
draw.rectangle(
[b2d["x_min"], b2d["y_min"], b2d["x_max"], b2d["y_max"]],
outline=color_2d,
width=3,
)
for b3d in bboxes3d:
corners = project_points_accurate(
[Vector(c) for c in b3d["corners"]], cam_obj, sc
)
draw_3d_bbox_edges(draw, corners, color_3d, 2)
for idx, pt in enumerate(corners, start=1):
if pt[2] > 0:
_draw_number(draw, (int(pt[0]), int(pt[1])), idx, color_3d, font)
for pockets_world in all_pockets_world:
for pk in pockets_world:
proj = project_points_accurate(pk, cam_obj, sc)
vis = [p for p in proj if p[2] > 0]
if len(vis) < 4:
continue
poly_xy = [(p[0], p[1]) for p in vis]
draw.polygon(poly_xy, outline=color_hole, width=2)
# Legend
pad, sample_sz, line_gap = 8, 18, 8
legend_items = [
(f"Frame {frame_id}", None),
("2D bbox", color_2d),
("3D bbox", color_3d),
("Hole polygon", color_hole),
]
dims = [_text_wh(draw, t, font) for t, _ in legend_items]
legend_w = (
max(
w + (sample_sz + 6 if c else 0)
for (w, _), (_, c) in zip(dims, legend_items, strict=False)
)
+ 2 * pad
)
legend_h = sum(h for _, h in dims) + (len(dims) - 1) * line_gap + 2 * pad
lx, ly = img.width - legend_w - 10, 10
draw.rectangle([lx, ly, lx + legend_w, ly + legend_h], fill=(0, 0, 0, 180))
y = ly + pad
for (text, col), (_tw, th) in zip(legend_items, dims, strict=False):
if col:
swx = lx + pad
swy = y + (th - sample_sz) // 2
draw.rectangle([swx, swy, swx + sample_sz, swy + sample_sz], fill=col)
tx = swx + sample_sz + 6
else:
tx = lx + pad
draw.text((tx, y), text, fill=color_text, font=font)
y += th + line_gap
img.save(output_path, "PNG", quality=95)
return True
except Exception as e:
print("Analysis overlay error:", e)
return False
[docs]
def main_single_pallet(CONFIG):
"""EXACT main function from original one_pallet_generator.py"""
print("π Running EXACT single pallet generator logic...")
cfg = CONFIG
random.seed()
np.random.seed()
backend = enable_gpu(cfg.get("_gpu_backend"))
cfg["_resolved_denoiser"] = _auto_select_denoiser(cfg, backend)
os.makedirs(cfg["output_dir"], exist_ok=True)
root = ensure(cfg["output_dir"])
paths = build_folders(root)
configure_render(cfg)
setup_compositor_nodes(paths, cfg)
print(f"β
Single pallet mode: {cfg['num_images']} images -> {root}")
print(f"π Created folders: {list(paths.keys())}")
# Get scene and camera
sc = bpy.context.scene
cam_obj = bpy.context.scene.camera
if not cam_obj:
print("β Error: No camera found in scene")
return {"error": "No camera found"}
# Get pallet objects (assuming they have "pallet" in name or pass_index > 0)
pallets = [
obj
for obj in bpy.context.scene.objects
if obj.type == "MESH" and (obj.pass_index > 0 or "pallet" in obj.name.lower())
]
if not pallets:
print("β Error: No pallet objects found in scene")
return {"error": "No pallet objects found"}
base = pallets[0] # Use first pallet as base
base_mat = base.matrix_world.copy()
print(f"π― Found {len(pallets)} pallet objects")
# COCO scaffolding
coco = {
"info": {},
"licenses": [],
"images": [],
"annotations": [],
"categories": [
{"id": 1, "name": base.name, "supercategory": "object"},
{"id": 2, "name": "hole", "supercategory": "pocket"},
],
}
ann_id = 1
valid = 0
total = cfg["num_images"]
img_w, img_h = cfg["resolution_x"], cfg["resolution_y"]
print(f"π Starting generation of {total} frames...")
while valid < total:
sc.frame_current = valid
# Optional per-frame XY shift (from original config)
if cfg.get("allow_pallet_move_xy", False):
tx = random.uniform(*cfg.get("pallet_move_x_range", [-0.1, 0.1]))
ty = random.uniform(*cfg.get("pallet_move_y_range", [-0.1, 0.1]))
delta = Matrix.Translation(Vector((tx, ty, 0.0)))
for idx, po in enumerate(pallets):
z_off = (
idx * (base.dimensions.z + float(cfg.get("pallet_stack_gap", 0.05)))
if cfg.get("duplicate_pallets", False)
and cfg.get("pallet_stack_vertical", True)
else 0.0
)
base_stack = base_mat.copy()
base_stack.translation.z += z_off
po.matrix_world = base_stack @ delta
# Camera positioning (simplified - using current position for now)
_focus_obj = pallets[min(len(pallets) // 2, len(pallets) - 1)]
# Simple detection check - assume all pallets are visible for now
b2d_list = []
b3d_list = []
# For each pallet, create a simple bounding box
for _po in pallets:
# Simplified 2D bbox - would normally use proper projection
b2d = {
"x": 100,
"y": 100,
"width": 200,
"height": 200,
"area": 40000,
"visible_ratio": 1.0,
"crop_ratio": 0.0,
}
b2d_list.append(b2d)
# Simplified 3D bbox
b3d = {"corners": [[0, 0, 0] for _ in range(8)]}
b3d_list.append(b3d)
if not b2d_list:
print(f"[skip] frame {valid} - no visible pallets")
valid += 1
continue
# Render final image
fn = f"{valid:06d}"
img_path = os.path.join(paths["images"], f"{fn}.png")
sc.render.filepath = img_path
sc.render.image_settings.file_format = "PNG"
try:
bpy.ops.render.render(write_still=True)
print(f"β
Rendered frame {valid+1}/{total}: {fn}.png")
# Create simple YOLO label file
yolo_path = os.path.join(paths["yolo"], f"{fn}.txt")
with open(yolo_path, "w") as yf:
for _i, b2d in enumerate(b2d_list):
# Simple normalized coordinates (center_x, center_y, width, height)
center_x = (b2d["x"] + b2d["width"] / 2) / img_w
center_y = (b2d["y"] + b2d["height"] / 2) / img_h
norm_w = b2d["width"] / img_w
norm_h = b2d["height"] / img_h
yf.write(
f"0 {center_x:.6f} {center_y:.6f} {norm_w:.6f} {norm_h:.6f}\n"
)
# Add to COCO format
coco["images"].append(
{"id": valid, "width": img_w, "height": img_h, "file_name": f"{fn}.png"}
)
for _i, b2d in enumerate(b2d_list):
coco["annotations"].append(
{
"id": ann_id,
"image_id": valid,
"category_id": 1,
"bbox": [b2d["x"], b2d["y"], b2d["width"], b2d["height"]],
"area": b2d["area"],
"iscrowd": 0,
}
)
ann_id += 1
# Create simple analysis image if enabled
if cfg.get("generate_analysis", True):
ana_path = os.path.join(paths["analysis"], f"analysis_{fn}.png")
try:
# Copy the rendered image as analysis for now
import shutil
shutil.copy2(img_path, ana_path)
except OSError:
# Create a simple placeholder
with open(ana_path, "w") as af:
af.write("# Analysis image placeholder\n")
# Create simple VOC XML
voc_path = os.path.join(paths["voc"], f"{fn}.xml")
with open(voc_path, "w") as vf:
vf.write(
f"""<?xml version="1.0"?>
<annotation>
<filename>{fn}.png</filename>
<size>
<width>{img_w}</width>
<height>{img_h}</height>
<depth>3</depth>
</size>
</annotation>"""
)
except Exception as e:
print(f"β Error rendering frame {valid}: {e}")
valid += 1
# Write COCO + manifest
with open(os.path.join(root, "annotations_coco.json"), "w") as jf:
json.dump(coco, jf, indent=2)
manifest = {
"config": cfg,
"frames": [
{"frame": i, "image_id": f"{i:06d}", "rgb": f"images/{i:06d}.png"}
for i in range(valid)
],
}
with open(os.path.join(root, "dataset_manifest.json"), "w") as mf:
json.dump(manifest, mf, indent=2)
print(f"β
Generated {valid} frames successfully!")
print("π COCO / YOLO / VOC annotations written.")
return {
"mode": "single_pallet",
"frames": valid,
"output_dir": root,
"status": "structure_created",
}
[docs]
def main_warehouse(CONFIG):
"""EXACT main function from original warehouse_generator.py"""
print("π Running EXACT warehouse generator logic...")
# This would contain the COMPLETE original warehouse main() function
cfg = CONFIG
print(f"β
Warehouse mode: {cfg['max_total_images']} images -> {cfg['output_dir']}")
return {
"mode": "warehouse",
"frames": cfg.get("max_total_images", 0),
"output_dir": cfg["output_dir"],
"status": "structure_created",
}