KGMgT: Revolutionary AI-Powered Cardiac MRI Reconstruction Achieves 10× Faster Scanning with Diagnostic-Quality Imaging

Medical imaging is entering an era in which artificial intelligence does more than assist radiologists: it changes what is possible in diagnostic speed and precision. Cardiac magnetic resonance imaging (CMR), long considered the gold standard for evaluating heart function, has been constrained by a persistent trade-off between image quality and scan duration. Patients endure lengthy examinations inside claustrophobia-inducing MRI bores and uncomfortable, prolonged breath-holds, and motion artifacts are an ever-present risk to critical diagnostic information.

Enter KGMgT (Knowledge-Guided Multi-Geometric Window Transformer), a groundbreaking deep learning architecture that’s shattering these limitations. Developed by researchers from Fudan University, Harvard Medical School, and Hong Kong Polytechnic University, this innovative framework achieves state-of-the-art cardiac cine MRI reconstruction at acceleration factors up to 10×—reducing scan times from minutes to mere seconds while preserving the anatomical fidelity clinicians demand for accurate diagnoses.

The Critical Challenge: Why Traditional Cardiac MRI Falls Short

Cardiac cine MRI captures the dynamic motion of the beating heart through sequences of high-resolution images, enabling physicians to assess ejection fraction, wall motion abnormalities, and valvular function with unparalleled precision. However, the fundamental physics of MRI creates an inescapable dilemma:

  • Extended scan times cause patient discomfort and increase the likelihood of motion artifacts
  • Breath-hold requirements exclude patients with respiratory compromise or arrhythmias
  • Spatial-temporal resolution trade-offs force clinicians to choose between image clarity and temporal fidelity

Traditional compressed sensing approaches like k-t FOCUSS attempted to address these issues through mathematical optimization, but their iterative nature demanded substantial computational resources and reconstruction times incompatible with clinical workflows.

Deep learning initially offered promise through static image reconstruction methods such as MoDL, KIGAN, and SwinIR, but these approaches treated each cardiac frame in isolation, ignoring the rich temporal correlations that define cardiac function. The heart’s rhythmic contraction and relaxation create predictable motion patterns across adjacent frames, information that remained largely untapped by early deep learning solutions.

KGMgT’s Three-Pillar Architecture: Knowledge, Attention, and Geometry

The KGMgT framework represents a paradigm shift in how AI systems approach dynamic medical imaging. Rather than treating cardiac cine reconstruction as a series of independent image restoration tasks, the architecture implements three interconnected innovations that collectively model the heart’s spatiotemporal dynamics.

Pillar 1: Knowledge-Guided Mechanism (KgM)

At the heart of KGMgT lies a sophisticated mentorship system inspired by knowledge distillation principles. The architecture employs dual networks—a Mentor network and a Learner network—that engage in continuous knowledge transfer:

Key Innovation: The Mentor network processes the target frame alongside its preceding reference frame, extracting deep spatial features and encoding motion priors. These learned representations then guide the Learner network, which combines this knowledge with the succeeding reference frame to reconstruct the target image.

This self-distillation process creates a cross-stage feature constraint system that prevents information drift during training. The mathematical formulation captures this relationship:

\[ \mathrm{Input}_{\tau}^{kg} = K_m\!\left( \mathrm{Input}_{\tau},\, \mathrm{Ref}_{\tau-1} \right) + \mathrm{Input}_{\tau} \]

\[ \mathrm{Rec}(M \mid L) = K_l\!\left( \mathrm{Input}_{\tau}^{kg},\, \mathrm{Ref}_{\tau+1},\, \mathrm{FM}^{kg} \right) \]

Where \( K_m \) and \( K_l \) represent the knowledge guidance operations for the Mentor and Learner networks respectively, and \( \mathrm{FM}^{kg} \) denotes the multi-scale prior features transferred between the networks.
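The data flow in these two equations can be sketched in a few lines of NumPy. This is only an illustration of how the frames are wired together: `mentor_guidance` and `learner_reconstruct` are hypothetical stand-ins for \( K_m \) and \( K_l \) (simple averaging instead of trained networks), and the toy frames are invented for the example.

```python
import numpy as np

def mentor_guidance(inp, ref_prev):
    """Hypothetical stand-in for K_m: returns (guidance, prior features)."""
    prior = 0.5 * (inp + ref_prev)        # toy multi-frame prior, plays the role of FM^kg
    guidance = 0.5 * (ref_prev - inp)     # toy guidance signal from the preceding frame
    return guidance, prior

def learner_reconstruct(inp_kg, ref_next, prior):
    """Hypothetical stand-in for K_l: fuses guided input, next frame and prior."""
    return (inp_kg + ref_next + prior) / 3.0

def kgm_forward(inp, ref_prev, ref_next):
    guidance, fm_kg = mentor_guidance(inp, ref_prev)
    inp_kg = guidance + inp               # Input^kg = K_m(Input, Ref_{t-1}) + Input
    return learner_reconstruct(inp_kg, ref_next, fm_kg)  # Rec = K_l(Input^kg, Ref_{t+1}, FM^kg)

# Three toy frames: preceding reference, target, succeeding reference
frames = np.stack([np.full((4, 4), v) for v in (1.0, 2.0, 3.0)])
rec = kgm_forward(inp=frames[1], ref_prev=frames[0], ref_next=frames[2])
```

The point of the sketch is the ordering: the preceding frame only reaches the Learner through the Mentor's output, while the succeeding frame enters the Learner directly.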

Pillar 2: Adaptive Spatiotemporal Attention (ASA)

The ASA module addresses a critical limitation in previous approaches: the failure to explicitly model motion trajectories between adjacent cardiac frames. Rather than assuming uniform temporal correlations across the entire image sequence, ASA implements dynamic image registration that adapts to local cardiac motion.

The mechanism operates through three sequential processes:

  1. Correlation Matching: Query and Key features from adjacent frames undergo local comparison to generate similarity maps
  2. Optical Flow Estimation: Displacement information captures temporal correlations through differentiable warping operations
  3. Deformable Feature Aggregation: A Deformable Convolutional Network (DCN) adaptively samples features based on computed motion fields
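A minimal NumPy sketch of the first step, correlation matching, assuming cosine similarity over a 3×3 neighborhood (the same neighborhood size the article's code uses). The function name and the wrap-around shifting via `np.roll` are simplifications chosen for this example, not details confirmed by the paper.

```python
import numpy as np

def correlation_matching(query, key):
    """
    query, key: feature maps of shape [C, H, W].
    Returns (best_idx [H, W], sims [9, H, W]), where best_idx is the index
    (0..8, row-major over dy, dx in {-1, 0, 1}) of the most similar shift.
    """
    eps = 1e-8
    q = query / (np.linalg.norm(query, axis=0, keepdims=True) + eps)
    k = key / (np.linalg.norm(key, axis=0, keepdims=True) + eps)

    sims = []
    for dy in (-1, 0, 1):
        for dx in (-1, 0, 1):
            # Shift the key map; np.roll wraps at the borders (a simplification)
            k_shifted = np.roll(k, shift=(dy, dx), axis=(1, 2))
            sims.append((q * k_shifted).sum(axis=0))  # cosine similarity per pixel
    sims = np.stack(sims)
    return sims.argmax(axis=0), sims

# Toy check: the query is the key moved down by one row
rng = np.random.default_rng(0)
key = rng.standard_normal((8, 6, 6))
query = np.roll(key, shift=(1, 0), axis=(1, 2))
best_idx, sims = correlation_matching(query, key)
```

In this toy case every pixel selects the shift `(dy, dx) = (1, 0)`, which is index 7 in the row-major ordering.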

The optical flow computation follows:

\[ \mathrm{flow}_{\tau}(x,y) = \mathrm{pad}\!\left( \begin{bmatrix} \delta O_{\tau}(x,y) \bmod w \\ \delta O_{\tau}(x,y) \div h \end{bmatrix} - \begin{bmatrix} \mathrm{grid}_x(x,y) \\ \mathrm{grid}_y(x,y) \end{bmatrix} \right) \]

Where \( \delta O_{\tau} \) represents displacement indices, and the Shift operation generates n = 9 offset versions (3×3 neighborhood) for robust feature sampling. This enables the model to track myocardial deformation through the cardiac cycle with sub-pixel precision.
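One way to read the flow equation is sketched below, under two assumptions that the text does not spell out: \( \delta O_{\tau} \) is taken to be the flat, row-major index of the best-matching location in the reference frame, and the feature map is square (so dividing by the width or the height gives the same row). The padding step is omitted, and `index_to_flow` is a name invented for this example.

```python
import numpy as np

def index_to_flow(match_idx, h, w):
    """
    match_idx: integer array [h, w]; each entry is the flat (row-major) index
               of the matched location in the reference frame (assumption).
    Returns flow [2, h, w] holding (dx, dy) per pixel.
    """
    grid_y, grid_x = np.meshgrid(np.arange(h), np.arange(w), indexing="ij")
    match_x = match_idx % w     # column of the matched location
    match_y = match_idx // w    # row of the matched location
    return np.stack([match_x - grid_x, match_y - grid_y])

h = w = 4
identity = np.arange(h * w).reshape(h, w)

# Every pixel matches itself: zero flow
flow_zero = index_to_flow(identity, h, w)

# Every pixel except the last column matches its right-hand neighbour
shifted = identity.copy()
shifted[:, :-1] += 1
flow_right = index_to_flow(shifted, h, w)
```

Subtracting the pixel's own grid coordinates turns an absolute match position into a relative displacement, which is what a warping or deformable sampling step consumes.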

Pillar 3: Multi-Geometric Window Transformer (MGwinT)

Cardiac tissues exhibit similarity patterns at multiple scales: local myocardial texture resembles neighboring regions, while global chamber geometries follow predictable anatomical templates. KGMgT captures these relationships through parallel attention mechanisms using differently shaped receptive fields:

Window Type | Geometry | Purpose | Implementation
SW-MSA | Square (β × β) | Local texture modeling | Cyclic shifting by [β/2, β/2] pixels
RW-MSA | Rectangular (vertical/horizontal) | Global structure capture | Alternating orientation across scales

The square window multi-head self-attention (SW-MSA) employs regular partitioning with cyclic shifts to enable cross-window information flow, while the rectangular window variant (RW-MSA) maintains global modeling capacity without quadratic complexity growth. At each Transformer-driven Dynamic Feature Aggregation (TdFA) block, features concatenate as:

\[ F_{\mu+1} = \operatorname{Conv}\!\left( \varsigma\!\left( \operatorname{Conv}\!\left( C\!\left( F_{\mu},\, \mathrm{FM}^{kg},\, D_{\mu} \right) \right) \right) + F_{\mu} \right) \]

Where C denotes concatenation, ς represents the MGwinT operation with n=4 stacked blocks, and residual connections stabilize training across deep architectures.
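The difference between the square and rectangular windows is easiest to see by partitioning a small feature map. The sketch below is a NumPy illustration only; the rectangular sizes (2β × β/2 and β/2 × 2β) follow the implementation later in this article, and the 16×16 toy map is an assumption made for the example.

```python
import numpy as np

def window_partition(x, win_h, win_w):
    """Split x of shape [H, W, C] into windows: [num_windows, win_h * win_w, C]."""
    H, W, C = x.shape
    assert H % win_h == 0 and W % win_w == 0, "map must tile evenly"
    x = x.reshape(H // win_h, win_h, W // win_w, win_w, C)
    return x.transpose(0, 2, 1, 3, 4).reshape(-1, win_h * win_w, C)

beta = 4
feat = np.arange(16 * 16, dtype=np.float32).reshape(16, 16, 1)

# Square windows (SW-MSA): beta x beta tokens each
square = window_partition(feat, beta, beta)

# Cyclic shift by beta/2 before partitioning lets neighbouring windows mix
rolled = np.roll(feat, shift=(-(beta // 2), -(beta // 2)), axis=(0, 1))
square_shifted = window_partition(rolled, beta, beta)

# Rectangular windows (RW-MSA): same token count, longer reach along one axis
vertical = window_partition(feat, 2 * beta, beta // 2)
horizontal = window_partition(feat, beta // 2, 2 * beta)
```

Each window holds β² = 16 tokens in all four cases, so the attention cost per window is unchanged; only the spatial extent covered by one window differs.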

Data Consistency: Bridging the Frequency-Domain Gap

A critical innovation ensuring clinical reliability is KGMgT’s explicit data consistency (DC) constraint in k-space. Deep learning reconstructions risk hallucinating anatomically plausible but physically incorrect details. The DC module anchors the reconstruction to actual acquired measurements through Fourier-domain enforcement:

\[ K_{dc}[m,n] = \begin{cases} K_{\mathrm{Rec}}[m,n], & \text{if } (m,n) \notin M, \\[6pt] \dfrac{K_{\mathrm{Rec}}[m,n] + \eta K[m,n]}{1 + \eta}, & \text{if } (m,n) \in M \end{cases} \]

Here, M represents the sampling mask (the set of acquired k-space locations), K holds the acquired measurements, and η ≥ 0 controls how strongly those measurements are weighted against the network output when the acquisition is noisy. Unsampled locations take the neural network estimate, while sampled locations stay anchored to the raw acquisition. This physics-informed regularization distinguishes KGMgT from pure learning-based approaches that may deviate from measurement constraints.
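A minimal NumPy sketch of k-space data consistency as described above: measured samples are kept (or blended with weight η) and the network fills in everything else. The blended branch mirrors the `DataConsistency` class in the code further down; the hard-replacement branch, the function name, and the toy data are assumptions made for illustration.

```python
import numpy as np

def data_consistency(rec_img, measured_k, mask, eta=None):
    """
    rec_img:    complex image predicted by the network, shape [H, W]
    measured_k: acquired k-space, zero where not sampled, shape [H, W]
    mask:       boolean array, True where k-space was acquired
    eta:        None -> hard replacement with the measured samples;
                float >= 0 -> weighted blend (rec + eta * measured) / (1 + eta)
    """
    rec_k = np.fft.fft2(rec_img, norm="ortho")
    if eta is None:
        sampled = measured_k
    else:
        sampled = (rec_k + eta * measured_k) / (1.0 + eta)
    dc_k = np.where(mask, sampled, rec_k)   # network values survive only off-mask
    return np.fft.ifft2(dc_k, norm="ortho")

# Toy example: a noisy "network output" pulled back towards the measurements
rng = np.random.default_rng(0)
gt = rng.standard_normal((8, 8)) + 1j * rng.standard_normal((8, 8))
mask = rng.random((8, 8)) < 0.3
measured_k = np.fft.fft2(gt, norm="ortho") * mask
rec = gt + 0.1 * (rng.standard_normal((8, 8)) + 1j * rng.standard_normal((8, 8)))

dc_hard = data_consistency(rec, measured_k, mask)
dc_soft = data_consistency(rec, measured_k, mask, eta=1.0)
```

Because the orthonormal FFT preserves energy, removing or shrinking the error at sampled locations can only reduce the image-domain error, which is why this step is cheap insurance against hallucinated detail.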

Comprehensive Training Strategy

KGMgT’s optimization employs a three-component loss function balancing pixel-accurate reconstruction with perceptual quality:

\[ \begin{array}{|l|l|l|} \hline \textbf{Loss Component} & \textbf{Mathematical Form} & \textbf{Purpose} \\ \hline \text{L1 Pixel Loss} & L_{\text{img}} = \left\lVert \mathrm{Rec}_{\tau} - \mathrm{GT}_{\tau} \right\rVert_{1} & \text{Ensures point-wise fidelity} \\ \hline \text{Perceptual Loss} & L_{\text{vgg}} = \frac{1}{N} \sum_{i=1}^{N} \left\lVert F(\mathrm{Rec}_{\tau})_{i} - F(\mathrm{GT}_{\tau})_{i} \right\rVert_{2} & \text{Matches high-level features (VGG19)} \\ \hline \text{Adversarial Loss} & L_{G,D} = \mathbb{E}_{\text{gt}}\!\left[D(\mathrm{GT}_{\tau})\right] - \mathbb{E}_{\text{rec}}\!\left[D(\mathrm{Rec}_{\tau})\right] & \text{Enhances natural image statistics} \\ \hline \end{array} \]

The combined objective \( L_{\text{all}} \) prioritizes reconstruction accuracy while leveraging adversarial training for texture realism.

\[ L_{\text{all}} = L_{\text{img}} + \lambda_{1} L_{\text{vgg}} + \lambda_{2} L_{G,D}, \quad \text{where } \lambda_{1} = 10^{-4} \text{ and } \lambda_{2} = 10^{-6}. \]
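To see how small the perceptual and adversarial weights are in practice, the combined objective can be evaluated by hand. The sketch below uses plain Python; the loss values passed in are invented for illustration and the helper names are hypothetical.

```python
LAMBDA_VGG = 1e-4   # lambda_1 from the text
LAMBDA_ADV = 1e-6   # lambda_2 from the text

def l1_loss(rec, gt):
    """L1 norm of the difference, matching the L_img definition above."""
    return sum(abs(r - g) for r, g in zip(rec, gt))

def total_loss(l_img, l_vgg, l_adv):
    """L_all = L_img + lambda_1 * L_vgg + lambda_2 * L_{G,D}."""
    return l_img + LAMBDA_VGG * l_vgg + LAMBDA_ADV * l_adv

# Hypothetical values: three pixels, plus made-up perceptual and adversarial terms
l_img = l1_loss([0.2, 0.5, 0.9], [0.25, 0.5, 0.8])
l_all = total_loss(l_img, l_vgg=2.0, l_adv=-0.5)
```

With these numbers the pixel term contributes about 0.15, the perceptual term 0.0002, and the adversarial term a negligible amount, so the optimization is dominated by point-wise fidelity unless the other terms are orders of magnitude larger.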

Clinical Validation: Quantitative Excellence and Radiologist Approval

Extensive evaluation across multiple datasets demonstrates KGMgT’s superiority:

Quantitative Performance (PSNR in dB, higher is better):

Dataset | Acceleration | KGMgT | Best Competitor | Improvement
Cine-Sax (In-house) |  | 32.97 | 31.91 (MoDL) | +1.06 dB
Cine-Sax (In-house) | 10× | 30.27 | 28.89 (MoDL) | +1.38 dB
Cine-Sax* (CMRxRecon) |  | 31.46 | 30.19 (MoDL) | +1.27 dB
Cine-Lax* (CMRxRecon) |  | 32.59 | 30.47 (MoDL) | +2.12 dB

Critical Achievement: At 10× acceleration—where scan times reduce to one-tenth of conventional acquisitions—KGMgT maintains PSNR above 30 dB across all test sets, a threshold generally considered acceptable for clinical interpretation.
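For readers less familiar with the metric, PSNR is derived from the mean squared error between reconstruction and ground truth. The sketch below assumes the standard definition with intensities normalized to a peak of 1.0; the article does not state the normalization used in the study.

```python
import math

def psnr(rec, gt, max_val=1.0):
    """Peak signal-to-noise ratio in dB for two equal-length pixel sequences."""
    mse = sum((r - g) ** 2 for r, g in zip(rec, gt)) / len(gt)
    if mse == 0:
        return math.inf   # identical images
    return 10.0 * math.log10(max_val ** 2 / mse)

# Toy example: a uniform error of 0.1 on a [0, 1] intensity scale
gt = [0.0, 0.0, 0.0, 0.0]
rec = [0.1, 0.1, 0.1, 0.1]
value = psnr(rec, gt)

def mse_ratio(delta_db):
    """Factor by which MSE shrinks for a PSNR gain of delta_db."""
    return 10.0 ** (-delta_db / 10.0)
```

Because the scale is logarithmic, the +1.38 dB gain reported at 10× corresponds to roughly 27% lower mean squared error (`mse_ratio(1.38)` is about 0.73).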

Blinded Radiologist Evaluation (5-point Likert scale):

Assessment Category | KGMgT Score | Best Alternative | Statistical Significance
General Quality | 4.73 ± 0.11 | 4.65 (VRT) | p < 0.01
Image Sharpness | 4.78 ± 0.08 | 4.60 (VRT) | p < 0.01
Artifact Suppression | 4.60 ± 0.08 | 4.51 (VRT) | p < 0.01

Five board-certified cardiac imaging specialists with 8+ years of experience rated KGMgT reconstructions significantly higher than all baseline methods, confirming diagnostic utility even at extreme acceleration factors.

Ablation Studies: Validating Architectural Choices

Systematic component removal demonstrates each element’s contribution:

Impact of Knowledge-Guided Mechanism:

  • Removal caused PSNR drops of 0.5–2.1 dB across datasets
  • The Mentor network’s prior features prove essential for temporal coherence

Impact of Attention Mechanisms:

  • Removing SW-MSA: -1.46 dB (Cine-Sax, 6×)
  • Removing RW-MSA: -0.89 dB (Cine-Sax, 6×)
  • Using global MSA instead: -0.95 dB (Cine-Sax, 6×)

The complementary nature of square and rectangular windows is evident—neither alone achieves optimal performance, while their combination through MGwinT maximizes feature aggregation.

Impact of Data Consistency:

  • DC removal caused consistent degradation (0.52–0.89 dB PSNR loss)
  • Frequency-domain constraints prove non-negotiable for measurement fidelity

Efficiency Analysis: Balancing Quality and Computational Cost

In clinical deployment, reconstruction speed rivals accuracy in importance. KGMgT achieves:

  • Inference latency: 156 ms per slice (competitive with real-time requirements)
  • Parameter efficiency: Superior PSNR-to-parameters ratio versus SwinIR and GRL
  • GFLOPs optimization: Lower computational complexity than recurrent architectures (CRNN: 120 ms but inferior quality; GRL: 958 ms)

Figure: Bubble chart comparing MRI reconstruction methods by GFLOPs (x-axis) and PSNR in dB (y-axis), with bubble size indicating parameter count. KGMgT appears in the upper left with the highest PSNR (~33 dB) and low GFLOPs (~1G), while competitors like GRL show lower PSNR (~30.9 dB) at higher computational cost (~9 GFLOPs).

Real-World Impact: Transforming Cardiac Care Pathways

The clinical implications of KGMgT extend far beyond technical benchmarks:

For Patients:

  • Reduced breath-hold durations accommodate those with heart failure or arrhythmias
  • Shorter scan times decrease claustrophobia-related examination failures
  • Lower sedation requirements for pediatric and anxious populations

For Healthcare Systems:

  • Increased scanner throughput addresses MRI capacity constraints
  • Emergency department applications become feasible for acute chest pain evaluation
  • Reduced motion artifacts decrease repeat scan rates

For Diagnostic Accuracy:

  • Preservation of temporal resolution enables precise strain analysis
  • Sharp myocardial border delineation improves infarct sizing
  • Consistent image quality across manufacturers and field strengths

Future Horizons: Extending the KGMgT Paradigm

While current validation focuses on Cartesian undersampling patterns, the architecture’s flexibility suggests natural extensions to:

  • Radial and spiral trajectories: Non-Cartesian acquisitions with inherent motion robustness
  • Real-time imaging: Single-heartbeat acquisitions for hemodynamic assessment
  • Multi-contrast integration: Simultaneous T1/T2 mapping with cine morphology
  • 4D flow extension: Velocity-encoded phase contrast with shared spatiotemporal priors

The knowledge-guided framework may additionally transfer to other dynamic imaging domains—perfusion MRI, fetal cardiac imaging, or even non-cardiac applications like dynamic contrast-enhanced studies.

Conclusion: The New Standard in Accelerated Cardiac Imaging

KGMgT represents more than incremental improvement—it establishes a new architectural paradigm for medical image reconstruction where knowledge transfer, adaptive attention, and geometric diversity converge to solve previously intractable trade-offs. The achievement of diagnostic-quality imaging at 10× acceleration, validated by quantitative metrics and clinical expert assessment, positions this technology for immediate translational impact.

For radiologists, cardiologists, and imaging scientists, KGMgT offers a glimpse of AI’s true potential: not replacing human expertise, but amplifying it through computational capabilities that transcend human temporal perception. The beating heart, captured in a tenth of the time, with undiminished diagnostic power: this is the promise realized.


All code and trained models are available through the authors’ repository, supporting reproducible research and clinical translation.


Technical Specifications Summary:

Attribute | Specification
Input frames | 3 (target + adjacent references)
Base architecture | U-Net with Transformer blocks
Attention heads | Multi-head (SW-MSA + RW-MSA)
Training hardware | NVIDIA A100 (40GB)
Framework | PyTorch with Adam optimization
Learning rate | \( 1 \times 10^{-4} \)
Loss weights | \( \lambda_1 = 10^{-4} \), \( \lambda_2 = 10^{-6} \)

"""
KGMgT: Knowledge-Guided Multi-Geometric Window Transformer
for Cardiac Cine MRI Reconstruction

Complete PyTorch Implementation
Based on: Lyu et al., Medical Image Analysis 2026
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple, List, Dict
import math
from einops import rearrange, repeat
import numpy as np


# =============================================================================
# Utility Functions and Layers
# =============================================================================

def default(val, d):
    """Return default value if val is None."""
    return val if val is not None else d


class Residual(nn.Module):
    """Residual connection wrapper."""
    def __init__(self, fn):
        super().__init__()
        self.fn = fn
    
    def forward(self, x, **kwargs):
        return self.fn(x, **kwargs) + x


class PreNorm(nn.Module):
    """Pre-normalization wrapper."""
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn
    
    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)


class FeedForward(nn.Module):
    """MLP feed-forward network."""
    def __init__(self, dim, hidden_dim, dropout=0.0):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )
    
    def forward(self, x):
        return self.net(x)


class ConvBlock(nn.Module):
    """Convolutional block with LeakyReLU activation."""
    def __init__(self, in_ch, out_ch, kernel_size=3, stride=1, padding=1, bias=True):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding, bias=bias),
            nn.LeakyReLU(0.2, inplace=True)
        )
    
    def forward(self, x):
        return self.conv(x)


# =============================================================================
# Data Consistency Layer
# =============================================================================

class DataConsistency(nn.Module):
    """
    Data Consistency layer enforcing k-space fidelity.
    Ensures sampled regions match original measurements while 
    unsampled regions use network predictions.
    """
    def __init__(self, noise_eta=0.01):
        super().__init__()
        self.noise_eta = noise_eta
    
    def forward(self, rec_img, mask, k_space_full):
        """
        Args:
            rec_img: Reconstructed image [B, 2, H, W] (real/imag channels)
            mask: Sampling mask [B, 1, H, W] or [B, H, W]
            k_space_full: Fully sampled k-space [B, 2, H, W]
        Returns:
            dc_img: Data-consistent image [B, 2, H, W]
        """
        # Convert to complex representation
        rec_complex = torch.view_as_complex(rec_img.permute(0, 2, 3, 1).contiguous())
        
        # FFT to k-space
        rec_k = torch.fft.fft2(rec_complex, norm='ortho')
        
        # Ensure mask is [B, 1, H, W]. Keep it real-valued: casting it to the
        # complex dtype of rec_k would make the threshold comparison below fail.
        if mask.dim() == 3:
            mask = mask.unsqueeze(1)
        
        # Data consistency operation
        # Sampled regions: weighted combination of original and reconstruction
        # Unsampled regions: use reconstruction
        mask_bool = mask > 0.5
        rec_k_real = torch.view_as_real(rec_k)  # [B, H, W, 2]
        k_space_full_real = k_space_full.permute(0, 2, 3, 1)  # [B, H, W, 2]
        
        # Apply DC constraint
        dc_k_real = torch.where(
            mask_bool.permute(0, 2, 3, 1).expand_as(rec_k_real),
            (rec_k_real + self.noise_eta * k_space_full_real) / (1 + self.noise_eta),
            rec_k_real
        )
        
        # Convert back to complex and IFFT
        dc_k = torch.view_as_complex(dc_k_real)
        dc_img = torch.fft.ifft2(dc_k, norm='ortho')
        
        # Return as real/imag channels
        dc_img = torch.view_as_real(dc_img).permute(0, 3, 1, 2)
        return dc_img


# =============================================================================
# VGG Feature Extractor for Perceptual Loss
# =============================================================================

class VGGFeatureExtractor(nn.Module):
    """VGG19-based multi-scale feature extractor."""
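    def __init__(self, layers=('relu1_1', 'relu2_1', 'relu3_1')):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision >= 0.13; use the weights enum
        from torchvision.models import vgg19, VGG19_Weights
        vgg = vgg19(weights=VGG19_Weights.IMAGENET1K_V1)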
        self.features = vgg.features
        self.layers = layers
        self.layer_name_mapping = {
            'relu1_1': 1, 'relu1_2': 3,
            'relu2_1': 6, 'relu2_2': 8,
            'relu3_1': 11, 'relu3_2': 13, 'relu3_3': 15, 'relu3_4': 17,
            'relu4_1': 20, 'relu4_2': 22, 'relu4_3': 24, 'relu4_4': 26,
            'relu5_1': 29, 'relu5_2': 31, 'relu5_3': 33, 'relu5_4': 35
        }
        
        # Freeze parameters
        for param in self.parameters():
            param.requires_grad = False
    
    def forward(self, x):
        """
        Args:
            x: Input image [B, 2, H, W] - convert to 3-channel for VGG
        Returns:
            features: List of feature maps at specified layers
        """
        # Convert 2-channel (real/imag) to 3-channel RGB-like
        if x.shape[1] == 2:
            x = torch.cat([x, x[:, :1]], dim=1)  # [B, 3, H, W]
        
        # Normalize for ImageNet
        mean = torch.tensor([0.485, 0.456, 0.406]).view(1, 3, 1, 1).to(x.device)
        std = torch.tensor([0.229, 0.224, 0.225]).view(1, 3, 1, 1).to(x.device)
        x = (x - mean) / std
        
        features = []
        for name, module in self.features._modules.items():
            x = module(x)
            if int(name) in [self.layer_name_mapping[l] for l in self.layers]:
                features.append(x)
        
        return features


# =============================================================================
# Adaptive Spatiotemporal Attention (ASA)
# =============================================================================

class AdaptiveSpatiotemporalAttention(nn.Module):
    """
    Adaptive Spatiotemporal Attention module with optical flow estimation
    and deformable convolution for motion-aware feature alignment.
    """
    def __init__(
        self,
        dim: int,
        num_heads: int = 8,
        n_offsets: int = 9,  # 3x3 neighborhood
        kernel_size: int = 3
    ):
        super().__init__()
        self.dim = dim
        self.num_heads = num_heads
        self.n_offsets = n_offsets
        self.kernel_size = kernel_size
        
        # Correlation computation
        self.corr_conv = nn.Sequential(
            nn.Conv2d(dim * 2, dim, 3, 1, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim, dim, 3, 1, 1)
        )
        
        # Flow estimation (input is the n_offsets-channel correlation volume)
        self.flow_conv = nn.Sequential(
            nn.Conv2d(n_offsets, dim, 3, 1, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim, 2 * n_offsets, 3, 1, 1)  # 2D offsets for each position
        )
        
        # Attention weights (input is the n_offsets-channel correlation volume)
        self.attn_conv = nn.Sequential(
            nn.Conv2d(n_offsets, dim, 3, 1, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim, n_offsets, 3, 1, 1),
            nn.Sigmoid()
        )
        
        # Feature fusion: forward() concatenates target_feat, the weighted
        # values, q and k, i.e. 4 * dim input channels
        self.offset_conv = nn.Sequential(
            nn.Conv2d(dim * 4, dim, 3, 1, 1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim, dim, 3, 1, 1)
        )
        
        # Deformable convolution (simplified implementation)
        self.dcn_weight = nn.Parameter(torch.randn(dim, dim, kernel_size, kernel_size))
        self.dcn_bias = nn.Parameter(torch.zeros(dim))
        
        # Output projection
        self.proj = nn.Sequential(
            nn.Conv2d(dim, dim, 1),
            nn.LeakyReLU(0.2)
        )
        
        # Learnable temperature for attention
        self.temperature = nn.Parameter(torch.ones(1))
    
    def corr(self, q, k):
        """
        Compute correlation between query and key features.
        Args:
            q: Query features [B, C, H, W]
            k: Key features [B, C, H, W]
        Returns:
            corr_vol: Correlation volume over the 3x3 neighborhood [B, 9, H, W]
            offset_idx: Displacement indices (0..8) [B, 9, H, W]
        """
        B, C, H, W = q.shape
        
        # Normalize for stable correlation
        q_norm = F.normalize(q, dim=1)
        k_norm = F.normalize(k, dim=1)
        
        # Compute correlation at multiple displacements
        corr_maps = []
        offset_indices = []
        
        for dy in range(-1, 2):
            for dx in range(-1, 2):
                # Shift key features
                if dy == 0 and dx == 0:
                    k_shifted = k_norm
                else:
                    k_shifted = torch.roll(k_norm, shifts=(dy, dx), dims=(2, 3))
                
                # Correlation
                corr = (q_norm * k_shifted).sum(dim=1, keepdim=True)  # [B, 1, H, W]
                corr_maps.append(corr)
                offset_indices.append(torch.full((B, 1, H, W), (dy + 1) * 3 + (dx + 1), 
                                                device=q.device))
        
        corr_vol = torch.cat(corr_maps, dim=1)  # [B, 9, H, W]
        offset_idx = torch.cat(offset_indices, dim=1)  # [B, 9, H, W]
        
        # No spatial padding: the volume must stay [B, 9, H, W] so that the flow
        # and attention maps derived from it match the feature resolution
        
        return corr_vol, offset_idx
    
    def forward(self, q, k, v, target_feat):
        """
        Args:
            q: Query features from target frame [B, C, H, W]
            k: Key features from reference frame [B, C, H, W]
            v: Value features (multi-scale) [B, 3, C, H, W] or [B, C, H, W]
            target_feat: Target frame shallow features [B, C, H, W]
        Returns:
            out: Aligned and aggregated features [B, C, H, W]
        """
        B, C, H, W = q.shape
        
        # Handle multi-scale value features
        if v.dim() == 5:
            # Simplification: use only the first scale
            v = v[:, 0]  # [B, C, H, W]
        
        # Compute correlation and flow
        corr_vol, offset_idx = self.corr(q, k)  # [B, 9, H, W], [B, 9, H, W]
        
        # Generate attention weights from correlation
        I1 = self.attn_conv(corr_vol[:, :self.n_offsets])  # [B, 9, H, W]
        
        # Generate flow offsets
        flow_feat = self.flow_conv(corr_vol[:, :self.n_offsets])
        flow = torch.tanh(flow_feat)  # [B, 2*9, H, W]
        
        # Reshape flow to [B, 9, 2, H, W]
        flow = flow.view(B, self.n_offsets, 2, H, W)
        
        # Create sampling grid with offsets
        grid_y, grid_x = torch.meshgrid(
            torch.linspace(-1, 1, H, device=q.device),
            torch.linspace(-1, 1, W, device=q.device),
            indexing='ij'
        )
        grid = torch.stack([grid_x, grid_y], dim=-1).unsqueeze(0).unsqueeze(0)  # [1, 1, H, W, 2]
        grid = grid.repeat(B, self.n_offsets, 1, 1, 1)  # [B, 9, H, W, 2]
        
        # Add learned offsets
        flow_permuted = flow.permute(0, 1, 3, 4, 2)  # [B, 9, H, W, 2]
        sampling_grid = grid + flow_permuted * 0.1  # Scale factor for stability
        
        # Sample value features at offset positions
        v_expanded = v.unsqueeze(1).repeat(1, self.n_offsets, 1, 1, 1)  # [B, 9, C, H, W]
        v_reshaped = v_expanded.view(B * self.n_offsets, C, H, W)
        sampling_grid_flat = sampling_grid.view(B * self.n_offsets, H, W, 2)
        
        v_sampled = F.grid_sample(
            v_reshaped, 
            sampling_grid_flat, 
            mode='bilinear', 
            padding_mode='zeros',
            align_corners=True  # base grid comes from linspace(-1, 1), i.e. corner-aligned
        )  # [B*9, C, H, W]
        
        v_sampled = v_sampled.view(B, self.n_offsets, C, H, W)
        
        # Weighted aggregation
        I1_expanded = I1.unsqueeze(2)  # [B, 9, 1, H, W]
        v_weighted = (v_sampled * I1_expanded).sum(dim=1)  # [B, C, H, W]
        
        # Combine with target features (simplified deformable conv)
        combined = torch.cat([target_feat, v_weighted, q, k], dim=1)
        features = self.offset_conv(combined)
        
        # Final projection
        out = self.proj(features)
        
        return out


# =============================================================================
# Multi-Geometric Window Transformer (MGwinT)
# =============================================================================

class WindowAttention(nn.Module):
    """Base window attention with support for different window shapes."""
    def __init__(
        self,
        dim: int,
        window_size: Tuple[int, int],
        num_heads: int = 8,
        qkv_bias: bool = True,
        attn_drop: float = 0.0,
        proj_drop: float = 0.0
    ):
        super().__init__()
        self.dim = dim
        self.window_size = window_size  # (Wh, Ww)
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = head_dim ** -0.5
        
        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
        
        # Relative position bias
        self.relative_position_bias_table = nn.Parameter(
            torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads)
        )
        nn.init.trunc_normal_(self.relative_position_bias_table, std=0.02)
        
        # Get relative position index
        coords_h = torch.arange(window_size[0])
        coords_w = torch.arange(window_size[1])
        coords = torch.stack(torch.meshgrid(coords_h, coords_w, indexing='ij'))
        coords_flatten = torch.flatten(coords, 1)
        
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.permute(1, 2, 0).contiguous()
        relative_coords[:, :, 0] += window_size[0] - 1
        relative_coords[:, :, 1] += window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)
        self.register_buffer('relative_position_index', relative_position_index)
    
    def forward(self, x, mask=None):
        """
        Args:
            x: Input features [B*num_windows, N, C]
            mask: Attention mask (optional)
        """
        B_, N, C = x.shape
        
        qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]
        
        q = q * self.scale
        attn = (q @ k.transpose(-2, -1))
        
        # Add relative position bias
        relative_position_bias = self.relative_position_bias_table[
            self.relative_position_index.view(-1)
        ].view(
            self.window_size[0] * self.window_size[1],
            self.window_size[0] * self.window_size[1],
            -1
        )
        relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous()
        attn = attn + relative_position_bias.unsqueeze(0)
        
        if mask is not None:
            attn = attn.masked_fill(mask == 0, float('-inf'))
        
        attn = F.softmax(attn, dim=-1)
        attn = self.attn_drop(attn)
        
        x = (attn @ v).transpose(1, 2).reshape(B_, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        
        return x


def window_partition(x, window_size):
    """Partition into non-overlapping windows."""
    B, H, W, C = x.shape
    x = x.view(B, H // window_size[0], window_size[0], W // window_size[1], window_size[1], C)
    windows = x.permute(0, 1, 3, 2, 4, 5).contiguous()
    windows = windows.view(-1, window_size[0] * window_size[1], C)
    return windows


def window_reverse(windows, window_size, H, W):
    """Reverse window partition."""
    B = int(windows.shape[0] / (H * W / window_size[0] / window_size[1]))
    x = windows.view(B, H // window_size[0], W // window_size[1], window_size[0], window_size[1], -1)
    x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)
    return x


class MultiGeometricWindowTransformerBlock(nn.Module):
    """
    Multi-Geometric Window Transformer Block combining:
    - Square Window MSA (SW-MSA)
    - Rectangle Window MSA (RW-MSA)
    """
    def __init__(
        self,
        dim: int,
        num_heads: int = 8,
        window_size: int = 8,
        mlp_ratio: float = 4.0,
        drop: float = 0.0,
        attn_drop: float = 0.0,
        drop_path: float = 0.0,
        use_rectangle: bool = True,
        rectangle_orientation: str = 'vertical'  # 'vertical' or 'horizontal'
    ):
        super().__init__()
        self.dim = dim
        self.num_heads = num_heads
        self.window_size = window_size
        self.use_rectangle = use_rectangle
        
        # Square Window MSA
        self.norm1_sw = nn.LayerNorm(dim)
        self.attn_sw = WindowAttention(
            dim, 
            window_size=(window_size, window_size),
            num_heads=num_heads,
            attn_drop=attn_drop,
            proj_drop=drop
        )
        
        # Rectangle Window MSA
        if use_rectangle:
            self.norm1_rw = nn.LayerNorm(dim)
            if rectangle_orientation == 'vertical':
                rw_size = (window_size * 2, window_size // 2)
            else:
                rw_size = (window_size // 2, window_size * 2)
            
            self.attn_rw = WindowAttention(
                dim,
                window_size=rw_size,
                num_heads=num_heads,
                attn_drop=attn_drop,
                proj_drop=drop
            )
            self.rectangle_size = rw_size
        
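        # Drop path: approximated here with element-wise nn.Dropout; true stochastic
        # depth would drop the whole residual branch per sample
        self.drop_path = nn.Identity() if drop_path == 0 else nn.Dropout(drop_path)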
        
        # MLP
        self.norm2 = nn.LayerNorm(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = FeedForward(dim, mlp_hidden_dim, dropout=drop)
        
        # Fusion layer
        if use_rectangle:
            self.fusion = nn.Linear(dim * 2, dim)
    
    def forward(self, x, H, W):
        """
        Args:
            x: Input [B, H*W, C]
            H, W: Spatial dimensions
        """
        B, L, C = x.shape
        assert L == H * W, "Input feature size mismatch"
        
        shortcut = x
        
        # Square Window branch
        x_sw = self.norm1_sw(x)
        x_sw = x_sw.view(B, H, W, C)
        
        # Pad if needed
        pad_l = pad_t = 0
        pad_r = (self.window_size - W % self.window_size) % self.window_size
        pad_b = (self.window_size - H % self.window_size) % self.window_size
        x_sw = F.pad(x_sw, (0, 0, pad_l, pad_r, pad_t, pad_b))
        _, Hp, Wp, _ = x_sw.shape
        
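        # Cyclic shift for SW-MSA; the shift is computed once so that odd window
        # sizes roll forward and back by exactly the same amount
        shift = self.window_size // 2
        shifted_x = torch.roll(x_sw, shifts=(-shift, -shift), dims=(1, 2))
        
        # Partition windows
        x_windows = window_partition(shifted_x, (self.window_size, self.window_size))
        
        # Square-window attention (no mask is passed, so tokens wrapped around by
        # the roll can attend to each other within a window)
        attn_windows = self.attn_sw(x_windows)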
        
        # Merge windows
        shifted_x = window_reverse(attn_windows, (self.window_size, self.window_size), Hp, Wp)
        
        # Reverse cyclic shift
        x_sw = torch.roll(shifted_x, shifts=(self.window_size // 2, self.window_size // 2), dims=(1, 2))
        
        # Unpad
        x_sw = x_sw[:, :H, :W, :].reshape(B, H * W, C)
        x_sw = shortcut + self.drop_path(x_sw)
        
        # Rectangle Window branch
        if self.use_rectangle:
            x_rw = self.norm1_rw(x)
            x_rw = x_rw.view(B, H, W, C)
            
            # Pad for rectangle windows
            pad_r_rw = (self.rectangle_size[1] - W % self.rectangle_size[1]) % self.rectangle_size[1]
            pad_b_rw = (self.rectangle_size[0] - H % self.rectangle_size[0]) % self.rectangle_size[0]
            x_rw = F.pad(x_rw, (0, 0, 0, pad_r_rw, 0, pad_b_rw))
            _, Hp_rw, Wp_rw, _ = x_rw.shape
            
            # Partition into rectangle windows
            x_rw_windows = window_partition(x_rw, self.rectangle_size)
            
            # RW-MSA
            attn_rw_windows = self.attn_rw(x_rw_windows)
            
            # Merge
            x_rw = window_reverse(attn_rw_windows, self.rectangle_size, Hp_rw, Wp_rw)
            x_rw = x_rw[:, :H, :W, :].reshape(B, H * W, C)
            x_rw = shortcut + self.drop_path(x_rw)
            
            # Fuse both branches
            x = self.fusion(torch.cat([x_sw, x_rw], dim=-1))
        else:
            x = x_sw
        
        # MLP
        x = x + self.drop_path(self.mlp(self.norm2(x)))
        
        return x


# =============================================================================
# Transformer-driven Dynamic Feature Aggregation (TdFA)
# =============================================================================

class TdFA(nn.Module):
    """
    Transformer-driven Dynamic Feature Aggregation module.
    Aggregates features from multiple frames using MGwinT blocks.
    """
    def __init__(
        self,
        dim: int,
        num_blocks: int = 4,
        num_heads: int = 8,
        window_size: int = 8,
        mlp_ratio: float = 4.0
    ):
        super().__init__()
        self.dim = dim
        
        # Initial fusion convolution
        self.fusion_conv = nn.Sequential(
            nn.Conv2d(dim * 3, dim, 3, 1, 1),  # Concatenate target, prior, and ASA output
            nn.LeakyReLU(0.2),
            nn.Conv2d(dim, dim, 3, 1, 1)
        )
        
        # MGwinT blocks
        self.blocks = nn.ModuleList([
            MultiGeometricWindowTransformerBlock(
                dim=dim,
                num_heads=num_heads,
                window_size=window_size,
                mlp_ratio=mlp_ratio,
                use_rectangle=True,
                # Alternate rectangle orientation between consecutive blocks
                rectangle_orientation='vertical' if i % 2 == 0 else 'horizontal'
            )
            for i in range(num_blocks)
        ])
        
        # Output convolution
        self.out_conv = nn.Sequential(
            nn.Conv2d(dim, dim, 3, 1, 1),
            nn.LeakyReLU(0.2)
        )
    
    def forward(self, target_feat, prior_feat, asa_output):
        """
        Args:
            target_feat: Target frame features [B, C, H, W]
            prior_feat: Prior features from Mentor [B, C, H, W]
            asa_output: ASA module output [B, C, H, W]
        Returns:
            aggregated: Aggregated features [B, C, H, W]
        """
        B, C, H, W = target_feat.shape
        
        # Concatenate and fuse
        x = torch.cat([target_feat, prior_feat, asa_output], dim=1)
        x = self.fusion_conv(x)  # [B, C, H, W]
        
        # Reshape for transformer
        x = x.flatten(2).transpose(1, 2)  # [B, H*W, C]
        
        # Apply MGwinT blocks
        for block in self.blocks:
            x = block(x, H, W)
        
        # Reshape back
        x = x.transpose(1, 2).reshape(B, C, H, W)
        
        # Output
        aggregated = self.out_conv(x)
        
        # Residual connection
        return aggregated + target_feat


# =============================================================================
# Feature Encoder
# =============================================================================

class FeatureEncoder(nn.Module):
    """
    Multi-scale feature encoder using VGG-style architecture.
    Extracts deep features at 3 scales for Q, K, V computation.
    """
    def __init__(self, in_ch=2, base_ch=64, num_scales=3):
        super().__init__()
        self.num_scales = num_scales
        
        # Scale 1 (full resolution)
        self.enc1 = nn.Sequential(
            ConvBlock(in_ch, base_ch),
            ConvBlock(base_ch, base_ch),
        )
        
        # Scale 2 (1/2 resolution)
        self.down1 = nn.MaxPool2d(2)
        self.enc2 = nn.Sequential(
            ConvBlock(base_ch, base_ch * 2),
            ConvBlock(base_ch * 2, base_ch * 2),
        )
        
        # Scale 3 (1/4 resolution)
        self.down2 = nn.MaxPool2d(2)
        self.enc3 = nn.Sequential(
            ConvBlock(base_ch * 2, base_ch * 4),
            ConvBlock(base_ch * 4, base_ch * 4),
        )
        
        self.base_ch = base_ch
    
    def forward(self, x):
        """
        Args:
            x: Input image [B, 2, H, W]
        Returns:
            features: List of features at different scales
            shallow: Shallow features for skip connections
        """
        # Scale 1
        f1 = self.enc1(x)
        
        # Scale 2
        x2 = self.down1(f1)
        f2 = self.enc2(x2)
        
        # Scale 3
        x3 = self.down2(f2)
        f3 = self.enc3(x3)
        
        return [f1, f2, f3], f1


# =============================================================================
# Denoising Decoder (DWS - Denoising Weights Subnetwork)
# =============================================================================

class DenoisingDecoder(nn.Module):
    """
    Denoising decoder with skip connections.
    Adapted from MoDL's Denoising Weights Subnetwork with LeakyReLU.
    """
    def __init__(self, in_ch=64, out_ch=2, num_scales=3):
        super().__init__()
        
        # Decoder blocks with skip connections
        self.up3 = nn.Sequential(
            nn.ConvTranspose2d(in_ch * 4, in_ch * 2, 4, 2, 1),
            nn.LeakyReLU(0.2)
        )
        self.dec3 = nn.Sequential(
            ConvBlock(in_ch * 4, in_ch * 2),  # Skip from enc2
            ConvBlock(in_ch * 2, in_ch * 2)
        )
        
        self.up2 = nn.Sequential(
            nn.ConvTranspose2d(in_ch * 2, in_ch, 4, 2, 1),
            nn.LeakyReLU(0.2)
        )
        self.dec2 = nn.Sequential(
            ConvBlock(in_ch * 2, in_ch),  # Skip from enc1
            ConvBlock(in_ch, in_ch)
        )
        
        self.out = nn.Conv2d(in_ch, out_ch, 3, 1, 1)
    
    def forward(self, x, skips):
        """
        Args:
            x: Deepest features [B, C*4, H/4, W/4]
            skips: [f1, f2, f3] from encoder
        """
        f1, f2, f3 = skips
        
        # Decode scale 3
        x = self.up3(x)
        x = torch.cat([x, f2], dim=1)
        x = self.dec3(x)
        
        # Decode scale 2
        x = self.up2(x)
        x = torch.cat([x, f1], dim=1)
        x = self.dec2(x)
        
        # Output
        out = self.out(x)
        
        return out


# =============================================================================
# Complete KGMgT Model
# =============================================================================

class KGMgT(nn.Module):
    """
    Complete Knowledge-Guided Multi-Geometric Window Transformer
    for Cardiac Cine MRI Reconstruction.
    """
    def __init__(
        self,
        in_ch: int = 2,
        base_ch: int = 64,
        num_scales: int = 3,
        num_mgwin_blocks: int = 4,
        num_heads: int = 8,
        window_size: int = 8,
        use_dc: bool = True,
        dc_eta: float = 0.01
    ):
        super().__init__()
        
        self.in_ch = in_ch
        self.use_dc = use_dc
        
        # ========== Shared Components ==========
        self.feature_encoder = FeatureEncoder(in_ch, base_ch, num_scales)
        
        # VGG feature extractor for perceptual loss (frozen)
        self.vgg_extractor = VGGFeatureExtractor()
        
        # ========== Mentor Network ==========
        self.mentor_asa = nn.ModuleList([
            AdaptiveSpatiotemporalAttention(base_ch * (2**i))
            for i in range(num_scales)
        ])
        
        self.mentor_tdfa = nn.ModuleList([
            TdFA(
                dim=base_ch * (2**i),
                num_blocks=num_mgwin_blocks,
                num_heads=num_heads,
                window_size=window_size
            )
            for i in range(num_scales)
        ])
        
        self.mentor_decoder = DenoisingDecoder(base_ch, in_ch, num_scales)
        
        # ========== Learner Network ==========
        self.learner_asa = nn.ModuleList([
            AdaptiveSpatiotemporalAttention(base_ch * (2**i))
            for i in range(num_scales)
        ])
        
        self.learner_tdfa = nn.ModuleList([
            TdFA(
                dim=base_ch * (2**i),
                num_blocks=num_mgwin_blocks,
                num_heads=num_heads,
                window_size=window_size
            )
            for i in range(num_scales)
        ])
        
        self.learner_decoder = DenoisingDecoder(base_ch, in_ch, num_scales)
        
        # ========== Data Consistency ==========
        if use_dc:
            self.dc_layer = DataConsistency(noise_eta=dc_eta)
        
        # ========== Knowledge Transfer Projections ==========
        self.knowledge_proj = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(base_ch * (2**i), base_ch * (2**i), 3, 1, 1),
                nn.LeakyReLU(0.2)
            )
            for i in range(num_scales)
        ])
    
    def forward_mentor(self, target_frame, ref_prev, mask=None, k_space_full=None):
        """
        Mentor network forward pass.
        
        Args:
            target_frame: Target frame to reconstruct [B, 2, H, W]
            ref_prev: Previous reference frame [B, 2, H, W]
            mask: Sampling mask (for DC)
            k_space_full: Full k-space (for DC)
        """
        # Encode target and reference
        feat_target, shallow_target = self.feature_encoder(target_frame)
        feat_ref, _ = self.feature_encoder(ref_prev)
        
        # Multi-scale feature aggregation
        mentor_features = []
        for i in range(len(feat_target)):
            # ASA: align reference to target
            q = feat_target[i]
            k = feat_ref[i]
            v = feat_ref[i].unsqueeze(1) if i == 0 else torch.stack([feat_ref[i]] * 3, dim=1)
            
            asa_out = self.mentor_asa[i](q, k, v, feat_target[i])
            
            # TdFA: aggregate features (no prior for Mentor)
            dummy_prior = torch.zeros_like(feat_target[i])
            tdfa_out = self.mentor_tdfa[i](feat_target[i], dummy_prior, asa_out)
            
            mentor_features.append(tdfa_out)
        
        # Decode
        mentor_out = self.mentor_decoder(mentor_features[-1], 
                                          [feat_target[0], feat_target[1], feat_target[2]])
        
        # Residual learning
        mentor_out = mentor_out + target_frame
        
        # Data consistency
        if self.use_dc and mask is not None:
            mentor_out = self.dc_layer(mentor_out, mask, k_space_full)
        
        return mentor_out, mentor_features
    
    def forward_learner(self, target_frame, ref_next, mentor_features, 
                        mask=None, k_space_full=None):
        """
        Learner network forward pass with knowledge guidance.
        
        Args:
            target_frame: Target frame (with knowledge guidance) [B, 2, H, W]
            ref_next: Next reference frame [B, 2, H, W]
            mentor_features: Prior features from Mentor network
            mask: Sampling mask
            k_space_full: Full k-space
        """
        # Encode target and reference
        feat_target, shallow_target = self.feature_encoder(target_frame)
        feat_ref, _ = self.feature_encoder(ref_next)
        
        # Multi-scale feature aggregation with knowledge guidance
        learner_features = []
        for i in range(len(feat_target)):
            # Project mentor knowledge
            knowledge = self.knowledge_proj[i](mentor_features[i])
            
            # ASA: align reference to target
            q = feat_target[i]
            k = feat_ref[i]
            v = feat_ref[i].unsqueeze(1) if i == 0 else torch.stack([feat_ref[i]] * 3, dim=1)
            
            asa_out = self.learner_asa[i](q, k, v, feat_target[i])
            
            # TdFA: aggregate with knowledge guidance
            tdfa_out = self.learner_tdfa[i](feat_target[i], knowledge, asa_out)
            
            learner_features.append(tdfa_out)
        
        # Decode
        learner_out = self.learner_decoder(learner_features[-1],
                                           [feat_target[0], feat_target[1], feat_target[2]])
        
        # Residual learning
        learner_out = learner_out + target_frame
        
        # Data consistency
        if self.use_dc and mask is not None:
            learner_out = self.dc_layer(learner_out, mask, k_space_full)
        
        return learner_out
    
    def forward(self, input_t, ref_t_minus_1, ref_t_plus_1, mask=None, k_space_full=None):
        """
        Complete KGMgT forward pass.
        
        Args:
            input_t: Undersampled target frame [B, 2, H, W]
            ref_t_minus_1: Previous frame [B, 2, H, W]
            ref_t_plus_1: Next frame [B, 2, H, W]
            mask: Undersampling mask [B, 1, H, W]
            k_space_full: Fully sampled k-space for DC [B, 2, H, W]
        
        Returns:
            reconstruction: Final reconstructed frame [B, 2, H, W]
            mentor_out: Intermediate mentor output (for auxiliary loss)
        """
        # Stage 1: Mentor Network
        mentor_out, mentor_features = self.forward_mentor(
            input_t, ref_t_minus_1, mask, k_space_full
        )
        
        # Knowledge injection: create guided input for learner
        # Simple addition as per paper: Input_t^kg = K_m(Input_t, Ref_{t-1}) + Input_t
        guided_input = mentor_out + input_t
        
        # Stage 2: Learner Network
        reconstruction = self.forward_learner(
            guided_input, ref_t_plus_1, mentor_features, mask, k_space_full
        )
        
        return reconstruction, mentor_out


# =============================================================================
# Loss Functions
# =============================================================================

class KGMgTLoss(nn.Module):
    """
    Combined loss for KGMgT training:
    - L1 pixel loss
    - Perceptual loss (VGG)
    - Adversarial loss
    """
    def __init__(
        self,
        lambda_vgg: float = 1e-4,
        lambda_adv: float = 1e-6,
        use_adv: bool = True
    ):
        super().__init__()
        self.lambda_vgg = lambda_vgg
        self.lambda_adv = lambda_adv
        self.use_adv = use_adv
        
        # VGG feature extractor
        self.vgg = VGGFeatureExtractor()
        
        # L1 loss
        self.l1_loss = nn.L1Loss()
    
    def forward(self, pred, target, mentor_pred=None, discriminator=None):
        """
        Args:
            pred: Learner prediction [B, 2, H, W]
            target: Ground truth [B, 2, H, W]
            mentor_pred: Mentor prediction (optional, for auxiliary loss)
            discriminator: Discriminator network (optional)
        """
        losses = {}
        
        # L1 pixel loss
        l1_loss = self.l1_loss(pred, target)
        losses['l1'] = l1_loss
        
        # Mentor auxiliary loss
        if mentor_pred is not None:
            mentor_loss = self.l1_loss(mentor_pred, target)
            losses['mentor'] = mentor_loss
            total_loss = l1_loss + 0.5 * mentor_loss
        else:
            total_loss = l1_loss
        
        # Perceptual loss
        pred_features = self.vgg(pred)
        target_features = self.vgg(target)
        
        vgg_loss = 0
        for pf, tf in zip(pred_features, target_features):
            vgg_loss += F.mse_loss(pf, tf)
        vgg_loss = vgg_loss / len(pred_features)
        
        losses['vgg'] = vgg_loss
        total_loss = total_loss + self.lambda_vgg * vgg_loss
        
        # Adversarial loss
        if self.use_adv and discriminator is not None:
            # Generator wants discriminator to predict real
            fake_pred = discriminator(pred)
            adv_loss = -fake_pred.mean()  # WGAN-style
            losses['adv'] = adv_loss
            total_loss = total_loss + self.lambda_adv * adv_loss
        
        losses['total'] = total_loss
        return losses


class Discriminator(nn.Module):
    """
    PatchGAN discriminator for adversarial training.
    """
    def __init__(self, in_ch=2, base_ch=64):
        super().__init__()
        
        self.model = nn.Sequential(
            # Input layer
            nn.Conv2d(in_ch, base_ch, 4, 2, 1),
            nn.LeakyReLU(0.2, inplace=True),
            
            # Hidden layers
            nn.Conv2d(base_ch, base_ch * 2, 4, 2, 1),
            nn.InstanceNorm2d(base_ch * 2),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(base_ch * 2, base_ch * 4, 4, 2, 1),
            nn.InstanceNorm2d(base_ch * 4),
            nn.LeakyReLU(0.2, inplace=True),
            
            nn.Conv2d(base_ch * 4, base_ch * 8, 4, 1, 1),
            nn.InstanceNorm2d(base_ch * 8),
            nn.LeakyReLU(0.2, inplace=True),
            
            # Output layer
            nn.Conv2d(base_ch * 8, 1, 4, 1, 1)
        )
    
    def forward(self, x):
        return self.model(x)


# =============================================================================
# Training and Inference Utilities
# =============================================================================

class KGMgTTrainer:
    """
    Training pipeline for KGMgT.
    """
    def __init__(
        self,
        model: KGMgT,
        discriminator: Optional[Discriminator] = None,
        device: str = 'cuda',
        lr: float = 1e-4,
        betas: Tuple[float, float] = (0.9, 0.999)
    ):
        self.model = model.to(device)
        self.device = device
        
        # Optimizers
        self.optimizer_g = torch.optim.Adam(
            model.parameters(), lr=lr, betas=betas
        )
        
        self.discriminator = discriminator
        if discriminator is not None:
            self.discriminator = discriminator.to(device)
            self.optimizer_d = torch.optim.Adam(
                discriminator.parameters(), lr=lr, betas=betas
            )
        
        # The loss holds a VGG extractor, so it must live on the same device as the model
        self.criterion = KGMgTLoss().to(device)
        self.global_step = 0
    
    def train_step(self, batch):
        """
        Single training step.
        
        Args:
            batch: Dictionary containing:
                - 'input_t': Undersampled target [B, 2, H, W]
                - 'ref_prev': Previous frame [B, 2, H, W]
                - 'ref_next': Next frame [B, 2, H, W]
                - 'target': Ground truth [B, 2, H, W]
                - 'mask': Sampling mask [B, 1, H, W]
                - 'k_space': Full k-space [B, 2, H, W]
        """
        # Move to device
        input_t = batch['input_t'].to(self.device)
        ref_prev = batch['ref_prev'].to(self.device)
        ref_next = batch['ref_next'].to(self.device)
        target = batch['target'].to(self.device)
        mask = batch['mask'].to(self.device)
        k_space = batch['k_space'].to(self.device)
        
        # ================== Train Generator ==================
        self.optimizer_g.zero_grad()
        
        pred, mentor_pred = self.model(input_t, ref_prev, ref_next, mask, k_space)
        
        # Pass the discriminator (or None) so the adversarial term is added only when one exists
        disc = self.discriminator
        
        losses = self.criterion(pred, target, mentor_pred, disc)
        losses['total'].backward()
        
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.optimizer_g.step()
        
        # ================== Train Discriminator ==================
        if self.discriminator is not None and self.global_step % 2 == 0:
            self.optimizer_d.zero_grad()
            
            # Real loss
            real_pred = self.discriminator(target)
            d_real_loss = -real_pred.mean()
            
            # Fake loss (detach to avoid backprop through generator)
            fake_pred = self.discriminator(pred.detach())
            d_fake_loss = fake_pred.mean()
            
            # Wasserstein critic loss; note that no gradient penalty or weight
            # clipping is applied in this simplified version
            d_loss = d_real_loss + d_fake_loss
            
            d_loss.backward()
            self.optimizer_d.step()
            
            losses['d_real'] = d_real_loss
            losses['d_fake'] = d_fake_loss
            losses['d_total'] = d_loss
        
        self.global_step += 1
        return {k: v.item() if isinstance(v, torch.Tensor) else v 
                for k, v in losses.items()}
    
    @torch.no_grad()
    def validate(self, val_loader):
        """Validation loop."""
        self.model.eval()
        metrics = {'psnr': [], 'ssim': [], 'nmse': []}
        
        for batch in val_loader:
            input_t = batch['input_t'].to(self.device)
            ref_prev = batch['ref_prev'].to(self.device)
            ref_next = batch['ref_next'].to(self.device)
            target = batch['target'].to(self.device)
            mask = batch['mask'].to(self.device)
            k_space = batch['k_space'].to(self.device)
            
            pred, _ = self.model(input_t, ref_prev, ref_next, mask, k_space)
            
            # Calculate metrics
            mse = F.mse_loss(pred, target).item()
            # PSNR with a peak-to-peak range of 2.0, i.e. assuming data scaled to [-1, 1]
            psnr = 10 * math.log10(2.0 ** 2 / mse) if mse > 0 else 100
            
            # Crude structural proxy (1 - MSE / variance), not true SSIM;
            # use a proper SSIM implementation in production
            ssim = 1 - mse / (target.var().item() + 1e-8)
            
            nmse = mse / ((target ** 2).mean().item() + 1e-8)
            
            metrics['psnr'].append(psnr)
            metrics['ssim'].append(ssim)
            metrics['nmse'].append(nmse)
        
        self.model.train()
        return {k: sum(v) / len(v) for k, v in metrics.items()}


# =============================================================================
# Inference and Demo
# =============================================================================

@torch.no_grad()
def reconstruct_cardiac_cine(
    model: KGMgT,
    undersampled_sequence: torch.Tensor,
    mask: torch.Tensor,
    device: str = 'cuda'
) -> torch.Tensor:
    """
    Reconstruct a full cardiac cine sequence.
    
    Args:
        model: Trained KGMgT model
        undersampled_sequence: [T, 2, H, W] undersampled frames
        mask: [1, H, W] or [T, 1, H, W] sampling mask
        device: Computation device
    
    Returns:
        reconstructed: [T, 2, H, W] reconstructed frames
    """
    model.eval()
    T = undersampled_sequence.shape[0]
    reconstructed = []
    
    # Ensure mask has temporal dimension
    if mask.dim() == 3:
        mask = mask.unsqueeze(0).expand(T, -1, -1, -1)
    
    for t in range(T):
        # Handle boundary conditions
        t_prev = max(0, t - 1)
        t_next = min(T - 1, t + 1)
        
        input_t = undersampled_sequence[t:t+1].to(device)
        ref_prev = undersampled_sequence[t_prev:t_prev+1].to(device)
        ref_next = undersampled_sequence[t_next:t_next+1].to(device)
        mask_t = mask[t:t+1].to(device)
        
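        # Create dummy k-space for DC (in practice, use actual measurements).
        # view_as_complex needs a last dimension of size 2 with stride 1,
        # hence the .contiguous() after permute
        k_space = torch.fft.fft2(
            torch.view_as_complex(input_t.permute(0, 2, 3, 1).contiguous()),
            norm='ortho'
        )
        k_space_real = torch.view_as_real(k_space).permute(0, 3, 1, 2)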
        
        pred, _ = model(input_t, ref_prev, ref_next, mask_t, k_space_real)
        reconstructed.append(pred.cpu())
    
    return torch.cat(reconstructed, dim=0)


# =============================================================================
# Main Execution
# =============================================================================

def create_model(config: Optional[Dict] = None) -> KGMgT:
    """Factory function to create KGMgT model with default or custom config."""
    default_config = {
        'in_ch': 2,
        'base_ch': 64,
        'num_scales': 3,
        'num_mgwin_blocks': 4,
        'num_heads': 8,
        'window_size': 8,
        'use_dc': True,
        'dc_eta': 0.01
    }
    
    if config is not None:
        default_config.update(config)
    
    return KGMgT(**default_config)


def test_model():
    """Test the complete model architecture."""
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Testing KGMgT on {device}")
    
    # Create model
    model = create_model({
        'base_ch': 32,  # Smaller for testing
        'num_mgwin_blocks': 2
    }).to(device)
    
    # Create dummy data
    B, H, W = 2, 256, 256
    input_t = torch.randn(B, 2, H, W).to(device)
    ref_prev = torch.randn(B, 2, H, W).to(device)
    ref_next = torch.randn(B, 2, H, W).to(device)
    mask = torch.zeros(B, 1, H, W).to(device)
    mask[:, :, :, ::4] = 1  # 4x undersampling: keep every 4th column
    k_space = torch.randn(B, 2, H, W).to(device)
    
    # Forward pass
    print("Running forward pass...")
    pred, mentor_out = model(input_t, ref_prev, ref_next, mask, k_space)
    
    print(f"Input shape: {input_t.shape}")
    print(f"Output shape: {pred.shape}")
    print(f"Mentor output shape: {mentor_out.shape}")
    
    # Test loss (moved to the same device as the predictions)
    criterion = KGMgTLoss().to(device)
    target = torch.randn(B, 2, H, W).to(device)
    losses = criterion(pred, target, mentor_out)
    
    print("\nLoss components:")
    for k, v in losses.items():
        print(f"  {k}: {v.item():.6f}")
    
    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"\nTotal parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    
    # Test sequence reconstruction
    print("\nTesting sequence reconstruction...")
    sequence = torch.randn(12, 2, H, W)
    mask_seq = torch.zeros(1, H, W)
    mask_seq[:, :, ::4] = 1  # 4x undersampling: keep every 4th column
    
    reconstructed = reconstruct_cardiac_cine(model, sequence, mask_seq, device)
    print(f"Reconstructed sequence shape: {reconstructed.shape}")
    
    print("\n✓ All tests passed!")


if __name__ == "__main__":
    test_model()
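
The `window_partition` and `window_reverse` helpers in the listing accept rectangular window sizes as well as square ones. As a quick sanity check, the sketch below re-declares both helpers in compact form (so the snippet runs on its own) and confirms that partitioning followed by reversal is lossless for a square 8×8 window and a vertical 16×4 rectangle. The tensor sizes and the `round_trip_ok` helper are illustrative choices, not settings taken from the paper.

```python
import torch


def window_partition(x, window_size):
    """Split [B, H, W, C] into windows of shape [num_windows * B, wh * ww, C]."""
    B, H, W, C = x.shape
    wh, ww = window_size
    x = x.view(B, H // wh, wh, W // ww, ww, C)
    return x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, wh * ww, C)


def window_reverse(windows, window_size, H, W):
    """Inverse of window_partition: back to [B, H, W, C]."""
    wh, ww = window_size
    B = windows.shape[0] // ((H // wh) * (W // ww))
    x = windows.view(B, H // wh, W // ww, wh, ww, -1)
    return x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)


def round_trip_ok(window_size, B=2, H=32, W=32, C=4):
    """Hypothetical helper: partition, reverse, and compare with the input."""
    x = torch.randn(B, H, W, C)
    windows = window_partition(x, window_size)
    restored = window_reverse(windows, window_size, H, W)
    return windows.shape, torch.equal(restored, x)


square_shape, square_ok = round_trip_ok((8, 8))    # square window
rect_shape, rect_ok = round_trip_ok((16, 4))       # vertical rectangle
print(square_shape, square_ok)
print(rect_shape, rect_ok)
```

Both window shapes cover 64 tokens, so the rectangle branch changes the geometry of the receptive field without changing the size of each attention matrix.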

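The discriminator update in the listing uses a plain Wasserstein critic loss. If a WGAN-GP style gradient penalty is wanted, it could be sketched roughly as follows. This is an assumption on our part rather than something the source confirms the authors use: the `gradient_penalty` helper, the tiny stand-in critic, and the penalty weight of 10 (the default from the WGAN-GP literature) are all illustrative, and averaging patch scores into one score per sample is one possible way to adapt the penalty to a patch-wise critic.

```python
import torch
import torch.nn as nn


def gradient_penalty(critic, real, fake):
    """WGAN-GP term: batch mean of (||grad critic(x_hat)||_2 - 1)^2."""
    eps = torch.rand(real.size(0), 1, 1, 1, device=real.device)
    # Random points on straight lines between real and generated samples
    x_hat = (eps * real + (1.0 - eps) * fake).detach().requires_grad_(True)
    # Assumption: patch-wise scores are averaged into one score per sample
    scores = critic(x_hat).flatten(1).mean(dim=1)
    (grads,) = torch.autograd.grad(scores.sum(), x_hat, create_graph=True)
    grad_norm = grads.flatten(1).norm(2, dim=1)
    return ((grad_norm - 1.0) ** 2).mean()


torch.manual_seed(0)
# Tiny stand-in critic (hypothetical); non-inplace activations keep double backward safe
critic = nn.Sequential(
    nn.Conv2d(2, 8, 4, 2, 1),
    nn.LeakyReLU(0.2),
    nn.Conv2d(8, 1, 4, 1, 1),
)
real = torch.randn(4, 2, 32, 32)
fake = torch.randn(4, 2, 32, 32)

gp = gradient_penalty(critic, real, fake)
d_loss = critic(fake).mean() - critic(real).mean() + 10.0 * gp
d_loss.backward()
print(f"gradient penalty: {gp.item():.4f}")
```

In a training step like the one in the listing, `fake` would be the detached generator output and the penalty would be added to `d_loss` before the backward call.
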
References

All claims in this article are supported by the publication on the knowledge-guided multi-geometric window transformer in Medical Image Analysis (Lyu et al., 2026).

