Introduction
In the rapidly evolving landscape of computer vision and artificial intelligence, one persistent challenge has plagued researchers and practitioners: how do we create machine learning models that can reliably identify and track objects in real-world video scenarios? Traditional video object segmentation (VOS) benchmarks like DAVIS and YouTube-VOS have produced impressive results, with state-of-the-art methods achieving 90+ percent accuracy. Yet when these same models are deployed in practical applications—autonomous vehicles navigating crowded streets, augmented reality systems processing livestreams, or video editing software tracking moving subjects—performance dramatically degrades.
This critical gap between laboratory benchmarks and real-world deployment has finally prompted a paradigm shift in how the computer vision community evaluates segmentation algorithms. The introduction of MOSEv2, a significantly more challenging video object segmentation dataset, represents a watershed moment for the field. With 5,024 high-resolution videos containing 701,976 carefully annotated masks across 200 object categories, MOSEv2 doesn’t just extend its predecessor MOSEv1—it fundamentally reimagines what it means to test video understanding in authentically complex environments.
What Is MOSEv2 and Why It Represents a Breakthrough
MOSEv2 (coMplex video Object SEgmentation version 2) is a comprehensive dataset specifically engineered to evaluate video object segmentation methods under conditions that mirror real-world complexity. Unlike earlier datasets that primarily featured salient, isolated objects with clear backgrounds, MOSEv2 embraces the messy reality of uncontrolled environments where objects disappear, reappear, blend into crowded scenes, and exist under challenging lighting or weather conditions.
Key Dataset Statistics
$$ \small \renewcommand{\arraystretch}{1.2}
\begin{array}{l|c|c|c|c}
\hline
\textbf{Metric} & \textbf{MOSEv2} & \textbf{MOSEv1} & \textbf{DAVIS 2017} & \textbf{YouTube-VOS} \\
\hline
\text{Total Videos} & 5{,}024 & 2{,}149 & 90 & 4{,}453 \\
\text{Annotated Masks} & 701{,}976 & 431{,}725 & 13{,}543 & 197{,}272 \\
\text{Object Categories} & 200 & 36 & \text{–} & 94 \\
\text{Disappearance Rate (\%)} & 61.8 & 41.5 & 16.1 & 13.0 \\
\text{Reappearance Rate (\%)} & 50.3 & 23.9 & 10.7 & 8.0 \\
\text{Average Distractors} & 13.6 & 6.5 & 3.7 & 3.0 \\
\hline
\end{array} $$
The numbers tell a compelling story: MOSEv2’s disappearance rate of 61.8 percent is substantially higher than that of any other benchmark in the table, reflecting the frequent occlusions and out-of-frame scenarios that characterize authentic video content. Similarly, the reappearance rate of 50.3 percent, more than double that of MOSEv1, means models must not only handle object loss but also re-identify targets amid visual ambiguity.
The Real-World Challenges MOSEv2 Addresses
Object Disappearance and Reappearance
In real-world video scenarios, tracked objects frequently vanish behind obstacles, move out of frame, or become obscured by other elements. MOSEv2 emphasizes this challenge through intentionally designed sequences where objects undergo repeated cycles of disappearance and reappearance. This tests a critical capability: can models maintain object identity across temporal gaps using robust memory mechanisms and temporal reasoning?
Traditional datasets largely avoided these scenarios, creating an unrealistic evaluation environment where models could achieve high accuracy through simple frame-to-frame propagation without sophisticated re-identification mechanisms.
Severe Occlusions and Crowded Scenes
MOSEv2 contains videos with exceptionally dense crowds and complex occlusion patterns. The dataset’s average “distractors” metric—measuring visually similar objects per target—reaches 13.6, more than twice that of MOSEv1 and exceeding even specialized tracking datasets. Figure 1 illustrates a crowded urban scene where a small person moves through a dense gathering, frequently obscured by surrounding individuals.

Environmental Complexity and Adverse Conditions
One of MOSEv2’s most distinctive features is its systematic inclusion of videos shot under diverse environmental conditions:
- Adverse Weather: 159 rainy videos, 142 heavy-rain videos, 73 snowy videos, 60 foggy videos, and 50 videos of disaster footage (earthquakes, floods)
- Low-Light Environments: 280 underwater scenes and 255 nighttime videos
- Multi-Shot Sequences: Videos with dramatic scene transitions requiring models to handle appearance discontinuities
- Camouflaged Objects: Targets that naturally blend into backgrounds
- Non-Physical Targets: Shadows and reflections with ambiguous boundaries
These conditions create compounding challenges. An object tracked underwater at night doesn’t just suffer from low visibility—it also exhibits unpredictable appearance changes due to light refraction and shadow dynamics that violate the assumptions built into many segmentation models.
Small Objects and Fine-Grained Tracking
MOSEv2 contains a substantially higher proportion of small masks (normalized by video resolution). Approximately 50.2 percent of annotated masks occupy less than 0.01 of the image area, compared to only 25.3 percent in DAVIS. This prevalence of small targets poses severe challenges for methods operating at standard video resolutions, where fine details become nearly imperceptible.
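As a rough illustration of how such a small-mask share can be measured, the sketch below counts masks whose area falls under 1 percent of the frame. The function name `small_mask_fraction` and the toy inputs are assumptions made for illustration; they are not part of any MOSEv2 toolkit.

```python
from typing import Iterable, Tuple


def small_mask_fraction(
    masks: Iterable[Tuple[int, int, int]],
    ratio_threshold: float = 0.01,
) -> float:
    """Return the share of masks whose area is below ratio_threshold of the frame.

    Each item is (mask_area_px, frame_height, frame_width).
    """
    total = 0
    small = 0
    for area_px, height, width in masks:
        total += 1
        # Normalize by frame size so videos of different resolution are comparable
        if area_px / (height * width) < ratio_threshold:
            small += 1
    return small / total if total else 0.0


# Toy data (hypothetical): two small and two large masks in 1920x1080 frames
toy = [
    (500, 1080, 1920),
    (15_000, 1080, 1920),
    (40_000, 1080, 1920),
    (300_000, 1080, 1920),
]
print(small_mask_fraction(toy))
```

Normalizing by frame area matters because the same pixel count represents a much smaller target in a high-resolution video than in a low-resolution one.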
Evaluation Methodology: Moving Beyond Traditional Metrics
MOSEv2 introduces refined evaluation metrics that provide deeper insights into model performance across diverse scenarios.
The Adaptive Boundary Threshold (F̊)
Traditional evaluation uses a fixed boundary threshold for contour accuracy, which biases assessment toward larger objects. MOSEv2 proposes an adaptive threshold:
\[ \dot{w} = \min(0.008 \times D, \alpha \times \sqrt{A}) \]
where D is the image diagonal, A is the object’s area in pixels, and α = 0.1 (determined from boundary statistics across DAVIS and MOSE datasets). This formulation ensures fair evaluation across object scales, addressing the limitation where a chopstick occupying only 0.039 percent of the image area would receive inflated scores under traditional metrics.
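To make the formula concrete, here is a minimal sketch that evaluates the threshold for a small and a large object. The 1920x1080 frame size, the 20 percent "large object" case, and the function name are assumptions chosen for illustration; only the formula itself and α = 0.1 come from the text above.

```python
import math


def adaptive_boundary_threshold(
    height: int, width: int, object_area_px: float, alpha: float = 0.1
) -> float:
    """w = min(0.008 * D, alpha * sqrt(A)), with D the image diagonal in pixels."""
    diagonal = math.hypot(height, width)
    return min(0.008 * diagonal, alpha * math.sqrt(object_area_px))


# Hypothetical 1920x1080 frame
height, width = 1080, 1920
small_area = 0.00039 * height * width  # object covering 0.039 percent of the frame
large_area = 0.20 * height * width     # object covering 20 percent of the frame

fixed = 0.008 * math.hypot(height, width)  # fixed threshold, independent of object size
small_w = adaptive_boundary_threshold(height, width, small_area)
large_w = adaptive_boundary_threshold(height, width, large_area)

print(f"fixed threshold: {fixed:.2f} px")
print(f"small object:    {small_w:.2f} px")
print(f"large object:    {large_w:.2f} px")
```

Under these assumptions the small object gets a tolerance of under 3 pixels instead of roughly 17.6, while the large object keeps the fixed threshold because the `min` caps the adaptive term.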
Specialized Disappearance and Reappearance Metrics
Beyond overall accuracy, MOSEv2 introduces dedicated evaluation scores:
- J&F̊_d: Measures performance exclusively on frames where targets are absent (disappearance clips)
- J&F̊_r: Evaluates performance on frames where targets return after disappearance (reappearance clips)
These metrics expose critical weaknesses in existing methods. Models that aggressively predict masks perform poorly on disappearance evaluation but achieve high reappearance scores, while conservative models show the opposite pattern. Only truly robust approaches achieve high performance on both metrics.
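The sketch below shows one way frames could be split into the two evaluation sets. It is an illustration under stated assumptions, not the official evaluation code: a frame is treated as a disappearance frame when its ground-truth mask is empty, as a reappearance frame when the target is visible again after at least one absent frame, and an absent-target frame is scored 1 only if the prediction is empty. The function names are hypothetical.

```python
from typing import List, Tuple


def split_clip_frames(visible: List[bool]) -> Tuple[List[int], List[int]]:
    """Split frame indices into disappearance and reappearance sets.

    Assumption: "disappearance" means the ground-truth mask is empty, and
    "reappearance" means the target is visible after an earlier absence.
    """
    disappearance: List[int] = []
    reappearance: List[int] = []
    seen_absent = False
    for idx, is_visible in enumerate(visible):
        if not is_visible:
            disappearance.append(idx)
            seen_absent = True
        elif seen_absent:
            reappearance.append(idx)
    return disappearance, reappearance


def empty_frame_score(predicted_area_px: int) -> float:
    """Assumed convention: on an absent-target frame only an empty prediction is correct."""
    return 1.0 if predicted_area_px == 0 else 0.0


visible = [True, True, False, False, True, True]
dis_frames, rea_frames = split_clip_frames(visible)
print(dis_frames, rea_frames)
```

With this convention, a model that keeps predicting a mask while the target is gone scores zero on every disappearance frame, which is the trade-off between aggressive and conservative models described above.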
Benchmark Results: The Performance Gap Reality
State-of-the-Art Degradation
The benchmark results starkly illustrate the gap between controlled evaluation environments and real-world performance:
- SAM2: Drops from 90.7 percent J&F on DAVIS 2017 to 50.9 percent on MOSEv2—a 39.8 percentage point decline
- Cutie-B: Falls from 87.9 percent on DAVIS 2017 to 43.9 percent on MOSEv2
- SAM2Long-B+: Achieves the best performance at 48.6 percent J&F̊, yet remains substantially below performance on existing benchmarks
This consistent degradation across all 20 benchmarked VOS methods conclusively demonstrates that MOSEv2 represents a genuinely more difficult evaluation landscape, not merely a larger dataset with similar characteristics.
The Reappearance Problem
Perhaps most revealing is the reappearance metric analysis. Even the best-performing methods achieve J&F̊_r scores in the range of 27–36 percent, a stark contrast to their J&F̊_d performance (60–72 percent). This gap of more than 30 percentage points highlights a fundamental weakness in current architectures: they excel at tracking continuously visible objects but struggle to re-identify targets after disappearance.
Practical Improvements: The RCMS Solution
Based on extensive analysis of benchmark results, the MOSEv2 paper proposes Reliable Conditioned Memory Selection (RCMS), a practical enhancement to foundation models like SAM2 that substantially improves performance without sacrificing inference speed.
The Core Innovation
RCMS recognizes that SAM2 maintains two memory types: conditioned (typically from initialization) and unconditioned (from recent frames). The default approach of relying solely on initial-frame memory limits diversity and fails when initialization provides only partial visibility. RCMS strategically augments conditioned memory with high-quality frames from pre-disappearance sequences, leveraging SAM2’s demonstrated strength in tracking continuously visible objects.
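The selection idea can be sketched in a few lines of plain Python. This is a simplified illustration, not the paper's algorithm: the `Memory` class, the `quality` field, and the tie-breaking rule are assumptions, and the defaults (4 extra memories, threshold 0.6) simply mirror the configuration values used later in this article.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Memory:
    frame_idx: int
    quality: float  # hypothetical per-frame reliability score in [0, 1]


def select_conditioned(
    initial: Memory,
    history: List[Memory],
    disappearance_idx: int,
    max_extra: int = 4,
    threshold: float = 0.6,
) -> List[Memory]:
    """Keep the initial memory and add up to max_extra reliable pre-disappearance frames."""
    candidates = [
        m for m in history
        if m.frame_idx < disappearance_idx and m.quality > threshold
    ]
    # Prefer higher quality; break ties in favor of more recent frames
    candidates.sort(key=lambda m: (m.quality, m.frame_idx), reverse=True)
    return [initial] + candidates[:max_extra]


history = [
    Memory(1, 0.9), Memory(2, 0.4), Memory(3, 0.8),
    Memory(4, 0.95), Memory(5, 0.2),
]
bank = select_conditioned(Memory(0, 1.0), history, disappearance_idx=5, max_extra=2)
print([m.frame_idx for m in bank])
```

The initial frame always stays in the bank, so a poor quality estimate can at worst add unhelpful memories; it can never displace the annotated reference.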
Key improvements in SAM2 using RCMS and complementary tricks:
$$ \small \renewcommand{\arraystretch}{1.25}
\begin{array}{l|c|l}
\hline
\textbf{Enhancement} & \textbf{J\&F Gain} & \textbf{Method / Description} \\
\hline
\text{RCMS} & +3.3\% & \text{Reliable Conditioned Memory Selection} \\
\text{MQF} & +0.9\% & Q = s_{\text{iou}} \times s_{\text{occ}} \times m_{\text{ness}} \\
\text{MSS} & +0.4\% & \text{Mask Scaling Strategy (factor: 7.5,\ offset: -4.0)} \\
\text{LVT} & +0.9\% & \text{Long-Video Finetuning (16-frame context)} \\
\text{Combined Effect} & +5.5\% & \text{All improvements integrated} \\
\hline
\end{array} $$
These improvements raise SAM2-B+ from 46.0 percent to 51.5 percent J&F̊, a substantial gain achieved while maintaining an inference speed of 22.6 frames per second.
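As a quick check of the table, the sketch below evaluates the MQF product, an assumed form of the mask scaling step, and the sum of the individual gains. The affine-plus-sigmoid form of MSS and the function names are assumptions for illustration; the table only specifies the factor and offset.

```python
import math


def memory_quality(s_iou: float, s_occ: float, maskness: float) -> float:
    """Q = s_iou * s_occ * maskness, each term assumed to lie in [0, 1]."""
    return s_iou * s_occ * maskness


def scale_mask_value(value: float, factor: float = 7.5, offset: float = -4.0) -> float:
    """Assumed form of MSS: affine rescaling followed by a sigmoid."""
    return 1.0 / (1.0 + math.exp(-(value * factor + offset)))


# Individual gains from the table, in percentage points
gains = {"RCMS": 3.3, "MQF": 0.9, "MSS": 0.4, "LVT": 0.9}
total_gain = round(sum(gains.values()), 1)
final_score = round(46.0 + total_gain, 1)

print(total_gain, final_score)
print(memory_quality(0.9, 0.8, 1.0))
print(scale_mask_value(0.9), scale_mask_value(0.2))
```

Because Q is a product, a single weak component (for example a high predicted occlusion) is enough to keep a frame out of the memory bank, even when the other scores are high.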
Real-World Applications and Future Impact
MOSEv2’s comprehensive evaluation framework has immediate implications for practical deployment:
Autonomous Vehicle Development: Navigation systems require robust tracking of pedestrians, cyclists, and other vehicles under diverse weather conditions and occlusion scenarios—precisely MOSEv2’s focus areas.
Augmented Reality: AR applications demand reliable segmentation as objects move through complex indoor/outdoor environments with varying lighting and occlusion patterns.
Video Editing and Content Creation: Professional tools must handle long-duration content with multi-shot sequences, complex backgrounds, and small moving subjects.
The dataset’s emphasis on knowledge-dependent scenarios (requiring optical character recognition or spatial reasoning) also points toward future directions integrating large language models with vision systems.
Key Takeaways for Computer Vision Practitioners
✓ Benchmark Saturation Is Misleading: High performance on existing datasets masks significant limitations in real-world scenarios.
✓ Disappearance-Reappearance Is Fundamental: This challenge deserves primary focus in architecture design and memory mechanisms.
✓ Multi-Dimensional Complexity Matters: Effective models must handle occlusions, adverse weather, small objects, and crowded scenes simultaneously, not in isolation.
✓ Adaptive Evaluation Is Essential: Fixed-threshold metrics systematically bias assessment toward larger objects.
Conclusion and Call to Action
MOSEv2 represents a pivotal moment in computer vision research—the moment when the field collectively acknowledged that laboratory benchmarks no longer sufficiently reflect real-world challenges. By comprehensively addressing this gap, MOSEv2 provides both a rigorous evaluation standard and a concrete roadmap for advancing the state-of-the-art in video understanding.
The challenge is clear, and the opportunity is immense. If you’re developing video understanding systems, conducting computer vision research, or deploying segmentation algorithms in production environments, MOSEv2 should become your benchmark of choice. Explore the complete dataset at https://MOSE.video, study the comprehensive benchmark results, and implement the practical improvements outlined in this research.
The gap between laboratory performance and real-world effectiveness won’t close without datasets that authentically represent real-world complexity. MOSEv2 is that dataset. The question now is: will your models be ready for this higher standard?
Share your experiences benchmarking against MOSEv2 in the comments below—what challenges have you encountered, and what innovations are you exploring?
This article synthesizes insights from the MOSEv2 research paper, providing practitioners with actionable understanding of why this dataset matters for advancing robust video understanding systems. For complete technical details, citations, and supplementary materials, readers are encouraged to review the full research publication.
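Based on the MOSEv2 paper’s proposed improvements (RCMS, MQF, MSS, LVT), here’s an illustrative implementation sketch for video object segmentation with an enhanced SAM2. It is not the authors’ released code, and the SAM2 calls it relies on should be checked against the version you have installed.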
mosev2_sam2_enhanced/
├── config.yaml
├── model/
│ ├── __init__.py
│ ├── sam2_enhanced.py
│ ├── rcms_memory.py
│ ├── quality_filtering.py
│ └── mask_processing.py
├── data/
│ ├── __init__.py
│ ├── mosev2_dataset.py
│ ├── video_loader.py
│ └── transforms.py
├── training/
│ ├── __init__.py
│ ├── trainer.py
│ ├── losses.py
│ └── optimizer.py
├── evaluation/
│ ├── __init__.py
│ ├── metrics.py
│ └── evaluator.py
├── inference/
│ ├── __init__.py
│ └── inference_pipeline.py
└── main.py
# 1. Configuration (config.yaml)
# Model Configuration
model:
  backbone: "sam2_large" # sam2_large or sam2_base
  image_size: 1024
  hidden_dim: 256
# RCMS Configuration
rcms:
  max_conditioned_memories: 4 # N parameter
  quality_threshold: 0.6 # theta parameter
  use_memory_quality_filtering: true
  iou_weight: 0.5
  occlusion_weight: 0.5
  maskness_weight: 0.0
# Mask Scaling Strategy
mss:
  enabled: true
  scale_factor: 7.5
  offset: -4.0
# Long-Video Finetuning
lvt:
  enabled: true
  finetuning_frames: 16
  freeze_image_encoder: true
# Training Configuration
training:
  batch_size: 4
  num_epochs: 50
  learning_rate: 0.0001
  weight_decay: 0.01
  warmup_epochs: 5
  num_workers: 4
# Dataset Configuration
dataset:
  data_root: "/path/to/MOSEv2"
  split_dir: "meta"
  min_video_length: 5
  max_video_length: 500
  num_categories: 200
# Evaluation Configuration
evaluation:
  compute_adaptive_boundary: true
  compute_disappearance_reappearance: true
  boundary_alpha: 0.1
# Device and Paths
device: "cuda"
checkpoint_dir: "./checkpoints"
output_dir: "./outputs"
log_dir: "./logs"
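The model code reads settings with attribute access (for example `config.rcms.quality_threshold`), whereas a YAML parser such as PyYAML’s `yaml.safe_load` returns nested dictionaries. A minimal sketch of one way to bridge the two is shown here; the helper name `to_namespace` is an assumption, and an inline dictionary stands in for the parsed file so the example runs without extra dependencies.

```python
from types import SimpleNamespace
from typing import Any


def to_namespace(value: Any) -> Any:
    """Recursively convert nested dicts into SimpleNamespace objects."""
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(v) for v in value]
    return value


# Inline stand-in for the parsed config.yaml (subset of keys)
raw = {
    "model": {"backbone": "sam2_large", "image_size": 1024},
    "rcms": {"max_conditioned_memories": 4, "quality_threshold": 0.6},
    "mss": {"enabled": True, "scale_factor": 7.5, "offset": -4.0},
    "lvt": {"enabled": True, "freeze_image_encoder": True},
}
config = to_namespace(raw)
print(config.rcms.quality_threshold, config.mss.scale_factor)
```

In practice `raw` would come from loading the YAML file; the conversion step stays the same.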
# 2. Core Model Implementation (model/sam2_enhanced.py)
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Optional, Tuple
import numpy as np
from dataclasses import dataclass


@dataclass
class SegmentationOutput:
    """Output structure for segmentation results"""
    masks: torch.Tensor  # (B, H, W)
    confidences: torch.Tensor  # (B,)
    iou_scores: torch.Tensor  # (B,)
    occlusion_scores: torch.Tensor  # (B,)
    memory_state: Dict
    quality_score: float


class SAM2Enhanced(nn.Module):
    """
    Enhanced SAM2 with RCMS, MQF, MSS, and LVT improvements
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        # Load base SAM2 model
        self.sam2_model = self._load_sam2_backbone()
        # Enhanced components
        self.rcms = RCMSMemoryManager(config)
        self.quality_filter = MemoryQualityFilter(config)
        self.mask_processor = MaskProcessor(config)
        # Freezing settings for LVT
        if config.lvt.freeze_image_encoder:
            self._freeze_image_encoder()

    def _load_sam2_backbone(self):
        """Load SAM2 from official implementation"""
        # Only the import is guarded, so unrelated errors are not masked
        try:
            from sam2.build_sam import build_sam2
        except ImportError as err:
            raise ImportError(
                "SAM2 not installed. Install with: "
                "pip install git+https://github.com/facebookresearch/segment-anything-2.git"
            ) from err
        # NOTE: build_sam2 expects a model config file and a checkpoint path.
        # Passing the backbone name directly and this checkpoint naming scheme
        # are assumptions of this sketch; adapt both to your local SAM2 setup.
        model_type = self.config.model.backbone
        checkpoint = f"./checkpoints/sam2_{model_type.split('_')[1]}.pt"
        return build_sam2(model_type, checkpoint)

    def _freeze_image_encoder(self):
        """Freeze image encoder for long-video finetuning"""
        for param in self.sam2_model.image_encoder.parameters():
            param.requires_grad = False

    def initialize_video(self, frame: torch.Tensor, mask: torch.Tensor) -> Dict:
        """
        Initialize video processing with first frame and mask
        Args:
            frame: (1, 3, H, W) RGB frame
            mask: (1, 1, H, W) binary mask
        Returns:
            Initial memory state dictionary
        """
        with torch.no_grad():
            # Get initial SAM2 state.
            # NOTE: reset_state() and encode_image() are assumed wrapper methods
            # on the backbone; they may not match the official SAM2 API.
            self.sam2_model.reset_state()
            image_embedding = self.sam2_model.encode_image(frame)
        # Create initial memory; the annotated first frame is the first
        # conditioned memory and gets the maximum quality score
        memory_state = {
            "frame_embedding": image_embedding,
            "object_mask": mask,
            "conditioned_memories": [
                {
                    "embedding": image_embedding,
                    "mask": mask,
                    "quality_score": 1.0,
                    "frame_idx": 0
                }
            ],
            "unconditioned_memories": [],
            "disappeared": False,
            "disappearance_start_idx": None
        }
        return memory_state

    def process_frame(
        self,
        frame: torch.Tensor,
        memory_state: Dict,
        frame_idx: int,
        previous_mask: Optional[torch.Tensor] = None
    ) -> Tuple[SegmentationOutput, Dict]:
        """
        Process a single video frame
        Args:
            frame: (1, 3, H, W) RGB frame
            memory_state: Current memory state
            frame_idx: Index of current frame
            previous_mask: Previous frame's mask for continuity
        Returns:
            Tuple of (SegmentationOutput, updated_memory_state)
        """
        # Encode current frame
        with torch.no_grad():
            image_embedding = self.sam2_model.encode_image(frame)
        # Select appropriate memories (conditioned + recent unconditioned)
        conditioned_mems = memory_state["conditioned_memories"]
        unconditioned_mems = self._select_nearest_memories(
            memory_state["unconditioned_memories"],
            k=6
        )
        # Prepare prompt embeddings
        prompt_embeddings = self._prepare_prompt_embeddings(
            conditioned_mems, unconditioned_mems, image_embedding
        )
        # Get segmentation prediction
        # NOTE: segment_image() is an assumed wrapper returning a dict with
        # "masks" and, optionally, "iou_pred" and "occlusion_pred".
        with torch.no_grad():
            mask_prediction = self.sam2_model.segment_image(
                image_embedding,
                prompt_embeddings
            )
        # Compute quality metrics
        iou_score = mask_prediction.get("iou_pred", torch.tensor(0.5))
        occlusion_score = mask_prediction.get("occlusion_pred", torch.tensor(0.0))
        maskness = self._compute_maskness(mask_prediction["masks"])
        # Apply Mask Scaling Strategy (MSS)
        if self.config.mss.enabled:
            mask_prediction["masks"] = self.mask_processor.apply_scaling(
                mask_prediction["masks"],
                scale_factor=self.config.mss.scale_factor,
                offset=self.config.mss.offset
            )
        # Compute quality score and convert it to a plain float once, so the
        # memory manager and the output dataclass both receive the same type
        quality_score = float(
            self.quality_filter.compute_quality_score(
                iou_score, occlusion_score, maskness
            )
        )
        # Check for disappearance
        is_disappeared = self._check_disappearance(
            mask_prediction["masks"],
            previous_mask if previous_mask is not None else memory_state["object_mask"]
        )
        # Update memory with RCMS
        updated_memory = self.rcms.update_memory(
            memory_state,
            frame_idx,
            image_embedding,
            mask_prediction["masks"],
            quality_score,
            is_disappeared
        )
        # Create output
        output = SegmentationOutput(
            masks=mask_prediction["masks"],
            confidences=torch.tensor([quality_score]),
            iou_scores=iou_score.unsqueeze(0) if iou_score.dim() == 0 else iou_score,
            occlusion_scores=occlusion_score.unsqueeze(0) if occlusion_score.dim() == 0 else occlusion_score,
            memory_state=updated_memory,
            quality_score=quality_score
        )
        return output, updated_memory

    def _select_nearest_memories(
        self,
        memories: List[Dict],
        k: int = 6
    ) -> List[Dict]:
        """Select the k most recent memories"""
        if len(memories) <= k:
            return memories
        # Sort by frame index, newest first (memories carry "frame_idx")
        sorted_mems = sorted(
            memories,
            key=lambda x: x.get("frame_idx", 0),
            reverse=True
        )
        return sorted_mems[:k]

    def _prepare_prompt_embeddings(
        self,
        conditioned_mems: List[Dict],
        unconditioned_mems: List[Dict],
        current_embedding: torch.Tensor
    ) -> torch.Tensor:
        """Prepare combined prompt embeddings from memory"""
        # Conditioned memories first, then recent unconditioned ones
        embeddings = [mem["embedding"] for mem in conditioned_mems]
        embeddings += [mem["embedding"] for mem in unconditioned_mems]
        # Concatenate and L2-normalize; fall back to the current frame if empty
        if embeddings:
            combined = torch.cat(embeddings, dim=0)
            return F.normalize(combined, p=2, dim=-1)
        return current_embedding

    def _compute_maskness(self, masks: torch.Tensor) -> torch.Tensor:
        """
        Compute maskness metric (mean mask value per frame, clamped to [0, 1])
        Higher value indicates stronger mask predictions
        """
        if masks.shape[-2:] == (0, 0):
            return torch.tensor(0.0)
        mask_area = masks.sum(dim=(-2, -1))
        total_area = masks.numel() / masks.shape[0]
        maskness = (mask_area / total_area).mean()
        return torch.clamp(maskness, 0, 1)

    def _check_disappearance(
        self,
        current_mask: torch.Tensor,
        previous_mask: torch.Tensor,
        threshold: float = 0.05
    ) -> bool:
        """Check if target has disappeared"""
        if current_mask is None or previous_mask is None:
            return False
        current_area = (current_mask > 0.5).float().sum()
        # Target counts as disappeared if the mask covers less than `threshold`
        # of the frame. NOTE: 0.05 is aggressive for MOSEv2, where many masks
        # cover under 1 percent of the image; small targets need a lower value.
        return bool(current_area < (current_mask.numel() * threshold))

    def forward(
        self,
        video_frames: torch.Tensor,
        first_frame_mask: torch.Tensor,
        return_all_frames: bool = True
    ) -> Dict:
        """
        Full forward pass through video
        Args:
            video_frames: (T, 3, H, W) video tensor
            first_frame_mask: (1, 1, H, W) first frame annotation
            return_all_frames: Whether to return masks for all frames
        Returns:
            Dictionary with segmentation results for all frames
        """
        device = next(self.parameters()).device
        video_frames = video_frames.to(device)
        first_frame_mask = first_frame_mask.to(device)
        # Initialize
        memory_state = self.initialize_video(
            video_frames[0:1],
            first_frame_mask
        )
        all_outputs = []
        previous_mask = first_frame_mask
        # Process each frame
        for frame_idx in range(len(video_frames)):
            frame = video_frames[frame_idx:frame_idx+1]
            output, memory_state = self.process_frame(
                frame,
                memory_state,
                frame_idx,
                previous_mask
            )
            all_outputs.append({
                "masks": output.masks,
                "confidence": output.confidences,
                "iou_score": output.iou_scores,
                "quality_score": output.quality_score
            })
            previous_mask = output.masks
        # Compile results
        result = {
            "masks": torch.cat([o["masks"] for o in all_outputs], dim=0),
            "confidences": torch.cat([o["confidence"] for o in all_outputs], dim=0),
            "iou_scores": torch.cat([o["iou_score"] for o in all_outputs], dim=0),
            "quality_scores": [o["quality_score"] for o in all_outputs],
            "all_outputs": all_outputs if return_all_frames else None
        }
        return result


class RCMSMemoryManager(nn.Module):
    """Reliable Conditioned Memory Selection"""

    def __init__(self, config):
        super().__init__()
        self.config = config
        self.max_conditioned = config.rcms.max_conditioned_memories
        self.quality_threshold = config.rcms.quality_threshold

    def update_memory(
        self,
        memory_state: Dict,
        frame_idx: int,
        frame_embedding: torch.Tensor,
        mask: torch.Tensor,
        quality_score: float,
        is_disappeared: bool
    ) -> Dict:
        """
        Update memory state using an RCMS-style selection
        (modeled on Algorithm 1 from the MOSEv2 paper)
        """
        # Copy the dict and both lists so the caller's state is not mutated
        updated_state = memory_state.copy()
        conditioned = list(memory_state["conditioned_memories"])
        unconditioned = list(memory_state["unconditioned_memories"])
        # Add to unconditioned memories
        unconditioned.append({
            "embedding": frame_embedding,
            "mask": mask,
            "quality_score": quality_score,
            "frame_idx": frame_idx
        })
        if is_disappeared and not memory_state["disappeared"]:
            # Track disappearance
            updated_state["disappeared"] = True
            updated_state["disappearance_start_idx"] = frame_idx
            # RCMS: select high-quality pre-disappearance memories, run once
            # per disappearance event (assumed; the original nesting was lost)
            pre_disappearance_mems = [
                m for m in unconditioned if m["frame_idx"] < frame_idx
            ]
            # Sort by quality, then recency (most recent first)
            candidates = sorted(
                pre_disappearance_mems,
                key=lambda x: (x["quality_score"], x["frame_idx"]),
                reverse=True
            )
            used = {m["frame_idx"] for m in conditioned}
            for candidate in candidates:
                # +1 leaves room for the initial annotated frame
                if len(conditioned) >= self.max_conditioned + 1:
                    break
                if (candidate["quality_score"] > self.quality_threshold and
                        candidate["frame_idx"] not in used):
                    conditioned.append(candidate)
                    used.add(candidate["frame_idx"])
        elif not is_disappeared and memory_state["disappeared"]:
            # Target is back: allow the next disappearance to be detected
            updated_state["disappeared"] = False
        # Maintain unconditioned memory size
        updated_state["conditioned_memories"] = conditioned
        updated_state["unconditioned_memories"] = unconditioned[-12:]
        return updated_state


class MemoryQualityFilter(nn.Module):
    """Memory Quality Filtering (MQF)"""

    def __init__(self, config):
        super().__init__()
        self.config = config

    def compute_quality_score(
        self,
        iou_pred: torch.Tensor,
        occlusion_pred: torch.Tensor,
        maskness: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute quality score as a weighted product of components
        Q = score_iou^w_iou × score_occ^w_occ × maskness^w_maskness
        (with all weights set to 1 this is Q = score_iou × score_occ × maskness;
        a weight of 0 removes that component from the product)
        """
        # Accept tensors or plain numbers, then clamp every score to [0, 1]
        iou_score = torch.clamp(torch.as_tensor(iou_pred, dtype=torch.float32), 0, 1)
        # Occlusion is inverted so that a higher score always means more reliable
        occ_score = torch.clamp(
            1.0 - torch.as_tensor(occlusion_pred, dtype=torch.float32), 0, 1
        )
        maskness_score = torch.clamp(torch.as_tensor(maskness, dtype=torch.float32), 0, 1)
        # Weighted product
        weights = {
            "iou": self.config.rcms.iou_weight,
            "occ": self.config.rcms.occlusion_weight,
            "maskness": self.config.rcms.maskness_weight
        }
        quality = (
            (iou_score ** weights["iou"]) *
            (occ_score ** weights["occ"]) *
            (maskness_score ** weights["maskness"])
        )
        return quality.mean() if quality.dim() > 0 else quality


class MaskProcessor(nn.Module):
    """Mask Scaling Strategy (MSS)"""

    def __init__(self, config):
        super().__init__()
        self.config = config

    def apply_scaling(
        self,
        masks: torch.Tensor,
        scale_factor: float = 7.5,
        offset: float = -4.0
    ) -> torch.Tensor:
        """
        Apply mask scaling adjustment
        Helps improve robustness to small objects and occlusions
        """
        if masks.dtype == torch.bool:
            masks = masks.float()
        # Affine rescaling of the raw mask values; note that no conversion
        # between probabilities and logits is performed here
        scaled_masks = masks * scale_factor + offset
        # Apply sigmoid to map the result into [0, 1]
        scaled_masks = torch.sigmoid(scaled_masks)
        return scaled_masks
# 3. Memory Management (model/rcms_memory.py)
import torch
import torch.nn as nn
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import logging

logger = logging.getLogger(__name__)


@dataclass
class MemoryFrame:
    """Single frame in memory bank"""
    frame_idx: int
    embedding: torch.Tensor
    mask: torch.Tensor
    quality_score: float
    iou_score: float
    occlusion_score: float
    maskness: float

    def to_dict(self) -> Dict:
        return {
            "frame_idx": self.frame_idx,
            "embedding": self.embedding,
            "mask": self.mask,
            "quality_score": self.quality_score,
            "iou_score": self.iou_score,
            "occlusion_score": self.occlusion_score,
            "maskness": self.maskness
        }


class ConditionedMemoryBank(nn.Module):
    """Conditioned Memory Bank with RCMS"""

    def __init__(self, max_size: int = 5, quality_threshold: float = 0.6):
        super().__init__()
        self.max_size = max_size
        self.quality_threshold = quality_threshold
        self.memories: List[MemoryFrame] = []

    def add_memory(self, memory_frame: MemoryFrame) -> None:
        """Add memory if quality is sufficient"""
        if memory_frame.quality_score >= self.quality_threshold:
            self.memories.append(memory_frame)
        # Keep only best memories if exceeding max size
        if len(self.memories) > self.max_size:
            self.memories = sorted(
                self.memories,
                key=lambda x: x.quality_score,
                reverse=True
            )[:self.max_size]

    def get_embeddings(self) -> Optional[torch.Tensor]:
        """Get combined, L2-normalized embedding from all memories"""
        if not self.memories:
            return None
        embeddings = torch.cat(
            [m.embedding for m in self.memories],
            dim=0
        )
        return embeddings / embeddings.norm(dim=-1, keepdim=True)

    def clear(self) -> None:
        """Clear all memories"""
        self.memories.clear()

    def __len__(self) -> int:
        return len(self.memories)


class UnconditionedMemoryBank(nn.Module):
    """Unconditioned Memory Bank (recent frames)"""

    def __init__(self, max_size: int = 6):
        super().__init__()
        self.max_size = max_size
        self.memories: List[MemoryFrame] = []

    def add_memory(self, memory_frame: MemoryFrame) -> None:
        """Add memory (always, maintains temporal window)"""
        self.memories.append(memory_frame)
        # Keep only recent memories
        if len(self.memories) > self.max_size:
            self.memories = self.memories[-self.max_size:]

    def get_nearest_k(self, k: int) -> List[MemoryFrame]:
        """Get k nearest (most recent) memories"""
        # Guard k <= 0: a slice of [-0:] would return the whole list
        if k <= 0:
            return []
        return self.memories[-k:]

    def clear(self) -> None:
        """Clear all memories"""
        self.memories.clear()

    def __len__(self) -> int:
        return len(self.memories)


class RCMSController:
    """Main controller for RCMS algorithm"""

    def __init__(
        self,
        max_conditioned: int = 4,
        max_unconditioned: int = 6,
        quality_threshold: float = 0.6
    ):
        self.conditioned_bank = ConditionedMemoryBank(max_conditioned, quality_threshold)
        self.unconditioned_bank = UnconditionedMemoryBank(max_unconditioned)
        self.max_conditioned = max_conditioned
        self.quality_threshold = quality_threshold
        self.disappeared = False
        self.disappearance_start_idx = None

    def update(
        self,
        frame_idx: int,
        embedding: torch.Tensor,
        mask: torch.Tensor,
        iou_score: float,
        occlusion_score: float,
        maskness: float,
        quality_score: float,
        is_disappeared: bool
    ) -> None:
        """Update memory banks based on current frame"""
        # Create memory frame
        memory_frame = MemoryFrame(
            frame_idx=frame_idx,
            embedding=embedding,
            mask=mask,
            quality_score=quality_score,
            iou_score=iou_score,
            occlusion_score=occlusion_score,
            maskness=maskness
        )
        # Always add to unconditioned bank
        self.unconditioned_bank.add_memory(memory_frame)
        if is_disappeared and not self.disappeared:
            # Detect disappearance transition
            self.disappeared = True
            self.disappearance_start_idx = frame_idx
            logger.info(f"Target disappeared at frame {frame_idx}")
            # RCMS: Augment conditioned memory with pre-disappearance frames
            self._augment_conditioned_memory(frame_idx)
        elif not is_disappeared and self.disappeared:
            # Recovery from disappearance
            self.disappeared = False
            logger.info(f"Target reappeared at frame {frame_idx}")

    def _augment_conditioned_memory(self, current_frame_idx: int) -> None:
        """
        Modeled on Algorithm 1 from the MOSEv2 paper:
        Select high-quality pre-disappearance memories for conditioned bank
        """
        # Get all unconditioned memories before disappearance
        pre_disappearance = [
            m for m in self.unconditioned_bank.memories
            if m.frame_idx < current_frame_idx
        ]
        if not pre_disappearance:
            return
        # Sort by quality (descending) then recency (descending)
        candidates = sorted(
            pre_disappearance,
            key=lambda x: (x.quality_score, x.frame_idx),
            reverse=True
        )
        # Add top-quality memories to conditioned bank
        for candidate in candidates:
            if (candidate.quality_score > self.quality_threshold and
                    len(self.conditioned_bank) < self.max_conditioned):
                self.conditioned_bank.add_memory(candidate)
                logger.debug(
                    f"Added memory from frame {candidate.frame_idx} "
                    f"(quality: {candidate.quality_score:.3f})"
                )

    def get_combined_embeddings(self) -> Optional[torch.Tensor]:
        """Get combined embeddings from both memory banks (None if both are empty)"""
        embeddings = []
        # Add conditioned memories
        cond_emb = self.conditioned_bank.get_embeddings()
        if cond_emb is not None:
            embeddings.append(cond_emb)
        # Add recent unconditioned memories
        uncond_memories = self.unconditioned_bank.get_nearest_k(6)
        if uncond_memories:
            uncond_emb = torch.cat(
                [m.embedding for m in uncond_memories],
                dim=0
            )
            embeddings.append(uncond_emb)
        if not embeddings:
            return None
        combined = torch.cat(embeddings, dim=0)
        return combined / combined.norm(dim=-1, keepdim=True)

    def reset(self) -> None:
        """Reset for new video"""
        self.conditioned_bank.clear()
        self.unconditioned_bank.clear()
        self.disappeared = False
        self.disappearance_start_idx = None

    def get_stats(self) -> Dict:
        """Get current memory statistics"""
        conditioned = self.conditioned_bank.memories
        return {
            "conditioned_size": len(self.conditioned_bank),
            "unconditioned_size": len(self.unconditioned_bank),
            "disappeared": self.disappeared,
            "avg_conditioned_quality": (
                sum(m.quality_score for m in conditioned) / len(conditioned)
                if conditioned else 0.0
            )
        }
# 4. Dataset Implementation (data/mosev2_dataset.py)
import torch
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from pathlib import Path
import json
import cv2
import numpy as np
from typing import Dict, Tuple, List
import logging
logger = logging.getLogger(__name__)
class MOSEv2Dataset(Dataset):
"""MOSEv2 Dataset for Video Object Segmentation"""
def __init__(
self,
data_root: str,
split: str = "train",
config: Dict = None,
transforms_fn=None
):
"""
Args:
data_root: Root directory of MOSEv2 dataset
split: 'train', 'val', or 'test'
config: Configuration dictionary
transforms_fn: Transformation function
"""
self.data_root = Path(data_root)
self.split = split
self.config = config or {}
self.transforms = transforms_fn
# Load video list
self.video_list = self._load_video_list()
# Default transforms
if self.transforms is None:
self.transforms = self._get_default_transforms()
logger.info(f"Loaded {len(self.video_list)} videos for {split} split")
def _load_video_list(self) -> List[Dict]:
"""Load list of videos from dataset"""
split_file = self.data_root / "meta" / f"{self.split}.json"
if not split_file.exists():
raise FileNotFoundError(f"Split file not found: {split_file}")
with open(split_file, 'r') as f:
video_list = json.load(f)
return video_list
def _get_default_transforms(self):
"""Default transforms: ImageNet mean/std normalization only (no augmentation)"""
return transforms.Compose([
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
def __len__(self) -> int:
return len(self.video_list)
    def __getitem__(self, idx: int) -> Dict:
        """
        Get video sample
        Returns:
            Dictionary with:
            - video_frames: (T, 3, H, W)
            - masks: (T, 1, H, W)
            - video_id: str
            - object_id: str
            - first_frame_mask: (1, 1, H, W)
        """
        video_info = self.video_list[idx]
        video_id = video_info["video_id"]
        object_id = video_info["object_id"]
        # Load frames and masks
        frames = self._load_frames(video_id)
        masks = self._load_masks(video_id, object_id)
        # Ensure same length
        min_len = min(len(frames), len(masks))
        if min_len == 0:
            raise ValueError(
                f"No frames or masks found for video {video_id}, object {object_id}"
            )
        frames = frames[:min_len]
        masks = masks[:min_len]
        # Convert to tensors; stacking into one array first avoids the slow
        # list-of-arrays conversion path
        frames_tensor = torch.from_numpy(np.stack(frames)).float().permute(0, 3, 1, 2) / 255.0
        masks_tensor = torch.from_numpy(np.stack(masks)).float().unsqueeze(1)
        # Apply transforms
        if self.transforms:
            frames_tensor = self.transforms(frames_tensor)
        return {
            "video_frames": frames_tensor,  # (T, 3, H, W)
            "masks": masks_tensor,  # (T, 1, H, W)
            "video_id": video_id,
            "object_id": object_id,
            "first_frame_mask": masks_tensor[0:1]  # (1, 1, H, W)
        }
    def _load_frames(self, video_id: str) -> List[np.ndarray]:
        """Load video frames as RGB arrays of shape (H, W, 3)"""
        frame_dir = self.data_root / "JPEGImages" / video_id
        if not frame_dir.exists():
            raise FileNotFoundError(f"Frame directory not found: {frame_dir}")
        frames = []
        frame_files = sorted(frame_dir.glob("*.jpg"))
        for frame_file in frame_files:
            frame = cv2.imread(str(frame_file))
            # cv2.imread returns None instead of raising on unreadable files
            if frame is None:
                raise IOError(f"Could not read frame: {frame_file}")
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        return frames
    def _load_masks(self, video_id: str, object_id: str) -> List[np.ndarray]:
        """Load object masks as float arrays of shape (H, W) scaled to [0, 1]"""
        mask_dir = self.data_root / "Annotations" / video_id / object_id
        if not mask_dir.exists():
            raise FileNotFoundError(f"Mask directory not found: {mask_dir}")
        masks = []
        mask_files = sorted(mask_dir.glob("*.png"))
        for mask_file in mask_files:
            mask = cv2.imread(str(mask_file), cv2.IMREAD_GRAYSCALE)
            if mask is None:
                raise IOError(f"Could not read mask: {mask_file}")
            mask = mask.astype(np.float32) / 255.0
            masks.append(mask)
        return masks
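The loader above implies a specific on-disk layout: a `meta/<split>.json` list of `{video_id, object_id}` records, frames under `JPEGImages/<video_id>/`, and masks under `Annotations/<video_id>/<object_id>/`. The sketch below checks a dataset root against that layout before training starts. The helper names (`check_layout`, `make_toy_root`, `count_files`) and the toy IDs are hypothetical, and the layout is the one this code assumes, which may differ from the official MOSEv2 release.

```python
import json
import tempfile
from pathlib import Path
from typing import List


def count_files(directory: Path, pattern: str) -> int:
    """Count files matching `pattern`, treating a missing directory as empty."""
    return len(list(directory.glob(pattern))) if directory.is_dir() else 0


def check_layout(data_root: str, split: str = "train") -> List[str]:
    """Return a list of problems found; an empty list means the layout matches."""
    root = Path(data_root)
    split_file = root / "meta" / f"{split}.json"
    if not split_file.exists():
        return [f"missing split file: {split_file}"]
    problems = []
    for entry in json.loads(split_file.read_text()):
        video_id, object_id = entry["video_id"], entry["object_id"]
        n_frames = count_files(root / "JPEGImages" / video_id, "*.jpg")
        n_masks = count_files(root / "Annotations" / video_id / object_id, "*.png")
        if n_frames == 0:
            problems.append(f"{video_id}: no frames found")
        elif n_masks == 0:
            problems.append(f"{video_id}/{object_id}: no masks found")
        elif n_frames != n_masks:
            problems.append(
                f"{video_id}/{object_id}: {n_frames} frames vs {n_masks} masks"
            )
    return problems


def make_toy_root(root: Path) -> None:
    """Create a one-video dataset of empty placeholder files (layout only)."""
    (root / "meta").mkdir(parents=True)
    (root / "meta" / "train.json").write_text(
        json.dumps([{"video_id": "vid0", "object_id": "1"}])
    )
    frame_dir = root / "JPEGImages" / "vid0"
    mask_dir = root / "Annotations" / "vid0" / "1"
    frame_dir.mkdir(parents=True)
    mask_dir.mkdir(parents=True)
    for i in range(3):
        (frame_dir / f"{i:05d}.jpg").touch()
        (mask_dir / f"{i:05d}.png").touch()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        make_toy_root(Path(tmp))
        print(check_layout(tmp))
```

Running a check like this once per split is cheaper than discovering a missing directory partway through an epoch, since `__getitem__` only raises when it reaches the affected video.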
class MOSEv2DataLoader:
"""Utility for creating MOSEv2 data loaders"""
@staticmethod
def create_loaders(
data_root: str,
batch_size: int = 4,
num_workers: int = 4,
config: Dict = None
) -> Dict:
"""Create train/val data loaders"""
train_dataset = MOSEv2Dataset(
data_root,
split="train",
config=config
)
val_dataset = MOSEv2Dataset(
data_root,
split="val",
config=config
)
train_loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
drop_last=True
)
val_loader = torch.utils.data.DataLoader(
val_dataset,
batch_size=1,
shuffle=False,
num_workers=num_workers
)
return {
"train": train_loader,
"val": val_loader,
"train_dataset": train_dataset,
"val_dataset": val_dataset
}
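`create_loaders` uses `batch_size=4` for training, but PyTorch's default collation stacks samples into one tensor and therefore needs every video in a batch to have the same number of frames and the same resolution. One common workaround, sketched here, is to sample a fixed-length clip per video that always keeps frame 0, the frame that carries the first-frame mask. The function name `sample_clip_indices` and the clip length of 8 are illustrative assumptions, not part of the original code, and resolution would still need to be unified separately (for example by resizing).

```python
import random
from typing import List, Optional


def sample_clip_indices(
    num_frames: int,
    clip_len: int = 8,
    rng: Optional[random.Random] = None,
) -> List[int]:
    """Pick `clip_len` frame indices in temporal order, always including frame 0.

    Videos shorter than `clip_len` are padded by repeating the last index.
    """
    if num_frames < 1:
        raise ValueError("video has no frames")
    if clip_len < 1:
        raise ValueError("clip_len must be at least 1")
    rng = rng or random.Random()
    if num_frames >= clip_len:
        # Frame 0 carries the first-frame mask, so it is always kept;
        # the remaining indices are drawn without replacement and sorted.
        rest = rng.sample(range(1, num_frames), clip_len - 1)
        return [0] + sorted(rest)
    # Not enough frames: use them all, then repeat the last one.
    return list(range(num_frames)) + [num_frames - 1] * (clip_len - num_frames)


if __name__ == "__main__":
    print(sample_clip_indices(50, clip_len=8, rng=random.Random(0)))
    print(sample_clip_indices(3, clip_len=8))
```

Inside `__getitem__`, the returned indices would be used to subset `frames` and `masks` before they are converted to tensors, so every sample has `T == clip_len`.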
# 5. Evaluation Metrics (evaluation/metrics.py)
import torch
import torch.nn.functional as F
import numpy as np
from typing import Dict, Tuple
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SegmentationMetrics:
"""Container for segmentation metrics"""
j_mean: float # Region similarity
f_mean: float # Contour accuracy
j_and_f: float # Combined J&F
j_dot: float # Adaptive boundary J
j_and_f_dot: float # Adaptive J&F
j_and_f_d: float # Disappearance metric
j_and_f_r: float # Reappearance metric
def __repr__(self) -> str:
return (
f"J: {self.j_mean:.1%} | F: {self.f_mean:.1%} | "
f"J&F: {self.j_and_f:.1%} | "
f"J&F_dot: {self.j_and_f_dot:.1%} | "
f"J&F_d: {self.j_and_f_d:.1%} | J&F_r: {self.j_and_f_r:.1%}"
)
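The region similarity J stored in `j_mean` is the intersection over union of the predicted and ground-truth masks. Because MOSEv2 contains many frames where the target is absent, the empty-versus-empty case matters. The sketch below scores it as 1.0, a common convention (the DAVIS evaluation code does the same); treating it that way here is an assumption rather than something this article states. It uses plain nested lists so the arithmetic is easy to follow.

```python
from typing import Sequence

Mask = Sequence[Sequence[int]]


def jaccard(pred: Mask, gt: Mask) -> float:
    """Intersection over union of two binary masks of the same size."""
    intersection = 0
    union = 0
    for pred_row, gt_row in zip(pred, gt):
        for p, g in zip(pred_row, gt_row):
            intersection += 1 if (p and g) else 0
            union += 1 if (p or g) else 0
    # Both masks empty: the model correctly predicted "target absent".
    if union == 0:
        return 1.0
    return intersection / union


if __name__ == "__main__":
    empty = [[0, 0], [0, 0]]
    print(jaccard([[1, 1], [0, 0]], [[1, 0], [0, 0]]))  # one shared pixel of two
    print(jaccard(empty, empty))                        # target absent, none predicted
    print(jaccard([[1, 0], [0, 0]], empty))             # false positive on an empty frame
```

Without the empty-mask rule, a model that correctly outputs nothing while the target is out of frame would score 0 on every such frame.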
class MOSEv2Metrics:
"""MOSEv2 Evaluation Metrics"""
def __init__(self, boundary_alpha: float = 0.1):
self.boundary_alpha = boundary_alpha
def compute_metrics(
self,
predictions: torch.Tensor, # (T, H, W)
ground_truth: torch.Tensor, # (T, H, W)
compute_adaptive: bool = True,
compute_disappearance: bool = True
) -> SegmentationMetrics:
"""
Compute comprehensive MOSEv2 metrics
Args:
predictions: Predicted masks
ground_truth: Ground truth masks
compute_adaptive: Compute adaptive boundary metrics
compute_disappearance: Compute disappearance/reappearance metrics
"""
# Convert to binary masks
pred_binary = (predictions > 0.5).float()
gt_binary = (ground_truth > 0.5).float()
# Compute J (Jaccard/IoU)
j_scores = self._compute_j(pred_binary, gt_binary)
j_mean = j_scores.mean().item()
# Compute F (Contour accuracy)
f_scores = self._compute_f(pred_binary, gt_binary)
f_mean = f_scores.mean().item()
# Combined J&F
j_and_f = (j_mean + f_mean) / 2.0
# Adaptive boundary J
j_dot_scores = self._compute_adaptive_j(
pred_binary, gt_binary
) if compute_adaptive else j_scores
j_dot = j_dot_scores.mean().item()
j_and_f_dot = (j_dot + f_mean) / 2.0
# Disappearance/Reappearance metrics
j_and_f_d = 0.0
j_and_f_r = 0.0
if compute_disappearance:
j_and_f_d = self._compute_disappearance_metric(pred_binary, gt_binary)
j_and_f_r = self._compute_reappearance_metric(pred_binary, gt_binary)
return SegmentationMetrics(
j_mean=j_mean,
f_mean=f_mean,
j_and_f=j_and_f,
j_dot=j_dot,
j_and_f_dot=j_and_f_dot,
j_and_f_d=j_and_f_d,
j_and_f_r=j_and_f_r
)
    def _compute_j(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute Jaccard Index (J) / IoU
        J = |Pred ∩ GT| / |Pred ∪ GT|
        Frames where both masks are empty score 1.0, because an empty
        prediction is correct when the target is absent.
        """
        intersection = (predictions * ground_truth).sum(dim=(-2, -1))
        union = (predictions + ground_truth).clamp(0, 1).sum(dim=(-2, -1))
        j_scores = torch.where(
            union > 0,
            intersection / union.clamp_min(1e-8),
            torch.ones_like(union)
        )
        return j_scores
    def _compute_f(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute a simplified contour score (F).
        This is a proxy, not the standard boundary F-measure: it maps the
        mean distance from each predicted contour pixel to its nearest
        ground-truth contour pixel into (0, 1]. No bipartite matching is done.
        """
        num_frames = predictions.shape[0]
        device = predictions.device
        f_scores = []
        for i in range(num_frames):
            pred_contour = self._get_contour(predictions[i])
            gt_contour = self._get_contour(ground_truth[i])
            pred_empty = pred_contour.sum() == 0
            gt_empty = gt_contour.sum() == 0
            if pred_empty and gt_empty:
                f_scores.append(torch.tensor(1.0, device=device))
                continue
            if pred_empty or gt_empty:
                f_scores.append(torch.tensor(0.0, device=device))
                continue
            # Simplified contour matching
            pred_points = torch.nonzero(pred_contour, as_tuple=False).float()
            gt_points = torch.nonzero(gt_contour, as_tuple=False).float()
            # One-directional mean nearest-neighbour distance (pred -> GT)
            distances = torch.cdist(pred_points, gt_points)
            min_distances = distances.min(dim=1)[0]
            # Keep the score on the same device as the J scores it is added to
            f_scores.append(1.0 / (1.0 + min_distances.mean()))
        return torch.stack(f_scores)
    def _get_contour(self, mask: torch.Tensor) -> torch.Tensor:
        """Extract contour from mask"""
        mask_expanded = mask.unsqueeze(0).unsqueeze(0).float()
        # Morphological gradient: dilation minus erosion.
        # stride=1 keeps the input resolution; max_pool2d otherwise defaults
        # the stride to the kernel size and downsamples the mask.
        dilated = F.max_pool2d(mask_expanded, 3, stride=1, padding=1)
        eroded = -F.max_pool2d(-mask_expanded, 3, stride=1, padding=1)
        contour = (dilated - eroded).squeeze(0).squeeze(0)
        return (contour > 0.5).float()
    def _compute_adaptive_j(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute the adaptive-boundary score (written Ḟ in the MOSEv2 paper).
        The boundary width follows Eq. (1) of the paper:
            ẇ = min(0.008 × D, α × √A)
        Here D is the image diagonal in pixels, A is the ground-truth mask
        area in pixels, and α is `boundary_alpha`.
        """
        num_frames = predictions.shape[0]
        h, w = predictions.shape[-2:]
        diagonal = float(np.sqrt(h**2 + w**2))
        adaptive_j_scores = []
        for i in range(num_frames):
            gt_mask = ground_truth[i]
            area = (gt_mask > 0.5).float().sum().item()
            # Adaptive threshold: small objects get a thinner boundary band
            w_fixed = 0.008 * diagonal
            w_adaptive = self.boundary_alpha * np.sqrt(area)
            band_width = min(w_fixed, w_adaptive)
            # Compute boundary-based J with adaptive threshold
            pred_contour = self._get_contour_with_width(predictions[i], band_width)
            gt_contour = self._get_contour_with_width(gt_mask, band_width)
            intersection = (pred_contour * gt_contour).sum()
            union = (pred_contour + gt_contour).clamp(0, 1).sum()
            # Two empty boundary bands (target absent, nothing predicted) score 1.0
            j_adaptive = torch.where(
                union > 0,
                intersection / union.clamp_min(1e-8),
                torch.ones_like(union)
            )
            adaptive_j_scores.append(j_adaptive)
        return torch.stack(adaptive_j_scores)
    def _get_contour_with_width(
        self,
        mask: torch.Tensor,
        width: float
    ) -> torch.Tensor:
        """Get a boundary band that extends about `width` pixels on each side"""
        mask_expanded = mask.unsqueeze(0).unsqueeze(0).float()
        # Dilate and erode the mask itself by `radius` pixels, then subtract.
        # Repeating the gradient on its own output would split the band into
        # rings, and the default stride would downsample it, so a single
        # pooling pass with stride=1 is used instead.
        radius = max(1, int(round(width)))
        kernel_size = 2 * radius + 1
        dilated = F.max_pool2d(mask_expanded, kernel_size, stride=1, padding=radius)
        eroded = -F.max_pool2d(-mask_expanded, kernel_size, stride=1, padding=radius)
        contour = dilated - eroded
        return (contour > 0.5).squeeze(0).squeeze(0).float()
def _compute_disappearance_metric(
self,
predictions: torch.Tensor,
ground_truth: torch.Tensor
) -> float:
"""
Compute J&F for frames where target is absent
"""
# Find frames where GT is empty (target disappeared)
gt_areas = (ground_truth > 0.5).float().sum(dim=(-2, -1))
disappeared_frames = (gt_areas < 1.0).nonzero(as_tuple=True)[0]
if len(disappeared_frames) == 0:
return 0.0
# Compute J&F only on disappeared frames
pred_sub = predictions[disappeared_frames]
gt_sub = ground_truth[disappeared_frames]
j_scores = self._compute_j(pred_sub, gt_sub)
f_scores = self._compute_f(pred_sub, gt_sub)
j_and_f_d = ((j_scores + f_scores) / 2.0).mean().item()
return j_and_f_d
def _compute_reappearance_metric(
self,
predictions: torch.Tensor,
ground_truth: torch.Tensor
) -> float:
"""
Compute J&F for frames where target reappears after disappearance
"""
# Find disappearance and reappearance clips
gt_areas = (ground_truth > 0.5).float().sum(dim=(-2, -1))
disappeared = (gt_areas < 1.0)
# Find reappearance indices (transition from False to True)
reappear_indices = []
in_disappearance = False
for i in range(len(disappeared)):
if disappeared[i] and not in_disappearance:
in_disappearance = True
elif not disappeared[i] and in_disappearance:
in_disappearance = False
reappear_indices.append(i)
if len(reappear_indices) == 0:
return 0.0
# Compute J&F on reappearance frames
reappear_indices = torch.tensor(reappear_indices)
pred_sub = predictions[reappear_indices]
gt_sub = ground_truth[reappear_indices]
j_scores = self._compute_j(pred_sub, gt_sub)
f_scores = self._compute_f(pred_sub, gt_sub)
j_and_f_r = ((j_scores + f_scores) / 2.0).mean().item()
return j_and_f_r
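The adaptive threshold in `_compute_adaptive_j`, ẇ = min(0.008 × D, α × √A), is easier to judge with concrete numbers. The sketch below evaluates it for a 1080×1920 frame with α = 0.1 (the default `boundary_alpha` in this code). The function name `adaptive_boundary_width` and the two example object areas are illustrative assumptions.

```python
import math


def adaptive_boundary_width(
    height: int, width: int, area: float, alpha: float = 0.1
) -> float:
    """Boundary tolerance in pixels: min(0.008 * D, alpha * sqrt(A))."""
    diagonal = math.hypot(height, width)  # D, the image diagonal in pixels
    return min(0.008 * diagonal, alpha * math.sqrt(area))


if __name__ == "__main__":
    # Small object: a 20x20 px mask (A = 400) gets a thin band.
    print(adaptive_boundary_width(1080, 1920, area=400))
    # Large object: a 500x500 px mask (A = 250000) is capped by the
    # fixed term 0.008 * D.
    print(adaptive_boundary_width(1080, 1920, area=250_000))
```

For the small object the adaptive term wins (0.1 × 20 = 2 pixels), while for the large one the fixed term 0.008 × D, roughly 17.6 pixels at this resolution, is the smaller of the two. The effect is that tiny targets are no longer judged with a tolerance comparable to their own size.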
# 6. Training Loop (training/trainer.py)
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from typing import Dict, Tuple
import logging
from pathlib import Path
import json
from tqdm import tqdm
logger = logging.getLogger(__name__)
class MOSEv2Trainer:
"""Trainer for MOSEv2 enhanced SAM2"""
def __init__(self, model, config, device="cuda"):
self.model = model.to(device)
self.config = config
self.device = device
# Optimizer
self.optimizer = Adam(
filter(lambda p: p.requires_grad, self.model.parameters()),
lr=config.training.learning_rate,
weight_decay=config.training.weight_decay
)
# Scheduler
self.scheduler = CosineAnnealingLR(
self.optimizer,
T_max=config.training.num_epochs
)
# Loss
self.criterion = nn.BCEWithLogitsLoss()
# Paths
self.checkpoint_dir = Path(config.checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
self.best_metrics = None
self.training_history = []
def train_epoch(self, train_loader) -> Dict:
"""Train for one epoch"""
self.model.train()
epoch_losses = []
pbar = tqdm(train_loader, desc="Training")
for batch in pbar:
video_frames = batch["video_frames"].to(self.device)
masks = batch["masks"].to(self.device)
first_frame_mask = batch["first_frame_mask"].to(self.device)
# Forward pass
outputs = self.model(video_frames, first_frame_mask)
# Compute loss
pred_masks = outputs["masks"]
target_masks = masks.squeeze(1)
loss = self.criterion(pred_masks, target_masks)
# Backward pass
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(
self.model.parameters(),
max_norm=1.0
)
self.optimizer.step()
epoch_losses.append(loss.item())
pbar.set_postfix({"loss": f"{loss.item():.4f}"})
return {
"loss": sum(epoch_losses) / len(epoch_losses)
}
    @torch.no_grad()
    def validate(self, val_loader, metrics_fn) -> Dict:
        """Validate on validation set"""
        self.model.eval()
        all_metrics = []
        pbar = tqdm(val_loader, desc="Validation")
        for batch in pbar:
            video_frames = batch["video_frames"].to(self.device)
            masks = batch["masks"].to(self.device)
            first_frame_mask = batch["first_frame_mask"].to(self.device)
            # Forward pass
            outputs = self.model(video_frames, first_frame_mask)
            pred_masks = outputs["masks"]
            # compute_metrics expects (T, H, W). The val loader uses
            # batch_size=1, so fold the batch and channel dimensions away.
            pred_masks = pred_masks.reshape(-1, *pred_masks.shape[-2:])
            target_masks = masks.reshape(-1, *masks.shape[-2:])
            # NOTE: compute_metrics thresholds predictions at 0.5. If the model
            # returns raw logits (as BCEWithLogitsLoss in train_epoch expects),
            # apply torch.sigmoid to pred_masks first.
            metrics = metrics_fn.compute_metrics(
                pred_masks,
                target_masks,
                compute_adaptive=True,
                compute_disappearance=True
            )
            all_metrics.append({
                "j_and_f": metrics.j_and_f,
                "j_and_f_dot": metrics.j_and_f_dot,
                "j_and_f_d": metrics.j_and_f_d,
                "j_and_f_r": metrics.j_and_f_r
            })
            pbar.set_postfix({"J&F": f"{metrics.j_and_f:.1%}"})
        if not all_metrics:
            raise ValueError("Validation loader produced no batches")
        # Average metrics
        avg_metrics = {
            key: sum(m[key] for m in all_metrics) / len(all_metrics)
            for key in all_metrics[0].keys()
        }
        return avg_metrics
def train(self, train_loader, val_loader, metrics_fn):
"""Full training loop"""
num_epochs = self.config.training.num_epochs
for epoch in range(num_epochs):
logger.info(f"\nEpoch {epoch+1}/{num_epochs}")
# Train
train_metrics = self.train_epoch(train_loader)
# Validate
val_metrics = self.validate(val_loader, metrics_fn)
# Update scheduler
self.scheduler.step()
# Log metrics
self.training_history.append({
"epoch": epoch,
"train_loss": train_metrics["loss"],
"val_metrics": val_metrics
})
logger.info(
f"Train Loss: {train_metrics['loss']:.4f} | "
f"Val J&F: {val_metrics['j_and_f']:.1%} | "
f"Val J&F_dot: {val_metrics['j_and_f_dot']:.1%}"
)
# Save checkpoint
if (self.best_metrics is None or
val_metrics['j_and_f_dot'] > self.best_metrics['j_and_f_dot']):
self.best_metrics = val_metrics
self._save_checkpoint(epoch, is_best=True)
logger.info(f"Saved best model at epoch {epoch}")
# Regular checkpoint
if (epoch + 1) % 5 == 0:
self._save_checkpoint(epoch)
logger.info("Training completed!")
return self.training_history
def _save_checkpoint(self, epoch: int, is_best: bool = False):
"""Save model checkpoint"""
checkpoint = {
"epoch": epoch,
"model_state": self.model.state_dict(),
"optimizer_state": self.optimizer.state_dict(),
"scheduler_state": self.scheduler.state_dict(),
"best_metrics": self.best_metrics,
"config": self.config.__dict__ if hasattr(self.config, '__dict__') else self.config
}
filename = "best.pth" if is_best else f"checkpoint_epoch_{epoch}.pth"
path = self.checkpoint_dir / filename
torch.save(checkpoint, path)
logger.debug(f"Saved checkpoint: {path}")
def load_checkpoint(self, path: str):
"""Load model checkpoint"""
checkpoint = torch.load(path, map_location=self.device)
self.model.load_state_dict(checkpoint["model_state"])
self.optimizer.load_state_dict(checkpoint["optimizer_state"])
self.scheduler.load_state_dict(checkpoint["scheduler_state"])
self.best_metrics = checkpoint.get("best_metrics")
logger.info(f"Loaded checkpoint: {path}")
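`_compute_disappearance_metric` and `_compute_reappearance_metric` return 0.0 for videos that contain no disappearance or reappearance, and `validate` averages every metric over all videos, so such videos pull J&F_d and J&F_r toward zero. One possible alternative, sketched below under the assumption that undefined scores are reported as `None` instead of 0.0, is to average each metric only over the videos where it is defined. `average_defined` is a hypothetical helper, not part of the original trainer.

```python
from typing import Dict, List, Optional


def average_defined(
    per_video: List[Dict[str, Optional[float]]]
) -> Dict[str, Optional[float]]:
    """Average each metric over the videos where it is not None."""
    keys = sorted({key for metrics in per_video for key in metrics})
    averages: Dict[str, Optional[float]] = {}
    for key in keys:
        values = [m[key] for m in per_video if m.get(key) is not None]
        # A metric that is undefined for every video stays None.
        averages[key] = sum(values) / len(values) if values else None
    return averages


if __name__ == "__main__":
    per_video = [
        {"j_and_f": 0.8, "j_and_f_d": None},  # target never disappears
        {"j_and_f": 0.6, "j_and_f_d": 0.5},
    ]
    print(average_defined(per_video))
```

With the 0.0 sentinel the second metric would average to 0.25 in this toy case; skipping the undefined entry reports 0.5, the score of the only video where disappearance actually occurs.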
# 7. Inference Pipeline (inference/inference_pipeline.py)
import torch
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional
import logging
logger = logging.getLogger(__name__)
class MOSEv2InferencePipeline:
"""End-to-end inference pipeline for MOSEv2"""
def __init__(self, model, config, device="cuda"):
self.model = model.to(device)
self.config = config
self.device = device
self.model.eval()
    def segment_video(
        self,
        video_path: str,
        first_frame_annotation: np.ndarray,
        output_dir: Optional[str] = None
    ) -> Dict:
        """
        Segment video with first frame annotation
        Args:
            video_path: Path to input video
            first_frame_annotation: Binary mask for first frame, shape (H, W)
            output_dir: Directory to save results
        Returns:
            Dictionary with segmentation results
        """
        # Load video
        frames = self._load_video(video_path)
        if not frames:
            raise ValueError(f"No frames could be read from {video_path}")
        logger.info(f"Loaded video with {len(frames)} frames")
        # Prepare first frame mask: (H, W) -> (1, 1, H, W)
        first_mask = torch.as_tensor(
            first_frame_annotation, dtype=torch.float32
        ).unsqueeze(0).unsqueeze(0)
        # Convert frames to tensor: (T, H, W, 3) -> (T, 3, H, W), scaled to [0, 1]
        frames_tensor = torch.from_numpy(np.stack(frames)).float().permute(0, 3, 1, 2) / 255.0
        # Normalize with the same ImageNet statistics as the dataset class
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        frames_tensor = (frames_tensor - mean) / std
        # Move to device
        frames_tensor = frames_tensor.to(self.device)
        first_mask = first_mask.to(self.device)
        # Run segmentation
        with torch.no_grad():
            output = self.model(frames_tensor, first_mask)
        # Post-process
        segmentation_masks = output["masks"].cpu().numpy()
        segmentation_masks = (segmentation_masks > 0.5).astype(np.uint8)
        # Save results
        results = {
            "masks": segmentation_masks,
            "quality_scores": output["quality_scores"],
            "iou_scores": output["iou_scores"].cpu().numpy(),
            "frames": frames
        }
        if output_dir:
            self._save_results(results, output_dir, video_path)
        return results
    def _load_video(self, video_path: str) -> List[np.ndarray]:
        """Load all video frames into memory as RGB arrays"""
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        cap.release()
        return frames
    def _save_results(
        self,
        results: Dict,
        output_dir: str,
        video_name: str
    ):
        """Save segmentation masks and overlay images as PNG files"""
        output_path = Path(output_dir) / Path(video_name).stem
        output_path.mkdir(parents=True, exist_ok=True)
        masks = results["masks"]
        frames = results["frames"]
        # Save masks
        mask_dir = output_path / "masks"
        mask_dir.mkdir(exist_ok=True)
        for i, mask in enumerate(masks):
            mask_path = mask_dir / f"frame_{i:06d}.png"
            cv2.imwrite(str(mask_path), mask * 255)
        # Save overlay
        overlay_dir = output_path / "overlays"
        overlay_dir.mkdir(exist_ok=True)
        for i, (frame, mask) in enumerate(zip(frames, masks)):
            overlay = self._create_overlay(frame, mask)
            overlay_path = overlay_dir / f"frame_{i:06d}.png"
            # OpenCV writes BGR, while the frames are held as RGB
            overlay_bgr = cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR)
            cv2.imwrite(str(overlay_path), overlay_bgr)
        logger.info(f"Saved results to {output_path}")
def _create_overlay(
self,
frame: np.ndarray,
mask: np.ndarray,
alpha: float = 0.5
) -> np.ndarray:
"""Create visualization overlay"""
overlay = frame.copy().astype(np.float32)
mask_color = np.array([255, 0, 0]) # Red
mask_pixels = mask > 0
overlay[mask_pixels] = (
alpha * mask_color +
(1 - alpha) * overlay[mask_pixels]
)
return overlay.astype(np.uint8)
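The trainer uses `BCEWithLogitsLoss`, which expects raw logits, while `segment_video` thresholds `output["masks"]` at 0.5, which is only correct for probabilities. Which of the two the model returns is not visible in this section, so the following is a sketch of the relationship rather than a fix: thresholding a probability at 0.5 is equivalent to thresholding the corresponding logit at 0. The function names are hypothetical.

```python
import math


def sigmoid(x: float) -> float:
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    e = math.exp(x)
    return e / (1.0 + e)


def foreground_from_logit(logit: float) -> bool:
    """Decision rule for raw logits: positive means foreground."""
    return logit > 0.0


def foreground_from_probability(prob: float) -> bool:
    """Decision rule for probabilities in [0, 1]."""
    return prob > 0.5


if __name__ == "__main__":
    for logit in [-3.0, -0.1, 0.2, 4.0]:
        prob = sigmoid(logit)
        # The two correct rules always agree; thresholding the raw logit
        # at 0.5 does not (see logit = 0.2).
        print(logit, round(prob, 3),
              foreground_from_logit(logit),
              foreground_from_probability(prob),
              logit > 0.5)
```

A logit of 0.2 corresponds to a probability of about 0.55 and should count as foreground, but comparing the raw logit against 0.5 would discard it, shrinking every predicted mask.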
# 8. Main Script (main.py)
import torch
import yaml
import argparse
import logging
from pathlib import Path
from omegaconf import OmegaConf
from model.sam2_enhanced import SAM2Enhanced
from data.mosev2_dataset import MOSEv2DataLoader
from evaluation.metrics import MOSEv2Metrics
from training.trainer import MOSEv2Trainer
from inference.inference_pipeline import MOSEv2InferencePipeline
def setup_logging(config):
"""Setup logging configuration"""
log_dir = Path(config.log_dir)
log_dir.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_dir / 'training.log'),
logging.StreamHandler()
]
)
def train(config):
    """Training pipeline"""
    logger = logging.getLogger(__name__)
    logger.info("Starting training...")
    # Load config first: `config` may be a path to a YAML file,
    # so it has no `.device` attribute until it has been parsed
    if isinstance(config, str):
        with open(config, 'r') as f:
            config = yaml.safe_load(f)
    config = OmegaConf.create(config)
    # Setup device
    device = torch.device(config.device)
    logger.info(f"Using device: {device}")
    # Create model
    model = SAM2Enhanced(config.model)
    logger.info("Created SAM2Enhanced model")
    # Create dataloaders
    loaders = MOSEv2DataLoader.create_loaders(
        config.dataset.data_root,
        batch_size=config.training.batch_size,
        num_workers=config.training.num_workers,
        config=config
    )
    # Create metrics
    metrics = MOSEv2Metrics(boundary_alpha=config.evaluation.boundary_alpha)
    # Create trainer
    trainer = MOSEv2Trainer(model, config, device)
    # Train (the trainer logs completion itself)
    history = trainer.train(
        loaders["train"],
        loaders["val"],
        metrics
    )
    return trainer, history
def inference(config, video_path: str, mask_path: str, output_dir: str):
    """Inference pipeline"""
    # Imported locally so this function does not depend on names that are
    # only imported inside the `if __name__ == "__main__"` guard
    import cv2
    import numpy as np
    logger = logging.getLogger(__name__)
    logger.info(f"Starting inference on {video_path}...")
    # Load config first: `config` may be a path to a YAML file
    if isinstance(config, str):
        with open(config, 'r') as f:
            config = yaml.safe_load(f)
    config = OmegaConf.create(config)
    device = torch.device(config.device)
    # Create model
    model = SAM2Enhanced(config.model)
    # Load checkpoint
    checkpoint_path = Path(config.checkpoint_dir) / "best.pth"
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint["model_state"])
    logger.info(f"Loaded checkpoint: {checkpoint_path}")
    # Create pipeline
    pipeline = MOSEv2InferencePipeline(model, config, device)
    # Load first frame annotation
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        raise FileNotFoundError(f"Could not read first-frame mask: {mask_path}")
    mask = mask.astype(np.float32) / 255.0
    # Run inference
    results = pipeline.segment_video(video_path, mask, output_dir)
    logger.info(f"Inference completed! Results saved to {output_dir}")
    return results
def main():
    parser = argparse.ArgumentParser(description="MOSEv2 SAM2 Enhanced")
    parser.add_argument("--config", type=str, default="config.yaml")
    parser.add_argument("--mode", type=str, choices=["train", "inference"], default="train")
    parser.add_argument("--video", type=str, help="Video path for inference")
    parser.add_argument("--mask", type=str, help="First frame mask path for inference")
    parser.add_argument("--output", type=str, default="./outputs")
    args = parser.parse_args()
    if args.mode == "inference" and (args.video is None or args.mask is None):
        parser.error("--video and --mask are required when --mode is inference")
    # Load the config once so logging and the pipelines share it.
    # (The previous fallback to an empty dict failed on `config.log_dir`.)
    config = OmegaConf.load(args.config)
    # Setup logging
    setup_logging(config)
    if args.mode == "train":
        trainer, history = train(config)
    else:
        results = inference(config, args.video, args.mask, args.output)
if __name__ == "__main__":
import numpy as np
main()
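The scripts read a fixed set of keys from the configuration: `device`, `log_dir`, `checkpoint_dir`, `model`, `dataset.data_root`, the `training.*` hyperparameters, and `evaluation.boundary_alpha`. A small pre-flight check like the one below can report every missing key at once instead of failing on the first one partway through start-up. The key list is inferred from the attribute accesses in this section and may be incomplete, and `missing_keys` is a hypothetical helper that works on a plain nested dict.

```python
from typing import Any, Dict, List, Sequence

# Keys inferred from the attribute accesses in the scripts of this section.
REQUIRED_KEYS = (
    "device",
    "log_dir",
    "checkpoint_dir",
    "model",
    "dataset.data_root",
    "training.batch_size",
    "training.num_workers",
    "training.learning_rate",
    "training.weight_decay",
    "training.num_epochs",
    "evaluation.boundary_alpha",
)


def missing_keys(
    config: Dict[str, Any], required: Sequence[str] = REQUIRED_KEYS
) -> List[str]:
    """Return the dotted keys from `required` that are absent in `config`."""
    missing = []
    for dotted in required:
        node: Any = config
        for part in dotted.split("."):
            if not isinstance(node, dict) or part not in node:
                missing.append(dotted)
                break
            node = node[part]
    return missing


if __name__ == "__main__":
    partial = {
        "device": "cuda",
        "log_dir": "./logs",
        "model": {},
        "training": {"batch_size": 4, "num_epochs": 10},
    }
    print(missing_keys(partial))
```

When the configuration is an OmegaConf object, `OmegaConf.to_container(cfg, resolve=True)` converts it to the plain dict this helper expects.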
# Usage
# Training
python main.py --config config.yaml --mode train
# Inference
python main.py --config config.yaml --mode inference \
--video /path/to/video.mp4 \
--mask /path/to/first_frame_mask.png \
--output ./results

