代码拉取完成,页面将自动刷新
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
from Utils import *
import json,os,sys
# Dataset names accepted by get_bop_video_dirs().
BOP_LIST = ['lmo','tless','ycbv','hb','tudl','icbin','itodd']
# Root directory of the BOP datasets, taken from the BOP_DIR environment
# variable at import time (None when the variable is unset).
BOP_DIR = os.getenv('BOP_DIR')
def get_bop_reader(video_dir, zfar=np.inf):
    """Create the reader that matches the dataset named in ``video_dir``.

    The dataset is detected by plain substring search on the path. The
    keywords are tried in a fixed order (ycbv, lmo, tless, hb, tudl, icbin,
    itodd) and the first match wins, so short keywords such as ``'hb'`` can
    match unrelated path components.

    Args:
        video_dir: Path of a scene directory; passed unchanged to the reader.
        zfar: Far depth limit forwarded to the reader.

    Returns:
        A reader instance for the detected dataset.

    Raises:
        RuntimeError: If none of the known keywords occurs in ``video_dir``.
    """
    if 'ycbv' in video_dir or 'YCB' in video_dir:
        return YcbVideoReader(video_dir, zfar=zfar)
    if 'lmo' in video_dir or 'LINEMOD-O' in video_dir:
        return LinemodOcclusionReader(video_dir, zfar=zfar)
    if 'tless' in video_dir or 'TLESS' in video_dir:
        return TlessReader(video_dir, zfar=zfar)
    if 'hb' in video_dir:
        return HomebrewedReader(video_dir, zfar=zfar)
    if 'tudl' in video_dir:
        return TudlReader(video_dir, zfar=zfar)
    if 'icbin' in video_dir:
        return IcbinReader(video_dir, zfar=zfar)
    if 'itodd' in video_dir:
        return ItoddReader(video_dir, zfar=zfar)
    # Same exception type as before, but now it says what went wrong.
    raise RuntimeError(f'Cannot infer a BOP dataset from video_dir={video_dir!r}')
def get_bop_video_dirs(dataset):
    """List the test scene directories of a BOP dataset, sorted by path.

    Args:
        dataset: One of 'ycbv', 'lmo', 'tless', 'hb', 'tudl', 'icbin', 'itodd'.

    Returns:
        Sorted list of paths matching ``{BOP_DIR}/<test sub-directory>/*``.
        The list is empty when nothing matches (e.g. BOP_DIR is unset).

    Raises:
        RuntimeError: If ``dataset`` is not one of the names above.
    """
    # Test-split directory of each dataset, relative to BOP_DIR.
    test_subdirs = {
        'ycbv': 'ycbv/test',
        'lmo': 'lmo/lmo_test_bop19/test',
        'tless': 'tless/tless_test_primesense_bop19/test_primesense',
        'hb': 'hb/hb_test_primesense_bop19/test_primesense',
        'tudl': 'tudl/tudl_test_bop19/test',
        'icbin': 'icbin/icbin_test_bop19/test',
        'itodd': 'itodd/itodd_test_bop19/test',
    }
    if dataset not in test_subdirs:
        raise RuntimeError(
            f'Unknown BOP dataset {dataset!r}; expected one of {sorted(test_subdirs)}'
        )
    return sorted(glob.glob(f'{BOP_DIR}/{test_subdirs[dataset]}/*'))
class YcbineoatReader:
    """Frame reader for one video directory with ``rgb/`` images and ``cam_K.txt``.

    The accessors derive every other file from the colour-file path by
    replacing the substring ``rgb`` (``depth``, ``masks``, ``masks_hand``,
    ``masks_hand_right``). Images are returned at the working resolution
    ``(self.H, self.W)``, which is the native size times ``self.downscale``.
    """

    def __init__(self,video_dir, downscale=1, shorter_side=None, zfar=np.inf):
        """Index the colour frames and load the camera intrinsics.

        Args:
            video_dir: Directory containing ``rgb/*.png`` and ``cam_K.txt``.
            downscale: Factor applied to the image size and the intrinsics.
            shorter_side: If given, ``downscale`` is recomputed as
                ``shorter_side / min(H, W)`` of the first colour image.
            zfar: ``get_depth`` zeroes depth values >= this limit.
        """
        self.video_dir = video_dir
        self.downscale = downscale
        self.zfar = zfar
        self.color_files = sorted(glob.glob(f"{self.video_dir}/rgb/*.png"))
        self.K = np.loadtxt(f'{video_dir}/cam_K.txt').reshape(3,3)
        self.id_strs = [
            os.path.basename(color_file).replace('.png','')
            for color_file in self.color_files
        ]
        # Native resolution is taken from the first frame.
        self.H,self.W = cv2.imread(self.color_files[0]).shape[:2]

        if shorter_side is not None:
            self.downscale = shorter_side/min(self.H, self.W)

        self.H = int(self.H*self.downscale)
        self.W = int(self.W*self.downscale)
        # Scale focal lengths and principal point together with the image.
        self.K[:2] *= self.downscale

        self.gt_pose_files = sorted(glob.glob(f'{self.video_dir}/annotated_poses/*'))

        # Video directory name -> object model directory name.
        self.videoname_to_object = {
            'bleach0': "021_bleach_cleanser",
            'bleach_hard_00_03_chaitanya': "021_bleach_cleanser",
            'cracker_box_reorient': '003_cracker_box',
            'cracker_box_yalehand0': '003_cracker_box',
            'mustard0': '006_mustard_bottle',
            'mustard_easy_00_02': '006_mustard_bottle',
            'sugar_box1': '004_sugar_box',
            'sugar_box_yalehand0': '004_sugar_box',
            'tomato_soup_can_yalehand0': '005_tomato_soup_can',
        }

    def get_video_name(self):
        """Return the last path component of ``video_dir``.

        A trailing '/' is ignored so that '/a/b/' and '/a/b' both give 'b'.
        """
        return self.video_dir.rstrip('/').split('/')[-1]

    def __len__(self):
        """Return the number of colour frames."""
        return len(self.color_files)

    def get_gt_pose(self,i):
        """Return the i-th annotated pose as a 4x4 array, or None if unavailable."""
        try:
            return np.loadtxt(self.gt_pose_files[i]).reshape(4,4)
        except Exception:
            # Best effort: a missing or malformed pose is reported as None.
            # `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            logging.info("GT pose not found, return None")
            return None

    def get_color(self,i):
        """Return the first three channels of frame ``i`` at working resolution."""
        color = imageio.imread(self.color_files[i])[...,:3]
        return cv2.resize(color, (self.W,self.H), interpolation=cv2.INTER_NEAREST)

    def get_mask(self,i):
        """Return the mask of frame ``i`` as a 2-D uint8 array of 0/1.

        For a multi-channel mask image the first non-empty channel is used;
        if every channel is empty, channel 0 is used so the result stays 2-D.
        """
        mask = cv2.imread(self.color_files[i].replace('rgb','masks'),-1)
        if len(mask.shape)==3:
            for c in range(3):
                if mask[...,c].sum()>0:
                    mask = mask[...,c]
                    break
            else:
                mask = mask[...,0]
        mask = cv2.resize(mask, (self.W,self.H), interpolation=cv2.INTER_NEAREST)
        return mask.astype(bool).astype(np.uint8)

    def get_depth(self,i):
        """Return the depth map of frame ``i`` at working resolution.

        Raw values are divided by 1e3 (presumably millimetres to metres;
        confirm against the dataset). Values < 0.1 or >= ``zfar`` are set to 0.
        """
        depth = cv2.imread(self.color_files[i].replace('rgb','depth'),-1)/1e3
        depth = cv2.resize(depth, (self.W,self.H), interpolation=cv2.INTER_NEAREST)
        depth[(depth<0.1) | (depth>=self.zfar)] = 0
        return depth

    def get_xyz_map(self,i):
        """Return ``depth2xyzmap`` applied to the depth of frame ``i`` and ``self.K``."""
        return depth2xyzmap(self.get_depth(i), self.K)

    def get_occ_mask(self,i):
        """Return the union of the hand masks of frame ``i`` as uint8 0/1.

        Masks are read from ``masks_hand`` and ``masks_hand_right``; a missing
        file contributes nothing. The masks are combined at their native
        resolution and then resized, because cv2.resize does not accept bool
        arrays and the native masks do not match (H, W) when downscaling.
        """
        occ_mask = None
        for folder in ('masks_hand', 'masks_hand_right'):
            hand_mask_file = self.color_files[i].replace('rgb', folder)
            if not os.path.exists(hand_mask_file):
                continue
            hand_mask = cv2.imread(hand_mask_file,-1)>0
            occ_mask = hand_mask if occ_mask is None else (occ_mask | hand_mask)

        if occ_mask is None:
            return np.zeros((self.H,self.W), dtype=np.uint8)

        return cv2.resize(
            occ_mask.astype(np.uint8), (self.W,self.H), interpolation=cv2.INTER_NEAREST
        )

    def get_gt_mesh(self):
        """Load the textured model of this video's object from YCB_VIDEO_DIR.

        Raises:
            KeyError: If the video name is not in ``videoname_to_object``.
        """
        ob_name = self.videoname_to_object[self.get_video_name()]
        YCB_VIDEO_DIR = os.getenv('YCB_VIDEO_DIR')
        return trimesh.load(f'{YCB_VIDEO_DIR}/models/{ob_name}/textured_simple.obj')
class BopBaseReader:
    """Common reader for one scene directory stored in the BOP layout.

    Reads ``scene_camera.json`` (per-image intrinsics and depth scale) and,
    when present, ``scene_gt.json`` (per-image object annotations).
    Subclasses set ``dataset_name`` and ``ob_ids`` and override
    ``get_gt_mesh_file``.
    """

    def __init__(self, base_dir, zfar=np.inf, resize=1):
        """Index the images and load the scene metadata.

        Args:
            base_dir: Scene directory containing ``rgb/`` (or ``gray/``) and
                ``scene_camera.json``.
            zfar: ``get_depth`` zeroes depth values greater than this limit.
            resize: Scale factor applied to images, masks, depth and intrinsics.
        """
        self.base_dir = base_dir
        self.resize = resize
        self.dataset_name = None
        # Filled by make_scene_ob_ids_dict(); None means "not loaded", which
        # get_instance_ids_in_image() relies on.
        self.scene_ob_ids_dict = None
        self.color_files = sorted(glob.glob(f"{self.base_dir}/rgb/*"))
        if len(self.color_files)==0:
            self.color_files = sorted(glob.glob(f"{self.base_dir}/gray/*"))
        self.zfar = zfar

        self.K_table = {}
        with open(f'{self.base_dir}/scene_camera.json','r') as ff:
            info = json.load(ff)
        for k in info:
            self.K_table[f'{int(k):06d}'] = np.array(info[k]['cam_K']).reshape(3,3)
            # The depth scale of the last entry wins.
            self.bop_depth_scale = info[k]['depth_scale']

        if os.path.exists(f'{self.base_dir}/scene_gt.json'):
            with open(f'{self.base_dir}/scene_gt.json','r') as ff:
                self.scene_gt = json.load(ff)
            self.scene_gt = copy.deepcopy(self.scene_gt)   # Release file handle to be pickle-able by joblib
            assert len(self.scene_gt)==len(self.color_files)
        else:
            self.scene_gt = None

        self.make_id_strs()

    def make_scene_ob_ids_dict(self):
        """Build ``scene_ob_ids_dict`` from ``test_targets_bop19.json``.

        Maps the zero-padded image id to a list of object ids for this scene,
        each object id repeated ``inst_count`` times.
        """
        with open(f'{BOP_DIR}/{self.dataset_name}/test_targets_bop19.json','r') as ff:
            targets = json.load(ff)
        video_id = self.get_video_id()
        scene_ob_ids = {}
        for target in targets:
            if target['scene_id']!=video_id:
                continue
            id_str = f"{target['im_id']:06d}"
            scene_ob_ids.setdefault(id_str, []).extend([target['obj_id']]*target['inst_count'])
        self.scene_ob_ids_dict = scene_ob_ids

    def get_K(self, i_frame):
        """Return the 3x3 intrinsics of frame ``i_frame``.

        With ``resize != 1`` a scaled copy is returned: the first two rows
        (focal lengths and principal point) are multiplied by ``resize``.
        The cached matrix is never modified, so repeated calls agree.
        """
        K = self.K_table[self.id_strs[i_frame]]
        if self.resize!=1:
            K = K.astype(float)   # astype copies, and avoids int in-place casting errors
            K[:2] *= self.resize
        return K

    def get_video_dir(self):
        """Return the scene id. Despite the name this is an int, same as get_video_id()."""
        return self.get_video_id()

    def make_id_strs(self):
        """Set ``id_strs`` to the file stems of ``color_files``."""
        self.id_strs = [
            os.path.basename(color_file).split('.')[0]
            for color_file in self.color_files
        ]

    def get_instance_ids_in_image(self, i_frame:int):
        """Return the object ids present in frame ``i_frame`` as an array.

        Sources, in priority order: ``scene_gt``; ``scene_ob_ids_dict``;
        the file names found in the ``mask_visib`` directory.
        """
        if self.scene_gt is not None:
            name = int(os.path.basename(self.color_files[i_frame]).split('.')[0])
            return np.asarray([k['obj_id'] for k in self.scene_gt[str(name)]])

        # getattr keeps this safe for instances on which the attribute was never set.
        scene_ob_ids_dict = getattr(self, 'scene_ob_ids_dict', None)
        if scene_ob_ids_dict is not None:
            return np.array(scene_ob_ids_dict[self.id_strs[i_frame]])

        mask_dir = os.path.dirname(self.color_files[0]).replace('rgb','mask_visib')
        id_str = self.id_strs[i_frame]
        mask_files = sorted(glob.glob(f'{mask_dir}/{id_str}_*.png'))
        ob_ids = [
            int(os.path.basename(mask_file).split('.')[0].split('_')[1])
            for mask_file in mask_files
        ]
        return np.asarray(ob_ids)

    def get_gt_mesh_file(self, ob_id):
        """Return the model file path of ``ob_id``; subclasses must override."""
        raise RuntimeError("You should override this")

    def get_color(self,i):
        """Return frame ``i`` as a 3-channel image, resized by ``resize``."""
        color = imageio.imread(self.color_files[i])
        if len(color.shape)==2:
            color = np.tile(color[...,None], (1,1,3))  # Gray to RGB
        if self.resize!=1:
            color = cv2.resize(color, fx=self.resize, fy=self.resize, dsize=None)
        return color

    def get_depth(self,i, filled=False):
        """Return the depth map of frame ``i``.

        Args:
            i: Frame index.
            filled: If True, read from ``depth_filled`` (file name prefixed
                with '0') and divide by 1e3, ignoring ``bop_depth_scale``.
                Otherwise read from ``depth`` and multiply by
                ``1e-3 * bop_depth_scale``.

        Values below 0.1 or above ``zfar`` are set to 0.
        """
        if filled:
            depth_file = self.color_files[i].replace('rgb','depth_filled')
            depth_file = f'{os.path.dirname(depth_file)}/0{os.path.basename(depth_file)}'
            depth = cv2.imread(depth_file,-1)/1e3
        else:
            depth_file = self.color_files[i].replace('rgb','depth').replace('gray','depth')
            depth = cv2.imread(depth_file,-1)*1e-3*self.bop_depth_scale
        if self.resize!=1:
            depth = cv2.resize(depth, fx=self.resize, fy=self.resize, dsize=None, interpolation=cv2.INTER_NEAREST)
        depth[depth<0.1] = 0
        depth[depth>self.zfar] = 0
        return depth

    def get_xyz_map(self,i):
        """Return ``depth2xyzmap`` applied to the depth and intrinsics of frame ``i``."""
        return depth2xyzmap(self.get_depth(i), self.get_K(i))

    def get_mask(self, i_frame:int, ob_id:int, type='mask_visib'):
        """Return the boolean mask of the first ``ob_id`` instance in a frame.

        Args:
            i_frame: Frame index.
            ob_id: Object id to look up in ``scene_gt``.
            type: ``mask_visib`` (only visible part) or ``mask`` (projected
                mask from whole model).

        Returns:
            Boolean array, or None when the mask file does not exist.

        Raises:
            RuntimeError: If ``scene_gt`` is not available.
        """
        if self.scene_gt is None:
            raise RuntimeError
        name = int(os.path.basename(self.color_files[i_frame]).split('.')[0])
        # Position of the first matching annotation; it indexes the mask file.
        pos = 0
        for k in self.scene_gt[str(name)]:
            if k['obj_id']==ob_id:
                break
            pos += 1
        mask_file = f'{self.base_dir}/{type}/{name:06d}_{pos:06d}.png'
        if not os.path.exists(mask_file):
            logging.info(f'{mask_file} not found')
            return None
        mask = cv2.imread(mask_file, -1)
        if self.resize!=1:
            mask = cv2.resize(mask, fx=self.resize, fy=self.resize, dsize=None, interpolation=cv2.INTER_NEAREST)
        return mask>0

    def get_gt_mesh(self, ob_id:int):
        """Load the model of ``ob_id`` with its vertices multiplied by 1e-3."""
        mesh = trimesh.load(self.get_gt_mesh_file(ob_id))
        mesh.vertices *= 1e-3
        return mesh

    def get_model_diameter(self, ob_id):
        """Return ``diameter / 1e3`` of ``ob_id`` from ``models_info.json``."""
        model_dir = os.path.dirname(self.get_gt_mesh_file(self.ob_ids[0]))
        with open(f'{model_dir}/models_info.json','r') as ff:
            info = json.load(ff)
        return info[str(ob_id)]['diameter']/1e3

    def get_gt_poses(self, i_frame, ob_id):
        """Return the poses of every ``ob_id`` instance in a frame, shape (N,4,4).

        Rotation comes from ``cam_R_m2c``; translation is ``cam_t_m2c / 1e3``.
        """
        name = int(self.id_strs[i_frame])
        gt_poses = []
        for k in self.scene_gt[str(name)]:
            if k['obj_id']!=ob_id:
                continue
            cur = np.eye(4)
            cur[:3,:3] = np.array(k['cam_R_m2c']).reshape(3,3)
            cur[:3,3] = np.array(k['cam_t_m2c'])/1e3
            gt_poses.append(cur)
        return np.asarray(gt_poses).reshape(-1,4,4)

    def get_gt_pose(self, i_frame:int, ob_id, mask=None, use_my_correction=False):
        """Return one 4x4 pose of ``ob_id`` in frame ``i_frame``.

        Args:
            i_frame: Frame index.
            ob_id: Object id to look up in ``scene_gt``.
            mask: Optional instance mask. When given, the instance whose
                ``mask_visib`` image has the highest IoU with it is chosen;
                otherwise the first matching annotation is used.
            use_my_correction: Apply the hard-coded symmetry correction for
                object 1 in selected 'train_real' YCB videos.

        Returns:
            The selected pose, or the identity if no annotation matches.
        """
        ob_in_cam = np.eye(4)
        best_iou = -np.inf
        name = int(self.id_strs[i_frame])
        for i_k, k in enumerate(self.scene_gt[str(name)]):
            if k['obj_id']!=ob_id:
                continue
            cur = np.eye(4)
            cur[:3,:3] = np.array(k['cam_R_m2c']).reshape(3,3)
            cur[:3,3] = np.array(k['cam_t_m2c'])/1e3
            if mask is None:
                ob_in_cam = cur
                break
            # When multi-instance exists, use mask to determine which one
            gt_mask = cv2.imread(f'{self.base_dir}/mask_visib/{self.id_strs[i_frame]}_{i_k:06d}.png', -1).astype(bool)
            intersect = (gt_mask*mask).astype(bool)
            union = (gt_mask+mask).astype(bool)
            iou = float(intersect.sum())/union.sum()
            if iou>best_iou:
                best_iou = iou
                ob_in_cam = cur

        if (
            use_my_correction
            and 'ycb' in self.base_dir.lower()
            and 'train_real' in self.color_files[i_frame]
            and ob_id==1
            and self.get_video_id() in [12,13,14,17,24]
        ):
            ob_in_cam = ob_in_cam@self.symmetry_tfs[ob_id][1]
        return ob_in_cam

    def load_symmetry_tfs(self):
        """Load per-object symmetry info from ``models_info.json``.

        Fills ``symmetry_info_table`` and ``symmetry_tfs`` for every id in
        ``ob_ids``, and sets ``geometry_symmetry_info_table`` to a deep copy
        of the info table.
        """
        model_dir = os.path.dirname(self.get_gt_mesh_file(self.ob_ids[0]))
        with open(f'{model_dir}/models_info.json','r') as ff:
            info = json.load(ff)
        self.symmetry_tfs = {}
        self.symmetry_info_table = {}
        for ob_id in self.ob_ids:
            ob_info = info[str(ob_id)]
            self.symmetry_info_table[ob_id] = ob_info
            self.symmetry_tfs[ob_id] = symmetry_tfs_from_info(ob_info, rot_angle_discrete=5)
        self.geometry_symmetry_info_table = copy.deepcopy(self.symmetry_info_table)

    def get_video_id(self):
        """Return the last component of ``base_dir`` as an int; a trailing '/' is ignored."""
        return int(self.base_dir.rstrip('/').split('/')[-1])
class LinemodOcclusionReader(BopBaseReader):
    """Reader for the 'lmo' dataset; models are looked up under BOP_DIR."""

    def __init__(self,base_dir='/mnt/9a72c439-d0a7-45e8-8d20-d7a235d02763/DATASET/LINEMOD-O/lmo_test_all/test/000002', zfar=np.inf):
        """Load the scene, pick the first intrinsics entry and load symmetries."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'lmo'
        all_intrinsics = list(self.K_table.values())
        self.K = all_intrinsics[0]
        self.ob_ids = [1,5,6,8,9,10,11,12]
        # Object names in id order, ids starting at 1.
        names_in_id_order = [
            'ape',
            'benchvise',
            'bowl',
            'camera',
            'water_pour',
            'cat',
            'cup',
            'driller',
            'duck',
            'eggbox',
            'glue',
            'holepuncher',
            'iron',
            'lamp',
            'phone',
        ]
        self.ob_id_to_names = dict(enumerate(names_in_id_order, start=1))
        self.load_symmetry_tfs()

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id`` under ``BOP_DIR/<dataset_name>/models``."""
        return f'{BOP_DIR}/{self.dataset_name}/models/obj_{ob_id:06d}.ply'
class LinemodReader(LinemodOcclusionReader):
    """Reader for the 'lm' dataset, with an optional train/test split file."""

    def __init__(self, base_dir='/mnt/9a72c439-d0a7-45e8-8d20-d7a235d02763/DATASET/LINEMOD/lm_test_all/test/000001', zfar=np.inf, split=None):
        """Load the scene and optionally restrict the frames to a split.

        Args:
            base_dir: Scene directory.
            zfar: Far depth limit forwarded to the base reader.
            split: Name of a split file (e.g. train/test). When given, the
                frame list is replaced by the ids listed in that file.
        """
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'lm'
        if split is not None:  # train/test
            with open(f'/mnt/9a72c439-d0a7-45e8-8d20-d7a235d02763/DATASET/LINEMOD/Linemod_preprocessed/data/{self.get_video_id():02d}/{split}.txt','r') as ff:
                lines = ff.read().splitlines()
            self.color_files = [
                f'{self.base_dir}/rgb/{int(line):06d}.png' for line in lines
            ]
            self.make_id_strs()

        self.ob_ids = np.setdiff1d(np.arange(1,16), np.array([7,3])).tolist()  # Exclude bowl and mug
        # ob_ids changed after the parent constructor ran, so reload the symmetries.
        self.load_symmetry_tfs()

    def get_gt_mesh_file(self, ob_id):
        """Return the model path of ``ob_id`` under the nearest ``lm_models`` directory.

        Searches ``base_dir`` and then each of its ancestors.

        Raises:
            FileNotFoundError: If no ancestor contains ``lm_models``. The
                search previously never terminated in that case.
        """
        root = self.base_dir
        while True:
            if os.path.exists(f'{root}/lm_models'):
                return f'{root}/lm_models/models/obj_{ob_id:06d}.ply'
            current = os.path.abspath(root)
            parent = os.path.dirname(current)
            if parent == current:
                # Reached the filesystem root without finding the models.
                raise FileNotFoundError(
                    f"No 'lm_models' directory found in {self.base_dir!r} or any of its parents"
                )
            root = parent

    def get_reconstructed_mesh(self, ob_id, ref_view_dir):
        """Load ``ob_<id>/model/model.obj`` from ``ref_view_dir``."""
        return trimesh.load(os.path.abspath(f'{ref_view_dir}/ob_{ob_id:07d}/model/model.obj'))
class YcbVideoReader(BopBaseReader):
    """Reader for the 'ycbv' dataset.

    Object names come from the sorted sub-directories of
    ``$YCB_VIDEO_DIR/models``. The keyframe list is only loaded when 'BOP'
    does not occur in ``base_dir``.
    """

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene, the id/name tables and the symmetry tables."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'ycbv'
        self.K = list(self.K_table.values())[0]
        # id_strs is already built by the base constructor.

        self.ob_ids = np.arange(1,22).astype(int).tolist()
        YCB_VIDEO_DIR = os.getenv('YCB_VIDEO_DIR')
        names = sorted(os.listdir(f'{YCB_VIDEO_DIR}/models/'))
        self.ob_id_to_names = {}
        self.name_to_ob_id = {}
        for i,ob_id in enumerate(self.ob_ids):
            self.ob_id_to_names[ob_id] = names[i]
            self.name_to_ob_id[names[i]] = ob_id

        # None means "no keyframe list loaded"; is_keyframe() checks for it.
        self.keyframe_lines = None
        if 'BOP' not in self.base_dir:
            with open(f'{self.base_dir}/../../keyframe.txt','r') as ff:
                self.keyframe_lines = ff.read().splitlines()

        self.load_symmetry_tfs()
        self._override_geometry_symmetries()

    def _override_geometry_symmetries(self):
        """Replace geometry symmetry entries for the hard-coded object groups."""
        z_axis_continuous = [
            {'axis':[0,0,1], 'offset':[0,0,0]},
        ]
        half_turns = [0, np.pi]
        for ob_id in self.ob_ids:
            if ob_id in [1,4,6,18]:   # Cylinder
                self.geometry_symmetry_info_table[ob_id] = {
                    'symmetries_continuous': copy.deepcopy(z_axis_continuous),
                    'symmetries_discrete': euler_matrix(0, np.pi, 0).reshape(1,4,4).tolist(),
                }
            elif ob_id == 13:
                self.geometry_symmetry_info_table[ob_id] = {
                    'symmetries_continuous': copy.deepcopy(z_axis_continuous),
                }
            elif ob_id in [2,3,9,21]:  # Rectangle box
                tfs = [
                    euler_matrix(rx, ry, rz)
                    for rz in half_turns
                    for rx in half_turns
                    for ry in half_turns
                ]
                self.geometry_symmetry_info_table[ob_id] = {
                    'symmetries_discrete': np.asarray(tfs).reshape(-1,4,4).tolist(),
                }

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id``, absolute when 'BOP' is in ``base_dir``."""
        mesh_file = f'{self.base_dir}/../../ycbv_models/models/obj_{ob_id:06d}.ply'
        if 'BOP' in self.base_dir:
            mesh_file = os.path.abspath(mesh_file)
        return mesh_file

    def get_gt_mesh(self, ob_id:int, get_posecnn_version=False):
        """Load the model of ``ob_id``.

        Args:
            ob_id: Object id.
            get_posecnn_version: If True, load ``textured_simple.obj`` from
                ``$YCB_VIDEO_DIR/models/<name>`` without rescaling. Otherwise
                load the ``.ply`` model, multiply its vertices by 1e-3 and
                attach the sibling ``.png`` texture when it exists.
        """
        if get_posecnn_version:
            YCB_VIDEO_DIR = os.getenv('YCB_VIDEO_DIR')
            return trimesh.load(f'{YCB_VIDEO_DIR}/models/{self.ob_id_to_names[ob_id]}/textured_simple.obj')
        mesh_file = self.get_gt_mesh_file(ob_id)
        mesh = trimesh.load(mesh_file, process=False)
        mesh.vertices *= 1e-3
        tex_file = mesh_file.replace('.ply','.png')
        if os.path.exists(tex_file):
            from PIL import Image
            im = Image.open(tex_file)
            uv = mesh.visual.uv
            material = trimesh.visual.texture.SimpleMaterial(image=im)
            mesh.visual = trimesh.visual.TextureVisuals(uv=uv, image=im, material=material)
        return mesh

    def get_reconstructed_mesh(self, ob_id, ref_view_dir):
        """Load ``ob_<id>/model/model.obj`` from ``ref_view_dir``."""
        return trimesh.load(os.path.abspath(f'{ref_view_dir}/ob_{ob_id:07d}/model/model.obj'))

    def get_transform_reconstructed_to_gt_model(self, ob_id):
        """Return the 4x4 identity."""
        return np.eye(4)

    def get_visible_cloud(self, ob_id):
        """Read ``visible_cloud.ply`` of ``ob_id`` from the ``models`` directory two levels up."""
        file = os.path.abspath(f'{self.base_dir}/../../models/{self.ob_id_to_names[ob_id]}/visible_cloud.ply')
        return o3d.io.read_point_cloud(file)

    def is_keyframe(self, i):
        """Return True if frame ``i`` is listed in ``keyframe.txt``.

        Raises:
            RuntimeError: If no keyframe list was loaded, which is the case
                when 'BOP' occurs in ``base_dir``.
        """
        keyframe_lines = getattr(self, 'keyframe_lines', None)
        if keyframe_lines is None:
            raise RuntimeError(
                f'No keyframe list loaded for {self.base_dir!r} '
                "(keyframe.txt is only read when 'BOP' is not in base_dir)"
            )
        frame_id = int(os.path.basename(self.color_files[i]).split('.')[0])
        key = f'{self.get_video_id():04d}/{frame_id:06d}'
        return key in keyframe_lines
class TlessReader(BopBaseReader):
    """Reader for the 'tless' dataset, using the ``models_cad`` models."""

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene and the symmetry tables for object ids 1..30."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'tless'
        self.ob_ids = list(range(1, 31))
        self.load_symmetry_tfs()

    def get_gt_mesh_file(self, ob_id):
        """Return the CAD model path of ``ob_id``, three levels above ``base_dir``."""
        return f'{self.base_dir}/../../../models_cad/obj_{ob_id:06d}.ply'

    def get_gt_mesh(self, ob_id):
        """Load the CAD model, multiply vertices by 1e-3 and add a uniform texture."""
        cad_mesh = trimesh.load(self.get_gt_mesh_file(ob_id))
        cad_mesh.vertices *= 1e-3
        uniform_color = np.ones((3))*200
        return trimesh_add_pure_colored_texture(cad_mesh, color=uniform_color)
class HomebrewedReader(BopBaseReader):
    """Reader for the 'hb' dataset; ground-truth poses are not available."""

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene, the symmetry tables (ids 1..33) and the test targets."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'hb'
        self.ob_ids = np.arange(1,34).astype(int).tolist()
        self.load_symmetry_tfs()
        self.make_scene_ob_ids_dict()

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id`` under ``hb_models/models``."""
        return f'{self.base_dir}/../../../hb_models/models/obj_{ob_id:06d}.ply'

    def get_gt_pose(self, i_frame:int, ob_id, use_my_correction=False, mask=None):
        """Return the identity, since this dataset has no ground-truth pose.

        ``mask`` is accepted (and ignored) so that callers using the base
        class keyword ``mask=...`` work with this reader too. It is placed
        last to keep existing positional calls unchanged.
        """
        logging.info("WARN HomeBrewed doesn't have GT pose")
        return np.eye(4)
class ItoddReader(BopBaseReader):
    """Reader for the 'itodd' dataset."""

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene, the symmetry tables (ids 1..28) and the test targets."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'itodd'
        self.make_id_strs()
        self.ob_ids = list(range(1, 29))
        self.load_symmetry_tfs()
        self.make_scene_ob_ids_dict()

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id`` under ``itodd_models/models``."""
        return f'{self.base_dir}/../../../itodd_models/models/obj_{ob_id:06d}.ply'
class IcbinReader(BopBaseReader):
    """Reader for the 'icbin' dataset."""

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene and the symmetry tables for object ids 1 and 2."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'icbin'
        self.ob_ids = list(range(1, 3))
        self.load_symmetry_tfs()

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id`` under ``icbin_models/models``."""
        return f'{self.base_dir}/../../../icbin_models/models/obj_{ob_id:06d}.ply'
class TudlReader(BopBaseReader):
    """Reader for the 'tudl' dataset."""

    def __init__(self, base_dir, zfar=np.inf):
        """Load the scene and the symmetry tables for object ids 1..3."""
        super().__init__(base_dir, zfar=zfar)
        self.dataset_name = 'tudl'
        self.ob_ids = list(range(1, 4))
        self.load_symmetry_tfs()

    def get_gt_mesh_file(self, ob_id):
        """Return the ``.ply`` model path of ``ob_id`` under ``tudl_models/models``."""
        return f'{self.base_dir}/../../../tudl_models/models/obj_{ob_id:06d}.ply'
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。