MOSEv2: The Game-Changing Video Object Segmentation Dataset for Real-World AI Applications

MOSEv2: The Game-Changing Video Object Segmentation Dataset for Real-World AI Applications

Introduction

In the rapidly evolving landscape of computer vision and artificial intelligence, one persistent challenge has plagued researchers and practitioners: how do we create machine learning models that can reliably identify and track objects in real-world video scenarios? Traditional video object segmentation (VOS) benchmarks like DAVIS and YouTube-VOS have produced impressive results, with state-of-the-art methods achieving 90+ percent accuracy. Yet when these same models are deployed in practical applications—autonomous vehicles navigating crowded streets, augmented reality systems processing livestreams, or video editing software tracking moving subjects—performance dramatically degrades.

This critical gap between laboratory benchmarks and real-world deployment has finally prompted a paradigm shift in how the computer vision community evaluates segmentation algorithms. The introduction of MOSEv2, a significantly more challenging video object segmentation dataset, represents a watershed moment for the field. With 5,024 high-resolution videos containing 701,976 carefully annotated masks across 200 object categories, MOSEv2 doesn’t just extend its predecessor MOSEv1—it fundamentally reimagines what it means to test video understanding in authentically complex environments.

What Is MOSEv2 and Why It Represents a Breakthrough

MOSEv2 (coMplex video Object SEgmentation version 2) is a comprehensive dataset specifically engineered to evaluate video object segmentation methods under conditions that mirror real-world complexity. Unlike earlier datasets that primarily featured salient, isolated objects with clear backgrounds, MOSEv2 embraces the messy reality of uncontrolled environments where objects disappear, reappear, blend into crowded scenes, and exist under challenging lighting or weather conditions.

Key Dataset Statistics

$$ \small \renewcommand{\arraystretch}{1.2} \begin{array}{l|c|c|c|c} \hline \textbf{Metric} & \textbf{MOSEv2} & \textbf{MOSEv1} & \textbf{DAVIS 2017} & \textbf{YouTube-VOS} \\ \hline \text{Total Videos} & 5{,}024 & 2{,}149 & 901 & 4{,}453 \\ \text{Annotated Masks} & 701{,}976 & 431{,}725 & 3{,}543 & 197{,}272 \\ \text{Object Categories} & 200 & 36 & \text{–} & 94 \\ \text{Disappearance Rate (%)} & 61.8 & 41.5 & 16.1 & 13.0 \\ \text{Reappearance Rate (%)} & 50.3 & 23.9 & 10.7 & 8.0 \\ \text{Average Distractors} & 13.6 & 6.5 & 3.7 & 3.0 \\ \hline \end{array} $$

The numbers tell a compelling story: MOSEv2’s disappearance rate of 61.8 percent is substantially higher than any existing benchmark, reflecting the frequent occlusions and out-of-frame scenarios that characterize authentic video content. Similarly, the reappearance rate of 50.3 percent—more than double that of MOSEv1—means models must not only handle object loss but successfully re-identify targets amid visual ambiguity.

The Real-World Challenges MOSEv2 Addresses

Object Disappearance and Reappearance

In real-world video scenarios, tracked objects frequently vanish behind obstacles, move out of frame, or become obscured by other elements. MOSEv2 emphasizes this challenge through intentionally designed sequences where objects undergo repeated cycles of disappearance and reappearance. This tests a critical capability: can models maintain object identity across temporal gaps using robust memory mechanisms and temporal reasoning?

Traditional datasets largely avoided these scenarios, creating an unrealistic evaluation environment where models could achieve high accuracy through simple frame-to-frame propagation without sophisticated re-identification mechanisms.

Severe Occlusions and Crowded Scenes

MOSEv2 contains videos with exceptionally dense crowds and complex occlusion patterns. The dataset’s average “distractors” metric—measuring visually similar objects per target—reaches 13.6, more than twice that of MOSEv1 and exceeding even specialized tracking datasets. Figure 1 illustrates a crowded urban scene where a small person moves through a dense gathering, frequently obscured by surrounding individuals.

A bustling city square filled with hundreds of pedestrians wearing colored clothing. A small person marked in orange can be seen navigating through the crowd, partially hidden by surrounding individuals. The complex inter-person occlusions and small target size exemplify MOSEv2’s core challenges.

Environmental Complexity and Adverse Conditions

One of MOSEv2’s most distinctive features is its systematic inclusion of videos shot under diverse environmental conditions:

  • Adverse Weather: 159 rainy videos, 142 heavy rain, 73 snowy, 60 foggy scenarios, and 50 disaster footage (earthquakes, floods)
  • Low-Light Environments: 280 underwater scenes and 255 nighttime videos
  • Multi-Shot Sequences: Videos with dramatic scene transitions requiring models to handle appearance discontinuities
  • Camouflaged Objects: Targets that naturally blend into backgrounds
  • Non-Physical Targets: Shadows and reflections with ambiguous boundaries

These conditions create compounding challenges. An object tracked underwater at night doesn’t just suffer from low visibility—it also exhibits unpredictable appearance changes due to light refraction and shadow dynamics that violate the assumptions built into many segmentation models.

Small Objects and Fine-Grained Tracking

MOSEv2 contains a substantially higher proportion of small masks (normalized by video resolution). Approximately 50.2 percent of annotated masks occupy less than 0.01 of the image area, compared to only 25.3 percent in DAVIS. This prevalence of small targets poses severe challenges for methods operating at standard video resolutions, where fine details become nearly imperceptible.

Evaluation Methodology: Moving Beyond Traditional Metrics

MOSEv2 introduces refined evaluation metrics that provide deeper insights into model performance across diverse scenarios.

The Adaptive Boundary Threshold (F̊)

Traditional evaluation uses a fixed boundary threshold for contour accuracy, which biases assessment toward larger objects. MOSEv2 proposes an adaptive threshold:

\[ \dot{w} = \min(0.008 \times D, \alpha \times \sqrt{A}) \]

where D is the image diagonal, A is the object’s area in pixels, and α=0.1(determined from boundary statistics across DAVIS and MOSE datasets). This formulation ensures fair evaluation across object scales, addressing the limitation where a chopstick occupying only 0.039 percent of the image area would receive inflated scores under traditional metrics.

Specialized Disappearance and Reappearance Metrics

Beyond overall accuracy, MOSEv2 introduces dedicated evaluation scores:

  • J&F̊_d: Measures performance exclusively on frames where targets are absent (disappearance clips)
  • J&F̊_r: Evaluates performance on frames where targets return after disappearance (reappearance clips)

These metrics expose critical weaknesses in existing methods. Models that aggressively predict masks perform poorly on disappearance evaluation but achieve high reappearance scores, while conservative models show the opposite pattern. Only truly robust approaches achieve high performance on both metrics.

Benchmark Results: The Performance Gap Reality

State-of-the-Art Degradation

The benchmark results starkly illustrate the gap between controlled evaluation environments and real-world performance:

  • SAM2: Drops from 90.7 percent J&F on DAVIS 2017 to 50.9 percent on MOSEv2—a 39.8 percentage point decline
  • Cutie-B: Falls from 87.9 percent on DAVIS 2017 to 43.9 percent on MOSEv2
  • SAM2Long-B+: Achieves the best performance at 48.6 percent J&F̊, yet remains substantially below performance on existing benchmarks

This consistent degradation across all 20 benchmarked VOS methods conclusively demonstrates that MOSEv2 represents a genuinely more difficult evaluation landscape, not merely a larger dataset with similar characteristics.

The Reappearance Problem

Perhaps most revealing is the reappearance metric analysis. Even the best-performing methods achieve J&F̊_r scores in the range of 27–36 percent—a stark contrast to their J&F̊_d performance (60–72 percent). This 40+ percentage point gap highlights a fundamental weakness in current architectures: they excel at tracking continuously visible objects but struggle to re-identify targets after disappearance.

Practical Improvements: The RCMS Solution

Based on extensive analysis of benchmark results, the MOSEv2 paper proposes Reliable Conditioned Memory Selection (RCMS), a practical enhancement to foundation models like SAM2 that substantially improves performance without sacrificing inference speed.

The Core Innovation

RCMS recognizes that SAM2 maintains two memory types: conditioned (typically from initialization) and unconditioned (from recent frames). The default approach of relying solely on initial-frame memory limits diversity and fails when initialization provides only partial visibility. RCMS strategically augments conditioned memory with high-quality frames from pre-disappearance sequences, leveraging SAM2’s demonstrated strength in tracking continuously visible objects.

Key improvements in SAM2 using RCMS and complementary tricks:

$$ \small \renewcommand{\arraystretch}{1.25} \begin{array}{l|c|l} \hline \textbf{Enhancement} & \textbf{J&F Gain} & \textbf{Method / Description} \\ \hline \text{RCMS} & +3.3\% & \text{Reliable Conditioned Memory Selection} \\ \text{MQF} & +0.9\% & Q = s_{\text{iou}} \times s_{\text{occ}} \times m_{\text{ness}} \\ \text{MSS} & +0.4\% & \text{Mask Scaling Strategy (factor: 7.5,\ offset: -4.0)} \\ \text{LVT} & +0.9\% & \text{Long-Video Finetuning (16-frame context)} \\ \text{Combined Effect} & +5.5\% & \text{All improvements integrated} \\ \hline \end{array} $$

These improvements raise SAM2-B+ from 46.0 percent to 51.5 percent J&F̊—a substantial gain while maintaining 22.6 frames-per-second inference speed.

Real-World Applications and Future Impact

MOSEv2’s comprehensive evaluation framework has immediate implications for practical deployment:

Autonomous Vehicle Development: Navigation systems require robust tracking of pedestrians, cyclists, and other vehicles under diverse weather conditions and occlusion scenarios—precisely MOSEv2’s focus areas.

Augmented Reality: AR applications demand reliable segmentation as objects move through complex indoor/outdoor environments with varying lighting and occlusion patterns.

Video Editing and Content Creation: Professional tools must handle long-duration content with multi-shot sequences, complex backgrounds, and small moving subjects.

The dataset’s emphasis on knowledge-dependent scenarios (requiring optical character recognition or spatial reasoning) also points toward future directions integrating large language models with vision systems.

Key Takeaways for Computer Vision Practitioners

✓ Benchmark Saturation Is Misleading: High performance on existing datasets masks significant limitations in real-world scenarios.

✓ Disappearance-Reappearance Is Fundamental: This challenge deserves primary focus in architecture design and memory mechanisms.

✓ Multi-Dimensional Complexity Matters: Effective models must handle occlusions, adverse weather, small objects, and crowded scenes simultaneously, not in isolation.

✓ Adaptive Evaluation Is Essential: Fixed-threshold metrics systematically bias assessment toward larger objects.

Conclusion and Call to Action

MOSEv2 represents a pivotal moment in computer vision research—the moment when the field collectively acknowledged that laboratory benchmarks no longer sufficiently reflect real-world challenges. By comprehensively addressing this gap, MOSEv2 provides both a rigorous evaluation standard and a concrete roadmap for advancing the state-of-the-art in video understanding.

The challenge is clear, and the opportunity is immense. If you’re developing video understanding systems, conducting computer vision research, or deploying segmentation algorithms in production environments, MOSEv2 should become your benchmark of choice. Explore the complete dataset at https://MOSE.video, study the comprehensive benchmark results, and implement the practical improvements outlined in this research.

The gap between laboratory performance and real-world effectiveness won’t close without datasets that authentically represent real-world complexity. MOSEv2 is that dataset. The question now is: will your models be ready for this higher standard?

Share your experiences benchmarking against MOSEv2 in the comments below—what challenges have you encountered, and what innovations are you exploring?


This article synthesizes insights from the MOSEv2 research paper, providing practitioners with actionable understanding of why this dataset matters for advancing robust video understanding systems. For complete technical details, citations, and supplementary materials, readers are encouraged to review the full research publication.

Based on the MOSEv2 paper’s proposed improvements (RCMS, MQF, MSS, LVT), here’s a comprehensive implementation for video object segmentation with enhanced SAM2.

mosev2_sam2_enhanced/
├── config.yaml
├── model/
│   ├── __init__.py
│   ├── sam2_enhanced.py
│   ├── rcms_memory.py
│   ├── quality_filtering.py
│   └── mask_processing.py
├── data/
│   ├── __init__.py
│   ├── mosev2_dataset.py
│   ├── video_loader.py
│   └── transforms.py
├── training/
│   ├── __init__.py
│   ├── trainer.py
│   ├── losses.py
│   └── optimizer.py
├── evaluation/
│   ├── __init__.py
│   ├── metrics.py
│   └── evaluator.py
├── inference/
│   ├── __init__.py
│   └── inference_pipeline.py
└── main.py
# Model Configuration
model:
  backbone: "sam2_large"  # sam2_large or sam2_base
  image_size: 1024
  hidden_dim: 256

# RCMS Configuration
rcms:
  max_conditioned_memories: 4  # N parameter
  quality_threshold: 0.6  # theta parameter
  use_memory_quality_filtering: true
  iou_weight: 0.5
  occlusion_weight: 0.5
  maskness_weight: 0.0

# Mask Scaling Strategy
mss:
  enabled: true
  scale_factor: 7.5
  offset: -4.0

# Long-Video Finetuning
lvt:
  enabled: true
  finetuning_frames: 16
  freeze_image_encoder: true

# Training Configuration
training:
  batch_size: 4
  num_epochs: 50
  learning_rate: 0.0001
  weight_decay: 0.01
  warmup_epochs: 5
  num_workers: 4

# Dataset Configuration
dataset:
  data_root: "/path/to/MOSEv2"
  split_dir: "meta"
  min_video_length: 5
  max_video_length: 500
  num_categories: 200

# Evaluation Configuration
evaluation:
  compute_adaptive_boundary: true
  compute_disappearance_reappearance: true
  boundary_alpha: 0.1
  
# Device and Paths
device: "cuda"
checkpoint_dir: "./checkpoints"
output_dir: "./outputs"
log_dir: "./logs"
# 2. Core Model Implementation (model/sam2_enhanced.py)
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Optional, Tuple
import numpy as np
from dataclasses import dataclass


@dataclass
class SegmentationOutput:
    """Output structure for segmentation results"""
    masks: torch.Tensor  # (B, H, W)
    confidences: torch.Tensor  # (B,)
    iou_scores: torch.Tensor  # (B,)
    occlusion_scores: torch.Tensor  # (B,)
    memory_state: Dict
    quality_score: float


class SAM2Enhanced(nn.Module):
    """
    Enhanced SAM2 with RCMS, MQF, MSS, and LVT improvements
    """
    
    def __init__(self, config):
        super().__init__()
        self.config = config
        
        # Load base SAM2 model
        self.sam2_model = self._load_sam2_backbone()
        
        # Enhanced components
        self.rcms = RCMSMemoryManager(config)
        self.quality_filter = MemoryQualityFilter(config)
        self.mask_processor = MaskProcessor(config)
        
        # Freezing settings for LVT
        if config.lvt.freeze_image_encoder:
            self._freeze_image_encoder()
    
    def _load_sam2_backbone(self):
        """Load SAM2 from official implementation"""
        try:
            from sam2.build_sam import build_sam2
            model_type = self.config.model.backbone
            checkpoint = f"./checkpoints/sam2_{model_type.split('_')[1]}.pt"
            sam2 = build_sam2(model_type, checkpoint)
            return sam2
        except ImportError:
            raise ImportError(
                "SAM2 not installed. Install with: "
                "pip install git+https://github.com/facebookresearch/segment-anything-2.git"
            )
    
    def _freeze_image_encoder(self):
        """Freeze image encoder for long-video finetuning"""
        for name, param in self.sam2_model.image_encoder.named_parameters():
            param.requires_grad = False
    
    def initialize_video(self, frame: torch.Tensor, mask: torch.Tensor) -> Dict:
        """
        Initialize video processing with first frame and mask
        
        Args:
            frame: (1, 3, H, W) RGB frame
            mask: (1, 1, H, W) binary mask
            
        Returns:
            Initial memory state dictionary
        """
        with torch.no_grad():
            # Get initial SAM2 state
            self.sam2_model.reset_state()
            image_embedding = self.sam2_model.encode_image(frame)
            
            # Create initial memory
            memory_state = {
                "frame_embedding": image_embedding,
                "object_mask": mask,
                "conditioned_memories": [
                    {
                        "embedding": image_embedding,
                        "mask": mask,
                        "quality_score": 1.0,
                        "frame_idx": 0
                    }
                ],
                "unconditioned_memories": [],
                "disappeared": False,
                "disappearance_start_idx": None
            }
        
        return memory_state
    
    def process_frame(
        self,
        frame: torch.Tensor,
        memory_state: Dict,
        frame_idx: int,
        previous_mask: Optional[torch.Tensor] = None
    ) -> Tuple[SegmentationOutput, Dict]:
        """
        Process a single video frame
        
        Args:
            frame: (1, 3, H, W) RGB frame
            memory_state: Current memory state
            frame_idx: Index of current frame
            previous_mask: Previous frame's mask for continuity
            
        Returns:
            Tuple of (SegmentationOutput, updated_memory_state)
        """
        # Encode current frame
        with torch.no_grad():
            image_embedding = self.sam2_model.encode_image(frame)
        
        # Select appropriate memories (conditioned + recent unconditioned)
        conditioned_mems = memory_state["conditioned_memories"]
        unconditioned_mems = self._select_nearest_memories(
            memory_state["unconditioned_memories"], 
            k=6
        )
        
        # Prepare prompt embeddings
        prompt_embeddings = self._prepare_prompt_embeddings(
            conditioned_mems, unconditioned_mems, image_embedding
        )
        
        # Get segmentation prediction
        with torch.no_grad():
            mask_prediction = self.sam2_model.segment_image(
                image_embedding,
                prompt_embeddings
            )
        
        # Compute quality metrics
        iou_score = mask_prediction.get("iou_pred", torch.tensor(0.5))
        occlusion_score = mask_prediction.get("occlusion_pred", torch.tensor(0.0))
        maskness = self._compute_maskness(mask_prediction["masks"])
        
        # Apply Mask Scaling Strategy (MSS)
        if self.config.mss.enabled:
            mask_prediction["masks"] = self.mask_processor.apply_scaling(
                mask_prediction["masks"],
                scale_factor=self.config.mss.scale_factor,
                offset=self.config.mss.offset
            )
        
        # Compute quality score
        quality_score = self.quality_filter.compute_quality_score(
            iou_score, occlusion_score, maskness
        )
        
        # Check for disappearance
        is_disappeared = self._check_disappearance(
            mask_prediction["masks"], 
            previous_mask if previous_mask is not None else memory_state["object_mask"]
        )
        
        # Update memory with RCMS
        updated_memory = self.rcms.update_memory(
            memory_state,
            frame_idx,
            image_embedding,
            mask_prediction["masks"],
            quality_score,
            is_disappeared
        )
        
        # Create output
        output = SegmentationOutput(
            masks=mask_prediction["masks"],
            confidences=torch.tensor([quality_score]),
            iou_scores=iou_score.unsqueeze(0) if iou_score.dim() == 0 else iou_score,
            occlusion_scores=occlusion_score.unsqueeze(0) if occlusion_score.dim() == 0 else occlusion_score,
            memory_state=updated_memory,
            quality_score=quality_score.item() if isinstance(quality_score, torch.Tensor) else quality_score
        )
        
        return output, updated_memory
    
    def _select_nearest_memories(
        self, 
        memories: List[Dict], 
        k: int = 6
    ) -> List[Dict]:
        """Select k nearest memories by temporal proximity"""
        if len(memories) <= k:
            return memories
        
        # Sort by frame index (assuming memories have "frame_idx")
        sorted_mems = sorted(
            memories, 
            key=lambda x: x.get("frame_idx", 0),
            reverse=True
        )
        return sorted_mems[:k]
    
    def _prepare_prompt_embeddings(
        self,
        conditioned_mems: List[Dict],
        unconditioned_mems: List[Dict],
        current_embedding: torch.Tensor
    ) -> torch.Tensor:
        """Prepare combined prompt embeddings from memory"""
        embeddings = []
        
        # Add conditioned memory embeddings
        for mem in conditioned_mems:
            embeddings.append(mem["embedding"])
        
        # Add unconditioned memory embeddings
        for mem in unconditioned_mems:
            embeddings.append(mem["embedding"])
        
        # Concatenate and normalize
        if embeddings:
            combined = torch.cat(embeddings, dim=0)
            combined = F.normalize(combined, p=2, dim=-1)
            return combined
        else:
            return current_embedding
    
    def _compute_maskness(self, masks: torch.Tensor) -> torch.Tensor:
        """
        Compute maskness metric (proportion of non-zero predictions)
        Higher value indicates stronger mask predictions
        """
        if masks.shape[-2:] == (0, 0):
            return torch.tensor(0.0)
        
        mask_area = masks.sum(dim=(-2, -1))
        total_area = masks.numel() / masks.shape[0]
        maskness = (mask_area / total_area).mean()
        
        return torch.clamp(maskness, 0, 1)
    
    def _check_disappearance(
        self, 
        current_mask: torch.Tensor,
        previous_mask: torch.Tensor,
        threshold: float = 0.05
    ) -> bool:
        """Check if target has disappeared"""
        if current_mask is None or previous_mask is None:
            return False
        
        current_area = (current_mask > 0.5).float().sum()
        
        # Target disappeared if current mask area is near zero
        return current_area < (current_mask.numel() * threshold)
    
    def forward(
        self,
        video_frames: torch.Tensor,
        first_frame_mask: torch.Tensor,
        return_all_frames: bool = True
    ) -> Dict:
        """
        Full forward pass through video
        
        Args:
            video_frames: (T, 3, H, W) video tensor
            first_frame_mask: (1, 1, H, W) first frame annotation
            return_all_frames: Whether to return masks for all frames
            
        Returns:
            Dictionary with segmentation results for all frames
        """
        device = next(self.parameters()).device
        video_frames = video_frames.to(device)
        first_frame_mask = first_frame_mask.to(device)
        
        # Initialize
        memory_state = self.initialize_video(
            video_frames[0:1],
            first_frame_mask
        )
        
        all_outputs = []
        previous_mask = first_frame_mask
        
        # Process each frame
        for frame_idx in range(len(video_frames)):
            frame = video_frames[frame_idx:frame_idx+1]
            
            output, memory_state = self.process_frame(
                frame,
                memory_state,
                frame_idx,
                previous_mask
            )
            
            all_outputs.append({
                "masks": output.masks,
                "confidence": output.confidences,
                "iou_score": output.iou_scores,
                "quality_score": output.quality_score
            })
            
            previous_mask = output.masks
        
        # Compile results
        result = {
            "masks": torch.cat([o["masks"] for o in all_outputs], dim=0),
            "confidences": torch.cat([o["confidence"] for o in all_outputs], dim=0),
            "iou_scores": torch.cat([o["iou_score"] for o in all_outputs], dim=0),
            "quality_scores": [o["quality_score"] for o in all_outputs],
            "all_outputs": all_outputs if return_all_frames else None
        }
        
        return result


class RCMSMemoryManager(nn.Module):
    """Reliable Conditioned Memory Selection"""
    
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.max_conditioned = config.rcms.max_conditioned_memories
        self.quality_threshold = config.rcms.quality_threshold
    
    def update_memory(
        self,
        memory_state: Dict,
        frame_idx: int,
        frame_embedding: torch.Tensor,
        mask: torch.Tensor,
        quality_score: float,
        is_disappeared: bool
    ) -> Dict:
        """
        Update memory state using RCMS algorithm
        
        Algorithm 1 from MOSEv2 paper
        """
        updated_state = memory_state.copy()
        
        # Add to unconditioned memories
        new_memory = {
            "embedding": frame_embedding,
            "mask": mask,
            "quality_score": quality_score,
            "frame_idx": frame_idx
        }
        updated_state["unconditioned_memories"].append(new_memory)
        
        # Track disappearance
        if is_disappeared and not memory_state["disappeared"]:
            updated_state["disappeared"] = True
            updated_state["disappearance_start_idx"] = frame_idx
            
            # RCMS: Select high-quality memories for conditioned bank
            pre_disappearance_mems = [
                m for m in updated_state["unconditioned_memories"]
                if m["frame_idx"] < frame_idx
            ]
            
            # Sort by quality and recency
            candidates = sorted(
                pre_disappearance_mems,
                key=lambda x: (x["quality_score"], -x["frame_idx"]),
                reverse=True
            )
            
            # Add top memories to conditioned bank if quality > threshold
            for candidate in candidates:
                if (candidate["quality_score"] > self.quality_threshold and
                    len(updated_state["conditioned_memories"]) < self.max_conditioned + 1):
                    updated_state["conditioned_memories"].append(candidate)
        
        # Maintain unconditioned memory size
        if len(updated_state["unconditioned_memories"]) > 12:
            updated_state["unconditioned_memories"] = (
                updated_state["unconditioned_memories"][-12:]
            )
        
        return updated_state


class MemoryQualityFilter(nn.Module):
    """Memory Quality Filtering (MQF)"""
    
    def __init__(self, config):
        super().__init__()
        self.config = config
    
    def compute_quality_score(
        self,
        iou_pred: torch.Tensor,
        occlusion_pred: torch.Tensor,
        maskness: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute quality score as product of components
        
        Q = score_iou × score_occ × maskness
        """
        # Normalize scores to [0, 1]
        iou_score = torch.clamp(iou_pred, 0, 1) if isinstance(iou_pred, torch.Tensor) else torch.tensor(iou_pred)
        occ_score = torch.clamp(1.0 - occlusion_pred, 0, 1) if isinstance(occlusion_pred, torch.Tensor) else torch.tensor(occlusion_pred)
        maskness_score = torch.clamp(maskness, 0, 1) if isinstance(maskness, torch.Tensor) else torch.tensor(maskness)
        
        # Weighted product
        weights = {
            "iou": self.config.rcms.iou_weight,
            "occ": self.config.rcms.occlusion_weight,
            "maskness": self.config.rcms.maskness_weight
        }
        
        quality = (
            (iou_score ** weights["iou"]) *
            (occ_score ** weights["occ"]) *
            (maskness_score ** weights["maskness"])
        )
        
        return quality.mean() if quality.dim() > 0 else quality


class MaskProcessor(nn.Module):
    """Mask Scaling Strategy (MSS)"""
    
    def __init__(self, config):
        super().__init__()
        self.config = config
    
    def apply_scaling(
        self,
        masks: torch.Tensor,
        scale_factor: float = 7.5,
        offset: float = -4.0
    ) -> torch.Tensor:
        """
        Apply mask scaling adjustment
        
        Helps improve robustness to small objects and occlusions
        """
        if masks.dtype == torch.bool:
            masks = masks.float()
        
        # Apply scaling to logits (convert to logits first if probability)
        scaled_masks = masks * scale_factor + offset
        
        # Apply sigmoid to get back to [0, 1]
        scaled_masks = torch.sigmoid(scaled_masks)
        
        return scaled_masks
# 3. Memory Management (model/rcms_memory.py)
import torch
import torch.nn as nn
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import logging

logger = logging.getLogger(__name__)


@dataclass
class MemoryFrame:
    """Single frame in memory bank"""
    frame_idx: int
    embedding: torch.Tensor
    mask: torch.Tensor
    quality_score: float
    iou_score: float
    occlusion_score: float
    maskness: float
    
    def to_dict(self) -> Dict:
        return {
            "frame_idx": self.frame_idx,
            "embedding": self.embedding,
            "mask": self.mask,
            "quality_score": self.quality_score,
            "iou_score": self.iou_score,
            "occlusion_score": self.occlusion_score,
            "maskness": self.maskness
        }


class ConditionedMemoryBank(nn.Module):
    """Conditioned Memory Bank with RCMS"""
    
    def __init__(self, max_size: int = 5, quality_threshold: float = 0.6):
        super().__init__()
        self.max_size = max_size
        self.quality_threshold = quality_threshold
        self.memories: List[MemoryFrame] = []
    
    def add_memory(self, memory_frame: MemoryFrame) -> None:
        """Add memory if quality is sufficient"""
        if memory_frame.quality_score >= self.quality_threshold:
            self.memories.append(memory_frame)
            
            # Keep only best memories if exceeding max size
            if len(self.memories) > self.max_size:
                self.memories = sorted(
                    self.memories,
                    key=lambda x: x.quality_score,
                    reverse=True
                )[:self.max_size]
    
    def get_embeddings(self) -> Optional[torch.Tensor]:
        """Get combined embedding from all memories"""
        if not self.memories:
            return None
        
        embeddings = torch.cat(
            [m.embedding for m in self.memories],
            dim=0
        )
        return embeddings / embeddings.norm(dim=-1, keepdim=True)
    
    def clear(self) -> None:
        """Clear all memories"""
        self.memories.clear()
    
    def __len__(self) -> int:
        return len(self.memories)


class UnconditionedMemoryBank(nn.Module):
    """Unconditioned Memory Bank (recent frames)"""
    
    def __init__(self, max_size: int = 6):
        super().__init__()
        self.max_size = max_size
        self.memories: List[MemoryFrame] = []
    
    def add_memory(self, memory_frame: MemoryFrame) -> None:
        """Add memory (always, maintains temporal window)"""
        self.memories.append(memory_frame)
        
        # Keep only recent memories
        if len(self.memories) > self.max_size:
            self.memories = self.memories[-self.max_size:]
    
    def get_nearest_k(self, k: int) -> List[MemoryFrame]:
        """Get k nearest (most recent) memories"""
        return self.memories[-k:] if len(self.memories) > 0 else []
    
    def clear(self) -> None:
        """Clear all memories"""
        self.memories.clear()
    
    def __len__(self) -> int:
        return len(self.memories)


class RCMSController:
    """Main controller for RCMS algorithm"""
    
    def __init__(
        self,
        max_conditioned: int = 4,
        max_unconditioned: int = 6,
        quality_threshold: float = 0.6
    ):
        self.conditioned_bank = ConditionedMemoryBank(max_conditioned, quality_threshold)
        self.unconditioned_bank = UnconditionedMemoryBank(max_unconditioned)
        
        self.max_conditioned = max_conditioned
        self.quality_threshold = quality_threshold
        self.disappeared = False
        self.disappearance_start_idx = None
    
    def update(
        self,
        frame_idx: int,
        embedding: torch.Tensor,
        mask: torch.Tensor,
        iou_score: float,
        occlusion_score: float,
        maskness: float,
        quality_score: float,
        is_disappeared: bool
    ) -> None:
        """Update memory banks based on current frame"""
        
        # Create memory frame
        memory_frame = MemoryFrame(
            frame_idx=frame_idx,
            embedding=embedding,
            mask=mask,
            quality_score=quality_score,
            iou_score=iou_score,
            occlusion_score=occlusion_score,
            maskness=maskness
        )
        
        # Always add to unconditioned bank
        self.unconditioned_bank.add_memory(memory_frame)
        
        # Detect disappearance transition
        if is_disappeared and not self.disappeared:
            self.disappeared = True
            self.disappearance_start_idx = frame_idx
            logger.info(f"Target disappeared at frame {frame_idx}")
            
            # RCMS: Augment conditioned memory with pre-disappearance frames
            self._augment_conditioned_memory(frame_idx)
        
        # Recovery from disappearance
        if not is_disappeared and self.disappeared:
            self.disappeared = False
            logger.info(f"Target reappeared at frame {frame_idx}")
    
    def _augment_conditioned_memory(self, current_frame_idx: int) -> None:
        """
        Algorithm 1 from MOSEv2 paper:
        Select high-quality pre-disappearance memories for conditioned bank
        """
        # Get all unconditioned memories before disappearance
        pre_disappearance = [
            m for m in self.unconditioned_bank.memories
            if m.frame_idx < current_frame_idx
        ]
        
        if not pre_disappearance:
            return
        
        # Sort by quality (descending) then recency (descending)
        candidates = sorted(
            pre_disappearance,
            key=lambda x: (x.quality_score, x.frame_idx),
            reverse=True
        )
        
        # Add top-quality memories to conditioned bank
        for candidate in candidates:
            if (candidate.quality_score > self.quality_threshold and
                len(self.conditioned_bank) < self.max_conditioned):
                self.conditioned_bank.add_memory(candidate)
                logger.debug(
                    f"Added memory from frame {candidate.frame_idx} "
                    f"(quality: {candidate.quality_score:.3f})"
                )
    
    def get_combined_embeddings(self) -> torch.Tensor:
        """Get combined embeddings from both memory banks"""
        embeddings = []
        
        # Add conditioned memories
        cond_emb = self.conditioned_bank.get_embeddings()
        if cond_emb is not None:
            embeddings.append(cond_emb)
        
        # Add recent unconditioned memories
        uncond_memories = self.unconditioned_bank.get_nearest_k(6)
        if uncond_memories:
            uncond_emb = torch.cat(
                [m.embedding for m in uncond_memories],
                dim=0
            )
            embeddings.append(uncond_emb)
        
        if embeddings:
            combined = torch.cat(embeddings, dim=0)
            return combined / combined.norm(dim=-1, keepdim=True)
        else:
            return None
    
    def reset(self) -> None:
        """Reset for new video"""
        self.conditioned_bank.clear()
        self.unconditioned_bank.clear()
        self.disappeared = False
        self.disappearance_start_idx = None
    
    def get_stats(self) -> Dict:
        """Get current memory statistics"""
        return {
            "conditioned_size": len(self.conditioned_bank),
            "unconditioned_size": len(self.unconditioned_bank),
            "disappeared": self.disappeared,
            "avg_conditioned_quality": (
                sum(m.quality_score for m in self.conditioned_bank.memories) /
                len(self.conditioned_bank.memories)
                if len(self.conditioned_bank.memories) > 0 else 0.0
            )
        }
# 4. Dataset Implementation (data/mosev2_dataset.py)
import torch
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from pathlib import Path
import json
import cv2
import numpy as np
from typing import Dict, Tuple, List
import logging

logger = logging.getLogger(__name__)


class MOSEv2Dataset(Dataset):
    """MOSEv2 Dataset for Video Object Segmentation"""
    
    def __init__(
        self,
        data_root: str,
        split: str = "train",
        config: Dict = None,
        transforms_fn=None
    ):
        """
        Args:
            data_root: Root directory of MOSEv2 dataset
            split: 'train', 'val', or 'test'
            config: Configuration dictionary
            transforms_fn: Transformation function
        """
        self.data_root = Path(data_root)
        self.split = split
        self.config = config or {}
        self.transforms = transforms_fn
        
        # Load video list
        self.video_list = self._load_video_list()
        
        # Default transforms
        if self.transforms is None:
            self.transforms = self._get_default_transforms()
        
        logger.info(f"Loaded {len(self.video_list)} videos for {split} split")
    
    def _load_video_list(self) -> List[Dict]:
        """Load list of videos from dataset"""
        split_file = self.data_root / "meta" / f"{self.split}.json"
        
        if not split_file.exists():
            raise FileNotFoundError(f"Split file not found: {split_file}")
        
        with open(split_file, 'r') as f:
            video_list = json.load(f)
        
        return video_list
    
    def _get_default_transforms(self):
        """Default augmentation transforms"""
        return transforms.Compose([
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
    
    def __len__(self) -> int:
        return len(self.video_list)
    
    def __getitem__(self, idx: int) -> Dict:
        """
        Get video sample
        
        Returns:
            Dictionary with:
                - video_frames: (T, 3, H, W)
                - masks: (T, 1, H, W)
                - video_id: str
                - object_id: str
        """
        video_info = self.video_list[idx]
        video_id = video_info["video_id"]
        object_id = video_info["object_id"]
        
        # Load frames and masks
        frames = self._load_frames(video_id)
        masks = self._load_masks(video_id, object_id)
        
        # Ensure same length
        min_len = min(len(frames), len(masks))
        frames = frames[:min_len]
        masks = masks[:min_len]
        
        # Convert to tensors
        frames_tensor = torch.FloatTensor(frames).permute(0, 3, 1, 2) / 255.0
        masks_tensor = torch.FloatTensor(masks).unsqueeze(1)
        
        # Apply transforms
        if self.transforms:
            frames_tensor = self.transforms(frames_tensor)
        
        return {
            "video_frames": frames_tensor,  # (T, 3, H, W)
            "masks": masks_tensor,  # (T, 1, H, W)
            "video_id": video_id,
            "object_id": object_id,
            "first_frame_mask": masks_tensor[0:1]  # (1, 1, H, W)
        }
    
    def _load_frames(self, video_id: str) -> List[np.ndarray]:
        """Load video frames"""
        frame_dir = self.data_root / "JPEGImages" / video_id
        
        if not frame_dir.exists():
            raise FileNotFoundError(f"Frame directory not found: {frame_dir}")
        
        frames = []
        frame_files = sorted(list(frame_dir.glob("*.jpg")))
        
        for frame_file in frame_files:
            frame = cv2.imread(str(frame_file))
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        
        return frames
    
    def _load_masks(self, video_id: str, object_id: str) -> List[np.ndarray]:
        """Load object masks"""
        mask_dir = self.data_root / "Annotations" / video_id / object_id
        
        if not mask_dir.exists():
            raise FileNotFoundError(f"Mask directory not found: {mask_dir}")
        
        masks = []
        mask_files = sorted(list(mask_dir.glob("*.png")))
        
        for mask_file in mask_files:
            mask = cv2.imread(str(mask_file), cv2.IMREAD_GRAYSCALE)
            mask = mask.astype(np.float32) / 255.0
            masks.append(mask)
        
        return masks


class MOSEv2DataLoader:
    """Utility for creating MOSEv2 data loaders"""
    
    @staticmethod
    def create_loaders(
        data_root: str,
        batch_size: int = 4,
        num_workers: int = 4,
        config: Dict = None
    ) -> Dict:
        """Create train/val data loaders"""
        
        train_dataset = MOSEv2Dataset(
            data_root,
            split="train",
            config=config
        )
        
        val_dataset = MOSEv2Dataset(
            data_root,
            split="val",
            config=config
        )
        
        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            drop_last=True
        )
        
        val_loader = torch.utils.data.DataLoader(
            val_dataset,
            batch_size=1,
            shuffle=False,
            num_workers=num_workers
        )
        
        return {
            "train": train_loader,
            "val": val_loader,
            "train_dataset": train_dataset,
            "val_dataset": val_dataset
        }
# 5. Evaluation Metrics (evaluation/metrics.py)

import torch
import torch.nn.functional as F
import numpy as np
from typing import Dict, Tuple
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class SegmentationMetrics:
    """Container for segmentation metrics"""
    j_mean: float  # Region similarity
    f_mean: float  # Contour accuracy
    j_and_f: float  # Combined J&F
    j_dot: float  # Adaptive boundary J
    j_and_f_dot: float  # Adaptive J&F
    j_and_f_d: float  # Disappearance metric
    j_and_f_r: float  # Reappearance metric
    
    def __repr__(self) -> str:
        return (
            f"J: {self.j_mean:.1%} | F: {self.f_mean:.1%} | "
            f"J&F: {self.j_and_f:.1%} | "
            f"J&F_dot: {self.j_and_f_dot:.1%} | "
            f"J&F_d: {self.j_and_f_d:.1%} | J&F_r: {self.j_and_f_r:.1%}"
        )


class MOSEv2Metrics:
    """MOSEv2 Evaluation Metrics"""
    
    def __init__(self, boundary_alpha: float = 0.1):
        self.boundary_alpha = boundary_alpha
    
    def compute_metrics(
        self,
        predictions: torch.Tensor,  # (T, H, W)
        ground_truth: torch.Tensor,  # (T, H, W)
        compute_adaptive: bool = True,
        compute_disappearance: bool = True
    ) -> SegmentationMetrics:
        """
        Compute comprehensive MOSEv2 metrics
        
        Args:
            predictions: Predicted masks
            ground_truth: Ground truth masks
            compute_adaptive: Compute adaptive boundary metrics
            compute_disappearance: Compute disappearance/reappearance metrics
        """
        # Convert to binary masks
        pred_binary = (predictions > 0.5).float()
        gt_binary = (ground_truth > 0.5).float()
        
        # Compute J (Jaccard/IoU)
        j_scores = self._compute_j(pred_binary, gt_binary)
        j_mean = j_scores.mean().item()
        
        # Compute F (Contour accuracy)
        f_scores = self._compute_f(pred_binary, gt_binary)
        f_mean = f_scores.mean().item()
        
        # Combined J&F
        j_and_f = (j_mean + f_mean) / 2.0
        
        # Adaptive boundary J
        j_dot_scores = self._compute_adaptive_j(
            pred_binary, gt_binary
        ) if compute_adaptive else j_scores
        j_dot = j_dot_scores.mean().item()
        j_and_f_dot = (j_dot + f_mean) / 2.0
        
        # Disappearance/Reappearance metrics
        j_and_f_d = 0.0
        j_and_f_r = 0.0
        if compute_disappearance:
            j_and_f_d = self._compute_disappearance_metric(pred_binary, gt_binary)
            j_and_f_r = self._compute_reappearance_metric(pred_binary, gt_binary)
        
        return SegmentationMetrics(
            j_mean=j_mean,
            f_mean=f_mean,
            j_and_f=j_and_f,
            j_dot=j_dot,
            j_and_f_dot=j_and_f_dot,
            j_and_f_d=j_and_f_d,
            j_and_f_r=j_and_f_r
        )
    
    def _compute_j(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute Jaccard Index (J) / IoU
        
        J = |Pred ∩ GT| / |Pred ∪ GT|
        """
        intersection = (predictions * ground_truth).sum(dim=(-2, -1))
        union = (
            (predictions + ground_truth).clamp(0, 1).sum(dim=(-2, -1))
        )
        
        j_scores = intersection / (union + 1e-8)
        return j_scores
    
    def _compute_f(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute Contour Accuracy (F)
        
        Uses bipartite matching between predicted and ground truth contours
        """
        batch_size = predictions.shape[0]
        f_scores = []
        
        for i in range(batch_size):
            pred_contour = self._get_contour(predictions[i])
            gt_contour = self._get_contour(ground_truth[i])
            
            if pred_contour.sum() == 0 and gt_contour.sum() == 0:
                f_scores.append(torch.tensor(1.0))
                continue
            
            if pred_contour.sum() == 0 or gt_contour.sum() == 0:
                f_scores.append(torch.tensor(0.0))
                continue
            
            # Simplified contour matching
            pred_points = torch.nonzero(pred_contour, as_tuple=False).float()
            gt_points = torch.nonzero(gt_contour, as_tuple=False).float()
            
            if len(pred_points) == 0 or len(gt_points) == 0:
                f_scores.append(torch.tensor(0.0))
                continue
            
            # Compute Hausdorff-like distance
            distances = torch.cdist(pred_points, gt_points)
            min_distances = distances.min(dim=1)[0]
            
            f_score = 1.0 / (1.0 + min_distances.mean().item())
            f_scores.append(torch.tensor(f_score))
        
        return torch.stack(f_scores)
    
    def _get_contour(self, mask: torch.Tensor) -> torch.Tensor:
        """Extract contour from mask"""
        # Morphological dilation - erosion
        kernel = torch.ones(3, 3, device=mask.device)
        mask_expanded = mask.unsqueeze(0).unsqueeze(0).float()
        
        # Simple edge detection via difference
        dilated = F.max_pool2d(mask_expanded, 3, padding=1)
        eroded = -F.max_pool2d(-mask_expanded, 3, padding=1)
        
        contour = (dilated - eroded).squeeze()
        return (contour > 0.5).float()
    
    def _compute_adaptive_j(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute adaptive boundary J (Ḟ)
        
        From MOSEv2 paper Eq. (1):
        ẇ = min(0.008 × D, α × √A)
        """
        batch_size = predictions.shape[0]
        h, w = predictions.shape[-2:]
        
        diagonal = torch.tensor(np.sqrt(h**2 + w**2))
        adaptive_j_scores = []
        
        for i in range(batch_size):
            gt_mask = ground_truth[i]
            area = (gt_mask > 0.5).float().sum().item()
            
            # Adaptive threshold
            w_fixed = 0.008 * diagonal.item()
            w_adaptive = self.boundary_alpha * np.sqrt(area)
            w = min(w_fixed, w_adaptive)
            
            # Compute boundary-based J with adaptive threshold
            pred_contour = self._get_contour_with_width(predictions[i], w)
            gt_contour = self._get_contour_with_width(gt_mask, w)
            
            intersection = (pred_contour * gt_contour).sum()
            union = (
                (pred_contour + gt_contour).clamp(0, 1).sum()
            )
            
            j_adaptive = intersection / (union + 1e-8)
            adaptive_j_scores.append(j_adaptive)
        
        return torch.stack(adaptive_j_scores)
    
    def _get_contour_with_width(
        self,
        mask: torch.Tensor,
        width: float
    ) -> torch.Tensor:
        """Get contour with specified width"""
        mask_expanded = mask.unsqueeze(0).unsqueeze(0).float()
        
        # Morphological operations with variable width
        n_iterations = max(1, int(width))
        contour = mask_expanded.clone()
        
        for _ in range(n_iterations):
            dilated = F.max_pool2d(contour, 3, padding=1)
            eroded = -F.max_pool2d(-contour, 3, padding=1)
            contour = (dilated - eroded)
        
        return (contour > 0.5).squeeze().float()
    
    def _compute_disappearance_metric(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> float:
        """
        Compute J&F for frames where target is absent
        """
        # Find frames where GT is empty (target disappeared)
        gt_areas = (ground_truth > 0.5).float().sum(dim=(-2, -1))
        disappeared_frames = (gt_areas < 1.0).nonzero(as_tuple=True)[0]
        
        if len(disappeared_frames) == 0:
            return 0.0
        
        # Compute J&F only on disappeared frames
        pred_sub = predictions[disappeared_frames]
        gt_sub = ground_truth[disappeared_frames]
        
        j_scores = self._compute_j(pred_sub, gt_sub)
        f_scores = self._compute_f(pred_sub, gt_sub)
        
        j_and_f_d = ((j_scores + f_scores) / 2.0).mean().item()
        return j_and_f_d
    
    def _compute_reappearance_metric(
        self,
        predictions: torch.Tensor,
        ground_truth: torch.Tensor
    ) -> float:
        """
        Compute J&F for frames where target reappears after disappearance
        """
        # Find disappearance and reappearance clips
        gt_areas = (ground_truth > 0.5).float().sum(dim=(-2, -1))
        disappeared = (gt_areas < 1.0)
        
        # Find reappearance indices (transition from False to True)
        reappear_indices = []
        in_disappearance = False
        
        for i in range(len(disappeared)):
            if disappeared[i] and not in_disappearance:
                in_disappearance = True
            elif not disappeared[i] and in_disappearance:
                in_disappearance = False
                reappear_indices.append(i)
        
        if len(reappear_indices) == 0:
            return 0.0
        
        # Compute J&F on reappearance frames
        reappear_indices = torch.tensor(reappear_indices)
        pred_sub = predictions[reappear_indices]
        gt_sub = ground_truth[reappear_indices]
        
        j_scores = self._compute_j(pred_sub, gt_sub)
        f_scores = self._compute_f(pred_sub, gt_sub)
        
        j_and_f_r = ((j_scores + f_scores) / 2.0).mean().item()
        return j_and_f_r

# 6. Training Loop (training/trainer.py)
import torch
import torch.nn as nn
from torch.optim import Adam
from torch.optim.lr_scheduler import CosineAnnealingLR
from typing import Dict, Tuple
import logging
from pathlib import Path
import json
from tqdm import tqdm

logger = logging.getLogger(__name__)


class MOSEv2Trainer:
    """Trainer for MOSEv2 enhanced SAM2"""
    
    def __init__(self, model, config, device="cuda"):
        self.model = model.to(device)
        self.config = config
        self.device = device
        
        # Optimizer
        self.optimizer = Adam(
            filter(lambda p: p.requires_grad, self.model.parameters()),
            lr=config.training.learning_rate,
            weight_decay=config.training.weight_decay
        )
        
        # Scheduler
        self.scheduler = CosineAnnealingLR(
            self.optimizer,
            T_max=config.training.num_epochs
        )
        
        # Loss
        self.criterion = nn.BCEWithLogitsLoss()
        
        # Paths
        self.checkpoint_dir = Path(config.checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        
        self.best_metrics = None
        self.training_history = []
    
    def train_epoch(self, train_loader) -> Dict:
        """Train for one epoch"""
        self.model.train()
        epoch_losses = []
        
        pbar = tqdm(train_loader, desc="Training")
        for batch in pbar:
            video_frames = batch["video_frames"].to(self.device)
            masks = batch["masks"].to(self.device)
            first_frame_mask = batch["first_frame_mask"].to(self.device)
            
            # Forward pass
            outputs = self.model(video_frames, first_frame_mask)
            
            # Compute loss
            pred_masks = outputs["masks"]
            target_masks = masks.squeeze(1)
            
            loss = self.criterion(pred_masks, target_masks)
            
            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(
                self.model.parameters(),
                max_norm=1.0
            )
            self.optimizer.step()
            
            epoch_losses.append(loss.item())
            pbar.set_postfix({"loss": f"{loss.item():.4f}"})
        
        return {
            "loss": sum(epoch_losses) / len(epoch_losses)
        }
    
    @torch.no_grad()
    def validate(self, val_loader, metrics_fn) -> Dict:
        """Validate on validation set"""
        self.model.eval()
        all_metrics = []
        
        pbar = tqdm(val_loader, desc="Validation")
        for batch in pbar:
            video_frames = batch["video_frames"].to(self.device)
            masks = batch["masks"].to(self.device)
            first_frame_mask = batch["first_frame_mask"].to(self.device)
            
            # Forward pass
            outputs = self.model(video_frames, first_frame_mask)
            pred_masks = outputs["masks"]
            target_masks = masks.squeeze(1)
            
            # Compute metrics
            metrics = metrics_fn.compute_metrics(
                pred_masks,
                target_masks,
                compute_adaptive=True,
                compute_disappearance=True
            )
            
            all_metrics.append({
                "j_and_f": metrics.j_and_f,
                "j_and_f_dot": metrics.j_and_f_dot,
                "j_and_f_d": metrics.j_and_f_d,
                "j_and_f_r": metrics.j_and_f_r
            })
            
            pbar.set_postfix({"J&F": f"{metrics.j_and_f:.1%}"})
        
        # Average metrics
        avg_metrics = {
            key: sum(m[key] for m in all_metrics) / len(all_metrics)
            for key in all_metrics[0].keys()
        }
        
        return avg_metrics
    
    def train(self, train_loader, val_loader, metrics_fn):
        """Full training loop"""
        num_epochs = self.config.training.num_epochs
        
        for epoch in range(num_epochs):
            logger.info(f"\nEpoch {epoch+1}/{num_epochs}")
            
            # Train
            train_metrics = self.train_epoch(train_loader)
            
            # Validate
            val_metrics = self.validate(val_loader, metrics_fn)
            
            # Update scheduler
            self.scheduler.step()
            
            # Log metrics
            self.training_history.append({
                "epoch": epoch,
                "train_loss": train_metrics["loss"],
                "val_metrics": val_metrics
            })
            
            logger.info(
                f"Train Loss: {train_metrics['loss']:.4f} | "
                f"Val J&F: {val_metrics['j_and_f']:.1%} | "
                f"Val J&F_dot: {val_metrics['j_and_f_dot']:.1%}"
            )
            
            # Save checkpoint
            if (self.best_metrics is None or
                val_metrics['j_and_f_dot'] > self.best_metrics['j_and_f_dot']):
                
                self.best_metrics = val_metrics
                self._save_checkpoint(epoch, is_best=True)
                logger.info(f"Saved best model at epoch {epoch}")
            
            # Regular checkpoint
            if (epoch + 1) % 5 == 0:
                self._save_checkpoint(epoch)
        
        logger.info("Training completed!")
        return self.training_history
    
    def _save_checkpoint(self, epoch: int, is_best: bool = False):
        """Save model checkpoint"""
        checkpoint = {
            "epoch": epoch,
            "model_state": self.model.state_dict(),
            "optimizer_state": self.optimizer.state_dict(),
            "scheduler_state": self.scheduler.state_dict(),
            "best_metrics": self.best_metrics,
            "config": self.config.__dict__ if hasattr(self.config, '__dict__') else self.config
        }
        
        filename = "best.pth" if is_best else f"checkpoint_epoch_{epoch}.pth"
        path = self.checkpoint_dir / filename
        
        torch.save(checkpoint, path)
        logger.debug(f"Saved checkpoint: {path}")
    
    def load_checkpoint(self, path: str):
        """Load model checkpoint"""
        checkpoint = torch.load(path, map_location=self.device)
        
        self.model.load_state_dict(checkpoint["model_state"])
        self.optimizer.load_state_dict(checkpoint["optimizer_state"])
        self.scheduler.load_state_dict(checkpoint["scheduler_state"])
        self.best_metrics = checkpoint.get("best_metrics")
        
        logger.info(f"Loaded checkpoint: {path}")

# 7. Inference Pipeline (inference/inference_pipeline.py)
import torch
import cv2
import numpy as np
from pathlib import Path
from typing import List, Dict, Optional
import logging

logger = logging.getLogger(__name__)


class MOSEv2InferencePipeline:
    """End-to-end inference pipeline for MOSEv2"""
    
    def __init__(self, model, config, device="cuda"):
        self.model = model.to(device)
        self.config = config
        self.device = device
        self.model.eval()
    
    def segment_video(
        self,
        video_path: str,
        first_frame_annotation: np.ndarray,
        output_dir: Optional[str] = None
    ) -> Dict:
        """
        Segment video with first frame annotation
        
        Args:
            video_path: Path to input video
            first_frame_annotation: Binary mask for first frame
            output_dir: Directory to save results
            
        Returns:
            Dictionary with segmentation results
        """
        # Load video
        frames = self._load_video(video_path)
        logger.info(f"Loaded video with {len(frames)} frames")
        
        # Prepare first frame mask
        first_mask = torch.FloatTensor(first_frame_annotation).unsqueeze(0).unsqueeze(0)
        
        # Convert frames to tensor
        frames_tensor = torch.FloatTensor(frames).permute(0, 3, 1, 2) / 255.0
        
        # Normalize
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1)
        frames_tensor = (frames_tensor - mean) / std
        
        # Move to device
        frames_tensor = frames_tensor.to(self.device)
        first_mask = first_mask.to(self.device)
        
        # Run segmentation
        with torch.no_grad():
            output = self.model(frames_tensor, first_mask)
        
        # Post-process
        segmentation_masks = output["masks"].cpu().numpy()
        segmentation_masks = (segmentation_masks > 0.5).astype(np.uint8)
        
        # Save results
        results = {
            "masks": segmentation_masks,
            "quality_scores": output["quality_scores"],
            "iou_scores": output["iou_scores"].cpu().numpy(),
            "frames": frames
        }
        
        if output_dir:
            self._save_results(results, output_dir, video_path)
        
        return results
    
    def _load_video(self, video_path: str) -> List[np.ndarray]:
        """Load video frames"""
        cap = cv2.VideoCapture(video_path)
        frames = []
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        
        cap.release()
        return frames
    
    def _save_results(
        self,
        results: Dict,
        output_dir: str,
        video_name: str
    ):
        """Save segmentation results"""
        output_path = Path(output_dir) / Path(video_name).stem
        output_path.mkdir(parents=True, exist_ok=True)
        
        masks = results["masks"]
        frames = results["frames"]
        
        # Save masks
        mask_dir = output_path / "masks"
        mask_dir.mkdir(exist_ok=True)
        
        for i, mask in enumerate(masks):
            mask_path = mask_dir / f"frame_{i:06d}.png"
            cv2.imwrite(str(mask_path), mask * 255)
        
        # Save overlay
        overlay_dir = output_path / "overlays"
        overlay_dir.mkdir(exist_ok=True)
        
        for i, (frame, mask) in enumerate(zip(frames, masks)):
            overlay = self._create_overlay(frame, mask)
            overlay_path = overlay_dir / f"frame_{i:06d}.png"
            overlay_path_bgr = cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR)
            cv2.imwrite(str(overlay_path), overlay_path_bgr)
        
        logger.info(f"Saved results to {output_path}")
    
    def _create_overlay(
        self,
        frame: np.ndarray,
        mask: np.ndarray,
        alpha: float = 0.5
    ) -> np.ndarray:
        """Create visualization overlay"""
        overlay = frame.copy().astype(np.float32)
        mask_color = np.array([255, 0, 0])  # Red
        
        mask_pixels = mask > 0
        overlay[mask_pixels] = (
            alpha * mask_color +
            (1 - alpha) * overlay[mask_pixels]
        )
        
        return overlay.astype(np.uint8)
# 8. Main Script (main.py)
import torch
import yaml
import argparse
import logging
from pathlib import Path
from omegaconf import OmegaConf

from model.sam2_enhanced import SAM2Enhanced
from data.mosev2_dataset import MOSEv2DataLoader
from evaluation.metrics import MOSEv2Metrics
from training.trainer import MOSEv2Trainer
from inference.inference_pipeline import MOSEv2InferencePipeline


def setup_logging(config):
    """Setup logging configuration"""
    log_dir = Path(config.log_dir)
    log_dir.mkdir(parents=True, exist_ok=True)
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_dir / 'training.log'),
            logging.StreamHandler()
        ]
    )


def train(config):
    """Training pipeline"""
    logger = logging.getLogger(__name__)
    logger.info("Starting training...")
    
    # Setup device
    device = torch.device(config.device)
    logger.info(f"Using device: {device}")
    
    # Load config
    if isinstance(config, str):
        with open(config, 'r') as f:
            config = yaml.safe_load(f)
    config = OmegaConf.create(config)
    
    # Create model
    model = SAM2Enhanced(config.model)
    logger.info(f"Created SAM2Enhanced model")
    
    # Create dataloaders
    loaders = MOSEv2DataLoader.create_loaders(
        config.dataset.data_root,
        batch_size=config.training.batch_size,
        num_workers=config.training.num_workers,
        config=config
    )
    
    # Create metrics
    metrics = MOSEv2Metrics(boundary_alpha=config.evaluation.boundary_alpha)
    
    # Create trainer
    trainer = MOSEv2Trainer(model, config, device)
    
    # Train
    history = trainer.train(
        loaders["train"],
        loaders["val"],
        metrics
    )
    
    logger.info("Training completed!")
    return trainer, history


def inference(config, video_path: str, mask_path: str, output_dir: str):
    """Inference pipeline"""
    logger = logging.getLogger(__name__)
    logger.info(f"Starting inference on {video_path}...")
    
    device = torch.device(config.device)
    
    # Load config
    if isinstance(config, str):
        with open(config, 'r') as f:
            config = yaml.safe_load(f)
    config = OmegaConf.create(config)
    
    # Create model
    model = SAM2Enhanced(config.model)
    
    # Load checkpoint
    checkpoint_path = Path(config.checkpoint_dir) / "best.pth"
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint["model_state"])
    logger.info(f"Loaded checkpoint: {checkpoint_path}")
    
    # Create pipeline
    pipeline = MOSEv2InferencePipeline(model, config, device)
    
    # Load first frame annotation
    import cv2
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    mask = mask.astype(np.float32) / 255.0
    
    # Run inference
    results = pipeline.segment_video(video_path, mask, output_dir)
    logger.info(f"Inference completed! Results saved to {output_dir}")
    
    return results


def main():
    parser = argparse.ArgumentParser(description="MOSEv2 SAM2 Enhanced")
    parser.add_argument("--config", type=str, default="config.yaml")
    parser.add_argument("--mode", type=str, choices=["train", "inference"], default="train")
    parser.add_argument("--video", type=str, help="Video path for inference")
    parser.add_argument("--mask", type=str, help="First frame mask path for inference")
    parser.add_argument("--output", type=str, default="./outputs")
    
    args = parser.parse_args()
    
    # Setup logging
    setup_logging(OmegaConf.load(args.config) if args.config.endswith('.yaml') else {})
    
    if args.mode == "train":
        trainer, history = train(args.config)
    else:
        results = inference(args.config, args.video, args.mask, args.output)


if __name__ == "__main__":
    import numpy as np
    main()
# Usage
# Training
python main.py --config config.yaml --mode train

# Inference
python main.py --config config.yaml --mode inference \
  --video /path/to/video.mp4 \
  --mask /path/to/first_frame_mask.png \
  --output ./results

Related posts, You May like to read

  1. 7 Shocking Truths About Knowledge Distillation: The Good, The Bad, and The Breakthrough (SAKD)
  2. 7 Revolutionary Breakthroughs in Medical Image Translation (And 1 Fatal Flaw That Could Derail Your AI Model)
  3. MedDINOv3: Revolutionizing Medical Image Segmentation with Adaptable Vision Foundation Models
  4. HiPerformer: A New Benchmark in Medical Image Segmentation with Modular Hierarchical Fusion
  5. GeoSAM2 3D Part Segmentation — Prompt-Controllable, Geometry-Aware Masks for Precision 3D Editing
  6. SegTrans: The Breakthrough Framework That Makes AI Segmentation Models Vulnerable to Transfer Attacks
  7. Universal Text-Driven Medical Image Segmentation: How MedCLIP-SAMv2 Revolutionizes Diagnostic AI
  8. Towards Trustworthy Breast Tumor Segmentation in Ultrasound Using AI Uncertainty
  9. Cellpose3: The Revolutionary One-Click Solution for Restoring Noisy, Blurry, and Undersampled Microscopy Images

Leave a Comment

Your email address will not be published. Required fields are marked *

Follow by Email
Tiktok