CelloType: Transformer-Based Deep Learning for Automated Cell Segmentation and Classification in Tissue Imaging

CelloType: Transformer-Based Deep Learning for Automated Cell Segmentation and Classification in Tissue Imaging

Introduction

Analyzing tissue images at the cellular level has become fundamental to understanding disease mechanisms, identifying therapeutic targets, and advancing personalized medicine. However, one of the most time-consuming bottlenecks in spatial omics research is the manual annotation of cells following segmentation—a laborious two-stage process that requires separating cells from tissue backgrounds, then classifying each identified cell by type.

Recent advances in spatial imaging technologies have generated unprecedented volumes of multiplexed tissue data, yet our computational tools remain fragmented and inefficient. Researchers typically rely on separate algorithms: one for segmentation (such as Mesmer or Cellpose2) and another for classification (like CellSighter). This conventional pipeline suffers from information loss, inefficient training, and accumulated errors.

CelloType, published in Nature Methods, addresses these limitations through a unified, transformer-based deep learning framework that simultaneously performs cell segmentation and classification, outperforming state-of-the-art methods across diverse tissue imaging platforms. This article explores how CelloType works, why its approach represents a paradigm shift in spatial omics analysis, and what this means for the future of biomedical research.


How Traditional Cell Analysis Falls Short

The Two-Stage Pipeline Problem

Conventional cell analysis workflows have relied on a sequential approach for decades:

  1. Segmentation stage: CNN-based models predict cell boundaries from nuclear and membrane stains
  2. Classification stage: Separate models predict cell types based on the segmentation masks and immunofluorescence markers

This two-stage approach presents several critical limitations. First, cell classification models cannot leverage the full spectrum of semantic information present in tissue images, as they operate only on segmentation results. Second, the approach is computationally inefficient, requiring separate training for each task. Third, segmentation performance varies significantly across different tissue types, suggesting substantial room for improvement.

Key problems with traditional methods:

  • Information bottleneck: Classification models lose rich contextual information by relying only on segmentation masks
  • Error accumulation: Segmentation errors directly propagate into classification results
  • Resource inefficiency: Training separate models doubles computational overhead
  • Lack of confidence assessment: Most existing methods provide no reliability metrics for segmentation predictions

Existing Solutions and Their Limitations

Representative state-of-the-art segmentation methods include Mesmer, which uses CNNs with feature pyramid networks; Cellpose and Cellpose2, which employ U-Net architectures with gradient-tracking algorithms; and CellSighter and CELESTA for classification tasks. While these methods have achieved satisfactory performance on certain tissues, they remain fundamentally limited by their sequential architecture.


CelloType’s Unified Architecture: A Multitask Learning Breakthrough

Core Innovation: Integrated Multitask Learning

CelloType adopts a multitask learning strategy that integrates segmentation and classification tasks, simultaneously enhancing the performance of both. The model leverages transformer-based deep learning techniques—specifically the Swin Transformer for feature extraction, DINO for object detection, and MaskDINO for segmentation.

This architecture enables bidirectional information flow:

  • Segmentation informs detection: Pixel-level segmentation masks provide detailed contextual information that improves bounding box initialization
  • Detection refines segmentation: Object detection focuses the model on relevant image regions, reducing noise and enhancing precision

Three-Module Architecture Explained

1. Feature Extraction Module (Swin Transformer)

The Swin Transformer generates multiscale image features through hierarchical processing that captures both local and global patterns, outperforming conventional convolutional networks in modeling complex image data with improved computational efficiency. This foundation enables the model to process high-dimensional multiplexed tissue images effectively.

2. DINO Object Detection Module

The DINO (DETR with Improved Denoising Anchor Boxes) module performs two functions:

  • Query initialization: Uses mixed positional and content queries to generate adaptive anchor boxes
  • Anchor refinement: Applies deformable attention mechanisms and contrastive denoising training for robust detection

The mathematical framework for mixed query selection operates as:

\[ Q_{pos} = f_{encoder}(X), \quad Q_{content} = \text{learnable} \]

where Qpos​ represents positional queries (n-by-4 matrix) and Qcontent​ represents learnable content features (n-by-embedding_dimension matrix).

3. MaskDINO Segmentation Module

The final segmentation masks are computed through:

\[ m = q_c \otimes M(T(C_b) + F(C_e)) \]

where:

  • qc ​ = content query embedding from DINO
  • M = segmentation head (convolutional layers)
  • T = channel mapping convolution
  • Cb​ = feature map from feature extractor
  • Ce = latent features from DINO encoder
  • F = interpolation-based upsampling

The unified model is trained with a composite loss function:

\[ \text{Loss} = \lambda_{cls}L_{cls} + \lambda_{box}L_{box} + \lambda_{mask}L_{mask} \]


Performance Benchmarks: CelloType vs. Existing Methods

TissueNet Multiplexed Imaging Results

When evaluated on the TissueNet dataset containing images from six imaging platforms (CODEX, CycIF, IMC, MIBI, MxIF, and Vectra) across six tissue types, CelloType consistently outperformed both Mesmer and Cellpose2 across all IoU thresholds. For cell segmentation, CelloType_C (with confidence scores) achieved a mean AP of 0.56, significantly surpassing Cellpose2 (0.35) and Mesmer (0.31). For nuclear segmentation, CelloType_C achieved 0.66 AP, compared with Cellpose2 (0.52) and Mesmer (0.24).

$$\begin{array}{l|cc} \text{Method} & \text{Cell Segmentation AP} & \text{Nuclear Segmentation AP} \\ \hline \text{Mesmer} & 0.31 \pm 0.01 & 0.24 \pm 0.01 \\ \text{Cellpose2} & 0.35 \pm 0.02 & 0.52 \pm 0.02 \\ \text{CelloType} & 0.45 \pm 0.02 & 0.57 \pm 0.02 \\ \textbf{CelloType_C} & 0.56 \pm 0.02 & 0.66 \pm 0.02 \end{array}$$

Classification Performance on Colorectal Cancer CODEX

For joint segmentation and classification on colorectal cancer tissue, CelloType achieved a mean AP of 0.55 across all cell types, markedly exceeding the Cellpose2 + CellSighter model’s mean AP of 0.13 and the Mask R-CNN model’s mean AP of 0.43. Critically, CelloType’s confidence scores demonstrated a strong, nearly linear correlation with prediction accuracy (coefficient = 0.56), whereas CellSighter and Mask R-CNN showed flat correlations (0.21 and 0.19, respectively).

This superior calibration is crucial for clinical applications, as researchers can now trust high-confidence predictions and investigate low-confidence predictions with additional scrutiny.

Spatial Transcriptomics: Xenium and MERFISH

On Xenium spatial transcriptomics datasets, CelloType achieved a mean AP of 0.47 using both DAPI and transcript signals, vastly outperforming SCS and Baysor (both 0.01 AP). The results demonstrate that transcript signal alone is insufficient for accurate segmentation, even for advanced deep learning models, but when combined with DAPI, CelloType effectively integrates multiple modalities.


Multiscale Segmentation: Beyond Individual Cells

Handling Tissue Architecture Complexity

One of CelloType’s most innovative capabilities is multiscale segmentation—the ability to simultaneously identify and classify cellular and noncellular tissue elements of vastly different sizes.

When applied to human bone marrow tissue containing hematopoietic cells, large adipocytes, and irregularly shaped trabecular bone fragments, CelloType achieved mean AP values of 0.39 for adipocytes, 0.31 for trabecular bone, and 0.42 for other cell types. The model correctly identified both tiny lymphocytes (10-15 μm diameter) and massive adipocytes (100+ μm diameter), a capability unique among existing segmentation methods.

This multiscale capability is particularly valuable for:

  • Tissue organization studies: Understanding how cellular neighborhoods form around structural elements
  • Pathology workflows: Distinguishing malignant cells from stromal tissue components
  • Developmental biology: Mapping cell-matrix interactions during tissue remodeling

Practical Applications and Research Impact

Spatial Omics Data Analysis

CelloType’s integration with spatial transcriptomics platforms represents a major advance. As spatial omics datasets grow exponentially—encompassing hundreds of thousands of cells across multiple tissue sections—automated, accurate segmentation and classification become essential.

Key application areas:

  • Tumor microenvironment mapping: Identifying immune infiltration patterns and their relationship to treatment response
  • Brain tissue organization: Dissecting neuronal, glial, and vascular cell distributions in healthy and diseased brain
  • Immunological surveillance: Tracking how immune cells organize in lymphoid tissues
  • Developmental studies: Tracing cell type emergence and spatial organization during embryogenesis

Confidence Scores Enable Quality Filtering

Unlike previous tools that provide binary predictions, CelloType outputs confidence scores for every segmentation and classification result. This enables researchers to:

  • Prioritize manual validation of low-confidence predictions
  • Filter datasets to achieve desired precision thresholds
  • Assess model reliability in understudied tissue types
  • Generate publication-quality results with known error characteristics

Training and Computational Requirements

CelloType has speed and memory usage comparable to those of Mesmer and Cellpose2, with training and inference times that scale efficiently with dataset size. The model can achieve meaningful performance with just 20% of available training data, though performance progressively improves with larger datasets.

Typical training specifications:

  • Learning rate: 10⁻⁶ (Adam optimizer)
  • Batch size: 8
  • Hardware: Standard GPU workstations (A100-class GPUs)
  • Training time: 1-2 hours for tissue datasets
  • Inference speed: Processes 512×512 pixel images in ~2-3 seconds

Limitations and Future Directions

Current Constraints

CelloType requires training for both segmentation and classification tasks. While segmentation models are readily transferable to new images containing nuclear/membrane channels, pretrained classification models cannot be readily applied to new images unless substantial cell type overlap exists between training and testing datasets. To address this, methodologies such as few-shot learning, self-supervised learning, and contrastive learning can be incorporated into the CelloType framework.

Emerging Opportunities

  • Few-shot learning integration: Enabling classification of rare cell types with minimal manual annotations
  • Transfer learning optimization: Developing tissue-agnostic pretraining strategies
  • Real-time processing: Streamlining inference for live imaging applications
  • Integration with other modalities: Combining spatial imaging with scRNA-seq, ATAC-seq, and protein data

Conclusion: The Future of Computational Pathology

CelloType represents a fundamental advancement in how we analyze tissue images computationally. By abandoning the traditional two-stage pipeline in favor of integrated multitask learning, the method achieves superior accuracy while providing clinically meaningful confidence assessments. Its demonstrated performance across diverse tissue types and imaging modalities positions it as a broadly applicable tool for spatial omics research.

As spatial omics datasets continue to grow exponentially, the need for scalable, accurate, and reliable segmentation and classification tools becomes increasingly urgent. CelloType addresses this need head-on, democratizing access to publication-quality tissue analysis while reducing the computational burden and manual annotation required.

For researchers working with multiplexed imaging, spatial transcriptomics, or any multiplexed tissue data, exploring CelloType represents a valuable opportunity to streamline workflows, improve accuracy, and generate more robust biological insights.

Ready to accelerate your tissue image analysis? Access CelloType’s open-source implementation on GitHub and explore how transformer-based deep learning can transform your spatial omics research. Join the growing community of researchers leveraging AI for computational pathology—and share your results with the scientific community.

Here is the comprehensive implementation of the CelloType model based on the research paper. This includes data loading, model architecture, training, and inference.

cellotype_implementation/
├── requirements.txt
├── config.py
├── data/
│   ├── __init__.py
│   └── dataset.py
├── models/
│   ├── __init__.py
│   ├── feature_extractor.py
│   ├── dino_detector.py
│   ├── mask_dino.py
│   └── cellotype.py
├── training/
│   ├── __init__.py
│   ├── trainer.py
│   └── losses.py
├── utils/
│   ├── __init__.py
│   ├── metrics.py
│   └── visualization.py
├── inference.py
└── main.py
# requirements.txt
torch==2.1.0
torchvision==0.16.0
timm==0.9.12
numpy==1.24.3
opencv-python==4.8.0
scikit-image==0.21.0
albumentations==1.3.0
tensorboard==2.14.0
matplotlib==3.7.2
scipy==1.11.2
pycocotools==2.0.6
detectron2==0.6
# config.py
from dataclasses import dataclass
from typing import Tuple, List

@dataclass
class DataConfig:
    """Data configuration"""
    image_size: Tuple[int, int] = (512, 512)
    train_split: float = 0.8
    val_split: float = 0.1
    num_workers: int = 4
    batch_size: int = 8
    num_channels: int = 2  # Nuclear and membrane
    
@dataclass
class ModelConfig:
    """Model architecture configuration"""
    # Swin Transformer settings
    swin_embed_dim: int = 96
    swin_depths: Tuple[int, ...] = (2, 2, 6, 2)
    swin_num_heads: Tuple[int, ...] = (3, 6, 12, 24)
    swin_window_size: int = 7
    
    # DINO settings
    dino_hidden_dim: int = 256
    dino_num_queries: int = 1000
    dino_nheads: int = 8
    dino_num_decoder_layers: int = 6
    dino_num_encoder_layers: int = 6
    dino_dim_feedforward: int = 2048
    
    # MaskDINO settings
    mask_hidden_dim: int = 256
    num_mask_tokens: int = 100
    
    # Number of classes
    num_classes: int = 12  # Cell types
    
    # Loss weights
    weight_cls: float = 2.0
    weight_box: float = 5.0
    weight_mask: float = 1.0
    
    # Denoising parameters
    denoising_lambda1: float = 0.4
    denoising_lambda2: float = 1.0

@dataclass
class TrainingConfig:
    """Training configuration"""
    num_epochs: int = 100
    learning_rate: float = 1e-6
    weight_decay: float = 1e-4
    warmup_epochs: int = 5
    batch_size: int = 8
    num_workers: int = 4
    device: str = "cuda"
    save_dir: str = "./checkpoints"
    log_dir: str = "./logs"
    eval_interval: int = 5
    patience: int = 15
    
@dataclass
class InferenceConfig:
    """Inference configuration"""
    confidence_threshold: float = 0.3
    nms_threshold: float = 0.5
    max_instances: int = 1000
# data/dataset.py
import os
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional

import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
from pycocotools import mask as mask_util
from skimage import measure

class TissueImageDataset(Dataset):
    """
    Dataset for tissue images with segmentation masks and cell type annotations.
    
    Expected directory structure:
    data/
    ├── images/
    │   ├── img_001.tif (or .npy)
    │   └── ...
    ├── masks/
    │   ├── img_001_mask.png
    │   └── ...
    └── annotations/
        ├── img_001.json
        └── ...
    """
    
    def __init__(
        self,
        root_dir: str,
        image_size: Tuple[int, int] = (512, 512),
        augment: bool = True,
        num_classes: int = 12,
        split: str = "train"
    ):
        """
        Args:
            root_dir: Root directory containing images, masks, and annotations
            image_size: Target image size
            augment: Whether to apply augmentations
            num_classes: Number of cell type classes
            split: "train", "val", or "test"
        """
        self.root_dir = Path(root_dir)
        self.image_size = image_size
        self.num_classes = num_classes
        self.split = split
        
        self.images_dir = self.root_dir / "images"
        self.masks_dir = self.root_dir / "masks"
        self.annotations_dir = self.root_dir / "annotations"
        
        # Get image list
        self.image_files = sorted([
            f.stem for f in self.images_dir.glob("*")
            if f.suffix in ['.tif', '.npy', '.tiff']
        ])
        
        if not self.image_files:
            raise RuntimeError(f"No images found in {self.images_dir}")
        
        # Data augmentation pipeline
        if augment:
            self.transforms = A.Compose([
                A.HorizontalFlip(p=0.5),
                A.VerticalFlip(p=0.5),
                A.Rotate(limit=45, p=0.5),
                A.GaussNoise(p=0.3),
                A.GaussianBlur(blur_limit=3, p=0.2),
                A.Normalize(
                    mean=[0.5] * image_size[0],
                    std=[0.5] * image_size[0],
                    max_pixel_value=255.0
                ),
                ToTensorV2()
            ], bbox_params=A.BboxParams(format='pascal_voc'))
        else:
            self.transforms = A.Compose([
                A.Normalize(
                    mean=[0.5] * image_size[0],
                    std=[0.5] * image_size[0],
                    max_pixel_value=255.0
                ),
                ToTensorV2()
            ], bbox_params=A.BboxParams(format='pascal_voc'))
    
    def __len__(self) -> int:
        return len(self.image_files)
    
    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Returns:
            Dictionary containing:
                - image: Tensor of shape (C, H, W)
                - masks: Tensor of shape (N, H, W) - instance masks
                - boxes: Tensor of shape (N, 4) - bounding boxes [x1, y1, x2, y2]
                - labels: Tensor of shape (N,) - class labels
                - areas: Tensor of shape (N,) - instance areas
                - iscrowd: Tensor of shape (N,) - crowd flags
        """
        image_name = self.image_files[idx]
        
        # Load image
        image_path = self.images_dir / f"{image_name}.npy"
        if not image_path.exists():
            image_path = self.images_dir / f"{image_name}.tif"
        
        if image_path.suffix == '.npy':
            image = np.load(image_path)
        else:
            image = cv2.imread(str(image_path), cv2.IMREAD_ANYDEPTH)
        
        # Ensure image is float32 and resized
        if image.ndim == 2:
            image = np.stack([image, image], axis=0)
        
        image = image.astype(np.float32)
        image = cv2.resize(image.transpose(1, 2, 0), self.image_size)
        image = image.transpose(2, 0, 1)
        
        # Load mask
        mask_path = self.masks_dir / f"{image_name}_mask.png"
        instance_mask = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        instance_mask = cv2.resize(instance_mask, self.image_size, interpolation=cv2.INTER_NEAREST)
        
        # Load annotations
        annotation_path = self.annotations_dir / f"{image_name}.json"
        with open(annotation_path, 'r') as f:
            annotations = json.load(f)
        
        # Extract boxes, labels, and masks
        boxes = []
        labels = []
        masks = []
        areas = []
        
        for ann in annotations:
            # Bounding box [x1, y1, x2, y2]
            bbox = ann['bbox']
            x1, y1, w, h = bbox
            x2 = x1 + w
            y2 = y1 + h
            boxes.append([x1, y1, x2, y2])
            
            # Class label
            labels.append(ann['category_id'])
            
            # Instance mask
            cell_mask = instance_mask == ann['id']
            masks.append(cell_mask)
            
            # Area
            areas.append(cell_mask.sum())
        
        if len(boxes) == 0:
            # Handle empty image
            boxes = np.zeros((0, 4), dtype=np.float32)
            labels = np.zeros((0,), dtype=np.int64)
            masks = np.zeros((0, *self.image_size), dtype=np.uint8)
            areas = np.zeros((0,), dtype=np.float32)
        else:
            boxes = np.array(boxes, dtype=np.float32)
            labels = np.array(labels, dtype=np.int64)
            masks = np.array(masks, dtype=np.uint8)
            areas = np.array(areas, dtype=np.float32)
        
        # Apply augmentations
        if self.transforms:
            transformed = self.transforms(
                image=image.transpose(1, 2, 0),
                bboxes=boxes.tolist() if len(boxes) > 0 else [],
                labels=labels.tolist() if len(labels) > 0 else []
            )
            image = transformed['image']
            boxes = np.array(transformed['bboxes'], dtype=np.float32)
            labels = np.array(transformed['labels'], dtype=np.int64)
        
        # Convert masks to tensor
        masks = torch.from_numpy(masks).float()
        
        return {
            'image': image,
            'boxes': torch.from_numpy(boxes),
            'labels': torch.from_numpy(labels),
            'masks': masks,
            'areas': torch.from_numpy(areas),
            'iscrowd': torch.zeros(len(labels), dtype=torch.uint8),
            'image_id': torch.tensor(idx, dtype=torch.int64)
        }


def create_dataloaders(
    data_dir: str,
    batch_size: int = 8,
    image_size: Tuple[int, int] = (512, 512),
    num_workers: int = 4,
    train_split: float = 0.8,
    val_split: float = 0.1
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """Create train, validation, and test dataloaders"""
    
    # Create datasets
    train_dataset = TissueImageDataset(
        data_dir,
        image_size=image_size,
        augment=True,
        split="train"
    )
    
    val_dataset = TissueImageDataset(
        data_dir,
        image_size=image_size,
        augment=False,
        split="val"
    )
    
    test_dataset = TissueImageDataset(
        data_dir,
        image_size=image_size,
        augment=False,
        split="test"
    )
    
    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    
    return train_loader, val_loader, test_loader


def collate_fn(batch: List[Dict]) -> Dict[str, List]:
    """Custom collate function for variable-sized tensors"""
    return {
        'image': torch.stack([item['image'] for item in batch]),
        'boxes': [item['boxes'] for item in batch],
        'labels': [item['labels'] for item in batch],
        'masks': [item['masks'] for item in batch],
        'areas': [item['areas'] for item in batch],
        'iscrowd': [item['iscrowd'] for item in batch],
        'image_id': torch.stack([item['image_id'] for item in batch])
    }

# models/feature_extractor.py
import torch
import torch.nn as nn
from timm.models.swin_transformer import SwinTransformer
from typing import List, Dict

class FeatureExtractor(nn.Module):
    """
    Swin Transformer-based feature extractor for multiscale feature extraction.
    
    This module generates multiscale image features from input tissue images,
    preserving both local and global spatial information.
    """
    
    def __init__(
        self,
        in_channels: int = 2,
        embed_dim: int = 96,
        depths: tuple = (2, 2, 6, 2),
        num_heads: tuple = (3, 6, 12, 24),
        window_size: int = 7,
        pretrained: bool = False
    ):
        """
        Args:
            in_channels: Number of input channels
            embed_dim: Embedding dimension
            depths: Depths of each Swin Transformer stage
            num_heads: Number of attention heads in each stage
            window_size: Window size for shifted window attention
            pretrained: Whether to use pretrained weights
        """
        super().__init__()
        
        self.in_channels = in_channels
        self.embed_dim = embed_dim
        self.depths = depths
        self.num_heads = num_heads
        self.window_size = window_size
        
        # Channel adaptation if input channels != 3
        if in_channels != 3:
            self.channel_adapter = nn.Conv2d(in_channels, 3, kernel_size=1)
        else:
            self.channel_adapter = None
        
        # Swin Transformer
        self.swin = SwinTransformer(
            img_size=512,
            patch_size=4,
            in_chans=3,
            num_classes=1000,
            embed_dim=embed_dim,
            depths=depths,
            num_heads=num_heads,
            window_size=window_size,
            mlp_ratio=4.0,
            drop_rate=0.0,
            attn_drop_rate=0.0,
            drop_path_rate=0.1,
        )
        
        # Feature dimension for each stage
        self.out_channels = [embed_dim * (2 ** i) for i in range(len(depths))]
        
    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Args:
            x: Input tensor of shape (B, C, H, W)
            
        Returns:
            Dictionary containing multiscale features
        """
        batch_size = x.shape[0]
        
        # Adapt channels if needed
        if self.channel_adapter is not None:
            x = self.channel_adapter(x)
        
        # Extract features from Swin Transformer
        features = self._extract_features(x)
        
        return features
    
    def _extract_features(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        """Extract multiscale features from Swin Transformer"""
        features = {}
        
        # Patch embedding
        x = self.swin.patch_embed(x)
        x = self.swin.pos_drop(x)
        
        # Extract features from each stage
        for i, layer in enumerate(self.swin.layers):
            x = layer(x)
            
            # Reshape to spatial dimensions
            B, L, C = x.shape
            H = W = int(L ** 0.5)
            feat = x.reshape(B, H, W, C).permute(0, 3, 1, 2).contiguous()
            features[f'stage_{i}'] = feat
        
        # Flatten features for later use
        features['flattened'] = x  # (B, L, C)
        
        return features
# models/dino_detector.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Tuple, List
import math

class PositionalEmbedding(nn.Module):
    """Positional embeddings for spatial information preservation"""
    
    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * 
            (-math.log(10000.0) / d_model)
        )
        
        pe[:, 0::2] = torch.sin(position * div_term)
        if d_model % 2 == 1:
            pe[:, 1::2] = torch.cos(position * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.pe[:, :x.size(1)].to(x.device)


class MultiHeadAttention(nn.Module):
    """Multi-head attention with deformable offsets"""
    
    def __init__(self, d_model: int, num_heads: int):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        
        # Deformable offset generation
        self.offset_generator = nn.Linear(d_model, 2 * num_heads)
    
    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        mask: torch.Tensor = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            query: (B, N_q, d_model)
            key: (B, N_k, d_model)
            value: (B, N_k, d_model)
            mask: Optional attention mask
            
        Returns:
            output: (B, N_q, d_model)
            attention_weights: (B, num_heads, N_q, N_k)
        """
        B = query.shape[0]
        
        # Linear projections
        Q = self.query(query).reshape(B, -1, self.num_heads, self.d_k)
        K = self.key(key).reshape(B, -1, self.num_heads, self.d_k)
        V = self.value(value).reshape(B, -1, self.num_heads, self.d_k)
        
        # Transpose for attention computation
        Q = Q.transpose(1, 2)  # (B, num_heads, N_q, d_k)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)
        
        # Compute attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        
        attention_weights = F.softmax(scores, dim=-1)
        
        # Apply attention to values
        context = torch.matmul(attention_weights, V)
        
        # Concatenate heads
        context = context.transpose(1, 2).contiguous()
        context = context.reshape(B, -1, self.d_model)
        
        # Final linear projection
        output = self.out(context)
        
        return output, attention_weights


class TransformerEncoderLayer(nn.Module):
    """Transformer encoder layer"""
    
    def __init__(self, d_model: int, num_heads: int, dim_feedforward: int):
        super().__init__()
        
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Self-attention
        attn_out, _ = self.attention(x, x, x)
        x = x + attn_out
        x = self.norm1(x)
        
        # Feed-forward
        ffn_out = self.ffn(x)
        x = x + ffn_out
        x = self.norm2(x)
        
        return x


class TransformerDecoderLayer(nn.Module):
    """Transformer decoder layer with cross-attention"""
    
    def __init__(self, d_model: int, num_heads: int, dim_feedforward: int):
        super().__init__()
        
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )
    
    def forward(
        self,
        x: torch.Tensor,
        encoder_output: torch.Tensor
    ) -> torch.Tensor:
        # Self-attention
        self_attn_out, _ = self.self_attention(x, x, x)
        x = x + self_attn_out
        x = self.norm1(x)
        
        # Cross-attention
        cross_attn_out, _ = self.cross_attention(x, encoder_output, encoder_output)
        x = x + cross_attn_out
        x = self.norm2(x)
        
        # Feed-forward
        ffn_out = self.ffn(x)
        x = x + ffn_out
        x = self.norm3(x)
        
        return x


class DINODetector(nn.Module):
    """
    DINO (DETR with Improved Denoising Anchor boxes) object detection module.
    
    This module performs object detection with improved anchor box refinement
    through deformable attention and contrastive denoising training.
    """
    
    def __init__(
        self,
        hidden_dim: int = 256,
        num_queries: int = 1000,
        num_heads: int = 8,
        num_encoder_layers: int = 6,
        num_decoder_layers: int = 6,
        dim_feedforward: int = 2048,
        num_classes: int = 12,
        denoising_lambda1: float = 0.4,
        denoising_lambda2: float = 1.0
    ):
        """
        Args:
            hidden_dim: Hidden dimension
            num_queries: Number of object queries
            num_heads: Number of attention heads
            num_encoder_layers: Number of encoder layers
            num_decoder_layers: Number of decoder layers
            dim_feedforward: Dimension of FFN
            num_classes: Number of object classes
            denoising_lambda1: Denoising lambda for small noise
            denoising_lambda2: Denoising lambda for large noise
        """
        super().__init__()
        
        self.hidden_dim = hidden_dim
        self.num_queries = num_queries
        self.num_classes = num_classes
        self.denoising_lambda1 = denoising_lambda1
        self.denoising_lambda2 = denoising_lambda2
        
        # Positional embeddings
        self.pos_embedding = PositionalEmbedding(hidden_dim)
        
        # Query embeddings (learnable)
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        
        # Encoder
        self.encoder = nn.ModuleList([
            TransformerEncoderLayer(hidden_dim, num_heads, dim_feedforward)
            for _ in range(num_encoder_layers)
        ])
        
        # Decoder
        self.decoder = nn.ModuleList([
            TransformerDecoderLayer(hidden_dim, num_heads, dim_feedforward)
            for _ in range(num_decoder_layers)
        ])
        
        # Prediction heads
        self.class_head = nn.Linear(hidden_dim, num_classes + 1)
        self.box_head = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 4)
        )
        
        # Initialize weights
        self._init_weights()
    
    def _init_weights(self):
        """Initialize layer weights"""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.LayerNorm):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
    
    def forward(
        self,
        image_features: torch.Tensor,
        targets: Dict = None
    ) -> Dict[str, torch.Tensor]:
        """
        Args:
            image_features: Flattened image features from feature extractor (B, L, C)
            targets: Optional target annotations for denoising training
            
        Returns:
            Dictionary containing:
                - pred_boxes: Predicted bounding boxes (B, num_queries, 4)
                - pred_classes: Predicted class logits (B, num_queries, num_classes+1)
                - pred_logits: Logits for classification
        """
        batch_size = image_features.shape[0]
        
        # Add positional embeddings
        pos_features = self.pos_embedding(image_features)
        
        # Encoder
        encoder_output = pos_features
        for encoder_layer in self.encoder:
            encoder_output = encoder_layer(encoder_output)
        
        # Initialize query embeddings
        query_embed = self.query_embed.weight.unsqueeze(0).expand(batch_size, -1, -1)
        
        # Add denoising during training
        if self.training and targets is not None:
            query_embed = self._add_denoising_queries(query_embed, targets)
        
        # Decoder with iterative refinement
        decoder_output = query_embed
        for decoder_layer in self.decoder:
            decoder_output = decoder_layer(decoder_output, encoder_output)
        
        # Prediction heads
        pred_boxes = self.box_head(decoder_output)
        pred_classes = self.class_head(decoder_output)
        
        # Normalize boxes to [0, 1] using sigmoid + scale
        pred_boxes = torch.sigmoid(pred_boxes)
        
        return {
            'pred_boxes': pred_boxes,
            'pred_classes': pred_classes,
            'encoder_output': encoder_output,
            'decoder_output': decoder_output
        }
    
    def _add_denoising_queries(
        self,
        query_embed: torch.Tensor,
        targets: Dict
    ) -> torch.Tensor:
        """
        Add contrastive denoising queries for improved robustness.
        
        According to the paper, denoising involves adding controlled noise
        to ground-truth labels and boxes.
        """
        # Implementation of denoising as per equation:
        # |Δx| < λ₁w/2, |Δy| < λ₁h/2, |Δw| < λ₁w, |Δh| < λ₁h
        
        denoised_queries = query_embed.clone()
        
        # Add Gaussian noise for positive samples
        noise1 = torch.randn_like(denoised_queries) * self.denoising_lambda1
        denoised_positive = denoised_queries + noise1
        
        # Add larger noise for negative samples
        noise2 = torch.randn_like(denoised_queries) * self.denoising_lambda2
        denoised_negative = denoised_queries + noise2
        
        # Concatenate positive and negative samples
        denoised_queries = torch.cat([denoised_positive, denoised_negative], dim=1)
        
        return denoised_queries
# models/mask_dino.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Tuple

class SegmentationHead(nn.Module):
    """Segmentation head for mask prediction"""
    
    def __init__(self, hidden_dim: int, num_mask_tokens: int):
        super().__init__()
        
        self.conv1 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(hidden_dim, num_mask_tokens, kernel_size=1)
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = F.relu(self.conv1(x))
        x = self.conv2(x)
        x = self.upsample(x)
        return x


class MaskDINO(nn.Module):
    """
    MaskDINO: Unified transformer-based framework for object detection
    and instance segmentation.
    
    This module integrates DINO outputs with segmentation branch to
    produce refined instance masks.
    """
    
    def __init__(
        self,
        hidden_dim: int = 256,
        num_mask_tokens: int = 100,
        mask_head_hidden_dim: int = 256
    ):
        """
        Args:
            hidden_dim: Hidden dimension
            num_mask_tokens: Number of mask prediction tokens
            mask_head_hidden_dim: Hidden dimension for mask head
        """
        super().__init__()
        
        self.hidden_dim = hidden_dim
        self.num_mask_tokens = num_mask_tokens
        
        # Channel adaptation for concatenation
        self.feature_projection = nn.Linear(hidden_dim, hidden_dim)
        self.latent_projection = nn.Linear(hidden_dim, hidden_dim)
        
        # Upsampling layers
        self.upsample_4x = nn.Sequential(
            nn.ConvTranspose2d(hidden_dim, hidden_dim // 2, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(hidden_dim // 2, hidden_dim // 4, kernel_size=4, stride=2, padding=1),
            nn.ReLU()
        )
        
        # Segmentation head
        self.seg_head = nn.Sequential(
            nn.Conv2d(hidden_dim // 4, hidden_dim // 8, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_dim // 8, 1, kernel_size=1)
        )
        
        # Dice loss computation
        self.register_buffer('eps', torch.tensor(1e-6))
    
    def forward(
        self,
        image_features: torch.Tensor,
        dino_output: Dict[str, torch.Tensor],
        masks_targets: torch.Tensor = None
    ) -> Dict[str, torch.Tensor]:
        """
        Args:
            image_features: Image features from feature extractor (B, H, W, C)
            dino_output: Output from DINO module containing:
                - pred_boxes: (B, num_queries, 4)
                - pred_classes: (B, num_queries, num_classes+1)
                - decoder_output: (B, num_queries, hidden_dim)
            masks_targets: Optional ground truth masks (B, num_queries, H, W)
            
        Returns:
            Dictionary containing:
                - pred_masks: Predicted binary masks (B, num_queries, H, W)
                - mask_loss: Segmentation loss
        """
        batch_size = image_features.shape[0]
        height, width = image_features.shape[1:3]
        
        # Reshape image features for processing
        image_features_reshape = image_features.permute(0, 3, 1, 2)  # (B, C, H, W)
        
        # Get decoder output from DINO
        decoder_output = dino_output['decoder_output']  # (B, num_queries, hidden_dim)
        
        # Compute segmentation masks via dot product
        # m = q_c ⊗ M(T(C_b) + F(C_e))
        
        # Project features and decoder output
        image_feat_proj = self.feature_projection(
            image_features.reshape(-1, image_features.shape[-1])
        ).reshape(batch_size, height, width, -1)
        image_feat_proj = image_feat_proj.permute(0, 3, 1, 2)
        
        latent_proj = self.latent_projection(decoder_output)  # (B, num_queries, hidden_dim)
        
        # Upsample features
        upsampled_feat = self.upsample_4x(image_feat_proj)  # (B, C', H', W')
        
        # Generate segmentation maps through attention
        query_feat = latent_proj.unsqueeze(-1).unsqueeze(-1)  # (B, num_queries, hidden_dim, 1, 1)
        
        # Expand for broadcasting
        batch_size, num_queries, feat_dim = latent_proj.shape
        _, _, up_height, up_width = upsampled_feat.shape
        
        # Reshape for dot product
        upsampled_flat = upsampled_feat.view(batch_size, feat_dim, -1)  # (B, C', H'*W')
        latent_flat = latent_proj.unsqueeze(-1)  # (B, num_queries, hidden_dim, 1)
        
        # Compute dot product: (B, num_queries, hidden_dim) @ (B, hidden_dim, H'*W')
        mask_logits = torch.bmm(
            latent_proj,
            upsampled_flat
        )  # (B, num_queries, H'*W')
        
        # Reshape to spatial dimensions
        pred_masks = mask_logits.view(
            batch_size, num_queries, up_height, up_width
        )  # (B, num_queries, H', W')
        
        # Apply sigmoid
        pred_masks = torch.sigmoid(pred_masks)
        
        output = {
            'pred_masks': pred_masks,
            'mask_logits': mask_logits
        }
        
        return output
    
    def compute_mask_loss(
        self,
        pred_masks: torch.Tensor,
        target_masks: torch.Tensor
    ) -> torch.Tensor:
        """
        Compute segmentation loss combining BCE and Dice losses.
        
        Args:
            pred_masks: Predicted masks (B, num_queries, H, W)
            target_masks: Target masks (B, num_queries, H, W)
            
        Returns:
            Segmentation loss
        """
        # BCE loss
        bce_loss = F.binary_cross_entropy(pred_masks, target_masks, reduction='mean')
        
        # Dice loss
        intersection = (pred_masks * target_masks).sum()
        union = pred_masks.sum() + target_masks.sum()
        dice_loss = 1 - (2 * intersection + self.eps) / (union + self.eps)
        
        # Combined loss
        total_loss = 0.5 * bce_loss + 0.5 * dice_loss
        
        return total_loss
# models/cellotype.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Optional, Tuple
from .feature_extractor import FeatureExtractor
from .dino_detector import DINODetector
from .mask_dino import MaskDINO

class CelloType(nn.Module):
    """
    CelloType: Unified end-to-end model for cell segmentation and classification.
    
    This model integrates three components:
    1. Feature extraction (Swin Transformer)
    2. Object detection (DINO)
    3. Instance segmentation (MaskDINO)
    
    The model is trained with a unified loss function that considers
    segmentation masks, bounding boxes, and class predictions.
    """
    
    def __init__(
        self,
        in_channels: int = 2,
        hidden_dim: int = 256,
        num_queries: int = 1000,
        num_classes: int = 12,
        swin_embed_dim: int = 96,
        swin_depths: Tuple[int, ...] = (2, 2, 6, 2),
        swin_num_heads: Tuple[int, ...] = (3, 6, 12, 24),
        dino_num_heads: int = 8,
        dino_num_encoder_layers: int = 6,
        dino_num_decoder_layers: int = 6,
        dim_feedforward: int = 2048,
        weight_cls: float = 2.0,
        weight_box: float = 5.0,
        weight_mask: float = 1.0,
        denoising_lambda1: float = 0.4,
        denoising_lambda2: float = 1.0
    ):
        """
        Args:
            in_channels: Number of input channels
            hidden_dim: Hidden dimension for DINO/MaskDINO
            num_queries: Number of object queries
            num_classes: Number of cell type classes
            swin_embed_dim: Swin Transformer embedding dimension
            swin_depths: Depths of Swin Transformer stages
            swin_num_heads: Number of heads in each Swin stage
            dino_num_heads: Number of heads in DINO
            dino_num_encoder_layers: Number of DINO encoder layers
            dino_num_decoder_layers: Number of DINO decoder layers
            dim_feedforward: FFN dimension
            weight_cls: Loss weight for classification
            weight_box: Loss weight for box regression
            weight_mask: Loss weight for segmentation
            denoising_lambda1: Small denoising lambda
            denoising_lambda2: Large denoising lambda
        """
        super().__init__()
        
        self.in_channels = in_channels
        self.hidden_dim = hidden_dim
        self.num_queries = num_queries
        self.num_classes = num_classes
        
        self.weight_cls = weight_cls
        self.weight_box = weight_box
        self.weight_mask = weight_mask
        
        # Feature extraction
        self.feature_extractor = FeatureExtractor(
            in_channels=in_channels,
            embed_dim=swin_embed_dim,
            depths=swin_depths,
            num_heads=swin_num_heads,
            window_size=7
        )
        
        # Get feature dimension from extractor
        feature_dim = self.feature_extractor.out_channels[-1]
        
        # Feature projection to hidden_dim
        self.feature_projection = nn.Sequential(
            nn.Linear(feature_dim, hidden_dim),
            nn.LayerNorm(hidden_dim)
        )
        
        # Object detection (DINO)
        self.dino_detector = DINODetector(
            hidden_dim=hidden_dim,
            num_queries=num_queries,
            num_heads=dino_num_heads,
            num_encoder_layers=dino_num_encoder_layers,
            num_decoder_layers=dino_num_decoder_layers,
            dim_feedforward=dim_feedforward,
            num_classes=num_classes,
            denoising_lambda1=denoising_lambda1,
            denoising_lambda2=denoising_lambda2
        )
        
        # Instance segmentation (MaskDINO)
        self.mask_dino = MaskDINO(
            hidden_dim=hidden_dim,
            num_mask_tokens=num_queries
        )
        
        # Confidence score head
        self.confidence_head = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Linear(hidden_dim // 2, 1),
            nn.Sigmoid()
        )
    
    def forward(
        self,
        images: torch.Tensor,
        targets: Optional[Dict] = None
    ) -> Dict[str, torch.Tensor]:
        """
        Args:
            images: Input images (B, C, H, W)
            targets: Optional target annotations for training
            
        Returns:
            Dictionary containing:
                - pred_boxes: Predicted bounding boxes
                - pred_classes: Predicted class logits
                - pred_masks: Predicted instance masks
                - confidence_scores: Confidence scores for predictions
                - losses: (Optional during training) Individual loss components
        """
        batch_size = images.shape[0]
        device = images.device
        
        # Feature extraction
        features = self.feature_extractor(images)
        image_features = features['flattened']  # (B, L, C)
        
        # Project features
        projected_features = self.feature_projection(image_features)
        
        # Object detection with DINO
        dino_output = self.dino_detector(
            projected_features,
            targets=targets if self.training else None
        )
        
        # Extract DINO outputs
        pred_boxes = dino_output['pred_boxes']  # (B, num_queries, 4)
        pred_classes = dino_output['pred_classes']  # (B, num_queries, num_classes+1)
        decoder_output = dino_output['decoder_output']  # (B, num_queries, hidden_dim)
        
        # Reshape image features for segmentation
        # Assuming 512x512 input -> (B, 1024, 256) from flattened
        feat_h = feat_w = int(image_features.shape[1] ** 0.5)
        image_features_spatial = image_features.reshape(
            batch_size, feat_h, feat_w, -1
        )
        
        # Instance segmentation with MaskDINO
        mask_output = self.mask_dino(
            image_features_spatial,
            dino_output,
            masks_targets=targets['masks'] if targets is not None else None
        )
        
        pred_masks = mask_output['pred_masks']  # (B, num_queries, H', W')
        
        # Generate confidence scores
        confidence_scores = self.confidence_head(decoder_output)  # (B, num_queries, 1)
        confidence_scores = confidence_scores.squeeze(-1)
        
        output = {
            'pred_boxes': pred_boxes,
            'pred_classes': pred_classes,
            'pred_masks': pred_masks,
            'confidence_scores': confidence_scores
        }
        
        # Compute losses during training
        if self.training and targets is not None:
            losses = self.compute_losses(
                pred_boxes=pred_boxes,
                pred_classes=pred_classes,
                pred_masks=pred_masks,
                targets=targets
            )
            output['losses'] = losses
            output['total_loss'] = (
                self.weight_cls * losses['cls_loss'] +
                self.weight_box * losses['box_loss'] +
                self.weight_mask * losses['mask_loss']
            )
        
        return output
    
    def compute_losses(
        self,
        pred_boxes: torch.Tensor,
        pred_classes: torch.Tensor,
        pred_masks: torch.Tensor,
        targets: Dict
    ) -> Dict[str, torch.Tensor]:
        """
        Compute multitask losses: classification, box regression, and segmentation.
        
        Loss = λ_cls * L_cls + λ_box * L_box + λ_mask * L_mask
        """
        # Classification loss
        cls_loss = self._compute_classification_loss(pred_classes, targets)
        
        # Box regression loss
        box_loss = self._compute_box_loss(pred_boxes, targets)
        
        # Segmentation loss
        mask_loss = self._compute_mask_loss(pred_masks, targets)
        
        return {
            'cls_loss': cls_loss,
            'box_loss': box_loss,
            'mask_loss': mask_loss
        }
    
    def _compute_classification_loss(
        self,
        pred_classes: torch.Tensor,
        targets: Dict
    ) -> torch.Tensor:
        """
        Focal loss for classification to handle class imbalance.
        """
        batch_size = pred_classes.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(targets['labels'][i]) == 0:
                continue
            
            # Get predictions and targets for this image
            pred = pred_classes[i]  # (num_queries, num_classes+1)
            target_labels = targets['labels'][i]  # (N,)
            
            # Focal loss with reduction
            ce_loss = F.cross_entropy(
                pred.unsqueeze(0).expand(len(target_labels), -1, -1),
                target_labels.unsqueeze(0).expand(len(target_labels), -1),
                reduction='none'
            )
            
            # Focal loss weighting
            p_t = torch.exp(-ce_loss)
            focal_loss = (1 - p_t) ** 2 * ce_loss
            
            losses.append(focal_loss.mean())
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_classes.device)
    
    def _compute_box_loss(
        self,
        pred_boxes: torch.Tensor,
        targets: Dict
    ) -> torch.Tensor:
        """
        Compute L1 loss for bounding box regression.
        """
        batch_size = pred_boxes.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(targets['boxes'][i]) == 0:
                continue
            
            pred = pred_boxes[i]  # (num_queries, 4)
            target_boxes = targets['boxes'][i]  # (N, 4)
            
            # Match predictions to targets (simplified - use first N)
            n_targets = len(target_boxes)
            if n_targets > 0:
                pred_matched = pred[:n_targets]
                loss = F.l1_loss(pred_matched, target_boxes)
                losses.append(loss)
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_boxes.device)
    
    def _compute_mask_loss(
        self,
        pred_masks: torch.Tensor,
        targets: Dict
    ) -> torch.Tensor:
        """
        Compute segmentation loss (BCE + Dice).
        """
        batch_size = pred_masks.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(targets['masks'][i]) == 0:
                continue
            
            pred = pred_masks[i]  # (num_queries, H, W)
            target = targets['masks'][i]  # (N, H, W)
            
            # Resize target to match predictions if needed
            if target.shape != pred.shape:
                target = F.interpolate(
                    target.unsqueeze(1).float(),
                    size=pred.shape[1:],
                    mode='bilinear',
                    align_corners=False
                ).squeeze(1)
            
            n_targets = len(target)
            if n_targets > 0:
                pred_matched = pred[:n_targets]
                
                # BCE loss
                bce = F.binary_cross_entropy(pred_matched, target.float())
                
                # Dice loss
                intersection = (pred_matched * target.float()).sum()
                union = pred_matched.sum() + target.float().sum()
                dice = 1 - (2 * intersection + 1e-6) / (union + 1e-6)
                
                loss = 0.5 * bce + 0.5 * dice
                losses.append(loss)
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_masks.device)

# training/trainer.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from pathlib import Path
from typing import Dict, Tuple
from tqdm import tqdm
import numpy as np

from config import TrainingConfig, ModelConfig, DataConfig
from utils.metrics import compute_ap, compute_classification_metrics

class CelloTypeTrainer:
    """Trainer class for CelloType model"""
    
    def __init__(
        self,
        model: nn.Module,
        train_loader,
        val_loader,
        config: TrainingConfig
    ):
        """
        Args:
            model: CelloType model
            train_loader: Training data loader
            val_loader: Validation data loader
            config: Training configuration
        """
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.config = config
        
        self.device = torch.device(config.device)
        self.model = self.model.to(self.device)
        
        # Optimizer
        self.optimizer = optim.Adam(
            self.model.parameters(),
            lr=config.learning_rate,
            weight_decay=config.weight_decay
        )
        
        # Learning rate scheduler
        self.scheduler = optim.lr_scheduler.CosineAnnealingLR(
            self.optimizer,
            T_max=config.num_epochs,
            eta_min=1e-8
        )
        
        # Create checkpoints directory
        self.save_dir = Path(config.save_dir)
        self.save_dir.mkdir(parents=True, exist_ok=True)
        
        # Tensorboard logger
        self.writer = SummaryWriter(config.log_dir)
        
        # Training state
        self.current_epoch = 0
        self.best_val_ap = 0
        self.patience_counter = 0
    
    def train_epoch(self) -> Dict[str, float]:
        """Train one epoch"""
        self.model.train()
        
        total_loss = 0
        cls_losses = []
        box_losses = []
        mask_losses = []
        
        pbar = tqdm(self.train_loader, desc=f"Epoch {self.current_epoch}")
        
        for batch_idx, batch in enumerate(pbar):
            # Move to device
            images = batch['image'].to(self.device)
            
            # Prepare targets
            targets = {
                'boxes': [b.to(self.device) for b in batch['boxes']],
                'labels': [l.to(self.device) for l in batch['labels']],
                'masks': [m.to(self.device) for m in batch['masks']],
                'areas': [a.to(self.device) for a in batch['areas']]
            }
            
            # Forward pass
            self.optimizer.zero_grad()
            outputs = self.model(images, targets=targets)
            
            # Compute loss
            loss = outputs['total_loss']
            
            # Backward pass
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            
            # Record losses
            total_loss += loss.item()
            cls_losses.append(outputs['losses']['cls_loss'].item())
            box_losses.append(outputs['losses']['box_loss'].item())
            mask_losses.append(outputs['losses']['mask_loss'].item())
            
            pbar.set_postfix({
                'loss': loss.item(),
                'cls': outputs['losses']['cls_loss'].item(),
                'box': outputs['losses']['box_loss'].item(),
                'mask': outputs['losses']['mask_loss'].item()
            })
        
        # Average losses
        avg_loss = total_loss / len(self.train_loader)
        avg_cls_loss = np.mean(cls_losses)
        avg_box_loss = np.mean(box_losses)
        avg_mask_loss = np.mean(mask_losses)
        
        return {
            'total_loss': avg_loss,
            'cls_loss': avg_cls_loss,
            'box_loss': avg_box_loss,
            'mask_loss': avg_mask_loss
        }
    
    @torch.no_grad()
    def validate(self) -> Dict[str, float]:
        """Validate model"""
        self.model.eval()
        
        all_pred_boxes = []
        all_pred_classes = []
        all_pred_scores = []
        all_gt_boxes = []
        all_gt_classes = []
        
        pbar = tqdm(self.val_loader, desc="Validation")
        
        for batch in pbar:
            images = batch['image'].to(self.device)
            
            # Forward pass
            outputs = self.model(images)
            
            # Post-process predictions
            for i in range(images.shape[0]):
                pred_boxes = outputs['pred_boxes'][i]
                pred_classes = outputs['pred_classes'][i]
                confidence = outputs['confidence_scores'][i]
                
                # Filter by confidence threshold
                mask = confidence > self.config.confidence_threshold
                pred_boxes = pred_boxes[mask]
                pred_classes = pred_classes[mask]
                confidence = confidence[mask]
                
                all_pred_boxes.append(pred_boxes)
                all_pred_classes.append(pred_classes.argmax(dim=-1))
                all_pred_scores.append(confidence)
                
                # Ground truth (if available)
                if 'boxes' in batch:
                    all_gt_boxes.append(batch['boxes'][i])
                    all_gt_classes.append(batch['labels'][i])
        
        # Compute metrics
        ap = compute_ap(all_pred_boxes, all_gt_boxes, all_pred_scores)
        
        return {
            'ap': ap,
            'mean_confidence': np.mean([s.mean().item() for s in all_pred_scores])
        }
    
    def train(self):
        """Full training loop"""
        for epoch in range(self.config.num_epochs):
            self.current_epoch = epoch
            
            # Train
            train_metrics = self.train_epoch()
            
            # Validate
            if (epoch + 1) % self.config.eval_interval == 0:
                val_metrics = self.validate()
                
                # Log metrics
                self.writer.add_scalar(
                    'val/ap', val_metrics['ap'], epoch
                )
                self.writer.add_scalar(
                    'val/mean_confidence', val_metrics['mean_confidence'], epoch
                )
                
                print(f"Epoch {epoch}: Val AP = {val_metrics['ap']:.4f}")
                
                # Early stopping
                if val_metrics['ap'] > self.best_val_ap:
                    self.best_val_ap = val_metrics['ap']
                    self.patience_counter = 0
                    self._save_checkpoint(is_best=True)
                else:
                    self.patience_counter += 1
                    if self.patience_counter >= self.config.patience:
                        print(f"Early stopping at epoch {epoch}")
                        break
            
            # Log training metrics
            for key, value in train_metrics.items():
                self.writer.add_scalar(f'train/{key}', value, epoch)
            
            # Update learning rate
            self.scheduler.step()
            
            # Save checkpoint periodically
            if (epoch + 1) % 10 == 0:
                self._save_checkpoint()
        
        self.writer.close()
    
    def _save_checkpoint(self, is_best: bool = False):
        """Save model checkpoint"""
        checkpoint = {
            'epoch': self.current_epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'best_val_ap': self.best_val_ap
        }
        
        if is_best:
            path = self.save_dir / 'best_model.pt'
        else:
            path = self.save_dir / f'checkpoint_epoch_{self.current_epoch}.pt'
        
        torch.save(checkpoint, path)
        print(f"Checkpoint saved: {path}")
    
    def load_checkpoint(self, path: str):
        """Load checkpoint"""
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.current_epoch = checkpoint['epoch']
        self.best_val_ap = checkpoint['best_val_ap']
        print(f"Checkpoint loaded: {path}")
# training/losses.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict

class CelloTypeLoss(nn.Module):
    """
    Combined loss function for CelloType.
    
    Loss = λ_cls * L_cls + λ_box * L_box + λ_mask * L_mask
    """
    
    def __init__(
        self,
        weight_cls: float = 2.0,
        weight_box: float = 5.0,
        weight_mask: float = 1.0,
        num_classes: int = 12
    ):
        super().__init__()
        self.weight_cls = weight_cls
        self.weight_box = weight_box
        self.weight_mask = weight_mask
        self.num_classes = num_classes
    
    def forward(
        self,
        pred_boxes: torch.Tensor,
        pred_classes: torch.Tensor,
        pred_masks: torch.Tensor,
        targets: Dict[str, torch.Tensor]
    ) -> Dict[str, torch.Tensor]:
        """
        Compute all loss components.
        
        Args:
            pred_boxes: (B, N, 4)
            pred_classes: (B, N, num_classes+1)
            pred_masks: (B, N, H, W)
            targets: Dictionary with 'boxes', 'labels', 'masks'
            
        Returns:
            Dictionary with individual losses and total loss
        """
        # Classification loss (focal loss)
        cls_loss = self._focal_loss(pred_classes, targets['labels'])
        
        # Box regression loss (L1 loss)
        box_loss = self._box_loss(pred_boxes, targets['boxes'])
        
        # Segmentation loss (BCE + Dice)
        mask_loss = self._mask_loss(pred_masks, targets['masks'])
        
        # Total loss
        total_loss = (
            self.weight_cls * cls_loss +
            self.weight_box * box_loss +
            self.weight_mask * mask_loss
        )
        
        return {
            'cls_loss': cls_loss,
            'box_loss': box_loss,
            'mask_loss': mask_loss,
            'total_loss': total_loss
        }
    
    def _focal_loss(
        self,
        pred_classes: torch.Tensor,
        target_labels: list
    ) -> torch.Tensor:
        """Focal loss for handling class imbalance"""
        batch_size = pred_classes.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(target_labels[i]) == 0:
                continue
            
            pred = pred_classes[i]
            target = target_labels[i]
            
            # Cross entropy
            ce_loss = F.cross_entropy(
                pred.unsqueeze(0),
                target.view(1, -1),
                reduction='none'
            )
            
            # Focal term
            p_t = torch.exp(-ce_loss)
            focal_weight = (1 - p_t) ** 2
            
            focal_loss = focal_weight * ce_loss
            losses.append(focal_loss.mean())
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_classes.device)
    
    def _box_loss(
        self,
        pred_boxes: torch.Tensor,
        target_boxes: list
    ) -> torch.Tensor:
        """L1 loss for box regression"""
        batch_size = pred_boxes.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(target_boxes[i]) == 0:
                continue
            
            pred = pred_boxes[i]
            target = target_boxes[i]
            
            # Match predictions to targets
            n_targets = len(target)
            if n_targets > 0 and n_targets <= pred.shape[0]:
                pred_matched = pred[:n_targets]
                loss = F.l1_loss(pred_matched, target)
                losses.append(loss)
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_boxes.device)
    
    def _mask_loss(
        self,
        pred_masks: torch.Tensor,
        target_masks: list
    ) -> torch.Tensor:
        """BCE + Dice loss for segmentation"""
        batch_size = pred_masks.shape[0]
        losses = []
        
        for i in range(batch_size):
            if len(target_masks[i]) == 0:
                continue
            
            pred = pred_masks[i]
            target = target_masks[i].float()
            
            # Resize if needed
            if target.shape != pred.shape:
                target = F.interpolate(
                    target.unsqueeze(1),
                    size=pred.shape[1:],
                    mode='bilinear',
                    align_corners=False
                ).squeeze(1)
            
            n_targets = len(target)
            if n_targets > 0 and n_targets <= pred.shape[0]:
                pred_matched = pred[:n_targets]
                
                # BCE loss
                bce = F.binary_cross_entropy(pred_matched, target)
                
                # Dice loss
                intersection = (pred_matched * target).sum()
                union = pred_matched.sum() + target.sum()
                dice = 1 - (2 * intersection + 1e-6) / (union + 1e-6)
                
                loss = 0.5 * bce + 0.5 * dice
                losses.append(loss)
        
        if losses:
            return torch.stack(losses).mean()
        else:
            return torch.tensor(0.0, device=pred_masks.device)
# utils/metrics.py
import torch
import numpy as np
from typing import List
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

def compute_ap(
    pred_boxes_list: List[torch.Tensor],
    gt_boxes_list: List[torch.Tensor],
    confidence_scores_list: List[torch.Tensor],
    iou_threshold: float = 0.5
) -> float:
    """
    Compute Average Precision using IoU metric.
    
    Args:
        pred_boxes_list: List of predicted boxes (N, 4)
        gt_boxes_list: List of ground truth boxes (M, 4)
        confidence_scores_list: List of confidence scores (N,)
        iou_threshold: IoU threshold for TP/FP determination
        
    Returns:
        Average Precision score
    """
    # Flatten all predictions
    all_preds = []
    all_gts = []
    
    for i, (pred, gt, conf) in enumerate(zip(
        pred_boxes_list, gt_boxes_list, confidence_scores_list
    )):
        for j, p in enumerate(pred):
            all_preds.append({
                'box': p.cpu().numpy(),
                'confidence': conf[j].item(),
                'image_id': i
            })
        
        for g in gt:
            all_gts.append({
                'box': g.cpu().numpy(),
                'image_id': i
            })
    
    if len(all_preds) == 0:
        return 0.0
    
    # Sort by confidence
    all_preds = sorted(all_preds, key=lambda x: x['confidence'], reverse=True)
    
    # Compute TP and FP
    tp = np.zeros(len(all_preds))
    fp = np.zeros(len(all_preds))
    gt_matched = set()
    
    for i, pred in enumerate(all_preds):
        pred_box = pred['box']
        pred_img_id = pred['image_id']
        
        max_iou = 0
        max_gt_idx = -1
        
        for j, gt in enumerate(all_gts):
            if gt['image_id'] != pred_img_id:
                continue
            if (gt['image_id'], j) in gt_matched:
                continue
            
            gt_box = gt['box']
            iou = compute_iou(pred_box, gt_box)
            
            if iou > max_iou:
                max_iou = iou
                max_gt_idx = j
        
        if max_iou >= iou_threshold and max_gt_idx >= 0:
            tp[i] = 1
            gt_matched.add((pred_img_id, max_gt_idx))
        else:
            fp[i] = 1
    
    # Compute precision and recall
    tp_cumsum = np.cumsum(tp)
    fp_cumsum = np.cumsum(fp)
    
    recalls = tp_cumsum / len(all_gts)
    precisions = tp_cumsum / (tp_cumsum + fp_cumsum)
    
    # Compute AP
    ap = np.mean(precisions)
    
    return float(ap)


def compute_iou(box1: np.ndarray, box2: np.ndarray) -> float:
    """
    Compute Intersection over Union between two boxes.
    
    Args:
        box1: [x1, y1, x2, y2]
        box2: [x1, y1, x2, y2]
        
    Returns:
        IoU value
    """
    x1_min, y1_min, x1_max, y1_max = box1
    x2_min, y2_min, x2_max, y2_max = box2
    
    # Intersection
    inter_min_x = max(x1_min, x2_min)
    inter_min_y = max(y1_min, y2_min)
    inter_max_x = min(x1_max, x2_max)
    inter_max_y = min(y1_max, y2_max)
    
    if inter_max_x < inter_min_x or inter_max_y < inter_min_y:
        return 0.0
    
    inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)
    
    # Union
    box1_area = (x1_max - x1_min) * (y1_max - y1_min)
    box2_area = (x2_max - x2_min) * (y2_max - y2_min)
    union_area = box1_area + box2_area - inter_area
    
    if union_area == 0:
        return 0.0
    
    iou = inter_area / union_area
    return float(iou)


def compute_classification_metrics(
    pred_classes: np.ndarray,
    gt_classes: np.ndarray
) -> dict:
    """
    Compute classification metrics: precision, recall, F1.
    """
    from sklearn.metrics import precision_score, recall_score, f1_score
    
    precision = precision_score(gt_classes, pred_classes, average='weighted', zero_division=0)
    recall = recall_score(gt_classes, pred_classes, average='weighted', zero_division=0)
    f1 = f1_score(gt_classes, pred_classes, average='weighted', zero_division=0)
    
    return {
        'precision': precision,
        'recall': recall,
        'f1': f1
    }
# inference.py
import torch
import torch.nn.functional as F
import numpy as np
from pathlib import Path
from typing import Dict, Tuple, List
import cv2

from models.cellotype import CelloType
from config import InferenceConfig

class CelloTypeInference:
    """Inference pipeline for CelloType"""
    
    def __init__(
        self,
        model_path: str,
        device: str = 'cuda',
        config: InferenceConfig = None
    ):
        """
        Args:
            model_path: Path to trained model checkpoint
            device: Device to run inference on
            config: Inference configuration
        """
        self.device = torch.device(device)
        self.config = config or InferenceConfig()
        
        # Load model
        self.model = CelloType()
        checkpoint = torch.load(model_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model = self.model.to(self.device)
        self.model.eval()
    
    @torch.no_grad()
    def predict(
        self,
        image: np.ndarray,
        image_size: Tuple[int, int] = (512, 512)
    ) -> Dict:
        """
        Run inference on an image.
        
        Args:
            image: Input image (H, W, C) or (C, H, W)
            image_size: Target image size
            
        Returns:
            Dictionary containing:
                - boxes: Detected bounding boxes (N, 4)
                - classes: Predicted class labels (N,)
                - masks: Instance segmentation masks (N, H, W)
                - scores: Confidence scores (N,)
        """
        # Preprocess image
        if image.ndim == 3 and image.shape[0] not in [1, 2, 3]:
            image = image.transpose(2, 0, 1)
        
        # Resize
        if image.shape[0] == 1:
            image = np.repeat(image, 3, axis=0)
        
        image = cv2.resize(image.transpose(1, 2, 0), image_size)
        image = image.transpose(2, 0, 1)
        
        # Normalize
        image = image.astype(np.float32)
        image = (image - image.min()) / (image.max() - image.min() + 1e-6)
        
        # To tensor
        image = torch.from_numpy(image).unsqueeze(0).to(self.device)
        
        # Forward pass
        outputs = self.model(image)
        
        # Post-process
        results = self._post_process(outputs)
        
        return results
    
    def _post_process(self, outputs: Dict) -> Dict:
        """
        Post-process model outputs.
        
        Includes:
        - Filtering by confidence threshold
        - NMS for box suppression
        - Mask refinement
        """
        pred_boxes = outputs['pred_boxes'][0]  # (num_queries, 4)
        pred_classes = outputs['pred_classes'][0]  # (num_queries, num_classes+1)
        pred_masks = outputs['pred_masks'][0]  # (num_queries, H, W)
        scores = outputs['confidence_scores'][0]  # (num_queries,)
        
        # Filter by confidence threshold
        mask = scores > self.config.confidence_threshold
        boxes = pred_boxes[mask]
        classes = pred_classes[mask].argmax(dim=-1)
        masks = pred_masks[mask]
        scores = scores[mask]
        
        if len(boxes) == 0:
            return {
                'boxes': np.array([]),
                'classes': np.array([]),
                'masks': np.array([]),
                'scores': np.array([])
            }
        
        # NMS
        keep_indices = self._nms(boxes, scores)
        boxes = boxes[keep_indices].cpu().numpy()
        classes = classes[keep_indices].cpu().numpy()
        masks = masks[keep_indices].cpu().numpy()
        scores = scores[keep_indices].cpu().numpy()
        
        return {
            'boxes': boxes,
            'classes': classes,
            'masks': masks,
            'scores': scores
        }
    
    def _nms(
        self,
        boxes: torch.Tensor,
        scores: torch.Tensor
    ) -> np.ndarray:
        """
        Non-maximum suppression.
        
        Args:
            boxes: (N, 4) in [x1, y1, x2, y2] format
            scores: (N,) confidence scores
            
        Returns:
            Indices of kept boxes
        """
        if len(boxes) == 0:
            return np.array([])
        
        # Convert to numpy
        boxes_np = boxes.cpu().numpy()
        scores_np = scores.cpu().numpy()
        
        # Sort by score
        sorted_indices = np.argsort(-scores_np)
        
        keep = []
        while len(sorted_indices) > 0:
            current = sorted_indices[0]
            keep.append(current)
            
            if len(sorted_indices) == 1:
                break
            
            current_box = boxes_np[current]
            rest_boxes = boxes_np[sorted_indices[1:]]
            
            # Compute IoU with rest
            ious = self._compute_ious(current_box, rest_boxes)
            
            # Keep boxes with IoU < threshold
            mask = ious < self.config.nms_threshold
            sorted_indices = sorted_indices[1:][mask]
        
        return np.array(keep)
    
    def _compute_ious(
        self,
        box: np.ndarray,
        boxes: np.ndarray
    ) -> np.ndarray:
        """Compute IoU between one box and multiple boxes"""
        x1_min, y1_min, x1_max, y1_max = box
        x2_min, y2_min, x2_max, y2_max = boxes.T
        
        inter_min_x = np.maximum(x1_min, x2_min)
        inter_min_y = np.maximum(y1_min, y2_min)
        inter_max_x = np.minimum(x1_max, x2_max)
        inter_max_y = np.minimum(y1_max, y2_max)
        
        inter_w = np.maximum(0, inter_max_x - inter_min_x)
        inter_h = np.maximum(0, inter_max_y - inter_min_y)
        inter_area = inter_w * inter_h
        
        box_area = (x1_max - x1_min) * (y1_max - y1_min)
        boxes_area = (x2_max - x2_min) * (y2_max - y2_min)
        union_area = box_area + boxes_area - inter_area
        
        ious = inter_area / (union_area + 1e-6)
        
        return ious
# main.py
import torch
import argparse
from pathlib import Path

from config import TrainingConfig, ModelConfig, DataConfig, InferenceConfig
from models.cellotype import CelloType
from data.dataset import create_dataloaders
from training.trainer import CelloTypeTrainer
from inference import CelloTypeInference

def main():
    parser = argparse.ArgumentParser(description="CelloType Training and Inference")
    parser.add_argument('--mode', type=str, choices=['train', 'infer'], default='train')
    parser.add_argument('--data-dir', type=str, default='./data')
    parser.add_argument('--model-path', type=str, default=None)
    parser.add_argument('--image-path', type=str, default=None)
    parser.add_argument('--output-dir', type=str, default='./outputs')
    parser.add_argument('--device', type=str, default='cuda')
    
    args = parser.parse_args()
    
    # Create output directory
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)
    
    if args.mode == 'train':
        train(args)
    elif args.mode == 'infer':
        infer(args)

def train(args):
    """Training mode"""
    print("=" * 80)
    print("CelloType Training")
    print("=" * 80)
    
    # Configurations
    data_config = DataConfig()
    model_config = ModelConfig()
    train_config = TrainingConfig(device=args.device)
    
    # Create model
    print("Creating model...")
    model = CelloType(
        in_channels=data_config.num_channels,
        hidden_dim=model_config.dino_hidden_dim,
        num_queries=model_config.dino_num_queries,
        num_classes=model_config.num_classes,
        swin_embed_dim=model_config.swin_embed_dim,
        swin_depths=model_config.swin_depths,
        swin_num_heads=model_config.swin_num_heads,
        weight_cls=model_config.weight_cls,
        weight_box=model_config.weight_box,
        weight_mask=model_config.weight_mask
    )
    
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
    
    # Create dataloaders
    print(f"Loading data from {args.data_dir}...")
    train_loader, val_loader, test_loader = create_dataloaders(
        args.data_dir,
        batch_size=train_config.batch_size,
        image_size=data_config.image_size,
        num_workers=train_config.num_workers
    )
    
    print(f"Train samples: {len(train_loader.dataset)}")
    print(f"Val samples: {len(val_loader.dataset)}")
    
    # Create trainer
    trainer = CelloTypeTrainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        config=train_config
    )
    
    # Train
    print("Starting training...")
    trainer.train()
    
    print("Training completed!")

def infer(args):
    """Inference mode"""
    print("=" * 80)
    print("CelloType Inference")
    print("=" * 80)
    
    if args.model_path is None:
        raise ValueError("--model-path required for inference")
    
    if args.image_path is None:
        raise ValueError("--image-path required for inference")
    
    # Load image
    import cv2
    import numpy as np
    from utils.visualization import visualize_predictions
    
    image = cv2.imread(args.image_path, cv2.IMREAD_ANYDEPTH)
    if image is None:
        image = np.load(args.image_path)
    
    print(f"Image shape: {image.shape}")
    
    # Initialize inference
    infer_config = InferenceConfig()
    inferencer = CelloTypeInference(
        args.model_path,
        device=args.device,
        config=infer_config
    )
    
    # Run inference
    print("Running inference...")
    results = inferencer.predict(image)
    
    print(f"Detected {len(results['boxes'])} cells")
    print(f"Confidence scores: min={results['scores'].min():.4f}, max={results['scores'].max():.4f}, mean={results['scores'].mean():.4f}")
    
    # Visualize results
    vis_image = visualize_predictions(image, results)
    
    output_path = Path(args.output_dir) / "predictions.png"
    cv2.imwrite(str(output_path), vis_image)
    print(f"Visualization saved to {output_path}")
    
    # Save results
    import json
    results_json = {
        'num_cells': len(results['boxes']),
        'boxes': results['boxes'].tolist(),
        'classes': results['classes'].tolist(),
        'scores': results['scores'].tolist()
    }
    
    results_path = Path(args.output_dir) / "results.json"
    with open(results_path, 'w') as f:
        json.dump(results_json, f, indent=2)
    
    print(f"Results saved to {results_path}")

if __name__ == '__main__':
    main()


References

All claims in this article are supported by the CelloType Nature Methods publication (Pang et al., February 2025).

Related posts, You May like to read

  1. 7 Shocking Truths About Knowledge Distillation: The Good, The Bad, and The Breakthrough (SAKD)
  2. 7 Revolutionary Breakthroughs in Medical Image Translation (And 1 Fatal Flaw That Could Derail Your AI Model)
  3. TimeDistill: Revolutionizing Time Series Forecasting with Cross-Architecture Knowledge Distillation
  4. HiPerformer: A New Benchmark in Medical Image Segmentation with Modular Hierarchical Fusion
  5. GeoSAM2 3D Part Segmentation — Prompt-Controllable, Geometry-Aware Masks for Precision 3D Editing
  6. DGRM: How Advanced AI is Learning to Detect Machine-Generated Text Across Different Domains
  7. A Knowledge Distillation-Based Approach to Enhance Transparency of Classifier Models
  8. Towards Trustworthy Breast Tumor Segmentation in Ultrasound Using AI Uncertainty
  9. Discrete Migratory Bird Optimizer with Deep Transfer Learning for Multi-Retinal Disease Detection

Leave a Comment

Your email address will not be published. Required fields are marked *

Follow by Email
Tiktok