Source code for atlas_gui.datasets.frames

from atlas_gui.datasets.dataset import DatasetBase
import os
import json
import numpy as np
import cv2


class Frames(DatasetBase):
    """
    Dataset handler for image frame sequences stored in folders.

    Supports three folder structures (auto-detected):

    - Flat: images directly in the selected folder
      (single segment, single camera)
    - Subfolder: subfolders each containing images
      (each subfolder = one segment, single camera)
    - Multi-camera: subfolders containing camera subfolders with images
      (each top-level subfolder = one segment, camera subfolders = cameras)

    The ``config`` mapping must provide ``'dataset_name'`` and
    ``'annotation_dir'``; ``'fps'`` is read when segment durations are
    computed. ``'camera_keys'`` is optional and is filled in during
    structure detection when it is missing or empty.
    """

    VALID_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif'}

    def __init__(self, config):
        """Store the configuration and make sure the annotation folder exists.

        Args:
            config: Mapping with at least ``'dataset_name'`` and
                ``'annotation_dir'``. It is kept by reference and may be
                mutated later (``'camera_keys'`` is set during detection).
        """
        super().__init__()
        self.config = config
        self.dataset_name = config['dataset_name']
        self.annotation_dir = config['annotation_dir']
        self.current_segment_idx = 0
        self.structure = None  # 'flat', 'subfolder', or 'multicam'
        self.segments = {}  # idx -> segment directory path
        os.makedirs(self.annotation_dir, exist_ok=True)

    def _is_image(self, filename):
        """Return True if *filename* has one of ``VALID_EXTENSIONS``."""
        return os.path.splitext(filename)[1].lower() in self.VALID_EXTENSIONS

    def _list_images(self, folder_path):
        """Return sorted list of image file paths in a folder.

        NOTE: the sort is plain lexicographic string ordering, so
        ``frame_10.png`` comes before ``frame_2.png``; frame numbers should
        be zero-padded on disk to get the intended order.
        """
        paths = []
        for name in os.listdir(folder_path):
            full_path = os.path.join(folder_path, name)
            if os.path.isfile(full_path) and self._is_image(name):
                paths.append(full_path)
        paths.sort()
        return paths

    def _annotations_path(self):
        """Return the path of the JSON file holding this dataset's annotations."""
        return os.path.join(
            self.annotation_dir, f"{self.dataset_name}_annotations.json"
        )

    def load_data(self, file_path):
        """Detect the folder layout under *file_path* and index its segments.

        Args:
            file_path: Root folder of the frame dataset.

        Raises:
            ValueError: If no supported folder structure is found.
        """
        self.file_path = file_path
        self._detect_structure(file_path)
        self.load_segments_info(file_path)

    def _detect_structure(self, root):
        """Auto-detect the folder structure and populate self.segments.

        Only the first (alphabetically) subfolder is inspected to decide
        between the 'subfolder' and 'multicam' layouts. When
        ``config['camera_keys']`` is missing or empty it is set to
        ``['camera']`` (flat / subfolder) or to the sorted camera folder
        names found in that first subfolder (multicam).

        Raises:
            ValueError: If *root* contains neither images nor subfolders, or
                if the first subfolder contains neither images nor
                subfolders.
        """
        entries = sorted(os.listdir(root))
        subdirs = [
            e for e in entries if os.path.isdir(os.path.join(root, e))
        ]
        images = [
            e for e in entries
            if os.path.isfile(os.path.join(root, e)) and self._is_image(e)
        ]

        structure = None
        segments = {}
        default_cameras = ['camera']

        if images and not subdirs:
            # Flat: images directly in root folder
            structure = 'flat'
            segments = {0: root}
        elif subdirs:
            first_sub = os.path.join(root, subdirs[0])
            sub_entries = os.listdir(first_sub)
            sub_subdirs = [
                e for e in sub_entries
                if os.path.isdir(os.path.join(first_sub, e))
            ]
            sub_images = [
                e for e in sub_entries
                if os.path.isfile(os.path.join(first_sub, e))
                and self._is_image(e)
            ]
            if sub_images and not sub_subdirs:
                # Subfolder: each subfolder is a segment with images inside
                structure = 'subfolder'
            elif sub_subdirs:
                # Multi-camera: subfolders contain camera subfolders
                structure = 'multicam'
                default_cameras = sorted(sub_subdirs)
            if structure is not None:
                segments = {
                    i: os.path.join(root, d) for i, d in enumerate(subdirs)
                }

        # Every undetectable layout fails loudly instead of leaving the
        # handler half-initialised with structure=None and no segments.
        if structure is None:
            raise ValueError(
                f"Could not detect folder structure in '{root}'. "
                "Expected images or subfolders containing images."
            )

        self.structure = structure
        self.segments = segments
        if not self.config.get('camera_keys'):
            self.config['camera_keys'] = default_cameras

    def load_segments_info(self, file_path):
        """Build ``self.segments_info`` from the detected segments.

        Each entry is keyed by the segment index as a string and holds the
        index, a start of 0.0, an end time of ``(n_frames - 1) / fps``
        (0.0 when there are fewer than two frames), a display label and a
        ``segment_<idx>`` uid. For the multicam layout the frame count is
        taken from the first configured camera only.

        Args:
            file_path: Dataset root; only used as the label of the single
                segment in the flat layout.
        """
        self.segments_info = {}
        for idx, segment_path in self.segments.items():
            if self.structure == 'multicam':
                first_cam = self.config['camera_keys'][0]
                frames_dir = os.path.join(segment_path, first_cam)
            else:
                frames_dir = segment_path
            n_frames = len(self._list_images(frames_dir))

            if n_frames > 1:
                duration = (n_frames - 1) / self.config['fps']
            else:
                duration = 0.0

            if self.structure != 'flat':
                label = os.path.basename(segment_path)
            else:
                label = os.path.basename(file_path)

            self.segments_info[str(idx)] = {
                'index': idx,
                'start': 0.0,
                'end': duration,
                'text': label,
                'uid': f"segment_{idx}",
            }

    def _load_frames(self, folder_path):
        """Read every image in *folder_path* and stack them into one array.

        Raises:
            ValueError: If the folder holds no images, if an image cannot be
                decoded (``cv2.imread`` returned ``None``), or if
                ``np.stack`` rejects the frames (e.g. differing shapes).
        """
        image_paths = self._list_images(folder_path)
        if not image_paths:
            raise ValueError(f"No image frames found in '{folder_path}'.")

        frames = []
        for path in image_paths:
            frame = cv2.imread(path)
            # cv2.imread signals failure by returning None rather than
            # raising; catch it here so the error names the offending file.
            if frame is None:
                raise ValueError(f"Could not read image file '{path}'.")
            frames.append(frame)
        return np.stack(frames)

    def get_segment(self, segment_idx):
        """Load all frames of a segment and make it the current segment.

        Args:
            segment_idx: Key into ``self.segments``.

        Returns:
            dict mapping each camera key to the stacked frames of that
            camera. The same dict is stored in ``self.data``.

        Raises:
            KeyError: If *segment_idx* is not a known segment.
            ValueError: If a camera folder is empty or a frame is unreadable.
        """
        self.current_segment_idx = segment_idx
        segment_path = self.segments[segment_idx]
        result = {}
        if self.structure in ('flat', 'subfolder'):
            cam_key = self.config['camera_keys'][0]
            result[cam_key] = self._load_frames(segment_path)
        elif self.structure == 'multicam':
            for cam_key in self.config['camera_keys']:
                cam_path = os.path.join(segment_path, cam_key)
                result[cam_key] = self._load_frames(cam_path)
        self.data = result
        return result

    def get_max_timestamp(self):
        """Return the end time stored for the current segment."""
        return self.segments_info[str(self.current_segment_idx)]['end']

    @staticmethod
    def _to_jsonable(obj):
        """Recursively replace NumPy values with plain Python equivalents.

        NumPy scalars become Python scalars, arrays become nested lists, and
        dicts / lists / tuples are converted element by element (tuples come
        back as lists, which is how JSON would store them anyway).
        """
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, dict):
            return {
                (k.item() if isinstance(k, np.generic) else k):
                    Frames._to_jsonable(v)
                for k, v in obj.items()
            }
        if isinstance(obj, (list, tuple)):
            return [Frames._to_jsonable(v) for v in obj]
        return obj

    def write_annot_data(self, segment_idx, annots):
        """Store *annots* for a segment in the dataset's annotation file.

        Existing annotations of other segments are preserved; the entry for
        this segment's uid is replaced.

        Args:
            segment_idx: Index of the segment the annotations belong to.
            annots: Annotation data; NumPy values inside it are converted to
                plain Python values before serialisation.

        Raises:
            KeyError: If *segment_idx* has no entry in ``segments_info``.
            TypeError: If *annots* contains values JSON cannot represent.
        """
        annotations_path = self._annotations_path()
        if os.path.exists(annotations_path):
            with open(annotations_path, "r", encoding="utf-8") as f:
                all_annotations = json.load(f)
        else:
            all_annotations = {}

        uid = self.segments_info[str(segment_idx)]['uid']
        all_annotations[uid] = self._to_jsonable(annots)

        # Serialise before touching the file: if this raises, the existing
        # annotations on disk stay intact instead of being truncated.
        payload = json.dumps(all_annotations, indent=2)

        # Write to a sibling temp file and swap it in, so an interrupted
        # write cannot leave a half-written annotations file behind.
        tmp_path = annotations_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, annotations_path)

    def load_annot_data(self, segment_idx):
        """Return the stored annotations for a segment.

        Args:
            segment_idx: Index of the segment to look up.

        Returns:
            The saved annotation data, or an empty dict when the annotation
            file does not exist or has no entry for this segment.

        Raises:
            KeyError: If *segment_idx* has no entry in ``segments_info``.
        """
        uid = self.segments_info[str(segment_idx)]['uid']
        annotations_path = self._annotations_path()
        if not os.path.exists(annotations_path):
            return {}
        with open(annotations_path, "r", encoding="utf-8") as f:
            all_annotations = json.load(f)
        return all_annotations.get(uid, {})