Segment Anything with Text: Revolutionary AI Foundation Model Transforms 3D Medical Image Segmentation

Segment Anything with Text: Revolutionary AI Foundation Model Transforms 3D Medical Image Segmentation

Introduction: The Future of Automated Medical Diagnosis

The traditional workflow in medical imaging has remained largely unchanged for decades. Radiologists manually examine thousands of scans, carefully delineating regions of interest slice by slice—a process that is both time-consuming and prone to human error. But what if an AI model could segment any anatomical structure, lesion, or tissue type across virtually any medical imaging modality using nothing more than a simple text description?

This is precisely what Segment Anything with Text (SAT) achieves, introducing the first knowledge-enhanced foundation model for 3D medical volume segmentation that accepts medical terminology as text prompts and seamlessly tackles various medical image segmentation tasks across modalities, anatomies, and body regions. This breakthrough represents far more than incremental progress in medical AI—it signals a fundamental shift in how the healthcare industry approaches medical image analysis at scale.

This article explores the transformative capabilities of SAT, its architectural innovations, the unprecedented dataset that powers it, and its implications for the future of clinical practice and medical artificial intelligence.


The Evolution of Medical Image Segmentation: From Specialists to Generalists

Understanding the Current Landscape

Medical image segmentation is the process of automatically identifying and delineating regions of interest—organs, lesions, and tissues—within medical images. This plays a crucial role in numerous clinical applications, such as disease diagnosis, treatment planning, and disease progression tracking, as well as in medical research.

Historically, the medical imaging AI community has operated under a specialist paradigm. Individual research groups develop and train specialized neural networks for specific tasks: one model for liver segmentation, another for lung cancer detection, yet another for brain tumor delineation. While these specialist models demonstrate impressive segmentation capabilities, their major drawback lies in their narrow specialization—they are designed and optimized for distinct regions of interest and imaging modalities, require distinct preprocessing methods for each dataset, and often fall short in diverse and dynamic clinical environments where adaptability to new conditions and imaging techniques is essential.

This fragmented approach creates significant practical challenges in real-world clinical deployment. Hospitals cannot maintain dozens of specialized models, each requiring unique infrastructure, monitoring, and updates. The specialist paradigm has become a bottleneck preventing AI’s broader adoption in clinical settings.

Limitations of Interactive Segmentation Approaches

Recent efforts to address this limitation have explored adapting foundation models from computer vision. Models typically perform 2D slice segmentation, which is later fused into 3D volumes through post-processing, overlooking crucial contextual information in 3D radiological imaging. Additionally, models often require point or box inputs as prompts, making them interactive segmentation models that require considerable manual effort for use in practice, and they suffer from significant domain gaps from image statistics to domain-specific medical knowledge.

While interactive approaches represent progress, they still demand substantial human intervention—physicians must click or draw on images to guide segmentation, reducing efficiency and scalability.


SAT: A Paradigm Shift in Medical Image Analysis

Core Innovation: Text-Prompted Segmentation

The first knowledge-enhanced foundation model for 3D medical volume segmentation with medical terminology as text prompt distinguishes itself from previous medical segmentation paradigms by being seamlessly applied to clinical practice or integrated with any large language model.

Key Advantages of the Text-Prompt Paradigm:

  • Automatic segmentation without point-and-click interaction
  • 3D volumetric processing preserving crucial spatial context
  • Knowledge-enhanced representations leveraging anatomical relationships
  • Large language model integration enabling agent-based clinical workflows
  • Scalability across diverse imaging modalities and anatomical structures

The shift from interactive spatial prompts (points, boxes) to text prompts represents a fundamental recognition that medical knowledge is fundamentally linguistic. Radiologists think in anatomical terminology, clinical reports use standardized medical language, and integrating large language models with segmentation requires text-based communication.

Model Variants: SAT-Nano and SAT-Pro

To balance computational cost and performance, SAT includes two variants: SAT-Nano with 110M parameters and SAT-Pro with 447M parameters. This dual-model strategy addresses the practical reality that different clinical settings have varying computational constraints. Hospital systems with substantial GPU resources can deploy SAT-Pro for maximum accuracy, while resource-constrained clinics can utilize SAT-Nano with acceptable performance degradation.

This scalability approach mirrors successful strategies in foundation model development, where organizations release multiple model sizes to maximize adoption across diverse infrastructure capabilities.


Building the Foundation: SAT-DS, the Largest Medical Segmentation Dataset

Dataset Scope and Composition

The technical achievements of SAT rest upon an unprecedented foundation of curated medical imaging data. The researchers collected and integrated 72 diverse publicly available medical segmentation datasets, totaling 22,186 scans including CT, MRI, and PET, with 302,033 segmentation annotations spanning 8 different regions of the human body: brain, head and neck, upper limb, thorax, abdomen, pelvis, and lower limb.

To contextualize this achievement: The data collection is more than fourth times the size of the largest dataset (BraTS2023-GLI) in terms of volume number, and nearly triple the most comprehensive dataset (DAP Atlas) in terms of the class number.

The Challenge of Dataset Unification

Creating SAT-DS required far more than simply concatenating existing datasets. Medical datasets represent decades of collection efforts under varying clinical protocols, annotation standards, and quality criteria. The research team addressed critical challenges:

1. Unified Annotation System

  • Manually verified anatomical targets across all datasets
  • Standardized naming conventions (e.g., “lumbar vertebrae 1 (L1)”)
  • Resolved contradictions between overlapping classes
  • Merged fine-grained classes to maintain consistency

2. Image Preprocessing Standardization

  • Reoriented images to specific axcodes
  • Respaced to uniform voxel size of 1 × 1 × 3 mm³
  • Applied modality-specific intensity normalization:
    • CT images: Truncated to [-500, 1000] HU with z-score normalization
    • MRI/PET images: Clipped at 0.5% and 99.5% percentiles with z-score normalization
  • Cropped to non-zero regions

These preprocessing standards ensure the model learns consistent representations across diverse source datasets, preventing dataset-specific biases from dominating the learned features.


Architectural Innovation: Knowledge Injection Through Contrastive Learning

The Multimodal Knowledge Graph

The researchers constructed a multimodal knowledge tree integrating anatomical concepts and definitions throughout the human body, creating 6502 anatomical terminologies with concept-definition pairs and extensive relationships. This knowledge graph draws from two primary sources:

1. Unified Medical Language System (UMLS)

  • 229,435 biomedical terminologies and definitions
  • 1,048,575 relationship triplets from 60+ vocabularies
  • Comprehensive but functionally-focused definitions

2. Search Engine-Augmented Knowledge

  • 6,502 anatomy-specific concept definitions emphasizing morphological features
  • Location, shape, appearance, structure, and spatial relationships
  • 38,344 relationships extracted through GPT-4 analysis
  • Authority-sourced definitions from medical institutions

Knowledge-Enhanced Representation Learning

The power of SAT derives from injecting this rich medical knowledge into neural representations through contrastive learning. The framework operates on the principle that similar anatomical concepts—whether represented as text definitions, visual atlases, or concept relationships—should produce similar embeddings in latent space.

The Contrastive Learning Objective:

For a batch of positive knowledge pairs linked by medical terminology:

\[ L_{\text{knowledge}} = -\frac{1}{N}\sum_{i=1}^{N}\left(\log\frac{\exp(z_i \cdot z’_i/\tau)}{\sum_{k=1}^{N}\mathbb{1}_{i\neq k}\exp(z_i \cdot z’_k/\tau)} + \log\frac{\exp(z_i \cdot z’_i/\tau)}{\sum_{k=1}^{N}\mathbb{1}_{i\neq k}\exp(z_k \cdot z’_i/\tau)}\right) \]

Where τ = 0.07 is the temperature parameter, and ziz_i zi​ and zi′z’_i zi′​ represent paired embeddings from potentially different modalities (text, visual) but encoding the same anatomical concept.

Key Insight: This approach differs fundamentally from standard contrastive learning by enforcing alignment between textual definitions, visual atlases, and concept relationships—creating a unified embedding space where medical knowledge manifests as geometric structure.

Ablation Study Results: The Value of Knowledge Injection

Testing with different text encoders (BERT-Base, CLIP, MedCPT, and their proprietary knowledge-enhanced encoder) demonstrated that their proposed knowledge-enhanced text encoder achieved 99.18% Recall@1 on anatomy concept-definition retrieval, compared to only 0.08% for BERT-Base, 4.13% for CLIP, and 11.19% for MedCPT.

This dramatic difference illustrates how domain-specific knowledge injection creates embeddings that genuinely capture medical concepts rather than relying on surface-level language similarity.


Performance Benchmarks: SAT Versus Established Baselines

Comparison with Specialist Models

Comparing SAT-Pro against 72 specialist nnU-Net models trained individually on each dataset, SAT-Pro shows comparable performance to the ensemble of 72 nnU-Nets over 497 categories, while using only 20% of their combined model parameters (447M vs. 2.2B+).

Benchmark Results (Internal Evaluation):

$$ \begin{array}{|l|c|c|c|c|l|} \hline \textbf{Model} & \textbf{Parameters} & \textbf{Datasets} & \textbf{Categories} & \textbf{Avg DSC} & \textbf{Notes} \\ \hline \text{SAT-Pro} & 447M & \text{All 72} & 497 & \text{Comparable} & \text{Generalist} \\ \hline \text{72 × nnU-Net} & 2.2B+ & \text{Each specialized} & \text{12 avg} & \text{Reference} & \text{Specialist ensemble} \\ \hline \text{SAT-Nano} & 110M & \text{All 72} & 497 & \text{Lower} & \text{Resource-constrained} \\ \hline \text{U-Mamba} & \approx 400M & \text{Each dataset} & \text{Variable} & \text{Comparable} & \text{Specialist} \\ \hline \text{SwinUNETR} & \approx 400M & \text{Each dataset} & \text{Variable} & \text{Lower} & \text{Specialist} \\ \hline \end{array} $$

This represents a remarkable achievement: a single generalist model achieves performance parity with 72 specialized systems while consuming 1/5 the parameters and offering 40× greater semantic coverage (497 vs. 12 average classes per specialist model).

Superior Generalization on External Data

On external validation datasets (AbdomenAtlas and LiQA, which were excluded from SAT-DS training), SAT-Pro achieves higher performance than all baselines (+3.7% average DSC), demonstrating superior generalization ability. This external evaluation is particularly significant—it simulates real-world deployment scenarios where models encounter multi-center data with different acquisition protocols and patient populations.

Comparison with Interactive Segmentation: MedSAM

Compared with the interactive approach MedSAM, SAT-Pro consistently outperforms across all 7 human body regions with +7.1% average Dice Similarity Coefficient (DSC) improvement, while showing enhanced scalability and robustness.

Particularly compelling is the practical efficiency difference: MedSAM requires 10+ to 60+ prompts to segment a single target in a 3D image scan across different body regions, while SAT requires only a single text prompt to segment the entire 3D scan. For a radiologist segmenting a full abdominal CT scan with 10+ structures of interest, this translates from 100+ manual interactions to simply typing anatomical names.


Clinical Applications and LLM Integration

SAT as an Interface Between Language and Medicine

Perhaps the most transformative aspect of SAT is its potential integration with large language models. SAT can be seamlessly applied as an interface between natural language and segmentation, acting as a high-performance and efficient agent for language models. GPT-4 can analyze and extract anatomical targets of interest from real clinical reports and prompt SAT to segment them on clinical images, providing visual cues for the clinical report and enhancing its interpretability.

Three Emerging Clinical Workflows

1. Report-Guided Segmentation A radiologist writes a clinical report noting “suspicious lesion in the right lung.” GPT-4 extracts “right lung” and “lesion,” prompting SAT to automatically segment both structures. The radiologist receives immediate visual confirmation, accelerating report writing and reducing cognitive load.

2. Conversational Clinical Assistance Physicians can query SAT through natural language without memorizing anatomical terminology. “Show me the liver and any lesions within it” automatically segments appropriate structures, democratizing access to AI-assisted analysis for non-specialist clinicians.

3. Comprehensive EHR-Integrated Analysis Combining radiology images, patient complaints, and historical context, LLM-SAT systems provide automated diagnostic pipelines. A patient presenting with “abdominal pain and weight loss” receives automatic segmentation of pancreas, liver, and surrounding structures relevant to differential diagnosis.


The Long-Tail Distribution Problem: Why Knowledge Matters

Medical segmentation datasets exhibit severe long-tail distributions. In SAT-DS-Nano, the 10 “head” classes account for 12.75% of annotations, while 150 classes with minimum annotations account for only 3.25% despite comprising 34.97% of classes.

Models trained purely on image-label pairs struggle catastrophically on rare structures. Knowledge injection specifically addresses this pathology: Using their proposed knowledge-enhanced text encoder, performance improvements are most pronounced on “tail” classes, demonstrating that knowledge enhancement particularly improves rare structure segmentation where visual examples are limited.

This capability is clinically crucial—many segmentation targets are pathological lesions or anatomical variants encountered infrequently in training data but critical for clinical decision-making.


Limitations and Future Directions

No breakthrough technology emerges without acknowledged limitations. The research team identifies key areas for continued development:

  1. Regional Performance Gaps: SAT-Pro still lags behind nnU-Nets in some anatomical regions, suggesting scaling to larger models remains beneficial
  2. Unseen Categories: Generalization to previously unseen lesion types remains limited, motivating exploration of open-vocabulary segmentation
  3. Demographic Representation: SAT-DS remains skewed toward adult populations with limited pediatric and fetal data, potentially affecting generalization across age groups
  4. Clinical Deployment: While inference speed is suitable for clinical use, further optimization for standard hardware remains important

Conclusion: A New Era in Medical AI

The significance of SAT is demonstrated through multiple dimensions: it represents an important step toward a universal segmentation model in medical scenarios that can flexibly handle diverse images and targets within a single generalist model, effectively replacing the need for dozens of specialist models while achieving superior cross-center transferability.

SAT exemplifies how foundation models, knowledge graphs, and large language models converge to create genuinely transformative medical AI. Rather than forcing clinicians to adapt to technology, SAT adapts to how clinicians think and communicate—through medical language itself.

The path forward is clear: As medical AI matures, expect the specialist paradigm to fade. Future systems will combine the knowledge of foundation models with the linguistic fluency of large language models, delivering AI that is simultaneously more capable, more accessible, and more clinically integrated than anything available today.


Take Action: Engage with the Future of Medical AI

Are you a researcher, clinician, or healthcare technologist interested in how foundation models are revolutionizing medical imaging? The implications of text-prompted segmentation extend far beyond technical achievement—they reshape how radiologists work, how hospitals deploy AI infrastructure, and how medical knowledge translates into clinical action.

We invite you to:

  • Share your perspective: How might text-prompted segmentation transform your clinical workflow or research program?
  • Explore the code: The SAT implementation and SAT-DS are publicly available for reproducibility and benchmarking
  • Consider the implications: What new clinical applications become possible when any anatomical structure can be automatically segmented through natural language?

The democratization of medical AI through accessible, knowledge-enhanced foundation models isn’t inevitable—it requires ongoing investment in research, thoughtful clinical integration, and dialogue between technologists and medical professionals. Your insights and engagement help shape this transformation.

Subscribe to stay informed about breakthrough developments in medical AI, foundation models for healthcare, and clinical AI deployment strategies.


References and Further Reading

The original research introducing SAT was published in npj Digital Medicine by Ziheng Zhao, Yao Zhang, and colleagues, presenting comprehensive evaluation across 72 datasets with 497 anatomical categories and demonstrating superior generalization compared to specialist models, interactive segmentation approaches, and concurrent text-prompted systems. The dataset, code, and trained models are publicly available to accelerate research and clinical adoption.

Below is the Complete End-to-End Implementation of SAT Model.

SAT/
├── config/
│   ├── config.yaml
│   └── dataset_config.json
├── data/
│   ├── __init__.py
│   ├── dataset.py
│   ├── preprocessing.py
│   └── knowledge_graph.py
├── models/
│   ├── __init__.py
│   ├── encoders.py
│   ├── decoders.py
│   └── sat_model.py
├── training/
│   ├── __init__.py
│   ├── trainer.py
│   ├── losses.py
│   └── optimizers.py
├── evaluation/
│   ├── __init__.py
│   ├── metrics.py
│   └── validator.py
├── utils/
│   ├── __init__.py
│   ├── logger.py
│   ├── visualization.py
│   └── checkpoint.py
├── inference/
│   ├── __init__.py
│   └── predictor.py
├── train.py
├── evaluate.py
├── requirements.txt
└── README.md
# 1. Configuration Files

# Model Configuration
model:
  name: "SAT"
  variant: "pro"  # "nano" or "pro"
  
  # Text Encoder
  text_encoder:
    type: "biobert"
    pretrained_model: "dmis-lab/biobert-base-cased"
    hidden_dim: 768
    num_layers: 12
    dropout: 0.1
    freeze_layers: 0  # Number of layers to freeze
    
  # Visual Encoder
  visual_encoder:
    type: "unet3d"
    in_channels: 1
    num_features: 64
    depth: 5
    
  # Query Decoder
  query_decoder:
    type: "transformer"
    num_layers: 6
    num_heads: 8
    hidden_dim: 512
    ffn_dim: 2048
    dropout: 0.1
    
  # Visual Decoder
  visual_decoder:
    type: "unet3d"
    num_features: 64
    depth: 5
    out_channels: 256
    
  # Knowledge Injection
  knowledge_injection:
    enabled: true
    contrastive_temp: 0.07
    embedding_dim: 768

# Data Configuration
data:
  dataset_path: "/path/to/SAT-DS"
  batch_size: 4
  num_workers: 8
  image_size: [288, 288, 96]
  
  # Preprocessing
  preprocessing:
    normalize_ct: true
    ct_hu_range: [-500, 1000]
    normalize_mri: true
    mri_percentile: [0.5, 99.5]
    normalize_pet: true
    pet_percentile: [0.5, 99.5]
    
  # Augmentation
  augmentation:
    enable_random_flip: true
    enable_random_rotation: true
    enable_random_zoom: true
    enable_intensity_shift: true

# Training Configuration
training:
  num_epochs: 100
  learning_rate: 1e-4
  weight_decay: 1e-5
  warmup_epochs: 5
  
  # Loss weights
  loss_weights:
    segmentation: 1.0
    knowledge: 0.5
    
  # Optimizer
  optimizer: "adamw"
  
  # Scheduler
  scheduler: "cosine"
  
  # Precision
  use_amp: true
  
  # Checkpointing
  checkpoint_interval: 5
  checkpoint_dir: "./checkpoints"

# Evaluation Configuration
evaluation:
  val_interval: 1
  metrics:
    - "dice"
    - "nsd"
    - "hausdorff"
  
# Device Configuration
device:
  cuda: true
  distributed: false
  num_gpus: 1

# Logging
logging:
  log_dir: "./logs"
  log_interval: 100
  save_tensorboard: true
# config/dataset_config.json
{
  "datasets": {
    "brain": {
      "modalities": ["CT", "MRI"],
      "anatomies": [
        "brain stem",
        "superior frontal gyrus",
        "lateral ventricle",
        "postcentral gyrus",
        "superior parietal gyrus",
        "cingulate gyrus"
      ],
      "num_samples": 3500,
      "num_annotations": 45000
    },
    "thorax": {
      "modalities": ["CT", "MRI"],
      "anatomies": [
        "lung",
        "heart atrium",
        "aorta",
        "rib",
        "autochthon"
      ],
      "num_samples": 4200,
      "num_annotations": 52000
    },
    "abdomen": {
      "modalities": ["CT", "MRI"],
      "anatomies": [
        "liver",
        "stomach",
        "colon",
        "kidney",
        "spleen",
        "pancreas"
      ],
      "num_samples": 6800,
      "num_annotations": 85000
    },
    "pelvis": {
      "modalities": ["CT", "MRI"],
      "anatomies": [
        "transition zone of prostate",
        "peripheral zone of prostate",
        "bladder"
      ],
      "num_samples": 1200,
      "num_annotations": 15000
    },
    "spine": {
      "modalities": ["CT", "MRI"],
      "anatomies": [
        "lumbar vertebrae",
        "sacrum",
        "intervertebral disc"
      ],
      "num_samples": 2800,
      "num_annotations": 35000
    }
  },
  "modality_stats": {
    "CT": {
      "hu_range": [-500, 1000],
      "typical_spacing": [1.0, 1.0, 3.0]
    },
    "MRI": {
      "intensity_range": [0, 4096],
      "typical_spacing": [1.0, 1.0, 3.0]
    },
    "PET": {
      "intensity_range": [0, 100],
      "typical_spacing": [2.0, 2.0, 2.0]
    }
  },
  "total_scans": 22186,
  "total_annotations": 302033,
  "num_classes": 497
}
# 2. Data Loading and Preprocessing
import numpy as np
import torch
from torch.nn import functional as F
from scipy import ndimage
from typing import Tuple, Dict, Optional
import logging

logger = logging.getLogger(__name__)

class MedicalImagePreprocessor:
    """Preprocessing pipeline for medical images."""
    
    def __init__(self, config: Dict):
        self.config = config
        self.target_spacing = config.get('target_spacing', [1.0, 1.0, 3.0])
        self.target_size = config.get('image_size', [288, 288, 96])
        
    def preprocess_ct(self, image: np.ndarray) -> np.ndarray:
        """Preprocess CT image."""
        # Clamp HU values
        hu_min, hu_max = self.config['preprocessing']['ct_hu_range']
        image = np.clip(image, hu_min, hu_max)
        
        # Normalize
        image = (image - hu_min) / (hu_max - hu_min)
        return image.astype(np.float32)
    
    def preprocess_mri(self, image: np.ndarray) -> np.ndarray:
        """Preprocess MRI image."""
        # Percentile clipping
        p_low, p_high = self.config['preprocessing']['mri_percentile']
        v_low = np.percentile(image, p_low)
        v_high = np.percentile(image, p_high)
        
        image = np.clip(image, v_low, v_high)
        image = (image - v_low) / (v_high - v_low + 1e-5)
        return image.astype(np.float32)
    
    def preprocess_pet(self, image: np.ndarray) -> np.ndarray:
        """Preprocess PET image."""
        p_low, p_high = self.config['preprocessing']['pet_percentile']
        v_low = np.percentile(image, p_low)
        v_high = np.percentile(image, p_high)
        
        image = np.clip(image, v_low, v_high)
        image = (image - v_low) / (v_high - v_low + 1e-5)
        return image.astype(np.float32)
    
    def resample(self, image: np.ndarray, 
                 original_spacing: Tuple, 
                 target_spacing: Tuple) -> np.ndarray:
        """Resample image to target spacing."""
        zoom_factors = tuple(o / t for o, t in zip(original_spacing, target_spacing))
        resampled = ndimage.zoom(image, zoom_factors, order=1)
        return resampled
    
    def crop_or_pad(self, image: np.ndarray, 
                   target_shape: Tuple) -> np.ndarray:
        """Crop or pad image to target shape."""
        current_shape = image.shape
        
        if all(c >= t for c, t in zip(current_shape, target_shape)):
            # Crop from center
            slices = tuple(
                slice((c - t) // 2, (c - t) // 2 + t)
                for c, t in zip(current_shape, target_shape)
            )
            return image[slices]
        else:
            # Pad
            padding = tuple(
                ((t - c) // 2, (t - c) - (t - c) // 2)
                for c, t in zip(current_shape, target_shape)
            )
            return np.pad(image, padding, mode='constant', constant_values=0)
    
    def __call__(self, image: np.ndarray, 
                modality: str, 
                original_spacing: Tuple) -> np.ndarray:
        """Apply full preprocessing pipeline."""
        # Preprocess based on modality
        if modality.upper() == 'CT':
            image = self.preprocess_ct(image)
        elif modality.upper() == 'MRI':
            image = self.preprocess_mri(image)
        elif modality.upper() == 'PET':
            image = self.preprocess_pet(image)
        
        # Resample
        image = self.resample(image, original_spacing, self.target_spacing)
        
        # Crop or pad
        image = self.crop_or_pad(image, self.target_size)
        
        return image


class DataAugmentation:
    """Data augmentation for medical images."""
    
    def __init__(self, config: Dict):
        self.config = config['augmentation']
        
    def random_flip(self, image: np.ndarray, 
                   label: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Random flip along axes."""
        for axis in range(3):
            if np.random.rand() < 0.5:
                image = np.flip(image, axis=axis)
                label = np.flip(label, axis=axis)
        return image, label
    
    def random_rotation(self, image: np.ndarray, 
                       label: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Random rotation."""
        if np.random.rand() < 0.5:
            angle = np.random.uniform(-30, 30)
            image = ndimage.rotate(image, angle, reshape=False, order=1)
            label = ndimage.rotate(label, angle, reshape=False, order=0)
        return image, label
    
    def random_zoom(self, image: np.ndarray, 
                   label: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Random zoom."""
        if np.random.rand() < 0.5:
            zoom_factor = np.random.uniform(0.9, 1.1)
            image = ndimage.zoom(image, zoom_factor, order=1)
            label = ndimage.zoom(label, zoom_factor, order=0)
        return image, label
    
    def intensity_shift(self, image: np.ndarray) -> np.ndarray:
        """Random intensity shift."""
        if np.random.rand() < 0.5:
            shift = np.random.uniform(-0.1, 0.1)
            image = np.clip(image + shift, 0, 1)
        return image
    
    def __call__(self, image: np.ndarray, 
                label: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Apply augmentations."""
        if self.config.get('enable_random_flip', True):
            image, label = self.random_flip(image, label)
        
        if self.config.get('enable_random_rotation', True):
            image, label = self.random_rotation(image, label)
        
        if self.config.get('enable_random_zoom', True):
            image, label = self.random_zoom(image, label)
        
        if self.config.get('enable_intensity_shift', True):
            image = self.intensity_shift(image)
        
        return image, label
# data/knowledge_graph.py
import json
from typing import Dict, List, Tuple, Set
import numpy as np
from collections import defaultdict
import logging

logger = logging.getLogger(__name__)

class AnatomyKnowledgeGraph:
    """Knowledge graph for anatomical concepts."""
    
    def __init__(self, umls_path: str, custom_definitions_path: str):
        """
        Initialize knowledge graph.
        
        Args:
            umls_path: Path to UMLS data
            custom_definitions_path: Path to custom anatomical definitions
        """
        self.concepts = {}  # concept_id -> concept_data
        self.definitions = {}  # concept_id -> definition text
        self.relationships = defaultdict(list)  # concept_id -> [(rel_type, related_id), ...]
        self.concept_to_id = {}  # concept_name -> concept_id
        
        self._load_umls(umls_path)
        self._load_custom_definitions(custom_definitions_path)
    
    def _load_umls(self, umls_path: str):
        """Load UMLS data."""
        try:
            with open(umls_path, 'r') as f:
                umls_data = json.load(f)
            
            for concept_id, concept_data in umls_data['concepts'].items():
                self.concepts[concept_id] = concept_data
                concept_name = concept_data.get('name', '')
                self.concept_to_id[concept_name.lower()] = concept_id
                self.definitions[concept_id] = concept_data.get('definition', '')
            
            for concept_id, relationships in umls_data.get('relationships', {}).items():
                self.relationships[concept_id] = relationships
            
            logger.info(f"Loaded {len(self.concepts)} concepts from UMLS")
        except FileNotFoundError:
            logger.warning(f"UMLS file not found at {umls_path}")
    
    def _load_custom_definitions(self, path: str):
        """Load custom anatomical definitions."""
        try:
            with open(path, 'r') as f:
                custom_defs = json.load(f)
            
            for concept_name, definition in custom_defs['definitions'].items():
                concept_id = self.concept_to_id.get(concept_name.lower())
                if concept_id:
                    self.definitions[concept_id] = definition
                else:
                    # Create new concept
                    concept_id = f"CUSTOM_{len(self.concepts)}"
                    self.concepts[concept_id] = {'name': concept_name}
                    self.definitions[concept_id] = definition
                    self.concept_to_id[concept_name.lower()] = concept_id
            
            logger.info(f"Loaded custom definitions")
        except FileNotFoundError:
            logger.warning(f"Custom definitions file not found at {path}")
    
    def get_concept_definition(self, concept_name: str) -> str:
        """Get definition for a concept."""
        concept_id = self.concept_to_id.get(concept_name.lower())
        if concept_id:
            return self.definitions.get(concept_id, "")
        return ""
    
    def get_related_concepts(self, concept_name: str, 
                            rel_type: Optional[str] = None) -> List[str]:
        """Get related concepts."""
        concept_id = self.concept_to_id.get(concept_name.lower())
        if not concept_id:
            return []
        
        related = []
        for rel_type_stored, related_id in self.relationships.get(concept_id, []):
            if rel_type is None or rel_type == rel_type_stored:
                related_concept = self.concepts.get(related_id, {})
                related.append(related_concept.get('name', ''))
        
        return related
    
    def get_concept_hierarchy(self, concept_name: str, 
                             max_depth: int = 3) -> Dict:
        """Get concept hierarchy (up and down)."""
        concept_id = self.concept_to_id.get(concept_name.lower())
        if not concept_id:
            return {}
        
        hierarchy = {
            'concept': concept_name,
            'definition': self.definitions.get(concept_id, ''),
            'parents': self._get_parents(concept_id, max_depth),
            'children': self._get_children(concept_id, max_depth),
        }
        return hierarchy
    
    def _get_parents(self, concept_id: str, depth: int = 2) -> List[Dict]:
        """Get parent concepts recursively."""
        if depth == 0:
            return []
        
        parents = []
        for rel_type, related_id in self.relationships.get(concept_id, []):
            if rel_type in ['is-a', 'parent_of']:
                related_concept = self.concepts.get(related_id, {})
                parents.append({
                    'concept': related_concept.get('name', ''),
                    'definition': self.definitions.get(related_id, ''),
                    'parents': self._get_parents(related_id, depth - 1)
                })
        return parents
    
    def _get_children(self, concept_id: str, depth: int = 2) -> List[Dict]:
        """Get child concepts recursively."""
        if depth == 0:
            return []
        
        children = []
        for other_id in self.concepts:
            for rel_type, related_id in self.relationships.get(other_id, []):
                if related_id == concept_id and rel_type in ['is-a', 'parent_of']:
                    related_concept = self.concepts.get(other_id, {})
                    children.append({
                        'concept': related_concept.get('name', ''),
                        'definition': self.definitions.get(other_id, ''),
                        'children': self._get_children(other_id, depth - 1)
                    })
        return children


class AtlasEmbedding:
    """Extract visual features from atlas segmentations."""
    
    def __init__(self):
        self.atlas_cache = {}
    
    def extract_roi_features(self, image: np.ndarray, 
                            mask: np.ndarray, 
                            num_scales: int = 5) -> np.ndarray:
        """
        Extract multi-scale ROI features.
        
        Args:
            image: Image array
            mask: Binary mask
            num_scales: Number of scales to extract
            
        Returns:
            Feature vector
        """
        features = []
        
        # Get bounding box
        positions = np.where(mask > 0)
        if len(positions[0]) == 0:
            return np.zeros(256)
        
        # Extract multi-scale features
        for scale in range(num_scales):
            scale_factor = 2 ** scale
            pooled = self._adaptive_pool(image, mask, scale_factor)
            features.append(pooled)
        
        # Concatenate features
        features = np.concatenate(features)
        return features.astype(np.float32)
    
    def _adaptive_pool(self, image: np.ndarray, 
                      mask: np.ndarray, 
                      pool_size: int) -> np.ndarray:
        """Adaptive pooling."""
        masked_image = image * mask
        pool = np.mean(masked_image[mask > 0])
        return np.array([pool])
# data/dataset.py
import torch
from torch.utils.data import Dataset
import numpy as np
import os
import json
from typing import Dict, Tuple, List, Optional
import logging

logger = logging.getLogger(__name__)

class SAT_Dataset(Dataset):
    """SAT Dataset for 3D medical image segmentation."""
    
    def __init__(self, 
                 dataset_dir: str,
                 split: str = 'train',
                 config: Dict = None,
                 preprocessor = None,
                 augmentor = None,
                 knowledge_graph = None):
        """
        Initialize dataset.
        
        Args:
            dataset_dir: Path to dataset directory
            split: 'train', 'val', or 'test'
            config: Configuration dictionary
            preprocessor: Preprocessing pipeline
            augmentor: Data augmentation pipeline
            knowledge_graph: Anatomical knowledge graph
        """
        self.dataset_dir = dataset_dir
        self.split = split
        self.config = config or {}
        self.preprocessor = preprocessor
        self.augmentor = augmentor
        self.knowledge_graph = knowledge_graph
        
        self.samples = []
        self._load_samples()
        
        logger.info(f"Loaded {len(self.samples)} samples for {split} split")
    
    def _load_samples(self):
        """Load sample metadata."""
        split_file = os.path.join(self.dataset_dir, f'{self.split}_split.json')
        
        try:
            with open(split_file, 'r') as f:
                split_data = json.load(f)
            
            for sample_id, sample_info in split_data.items():
                self.samples.append({
                    'id': sample_id,
                    'image_path': sample_info['image_path'],
                    'label_path': sample_info['label_path'],
                    'modality': sample_info.get('modality', 'CT'),
                    'anatomy_labels': sample_info.get('anatomy_labels', []),
                    'spacing': sample_info.get('spacing', [1.0, 1.0, 3.0])
                })
        except FileNotFoundError:
            logger.error(f"Split file not found: {split_file}")
            raise
    
    def __len__(self) -> int:
        return len(self.samples)
    
    def __getitem__(self, idx: int) -> Dict:
        """Get dataset item."""
        sample = self.samples[idx]
        
        # Load image and label
        image = np.load(sample['image_path']).astype(np.float32)
        label = np.load(sample['label_path']).astype(np.int32)
        
        # Preprocess
        if self.preprocessor:
            image = self.preprocessor(
                image,
                sample['modality'],
                sample['spacing']
            )
        
        # Augmentation (train only)
        if self.split == 'train' and self.augmentor:
            image, label = self.augmentor(image, label)
        
        # Convert to tensor
        image = torch.from_numpy(image).unsqueeze(0)  # Add channel dimension
        label = torch.from_numpy(label).long()
        
        # Get text prompts and definitions
        text_prompts = sample['anatomy_labels']
        definitions = []
        
        if self.knowledge_graph:
            definitions = [
                self.knowledge_graph.get_concept_definition(prompt)
                for prompt in text_prompts
            ]
        
        return {
            'image': image,
            'label': label,
            'text_prompts': text_prompts,
            'definitions': definitions,
            'modality': sample['modality'],
            'sample_id': sample['id']
        }


class DataCollator:
    """Custom collate function for batching."""
    
    def __call__(self, batch: List[Dict]) -> Dict:
        """Collate batch."""
        images = torch.stack([item['image'] for item in batch])
        labels = torch.stack([item['label'] for item in batch])
        
        text_prompts = [item['text_prompts'] for item in batch]
        definitions = [item['definitions'] for item in batch]
        modalities = [item['modality'] for item in batch]
        sample_ids = [item['sample_id'] for item in batch]
        
        return {
            'images': images,
            'labels': labels,
            'text_prompts': text_prompts,
            'definitions': definitions,
            'modalities': modalities,
            'sample_ids': sample_ids
        }
# 3. Model Architecture
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
import logging

logger = logging.getLogger(__name__)

class TextEncoder(nn.Module):
    """Knowledge-enhanced text encoder."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.config = config
        self.hidden_dim = config['hidden_dim']
        self.embedding_dim = config.get('embedding_dim', 768)
        
        # Load pretrained BioBERT
        model_name = config.get('pretrained_model', 'dmis-lab/biobert-base-cased')
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name)
        
        # Projection layer
        self.projection = nn.Linear(self.hidden_dim, self.embedding_dim)
        self.norm = nn.LayerNorm(self.embedding_dim)
        
        # Freeze layers if specified
        num_freeze = config.get('freeze_layers', 0)
        if num_freeze > 0:
            for i, param in enumerate(self.encoder.parameters()):
                if i < num_freeze * len(list(self.encoder.parameters())) // config['num_layers']:
                    param.requires_grad = False
    
    def forward(self, texts: List[str]) -> torch.Tensor:
        """
        Encode text prompts.
        
        Args:
            texts: List of text prompts
            
        Returns:
            Encoded text embeddings [batch_size, embedding_dim]
        """
        # Tokenize
        encoded = self.tokenizer(
            texts,
            padding=True,
            truncation=True,
            max_length=512,
            return_tensors='pt'
        )
        
        # Encode with BERT
        outputs = self.encoder(
            input_ids=encoded['input_ids'],
            attention_mask=encoded['attention_mask'],
            return_dict=True
        )
        
        # Use [CLS] token representation
        cls_output = outputs.last_hidden_state[:, 0, :]  # [batch_size, hidden_dim]
        
        # Project to embedding dimension
        embeddings = self.projection(cls_output)
        embeddings = self.norm(embeddings)
        
        return embeddings


class VisualEncoder(nn.Module):
    """3D U-Net visual encoder."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.config = config
        self.in_channels = config.get('in_channels', 1)
        self.num_features = config.get('num_features', 64)
        self.depth = config.get('depth', 5)
        
        self.encoder_blocks = nn.ModuleList()
        self.pool = nn.MaxPool3d(2, 2)
        
        in_channels = self.in_channels
        for i in range(self.depth):
            out_channels = self.num_features * (2 ** i)
            self.encoder_blocks.append(
                ConvBlock3D(in_channels, out_channels)
            )
            in_channels = out_channels
    
    def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
        """
        Encode image.
        
        Args:
            x: Input image [batch_size, 1, D, H, W]
            
        Returns:
            List of multi-scale features
        """
        features = []
        
        for i, block in enumerate(self.encoder_blocks):
            x = block(x)
            features.append(x)
            if i < len(self.encoder_blocks) - 1:
                x = self.pool(x)
        
        return features


class ConvBlock3D(nn.Module):
    """3D Convolution block."""
    
    def __init__(self, in_channels: int, out_channels: int):
        super().__init__()
        self.conv1 = nn.Conv3d(in_channels, out_channels, 3, padding=1)
        self.bn1 = nn.BatchNorm3d(out_channels)
        self.conv2 = nn.Conv3d(out_channels, out_channels, 3, padding=1)
        self.bn2 = nn.BatchNorm3d(out_channels)
        self.relu = nn.ReLU(inplace=True)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        
        return x
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict

class QueryDecoder(nn.Module):
    """Transformer-based query decoder."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.config = config
        self.hidden_dim = config.get('hidden_dim', 512)
        self.num_heads = config.get('num_heads', 8)
        self.num_layers = config.get('num_layers', 6)
        self.ffn_dim = config.get('ffn_dim', 2048)
        self.dropout = config.get('dropout', 0.1)
        
        self.transformer_decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=self.hidden_dim,
                nhead=self.num_heads,
                dim_feedforward=self.ffn_dim,
                dropout=self.dropout,
                batch_first=True
            ),
            num_layers=self.num_layers
        )
    
    def forward(self, 
                text_embedding: torch.Tensor,
                visual_features: List[torch.Tensor]) -> torch.Tensor:
        """
        Decode query with visual context.
        
        Args:
            text_embedding: Text embedding [batch_size, embedding_dim]
            visual_features: Multi-scale visual features
            
        Returns:
            Updated query embedding [batch_size, embedding_dim]
        """
        # Prepare visual key-value
        visual_kv = []
        for feature in visual_features:
            # Flatten spatial dimensions
            batch_size, channels, *spatial = feature.shape
            feature_flat = feature.reshape(batch_size, channels, -1).permute(0, 2, 1)
            visual_kv.append(feature_flat)
        
        # Concatenate visual features
        visual_kv = torch.cat(visual_kv, dim=1)  # [batch_size, total_tokens, channels]
        
        # Use transformer decoder
        # text_embedding as query, visual_features as key and value
        text_embedding = text_embedding.unsqueeze(1)  # [batch_size, 1, embedding_dim]
        
        query = self.transformer_decoder(
            text_embedding,
            visual_kv,
            memory_key_padding_mask=None
        )
        
        return query.squeeze(1)  # [batch_size, embedding_dim]


class VisualDecoder(nn.Module):
    """3D U-Net visual decoder."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.config = config
        self.num_features = config.get('num_features', 64)
        self.depth = config.get('depth', 5)
        self.out_channels = config.get('out_channels', 256)
        
        self.decoder_blocks = nn.ModuleList()
        self.upsample = nn.Upsample(scale_factor=2, mode='trilinear', align_corners=True)
        
        for i in range(self.depth - 1, 0, -1):
            in_channels = self.num_features * (2 ** i)
            out_channels = self.num_features * (2 ** (i - 1))
            self.decoder_blocks.append(
                ConvBlock3D(in_channels + out_channels, out_channels)
            )
        
        # Final dense feature layer
        self.final_conv = nn.Sequential(
            nn.Conv3d(self.num_features, self.out_channels, 1),
            nn.BatchNorm3d(self.out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, encoder_features: List[torch.Tensor]) -> torch.Tensor:
        """
        Decode features.
        
        Args:
            encoder_features: List of encoder features from coarse to fine
            
        Returns:
            Dense feature map [batch_size, out_channels, D, H, W]
        """
        x = encoder_features[-1]
        
        for i, decoder_block in enumerate(self.decoder_blocks):
            x = self.upsample(x)
            skip = encoder_features[-(i + 2)]
            x = torch.cat([x, skip], dim=1)
            x = decoder_block(x)
        
        x = self.final_conv(x)
        return x
# models/sat_model.py

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Tuple, Optional
import logging

from .encoders import TextEncoder, VisualEncoder
from .decoders import QueryDecoder, VisualDecoder

logger = logging.getLogger(__name__)

class SAT(nn.Module):
    """Segment Anything with Text - Main Model."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.config = config
        
        # Initialize encoders
        self.text_encoder = TextEncoder(config['model']['text_encoder'])
        self.visual_encoder = VisualEncoder(config['model']['visual_encoder'])
        
        # Initialize decoders
        self.query_decoder = QueryDecoder(config['model']['query_decoder'])
        self.visual_decoder = VisualDecoder(config['model']['visual_decoder'])
        
        # Mask generator
        self.mask_generator = nn.Sequential(
            nn.Linear(config['model']['visual_decoder']['out_channels'], 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
        
        # Knowledge injection parameters
        self.use_knowledge = config['model']['knowledge_injection']['enabled']
        self.contrastive_temp = config['model']['knowledge_injection']['contrastive_temp']
    
    def forward(self, 
                images: torch.Tensor,
                text_prompts: List[str],
                definitions: Optional[List[str]] = None,
                labels: Optional[torch.Tensor] = None) -> Dict:
        """
        Forward pass.
        
        Args:
            images: Image batch [batch_size, 1, D, H, W]
            text_prompts: Text prompts for segmentation
            definitions: Anatomical definitions (for knowledge injection)
            labels: Ground truth labels (optional)
            
        Returns:
            Dictionary with predictions and losses
        """
        batch_size = images.shape[0]
        device = images.device
        
        # Encode text
        text_embeddings = self.text_encoder(text_prompts)  # [batch_size, embedding_dim]
        
        # Encode image
        visual_features = self.visual_encoder(images)  # List of features
        
        # Decode query with visual context
        query_features = self.query_decoder(
            text_embeddings,
            visual_features
        )  # [batch_size, embedding_dim]
        
        # Decode visual features
        dense_features = self.visual_decoder(visual_features)  # [batch_size, out_channels, D, H, W]
        
        # Generate segmentation
        out_channels = dense_features.shape[1]
        dense_features_reshaped = dense_features.permute(0, 2, 3, 4, 1).reshape(-1, out_channels)
        query_features_repeated = query_features.unsqueeze(-1).unsqueeze(-1).unsqueeze(-1)
        query_features_repeated = query_features_repeated.repeat(
            1, 1, dense_features.shape[2], dense_features.shape[3], dense_features.shape[4]
        )
        query_features_reshaped = query_features_repeated.permute(0, 2, 3, 4, 1).reshape(-1, out_channels)
        
        # Element-wise multiplication
        dot_product = (dense_features_reshaped * query_features_reshaped).sum(dim=1)
        mask = torch.sigmoid(dot_product)
        
        # Reshape to spatial dimensions
        spatial_shape = dense_features.shape[2:]
        mask = mask.reshape(batch_size, *spatial_shape)
        
        output = {
            'mask': mask,
            'text_embedding': text_embeddings,
            'query_feature': query_features,
            'dense_features': dense_features
        }
        
        # Calculate losses if labels provided
        if labels is not None:
            losses = self._calculate_losses(
                mask, labels, text_embeddings, definitions
            )
            output['losses'] = losses
        
        return output
    
    def _calculate_losses(self,
                         mask: torch.Tensor,
                         labels: torch.Tensor,
                         text_embeddings: torch.Tensor,
                         definitions: Optional[List[str]]) -> Dict:
        """Calculate segmentation and knowledge losses."""
        losses = {}
        
        # Segmentation loss
        mask_flat = mask.flatten()
        labels_flat = labels.flatten().float()
        
        bce_loss = F.binary_cross_entropy(mask_flat, labels_flat)
        
        # Dice loss
        intersection = (mask_flat * labels_flat).sum()
        union = mask_flat.sum() + labels_flat.sum()
        dice_loss = 1 - (2 * intersection + 1e-6) / (union + 1e-6)
        
        segmentation_loss = bce_loss + dice_loss
        losses['segmentation'] = segmentation_loss
        
        # Knowledge loss (if using knowledge injection)
        if self.use_knowledge and definitions:
            knowledge_loss = self._calculate_knowledge_loss(
                text_embeddings, definitions
            )
            losses['knowledge'] = knowledge_loss
        
        return losses
    
    def _calculate_knowledge_loss(self,
                                 text_embeddings: torch.Tensor,
                                 definitions: List[str]) -> torch.Tensor:
        """Calculate contrastive knowledge loss."""
        # Encode definitions
        def_embeddings = self.text_encoder(definitions)
        
        # Normalize embeddings
        text_norm = F.normalize(text_embeddings, dim=-1)
        def_norm = F.normalize(def_embeddings, dim=-1)
        
        # Compute similarity matrix
        similarity = torch.matmul(text_norm, def_norm.T)  # [batch_size, batch_size]
        similarity = similarity / self.contrastive_temp
        
        # Contrastive loss
        batch_size = text_embeddings.shape[0]
        labels = torch.arange(batch_size, device=text_embeddings.device)
        
        loss_forward = F.cross_entropy(similarity, labels)
        loss_backward = F.cross_entropy(similarity.T, labels)
        
        loss = (loss_forward + loss_backward) / 2
        return loss


class SAT_Nano(SAT):
    """SAT-Nano variant with reduced parameters."""
    
    def __init__(self, config: Dict):
        # Reduce model sizes
        config['model']['text_encoder']['hidden_dim'] = 512
        config['model']['visual_encoder']['num_features'] = 32
        config['model']['query_decoder']['hidden_dim'] = 256
        config['model']['visual_decoder']['out_channels'] = 128
        
        super().__init__(config)


class SAT_Pro(SAT):
    """SAT-Pro variant with full capacity."""
    
    def __init__(self, config: Dict):
        # Use default full sizes
        super().__init__(config)

# 4. Training Pipeline
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict

class DiceLoss(nn.Module):
    """Dice Loss for segmentation."""
    
    def __init__(self, smooth: float = 1e-6):
        super().__init__()
        self.smooth = smooth
    
    def forward(self, pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
        pred_flat = pred.flatten()
        target_flat = target.flatten().float()
        
        intersection = (pred_flat * target_flat).sum()
        union = pred_flat.sum() + target_flat.sum()
        
        dice = (2 * intersection + self.smooth) / (union + self.smooth)
        return 1 - dice


class CombinedSegmentationLoss(nn.Module):
    """Combined BCE + Dice loss."""
    
    def __init__(self, bce_weight: float = 0.5, dice_weight: float = 0.5):
        super().__init__()
        self.bce_weight = bce_weight
        self.dice_weight = dice_weight
        self.bce_loss = nn.BCELoss()
        self.dice_loss = DiceLoss()
    
    def forward(self, pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor:
        bce = self.bce_loss(pred, target.float())
        dice = self.dice_loss(pred, target)
        return self.bce_weight * bce + self.dice_weight * dice


class ContrastiveKnowledgeLoss(nn.Module):
    """Contrastive learning loss for knowledge injection."""
    
    def __init__(self, temperature: float = 0.07):
        super().__init__()
        self.temperature = temperature
    
    def forward(self, 
                text_embeddings: torch.Tensor,
                definition_embeddings: torch.Tensor) -> torch.Tensor:
        """
        Contrastive loss between text and definition embeddings.
        """
        # Normalize
        text_norm = F.normalize(text_embeddings, dim=-1)
        def_norm = F.normalize(definition_embeddings, dim=-1)
        
        # Similarity matrix
        similarity = torch.matmul(text_norm, def_norm.T) / self.temperature
        
        batch_size = text_embeddings.shape[0]
        labels = torch.arange(batch_size, device=text_embeddings.device)
        
        loss_forward = F.cross_entropy(similarity, labels)
        loss_backward = F.cross_entropy(similarity.T, labels)
        
        return (loss_forward + loss_backward) / 2


class CombinedLoss(nn.Module):
    """Combined segmentation and knowledge loss."""
    
    def __init__(self, config: Dict):
        super().__init__()
        self.segmentation_loss = CombinedSegmentationLoss()
        self.knowledge_loss = ContrastiveKnowledgeLoss(
            temperature=config['model']['knowledge_injection']['contrastive_temp']
        )
        self.seg_weight = config['training']['loss_weights']['segmentation']
        self.know_weight = config['training']['loss_weights']['knowledge']
        self.use_knowledge = config['model']['knowledge_injection']['enabled']
    
    def forward(self, 
                predictions: Dict,
                targets: torch.Tensor,
                definition_embeddings: torch.Tensor = None) -> torch.Tensor:
        """Compute combined loss."""
        seg_loss = self.segmentation_loss(predictions['mask'], targets)
        
        total_loss = self.seg_weight * seg_loss
        
        if self.use_knowledge and definition_embeddings is not None:
            know_loss = self.knowledge_loss(
                predictions['text_embedding'],
                definition_embeddings
            )
            total_loss += self.know_weight * know_loss
        
        return total_loss

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import logging
from pathlib import Path
from typing import Dict, Optional
import json

from ..models.sat_model import SAT, SAT_Pro, SAT_Nano
from .losses import CombinedLoss
from ..evaluation.metrics import DiceScore, NormalizedSurfaceDistance

logger = logging.getLogger(__name__)

class Trainer:
    """Training framework for SAT model."""
    
    def __init__(self, config: Dict, device: str = 'cuda'):
        self.config = config
        self.device = device
        
        # Initialize model
        model_variant = config['model'].get('variant', 'pro')
        if model_variant == 'nano':
            self.model = SAT_Nano(config).to(device)
        else:
            self.model = SAT_Pro(config).to(device)
        
        # Initialize loss
        self.criterion = CombinedLoss(config)
        
        # Initialize optimizer
        self._setup_optimizer()
        
        # Initialize scheduler
        self._setup_scheduler()
        
        # Metrics
        self.dice_metric = DiceScore()
        self.nsd_metric = NormalizedSurfaceDistance()
        
        # Checkpointing
        self.checkpoint_dir = Path(config['training']['checkpoint_dir'])
        self.checkpoint_dir.mkdir(exist_ok=True)
    
    def _setup_optimizer(self):
        """Setup optimizer."""
        optimizer_name = self.config['training'].get('optimizer', 'adamw')
        lr = self.config['training']['learning_rate']
        wd = self.config['training']['weight_decay']
        
        if optimizer_name == 'adamw':
            self.optimizer = optim.AdamW(
                self.model.parameters(),
                lr=lr,
                weight_decay=wd
            )
        elif optimizer_name == 'sgd':
            self.optimizer = optim.SGD(
                self.model.parameters(),
                lr=lr,
                weight_decay=wd,
                momentum=0.9
            )
        else:
            raise ValueError(f"Unknown optimizer: {optimizer_name}")
    
    def _setup_scheduler(self):
        """Setup learning rate scheduler."""
        scheduler_name = self.config['training'].get('scheduler', 'cosine')
        num_epochs = self.config['training']['num_epochs']
        warmup_epochs = self.config['training']['warmup_epochs']
        
        if scheduler_name == 'cosine':
            self.scheduler = optim.lr_scheduler.CosineAnnealingLR(
                self.optimizer,
                T_max=num_epochs - warmup_epochs
            )
        elif scheduler_name == 'linear':
            self.scheduler = optim.lr_scheduler.LinearLR(
                self.optimizer,
                total_iters=num_epochs
            )
        else:
            raise ValueError(f"Unknown scheduler: {scheduler_name}")
    
    def train_epoch(self, train_loader: DataLoader) -> Dict:
        """Train one epoch."""
        self.model.train()
        losses = []
        dice_scores = []
        
        for batch_idx, batch in enumerate(train_loader):
            images = batch['images'].to(self.device)
            labels = batch['labels'].to(self.device)
            text_prompts = batch['text_prompts']
            definitions = batch['definitions']
            
            # Forward pass
            outputs = self.model(images, text_prompts, definitions, labels)
            
            # Calculate loss
            loss = self.criterion(outputs, labels)
            
            # Backward pass
            self.optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            
            losses.append(loss.item())
            
            # Calculate metrics
            dice = self.dice_metric(outputs['mask'], labels)
            dice_scores.append(dice)
            
            if batch_idx % self.config['logging']['log_interval'] == 0:
                logger.info(
                    f"Batch {batch_idx}/{len(train_loader)}: "
                    f"Loss = {loss.item():.4f}, Dice = {dice:.4f}"
                )
        
        return {
            'loss': sum(losses) / len(losses),
            'dice': sum(dice_scores) / len(dice_scores)
        }
    
    def validate(self, val_loader: DataLoader) -> Dict:
        """Validate model."""
        self.model.eval()
        losses = []
        dice_scores = []
        nsd_scores = []
        
        with torch.no_grad():
            for batch in val_loader:
                images = batch['images'].to(self.device)
                labels = batch['labels'].to(self.device)
                text_prompts = batch['text_prompts']
                definitions = batch['definitions']
                
                outputs = self.model(images, text_prompts, definitions)
                
                # Evaluate
                dice = self.dice_metric(outputs['mask'], labels)
                nsd = self.nsd_metric(outputs['mask'], labels)
                
                dice_scores.append(dice)
                nsd_scores.append(nsd)
        
        return {
            'dice': sum(dice_scores) / len(dice_scores),
            'nsd': sum(nsd_scores) / len(nsd_scores)
        }
    
    def train(self, train_loader: DataLoader, val_loader: DataLoader):
        """Full training loop."""
        num_epochs = self.config['training']['num_epochs']
        best_dice = 0
        
        for epoch in range(num_epochs):
            logger.info(f"Epoch {epoch + 1}/{num_epochs}")
            
            # Train
            train_metrics = self.train_epoch(train_loader)
            logger.info(f"Train - Loss: {train_metrics['loss']:.4f}, "
                       f"Dice: {train_metrics['dice']:.4f}")
            
            # Validate
            if (epoch + 1) % self.config['evaluation']['val_interval'] == 0:
                val_metrics = self.validate(val_loader)
                logger.info(f"Val - Dice: {val_metrics['dice']:.4f}, "
                           f"NSD: {val_metrics['nsd']:.4f}")
                
                # Save checkpoint
                if val_metrics['dice'] > best_dice:
                    best_dice = val_metrics['dice']
                    self._save_checkpoint(epoch, val_metrics)
            
            # Update scheduler
            self.scheduler.step()
    
    def _save_checkpoint(self, epoch: int, metrics: Dict):
        """Save model checkpoint."""
        checkpoint = {
            'epoch': epoch,
            'model_state': self.model.state_dict(),
            'optimizer_state': self.optimizer.state_dict(),
            'metrics': metrics,
            'config': self.config
        }
        
        path = self.checkpoint_dir / f"checkpoint_epoch_{epoch}.pt"
        torch.save(checkpoint, path)
        logger.info(f"Saved checkpoint: {path}")
    
    def load_checkpoint(self, path: str):
        """Load model checkpoint."""
        checkpoint = torch.load(path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state'])
        logger.info(f"Loaded checkpoint from {path}")
        return checkpoint['epoch']
# 5. Evaluation and Inference
import torch
import torch.nn.functional as F
import numpy as np
from scipy.spatial.distance import directed_hausdorff
from typing import Tuple

class DiceScore:
    """Dice Similarity Coefficient."""
    
    def __init__(self, smooth: float = 1e-6):
        self.smooth = smooth
    
    def __call__(self, pred: torch.Tensor, target: torch.Tensor) -> float:
        """Calculate Dice score."""
        pred_binary = (pred > 0.5).float()
        target_binary = target.float()
        
        intersection = (pred_binary * target_binary).sum()
        union = pred_binary.sum() + target_binary.sum()
        
        dice = (2 * intersection + self.smooth) / (union + self.smooth)
        return dice.item()


class NormalizedSurfaceDistance:
    """Normalized Surface Distance metric."""
    
    def __init__(self, tau: int = 1):
        self.tau = tau
    
    def __call__(self, pred: torch.Tensor, target: torch.Tensor) -> float:
        """Calculate NSD metric."""
        pred_np = pred.cpu().numpy()
        target_np = target.cpu().numpy()
        
        # Get boundaries
        pred_boundary = self._get_boundary(pred_np > 0.5)
        target_boundary = self._get_boundary(target_np > 0)
        
        if len(pred_boundary) == 0 or len(target_boundary) == 0:
            return 0.0
        
        # Calculate directed Hausdorff distances
        dist1 = directed_hausdorff(pred_boundary, target_boundary)[0]
        dist2 = directed_hausdorff(target_boundary, pred_boundary)[0]
        
        # Normalized surface distance
        denom = max(dist1, dist2)
        if denom == 0:
            return 1.0
        
        nsd = 1 - min(dist1, dist2) / denom
        return nsd
    
    def _get_boundary(self, mask: np.ndarray) -> np.ndarray:
        """Get boundary voxels."""
        boundary = np.zeros_like(mask)
        for i in range(mask.shape[0]):
            for j in range(mask.shape[1]):
                for k in range(mask.shape[2]):
                    if mask[i, j, k]:
                        # Check if on boundary
                        neighbors = mask[
                            max(0, i-1):min(mask.shape[0], i+2),
                            max(0, j-1):min(mask.shape[1], j+2),
                            max(0, k-1):min(mask.shape[2], k+2)
                        ]
                        if not neighbors.all():
                            boundary[i, j, k] = True
        
        return np.argwhere(boundary)
import torch
import yaml
import logging
import json
from pathlib import Path
from torch.utils.data import DataLoader

from config import load_config
from data.dataset import SAT_Dataset, DataCollator
from data.preprocessing import MedicalImagePreprocessor, DataAugmentation
from data.knowledge_graph import AnatomyKnowledgeGraph
from training.trainer import Trainer
from utils.logger import setup_logging

def main():
    """Main training function."""
    # Setup logging
    setup_logging()
    logger = logging.getLogger(__name__)
    
    # Load configuration
    config = load_config('config/config.yaml')
    logger.info(f"Loaded configuration")
    
    # Setup device
    device = 'cuda' if torch.cuda.is_available() and config['device']['cuda'] else 'cpu'
    logger.info(f"Using device: {device}")
    
    # Initialize preprocessor and augmentor
    preprocessor = MedicalImagePreprocessor(config['data'])
    augmentor = DataAugmentation(config['data'])
    
    # Initialize knowledge graph
    kg = AnatomyKnowledgeGraph(
        umls_path='data/umls_data.json',
        custom_definitions_path='data/anatomical_definitions.json'
    )
    
    # Create datasets
    logger.info("Creating datasets...")
    train_dataset = SAT_Dataset(
        dataset_dir=config['data']['dataset_path'],
        split='train',
        config=config,
        preprocessor=preprocessor,
        augmentor=augmentor,
        knowledge_graph=kg
    )
    
    val_dataset = SAT_Dataset(
        dataset_dir=config['data']['dataset_path'],
        split='val',
        config=config,
        preprocessor=preprocessor,
        augmentor=None,
        knowledge_graph=kg
    )
    
    # Create dataloaders
    collate_fn = DataCollator()
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['data']['batch_size'],
        shuffle=True,
        num_workers=config['data']['num_workers'],
        collate_fn=collate_fn
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=config['data']['batch_size'],
        shuffle=False,
        num_workers=config['data']['num_workers'],
        collate_fn=collate_fn
    )
    
    logger.info(f"Train loader: {len(train_loader)} batches")
    logger.info(f"Val loader: {len(val_loader)} batches")
    
    # Initialize trainer
    trainer = Trainer(config, device=device)
    
    # Train
    logger.info("Starting training...")
    trainer.train(train_loader, val_loader)
    
    logger.info("Training completed!")

if __name__ == '__main__':
    main()

Related posts, You May like to read

  1. 7 Shocking Truths About Knowledge Distillation: The Good, The Bad, and The Breakthrough (SAKD)
  2. 7 Revolutionary Breakthroughs in Medical Image Translation (And 1 Fatal Flaw That Could Derail Your AI Model)
  3. TimeDistill: Revolutionizing Time Series Forecasting with Cross-Architecture Knowledge Distillation
  4. HiPerformer: A New Benchmark in Medical Image Segmentation with Modular Hierarchical Fusion
  5. GeoSAM2 3D Part Segmentation — Prompt-Controllable, Geometry-Aware Masks for Precision 3D Editing
  6. Probabilistic Smooth Attention for Deep Multiple Instance Learning in Medical Imaging
  7. A Knowledge Distillation-Based Approach to Enhance Transparency of Classifier Models
  8. Towards Trustworthy Breast Tumor Segmentation in Ultrasound Using AI Uncertainty
  9. Discrete Migratory Bird Optimizer with Deep Transfer Learning for Multi-Retinal Disease Detection

Leave a Comment

Your email address will not be published. Required fields are marked *

Follow by Email
Tiktok