Introduction
Analyzing tissue images at the cellular level has become fundamental to understanding disease mechanisms, identifying therapeutic targets, and advancing personalized medicine. However, one of the most time-consuming bottlenecks in spatial omics research is the manual annotation of cells following segmentation—a laborious two-stage process that requires separating cells from tissue backgrounds, then classifying each identified cell by type.
Recent advances in spatial imaging technologies have generated unprecedented volumes of multiplexed tissue data, yet our computational tools remain fragmented and inefficient. Researchers typically rely on separate algorithms: one for segmentation (such as Mesmer or Cellpose2) and another for classification (like CellSighter). This conventional pipeline suffers from information loss, inefficient training, and accumulated errors.
CelloType, published in Nature Methods, addresses these limitations through a unified, transformer-based deep learning framework that simultaneously performs cell segmentation and classification, outperforming state-of-the-art methods across diverse tissue imaging platforms. This article explores how CelloType works, why its approach represents a paradigm shift in spatial omics analysis, and what this means for the future of biomedical research.
How Traditional Cell Analysis Falls Short
The Two-Stage Pipeline Problem
Conventional cell analysis workflows have relied on a sequential approach for decades:
- Segmentation stage: CNN-based models predict cell boundaries from nuclear and membrane stains
- Classification stage: Separate models predict cell types based on the segmentation masks and immunofluorescence markers
This two-stage approach presents several critical limitations. First, cell classification models cannot leverage the full spectrum of semantic information present in tissue images, as they operate only on segmentation results. Second, the approach is computationally inefficient, requiring separate training for each task. Third, segmentation performance varies significantly across different tissue types, suggesting substantial room for improvement.
Key problems with traditional methods:
- Information bottleneck: Classification models lose rich contextual information by relying only on segmentation masks
- Error accumulation: Segmentation errors directly propagate into classification results
- Resource inefficiency: Training separate models doubles computational overhead
- Lack of confidence assessment: Most existing methods provide no reliability metrics for segmentation predictions
Existing Solutions and Their Limitations
Representative state-of-the-art methods fall into two groups. For segmentation, Mesmer uses CNNs with feature pyramid networks, while Cellpose and Cellpose2 employ U-Net architectures with gradient-tracking algorithms. For classification, CellSighter and CELESTA are the usual choices. While these methods have achieved satisfactory performance on certain tissues, they remain fundamentally limited by their sequential architecture.
CelloType’s Unified Architecture: A Multitask Learning Breakthrough
Core Innovation: Integrated Multitask Learning
CelloType adopts a multitask learning strategy that integrates segmentation and classification tasks, simultaneously enhancing the performance of both. The model leverages transformer-based deep learning techniques—specifically the Swin Transformer for feature extraction, DINO for object detection, and MaskDINO for segmentation.
This architecture enables bidirectional information flow:
- Segmentation informs detection: Pixel-level segmentation masks provide detailed contextual information that improves bounding box initialization
- Detection refines segmentation: Object detection focuses the model on relevant image regions, reducing noise and enhancing precision
Three-Module Architecture Explained
1. Feature Extraction Module (Swin Transformer)
The Swin Transformer generates multiscale image features through hierarchical processing that captures both local and global patterns, outperforming conventional convolutional networks in modeling complex image data with improved computational efficiency. This foundation enables the model to process high-dimensional multiplexed tissue images effectively.
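To make "multiscale" concrete, the sketch below computes the feature-map shapes a four-stage Swin-style backbone would produce for a 512×512 input. The patch size of 4, base embedding dimension of 96, and per-stage channel doubling are assumptions borrowed from the standard Swin-T layout (the same values used in this article's configuration), not figures confirmed by the paper.

```python
def swin_stage_shapes(image_size=512, patch_size=4, embed_dim=96, num_stages=4):
    """Return (channels, height, width) for each stage of a Swin-style backbone.

    Assumes the standard layout: patch embedding downsamples by `patch_size`,
    and every later stage halves the resolution while doubling the channels.
    """
    shapes = []
    resolution = image_size // patch_size
    channels = embed_dim
    for stage in range(num_stages):
        if stage > 0:
            resolution //= 2
            channels *= 2
        shapes.append((channels, resolution, resolution))
    return shapes


for stage, shape in enumerate(swin_stage_shapes()):
    print(f"stage_{stage}: {shape}")
```

The early, high-resolution stages retain fine detail such as cell boundaries, while the later, coarser stages summarize wider tissue context.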
2. DINO Object Detection Module
The DINO (DETR with Improved Denoising Anchor Boxes) module performs two functions:
- Query initialization: Uses mixed positional and content queries to generate adaptive anchor boxes
- Anchor refinement: Applies deformable attention mechanisms and contrastive denoising training for robust detection
The mathematical framework for mixed query selection operates as:
\[ Q_{pos} = f_{encoder}(X), \quad Q_{content} = \text{learnable} \]
where \(Q_{pos}\) represents positional queries (an n-by-4 matrix) and \(Q_{content}\) represents learnable content features (an n-by-embedding_dimension matrix).
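As a rough illustration of mixed query selection, the sketch below picks the highest-scoring encoder tokens as anchor boxes and keeps the content queries as an independent parameter table. The function name `select_mixed_queries`, the per-token scores and boxes, and the random initialization are all hypothetical stand-ins; in a real model the scores and boxes would come from prediction heads on the encoder output and the content queries would be trained.

```python
import random


def select_mixed_queries(token_scores, token_boxes, num_queries, embed_dim, seed=0):
    """Return (q_pos, q_content) for a DINO-style decoder.

    q_pos:     n-by-4 anchor boxes taken from the top-scoring encoder tokens.
    q_content: n-by-embed_dim table that does not depend on the image
               (randomly initialized here; learned in a real model).
    """
    ranked = sorted(range(len(token_scores)), key=lambda i: token_scores[i], reverse=True)
    top = ranked[:num_queries]
    q_pos = [list(token_boxes[i]) for i in top]
    rng = random.Random(seed)
    q_content = [[rng.gauss(0.0, 0.02) for _ in range(embed_dim)] for _ in range(num_queries)]
    return q_pos, q_content


# Four encoder tokens, each with a hypothetical objectness score and (cx, cy, w, h) box
scores = [0.1, 0.9, 0.4, 0.7]
boxes = [[0.1, 0.1, 0.2, 0.2], [0.5, 0.5, 0.1, 0.1], [0.3, 0.7, 0.2, 0.1], [0.8, 0.2, 0.1, 0.3]]
q_pos, q_content = select_mixed_queries(scores, boxes, num_queries=2, embed_dim=8)
print(q_pos)
```

Only the positional part depends on the image, which is what lets the anchors adapt to each tissue section while the content features stay shared.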
3. MaskDINO Segmentation Module
The final segmentation masks are computed through:
\[ m = q_c \otimes M(T(C_b) + F(C_e)) \]
where:
- \(q_c\) = content query embedding from DINO
- \(M\) = segmentation head (convolutional layers)
- \(T\) = channel mapping convolution
- \(C_b\) = feature map from feature extractor
- \(C_e\) = latent features from DINO encoder
- \(F\) = interpolation-based upsampling
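The mask equation can be read as a per-pixel dot product between one query embedding and a pixel embedding map. The toy sketch below assumes that \(T(C_b)\) and \(F(C_e)\) have already been brought to the same shape and treats the segmentation head \(M\) as the identity, purely for illustration; the helper names are hypothetical.

```python
def fuse_features(t_cb, f_ce):
    """Element-wise sum T(C_b) + F(C_e) of two H x W x D embedding maps."""
    return [
        [[a + b for a, b in zip(px_b, px_e)] for px_b, px_e in zip(row_b, row_e)]
        for row_b, row_e in zip(t_cb, f_ce)
    ]


def predict_mask_logits(query, pixel_embeddings):
    """m[y][x] = <q_c, E[y][x]>: one logit per pixel for a single query."""
    return [
        [sum(q * e for q, e in zip(query, pixel)) for pixel in row]
        for row in pixel_embeddings
    ]


# 2 x 2 feature maps with embedding dimension 2
t_cb = [[[1.0, 0.0], [0.0, 1.0]], [[0.5, 0.5], [2.0, 0.0]]]
f_ce = [[[1.0, 0.0], [0.0, 1.0]], [[0.0, 0.0], [0.0, 1.0]]]
q_c = [1.0, -1.0]

logits = predict_mask_logits(q_c, fuse_features(t_cb, f_ce))
mask = [[value > 0 for value in row] for row in logits]
print(logits)
print(mask)
```

Each object query produces its own mask this way, so a thousand queries yield a thousand candidate instance masks from a single shared pixel embedding map.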
The unified model is trained with a composite loss function:
\[ \text{Loss} = \lambda_{cls}L_{cls} + \lambda_{box}L_{box} + \lambda_{mask}L_{mask} \]
Performance Benchmarks: CelloType vs. Existing Methods
TissueNet Multiplexed Imaging Results
When evaluated on the TissueNet dataset containing images from six imaging platforms (CODEX, CycIF, IMC, MIBI, MxIF, and Vectra) across six tissue types, CelloType consistently outperformed both Mesmer and Cellpose2 across all IoU thresholds. For cell segmentation, CelloType_C (with confidence scores) achieved a mean AP of 0.56, significantly surpassing Cellpose2 (0.35) and Mesmer (0.31). For nuclear segmentation, CelloType_C achieved 0.66 AP, compared with Cellpose2 (0.52) and Mesmer (0.24).
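For readers unfamiliar with AP across IoU thresholds, the sketch below shows one common way such a score is computed in cell segmentation benchmarks: at each threshold, predictions are matched one-to-one with ground-truth cells, and precision is taken as TP / (TP + FP + FN). This is an assumed, simplified protocol for illustration; the exact evaluation procedure behind the reported numbers is not reproduced here.

```python
def mask_iou(a, b):
    """IoU of two masks given as sets of (row, col) pixels."""
    union = len(a | b)
    return len(a & b) / union if union else 0.0


def precision_at(preds, gts, threshold):
    """TP / (TP + FP + FN) with greedy one-to-one matching at one IoU threshold."""
    unmatched = list(range(len(gts)))
    tp = 0
    for pred in preds:
        best, best_iou = None, threshold
        for g in unmatched:
            iou = mask_iou(pred, gts[g])
            if iou >= best_iou:
                best, best_iou = g, iou
        if best is not None:
            unmatched.remove(best)
            tp += 1
    fp = len(preds) - tp
    fn = len(gts) - tp
    total = tp + fp + fn
    return tp / total if total else 1.0


def mean_ap(preds, gts, thresholds=(0.5, 0.6, 0.7, 0.8, 0.9)):
    """Average the per-threshold precision over all IoU thresholds."""
    return sum(precision_at(preds, gts, t) for t in thresholds) / len(thresholds)


gts = [{(0, 0), (0, 1), (1, 0), (1, 1)}, {(5, 5), (5, 6)}]
preds = [{(0, 0), (0, 1), (1, 0)}, {(5, 5), (5, 6)}, {(9, 9)}]
print(round(mean_ap(preds, gts), 3))
```

Stricter thresholds penalize masks that only roughly overlap a cell, which is why a method has to trace boundaries accurately to score well on the mean.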
$$\begin{array}{l|cc}
\text{Method} & \text{Cell Segmentation AP} & \text{Nuclear Segmentation AP} \\ \hline
\text{Mesmer} & 0.31 \pm 0.01 & 0.24 \pm 0.01 \\
\text{Cellpose2} & 0.35 \pm 0.02 & 0.52 \pm 0.02 \\
\text{CelloType} & 0.45 \pm 0.02 & 0.57 \pm 0.02 \\
\textbf{CelloType\_C} & 0.56 \pm 0.02 & 0.66 \pm 0.02
\end{array}$$
Classification Performance on Colorectal Cancer CODEX
For joint segmentation and classification on colorectal cancer tissue, CelloType achieved a mean AP of 0.55 across all cell types, markedly exceeding the Cellpose2 + CellSighter model’s mean AP of 0.13 and the Mask R-CNN model’s mean AP of 0.43. Critically, CelloType’s confidence scores demonstrated a strong, nearly linear correlation with prediction accuracy (coefficient = 0.56), whereas CellSighter and Mask R-CNN showed flat correlations (0.21 and 0.19, respectively).
This superior calibration is crucial for clinical applications, as researchers can now trust high-confidence predictions and investigate low-confidence predictions with additional scrutiny.
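One simple way to check calibration of this kind on your own data is to bin predictions by confidence, measure the accuracy in each bin, and correlate the two. The sketch below uses made-up confidences and correctness flags; the binning scheme and helper names are assumptions for illustration, not the analysis used in the paper.

```python
def accuracy_by_confidence_bin(confidences, correct, num_bins=5):
    """Return (bin_center, accuracy) pairs for every non-empty confidence bin."""
    bins = [[] for _ in range(num_bins)]
    for conf, ok in zip(confidences, correct):
        index = min(int(conf * num_bins), num_bins - 1)
        bins[index].append(1.0 if ok else 0.0)
    return [
        ((i + 0.5) / num_bins, sum(b) / len(b))
        for i, b in enumerate(bins)
        if b
    ]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / (var_x * var_y) ** 0.5


# Made-up predictions: confidence score and whether the predicted type was right
confidences = [0.1, 0.15, 0.3, 0.35, 0.5, 0.55, 0.7, 0.75, 0.9, 0.95]
correct = [False, False, False, True, True, False, True, True, True, True]

curve = accuracy_by_confidence_bin(confidences, correct)
centers = [c for c, _ in curve]
accuracies = [a for _, a in curve]
print(accuracies)
print(pearson(centers, accuracies))
```

A well-calibrated model produces accuracies that rise steadily with the bin center; a flat curve means the confidence score carries little information about correctness.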
Spatial Transcriptomics: Xenium and MERFISH
On Xenium spatial transcriptomics datasets, CelloType achieved a mean AP of 0.47 using both DAPI and transcript signals, vastly outperforming SCS and Baysor (both 0.01 AP). The results demonstrate that transcript signal alone is insufficient for accurate segmentation, even for advanced deep learning models, but when combined with DAPI, CelloType effectively integrates multiple modalities.
Multiscale Segmentation: Beyond Individual Cells
Handling Tissue Architecture Complexity
One of CelloType’s most innovative capabilities is multiscale segmentation—the ability to simultaneously identify and classify cellular and noncellular tissue elements of vastly different sizes.
When applied to human bone marrow tissue containing hematopoietic cells, large adipocytes, and irregularly shaped trabecular bone fragments, CelloType achieved mean AP values of 0.39 for adipocytes, 0.31 for trabecular bone, and 0.42 for other cell types. The model correctly identified both tiny lymphocytes (10-15 μm diameter) and massive adipocytes (100+ μm diameter), a capability unique among existing segmentation methods.
This multiscale capability is particularly valuable for:
- Tissue organization studies: Understanding how cellular neighborhoods form around structural elements
- Pathology workflows: Distinguishing malignant cells from stromal tissue components
- Developmental biology: Mapping cell-matrix interactions during tissue remodeling
Practical Applications and Research Impact
Spatial Omics Data Analysis
CelloType’s integration with spatial transcriptomics platforms represents a major advance. As spatial omics datasets grow exponentially—encompassing hundreds of thousands of cells across multiple tissue sections—automated, accurate segmentation and classification become essential.
Key application areas:
- Tumor microenvironment mapping: Identifying immune infiltration patterns and their relationship to treatment response
- Brain tissue organization: Dissecting neuronal, glial, and vascular cell distributions in healthy and diseased brain
- Immunological surveillance: Tracking how immune cells organize in lymphoid tissues
- Developmental studies: Tracing cell type emergence and spatial organization during embryogenesis
Confidence Scores Enable Quality Filtering
Unlike previous tools that provide binary predictions, CelloType outputs confidence scores for every segmentation and classification result. This enables researchers to:
- Prioritize manual validation of low-confidence predictions
- Filter datasets to achieve desired precision thresholds
- Assess model reliability in understudied tissue types
- Generate publication-quality results with known error characteristics
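In practice, this kind of quality filtering can be as simple as splitting predictions at a confidence threshold. The sketch below is a minimal illustration; the `Prediction` record, its field names, and the 0.5 threshold are hypothetical and do not reflect CelloType's actual output format.

```python
from dataclasses import dataclass


@dataclass
class Prediction:
    cell_id: int
    cell_type: str
    confidence: float


def split_by_confidence(predictions, threshold=0.5):
    """Separate predictions to keep from predictions to review manually."""
    accepted = [p for p in predictions if p.confidence >= threshold]
    needs_review = [p for p in predictions if p.confidence < threshold]
    return accepted, needs_review


predictions = [
    Prediction(1, "CD8+ T cell", 0.92),
    Prediction(2, "macrophage", 0.41),
    Prediction(3, "tumor cell", 0.77),
    Prediction(4, "B cell", 0.18),
]

accepted, needs_review = split_by_confidence(predictions, threshold=0.5)
print([p.cell_id for p in accepted])
print([p.cell_id for p in needs_review])
```

Raising the threshold trades recall for precision, so the right value depends on whether missing cells or mislabeling them is the costlier error for a given study.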
Training and Computational Requirements
CelloType has speed and memory usage comparable to those of Mesmer and Cellpose2, with training and inference times that scale efficiently with dataset size. The model can achieve meaningful performance with just 20% of available training data, though performance progressively improves with larger datasets.
Typical training specifications:
- Learning rate: 10⁻⁶ (Adam optimizer)
- Batch size: 8
- Hardware: Standard GPU workstations (A100-class GPUs)
- Training time: 1-2 hours for tissue datasets
- Inference speed: Processes 512×512 pixel images in ~2-3 seconds
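To translate the per-tile inference speed into a whole-image estimate, the following back-of-the-envelope sketch counts 512×512 tiles and multiplies by the quoted 2 to 3 seconds per tile. It assumes non-overlapping tiles processed one at a time; overlapping tiles or batched inference would change the numbers, and the function name is hypothetical.

```python
import math


def estimate_inference_seconds(width, height, tile=512, seconds_per_tile=(2.0, 3.0)):
    """Return (tile_count, fastest_seconds, slowest_seconds) for one image."""
    tiles = math.ceil(width / tile) * math.ceil(height / tile)
    return tiles, tiles * seconds_per_tile[0], tiles * seconds_per_tile[1]


tiles, fastest, slowest = estimate_inference_seconds(10_000, 8_000)
print(f"{tiles} tiles, roughly {fastest / 60:.0f} to {slowest / 60:.0f} minutes")
```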
Limitations and Future Directions
Current Constraints
CelloType requires training for both segmentation and classification tasks. While segmentation models are readily transferable to new images containing nuclear/membrane channels, pretrained classification models cannot be readily applied to new images unless substantial cell type overlap exists between training and testing datasets. To address this, methodologies such as few-shot learning, self-supervised learning, and contrastive learning can be incorporated into the CelloType framework.
Emerging Opportunities
- Few-shot learning integration: Enabling classification of rare cell types with minimal manual annotations
- Transfer learning optimization: Developing tissue-agnostic pretraining strategies
- Real-time processing: Streamlining inference for live imaging applications
- Integration with other modalities: Combining spatial imaging with scRNA-seq, ATAC-seq, and protein data
Conclusion: The Future of Computational Pathology
CelloType represents a fundamental advancement in how we analyze tissue images computationally. By abandoning the traditional two-stage pipeline in favor of integrated multitask learning, the method achieves superior accuracy while providing clinically meaningful confidence assessments. Its demonstrated performance across diverse tissue types and imaging modalities positions it as a broadly applicable tool for spatial omics research.
As spatial omics datasets continue to grow exponentially, the need for scalable, accurate, and reliable segmentation and classification tools becomes increasingly urgent. CelloType addresses this need head-on, democratizing access to publication-quality tissue analysis while reducing the computational burden and manual annotation required.
For researchers working with multiplexed imaging, spatial transcriptomics, or any multiplexed tissue data, exploring CelloType represents a valuable opportunity to streamline workflows, improve accuracy, and generate more robust biological insights.
Ready to accelerate your tissue image analysis? Access CelloType’s open-source implementation on GitHub and explore how transformer-based deep learning can transform your spatial omics research. Join the growing community of researchers leveraging AI for computational pathology—and share your results with the scientific community.
The following is a simplified implementation sketch of the CelloType model based on the research paper, organized into data loading, model architecture, training, and inference.
cellotype_implementation/
├── requirements.txt
├── config.py
├── data/
│ ├── __init__.py
│ └── dataset.py
├── models/
│ ├── __init__.py
│ ├── feature_extractor.py
│ ├── dino_detector.py
│ ├── mask_dino.py
│ └── cellotype.py
├── training/
│ ├── __init__.py
│ ├── trainer.py
│ └── losses.py
├── utils/
│ ├── __init__.py
│ ├── metrics.py
│ └── visualization.py
├── inference.py
└── main.py
# requirements.txt
torch==2.1.0
torchvision==0.16.0
timm==0.9.12
numpy==1.24.3
opencv-python==4.8.0.*  # PyPI builds of opencv-python carry a four-part version number
scikit-image==0.21.0
albumentations==1.3.0
tensorboard==2.14.0
matplotlib==3.7.2
scipy==1.11.2
pycocotools==2.0.6
# detectron2 (v0.6) is not published on PyPI; install it from source following its installation guide
# config.py
from dataclasses import dataclass
from typing import Tuple


@dataclass
class DataConfig:
    """Data configuration"""
    image_size: Tuple[int, int] = (512, 512)
    train_split: float = 0.8
    val_split: float = 0.1
    num_workers: int = 4
    batch_size: int = 8
    num_channels: int = 2  # Nuclear and membrane


@dataclass
class ModelConfig:
    """Model architecture configuration"""
    # Swin Transformer settings
    swin_embed_dim: int = 96
    swin_depths: Tuple[int, ...] = (2, 2, 6, 2)
    swin_num_heads: Tuple[int, ...] = (3, 6, 12, 24)
    # The window must divide every stage's feature-map size
    # (128, 64, 32, 16 for a 512x512 input with patch size 4), so 8 rather than 7
    swin_window_size: int = 8
    # DINO settings
    dino_hidden_dim: int = 256
    dino_num_queries: int = 1000
    dino_nheads: int = 8
    dino_num_decoder_layers: int = 6
    dino_num_encoder_layers: int = 6
    dino_dim_feedforward: int = 2048
    # MaskDINO settings
    mask_hidden_dim: int = 256
    num_mask_tokens: int = 100
    # Number of classes
    num_classes: int = 12  # Cell types
    # Loss weights
    weight_cls: float = 2.0
    weight_box: float = 5.0
    weight_mask: float = 1.0
    # Denoising parameters
    denoising_lambda1: float = 0.4
    denoising_lambda2: float = 1.0


@dataclass
class TrainingConfig:
    """Training configuration"""
    num_epochs: int = 100
    learning_rate: float = 1e-6
    weight_decay: float = 1e-4
    warmup_epochs: int = 5
    batch_size: int = 8
    num_workers: int = 4
    device: str = "cuda"
    save_dir: str = "./checkpoints"
    log_dir: str = "./logs"
    eval_interval: int = 5
    patience: int = 15


@dataclass
class InferenceConfig:
    """Inference configuration"""
    confidence_threshold: float = 0.3
    nms_threshold: float = 0.5
    max_instances: int = 1000
# data/dataset.py
import json
from pathlib import Path
from typing import Dict, List, Tuple

import albumentations as A
import cv2
import numpy as np
import torch
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset


class TissueImageDataset(Dataset):
    """
    Dataset for tissue images with segmentation masks and cell type annotations.

    Expected directory structure:
        data/
        ├── images/
        │   ├── img_001.tif (or .npy)
        │   └── ...
        ├── masks/
        │   ├── img_001_mask.png
        │   └── ...
        └── annotations/
            ├── img_001.json
            └── ...
    """

    def __init__(
        self,
        root_dir: str,
        image_size: Tuple[int, int] = (512, 512),
        augment: bool = True,
        num_classes: int = 12,
        split: str = "train",
        num_channels: int = 2,
        train_split: float = 0.8,
        val_split: float = 0.1,
    ):
        """
        Args:
            root_dir: Root directory containing images, masks, and annotations
            image_size: Target image size as (height, width)
            augment: Whether to apply augmentations
            num_classes: Number of cell type classes
            split: "train", "val", or "test"
            num_channels: Number of image channels (nuclear and membrane by default)
            train_split: Fraction of images assigned to the training split
            val_split: Fraction of images assigned to the validation split
        """
        self.root_dir = Path(root_dir)
        self.image_size = image_size
        self.num_classes = num_classes
        self.split = split
        self.images_dir = self.root_dir / "images"
        self.masks_dir = self.root_dir / "masks"
        self.annotations_dir = self.root_dir / "annotations"

        # Collect full image paths so .tif, .tiff, and .npy files all load later
        all_files = sorted(
            f for f in self.images_dir.glob("*")
            if f.suffix in ('.tif', '.tiff', '.npy')
        )
        if not all_files:
            raise RuntimeError(f"No images found in {self.images_dir}")

        # Partition the sorted list so the three splits never share an image
        n_train = int(len(all_files) * train_split)
        n_val = int(len(all_files) * val_split)
        bounds = {
            "train": (0, n_train),
            "val": (n_train, n_train + n_val),
            "test": (n_train + n_val, len(all_files)),
        }
        start, end = bounds[split]
        self.image_files = all_files[start:end]

        # Per-channel normalization; the list length must match the channel count
        normalize = A.Normalize(
            mean=[0.5] * num_channels,
            std=[0.5] * num_channels,
            max_pixel_value=255.0,
        )

        # Data augmentation pipeline. Instance masks go through the same geometric
        # transforms as the image, and boxes are recomputed from them afterwards.
        if augment:
            self.transforms = A.Compose([
                A.HorizontalFlip(p=0.5),
                A.VerticalFlip(p=0.5),
                A.Rotate(limit=45, p=0.5),
                A.GaussNoise(p=0.3),
                A.GaussianBlur(blur_limit=3, p=0.2),
                normalize,
                ToTensorV2(),
            ])
        else:
            self.transforms = A.Compose([normalize, ToTensorV2()])

    def __len__(self) -> int:
        return len(self.image_files)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Returns:
            Dictionary containing:
            - image: Tensor of shape (C, H, W)
            - masks: Tensor of shape (N, H, W) - instance masks
            - boxes: Tensor of shape (N, 4) - bounding boxes [x1, y1, x2, y2]
            - labels: Tensor of shape (N,) - class labels
            - areas: Tensor of shape (N,) - instance areas
            - iscrowd: Tensor of shape (N,) - crowd flags
        """
        image_path = self.image_files[idx]
        image_name = image_path.stem
        height, width = self.image_size

        # Load image
        if image_path.suffix == '.npy':
            image = np.load(image_path)
        else:
            image = cv2.imread(str(image_path), cv2.IMREAD_ANYDEPTH)

        # Ensure image is float32, channel-last, and resized
        if image.ndim == 2:
            image = np.stack([image, image], axis=0)
        image = image.astype(np.float32).transpose(1, 2, 0)  # (C, H, W) -> (H, W, C)
        image = cv2.resize(image, (width, height))  # cv2 expects (width, height)

        # Load the instance mask; IMREAD_UNCHANGED keeps 16-bit labels,
        # which are needed when an image contains more than 255 cells
        mask_path = self.masks_dir / f"{image_name}_mask.png"
        instance_mask = cv2.imread(str(mask_path), cv2.IMREAD_UNCHANGED)
        instance_mask = cv2.resize(
            instance_mask, (width, height), interpolation=cv2.INTER_NEAREST
        )

        # Load annotations
        annotation_path = self.annotations_dir / f"{image_name}.json"
        with open(annotation_path, 'r') as f:
            annotations = json.load(f)

        # One binary mask and one class label per annotated cell
        instance_masks = [
            (instance_mask == ann['id']).astype(np.uint8) for ann in annotations
        ]
        instance_labels = [ann['category_id'] for ann in annotations]

        # Apply the transforms to the image and all instance masks together
        if instance_masks:
            transformed = self.transforms(image=image, masks=instance_masks)
            out_masks = [torch.as_tensor(m) for m in transformed['masks']]
        else:
            transformed = self.transforms(image=image)
            out_masks = []
        image = transformed['image']

        # Recompute boxes and areas from the transformed masks so they stay aligned
        # with the image; cells pushed out of the frame by augmentation are dropped
        boxes, labels, masks, areas = [], [], [], []
        for mask, label in zip(out_masks, instance_labels):
            ys, xs = torch.nonzero(mask, as_tuple=True)
            if ys.numel() == 0:
                continue
            boxes.append([
                xs.min().item(), ys.min().item(),
                xs.max().item() + 1, ys.max().item() + 1,
            ])
            labels.append(label)
            masks.append(mask.float())
            areas.append(float(ys.numel()))

        return {
            'image': image,
            'boxes': torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.tensor(labels, dtype=torch.int64),
            'masks': torch.stack(masks) if masks else torch.zeros((0, height, width)),
            'areas': torch.tensor(areas, dtype=torch.float32),
            'iscrowd': torch.zeros(len(labels), dtype=torch.uint8),
            'image_id': torch.tensor(idx, dtype=torch.int64),
        }


def create_dataloaders(
    data_dir: str,
    batch_size: int = 8,
    image_size: Tuple[int, int] = (512, 512),
    num_workers: int = 4,
    train_split: float = 0.8,
    val_split: float = 0.1
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """Create train, validation, and test dataloaders"""
    # Create datasets; each one sees only its own share of the images
    split_kwargs = dict(
        image_size=image_size, train_split=train_split, val_split=val_split
    )
    train_dataset = TissueImageDataset(
        data_dir, augment=True, split="train", **split_kwargs
    )
    val_dataset = TissueImageDataset(
        data_dir, augment=False, split="val", **split_kwargs
    )
    test_dataset = TissueImageDataset(
        data_dir, augment=False, split="test", **split_kwargs
    )

    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        collate_fn=collate_fn
    )
    return train_loader, val_loader, test_loader


def collate_fn(batch: List[Dict]) -> Dict[str, List]:
    """Custom collate function for variable-sized tensors"""
    return {
        'image': torch.stack([item['image'] for item in batch]),
        'boxes': [item['boxes'] for item in batch],
        'labels': [item['labels'] for item in batch],
        'masks': [item['masks'] for item in batch],
        'areas': [item['areas'] for item in batch],
        'iscrowd': [item['iscrowd'] for item in batch],
        'image_id': torch.stack([item['image_id'] for item in batch])
    }
# models/feature_extractor.py
from typing import Dict

import torch
import torch.nn as nn
from timm.models.swin_transformer import SwinTransformer


class FeatureExtractor(nn.Module):
    """
    Swin Transformer-based feature extractor for multiscale feature extraction.

    This module generates multiscale image features from input tissue images,
    preserving both local and global spatial information.
    """

    def __init__(
        self,
        in_channels: int = 2,
        embed_dim: int = 96,
        depths: tuple = (2, 2, 6, 2),
        num_heads: tuple = (3, 6, 12, 24),
        window_size: int = 8,
        pretrained: bool = False
    ):
        """
        Args:
            in_channels: Number of input channels
            embed_dim: Embedding dimension
            depths: Depths of each Swin Transformer stage
            num_heads: Number of attention heads in each stage
            window_size: Window size for shifted window attention; it must divide
                the feature-map size at every stage (128, 64, 32, and 16 for a
                512x512 input with patch size 4)
            pretrained: Whether to use pretrained weights (not used in this sketch)
        """
        super().__init__()
        self.in_channels = in_channels
        self.embed_dim = embed_dim
        self.depths = depths
        self.num_heads = num_heads
        self.window_size = window_size

        # Channel adaptation if input channels != 3
        if in_channels != 3:
            self.channel_adapter = nn.Conv2d(in_channels, 3, kernel_size=1)
        else:
            self.channel_adapter = None

        # Swin Transformer
        self.swin = SwinTransformer(
            img_size=512,
            patch_size=4,
            in_chans=3,
            num_classes=1000,
            embed_dim=embed_dim,
            depths=depths,
            num_heads=num_heads,
            window_size=window_size,
            mlp_ratio=4.0,
            drop_rate=0.0,
            attn_drop_rate=0.0,
            drop_path_rate=0.1,
        )

        # Feature dimension for each stage
        self.out_channels = [embed_dim * (2 ** i) for i in range(len(depths))]

    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Args:
            x: Input tensor of shape (B, C, H, W)
        Returns:
            Dictionary containing multiscale features
        """
        # Adapt channels if needed
        if self.channel_adapter is not None:
            x = self.channel_adapter(x)
        # Extract features from Swin Transformer
        return self._extract_features(x)

    def _extract_features(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        """Extract multiscale features from Swin Transformer"""
        features = {}
        # Patch embedding; timm 0.9.x keeps Swin features channel-last: (B, H, W, C)
        x = self.swin.patch_embed(x)
        # Extract features from each stage
        for i, layer in enumerate(self.swin.layers):
            x = layer(x)
            # Convert (B, H, W, C) to (B, C, H, W) for convolutional consumers
            features[f'stage_{i}'] = x.permute(0, 3, 1, 2).contiguous()
        # Flatten the final stage into a token sequence for later use
        features['flattened'] = x.flatten(1, 2)  # (B, L, C)
        return features
# models/dino_detector.py
import math
from typing import Dict, Tuple, List

import torch
import torch.nn as nn
import torch.nn.functional as F


class PositionalEmbedding(nn.Module):
    """Positional embeddings for spatial information preservation"""

    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() *
            (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        if d_model % 2 == 1:
            pe[:, 1::2] = torch.cos(position * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x + self.pe[:, :x.size(1)].to(x.device)


class MultiHeadAttention(nn.Module):
    """
    Standard scaled dot-product multi-head attention.

    The offset generator is an unused placeholder: deformable sampling
    is not implemented in this sketch.
    """

    def __init__(self, d_model: int, num_heads: int):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)
        # Placeholder for deformable offset generation (not used in forward)
        self.offset_generator = nn.Linear(d_model, 2 * num_heads)

    def forward(
        self,
        query: torch.Tensor,
        key: torch.Tensor,
        value: torch.Tensor,
        mask: torch.Tensor = None
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            query: (B, N_q, d_model)
            key: (B, N_k, d_model)
            value: (B, N_k, d_model)
            mask: Optional attention mask
        Returns:
            output: (B, N_q, d_model)
            attention_weights: (B, num_heads, N_q, N_k)
        """
        B = query.shape[0]
        # Linear projections
        Q = self.query(query).reshape(B, -1, self.num_heads, self.d_k)
        K = self.key(key).reshape(B, -1, self.num_heads, self.d_k)
        V = self.value(value).reshape(B, -1, self.num_heads, self.d_k)
        # Transpose for attention computation
        Q = Q.transpose(1, 2)  # (B, num_heads, N_q, d_k)
        K = K.transpose(1, 2)
        V = V.transpose(1, 2)
        # Compute attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(scores, dim=-1)
        # Apply attention to values
        context = torch.matmul(attention_weights, V)
        # Concatenate heads
        context = context.transpose(1, 2).contiguous()
        context = context.reshape(B, -1, self.d_model)
        # Final linear projection
        output = self.out(context)
        return output, attention_weights


class TransformerEncoderLayer(nn.Module):
    """Transformer encoder layer"""

    def __init__(self, d_model: int, num_heads: int, dim_feedforward: int):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Self-attention
        attn_out, _ = self.attention(x, x, x)
        x = x + attn_out
        x = self.norm1(x)
        # Feed-forward
        ffn_out = self.ffn(x)
        x = x + ffn_out
        x = self.norm2(x)
        return x


class TransformerDecoderLayer(nn.Module):
    """Transformer decoder layer with cross-attention"""

    def __init__(self, d_model: int, num_heads: int, dim_feedforward: int):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, dim_feedforward),
            nn.GELU(),
            nn.Linear(dim_feedforward, d_model)
        )

    def forward(
        self,
        x: torch.Tensor,
        encoder_output: torch.Tensor
    ) -> torch.Tensor:
        # Self-attention
        self_attn_out, _ = self.self_attention(x, x, x)
        x = x + self_attn_out
        x = self.norm1(x)
        # Cross-attention
        cross_attn_out, _ = self.cross_attention(x, encoder_output, encoder_output)
        x = x + cross_attn_out
        x = self.norm2(x)
        # Feed-forward
        ffn_out = self.ffn(x)
        x = x + ffn_out
        x = self.norm3(x)
        return x
class DINODetector(nn.Module):
"""
DINO (DETR with Improved Denoising Anchor boxes) object detection module.
This module performs object detection with improved anchor box refinement
through deformable attention and contrastive denoising training.
"""
def __init__(
self,
hidden_dim: int = 256,
num_queries: int = 1000,
num_heads: int = 8,
num_encoder_layers: int = 6,
num_decoder_layers: int = 6,
dim_feedforward: int = 2048,
num_classes: int = 12,
denoising_lambda1: float = 0.4,
denoising_lambda2: float = 1.0
):
"""
Args:
hidden_dim: Hidden dimension
num_queries: Number of object queries
num_heads: Number of attention heads
num_encoder_layers: Number of encoder layers
num_decoder_layers: Number of decoder layers
dim_feedforward: Dimension of FFN
num_classes: Number of object classes
denoising_lambda1: Denoising lambda for small noise
denoising_lambda2: Denoising lambda for large noise
"""
super().__init__()
self.hidden_dim = hidden_dim
self.num_queries = num_queries
self.num_classes = num_classes
self.denoising_lambda1 = denoising_lambda1
self.denoising_lambda2 = denoising_lambda2
# Positional embeddings
self.pos_embedding = PositionalEmbedding(hidden_dim)
# Query embeddings (learnable)
self.query_embed = nn.Embedding(num_queries, hidden_dim)
# Encoder
self.encoder = nn.ModuleList([
TransformerEncoderLayer(hidden_dim, num_heads, dim_feedforward)
for _ in range(num_encoder_layers)
])
# Decoder
self.decoder = nn.ModuleList([
TransformerDecoderLayer(hidden_dim, num_heads, dim_feedforward)
for _ in range(num_decoder_layers)
])
# Prediction heads
self.class_head = nn.Linear(hidden_dim, num_classes + 1)
self.box_head = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Linear(hidden_dim, 4)
)
# Initialize weights
self._init_weights()
def _init_weights(self):
"""Initialize layer weights"""
for m in self.modules():
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.constant_(m.bias, 0)
elif isinstance(m, nn.LayerNorm):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def forward(
self,
image_features: torch.Tensor,
targets: Dict = None
) -> Dict[str, torch.Tensor]:
"""
Args:
image_features: Flattened image features from feature extractor (B, L, C)
targets: Optional target annotations for denoising training
Returns:
Dictionary containing:
- pred_boxes: Predicted bounding boxes (B, num_queries, 4)
- pred_classes: Predicted class logits (B, num_queries, num_classes+1)
- pred_logits: Logits for classification
"""
batch_size = image_features.shape[0]
# Add positional embeddings
pos_features = self.pos_embedding(image_features)
# Encoder
encoder_output = pos_features
for encoder_layer in self.encoder:
encoder_output = encoder_layer(encoder_output)
# Initialize query embeddings
query_embed = self.query_embed.weight.unsqueeze(0).expand(batch_size, -1, -1)
# Add denoising during training
if self.training and targets is not None:
query_embed = self._add_denoising_queries(query_embed, targets)
# Decoder with iterative refinement
decoder_output = query_embed
for decoder_layer in self.decoder:
decoder_output = decoder_layer(decoder_output, encoder_output)
# Prediction heads
pred_boxes = self.box_head(decoder_output)
pred_classes = self.class_head(decoder_output)
# Normalize boxes to [0, 1] using sigmoid + scale
pred_boxes = torch.sigmoid(pred_boxes)
return {
'pred_boxes': pred_boxes,
'pred_classes': pred_classes,
'encoder_output': encoder_output,
'decoder_output': decoder_output
}
def _add_denoising_queries(
self,
query_embed: torch.Tensor,
targets: Dict
) -> torch.Tensor:
"""
        Build noised copies of the queries as a simplified stand-in for
        contrastive denoising.

        According to the paper, denoising involves adding controlled noise
        to ground-truth labels and boxes. This simplified version perturbs
        the learned query embeddings instead and does not yet use `targets`.

        Returns:
            Tensor of shape (B, 2 * num_queries, C): lightly noised ("positive")
            queries followed by heavily noised ("negative") queries.
        """
        # Box-noise bounds from the paper, kept here for reference:
        # |Δx| < λ₁w/2, |Δy| < λ₁h/2, |Δw| < λ₁w, |Δh| < λ₁h
        # Small Gaussian noise for positive samples
        noise1 = torch.randn_like(query_embed) * self.denoising_lambda1
        denoised_positive = query_embed + noise1
        # Larger Gaussian noise for negative samples
        noise2 = torch.randn_like(query_embed) * self.denoising_lambda2
        denoised_negative = query_embed + noise2
        # Concatenate positive and negative samples along the query dimension
        return torch.cat([denoised_positive, denoised_negative], dim=1)
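The bounds quoted in the comment above apply to box coordinates rather than to query embeddings. As a framework-free sketch of that idea, the snippet below jitters a single normalized `(cx, cy, w, h)` box. The helper names `jitter_box` and `make_denoising_pair` are hypothetical, and the convention that positives draw noise below λ₁ while negatives draw noise between λ₁ and λ₂ is an assumption borrowed from DINO-style contrastive denoising, not something this listing confirms.

```python
import random
from typing import Tuple

Box = Tuple[float, float, float, float]  # (cx, cy, w, h), normalized to [0, 1]


def jitter_box(box: Box, lam_lo: float, lam_hi: float, rng: random.Random) -> Box:
    """Perturb a box with noise whose relative magnitude lies in [lam_lo, lam_hi]."""
    cx, cy, w, h = box

    def signed_scale() -> float:
        # Draw a magnitude in the allowed band, then pick a random sign.
        magnitude = rng.uniform(lam_lo, lam_hi)
        return magnitude if rng.random() < 0.5 else -magnitude

    def clamp(value: float) -> float:
        return min(max(value, 0.0), 1.0)

    # Centre shifts are bounded by lam * w/2 and lam * h/2,
    # size changes by lam * w and lam * h.
    return (
        clamp(cx + signed_scale() * w / 2),
        clamp(cy + signed_scale() * h / 2),
        clamp(w + signed_scale() * w),
        clamp(h + signed_scale() * h),
    )


def make_denoising_pair(
    box: Box, lambda1: float = 0.4, lambda2: float = 1.0, seed: int = 0
) -> Tuple[Box, Box]:
    """Return a lightly noised (positive) and heavily noised (negative) copy of a box."""
    rng = random.Random(seed)
    positive = jitter_box(box, 0.0, lambda1, rng)
    negative = jitter_box(box, lambda1, lambda2, rng)
    return positive, negative


positive, negative = make_denoising_pair((0.5, 0.5, 0.2, 0.1))
```

In a full training setup, the positive copy would be trained to reconstruct the original box and the negative copy to predict "no object"; the defaults 0.4 and 1.0 mirror the `denoising_lambda1` and `denoising_lambda2` defaults used elsewhere in this listing.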
# models/mask_dino.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Tuple
class SegmentationHead(nn.Module):
"""Segmentation head for mask prediction"""
def __init__(self, hidden_dim: int, num_mask_tokens: int):
super().__init__()
self.conv1 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
self.conv2 = nn.Conv2d(hidden_dim, num_mask_tokens, kernel_size=1)
self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = F.relu(self.conv1(x))
x = self.conv2(x)
x = self.upsample(x)
return x
class MaskDINO(nn.Module):
"""
MaskDINO: Unified transformer-based framework for object detection
and instance segmentation.
This module integrates DINO outputs with segmentation branch to
produce refined instance masks.
"""
def __init__(
self,
hidden_dim: int = 256,
num_mask_tokens: int = 100,
mask_head_hidden_dim: int = 256
):
"""
Args:
hidden_dim: Hidden dimension
num_mask_tokens: Number of mask prediction tokens
mask_head_hidden_dim: Hidden dimension for mask head
"""
super().__init__()
self.hidden_dim = hidden_dim
self.num_mask_tokens = num_mask_tokens
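        # Projections applied before the query-feature dot product
        self.feature_projection = nn.Linear(hidden_dim, hidden_dim)
        # Queries are projected to hidden_dim // 4 so that they match the channel
        # count of the upsampled feature map produced by `upsample_4x`
        self.latent_projection = nn.Linear(hidden_dim, hidden_dim // 4)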
# Upsampling layers
self.upsample_4x = nn.Sequential(
nn.ConvTranspose2d(hidden_dim, hidden_dim // 2, kernel_size=4, stride=2, padding=1),
nn.ReLU(),
nn.ConvTranspose2d(hidden_dim // 2, hidden_dim // 4, kernel_size=4, stride=2, padding=1),
nn.ReLU()
)
# Segmentation head
self.seg_head = nn.Sequential(
nn.Conv2d(hidden_dim // 4, hidden_dim // 8, kernel_size=3, padding=1),
nn.ReLU(),
nn.Conv2d(hidden_dim // 8, 1, kernel_size=1)
)
# Dice loss computation
self.register_buffer('eps', torch.tensor(1e-6))
def forward(
self,
image_features: torch.Tensor,
dino_output: Dict[str, torch.Tensor],
masks_targets: torch.Tensor = None
) -> Dict[str, torch.Tensor]:
"""
Args:
image_features: Image features from feature extractor (B, H, W, C)
dino_output: Output from DINO module containing:
- pred_boxes: (B, num_queries, 4)
- pred_classes: (B, num_queries, num_classes+1)
- decoder_output: (B, num_queries, hidden_dim)
masks_targets: Optional ground truth masks (B, num_queries, H, W)
Returns:
Dictionary containing:
- pred_masks: Predicted binary masks (B, num_queries, H, W)
- mask_loss: Segmentation loss
"""
batch_size = image_features.shape[0]
height, width = image_features.shape[1:3]
# Reshape image features for processing
image_features_reshape = image_features.permute(0, 3, 1, 2) # (B, C, H, W)
# Get decoder output from DINO
decoder_output = dino_output['decoder_output'] # (B, num_queries, hidden_dim)
# Compute segmentation masks via dot product
# m = q_c ⊗ M(T(C_b) + F(C_e))
# Project features and decoder output
image_feat_proj = self.feature_projection(
image_features.reshape(-1, image_features.shape[-1])
).reshape(batch_size, height, width, -1)
image_feat_proj = image_feat_proj.permute(0, 3, 1, 2)
latent_proj = self.latent_projection(decoder_output) # (B, num_queries, hidden_dim)
# Upsample features
upsampled_feat = self.upsample_4x(image_feat_proj) # (B, C', H', W')
        # Flatten the spatial dimensions of the upsampled features
        _, num_queries, _ = latent_proj.shape
        _, _, up_height, up_width = upsampled_feat.shape
        upsampled_flat = upsampled_feat.flatten(2)  # (B, C', H'*W')
        # Per-query mask logits via dot product:
        # (B, num_queries, C') @ (B, C', H'*W'). The projected queries must
        # have the same channel count C' as the upsampled features.
        mask_logits = torch.bmm(
            latent_proj,
            upsampled_flat
        )  # (B, num_queries, H'*W')
# Reshape to spatial dimensions
pred_masks = mask_logits.view(
batch_size, num_queries, up_height, up_width
) # (B, num_queries, H', W')
# Apply sigmoid
pred_masks = torch.sigmoid(pred_masks)
output = {
'pred_masks': pred_masks,
'mask_logits': mask_logits
}
return output
def compute_mask_loss(
self,
pred_masks: torch.Tensor,
target_masks: torch.Tensor
) -> torch.Tensor:
"""
Compute segmentation loss combining BCE and Dice losses.
Args:
pred_masks: Predicted masks (B, num_queries, H, W)
target_masks: Target masks (B, num_queries, H, W)
Returns:
Segmentation loss
"""
# BCE loss
bce_loss = F.binary_cross_entropy(pred_masks, target_masks, reduction='mean')
# Dice loss
intersection = (pred_masks * target_masks).sum()
union = pred_masks.sum() + target_masks.sum()
dice_loss = 1 - (2 * intersection + self.eps) / (union + self.eps)
# Combined loss
total_loss = 0.5 * bce_loss + 0.5 * dice_loss
return total_loss
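The mask prediction above boils down to a dot product between each query embedding and every pixel embedding, followed by a sigmoid. A minimal pure-Python sketch of that step for a single query is shown below; `predict_mask` and the toy embeddings are illustrative assumptions, not part of the model code.

```python
import math
from typing import List


def predict_mask(
    query: List[float], pixel_embeddings: List[List[List[float]]]
) -> List[List[float]]:
    """Return an (H, W) grid of mask probabilities for one query.

    pixel_embeddings[y][x] is the embedding vector of pixel (y, x) and must
    have the same length as `query`.
    """

    def sigmoid(z: float) -> float:
        return 1.0 / (1.0 + math.exp(-z))

    return [
        [sigmoid(sum(q * e for q, e in zip(query, embedding))) for embedding in row]
        for row in pixel_embeddings
    ]


# Toy example: a 1 x 2 image with 2-dimensional embeddings.
query = [1.0, 0.0]
pixels = [[[4.0, 0.0], [-4.0, 1.0]]]
mask = predict_mask(query, pixels)
```

Pixels whose embedding aligns with the query receive a probability near 1 and the rest fall toward 0, which is why the query and the pixel features must share the same channel count.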
# models/cellotype.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, Optional, Tuple
from .feature_extractor import FeatureExtractor
from .dino_detector import DINODetector
from .mask_dino import MaskDINO
class CelloType(nn.Module):
"""
CelloType: Unified end-to-end model for cell segmentation and classification.
This model integrates three components:
1. Feature extraction (Swin Transformer)
2. Object detection (DINO)
3. Instance segmentation (MaskDINO)
The model is trained with a unified loss function that considers
segmentation masks, bounding boxes, and class predictions.
"""
def __init__(
self,
in_channels: int = 2,
hidden_dim: int = 256,
num_queries: int = 1000,
num_classes: int = 12,
swin_embed_dim: int = 96,
swin_depths: Tuple[int, ...] = (2, 2, 6, 2),
swin_num_heads: Tuple[int, ...] = (3, 6, 12, 24),
dino_num_heads: int = 8,
dino_num_encoder_layers: int = 6,
dino_num_decoder_layers: int = 6,
dim_feedforward: int = 2048,
weight_cls: float = 2.0,
weight_box: float = 5.0,
weight_mask: float = 1.0,
denoising_lambda1: float = 0.4,
denoising_lambda2: float = 1.0
):
"""
Args:
in_channels: Number of input channels
hidden_dim: Hidden dimension for DINO/MaskDINO
num_queries: Number of object queries
num_classes: Number of cell type classes
swin_embed_dim: Swin Transformer embedding dimension
swin_depths: Depths of Swin Transformer stages
swin_num_heads: Number of heads in each Swin stage
dino_num_heads: Number of heads in DINO
dino_num_encoder_layers: Number of DINO encoder layers
dino_num_decoder_layers: Number of DINO decoder layers
dim_feedforward: FFN dimension
weight_cls: Loss weight for classification
weight_box: Loss weight for box regression
weight_mask: Loss weight for segmentation
denoising_lambda1: Small denoising lambda
denoising_lambda2: Large denoising lambda
"""
super().__init__()
self.in_channels = in_channels
self.hidden_dim = hidden_dim
self.num_queries = num_queries
self.num_classes = num_classes
self.weight_cls = weight_cls
self.weight_box = weight_box
self.weight_mask = weight_mask
# Feature extraction
self.feature_extractor = FeatureExtractor(
in_channels=in_channels,
embed_dim=swin_embed_dim,
depths=swin_depths,
num_heads=swin_num_heads,
window_size=7
)
# Get feature dimension from extractor
feature_dim = self.feature_extractor.out_channels[-1]
# Feature projection to hidden_dim
self.feature_projection = nn.Sequential(
nn.Linear(feature_dim, hidden_dim),
nn.LayerNorm(hidden_dim)
)
# Object detection (DINO)
self.dino_detector = DINODetector(
hidden_dim=hidden_dim,
num_queries=num_queries,
num_heads=dino_num_heads,
num_encoder_layers=dino_num_encoder_layers,
num_decoder_layers=dino_num_decoder_layers,
dim_feedforward=dim_feedforward,
num_classes=num_classes,
denoising_lambda1=denoising_lambda1,
denoising_lambda2=denoising_lambda2
)
# Instance segmentation (MaskDINO)
self.mask_dino = MaskDINO(
hidden_dim=hidden_dim,
num_mask_tokens=num_queries
)
# Confidence score head
self.confidence_head = nn.Sequential(
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, 1),
nn.Sigmoid()
)
def forward(
self,
images: torch.Tensor,
targets: Optional[Dict] = None
) -> Dict[str, torch.Tensor]:
"""
Args:
images: Input images (B, C, H, W)
targets: Optional target annotations for training
Returns:
Dictionary containing:
- pred_boxes: Predicted bounding boxes
- pred_classes: Predicted class logits
- pred_masks: Predicted instance masks
- confidence_scores: Confidence scores for predictions
- losses: (Optional during training) Individual loss components
"""
batch_size = images.shape[0]
device = images.device
# Feature extraction
features = self.feature_extractor(images)
image_features = features['flattened'] # (B, L, C)
# Project features
projected_features = self.feature_projection(image_features)
# Object detection with DINO
dino_output = self.dino_detector(
projected_features,
targets=targets if self.training else None
)
# Extract DINO outputs
pred_boxes = dino_output['pred_boxes'] # (B, num_queries, 4)
pred_classes = dino_output['pred_classes'] # (B, num_queries, num_classes+1)
decoder_output = dino_output['decoder_output'] # (B, num_queries, hidden_dim)
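        # Reshape the projected features into a spatial grid for segmentation.
        # MaskDINO expects hidden_dim channels, so the projected features are used.
        # Assuming 512x512 input -> (B, 1024, 256) from flattened; the grid is
        # taken to be square (L = feat_h * feat_w).
        feat_h = feat_w = int(projected_features.shape[1] ** 0.5)
        image_features_spatial = projected_features.reshape(
            batch_size, feat_h, feat_w, -1
        )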
# Instance segmentation with MaskDINO
mask_output = self.mask_dino(
image_features_spatial,
dino_output,
masks_targets=targets['masks'] if targets is not None else None
)
pred_masks = mask_output['pred_masks'] # (B, num_queries, H', W')
# Generate confidence scores
confidence_scores = self.confidence_head(decoder_output) # (B, num_queries, 1)
confidence_scores = confidence_scores.squeeze(-1)
output = {
'pred_boxes': pred_boxes,
'pred_classes': pred_classes,
'pred_masks': pred_masks,
'confidence_scores': confidence_scores
}
# Compute losses during training
if self.training and targets is not None:
losses = self.compute_losses(
pred_boxes=pred_boxes,
pred_classes=pred_classes,
pred_masks=pred_masks,
targets=targets
)
output['losses'] = losses
output['total_loss'] = (
self.weight_cls * losses['cls_loss'] +
self.weight_box * losses['box_loss'] +
self.weight_mask * losses['mask_loss']
)
return output
def compute_losses(
self,
pred_boxes: torch.Tensor,
pred_classes: torch.Tensor,
pred_masks: torch.Tensor,
targets: Dict
) -> Dict[str, torch.Tensor]:
"""
Compute multitask losses: classification, box regression, and segmentation.
Loss = λ_cls * L_cls + λ_box * L_box + λ_mask * L_mask
"""
# Classification loss
cls_loss = self._compute_classification_loss(pred_classes, targets)
# Box regression loss
box_loss = self._compute_box_loss(pred_boxes, targets)
# Segmentation loss
mask_loss = self._compute_mask_loss(pred_masks, targets)
return {
'cls_loss': cls_loss,
'box_loss': box_loss,
'mask_loss': mask_loss
}
def _compute_classification_loss(
self,
pred_classes: torch.Tensor,
targets: Dict
) -> torch.Tensor:
"""
Focal loss for classification to handle class imbalance.
"""
batch_size = pred_classes.shape[0]
losses = []
        for i in range(batch_size):
            if len(targets['labels'][i]) == 0:
                continue
            # Get predictions and targets for this image
            pred = pred_classes[i]  # (num_queries, num_classes+1)
            target_labels = targets['labels'][i]  # (N,)
            # Match predictions to targets (simplified - use first N),
            # mirroring the box and mask losses
            n_targets = min(len(target_labels), pred.shape[0])
            ce_loss = F.cross_entropy(
                pred[:n_targets],
                target_labels[:n_targets].long(),
                reduction='none'
            )  # (n_targets,)
            # Focal loss weighting with gamma = 2
            p_t = torch.exp(-ce_loss)
            focal_loss = (1 - p_t) ** 2 * ce_loss
            losses.append(focal_loss.mean())
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_classes.device)
def _compute_box_loss(
self,
pred_boxes: torch.Tensor,
targets: Dict
) -> torch.Tensor:
"""
Compute L1 loss for bounding box regression.
"""
batch_size = pred_boxes.shape[0]
losses = []
for i in range(batch_size):
if len(targets['boxes'][i]) == 0:
continue
pred = pred_boxes[i] # (num_queries, 4)
target_boxes = targets['boxes'][i] # (N, 4)
# Match predictions to targets (simplified - use first N)
n_targets = len(target_boxes)
if n_targets > 0:
pred_matched = pred[:n_targets]
loss = F.l1_loss(pred_matched, target_boxes)
losses.append(loss)
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_boxes.device)
def _compute_mask_loss(
self,
pred_masks: torch.Tensor,
targets: Dict
) -> torch.Tensor:
"""
Compute segmentation loss (BCE + Dice).
"""
batch_size = pred_masks.shape[0]
losses = []
for i in range(batch_size):
if len(targets['masks'][i]) == 0:
continue
pred = pred_masks[i] # (num_queries, H, W)
target = targets['masks'][i] # (N, H, W)
# Resize target to match predictions if needed
if target.shape != pred.shape:
target = F.interpolate(
target.unsqueeze(1).float(),
size=pred.shape[1:],
mode='bilinear',
align_corners=False
).squeeze(1)
n_targets = len(target)
if n_targets > 0:
pred_matched = pred[:n_targets]
# BCE loss
bce = F.binary_cross_entropy(pred_matched, target.float())
# Dice loss
intersection = (pred_matched * target.float()).sum()
union = pred_matched.sum() + target.float().sum()
dice = 1 - (2 * intersection + 1e-6) / (union + 1e-6)
loss = 0.5 * bce + 0.5 * dice
losses.append(loss)
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_masks.device)
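The loss helpers above pair the first N predictions with the N targets, which the comments themselves flag as a simplification. DETR-style detectors usually solve a one-to-one assignment instead, typically with the Hungarian algorithm (for example `scipy.optimize.linear_sum_assignment`). The sketch below illustrates the idea with a brute-force search that is only practical for a handful of boxes; the function name and the choice of an L1 box cost alone are assumptions made for illustration.

```python
from itertools import permutations
from typing import List, Sequence, Tuple


def l1_distance(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(abs(x - y) for x, y in zip(a, b))


def match_predictions(
    pred_boxes: Sequence[Sequence[float]],
    target_boxes: Sequence[Sequence[float]],
) -> List[Tuple[int, int]]:
    """Return (pred_index, target_index) pairs that minimise the total L1 cost.

    Every target is assigned to a distinct prediction. Brute force over all
    assignments, so use it only for tiny examples.
    """
    best_cost = float("inf")
    best_assignment: Tuple[int, ...] = ()
    for assignment in permutations(range(len(pred_boxes)), len(target_boxes)):
        cost = sum(
            l1_distance(pred_boxes[p], target_boxes[t])
            for t, p in enumerate(assignment)
        )
        if cost < best_cost:
            best_cost, best_assignment = cost, assignment
    return [(p, t) for t, p in enumerate(best_assignment)]


preds = [(0.9, 0.9, 0.1, 0.1), (0.1, 0.1, 0.1, 0.1), (0.5, 0.5, 0.2, 0.2)]
targets = [(0.1, 0.1, 0.1, 0.1), (0.5, 0.5, 0.2, 0.2)]
pairs = match_predictions(preds, targets)
```

With matched pairs in hand, each loss term would index predictions by `pred_index` rather than slicing the first N queries, and unmatched queries would be trained toward the background class.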
# training/trainer.py
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from pathlib import Path
from typing import Dict, Tuple
from tqdm import tqdm
import numpy as np
from config import TrainingConfig, ModelConfig, DataConfig
from utils.metrics import compute_ap, compute_classification_metrics
class CelloTypeTrainer:
"""Trainer class for CelloType model"""
def __init__(
self,
model: nn.Module,
train_loader,
val_loader,
config: TrainingConfig
):
"""
Args:
model: CelloType model
train_loader: Training data loader
val_loader: Validation data loader
config: Training configuration
"""
self.model = model
self.train_loader = train_loader
self.val_loader = val_loader
self.config = config
self.device = torch.device(config.device)
self.model = self.model.to(self.device)
# Optimizer
self.optimizer = optim.Adam(
self.model.parameters(),
lr=config.learning_rate,
weight_decay=config.weight_decay
)
# Learning rate scheduler
self.scheduler = optim.lr_scheduler.CosineAnnealingLR(
self.optimizer,
T_max=config.num_epochs,
eta_min=1e-8
)
# Create checkpoints directory
self.save_dir = Path(config.save_dir)
self.save_dir.mkdir(parents=True, exist_ok=True)
# Tensorboard logger
self.writer = SummaryWriter(config.log_dir)
# Training state
self.current_epoch = 0
self.best_val_ap = 0
self.patience_counter = 0
def train_epoch(self) -> Dict[str, float]:
"""Train one epoch"""
self.model.train()
total_loss = 0
cls_losses = []
box_losses = []
mask_losses = []
pbar = tqdm(self.train_loader, desc=f"Epoch {self.current_epoch}")
for batch_idx, batch in enumerate(pbar):
# Move to device
images = batch['image'].to(self.device)
# Prepare targets
targets = {
'boxes': [b.to(self.device) for b in batch['boxes']],
'labels': [l.to(self.device) for l in batch['labels']],
'masks': [m.to(self.device) for m in batch['masks']],
'areas': [a.to(self.device) for a in batch['areas']]
}
# Forward pass
self.optimizer.zero_grad()
outputs = self.model(images, targets=targets)
# Compute loss
loss = outputs['total_loss']
# Backward pass
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
# Record losses
total_loss += loss.item()
cls_losses.append(outputs['losses']['cls_loss'].item())
box_losses.append(outputs['losses']['box_loss'].item())
mask_losses.append(outputs['losses']['mask_loss'].item())
pbar.set_postfix({
'loss': loss.item(),
'cls': outputs['losses']['cls_loss'].item(),
'box': outputs['losses']['box_loss'].item(),
'mask': outputs['losses']['mask_loss'].item()
})
# Average losses
avg_loss = total_loss / len(self.train_loader)
avg_cls_loss = np.mean(cls_losses)
avg_box_loss = np.mean(box_losses)
avg_mask_loss = np.mean(mask_losses)
return {
'total_loss': avg_loss,
'cls_loss': avg_cls_loss,
'box_loss': avg_box_loss,
'mask_loss': avg_mask_loss
}
@torch.no_grad()
def validate(self) -> Dict[str, float]:
"""Validate model"""
self.model.eval()
all_pred_boxes = []
all_pred_classes = []
all_pred_scores = []
all_gt_boxes = []
all_gt_classes = []
pbar = tqdm(self.val_loader, desc="Validation")
for batch in pbar:
images = batch['image'].to(self.device)
# Forward pass
outputs = self.model(images)
# Post-process predictions
for i in range(images.shape[0]):
pred_boxes = outputs['pred_boxes'][i]
pred_classes = outputs['pred_classes'][i]
confidence = outputs['confidence_scores'][i]
# Filter by confidence threshold
mask = confidence > self.config.confidence_threshold
pred_boxes = pred_boxes[mask]
pred_classes = pred_classes[mask]
confidence = confidence[mask]
all_pred_boxes.append(pred_boxes)
all_pred_classes.append(pred_classes.argmax(dim=-1))
all_pred_scores.append(confidence)
# Ground truth (if available)
if 'boxes' in batch:
all_gt_boxes.append(batch['boxes'][i])
all_gt_classes.append(batch['labels'][i])
# Compute metrics
ap = compute_ap(all_pred_boxes, all_gt_boxes, all_pred_scores)
return {
'ap': ap,
'mean_confidence': np.mean([s.mean().item() for s in all_pred_scores])
}
def train(self):
"""Full training loop"""
for epoch in range(self.config.num_epochs):
self.current_epoch = epoch
# Train
train_metrics = self.train_epoch()
# Validate
if (epoch + 1) % self.config.eval_interval == 0:
val_metrics = self.validate()
# Log metrics
self.writer.add_scalar(
'val/ap', val_metrics['ap'], epoch
)
self.writer.add_scalar(
'val/mean_confidence', val_metrics['mean_confidence'], epoch
)
print(f"Epoch {epoch}: Val AP = {val_metrics['ap']:.4f}")
# Early stopping
if val_metrics['ap'] > self.best_val_ap:
self.best_val_ap = val_metrics['ap']
self.patience_counter = 0
self._save_checkpoint(is_best=True)
else:
self.patience_counter += 1
if self.patience_counter >= self.config.patience:
print(f"Early stopping at epoch {epoch}")
break
# Log training metrics
for key, value in train_metrics.items():
self.writer.add_scalar(f'train/{key}', value, epoch)
# Update learning rate
self.scheduler.step()
# Save checkpoint periodically
if (epoch + 1) % 10 == 0:
self._save_checkpoint()
self.writer.close()
def _save_checkpoint(self, is_best: bool = False):
"""Save model checkpoint"""
checkpoint = {
'epoch': self.current_epoch,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'best_val_ap': self.best_val_ap
}
if is_best:
path = self.save_dir / 'best_model.pt'
else:
path = self.save_dir / f'checkpoint_epoch_{self.current_epoch}.pt'
torch.save(checkpoint, path)
print(f"Checkpoint saved: {path}")
def load_checkpoint(self, path: str):
"""Load checkpoint"""
checkpoint = torch.load(path, map_location=self.device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
self.current_epoch = checkpoint['epoch']
self.best_val_ap = checkpoint['best_val_ap']
print(f"Checkpoint loaded: {path}")
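The trainer relies on `CosineAnnealingLR` with `T_max=config.num_epochs` and `eta_min=1e-8`. To make the resulting schedule concrete, the sketch below evaluates the closed-form cosine curve directly; the base learning rate of `1e-4` and the 100-epoch horizon are hypothetical values, since the configuration is not shown here.

```python
import math


def cosine_annealing_lr(
    epoch: int, base_lr: float, t_max: int, eta_min: float = 1e-8
) -> float:
    """Closed-form cosine annealing: decays from base_lr to eta_min over t_max epochs."""
    return eta_min + 0.5 * (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max))


# Hypothetical settings: base learning rate 1e-4, 100 epochs.
schedule = [cosine_annealing_lr(epoch, 1e-4, 100) for epoch in (0, 25, 50, 75, 100)]
```

The rate starts at the base value, passes the midpoint between the base value and `eta_min` halfway through training, and reaches `eta_min` at the final epoch.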
# training/losses.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict
class CelloTypeLoss(nn.Module):
"""
Combined loss function for CelloType.
Loss = λ_cls * L_cls + λ_box * L_box + λ_mask * L_mask
"""
def __init__(
self,
weight_cls: float = 2.0,
weight_box: float = 5.0,
weight_mask: float = 1.0,
num_classes: int = 12
):
super().__init__()
self.weight_cls = weight_cls
self.weight_box = weight_box
self.weight_mask = weight_mask
self.num_classes = num_classes
def forward(
self,
pred_boxes: torch.Tensor,
pred_classes: torch.Tensor,
pred_masks: torch.Tensor,
targets: Dict[str, torch.Tensor]
) -> Dict[str, torch.Tensor]:
"""
Compute all loss components.
Args:
pred_boxes: (B, N, 4)
pred_classes: (B, N, num_classes+1)
pred_masks: (B, N, H, W)
targets: Dictionary with 'boxes', 'labels', 'masks'
Returns:
Dictionary with individual losses and total loss
"""
# Classification loss (focal loss)
cls_loss = self._focal_loss(pred_classes, targets['labels'])
# Box regression loss (L1 loss)
box_loss = self._box_loss(pred_boxes, targets['boxes'])
# Segmentation loss (BCE + Dice)
mask_loss = self._mask_loss(pred_masks, targets['masks'])
# Total loss
total_loss = (
self.weight_cls * cls_loss +
self.weight_box * box_loss +
self.weight_mask * mask_loss
)
return {
'cls_loss': cls_loss,
'box_loss': box_loss,
'mask_loss': mask_loss,
'total_loss': total_loss
}
def _focal_loss(
self,
pred_classes: torch.Tensor,
target_labels: list
) -> torch.Tensor:
"""Focal loss for handling class imbalance"""
batch_size = pred_classes.shape[0]
losses = []
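        for i in range(batch_size):
            if len(target_labels[i]) == 0:
                continue
            pred = pred_classes[i]  # (N, num_classes+1)
            target = target_labels[i]  # (M,)
            # Match predictions to targets (simplified - use first M),
            # mirroring the box and mask losses
            n_targets = min(len(target), pred.shape[0])
            # Cross entropy per matched query
            ce_loss = F.cross_entropy(
                pred[:n_targets],
                target[:n_targets].long(),
                reduction='none'
            )  # (n_targets,)
            # Focal term with gamma = 2
            p_t = torch.exp(-ce_loss)
            focal_weight = (1 - p_t) ** 2
            focal_loss = focal_weight * ce_loss
            losses.append(focal_loss.mean())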
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_classes.device)
def _box_loss(
self,
pred_boxes: torch.Tensor,
target_boxes: list
) -> torch.Tensor:
"""L1 loss for box regression"""
batch_size = pred_boxes.shape[0]
losses = []
for i in range(batch_size):
if len(target_boxes[i]) == 0:
continue
pred = pred_boxes[i]
target = target_boxes[i]
# Match predictions to targets
n_targets = len(target)
if n_targets > 0 and n_targets <= pred.shape[0]:
pred_matched = pred[:n_targets]
loss = F.l1_loss(pred_matched, target)
losses.append(loss)
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_boxes.device)
def _mask_loss(
self,
pred_masks: torch.Tensor,
target_masks: list
) -> torch.Tensor:
"""BCE + Dice loss for segmentation"""
batch_size = pred_masks.shape[0]
losses = []
for i in range(batch_size):
if len(target_masks[i]) == 0:
continue
pred = pred_masks[i]
target = target_masks[i].float()
# Resize if needed
if target.shape != pred.shape:
target = F.interpolate(
target.unsqueeze(1),
size=pred.shape[1:],
mode='bilinear',
align_corners=False
).squeeze(1)
n_targets = len(target)
if n_targets > 0 and n_targets <= pred.shape[0]:
pred_matched = pred[:n_targets]
# BCE loss
bce = F.binary_cross_entropy(pred_matched, target)
# Dice loss
intersection = (pred_matched * target).sum()
union = pred_matched.sum() + target.sum()
dice = 1 - (2 * intersection + 1e-6) / (union + 1e-6)
loss = 0.5 * bce + 0.5 * dice
losses.append(loss)
if losses:
return torch.stack(losses).mean()
else:
return torch.tensor(0.0, device=pred_masks.device)
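The focal term used in the classification loss scales cross entropy by `(1 - p_t) ** gamma`, so confidently correct predictions contribute very little while hard examples keep almost their full weight. The pure-Python sketch below works through one example per case; the logits are made-up values chosen only to show the effect.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    # Subtract the maximum for numerical stability.
    peak = max(logits)
    exps = [math.exp(z - peak) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(logits: List[float], target: int) -> float:
    return -math.log(softmax(logits)[target])


def focal_loss(logits: List[float], target: int, gamma: float = 2.0) -> float:
    """Focal loss for one sample: (1 - p_t) ** gamma * cross entropy."""
    p_t = softmax(logits)[target]
    return (1 - p_t) ** gamma * -math.log(p_t)


logits = [5.0, 0.0, 0.0]
easy = focal_loss(logits, target=0)  # model is confident and correct
hard = focal_loss(logits, target=1)  # model is confident and wrong
```

Setting `gamma=0` recovers plain cross entropy, which is a quick sanity check when tuning the focusing parameter.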
# utils/metrics.py
import torch
import numpy as np
from typing import List
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
def compute_ap(
pred_boxes_list: List[torch.Tensor],
gt_boxes_list: List[torch.Tensor],
confidence_scores_list: List[torch.Tensor],
iou_threshold: float = 0.5
) -> float:
"""
Compute Average Precision using IoU metric.
Args:
pred_boxes_list: List of predicted boxes (N, 4)
gt_boxes_list: List of ground truth boxes (M, 4)
confidence_scores_list: List of confidence scores (N,)
iou_threshold: IoU threshold for TP/FP determination
Returns:
Average Precision score
"""
# Flatten all predictions
all_preds = []
all_gts = []
for i, (pred, gt, conf) in enumerate(zip(
pred_boxes_list, gt_boxes_list, confidence_scores_list
)):
for j, p in enumerate(pred):
all_preds.append({
'box': p.cpu().numpy(),
'confidence': conf[j].item(),
'image_id': i
})
for g in gt:
all_gts.append({
'box': g.cpu().numpy(),
'image_id': i
})
if len(all_preds) == 0:
return 0.0
# Sort by confidence
all_preds = sorted(all_preds, key=lambda x: x['confidence'], reverse=True)
# Compute TP and FP
tp = np.zeros(len(all_preds))
fp = np.zeros(len(all_preds))
gt_matched = set()
for i, pred in enumerate(all_preds):
pred_box = pred['box']
pred_img_id = pred['image_id']
max_iou = 0
max_gt_idx = -1
for j, gt in enumerate(all_gts):
if gt['image_id'] != pred_img_id:
continue
if (gt['image_id'], j) in gt_matched:
continue
gt_box = gt['box']
iou = compute_iou(pred_box, gt_box)
if iou > max_iou:
max_iou = iou
max_gt_idx = j
if max_iou >= iou_threshold and max_gt_idx >= 0:
tp[i] = 1
gt_matched.add((pred_img_id, max_gt_idx))
else:
fp[i] = 1
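    # Compute precision and recall at each rank
    tp_cumsum = np.cumsum(tp)
    fp_cumsum = np.cumsum(fp)
    if len(all_gts) == 0:
        return 0.0
    recalls = tp_cumsum / len(all_gts)
    precisions = tp_cumsum / (tp_cumsum + fp_cumsum)
    # AP is the area under the precision-recall curve: pad the curve, make
    # precision monotonically non-increasing, then sum over the recall steps
    recalls = np.concatenate(([0.0], recalls, [1.0]))
    precisions = np.concatenate(([0.0], precisions, [0.0]))
    precisions = np.maximum.accumulate(precisions[::-1])[::-1]
    ap = np.sum((recalls[1:] - recalls[:-1]) * precisions[1:])
    return float(ap)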
def compute_iou(box1: np.ndarray, box2: np.ndarray) -> float:
"""
Compute Intersection over Union between two boxes.
Args:
box1: [x1, y1, x2, y2]
box2: [x1, y1, x2, y2]
Returns:
IoU value
"""
x1_min, y1_min, x1_max, y1_max = box1
x2_min, y2_min, x2_max, y2_max = box2
# Intersection
inter_min_x = max(x1_min, x2_min)
inter_min_y = max(y1_min, y2_min)
inter_max_x = min(x1_max, x2_max)
inter_max_y = min(y1_max, y2_max)
if inter_max_x < inter_min_x or inter_max_y < inter_min_y:
return 0.0
inter_area = (inter_max_x - inter_min_x) * (inter_max_y - inter_min_y)
# Union
box1_area = (x1_max - x1_min) * (y1_max - y1_min)
box2_area = (x2_max - x2_min) * (y2_max - y2_min)
union_area = box1_area + box2_area - inter_area
if union_area == 0:
return 0.0
iou = inter_area / union_area
return float(iou)
def compute_classification_metrics(
pred_classes: np.ndarray,
gt_classes: np.ndarray
) -> dict:
"""
Compute classification metrics: precision, recall, F1.
"""
from sklearn.metrics import precision_score, recall_score, f1_score
precision = precision_score(gt_classes, pred_classes, average='weighted', zero_division=0)
recall = recall_score(gt_classes, pred_classes, average='weighted', zero_division=0)
f1 = f1_score(gt_classes, pred_classes, average='weighted', zero_division=0)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
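`compute_iou` expects corner-format boxes `[x1, y1, x2, y2]`, whereas the detector emits sigmoid-normalized values in [0, 1]. The listing does not state which layout those four values follow. If they follow the `(cx, cy, w, h)` convention common to DETR-style detectors (an assumption here), a conversion such as the hypothetical helper below would be needed before computing IoU or running NMS.

```python
from typing import Sequence, Tuple


def cxcywh_to_xyxy(
    box: Sequence[float], width: int, height: int
) -> Tuple[float, float, float, float]:
    """Convert a normalized (cx, cy, w, h) box to pixel-space (x1, y1, x2, y2)."""
    cx, cy, w, h = box
    x1 = (cx - w / 2) * width
    y1 = (cy - h / 2) * height
    x2 = (cx + w / 2) * width
    y2 = (cy + h / 2) * height
    return x1, y1, x2, y2


# A box centred in a 100 x 200 (width x height) image.
corners = cxcywh_to_xyxy((0.5, 0.5, 0.2, 0.4), width=100, height=200)
```

Predicted and ground-truth boxes must be brought to the same format and scale before they are compared; otherwise IoU values, and therefore AP, are not meaningful.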
# inference.py
import torch
import torch.nn.functional as F
import numpy as np
from pathlib import Path
from typing import Dict, Tuple, List
import cv2
from models.cellotype import CelloType
from config import InferenceConfig
class CelloTypeInference:
"""Inference pipeline for CelloType"""
def __init__(
self,
model_path: str,
device: str = 'cuda',
config: InferenceConfig = None
):
"""
Args:
model_path: Path to trained model checkpoint
device: Device to run inference on
config: Inference configuration
"""
self.device = torch.device(device)
self.config = config or InferenceConfig()
# Load model
self.model = CelloType()
checkpoint = torch.load(model_path, map_location=self.device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.model = self.model.to(self.device)
self.model.eval()
@torch.no_grad()
def predict(
self,
image: np.ndarray,
image_size: Tuple[int, int] = (512, 512)
) -> Dict:
"""
Run inference on an image.
Args:
image: Input image (H, W, C) or (C, H, W)
image_size: Target image size
Returns:
Dictionary containing:
- boxes: Detected bounding boxes (N, 4)
- classes: Predicted class labels (N,)
- masks: Instance segmentation masks (N, H, W)
- scores: Confidence scores (N,)
"""
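        # Preprocess image: bring it to channel-first (C, H, W) layout
        if image.ndim == 2:
            image = image[np.newaxis, ...]  # single-channel (H, W) -> (1, H, W)
        elif image.shape[0] not in [1, 2, 3]:
            image = image.transpose(2, 0, 1)
        # Replicate a single channel to three channels; the final channel
        # count must match the model's in_channels
        if image.shape[0] == 1:
            image = np.repeat(image, 3, axis=0)
        # Resize (cv2 expects channel-last input)
        image = cv2.resize(image.transpose(1, 2, 0), image_size)
        image = image.transpose(2, 0, 1)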
# Normalize
image = image.astype(np.float32)
image = (image - image.min()) / (image.max() - image.min() + 1e-6)
# To tensor
image = torch.from_numpy(image).unsqueeze(0).to(self.device)
# Forward pass
outputs = self.model(image)
# Post-process
results = self._post_process(outputs)
return results
def _post_process(self, outputs: Dict) -> Dict:
"""
Post-process model outputs.
Includes:
- Filtering by confidence threshold
- NMS for box suppression
- Mask refinement
"""
pred_boxes = outputs['pred_boxes'][0] # (num_queries, 4)
pred_classes = outputs['pred_classes'][0] # (num_queries, num_classes+1)
pred_masks = outputs['pred_masks'][0] # (num_queries, H, W)
scores = outputs['confidence_scores'][0] # (num_queries,)
# Filter by confidence threshold
mask = scores > self.config.confidence_threshold
boxes = pred_boxes[mask]
classes = pred_classes[mask].argmax(dim=-1)
masks = pred_masks[mask]
scores = scores[mask]
if len(boxes) == 0:
return {
'boxes': np.array([]),
'classes': np.array([]),
'masks': np.array([]),
'scores': np.array([])
}
# NMS
keep_indices = self._nms(boxes, scores)
boxes = boxes[keep_indices].cpu().numpy()
classes = classes[keep_indices].cpu().numpy()
masks = masks[keep_indices].cpu().numpy()
scores = scores[keep_indices].cpu().numpy()
return {
'boxes': boxes,
'classes': classes,
'masks': masks,
'scores': scores
}
def _nms(
self,
boxes: torch.Tensor,
scores: torch.Tensor
) -> np.ndarray:
"""
Non-maximum suppression.
Args:
boxes: (N, 4) in [x1, y1, x2, y2] format
scores: (N,) confidence scores
Returns:
Indices of kept boxes
"""
if len(boxes) == 0:
return np.array([])
# Convert to numpy
boxes_np = boxes.cpu().numpy()
scores_np = scores.cpu().numpy()
# Sort by score
sorted_indices = np.argsort(-scores_np)
keep = []
while len(sorted_indices) > 0:
current = sorted_indices[0]
keep.append(current)
if len(sorted_indices) == 1:
break
current_box = boxes_np[current]
rest_boxes = boxes_np[sorted_indices[1:]]
# Compute IoU with rest
ious = self._compute_ious(current_box, rest_boxes)
# Keep boxes with IoU < threshold
mask = ious < self.config.nms_threshold
sorted_indices = sorted_indices[1:][mask]
return np.array(keep)
def _compute_ious(
self,
box: np.ndarray,
boxes: np.ndarray
) -> np.ndarray:
"""Compute IoU between one box and multiple boxes"""
x1_min, y1_min, x1_max, y1_max = box
x2_min, y2_min, x2_max, y2_max = boxes.T
inter_min_x = np.maximum(x1_min, x2_min)
inter_min_y = np.maximum(y1_min, y2_min)
inter_max_x = np.minimum(x1_max, x2_max)
inter_max_y = np.minimum(y1_max, y2_max)
inter_w = np.maximum(0, inter_max_x - inter_min_x)
inter_h = np.maximum(0, inter_max_y - inter_min_y)
inter_area = inter_w * inter_h
box_area = (x1_max - x1_min) * (y1_max - y1_min)
boxes_area = (x2_max - x2_min) * (y2_max - y2_min)
union_area = box_area + boxes_area - inter_area
ious = inter_area / (union_area + 1e-6)
return ious
# main.py
import torch
import argparse
from pathlib import Path
from config import TrainingConfig, ModelConfig, DataConfig, InferenceConfig
from models.cellotype import CelloType
from data.dataset import create_dataloaders
from training.trainer import CelloTypeTrainer
from inference import CelloTypeInference
def main():
parser = argparse.ArgumentParser(description="CelloType Training and Inference")
parser.add_argument('--mode', type=str, choices=['train', 'infer'], default='train')
parser.add_argument('--data-dir', type=str, default='./data')
parser.add_argument('--model-path', type=str, default=None)
parser.add_argument('--image-path', type=str, default=None)
parser.add_argument('--output-dir', type=str, default='./outputs')
parser.add_argument('--device', type=str, default='cuda')
args = parser.parse_args()
# Create output directory
Path(args.output_dir).mkdir(parents=True, exist_ok=True)
if args.mode == 'train':
train(args)
elif args.mode == 'infer':
infer(args)
def train(args):
    """Training mode"""
    print("=" * 80)
    print("CelloType Training")
    print("=" * 80)
    # Configurations
    data_config = DataConfig()
    model_config = ModelConfig()
    train_config = TrainingConfig(device=args.device)
    # Create model
    print("Creating model...")
    model = CelloType(
        in_channels=data_config.num_channels,
        hidden_dim=model_config.dino_hidden_dim,
        num_queries=model_config.dino_num_queries,
        num_classes=model_config.num_classes,
        swin_embed_dim=model_config.swin_embed_dim,
        swin_depths=model_config.swin_depths,
        swin_num_heads=model_config.swin_num_heads,
        weight_cls=model_config.weight_cls,
        weight_box=model_config.weight_box,
        weight_mask=model_config.weight_mask
    )
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
    # Create dataloaders
    print(f"Loading data from {args.data_dir}...")
    train_loader, val_loader, test_loader = create_dataloaders(
        args.data_dir,
        batch_size=train_config.batch_size,
        image_size=data_config.image_size,
        num_workers=train_config.num_workers
    )
    print(f"Train samples: {len(train_loader.dataset)}")
    print(f"Val samples: {len(val_loader.dataset)}")
    # Create trainer
    trainer = CelloTypeTrainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        config=train_config
    )
    # Train
    print("Starting training...")
    trainer.train()
    print("Training completed!")
def infer(args):
    """Inference mode"""
    print("=" * 80)
    print("CelloType Inference")
    print("=" * 80)
    if args.model_path is None:
        raise ValueError("--model-path required for inference")
    if args.image_path is None:
        raise ValueError("--image-path required for inference")
    # Inference-only imports
    import json
    import cv2
    import numpy as np
    from utils.visualization import visualize_predictions
    # Load image. IMREAD_ANYDEPTH preserves 16-bit pixel data. cv2.imread
    # returns None for files it cannot decode, so fall back to np.load
    # for arrays saved in NumPy format.
    image = cv2.imread(args.image_path, cv2.IMREAD_ANYDEPTH)
    if image is None:
        image = np.load(args.image_path)
    print(f"Image shape: {image.shape}")
    # Initialize inference
    infer_config = InferenceConfig()
    inferencer = CelloTypeInference(
        args.model_path,
        device=args.device,
        config=infer_config
    )
    # Run inference
    print("Running inference...")
    results = inferencer.predict(image)
    scores = results['scores']
    print(f"Detected {len(results['boxes'])} cells")
    # min() and max() raise on an empty array, so only summarize when cells were found
    if len(scores) > 0:
        print(f"Confidence scores: min={scores.min():.4f}, max={scores.max():.4f}, mean={scores.mean():.4f}")
    # Visualize results
    vis_image = visualize_predictions(image, results)
    output_path = Path(args.output_dir) / "predictions.png"
    cv2.imwrite(str(output_path), vis_image)
    print(f"Visualization saved to {output_path}")
    # Save results in a JSON-serializable form
    results_json = {
        'num_cells': len(results['boxes']),
        'boxes': results['boxes'].tolist(),
        'classes': results['classes'].tolist(),
        'scores': scores.tolist()
    }
    results_path = Path(args.output_dir) / "results.json"
    with open(results_path, 'w') as f:
        json.dump(results_json, f, indent=2)
    print(f"Results saved to {results_path}")
if __name__ == '__main__':
    main()
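To see how the command-line flags defined in `main()` map onto the `args` object that `train()` and `infer()` receive, the parser can be rebuilt on its own and fed explicit argument lists. This is only a sketch: the `build_parser` wrapper is a hypothetical refactoring, and the checkpoint and image paths are placeholder values, not files shipped with the article.

```python
import argparse


def build_parser():
    """Rebuild the same flags as main(), without touching the filesystem."""
    parser = argparse.ArgumentParser(description="CelloType Training and Inference")
    parser.add_argument('--mode', type=str, choices=['train', 'infer'], default='train')
    parser.add_argument('--data-dir', type=str, default='./data')
    parser.add_argument('--model-path', type=str, default=None)
    parser.add_argument('--image-path', type=str, default=None)
    parser.add_argument('--output-dir', type=str, default='./outputs')
    parser.add_argument('--device', type=str, default='cuda')
    return parser


parser = build_parser()

# No flags at all: every option falls back to its default, so the script trains
train_args = parser.parse_args([])

# Inference needs both --model-path and --image-path (placeholder paths here)
infer_args = parser.parse_args([
    '--mode', 'infer',
    '--model-path', 'checkpoints/model.pth',
    '--image-path', 'sample.npy',
    '--device', 'cpu',
])

# argparse turns dashes into underscores: --data-dir becomes args.data_dir
print(train_args.mode, train_args.data_dir, train_args.device)
print(infer_args.mode, infer_args.model_path, infer_args.image_path, infer_args.device)
```

Because `--device` defaults to `cuda`, passing `--device cpu` explicitly, as in the second call, is the safer choice on a machine without a GPU.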