Source code for atlas_gui.utils.reassemble_helpers

import h5py
import cv2
import numpy as np
import tempfile

[docs] def merge_dict_keys(data_dict): """ Reorganize robot state data by merging related keys into structured arrays. This function combines separate x, y, z components into single vector arrays and organizes joint data into logical groups. Args: data_dict: Dictionary containing robot state data with individual components Returns: Reorganized dictionary with merged components """ # Define which keys to merge and what their new names should be merge_map = { # "F_ext_base": ["F_ext_base_force_x", "F_ext_base_force_y", "F_ext_base_force_z"], # "F_ext_base_torque": ["F_ext_base_torque_x", "F_ext_base_torque_y", "F_ext_base_torque_z"], "compensated_base_force": ["compensated_base_force_x", "compensated_base_force_y", "compensated_base_force_z"], "compensated_base_torque": ["compensated_base_torque_x", "compensated_base_torque_y", "compensated_base_torque_z"], "measured_force": ["measured_force_x", "measured_force_y", "measured_force_z"], "measured_torque": ["measured_torque_x", "measured_torque_y", "measured_torque_z"] } # Linear + angular components for pose and velocity (combined into one) pose_keys = ["pose_x", "pose_y", "pose_z", "pose_qw", "pose_qx", "pose_qy", "pose_qz"] vel_keys = ["vel_x", "vel_y", "vel_z", "vel_qx", "vel_qy", "vel_qz"] # Joint positions, velocities, and efforts (combined by joints) joint_position_keys = [f"pos_joint{i}" for i in range(1, 10)] # Joint positions joint_velocity_keys = [f"vel_joint{i}" for i in range(1, 10)] # Joint velocities joint_effort_keys = [f"eff_joint{i}" for i in range(1, 10)] # Joint efforts new_dict = {} # Merge keys from merge_map (force, torque, etc.) for new_key, old_keys in merge_map.items(): merged_array = np.stack([data_dict[key] for key in old_keys], axis=-1) new_dict[new_key] = merged_array # Remove old keys after merging for key in old_keys: data_dict.pop(key, None) # Handle pose (linear and angular combined into one) pose_array = np.stack([data_dict[key] for key in pose_keys], axis=-1) new_dict["pose"] = pose_array # Remove old keys after merging for key in pose_keys: data_dict.pop(key, None) # Handle velocity (linear and angular combined into one) vel_array = np.stack([data_dict[key] for key in vel_keys], axis=-1) new_dict["velocity"] = vel_array # Remove old keys after merging for key in vel_keys: data_dict.pop(key, None) # Handle main joint positions (first 7 joints) joint_position_array = np.stack([data_dict[key] for key in joint_position_keys[:7]], axis=-1) new_dict["joint_positions"] = joint_position_array # Handle gripper position (last 2 joints) gripper_position_array = np.stack([data_dict[key] for key in joint_position_keys[7:]], axis=-1) new_dict["gripper_positions"] = gripper_position_array # Remove old keys after merging for key in joint_position_keys: data_dict.pop(key, None) # Handle main joint velocities (first 7 joints) joint_velocity_array = np.stack([data_dict[key] for key in joint_velocity_keys[:7]], axis=-1) new_dict["joint_velocities"] = joint_velocity_array # Handle gripper velocities (last 2 joints) gripper_velocity_array = np.stack([data_dict[key] for key in joint_velocity_keys[7:]], axis=-1) new_dict["gripper_velocities"] = gripper_velocity_array # Remove old keys after merging for key in joint_velocity_keys: data_dict.pop(key, None) # Handle main joint efforts (first 7 joints) joint_effort_array = np.stack([data_dict[key] for key in joint_effort_keys[:7]], axis=-1) new_dict["joint_efforts"] = joint_effort_array # Handle gripper efforts (last 2 joints) gripper_effort_array = np.stack([data_dict[key] for key in joint_effort_keys[7:]], axis=-1) new_dict["gripper_efforts"] = gripper_effort_array # Remove old keys after merging for key in joint_effort_keys: data_dict.pop(key, None) # Add other untouched keys back to the new dictionary new_dict.update(data_dict) return new_dict
[docs] def load_segments_info(file_path): """ Load only the segments_info from an h5 file. Args: file_path (str): Path to the h5 file Returns: The segments_info data from the h5 file """ def recursively_convert_to_dict(obj): if isinstance(obj, h5py.Group): return {key: recursively_convert_to_dict(obj[key]) for key in obj.keys()} elif isinstance(obj, h5py.Dataset): return obj[()] # Convert dataset to numpy array or other types else: raise TypeError("Unknown object type") # Open the HDF5 file with h5py.File(file_path, 'r') as h5_file: if 'segments_info' not in h5_file: raise KeyError("No segments_info found in the h5 file") return recursively_convert_to_dict(h5_file['segments_info'])
[docs] def mp4_blob_to_numpy_interval(binary_blob: bytes, frame_indices: list) -> np.ndarray: """ Convert MP4 binary blob to numpy array, loading only specified frames using OpenCV for better performance. Args: binary_blob (bytes): MP4 encoded video as bytes frame_indices (list): List of frame indices to extract Returns: np.ndarray: Array of decoded frames at specified indices """ # Sort indices for sequential access sorted_indices = sorted(frame_indices) # Create a temporary file to write the video with tempfile.NamedTemporaryFile(suffix='.mp4', delete=True) as temp_file: temp_file.write(binary_blob) temp_file.flush() # Open video with OpenCV cap = cv2.VideoCapture(temp_file.name) frames = [] current_frame = 0 last_needed = max(sorted_indices) for idx in sorted_indices: # Skip frames until we reach the desired index if idx > current_frame: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) current_frame = idx ret, frame = cap.read() if ret: # Convert BGR to RGB (OpenCV uses BGR by default) frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(frame) current_frame += 1 else: break # Stop if we've processed the last needed frame if current_frame > last_needed: break cap.release() return np.array(frames)
[docs] def load_h5_time_interval(file_path, start_time, end_time, skip_no_timestamps=True): """ Load data from h5 file for a specified time interval, maintaining original dictionary structure. Args: file_path (str): Path to the h5 file start_time (float): Start time in seconds end_time (float): End time in seconds skip_no_timestamps (bool): If True, skip loading data without corresponding timestamps. If False, load the entire data for keys without timestamps. Returns: dict: Dictionary containing the data within the specified time interval """ def recursively_convert_to_dict(obj, timestamps_dict=None, time_indices=None): if isinstance(obj, h5py.Group): result = {} for key in obj.keys(): # Skip data without timestamps if specified if skip_no_timestamps and timestamps_dict is not None: # if obj.name == '/' and key not in timestamps_dict.keys(): #key not in ['timestamps']: # For root level if key not in [*timestamps_dict.keys(), 'robot_state']: continue converted = recursively_convert_to_dict(obj[key], timestamps_dict, time_indices) if converted is not None: # Only add non-None results result[key] = converted return result elif isinstance(obj, h5py.Dataset): data = obj[()] # Check if this data has corresponding timestamps # data_key = obj.name[1:] # Remove leading '/' data_key = obj.name.split("/")[-1] if timestamps_dict is not None and data_key not in timestamps_dict and skip_no_timestamps: if obj.parent.name != '/timestamps': # Don't skip timestamp arrays themselves return None # Handle video data if isinstance(data, np.void): if timestamps_dict is not None and data_key in timestamps_dict: indices = time_indices[data_key] return mp4_blob_to_numpy_interval(data, indices) return data if not skip_no_timestamps else None # Handle numerical data else: if timestamps_dict is not None and data_key in timestamps_dict: indices = time_indices[data_key] return data[indices] return data if not skip_no_timestamps else None else: raise TypeError(f"Unknown object type: {type(obj)}") def get_time_indices(timestamps, start_time, end_time): return np.where((timestamps >= start_time) & (timestamps <= end_time))[0] with h5py.File(file_path, 'r') as h5_file: # First, get all timestamps and calculate indices timestamps_dict = {} time_indices = {} for key in h5_file['timestamps'].keys(): timestamps = h5_file['timestamps'][key][()] indices = get_time_indices(timestamps, start_time, end_time) if len(indices) > 0: timestamps_dict[key] = timestamps[indices] time_indices[key] = indices # Now convert the whole file with time filtering data = recursively_convert_to_dict(h5_file, timestamps_dict, time_indices) # Filter timestamps data['timestamps'] = timestamps_dict return data
[docs] def save_data_to_h5(file_path, data): """ Save modified data back to the original HDF5 file. Args: file_path (str): Path to the original HDF5 file. data (dict): Modified data dictionary (including additional keys). """ def recursively_save(group, data_dict): for key, value in data_dict.items(): if isinstance(value, dict): # Recursively handle groups (sub-dictionaries) if key not in group: subgroup = group.create_group(key) else: subgroup = group[key] recursively_save(subgroup, value) else: # If dataset exists, delete & overwrite it if key in group: del group[key] # Create new dataset group.create_dataset(key, data=value) with h5py.File(file_path, 'r+') as h5_file: recursively_save(h5_file, data)