代码拉取完成,页面将自动刷新
import os
from tqdm import tqdm
from time import time
from fastprogress import progress_bar
import gc
import numpy as np
import h5py
from IPython.display import clear_output
from collections import defaultdict
from copy import deepcopy
# CV/ML
import cv2
import torch
import torch.nn.functional as F
import kornia as K
import kornia.feature as KF
from PIL import Image
import timm
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform
# 3D reconstruction
import pycolmap
from utils import *
# Function to create a submission file.
def create_submission(out_results, data_dict):
with open(f'submission.csv', 'w') as f:
f.write('image_path,dataset,scene,rotation_matrix,translation_vector\n')
for dataset in data_dict:
if dataset in out_results:
res = out_results[dataset]
else:
res = {}
for scene in data_dict[dataset]:
if scene in res:
scene_res = res[scene]
else:
scene_res = {"R":{}, "t":{}}
for image in data_dict[dataset][scene]:
if image in scene_res:
print (image)
R = scene_res[image]['R'].reshape(-1)
T = scene_res[image]['t'].reshape(-1)
else:
R = np.eye(3).reshape(-1)
T = np.zeros((3))
f.write(f'{image},{dataset},{scene},{arr_to_str(R)},{arr_to_str(T)}\n')
dry=False
printinfo()
LOCAL_FEATURE = 'KeyNetAffNetHardNet'# TODO local future
device=torch.device('cuda')
# Can be LoFTR, KeyNetAffNetHardNet, or DISK future
src = './input'
data_dict = {}
with open(f'{src}/test/test_labels.csv', 'r') as f: # append image and scence # todo
for i, l in enumerate(f):
# Skip header.
if l and i > 0:
image, dataset, scene, _, _ = l.strip().split(',')
if dataset not in data_dict:
data_dict[dataset] = {}
if scene not in data_dict[dataset]:
data_dict[dataset][scene] = []
data_dict[dataset][scene].append(image)
for dataset in data_dict:
for scene in data_dict[dataset]:
print(f'{dataset} / {scene} -> {len(data_dict[dataset][scene])} images')
out_results = {}
timings = {"shortlisting":[],
"feature_detection": [],
"feature_matching":[],
"RANSAC": [],
"Reconstruction": []}
gc.collect()
datasets = []
for dataset in data_dict:
datasets.append(dataset)
for dataset in datasets:
print(dataset)
if dataset not in out_results:
out_results[dataset] = {}
for scene in data_dict[dataset]:
print(scene)
# Fail gently if the notebook has not been submitted and the test data is not populated.
# You may want to run this on the training data in that case? train
img_dir = f'{src}/test/{dataset}/{scene}/images' # TODO
#img_dir_train = f'{src}/train/{dataset}/{scene}/images' # TODO
if not os.path.exists(img_dir):
continue
# Wrap the meaty part in a try-except block.
try:
out_results[dataset][scene] = {}
img_fnames = [f'{src}/test/{x}' for x in data_dict[dataset][scene]]
img_fnames_train = [f'{src}/train/{x}' for x in data_dict[dataset][scene]]# TODO
print (f"Got {len(img_fnames)} images")
feature_dir = f'featureout/{dataset}_{scene}'
if not os.path.isdir(feature_dir):
os.makedirs(feature_dir, exist_ok=True)
t=time()
index_pairs = get_image_pairs_shortlist(img_fnames,
sim_th = 0.5, # should be strict
min_pairs = 20, # we select at least min_pairs PER IMAGE with biggest similarity local
exhaustive_if_less = 20,
device=device)
t=time() -t
timings['shortlisting'].append(t)
print (f'{len(index_pairs)}, pairs to match, {t:.4f} sec')
gc.collect()
t=time()
if LOCAL_FEATURE != 'LoFTR':
detect_features(img_fnames,
2048,
feature_dir=feature_dir,
upright=True,
device=device,
resize_small_edge_to=600
)
gc.collect()
t=time() -t
timings['feature_detection'].append(t)
print(f'Features detected in {t:.4f} sec')
t=time()
match_features(img_fnames, index_pairs, feature_dir=feature_dir,device=device)
else:
match_loftr(img_fnames, index_pairs, feature_dir=feature_dir, device=device, resize_to_=(600, 800))
t=time() -t
timings['feature_matching'].append(t)
print(f'Features matched in {t:.4f} sec')
database_path = f'{feature_dir}/colmap.db'
if os.path.isfile(database_path):
os.remove(database_path)
gc.collect()
import_into_colmap(img_dir, feature_dir=feature_dir,database_path=database_path)
output_path = f'{feature_dir}/colmap_rec_{LOCAL_FEATURE}'
t=time()
pycolmap.match_exhaustive(database_path)
t=time() - t
timings['RANSAC'].append(t)
print(f'RANSAC in {t:.4f} sec')
t=time()
# By default colmap does not generate a reconstruction if less than 10 images are registered. Lower it to 3.
mapper_options = pycolmap.IncrementalMapperOptions()
mapper_options.min_model_size = 3
os.makedirs(output_path, exist_ok=True)
maps = pycolmap.incremental_mapping(database_path=database_path, image_path=img_dir, output_path=output_path, options=mapper_options)
print(maps)
#clear_output(wait=False)
t=time() - t
timings['Reconstruction'].append(t)
print(f'Reconstruction done in {t:.4f} sec')
imgs_registered = 0
best_idx = None
print ("Looking for the best reconstruction")
if isinstance(maps, dict):
for idx1, rec in maps.items():
print (idx1, rec.summary())
if len(rec.images) > imgs_registered:
imgs_registered = len(rec.images)
best_idx = idx1
if best_idx is not None:
print (maps[best_idx].summary())
for k, im in maps[best_idx].images.items():
key1 = f'{dataset}/{scene}/images/{im.name}'
out_results[dataset][scene][key1] = {}
out_results[dataset][scene][key1]["R"] = im.rotmat()
out_results[dataset][scene][key1]["t"] = im.tvec
print(f'Registered: {dataset} / {scene} -> {len(out_results[dataset][scene])} images')
print(f'Total: {dataset} / {scene} -> {len(data_dict[dataset][scene])} images')
create_submission(out_results, data_dict)
gc.collect()
except:
pass
if dry:
create_submission(out_results, data_dict)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。