diff --git a/.gitignore b/.gitignore index 3f7f92b..3ffcb78 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ downloads/ eggs/ .eggs/ lib/ +!3dhp_test/lib/ +!3dhp_test/lib/*.py lib64/ parts/ sdist/ @@ -45,9 +47,12 @@ htmlcov/ *.pkl *.h5 *.ckpt +animals/checkpoint/ +animals/dataset/animal3d/ +animals/dataset/control_animal3dlatest/ # Excluded directories pre_trained_models/ demo/predictions/ demo/images/ -**/predictions/ \ No newline at end of file +**/predictions/ diff --git a/3dhp_test/.gitignore b/3dhp_test/.gitignore new file mode 100644 index 0000000..679ba8d --- /dev/null +++ b/3dhp_test/.gitignore @@ -0,0 +1,10 @@ +__pycache__/ +*.py[cod] + +results/ + +*.pth +*.pt +*.ckpt + +dataset/*.npz diff --git a/3dhp_test/README.md b/3dhp_test/README.md new file mode 100644 index 0000000..5a44e90 --- /dev/null +++ b/3dhp_test/README.md @@ -0,0 +1,105 @@ +# MPI-INF-3DHP Test Evaluation + +This folder contains utilities for evaluating monocular 3D pose lifting models on the MPI-INF-3DHP test set. + +## Model and Weights + +By default, the 3DHP test uses the packaged human FMPose3D lifting model (`model_type=fmpose3d_humans`) and leaves `model_weights_path` empty so weights are downloaded automatically from Hugging Face Hub. 
+ +`test_3dhp.sh` exposes the model and weights near the top of the script: + +```bash +model_type="fmpose3d_humans" +model_weights_path="" +model_path="" +``` + +To use local weights, set `model_weights_path` to your own checkpoint or to the human pretrained weights we provide on [Google Drive](https://drive.google.com/drive/folders/1aRZ6t_6IxSfM1nCTFOUXcYVaOk-5koGA?usp=sharing): + +```bash +model_weights_path="${SCRIPT_DIR}/pretrained/fmpose3d_h36m/FMpose3D_pretrained_weights.pth" +``` + +To use a local model definition instead of the packaged registry model, set `model_path` to a Python file that defines `Model`: + +```bash +model_path="${SCRIPT_DIR}/pretrained/fmpose3d_h36m/model_GAMLP.py" +``` + +When both `model_path` and `model_weights_path` are set, make sure the local model architecture matches the checkpoint. + +## Dataset Preparation + +`infer_3dhp.py` expects a processed MPI-INF-3DHP test file at: + +```bash +3dhp_test/dataset/data_test_3dhp.npz +``` + +The `.npz` file is not committed to this repository. Generate it from the official MPI-INF-3DHP test set annotations with `dataset/prepare_3dhp_test_npz.py`. + +### Get the official dataset + +Get MPI-INF-3DHP from the [official dataset website](https://vcai.mpi-inf.mpg.de/3dhp-dataset/) and follow its license and access instructions. The official package includes download scripts under `source/`; read the included `README.txt`, edit `source/conf.ig` as instructed, then run the test-set downloader: + +```bash +cd /path/to/mpi_inf_3dhp/source +bash get_testset.sh +``` + +After the script downloads and extracts the test set, set `${MPI_INF_3DHP_TEST_ROOT}` to the extracted test-set root. It should contain `TS1` through `TS6`: + +```text +/path/to/mpi_inf_3dhp/ + mpi_inf_3dhp_test_set/ + TS1/ + annot_data.mat + ... + TS2/ + annot_data.mat + ... + ... + TS6/ + annot_data.mat + ... 
+``` + +Verify that `TS1` through `TS6` each contain `annot_data.mat`: + +```bash +for subject in TS1 TS2 TS3 TS4 TS5 TS6; do + test -f "${MPI_INF_3DHP_TEST_ROOT}/${subject}/annot_data.mat" \ + && echo "${subject}: ok" \ + || echo "${subject}: missing annot_data.mat" +done +``` + +### Generate `data_test_3dhp.npz` + +Run from the repository root: + +```bash +python 3dhp_test/dataset/prepare_3dhp_test_npz.py \ + --test-root "${MPI_INF_3DHP_TEST_ROOT}" \ + --output 3dhp_test/dataset/data_test_3dhp.npz +``` + +It reads each `TS*/annot_data.mat` and writes a compressed npz with this schema: + +```text +data = { + "TS1": { + "data_2d": annot2, + "data_3d": univ_annot3, + "valid": valid_frame, + }, + ... + "TS6": ... +} +``` + +`ThreeDHPTestDataset` then applies the valid-frame mask, maps the 28-joint 3DHP layout to the 17-joint FMPose3D layout, converts 3D from millimeters to meters, root-centers joints 1-16 around joint 0, and normalizes the 2D coordinates. + +## Acknowledgement + +The MPI-INF-3DHP npz conversion is adapted from the preprocessing workflow in [P-STMO](https://github.com/paTRICK-swk/P-STMO). 
def convert_test(test_root, output_path):
    """Bundle every ``TS*/annot_data.mat`` under *test_root* into one npz file.

    For each subject folder the ``valid_frame``, ``annot2`` and
    ``univ_annot3`` datasets are read, squeezed, and stored under the keys
    ``valid``, ``data_2d`` and ``data_3d``.  The per-subject dicts are saved
    as a single ``data`` entry, so the archive holds a pickled object and
    must be read back with ``allow_pickle=True``.

    Args:
        test_root: Directory that contains the ``TS*`` subject folders.
        output_path: Destination ``.npz`` file; missing parent directories
            are created.

    Raises:
        FileNotFoundError: If no ``TS*/annot_data.mat`` file is found.
    """
    source_dir = Path(test_root)
    target_file = Path(output_path)

    collected = {}
    for mat_file in sorted(source_dir.glob("TS*/annot_data.mat")):
        name = mat_file.parent.name
        print(f"loading {name}...")

        # Everything is materialised as numpy arrays before the file closes.
        with h5py.File(mat_file, "r") as handle:
            frame_mask, joints_2d, joints_3d = (
                np.squeeze(handle[key][()])
                for key in ("valid_frame", "annot2", "univ_annot3")
            )

        collected[name] = {
            "data_2d": joints_2d,
            "data_3d": joints_3d,
            "valid": frame_mask,
        }

    if len(collected) == 0:
        raise FileNotFoundError(f"No test annot_data.mat files found under {source_dir}")

    target_file.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(target_file, data=collected)
    print(f"saved {target_file}")
def str2bool(value):
    """Convert a command-line flag value to ``bool`` (argparse ``type=``).

    Accepted spellings are case-insensitive and may carry surrounding
    whitespace: ``true/1/yes/y`` and ``false/0/no/n``.  Values that are
    already ``bool`` are returned unchanged; any other non-string value is
    converted with ``str()`` before matching.

    Args:
        value: The raw flag value, normally a string supplied by argparse.

    Returns:
        The parsed boolean.

    Raises:
        argparse.ArgumentTypeError: If the value is not a recognised
            spelling.  The message echoes the value exactly as supplied.
    """
    if isinstance(value, bool):
        return value
    # Normalise into a separate name so the error below reports the original
    # input rather than its lower-cased form.
    normalized = str(value).strip().lower()
    if normalized in {"true", "1", "yes", "y"}:
        return True
    if normalized in {"false", "0", "no", "n"}:
        return False
    raise argparse.ArgumentTypeError(f"Expected boolean value, got {value}")
inference with processed test npz.") + parser.add_argument("--dataset-path", type=Path, default=ROOT / "dataset" / "data_test_3dhp.npz") + parser.add_argument( + "--model-path", + type=str, + default="", + help="Optional path to a Model definition. Defaults to the package FMPose3D human model.", + ) + parser.add_argument("--model-type", default="fmpose3d_humans", type=str) + parser.add_argument( + "--model-weights-path", + default="", + type=str, + help="Local checkpoint path. Empty downloads the weights for --model-type from Hugging Face.", + ) + parser.add_argument("--results-dir", type=Path, default=ROOT / "results") + parser.add_argument("--folder-name", type=str, default="") + parser.add_argument("--gpu", default="0", type=str) + parser.add_argument("--workers", default=8, type=int) + parser.add_argument("--batch-size", default=1024, type=int) + parser.add_argument("--frames", default=1, type=int, choices=[1], help="3DHP test evaluation uses single-frame samples; only 1 is supported.") + parser.add_argument("--layers", default=5, type=int) + parser.add_argument("--channel", default=512, type=int) + parser.add_argument("--d-hid", default=1024, type=int) + parser.add_argument("--token-dim", default=256, type=int) + parser.add_argument("--n-joints", default=17, type=int) + parser.add_argument("--dataset", default="3dhp_valid", type=str) + parser.add_argument("--actions", default="*", type=str, help="3DHP test annotations do not include action labels; only '*' is supported.") + parser.add_argument("--subjects-test", default="TS1,TS2,TS3,TS4,TS5,TS6", type=str) + parser.add_argument("--eval-sample-steps", default="2", type=str) + parser.add_argument("--num-hypothesis-list", default="1", type=str) + parser.add_argument("--topk", default=6, type=int) + parser.add_argument("--exp-temp", default=0.005, type=float) + parser.add_argument("--test-augmentation", default=True, type=str2bool) + parser.add_argument("--test-augmentation-flip-hypothesis", default=True, 
def load_model_class(model_path, model_type):
    """Return the model class to instantiate.

    With an empty *model_path* the class comes from ``get_model(model_type)``.
    Otherwise the given Python file is executed as a module named after the
    file stem and its ``Model`` attribute is returned; *model_type* is not
    used in that case.

    Raises:
        ImportError: If no import spec or loader can be built for the file.
        AttributeError: If the executed file does not define ``Model``.
    """
    if model_path:
        definition_file = Path(model_path).resolve()
        spec = importlib.util.spec_from_file_location(definition_file.stem, definition_file)
        loader = None if spec is None else spec.loader
        if loader is None:
            raise ImportError(f"Could not load model definition from {definition_file}")

        loaded = importlib.util.module_from_spec(spec)
        loader.exec_module(loaded)
        if hasattr(loaded, "Model"):
            return loaded.Model
        raise AttributeError(f"Model definition file {definition_file} does not define a Model class")

    return get_model(model_type)
def print_error(data_type, action_error_sum, is_train):
    """Report accumulated errors for a supported dataset type.

    ``"h36m"`` and any type starting with ``"3dhp"`` are delegated to
    ``print_error_action``; every other type yields four zeros without
    printing anything.

    Returns:
        The tuple produced by ``print_error_action``, or ``(0, 0, 0, 0)``.
    """
    supported = data_type == "h36m" or data_type.startswith("3dhp")
    if not supported:
        return 0, 0, 0, 0
    return print_error_action(action_error_sum, is_train, data_type)
def test(actions, dataloader, model, args, hypothesis_num=1):
    """Run evaluation over *dataloader* and return the metrics per step count.

    For every batch and every step count listed in ``args.eval_sample_steps``
    the function draws ``hypothesis_num`` Gaussian starting poses, integrates
    each one with forward-Euler updates driven by the model output,
    optionally adds a horizontally mirrored hypothesis, merges the
    hypotheses with ``aggregation_RPEA_joint_level`` and accumulates the
    errors through ``eval_cal.test_calculation``.

    Args:
        actions: Action names used to create the error accumulators.
        dataloader: Iterable yielding 6-tuples; items 1-4 are used as the 3D
            target, the 2D input, the action labels and the subject ids.
        model: Callable invoked as ``model(x2d, y, t)``; switched to eval mode.
        args: Namespace providing ``device``, ``eval_sample_steps``,
            ``root_joint``, ``pad``, ``joints_left``, ``joints_right``,
            ``test_augmentation_flip_hypothesis``, ``max_batches``,
            ``dataset`` and ``train``.
        hypothesis_num: Number of random starting poses per sample.

    Returns:
        Four dicts (p1, p2, pck, auc), each mapping a step count to the
        float reported by ``print_error`` for that step count.
    """
    model.eval()
    # Unique, ascending step counts parsed from a comma-separated string.
    eval_steps = sorted({int(s) for s in str(args.eval_sample_steps).split(",") if str(s).strip()})
    # One independent set of accumulators per step count.
    action_error_sum_multi = {s: define_error_list(actions) for s in eval_steps}

    print(f"\n{'=' * 80}")
    print(f"Testing with {hypothesis_num} hypothesis(es), eval_steps: {eval_steps}")
    print(f"{'=' * 80}\n")

    for i, data in enumerate(tqdm(dataloader)):
        _, gt_3d, input_2d, action, subject, _ = data
        input_2d = input_2d.contiguous().to(args.device, dtype=torch.float32)
        gt_3d = gt_3d.contiguous().to(args.device, dtype=torch.float32)

        # Index 0 along dim 1 is the plain view, index 1 (when present) the flipped one.
        input_2d_nonflip = input_2d[:, 0]
        # NOTE(review): without a flipped view this falls back to the plain
        # input, yet the flip branch below still mirrors the sampled output.
        # Confirm that test_augmentation_flip_hypothesis is only meant to be
        # used together with a loader that provides the flipped view.
        input_2d_flip = input_2d[:, 1] if input_2d.size(1) > 1 else input_2d[:, 0]

        # Evaluation target: ground truth with the root joint zeroed.
        out_target = gt_3d.clone()
        out_target[:, :, args.root_joint] = 0

        def euler_sample(x2d, y_local, steps, model_3d):
            # Forward-Euler integration over [0, 1): the model output is
            # treated as a velocity evaluated at t = s * dt.
            dt = 1.0 / steps
            for s in range(steps):
                t_s = torch.full((gt_3d.size(0), 1, 1, 1), s * dt, device=gt_3d.device, dtype=gt_3d.dtype)
                v_s = model_3d(x2d, y_local, t_s)
                y_local = y_local + dt * v_s
            return y_local

        if i == 0:
            print(f"eval_steps: {eval_steps}, hypothesis_num: {hypothesis_num}")

        for s_keep in eval_steps:
            list_hypothesis = []
            for _ in range(hypothesis_num):
                y = torch.randn_like(gt_3d)
                y_s = euler_sample(input_2d_nonflip, y, s_keep, model)

                if args.test_augmentation_flip_hypothesis:
                    # Separate noise draw for the mirrored hypothesis: negate x
                    # and swap left/right joints before sampling ...
                    y_flip = torch.randn_like(gt_3d)
                    y_flip[:, :, :, 0] *= -1
                    y_flip[:, :, args.joints_left + args.joints_right, :] = y_flip[
                        :, :, args.joints_right + args.joints_left, :
                    ]
                    y_flip_s = euler_sample(input_2d_flip, y_flip, s_keep, model)
                    # ... and apply the same mirror to the sampled result.
                    y_flip_s[:, :, :, 0] *= -1
                    y_flip_s[:, :, args.joints_left + args.joints_right, :] = y_flip_s[
                        :, :, args.joints_right + args.joints_left, :
                    ]
                    # Keep only the frame at index args.pad and zero joint 0.
                    y_flip_s_frame = y_flip_s[:, args.pad].unsqueeze(1)
                    y_flip_s_frame[:, :, 0, :] = 0
                    list_hypothesis.append(y_flip_s_frame)

                y_s_frame = y_s[:, args.pad].unsqueeze(1)
                y_s_frame[:, :, 0, :] = 0
                list_hypothesis.append(y_s_frame)

            # One row of camera parameters per sample, looked up by subject id.
            cam_tensor = camera_tensor_for_subjects(subject, gt_3d.device, dtype=gt_3d.dtype)
            output_3d_s = aggregation_RPEA_joint_level(
                args, list_hypothesis, cam_tensor, input_2d_nonflip, gt_3d
            )
            action_error_sum_multi[s_keep] = eval_cal.test_calculation(
                output_3d_s, out_target, action, action_error_sum_multi[s_keep], args.dataset, subject
            )

        # max_batches == 0 disables the limit (full evaluation).
        if args.max_batches and i + 1 >= args.max_batches:
            break

    per_step_p1 = {}
    per_step_p2 = {}
    per_step_pck = {}
    per_step_auc = {}
    for s_keep in sorted(action_error_sum_multi.keys()):
        p1_s, p2_s, pck_s, auc_s = print_error(args.dataset, action_error_sum_multi[s_keep], args.train)
        per_step_p1[s_keep] = float(p1_s)
        per_step_p2[s_keep] = float(p2_s)
        per_step_pck[s_keep] = float(pck_s)
        per_step_auc[s_keep] = float(auc_s)

    return per_step_p1, per_step_p2, per_step_pck, per_step_auc
f"S{args.subjects_test}_h{args.num_hypothesis_list}_{time.strftime('%Y%m%d_%H%M%S')}" + ) + result_dir = args.results_dir / folder_name + result_dir.mkdir(parents=True, exist_ok=True) + log_path = result_dir / "train.log" + logging.basicConfig(filename=log_path, level=logging.INFO, format="%(message)s") + return result_dir, log_path + + +def main(): + args = parse_args() + if args.gpu not in {"", "-1", "cpu", "none", "None"}: + os.environ["CUDA_VISIBLE_DEVICES"] = args.gpu + args.device = get_device(args.gpu) + configure_reproducibility(args.manual_seed) + + result_dir, log_path = setup_logging(args) + print(f"Results: {result_dir}") + print(f"Log: {log_path}") + + subjects = [s for s in args.subjects_test.split(",") if s] + dataset = ThreeDHPTestDataset( + args.dataset_path, + subjects=subjects, + test_augmentation=args.test_augmentation, + kps_left=args.kps_left, + kps_right=args.kps_right, + joints_left=args.joints_left, + joints_right=args.joints_right, + ) + print(f"Dataset: {args.dataset_path}") + print(f"Dataset summary: {dataset.summary()}") + logging.info(f"Dataset: {args.dataset_path}") + logging.info(f"Dataset summary: {dataset.summary()}") + + dataloader = torch.utils.data.DataLoader( + dataset, + batch_size=args.batch_size, + shuffle=False, + num_workers=int(args.workers), + pin_memory=args.device.type == "cuda", + ) + + model_cls = load_model_class(args.model_path, args.model_type) + model = model_cls(args).to(args.device) + + model_weights_path = resolve_weights_path(args.model_weights_path, args.model_type) + print(model_weights_path) + load_model_weights(model, model_weights_path, args.device) + print("model loaded successfully!") + + actions = define_actions_3dhp(args.actions, train=False) + hypothesis_list = [int(x) for x in str(args.num_hypothesis_list).split(",") if str(x).strip()] + eval_steps_list = [int(s) for s in str(args.eval_sample_steps).split(",") if str(s).strip()] + + best_global_p1 = None + best_global_p2 = None + best_global_pck = 
None + best_global_auc = None + best_global_pair = None + all_metrics = {} + + for s_eval in eval_steps_list: + p1_by_hyp = {} + p2_by_hyp = {} + pck_by_hyp = {} + auc_by_hyp = {} + + for hypothesis_num in hypothesis_list: + print(f"\n{'=' * 80}") + print(f"Evaluating step {s_eval} with {hypothesis_num} hypotheses") + print(f"{'=' * 80}\n") + logging.info(f"Evaluating step {s_eval} with {hypothesis_num} hypotheses") + + with torch.no_grad(): + args_backup = args.eval_sample_steps + args.eval_sample_steps = str(s_eval) + p1_per_step, p2_per_step, pck_per_step, auc_per_step = test( + actions, dataloader, model, args, hypothesis_num=hypothesis_num + ) + args.eval_sample_steps = args_backup + + p1 = p1_per_step[int(s_eval)] + p2 = p2_per_step[int(s_eval)] + pck_s = pck_per_step[int(s_eval)] + auc_s = auc_per_step[int(s_eval)] + + p1_by_hyp[int(hypothesis_num)] = float(p1) + p2_by_hyp[int(hypothesis_num)] = float(p2) + pck_by_hyp[int(hypothesis_num)] = float(pck_s) + auc_by_hyp[int(hypothesis_num)] = float(auc_s) + + all_metrics[f"step_{s_eval}_hyp_{hypothesis_num}"] = { + "p1": float(p1), + "p2": float(p2), + "pck": float(pck_s), + "auc": float(auc_s), + } + + if best_global_p1 is None or float(p1) < best_global_p1: + best_global_p1 = float(p1) + best_global_p2 = float(p2) + best_global_pck = float(pck_s) + best_global_auc = float(auc_s) + best_global_pair = (int(s_eval), int(hypothesis_num)) + + hyp_sorted = sorted(p1_by_hyp.keys()) + hyp_strs = [ + f"h{h}_p1: {p1_by_hyp[h]:.4f}, h{h}_p2: {p2_by_hyp[h]:.4f}, " + f"h{h}_pck: {pck_by_hyp[h]:.4f}, h{h}_auc: {auc_by_hyp[h]:.4f}" + for h in hyp_sorted + ] + print("\n" + "=" * 80) + print(f"Step: {s_eval} | " + " | ".join(hyp_strs)) + print("=" * 80 + "\n") + logging.info(f"step: {s_eval} | " + " | ".join(hyp_strs)) + + if best_global_p1 is not None: + print("\n" + "=" * 80) + print( + f"BEST RESULT: step {best_global_pair[0]}, hyp {best_global_pair[1]}: " + f"p1: {best_global_p1:.4f}, p2: {best_global_p2:.4f}, " + f"pck: 
def project_to_2d(x, camera_params):
    """Project 3D points to 2D with per-sample intrinsics and distortion.

    Each row of *camera_params* holds nine values, sliced here as two focal
    terms, two centre terms, three radial coefficients and two tangential
    coefficients.  The perspective-divided coordinates are clamped to
    ``[-1, 1]`` before the distortion terms are applied.

    Args:
        x: Tensor whose last dimension is 3 and whose first dimension is the
            batch.
        camera_params: Tensor of shape ``(batch, 9)``.

    Returns:
        Tensor shaped like *x* except that the last dimension is 2.
    """
    assert x.shape[-1] == 3
    assert len(camera_params.shape) == 2
    assert camera_params.shape[-1] == 9
    assert x.shape[0] == camera_params.shape[0]

    # Insert singleton axes after the batch axis so the parameters broadcast
    # against every leading dimension of x.
    for _ in range(x.dim() - camera_params.dim()):
        camera_params = camera_params.unsqueeze(1)

    focal = camera_params[..., :2]
    center = camera_params[..., 2:4]
    radial_coeffs = camera_params[..., 4:7]
    tangential_coeffs = camera_params[..., 7:]

    plane = torch.clamp(x[..., :2] / x[..., 2:], min=-1, max=1)
    r_sq = torch.sum(plane[..., :2] ** 2, dim=-1, keepdim=True)

    powers = torch.cat((r_sq, r_sq ** 2, r_sq ** 3), dim=-1)
    radial = 1 + torch.sum(radial_coeffs * powers, dim=-1, keepdim=True)
    tangential = torch.sum(tangential_coeffs * plane, dim=-1, keepdim=True)
    distorted = plane * (radial + tangential) + tangential_coeffs * r_sq

    return focal * distorted + center
class ThreeDHPTestDataset(Dataset):
    """Single-frame MPI-INF-3DHP test samples read from a processed npz file.

    On construction the archive is loaded once and, per requested subject,
    the valid-frame mask is applied, joints are reordered with
    ``TEST_NPZ_TO_FMPOSE_17``, 3D coordinates are divided by 1000, joints
    ``1:`` are expressed relative to joint 0, and 2D coordinates are
    normalised with the subject's image resolution.

    Each item is the 6-tuple ``(zeros(9), pose_3d, input_2d, "Seq1",
    subject, 0)``.  ``pose_3d`` has a leading length-1 frame axis;
    ``input_2d`` stacks the plain view and, when test augmentation is on,
    the horizontally flipped view along axis 0.
    """

    def __init__(
        self,
        dataset_path,
        subjects,
        test_augmentation=True,
        kps_left=None,
        kps_right=None,
        joints_left=None,
        joints_right=None,
    ):
        """Load and preprocess the requested subjects.

        Args:
            dataset_path: Path of the processed ``.npz`` archive.
            subjects: Subject keys to load, in iteration order.
            test_augmentation: Whether items also carry a flipped 2D view.
            kps_left, kps_right: Left/right 2D keypoint indices (lists);
                default to the 17-joint layout used by this module.
            joints_left, joints_right: Left/right 3D joint indices; stored
                with the same defaults.

        Raises:
            KeyError: If a requested subject is missing from the archive.
        """
        self.dataset_path = dataset_path
        self.subjects = subjects
        self.test_augmentation = bool(test_augmentation)
        self.kps_left = kps_left or [4, 5, 6, 11, 12, 13]
        self.kps_right = kps_right or [1, 2, 3, 14, 15, 16]
        self.joints_left = joints_left or [4, 5, 6, 11, 12, 13]
        self.joints_right = joints_right or [1, 2, 3, 14, 15, 16]
        self._data = self._load()
        self.pairs = self._build_pairs()

    def _load(self):
        """Read the archive and return the preprocessed arrays per subject."""
        # SECURITY: the archive stores a pickled dict, so allow_pickle is
        # required; only open files produced by a trusted conversion step.
        # The context manager closes the underlying zip file once the dict
        # has been materialised (np.load leaves it open otherwise).
        with np.load(self.dataset_path, allow_pickle=True) as archive:
            raw = archive["data"].item()
        out = {}

        for subject in self.subjects:
            if subject not in raw:
                raise KeyError(f"Subject {subject} is missing from {self.dataset_path}")

            anim = raw[subject]
            valid = anim["valid"].astype(bool)
            data_2d = anim["data_2d"][valid][:, TEST_NPZ_TO_FMPOSE_17, :]
            data_3d = anim["data_3d"][valid][:, TEST_NPZ_TO_FMPOSE_17, :] / 1000.0

            # Joint 0 keeps its original coordinates; joints 1.. become
            # offsets from it.
            data_3d[:, 1:] -= data_3d[:, :1]

            width, height = resolution_for_subject(subject)
            data_2d = normalize_screen_coordinates(data_2d, w=width, h=height)

            out[subject] = {
                "positions_2d": data_2d,
                "positions_3d": data_3d,
                "valid_count": int(valid.sum()),
                "original_count": int(valid.shape[0]),
            }

        return out

    def _build_pairs(self):
        """Return the flat ``(subject, frame_idx)`` index over all frames."""
        return [
            (subject, frame_idx)
            for subject in self.subjects
            for frame_idx in range(self._data[subject]["positions_2d"].shape[0])
        ]

    def summary(self):
        """Return valid and original frame counts for every subject."""
        return {
            subject: {
                "valid_count": self._data[subject]["valid_count"],
                "original_count": self._data[subject]["original_count"],
            }
            for subject in self.subjects
        }

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, index):
        """Return the sample tuple for the frame at flat position *index*."""
        subject, frame_idx = self.pairs[index]
        # Slicing keeps a length-1 frame axis; copies protect the cached arrays.
        pose_2d = self._data[subject]["positions_2d"][frame_idx : frame_idx + 1].copy()
        pose_3d = self._data[subject]["positions_3d"][frame_idx : frame_idx + 1].copy()

        input_2d = pose_2d[None, ...]

        if self.test_augmentation:
            # Mirror horizontally: negate x, then swap left/right keypoints.
            flip_2d = pose_2d.copy()
            flip_2d[:, :, 0] *= -1
            flip_2d[:, self.kps_left + self.kps_right] = flip_2d[:, self.kps_right + self.kps_left]
            input_2d = np.concatenate((input_2d, flip_2d[None, ...]), axis=0)

        return (
            np.zeros(9, dtype=np.float32),
            pose_3d,
            input_2d.astype(np.float32),
            "Seq1",
            subject,
            0,
        )
def _action_name(action):
    """Return the part of *action* before its first space.

    A label without any space is returned unchanged.
    """
    head, _, _ = action.partition(" ")
    return head
def p_mpjpe(predicted, target):
    """Per-sample mean joint error after Procrustes-style alignment.

    Each predicted pose is aligned to its target with the best rotation,
    uniform scale and translation found through an SVD of the normalised,
    centred point sets; the error is then the mean Euclidean distance over
    joints.

    Args:
        predicted: numpy array indexed as (sample, joint, coordinate).
        target: numpy array with the same shape as *predicted*.

    Returns:
        numpy array with one error value per sample.
    """
    assert predicted.shape == target.shape

    # Centre both point sets on their per-sample joint mean.
    mu_x = np.mean(target, axis=1, keepdims=True)
    mu_y = np.mean(predicted, axis=1, keepdims=True)
    x0 = target - mu_x
    y0 = predicted - mu_y

    # Scale each centred set to unit Frobenius norm (in place on the copies,
    # the caller's arrays are not modified).
    norm_x = np.sqrt(np.sum(x0**2, axis=(1, 2), keepdims=True))
    norm_y = np.sqrt(np.sum(y0**2, axis=(1, 2), keepdims=True))
    x0 /= norm_x
    y0 /= norm_y

    # Cross-covariance and its SVD give the candidate rotation.
    h = np.matmul(x0.transpose(0, 2, 1), y0)
    u, s, vt = np.linalg.svd(h)
    v = vt.transpose(0, 2, 1)
    r = np.matmul(v, u.transpose(0, 2, 1))

    # Where det(r) is negative the candidate is a reflection: flip the last
    # singular direction (and its singular value) and rebuild r.
    sign_det_r = np.sign(np.expand_dims(np.linalg.det(r), axis=1))
    v[:, :, -1] *= sign_det_r
    s[:, -1] *= sign_det_r.flatten()
    r = np.matmul(v, u.transpose(0, 2, 1))

    # Scale from the sum of singular values, then the matching translation.
    tr = np.expand_dims(np.sum(s, axis=1, keepdims=True), axis=2)
    a = tr * norm_x / norm_y
    t = mu_x - a * np.matmul(mu_y, r)
    predicted_aligned = a * np.matmul(predicted, r) + t

    # Norm over coordinates, mean over joints -> one value per sample.
    return np.mean(np.linalg.norm(predicted_aligned - target, axis=len(target.shape) - 1), axis=len(target.shape) - 2)
def define_actions_3dhp(action="*", train=False):
    """Return the sequence bucket names used to accumulate 3DHP errors.

    Args:
        action: Action filter; only the wildcard ``"*"`` is accepted.
        train: When truthy, the ``"Seq2"`` bucket is included as well.

    Returns:
        ``["Seq1", "Seq2"]`` if ``train`` is truthy, otherwise ``["Seq1"]``.

    Raises:
        ValueError: if ``action`` is anything other than ``"*"``.
    """
    if action == "*":
        buckets = ["Seq1"]
        if train:
            buckets.append("Seq2")
        return buckets
    raise ValueError("MPI-INF-3DHP test annotations do not include action labels; use --actions '*'.")
def load_infer_3dhp_module():
    """Import ``3dhp_test/infer_3dhp.py`` by file path and return the module.

    The script is located relative to this file: the directory one level
    above this file's directory, then ``3dhp_test/infer_3dhp.py``. It is
    loaded under the module name ``infer_3dhp`` through ``importlib`` rather
    than a regular ``import`` statement.

    Returns:
        The freshly executed module object.

    Raises:
        ImportError: if no import spec or loader can be built for the path.
    """
    script_path = Path(__file__).resolve().parents[1].joinpath("3dhp_test", "infer_3dhp.py")
    spec = importlib.util.spec_from_file_location("infer_3dhp", script_path)
    if spec is not None and spec.loader is not None:
        loaded = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(loaded)
        return loaded
    raise ImportError(f"Could not load {script_path}")
def test_p_mpjpe_mixed_actions_uses_per_sample_errors():
    """Check that a mixed-action batch records each sample's own P-MPJPE.

    Two samples share the same three-joint target. The first prediction is
    exact and the second is perturbed, so the per-action ``p2`` averages must
    equal the per-sample values returned by ``p_mpjpe`` and differ from each
    other.
    """
    infer_3dhp = load_infer_3dhp_module()
    eval_cal = infer_3dhp.eval_cal
    error_sum = infer_3dhp.define_error_list(["Walk", "Run"])

    # Same single-frame, three-joint pose for both samples.
    triangle = [
        [0.0, 0.0, 0.0],
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
    ]
    target = torch.tensor([[triangle], [triangle]], dtype=torch.float32)

    # Only the second sample deviates from its target.
    predicted = target.clone()
    predicted[1, 0, 1] = torch.tensor([1.4, 0.2, 0.3])
    predicted[1, 0, 2] = torch.tensor([-0.1, 0.7, 0.5])

    predicted_np = predicted.numpy().reshape(-1, 3, 3)
    target_np = target.numpy().reshape(-1, 3, 3)
    expected = eval_cal.p_mpjpe(predicted_np, target_np)

    eval_cal.mpjpe_by_action_p2(predicted, target, ["Walk", "Run"], error_sum)

    walk_avg = error_sum["Walk"]["p2"].avg
    run_avg = error_sum["Run"]["p2"].avg
    assert walk_avg == float(expected[0])
    assert run_avg == float(expected[1])
    assert walk_avg != run_avg