TBConvL-Net: A Hybrid CNN–Transformer–ConvLSTM Framework for Robust Medical Image Segmentation

Medical image segmentation stands at the center of modern diagnostic intelligence. The precise delineation of tumors, lesions, organs, and anatomical structures is essential in clinical workflows, influencing tasks such as treatment planning, early disease detection, and quantitative analysis. However, segmentation remains fundamentally challenging due to the diversity of imaging modalities, variations in lesion shapes and textures, noisy acquisition processes, and the scarcity of annotated datasets. Deep learning has made substantial progress, but existing architectures often compromise between local detail preservation, global context modeling, or computational efficiency. Convolutional neural networks like U-Net offer strong localization capabilities but struggle to model long-range dependencies. Transformer-based models such as Swin-UNet capture global semantics but require significant computational capacity and struggle with fine-grained spatial boundaries. Recurrent models like ConvLSTM capture temporal relationships but are seldom integrated effectively into 2D medical imaging pipelines.

TBConvL-Net introduces a hybrid framework that combines convolutional feature extraction, transformer-based global attention, and ConvLSTM-driven temporal refinement in a single lightweight architecture. By integrating CNN blocks, Swin Transformer modules, and Bidirectional ConvLSTM units inside the skip connections, TBConvL-Net establishes a smooth flow of information between encoder and decoder pathways, enabling it to learn hierarchical representations, contextual correlations, and boundary-level precision across a wide spectrum of medical imaging modalities. This article provides a detailed technical analysis of the TBConvL-Net design, mathematical formulation, and empirical performance, offering a comprehensive resource for researchers, engineers, and students engaged in medical image analysis.


Understanding the Core Challenges of Medical Image Segmentation

Any neural architecture designed for medical image segmentation must cope with substantial complexity. Lesions may vary dramatically in size, texture, and appearance across imaging modalities, from low-contrast ultrasound tumors to highly variable dermoscopic skin lesions. Boundaries between pathological and healthy tissues can be blurred, requiring extremely sensitive feature extraction mechanisms. Traditional CNNs excel at capturing local patterns but remain limited in modeling long-range spatial dependencies because of their limited receptive fields. Transformers overcome this by attending to relationships across the entire image, but global attention has a cost that is quadratic in the number of tokens and often fails to capture the fine-grained local structures essential for medical diagnosis. Finally, skip connections fuse high-resolution but semantically shallow encoder features with semantically rich but spatially coarse decoder features, and this mismatch can make the fusion inconsistent and blur spatial detail.

TBConvL-Net addresses these challenges by proposing a robust decoder design supported by temporally enriched skip connections and an encoder capable of extracting multiscale features while remaining lightweight. By integrating three complementary components—CNNs for local detail, transformers for global semantic reasoning, and ConvLSTM for temporal correlation—TBConvL-Net forms a cohesive architecture optimized for both accuracy and efficiency.


The Architectural Foundation of TBConvL-Net

https://www.mdpi.com/tomography/tomography-10-00123/article_deploy/html/images/tomography-10-00123-g001.png
https://ietresearch.onlinelibrary.wiley.com/cms/asset/de393672-78ca-4f82-9bed-2a0e52f695c3/ipr270019-fig-0013-m.jpg
https://www.researchgate.net/publication/365861037/figure/fig2/AS%3A11431281165034508%401685960531491/An-illustration-of-the-shifted-window-approach-for-computing-self-attention-in-the-Swin.jpg

TBConvL-Net follows an encoder–decoder structure inspired by U-Net, but incorporates several innovations that elevate its representational power. The encoder uses depth-wise separable convolutions to efficiently extract spatial information, progressively reducing dimensionality while preserving salient features. Each encoder stage applies two separable convolutions followed by batch normalization and ReLU activation, allowing the network to maintain stability and accelerate convergence. The mathematical formulation of the primary convolutional operation is described as:

\[ l_{3\times 3}(I) = \beta_N\left(f^s_{3 \times 3}(I)\right) \]

where \(f^s_{3 \times 3}\) denotes the separable convolution and \(\beta_N\) signifies batch normalization. After two such layers, max pooling is applied, and the encoder output for stage \(i\) is expressed as:

\[ B_i^{enc} = MP_i(l_{3 \times 3}(l_{3 \times 3}(\chi_{\text{in}}))) \]

This design ensures computational efficiency while capturing distinctive local features required for precise organ or lesion segmentation.
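To make the efficiency argument concrete, the following sketch counts the weights of a standard 3×3 convolution against those of a depth-wise separable one, ignoring bias terms. The function names and the 64-to-128 channel example are illustrative assumptions, not values taken from the paper.

```python
def standard_conv_params(c_in: int, c_out: int, k: int = 3) -> int:
    """Weights in a standard k x k convolution (bias ignored)."""
    return k * k * c_in * c_out


def separable_conv_params(c_in: int, c_out: int, k: int = 3) -> int:
    """Weights in a depth-wise (k x k per channel) plus point-wise (1 x 1) pair."""
    depthwise = k * k * c_in      # one k x k filter per input channel
    pointwise = c_in * c_out      # 1 x 1 convolution that mixes channels
    return depthwise + pointwise


if __name__ == "__main__":
    c_in, c_out = 64, 128  # hypothetical layer widths, chosen for illustration
    std = standard_conv_params(c_in, c_out)
    sep = separable_conv_params(c_in, c_out)
    print(f"standard: {std}, separable: {sep}, ratio: {std / sep:.1f}x")
```

For wide layers the ratio approaches the kernel area (9 for a 3×3 kernel), which is where most of the parameter savings in the encoder would come from.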

At the bottleneck stage, TBConvL-Net diverges from classical U-Net by incorporating dense connections. These connections avoid redundant computation by enabling feature reuse and facilitating gradient propagation. The dense block begins with:

\[ B_1^{den} = \Re(l_{3 \times 3}(l_{3 \times 3}(B^{enc}_3))) \]

and integrates Swin Transformer outputs along with previous dense features to produce:

\[ B_2^{den} = \text{SViT}(B^{den}_1) \oplus B^{den}_1 \]

followed by:

\[ B_3^{den} = \Re(l_{3\times 3}(l_{3 \times 3}(B^{den}_2))) \oplus B^{den}_1 \oplus B^{den}_2 \]

This combination of dense connections and window-based attention ensures that the bottleneck can aggregate both multiscale and global contextual cues.


Enhancing Skip Connections Through Bidirectional ConvLSTM and Swin Transformer Attention

One of TBConvL-Net’s most distinctive contributions lies in its treatment of skip connections. Traditional U-Net concatenates encoder and decoder features directly, which often introduces semantic misalignment. Encoder features tend to be high-resolution but lack deep semantic structure, while decoder features hold rich semantics but lose spatial detail through progressive downsampling. TBConvL-Net resolves this mismatch by integrating Bidirectional ConvLSTM (BConvLSTM) within the skip pathways. The ConvLSTM mechanism captures the temporal evolution of encoder–decoder interactions, even though the input is spatial. It models the flow of information across multi-level features as if they were sequential data, enabling the network to understand how high-level semantics evolve across depth.

The ConvLSTM formulation is defined as:

\[ M_t^c = f_t \odot M_{t-1}^c + i_t \odot \tanh(W_{xc} * X_t + W_{hc} * H_{t-1} + b_c) \]

and the hidden state is updated as:

\[ H_t = o_t \odot \tanh(M_t^c) \]

where the input, forget, and output gates follow:

\[ i_t = \sigma(W_{xi} * X_t + W_{hi} * H_{t-1} + b_i) \]

\[ f_t = \sigma(W_{xf} * X_t + W_{hf} * H_{t-1} + b_f) \]

\[ o_t = \sigma(W_{xo} * X_t + W_{ho} * H_{t-1} + b_o) \]

The bidirectional configuration merges forward and backward temporal states:

\[ \mathcal{I}_{out} = \tanh(W_{\rightarrow} * H_t^{\rightarrow} + W_{\leftarrow} * H_t^{\leftarrow} + \beta) \]

allowing encoder features to be adaptively aligned with decoder representations.

Alongside BConvLSTM, the Swin Transformer block applies window-based self-attention within skip connections. This addresses the fundamental difficulty of contextual understanding by segmenting images into fixed windows and performing attention locally. Its computational cost in classical MSA form is:

\[ C_{\text{MSA}} = 4HWd^2 + 2(HW)^2 d \]

whereas shifted window MSA radically lowers the complexity to:

\[ C_{\text{SW-MSA}} = 4HWd^2 + 2N^2(HW)d \]

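Here \(H \times W\) is the feature-map size in tokens, \(d\) is the channel dimension, and \(N\) is the side length of each attention window. For a fixed \(N\), the attention term grows linearly in \(HW\) rather than quadratically, which makes transformer-based reasoning tractable for high-resolution medical images.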
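The two cost formulas can be evaluated directly to see how large the gap is. The sketch below is plain arithmetic on the expressions above; the function names and the example sizes (a 56×56 token grid, 96 channels, 7×7 windows) are assumptions chosen for illustration.

```python
def msa_cost(h: int, w: int, d: int) -> int:
    """Global multi-head self-attention cost: 4*HW*d^2 + 2*(HW)^2*d."""
    hw = h * w
    return 4 * hw * d * d + 2 * hw * hw * d


def window_msa_cost(h: int, w: int, d: int, n: int) -> int:
    """Window-based attention cost with n x n windows: 4*HW*d^2 + 2*n^2*HW*d."""
    hw = h * w
    return 4 * hw * d * d + 2 * n * n * hw * d


if __name__ == "__main__":
    h, w, d, n = 56, 56, 96, 7  # illustrative sizes, not taken from the paper
    full = msa_cost(h, w, d)
    windowed = window_msa_cost(h, w, d, n)
    print(f"global: {full:,}  windowed: {windowed:,}  ratio: {full / windowed:.1f}x")
```

Doubling the spatial resolution multiplies the quadratic term of the global cost by 16 but the windowed cost by only 4, so the advantage widens as images get larger.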


Decoder Design and Feature Reconstruction

https://www.researchgate.net/publication/349200600/figure/fig1/AS%3A1024046816837632%401621163195814/a-Normal-U-net-architecture-with-transposed-convolutional-layers-in-the-expansion-path.jpg
https://mriquestions.com/uploads/3/4/5/7/34572113/down-and-upsampling-cnn_orig.png

The decoder reconstructs segmentation maps by gradually fusing refined encoder features and upsampled transformer-driven representations. Each decoder stage employs transposed convolution to double the spatial resolution and then applies two separable convolutions to refine feature maps. Importantly, the skip connections processed through the BConvLSTM and Swin Transformer modules significantly elevate the semantic richness of the decoder. The mathematical formulation of the decoding process is expressed as:

\[ B_i^{dec} = TC_i(l_{3 \times 3}(l_{3 \times 3}(B^{den}_3))) \oplus \text{SViT}(\Delta_{\leftrightarrow}(B^{enc}_i)) \]

where \(\Delta_{\leftrightarrow}\) represents the bidirectionally refined features. This ensures that the decoder is not merely upsampling but actively reconstructing spatially coherent segmentation regions that align well with anatomical structures.


Hybrid Loss Function: Integrating Region, Overlap, and Boundary Constraints

Accurate medical segmentation requires not only capturing regional consistency but also preserving boundary accuracy. TBConvL-Net employs a composite loss function combining Dice, Jaccard, and boundary-aware terms. The Dice loss is defined as:

\[ \zeta_d = 1 - \frac{2 \sum SG}{\sum S^2 + \sum G^2 + \xi} \]

where \(S\) represents the predicted map, \(G\) the ground truth, and \(\xi\) a small smoothing constant. The Jaccard loss further measures the overlap between predicted and actual segmentation masks:

\[ \zeta_j = 1 - \frac{|S \cap G|}{|S \cup G|} \]

Finally, the boundary loss introduces level-set–based supervision:

\[ \zeta_b = \int_{\Omega} \vartheta_G(p) \, S_\theta(p) \, dp \]

where \(\vartheta_G\) denotes the signed distance map of the ground-truth boundary and \(S_\theta(p)\) the predicted probability at pixel \(p\). The overall training objective becomes:

\[ \zeta = \lambda_d \zeta_d + \lambda_j \zeta_j + \lambda_b \zeta_b \]

with coefficients that gradually reduce the boundary term during training. This multi-objective formulation ensures stability and precision across datasets that vary dramatically in edge clarity.
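The three terms can be sketched in PyTorch as follows. This is a minimal illustration under stated assumptions, not the authors' implementation: the function names, the default weights, the use of a soft (product-based) Jaccard, and the replacement of the integral over \(\Omega\) by a pixel-wise mean are all choices made here. The signed distance map is assumed to be precomputed from the ground-truth mask (negative inside the object, positive outside).

```python
import torch


def dice_loss(pred: torch.Tensor, target: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    """1 - 2*sum(S*G) / (sum(S^2) + sum(G^2) + eps), averaged over the batch."""
    dims = tuple(range(1, pred.dim()))
    inter = (pred * target).sum(dim=dims)
    denom = (pred ** 2).sum(dim=dims) + (target ** 2).sum(dim=dims) + eps
    return (1.0 - 2.0 * inter / denom).mean()


def jaccard_loss(pred: torch.Tensor, target: torch.Tensor, eps: float = 1e-6) -> torch.Tensor:
    """Soft Jaccard: products stand in for intersection, sums for union."""
    dims = tuple(range(1, pred.dim()))
    inter = (pred * target).sum(dim=dims)
    union = pred.sum(dim=dims) + target.sum(dim=dims) - inter
    return (1.0 - (inter + eps) / (union + eps)).mean()


def boundary_loss(pred: torch.Tensor, signed_dist: torch.Tensor) -> torch.Tensor:
    """Pixel-wise mean of signed distance times predicted probability."""
    return (signed_dist * pred).mean()


def hybrid_loss(pred: torch.Tensor, target: torch.Tensor, signed_dist: torch.Tensor,
                w_dice: float = 1.0, w_jaccard: float = 1.0,
                w_boundary: float = 0.1) -> torch.Tensor:
    """Weighted sum of the three terms; the default weights are hypothetical."""
    return (w_dice * dice_loss(pred, target)
            + w_jaccard * jaccard_loss(pred, target)
            + w_boundary * boundary_loss(pred, signed_dist))


if __name__ == "__main__":
    target = torch.zeros(2, 1, 8, 8)
    target[..., 2:6, 2:6] = 1.0
    pred = torch.rand(2, 1, 8, 8, requires_grad=True)
    signed_dist = torch.zeros_like(target)  # placeholder distance map for the demo
    loss = hybrid_loss(pred, target, signed_dist)
    loss.backward()
    print(float(loss))
```

A schedule for the weights, such as the gradual adjustment of the boundary coefficient described above, would be applied by the training loop by passing a different `w_boundary` at each epoch.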

Performance Across Modalities: Consolidated Results

TBConvL-Net was evaluated on ten publicly available datasets, covering modalities such as dermoscopy, ultrasound, MRI, microscopy, retinal imaging, and cell-level fluorescence. Across all datasets, TBConvL-Net demonstrated significant improvements over U-Net, U-Net++, Attention U-Net, TransUNet, and Swin-UNet.

The following consolidated table summarizes the average improvements in the Jaccard Index compared to the previous state-of-the-art models on each dataset:

Table 1 — Consolidated Jaccard Index Improvements of TBConvL-Net

| Dataset / Modality | Improvement Over SOTA (Jaccard Index) |
| --- | --- |
| ISIC 2016–2018 (Dermoscopy) | +7.07% to +20.14% |
| BUSI (Breast Ultrasound) | +14.81% to +24.20% |
| DDTI (Thyroid Ultrasound) | +0.24% to +30.91% |
| MoNuSeg (Microscopy) | +9.46% to +15.31% |
| Neuronal Cell Dataset | +10.81% to +26.57% |
| IDRiD (Optic Disc/Retina) | +9.62% to +19.73% |
| TCIA (Brain Tumor MRI) | +6.78% to +14.49% |

These improvements indicate the model’s robustness and its ability to generalize effectively across modalities that differ in texture, noise, and structural complexity. TBConvL-Net consistently generated cleaner predictions with stronger edge continuity and reduced false regions.

A second consolidated table highlights the computational benefits compared to widely used architectures:

Table 2 — Model Complexity and Inference Efficiency

| Model | Parameters (Millions) | FLOPs (G) | Inference Time (ms) |
| --- | --- | --- | --- |
| Swin-UNet | 27.3 | 37.0 | 34.8 |
| U-Net | 23.6 | 33.4 | 28.9 |
| TBConvL-Net | 9.6 | 15.5 | 19.1 |

Despite requiring far fewer parameters and computational resources, TBConvL-Net still outperforms larger and more complex transformer-based models. This balance of accuracy and efficiency ensures wide usability for clinical deployment where hardware constraints may be significant.
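The relative savings implied by Table 2 can be worked out with a few lines of arithmetic. The sketch below only restates the table's figures; the helper names are illustrative.

```python
# Figures copied from Table 2: (parameters in millions, GFLOPs, inference time in ms)
MODELS = {
    "Swin-UNet": (27.3, 37.0, 34.8),
    "U-Net": (23.6, 33.4, 28.9),
    "TBConvL-Net": (9.6, 15.5, 19.1),
}


def relative_reduction(baseline: float, value: float) -> float:
    """Fractional reduction of `value` relative to `baseline`."""
    return 1.0 - value / baseline


def compare(target: str = "TBConvL-Net") -> dict:
    """Percentage reductions in parameters, FLOPs, and latency versus each baseline."""
    target_stats = MODELS[target]
    return {
        name: tuple(round(100 * relative_reduction(b, v), 1)
                    for b, v in zip(stats, target_stats))
        for name, stats in MODELS.items() if name != target
    }


if __name__ == "__main__":
    for name, (params, flops, latency) in compare().items():
        print(f"vs {name}: {params}% fewer parameters, "
              f"{flops}% fewer FLOPs, {latency}% lower inference time")
```

On these figures TBConvL-Net uses roughly 65% fewer parameters than Swin-UNet and roughly 59% fewer than U-Net.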

Efficiency, Practicality, and Model Complexity

One of TBConvL-Net’s remarkable strengths is its computational efficiency. Transformer-based architectures often require extensive memory and computational overhead, yet TBConvL-Net maintains a lightweight profile through the strategic use of depth-wise separable convolutions and window-based self-attention. With only about 9.6 million parameters and 15.5 GFLOPs, it is significantly smaller than the 27 million parameters of Swin-UNet or the 23 million parameters of classical U-Net. Despite this compactness, the model achieves faster inference times and state-of-the-art accuracy. This efficiency makes TBConvL-Net suitable for deployment in computationally constrained environments, including portable ultrasound devices, low-end GPUs in hospitals, and real-time clinical decision systems.


Conclusion: A New Milestone in Medical Image Segmentation

TBConvL-Net represents a major advancement in medical image segmentation research. By fusing convolutional layers, Swin Transformers, and bidirectional ConvLSTM units into a comprehensive architecture, it successfully addresses the limitations of purely convolutional or transformer-based models. Its innovative skip connection design provides a mechanism for deep semantic alignment between encoder and decoder pathways, while its dense modules enrich feature representation at the bottleneck. The hybrid loss function encourages both overlap and boundary precision, yielding segmentation results that significantly exceed prior state-of-the-art methods.

In addition to performance, TBConvL-Net’s efficiency makes it a compelling choice for real-world clinical systems where computational resources may be limited. Whether applied to dermoscopy, ultrasound, histopathology, retinal imaging, or MRI, the architecture demonstrates exceptional adaptability and precision.

If you are a researcher, practitioner, or student working in medical image analysis, exploring TBConvL-Net could offer valuable insights and open pathways for innovation. Whether you wish to implement the architecture from scratch, adapt it into a 3D pipeline, or integrate multimodal imaging data, TBConvL-Net provides a strong and flexible foundation.

Explore the full paper to understand implementation details, experiment with different loss function combinations, or adapt the architecture for your specific clinical application. The code and pre-trained weights represent months of rigorous validation—leverage this foundation to accelerate your own medical AI projects.

Have you faced challenges with existing segmentation approaches? Share your specific medical imaging domain in the comments below. Which modality presents the greatest segmentation difficulty? The medical imaging community advances through shared knowledge and collaborative problem-solving.

"""
TBConvL-Net: Transformer-Bidirectional ConvLSTM Network
A hybrid deep learning architecture for robust medical image segmentation

Architecture Components:
- Depth-wise Separable Convolutions (Encoder/Decoder)
- Dense Blocks with collective knowledge principle
- Bidirectional ConvLSTM for temporal feature refinement
- Swin Transformer Blocks for efficient long-range dependencies
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Tuple, Optional


# ============================================================================
# 1. BASIC BUILDING BLOCKS
# ============================================================================

class DepthWiseSeparableConv(nn.Module):
    """
    Depth-wise separable convolution: decomposes standard convolution
    into depth-wise (spatial) and point-wise (channel) operations.
    
    Reduces parameters from K*K*C_in*C_out to K*K*C_in + C_in*C_out
    """
    def __init__(self, in_channels: int, out_channels: int, 
                 kernel_size: int = 3, stride: int = 1, padding: int = 1):
        super(DepthWiseSeparableConv, self).__init__()
        
        # Depth-wise convolution (spatial filtering per channel)
        self.depthwise = nn.Conv2d(
            in_channels, in_channels, 
            kernel_size=kernel_size, stride=stride, 
            padding=padding, groups=in_channels, bias=False
        )
        
        # Point-wise convolution (channel mixing)
        self.pointwise = nn.Conv2d(
            in_channels, out_channels, 
            kernel_size=1, stride=1, padding=0, bias=False
        )
        
        self.bn = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.depthwise(x)
        x = self.pointwise(x)
        x = self.bn(x)
        x = self.relu(x)
        return x


class DenseBlock(nn.Module):
    """
    Dense block implementing collective knowledge principle.
    Each layer receives inputs from all previous layers.
    """
    def __init__(self, in_channels: int, growth_rate: int = 32, 
                 num_layers: int = 2):
        super(DenseBlock, self).__init__()
        self.growth_rate = growth_rate
        self.num_layers = num_layers
        
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            # Each layer inputs concatenated features from all previous layers
            layer_input_channels = in_channels + i * growth_rate
            
            self.layers.append(nn.Sequential(
                DepthWiseSeparableConv(
                    layer_input_channels, growth_rate,
                    kernel_size=3, stride=1, padding=1
                )
            ))
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        features = [x]
        
        for layer in self.layers:
            # Concatenate all previous features
            combined = torch.cat(features, dim=1)
            out = layer(combined)
            features.append(out)
        
        # Return concatenation of input and all layer outputs
        return torch.cat(features, dim=1)


# ============================================================================
# 2. BIDIRECTIONAL CONVLSTM
# ============================================================================

class ConvLSTMCell(nn.Module):
    """
    ConvLSTM Cell: applies convolution operations to LSTM gates.
    Enables spatial correlation learning in recurrent computations.
    
    Memory update equation:
    Mc_t = f_t ⊙ Mc_(t-1) + i_t ⊙ tanh(W_I*I_t + W_h*h_(t-1) + b_Mc)
    """
    def __init__(self, in_channels: int, hidden_channels: int,
                 kernel_size: int = 3):
        super(ConvLSTMCell, self).__init__()
        
        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        padding = kernel_size // 2
        
        # Single convolution for all gates (forget, input, output, candidate)
        self.conv = nn.Conv2d(
            in_channels + hidden_channels,
            4 * hidden_channels,
            kernel_size=kernel_size,
            padding=padding,
            bias=True
        )
    
    def forward(self, x: torch.Tensor, h: torch.Tensor, 
                c: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Args:
            x: Input tensor (B, C_in, H, W)
            h: Hidden state (B, C_h, H, W)
            c: Cell state (B, C_h, H, W)
        
        Returns:
            h_new: Updated hidden state
            c_new: Updated cell state
        """
        # Concatenate input and hidden state
        combined = torch.cat([x, h], dim=1)
        
        # Compute gates (forget, input, candidate, output)
        gates = self.conv(combined)
        forget_gate, input_gate, candidate, output_gate = gates.chunk(4, dim=1)
        
        # Apply activation functions
        forget_gate = torch.sigmoid(forget_gate)
        input_gate = torch.sigmoid(input_gate)
        candidate = torch.tanh(candidate)
        output_gate = torch.sigmoid(output_gate)
        
        # Update cell state
        c_new = forget_gate * c + input_gate * candidate
        
        # Update hidden state
        h_new = output_gate * torch.tanh(c_new)
        
        return h_new, c_new


class BidirectionalConvLSTM(nn.Module):
    """
    Bidirectional ConvLSTM: processes input sequence forward and backward.
    Captures contextual information from both temporal directions.
    """
    def __init__(self, in_channels: int, hidden_channels: int,
                 kernel_size: int = 3):
        super(BidirectionalConvLSTM, self).__init__()
        
        self.forward_lstm = ConvLSTMCell(in_channels, hidden_channels, kernel_size)
        self.backward_lstm = ConvLSTMCell(in_channels, hidden_channels, kernel_size)
        self.hidden_channels = hidden_channels
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Input tensor (B, C_in, H, W) - treat as single time step
        
        Returns:
            output: Processed tensor combining forward and backward context
        """
        B, C, H, W = x.shape
        
        # Initialize hidden and cell states on the input's device and dtype
        h_f = x.new_zeros(B, self.hidden_channels, H, W)
        c_f = x.new_zeros(B, self.hidden_channels, H, W)
        h_b = x.new_zeros(B, self.hidden_channels, H, W)
        c_b = x.new_zeros(B, self.hidden_channels, H, W)
        
        # Forward direction
        h_f, c_f = self.forward_lstm(x, h_f, c_f)
        
        # Backward direction: with a single time step the reversed sequence is
        # the same input, so the two directions differ only in their weights
        h_b, c_b = self.backward_lstm(x, h_b, c_b)
        
        # Combine both directions with tanh. This is a simplified form of
        # equation 13 in the paper: the learned convolutions applied to each
        # direction and the bias term are omitted here.
        output = torch.tanh(h_f + h_b)
        
        return output


# ============================================================================
# 3. SWIN TRANSFORMER BLOCK
# ============================================================================

class WindowAttention(nn.Module):
    """
    Window-based Multi-head Self-Attention with shifted windows.
    Reduces computational complexity from quadratic to linear.
    """
    def __init__(self, dim: int, window_size: Tuple[int, int] = (7, 7),
                 num_heads: int = 8):
        super(WindowAttention, self).__init__()
        self.dim = dim
        self.window_size = window_size
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5
        
        self.qkv = nn.Linear(dim, dim * 3)
        self.attn_drop = nn.Dropout(0.0)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(0.0)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Input tensor (B*num_windows, N, C) where N = window_size[0]*window_size[1]
        
        Returns:
            output: Attention-weighted features
        """
        B_W, N, C = x.shape
        
        # Project to Q, K, V
        qkv = self.qkv(x).reshape(B_W, N, 3, self.num_heads, self.head_dim)
        qkv = qkv.permute(2, 0, 3, 1, 4)  # (3, B_W, num_heads, N, head_dim)
        q, k, v = qkv[0], qkv[1], qkv[2]
        
        # Compute attention weights
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)
        
        # Apply attention to values
        x = (attn @ v).transpose(1, 2).reshape(B_W, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        
        return x


class SwinTransformerBlock(nn.Module):
    """
    Swin Transformer Block: hierarchical transformer with shifted windows.
    
    Implements efficient multi-head self-attention by computing attention
    within local windows, reducing complexity from O((hw)^2) to O(hw*N^2)
    where N is window size.
    """
    def __init__(self, dim: int, num_heads: int = 8, 
                 window_size: Tuple[int, int] = (7, 7),
                 mlp_ratio: float = 4.0, shift_size: int = 0):
        super(SwinTransformerBlock, self).__init__()
        
        self.dim = dim
        self.window_size = window_size
        self.shift_size = shift_size
        self.mlp_ratio = mlp_ratio
        
        self.norm1 = nn.LayerNorm(dim)
        self.attn = WindowAttention(dim, window_size, num_heads)
        
        self.norm2 = nn.LayerNorm(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(dim, mlp_hidden_dim),
            nn.GELU(),
            nn.Linear(mlp_hidden_dim, dim),
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Input tensor (B, C, H, W)
        
        Returns:
            output: Processed tensor with same shape
        """
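        B, C, H, W = x.shape
        wh, ww = self.window_size
        
        # Channels-last layout for LayerNorm and attention: (B, H, W, C)
        x = x.permute(0, 2, 3, 1)
        shortcut = x
        x = self.norm1(x)
        
        # Pad bottom/right so H and W become multiples of the window size
        pad_h = (wh - H % wh) % wh
        pad_w = (ww - W % ww) % ww
        if pad_h or pad_w:
            x = F.pad(x, (0, 0, 0, pad_w, 0, pad_h))
        Hp, Wp = H + pad_h, W + pad_w
        
        # Optional cyclic shift (no attention mask is applied in this sketch)
        if self.shift_size > 0:
            x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2))
        
        # Partition into windows: (B, Hp, Wp, C) -> (B*num_windows, N, C)
        x = x.reshape(B, Hp // wh, wh, Wp // ww, ww, C)
        windows = x.permute(0, 1, 3, 2, 4, 5).reshape(-1, wh * ww, C)
        
        # Self-attention restricted to each window
        windows = self.attn(windows)
        
        # Merge windows back to (B, Hp, Wp, C) and undo the shift
        x = windows.reshape(B, Hp // wh, Wp // ww, wh, ww, C)
        x = x.permute(0, 1, 3, 2, 4, 5).reshape(B, Hp, Wp, C)
        if self.shift_size > 0:
            x = torch.roll(x, shifts=(self.shift_size, self.shift_size), dims=(1, 2))
        
        # Drop the padding, then apply the attention residual
        x = shortcut + x[:, :H, :W, :]
        
        # Layer norm + MLP with residual
        x = x + self.mlp(self.norm2(x))
        
        # Reshape back to image format (B, C, H, W)
        output = x.permute(0, 3, 1, 2).contiguous()
        
        return output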


# ============================================================================
# 4. ENCODER AND DECODER
# ============================================================================

class EncoderBlock(nn.Module):
    """
    Encoder block: two depth-wise separable convolutions + max pooling
    Progressively decreases spatial resolution while increasing semantic content
    """
    def __init__(self, in_channels: int, out_channels: int):
        super(EncoderBlock, self).__init__()
        
        self.conv1 = DepthWiseSeparableConv(in_channels, out_channels, 
                                            kernel_size=3, stride=1, padding=1)
        self.conv2 = DepthWiseSeparableConv(out_channels, out_channels,
                                            kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    
    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Returns both the output after convolutions and after pooling"""
        x = self.conv1(x)
        x = self.conv2(x)
        x_pool = self.pool(x)
        return x, x_pool


class DecoderBlock(nn.Module):
    """
    Decoder block: transposed convolution for upsampling + conv refinement
    Progressively increases spatial resolution for reconstruction
    """
    def __init__(self, in_channels: int, out_channels: int):
        super(DecoderBlock, self).__init__()
        
        self.trans_conv = nn.ConvTranspose2d(
            in_channels, out_channels,
            kernel_size=2, stride=2, padding=0
        )
        
        # After concatenation with skip connection, channels double
        self.conv1 = DepthWiseSeparableConv(
            out_channels * 2, out_channels,
            kernel_size=3, stride=1, padding=1
        )
        self.conv2 = DepthWiseSeparableConv(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1
        )
    
    def forward(self, x: torch.Tensor, 
                skip: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Input from previous decoder layer
            skip: Skip connection from corresponding encoder layer
        
        Returns:
            Refined feature map
        """
        x = self.trans_conv(x)
        x = torch.cat([x, skip], dim=1)
        x = self.conv1(x)
        x = self.conv2(x)
        return x


# ============================================================================
# 5. MAIN TBCONVL-NET ARCHITECTURE
# ============================================================================

class TBConvLNet(nn.Module):
    """
    TBConvL-Net: Hybrid deep learning architecture for medical image segmentation
    
    Combines:
    - Depth-wise separable CNNs for efficient local feature extraction
    - Dense blocks for collective knowledge learning
    - Bidirectional ConvLSTM for temporal feature refinement
    - Swin Transformer for efficient long-range dependencies
    
    Paper: "TBConvL-Net: A Hybrid Deep Learning Architecture for 
            Robust Medical Image Segmentation"
    """
    def __init__(self, in_channels: int = 3, num_classes: int = 1,
                 num_filters: int = 16):
        """
        Args:
            in_channels: Number of input channels (typically 3 for RGB)
            num_classes: Number of segmentation classes
            num_filters: Base number of filters (grows with depth)
        """
        super(TBConvLNet, self).__init__()
        
        self.in_channels = in_channels
        self.num_classes = num_classes
        self.num_filters = num_filters
        
        # ===== ENCODER PATHWAY =====
        self.encoder_block_1 = EncoderBlock(in_channels, num_filters)
        self.encoder_block_2 = EncoderBlock(num_filters, num_filters * 2)
        self.encoder_block_3 = EncoderBlock(num_filters * 2, num_filters * 4)
        
        # ===== DENSE BOTTLENECK =====
        # After 3 encoder blocks: spatial dims = H/8, W/8, channels = num_filters*4
        self.dense_block_1 = DenseBlock(num_filters * 4, growth_rate=32, num_layers=2)
        
        # After dense_block_1, channels = num_filters*4 + 2*32 = num_filters*4 + 64
        dense_out_1 = num_filters * 4 + 64
        
        self.swin_transformer_1 = SwinTransformerBlock(
            dim=dense_out_1, num_heads=8, window_size=(7, 7)
        )
        
        self.dense_block_2 = DenseBlock(dense_out_1, growth_rate=32, num_layers=2)
        dense_out_2 = dense_out_1 + 64
        
        self.dense_block_3 = DenseBlock(dense_out_2, growth_rate=32, num_layers=2)
        dense_out_3 = dense_out_2 + 64
        
        # ===== BIDIRECTIONAL CONVLSTM IN SKIP CONNECTIONS =====
        self.bconv_lstm_1 = BidirectionalConvLSTM(
            num_filters * 4, num_filters * 4, kernel_size=3
        )
        self.bconv_lstm_2 = BidirectionalConvLSTM(
            num_filters * 2, num_filters * 2, kernel_size=3
        )
        self.bconv_lstm_3 = BidirectionalConvLSTM(
            num_filters, num_filters, kernel_size=3
        )
        
        # Swin transformer blocks for skip connections
        self.swin_transformer_skip_1 = SwinTransformerBlock(
            dim=num_filters * 4, num_heads=8, window_size=(7, 7)
        )
        self.swin_transformer_skip_2 = SwinTransformerBlock(
            dim=num_filters * 2, num_heads=8, window_size=(7, 7)
        )
        self.swin_transformer_skip_3 = SwinTransformerBlock(
            dim=num_filters, num_heads=8, window_size=(7, 7)
        )
        
        # ===== DECODER PATHWAY =====
        self.decoder_block_1 = DecoderBlock(dense_out_3, num_filters * 4)
        self.decoder_block_2 = DecoderBlock(num_filters * 4, num_filters * 2)
        self.decoder_block_3 = DecoderBlock(num_filters * 2, num_filters)
        
        # ===== OUTPUT LAYER =====
        self.final_conv_1 = DepthWiseSeparableConv(
            num_filters, num_filters, kernel_size=3, stride=1, padding=1
        )
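        # Plain 1x1 projection to class logits. No BatchNorm/ReLU here: a ReLU
        # before the sigmoid would force every output probability to be >= 0.5
        self.final_conv_2 = nn.Conv2d(num_filters, num_classes, kernel_size=1)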
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass through TBConvL-Net
        
        Args:
            x: Input tensor (B, C, H, W)
        
        Returns:
            Segmentation mask (B, num_classes, H, W)
        """
        # ===== ENCODER PATH =====
        enc_1, enc_1_pool = self.encoder_block_1(x)  # Output: (B, 16, H, W)
        enc_2, enc_2_pool = self.encoder_block_2(enc_1_pool)  # (B, 32, H/2, W/2)
        enc_3, enc_3_pool = self.encoder_block_3(enc_2_pool)  # (B, 64, H/4, W/4)
        
        # ===== DENSE BOTTLENECK =====
        dense_1 = self.dense_block_1(enc_3_pool)  # (B, 128, H/8, W/8)
        dense_1 = self.swin_transformer_1(dense_1)
        
        dense_2 = self.dense_block_2(dense_1)
        dense_3 = self.dense_block_3(dense_2)
        
        # ===== DECODER PATH WITH SKIP CONNECTIONS =====
        # Decoder Block 1: H/4, W/4
        skip_1 = self.bconv_lstm_1(enc_3)  # Bidirectional ConvLSTM on skip connection
        skip_1 = self.swin_transformer_skip_1(skip_1)
        dec_1 = self.decoder_block_1(dense_3, skip_1)
        
        # Decoder Block 2: H/2, W/2
        skip_2 = self.bconv_lstm_2(enc_2)
        skip_2 = self.swin_transformer_skip_2(skip_2)
        dec_2 = self.decoder_block_2(dec_1, skip_2)
        
        # Decoder Block 3: H, W
        skip_3 = self.bconv_lstm_3(enc_1)
        skip_3 = self.swin_transformer_skip_3(skip_3)
        dec_3 = self.decoder_block_3(dec_2, skip_3)
        
        # ===== OUTPUT =====
        output = self.final_conv_1(dec_3)
        output = self.final_conv_2(output)
        output = self.sigmoid(output)
        
        return output
    
    def get_parameter_count(self) -> dict:
        """Returns model complexity statistics"""
        total_params = sum(p.numel() for p in self.parameters())
        trainable_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
        
        return {
            'total_parameters': total_params,
            'trainable_parameters': trainable_params,
            'total_millions': total_params / 1e6,
            'trainable_millions': trainable_params / 1e6
        }
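

# Illustrative sketch (not part of the original model code).
# The shape comments in forward() suggest that each encoder stage doubles the
# channel count and that each pooling step halves the spatial size. The two
# hypothetical helpers below only reproduce that arithmetic. They assume three
# stages, 2x2 pooling, and channel widths of num_filters * 2**stage; none of
# this is checked against the encoder blocks themselves.
def expected_encoder_shapes(height, width, num_filters=16, depth=3):
    """Return [(channels, h, w), ...] for each encoder stage before pooling."""
    shapes = []
    for stage in range(depth):
        scale = 2 ** stage
        shapes.append((num_filters * scale, height // scale, width // scale))
    return shapes


def bottleneck_spatial_size(height, width, depth=3):
    """Spatial size that reaches the dense bottleneck after `depth` poolings."""
    return height // (2 ** depth), width // (2 ** depth)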


if __name__ == "__main__":
    # Example usage and model statistics
    model = TBConvLNet(in_channels=3, num_classes=1, num_filters=16)
    
    print("TBConvL-Net Model Summary")
    print("=" * 60)
    print(model)
    print("\nModel Complexity Analysis")
    print("-" * 60)
    
    stats = model.get_parameter_count()
    print(f"Total Parameters: {stats['total_millions']:.2f}M")
    print(f"Trainable Parameters: {stats['trainable_millions']:.2f}M")
    
    # Test forward pass (eval mode, no gradient tracking)
    print("\nTesting forward pass...")
    model.eval()
    test_input = torch.randn(1, 3, 256, 256)
    with torch.no_grad():
        test_output = model(test_input)
    print(f"Input shape: {test_input.shape}")
    print(f"Output shape: {test_output.shape}")
    print(f"Output range: [{test_output.min():.4f}, {test_output.max():.4f}]")
    
    # Estimate FLOPs (requires fvcore; ops it cannot count are reported as warnings)
    try:
        from fvcore.nn import FlopCountAnalysis
        flops = FlopCountAnalysis(model, test_input)
        print(f"\nFLOPs: {flops.total() / 1e9:.2f}G")
    except ImportError:
        print("\n(Install fvcore for FLOP counting: pip install fvcore)")
"""
Loss Functions for TBConvL-Net

Implements three complementary loss functions that are linearly combined:
1. Dice Loss: Ensures overall size and shape capture
2. Jaccard Loss: Enforces precise spatial alignment
3. Boundary Loss: Minimizes distance between predicted and true boundaries
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional
import numpy as np


# ============================================================================
# 1. DICE LOSS
# ============================================================================

class DiceLoss(nn.Module):
    """
    Dice Loss (F1 Loss)
    
    Measures overlap between predicted segmentation and ground truth.
    Particularly effective for imbalanced datasets where background 
    dominates foreground.
    
    Formula:
    L_dice = 1 - (2 * |X ∩ Y| + ε) / (|X| + |Y| + ε)
    
    where:
    - X = predicted mask
    - Y = ground truth mask
    - ε = smoothing constant (default 1e-7)
    """
    def __init__(self, smooth: float = 1e-7, reduction: str = 'mean'):
        """
        Args:
            smooth: Smoothing constant to prevent division by zero
            reduction: 'mean' or 'sum'
        """
        super(DiceLoss, self).__init__()
        self.smooth = smooth
        self.reduction = reduction
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor,
                weights: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Compute Dice loss
        
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth binary masks (B, C, H, W)
            weights: Optional class weights (C,)
        
        Returns:
            Dice loss value
        """
        # Ensure predictions are probabilities [0, 1]
        probs = torch.sigmoid(logits)
        
        # Flatten spatial dimensions
        probs_flat = probs.view(probs.size(0), probs.size(1), -1)
        targets_flat = targets.view(targets.size(0), targets.size(1), -1)
        
        # Compute intersection and cardinality (|X| + |Y|)
        intersection = (probs_flat * targets_flat).sum(dim=2)
        cardinality = (probs_flat + targets_flat).sum(dim=2)
        
        # Compute Dice coefficient
        dice_coef = (2.0 * intersection + self.smooth) / (cardinality + self.smooth)
        
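        # Per-sample, per-class Dice loss with shape (B, C)
        dice_loss = 1.0 - dice_coef
        
        # Apply class weights if provided: weighted average over classes
        if weights is not None:
            weights = weights.view(1, -1).to(dice_loss)
            dice_loss = (dice_loss * weights).sum(dim=1) / weights.sum()
        else:
            dice_loss = dice_loss.mean(dim=1)
        
        # Reduce over the batch
        if self.reduction == 'sum':
            return dice_loss.sum()
        return dice_loss.mean()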


# ============================================================================
# 2. JACCARD LOSS (IoU Loss)
# ============================================================================

class JaccardLoss(nn.Module):
    """
    Jaccard Loss (Intersection over Union Loss)
    
    Penalizes predictions that deviate from ground truth in both shape
    and location. More stringent than Dice loss.
    
    Formula:
    L_jaccard = 1 - IoU = 1 - |X ∩ Y| / |X ∪ Y|
    
    where:
    - X = predicted mask
    - Y = ground truth mask
    - Union = X + Y - X ∩ Y
    """
    def __init__(self, smooth: float = 1e-7, reduction: str = 'mean'):
        """
        Args:
            smooth: Smoothing constant to prevent division by zero
            reduction: 'mean' or 'sum'
        """
        super(JaccardLoss, self).__init__()
        self.smooth = smooth
        self.reduction = reduction
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor,
                weights: Optional[torch.Tensor] = None) -> torch.Tensor:
        """
        Compute Jaccard/IoU loss
        
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth binary masks (B, C, H, W)
            weights: Optional class weights (C,)
        
        Returns:
            Jaccard loss value
        """
        # Ensure predictions are probabilities [0, 1]
        probs = torch.sigmoid(logits)
        
        # Flatten spatial dimensions
        probs_flat = probs.view(probs.size(0), probs.size(1), -1)
        targets_flat = targets.view(targets.size(0), targets.size(1), -1)
        
        # Compute intersection and union
        intersection = (probs_flat * targets_flat).sum(dim=2)
        union = probs_flat.sum(dim=2) + targets_flat.sum(dim=2) - intersection
        
        # Compute IoU
        iou = (intersection + self.smooth) / (union + self.smooth)
        
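        # Per-sample, per-class Jaccard loss with shape (B, C)
        jaccard_loss = 1.0 - iou
        
        # Apply class weights if provided: weighted average over classes
        if weights is not None:
            weights = weights.view(1, -1).to(jaccard_loss)
            jaccard_loss = (jaccard_loss * weights).sum(dim=1) / weights.sum()
        else:
            jaccard_loss = jaccard_loss.mean(dim=1)
        
        # Reduce over the batch
        if self.reduction == 'sum':
            return jaccard_loss.sum()
        return jaccard_loss.mean()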
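

# Illustrative sketch (not part of the original loss module).
# For a single pair of masks, the Dice coefficient D and the Jaccard index J
# are monotonic transforms of each other: J = D / (2 - D) and D = 2J / (1 + J).
# Because J <= D for every D in [0, 1], the Jaccard loss is never smaller than
# the Dice loss on the same prediction, which is one way to read the docstring's
# "more stringent". The helper names below are hypothetical.
def dice_to_jaccard(dice: float) -> float:
    """Convert a Dice coefficient in [0, 1] to the matching Jaccard index."""
    return dice / (2.0 - dice)


def jaccard_to_dice(jaccard: float) -> float:
    """Convert a Jaccard index in [0, 1] to the matching Dice coefficient."""
    return 2.0 * jaccard / (1.0 + jaccard)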


# ============================================================================
# 3. BOUNDARY LOSS
# ============================================================================

class BoundaryLoss(nn.Module):
    """
    Boundary Loss for highly imbalanced segmentation
    
    Minimizes distance between predicted and ground truth boundaries.
    Critical for medical imaging where precise lesion delineation affects
    treatment planning.
    
    The loss computes:
    L_boundary = integral over Ω of φ_G(p) * s_θ(p) dp
    
    where:
    - ∂G = boundary of the ground truth region G
    - φ_G(p) = signed distance from pixel p to ∂G (level-set representation)
    - s_θ(p) = foreground probability predicted by the network (sigmoid here)
    
    References:
    Kervadec et al. "Boundary Loss for Highly Unbalanced 
    Segmentation" (MIDL, 2019)
    """
    def __init__(self, reduction: str = 'mean'):
        """
        Args:
            reduction: 'mean' or 'sum'
        """
        super(BoundaryLoss, self).__init__()
        self.reduction = reduction
    
    @staticmethod
    def _compute_distance_map(ground_truth: torch.Tensor) -> torch.Tensor:
        """
        Compute a signed distance map from a binary ground truth mask.
        Negative inside the object, positive outside (the convention of
        Kervadec et al.), so that minimizing sum(distance * prob) rewards
        probability mass inside the object and penalizes it outside.
        
        Args:
            ground_truth: Binary mask (B, C, H, W)
        
        Returns:
            Distance map (B, C, H, W)
        """
        # Import required for scipy-based distance transform
        from scipy.ndimage import distance_transform_edt
        
        batch_size, channels, _, _ = ground_truth.shape
        masks = ground_truth.detach().cpu().numpy() > 0.5
        distance_map = np.zeros(masks.shape, dtype=np.float32)
        
        for b in range(batch_size):
            for c in range(channels):
                mask = masks[b, c]
                
                # An empty or full mask has no boundary: leave the map at zero
                if not mask.any() or mask.all():
                    continue
                
                # Distance of each foreground pixel to the background
                posdist = distance_transform_edt(mask)
                
                # Distance of each background pixel to the foreground
                negdist = distance_transform_edt(~mask)
                
                # Signed distance: negative inside, positive outside
                distance_map[b, c] = negdist - posdist
        
        return torch.from_numpy(distance_map).to(ground_truth.device)
    
    @staticmethod
    def _compute_distance_map_fast(ground_truth: torch.Tensor) -> torch.Tensor:
        """
        Pure-PyTorch distance map that stays on the input device.
        Computes the Euclidean distance from every pixel to the nearest
        boundary pixel by brute force, so memory grows with
        H * W * (number of boundary pixels). Values approximate the scipy
        version (boundary pixels get distance 0 here).
        
        Args:
            ground_truth: Binary mask (B, C, H, W)
        
        Returns:
            Signed distance map (B, C, H, W), negative inside the object
        """
        batch_size, channels, h, w = ground_truth.shape
        device = ground_truth.device
        
        # Pixel coordinates as an (H*W, 2) tensor of (row, col) pairs
        y_coords = torch.arange(h, dtype=torch.float32, device=device)
        x_coords = torch.arange(w, dtype=torch.float32, device=device)
        yy, xx = torch.meshgrid(y_coords, x_coords, indexing='ij')
        grid = torch.stack([yy.reshape(-1), xx.reshape(-1)], dim=1)
        
        distance_map = torch.zeros_like(ground_truth, dtype=torch.float32)
        
        for b in range(batch_size):
            for c in range(channels):
                mask = (ground_truth[b, c] > 0.5).float()
                
                # Boundary = foreground pixels removed by a 3x3 erosion
                # (erosion written as a negated max-pool of the negated mask)
                eroded = -F.max_pool2d(-mask[None, None], kernel_size=3,
                                       stride=1, padding=1)[0, 0]
                boundary_points = torch.nonzero(mask - eroded > 0.5).float()
                
                # No boundary pixels found: leave the map at zero
                if boundary_points.shape[0] == 0:
                    continue
                
                # Minimum distance from every pixel to any boundary pixel
                dist = torch.cdist(grid, boundary_points).min(dim=1).values
                dist = dist.view(h, w)
                
                # Apply sign: negative inside, positive outside
                distance_map[b, c] = torch.where(mask > 0.5, -dist, dist)
        
        return distance_map
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor,
                use_scipy: bool = True) -> torch.Tensor:
        """
        Compute boundary loss
        
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth binary masks (B, C, H, W) with values in {0, 1}
            use_scipy: If True, use scipy distance transform (more accurate but slower)
                      If False, use fast PyTorch implementation
        
        Returns:
            Boundary loss value
        """
        # Get probability predictions
        probs = torch.sigmoid(logits)
        
        # Compute distance map (signed distance to boundary)
        if use_scipy:
            distance_map = self._compute_distance_map(targets)
        else:
            distance_map = self._compute_distance_map_fast(targets)
        
        # Compute boundary loss: integral of distance_map * prob
        # Approximated as mean: sum(distance_map * prob) / pixels
        loss = (distance_map * probs).mean()
        
        return loss


# ============================================================================
# 4. COMPOSITE LOSS FUNCTION
# ============================================================================

class CompositeLoss(nn.Module):
    """
    Composite loss combining Dice, Jaccard, and Boundary losses
    
    L_total = λ_d * L_dice + λ_j * L_jaccard + λ_b * L_boundary
    
    This combination ensures:
    - Region similarity (Dice)
    - Spatial placement (Jaccard)
    - Precise boundaries (Boundary)
    """
    def __init__(self, lambda_dice: float = 1.0, lambda_jaccard: float = 1.0,
                 lambda_boundary: float = 1.0, use_boundary: bool = True,
                 use_scipy: bool = True, smooth: float = 1e-7):
        """
        Args:
            lambda_dice: Weight for Dice loss
            lambda_jaccard: Weight for Jaccard loss
            lambda_boundary: Weight for Boundary loss
            use_boundary: Whether to include boundary loss
            use_scipy: Use scipy for distance transform (slower but more accurate)
            smooth: Smoothing constant
        """
        super(CompositeLoss, self).__init__()
        
        self.lambda_dice = lambda_dice
        self.lambda_jaccard = lambda_jaccard
        self.lambda_boundary = lambda_boundary
        self.use_boundary = use_boundary
        self.use_scipy = use_scipy
        
        self.dice_loss = DiceLoss(smooth=smooth)
        self.jaccard_loss = JaccardLoss(smooth=smooth)
        
        if use_boundary:
            self.boundary_loss = BoundaryLoss()
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor,
                class_weights: Optional[torch.Tensor] = None) -> dict:
        """
        Compute composite loss
        
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth masks (B, C, H, W)
            class_weights: Optional per-class weights (C,)
        
        Returns:
            Dictionary containing individual and total losses
        """
        # Compute individual losses
        dice = self.dice_loss(logits, targets, class_weights)
        jaccard = self.jaccard_loss(logits, targets, class_weights)
        
        boundary = torch.tensor(0.0, device=logits.device)
        if self.use_boundary:
            boundary = self.boundary_loss(logits, targets, use_scipy=self.use_scipy)
        
        # Compute total loss
        total_loss = (self.lambda_dice * dice + 
                     self.lambda_jaccard * jaccard + 
                     self.lambda_boundary * boundary)
        
        return {
            'total': total_loss,
            'dice': dice.detach(),
            'jaccard': jaccard.detach(),
            'boundary': boundary.detach() if isinstance(boundary, torch.Tensor) else boundary
        }


# ============================================================================
# 5. ALTERNATIVE LOSSES
# ============================================================================

class FocalLoss(nn.Module):
    """
    Focal Loss for addressing class imbalance
    
    Reduces weight of easy examples and focuses on hard examples.
    Particularly useful when foreground (lesion) pixels are rare.
    
    Formula: FL = -α(1-p_t)^γ log(p_t)
    """
    def __init__(self, alpha: float = 0.25, gamma: float = 2.0):
        """
        Args:
            alpha: Weighting factor in range (0,1) to balance
                   positive/negative examples. Default: 0.25
            gamma: Exponent of the modulating factor (1 - p_t)^gamma
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth binary masks (B, C, H, W)
        
        Returns:
            Focal loss value
        """
        # Convert logits to probabilities
        probs = torch.sigmoid(logits)
        
        # Clip to prevent log(0)
        probs = torch.clamp(probs, 1e-7, 1.0 - 1e-7)
        
        # Calculate focal weight: (1 - p_t)^gamma
        p_t = torch.where(targets > 0.5, probs, 1.0 - probs)
        focal_weight = (1.0 - p_t) ** self.gamma
        
        # Class-balancing factor: alpha for positives, (1 - alpha) for negatives
        alpha_t = self.alpha * targets + (1.0 - self.alpha) * (1.0 - targets)
        
        # Calculate log loss
        bce = F.binary_cross_entropy_with_logits(logits, targets, reduction='none')
        
        # Calculate focal loss
        focal_loss = alpha_t * focal_weight * bce
        
        return focal_loss.mean()
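

# Illustrative sketch (not part of the original loss module).
# The scalar form of the focal term, FL = -alpha * (1 - p_t)^gamma * log(p_t),
# makes the down-weighting of easy examples easy to inspect by hand. Here p_t
# is the probability assigned to the true class, and the helper name is
# hypothetical. With gamma = 0 and alpha = 1 it reduces to plain cross-entropy.
import math


def focal_term(p_t: float, alpha: float = 0.25, gamma: float = 2.0) -> float:
    """Focal loss contribution of a single pixel with true-class probability p_t."""
    return -alpha * (1.0 - p_t) ** gamma * math.log(p_t)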


class TverskyLoss(nn.Module):
    """
    Tversky Loss: Generalization of Dice loss with tunable false positive/negative balance
    
    TL = 1 - (TP + ε) / (TP + α*FP + β*FN + ε)
    
    Setting α = β = 0.5 recovers Dice loss
    Setting α > 0.5 penalizes false positives more
    Setting β > 0.5 penalizes false negatives more
    """
    def __init__(self, alpha: float = 0.5, beta: float = 0.5, smooth: float = 1e-7):
        """
        Args:
            alpha: Weight of false positives
            beta: Weight of false negatives
            smooth: Smoothing constant
        """
        super(TverskyLoss, self).__init__()
        self.alpha = alpha
        self.beta = beta
        self.smooth = smooth
    
    def forward(self, logits: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        """
        Args:
            logits: Predicted logits (B, C, H, W)
            targets: Ground truth binary masks (B, C, H, W)
        
        Returns:
            Tversky loss value
        """
        probs = torch.sigmoid(logits)
        
        # Flatten
        probs_flat = probs.view(-1)
        targets_flat = targets.view(-1)
        
        # Compute terms
        tp = (probs_flat * targets_flat).sum()
        fp = (probs_flat * (1.0 - targets_flat)).sum()
        fn = ((1.0 - probs_flat) * targets_flat).sum()
        
        # Tversky index
        tversky_index = (tp + self.smooth) / (tp + self.alpha * fp + 
                                              self.beta * fn + self.smooth)
        
        return 1.0 - tversky_index
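

# Illustrative sketch (not part of the original loss module).
# A count-based check of the docstring's claim that alpha = beta = 0.5 recovers
# Dice. With no smoothing, TP / (TP + 0.5*FP + 0.5*FN) equals
# 2*TP / (2*TP + FP + FN). Once a smoothing constant is added the two differ
# slightly, because it enters the two formulas at different scales. The helper
# names are hypothetical.
def tversky_index(tp: float, fp: float, fn: float,
                  alpha: float = 0.5, beta: float = 0.5) -> float:
    """Tversky index from true-positive, false-positive, false-negative counts."""
    return tp / (tp + alpha * fp + beta * fn)


def dice_coefficient(tp: float, fp: float, fn: float) -> float:
    """Dice coefficient from the same counts."""
    return 2.0 * tp / (2.0 * tp + fp + fn)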


if __name__ == "__main__":
    # Example usage
    print("Loss Functions for TBConvL-Net")
    print("=" * 60)
    
    # Create dummy data
    batch_size, channels, height, width = 2, 1, 32, 32
    logits = torch.randn(batch_size, channels, height, width)
    targets = torch.randint(0, 2, (batch_size, channels, height, width)).float()
    
    # Test individual losses
    print("\nIndividual Loss Functions:")
    print("-" * 60)
    
    dice = DiceLoss()
    print(f"Dice Loss: {dice(logits, targets).item():.6f}")
    
    jaccard = JaccardLoss()
    print(f"Jaccard Loss: {jaccard(logits, targets).item():.6f}")
    
    focal = FocalLoss()
    print(f"Focal Loss: {focal(logits, targets).item():.6f}")
    
    tversky = TverskyLoss()
    print(f"Tversky Loss: {tversky(logits, targets).item():.6f}")
    
    # Test composite loss (boundary term disabled for speed)
    print("\nComposite Loss (Dice + Jaccard, boundary term disabled):")
    print("-" * 60)
    composite = CompositeLoss(lambda_dice=1.0, lambda_jaccard=1.0,
                              lambda_boundary=0.01, use_boundary=False)
    losses = composite(logits, targets)
    
    for key, value in losses.items():
        print(f"{key.capitalize()}: {value.item():.6f}")
"""
Training script for TBConvL-Net

Complete training pipeline including:
- Model initialization
- Data loading
- Training loop with validation
- Checkpointing
- Learning rate scheduling
- Early stopping
"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau, CosineAnnealingLR
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter

import numpy as np
import time
import argparse
from pathlib import Path
from typing import Tuple, Dict, Optional
import json
import warnings

from tbconvl_net_model import TBConvLNet
from losses import CompositeLoss, DiceLoss, JaccardLoss
from metrics import MetricsAggregator, compute_batch_metrics
from data_utils import create_data_loaders, NumpyMedicalDataset

warnings.filterwarnings('ignore')


# ============================================================================
# 1. CONFIGURATION
# ============================================================================

class Config:
    """Training configuration"""
    def __init__(self):
        # Model
        self.model_name = "TBConvL-Net"
        self.in_channels = 3
        self.num_classes = 1
        self.num_filters = 16
        
        # Training
        self.batch_size = 16
        self.learning_rate = 1e-3
        self.weight_decay = 1e-5
        self.num_epochs = 100
        self.early_stopping_patience = 20
        
        # Loss weights
        self.lambda_dice = 1.0
        self.lambda_jaccard = 1.0
        self.lambda_boundary = 0.01  # Small weight for the boundary term (kept constant in the code shown)
        
        # Data
        self.img_size = (256, 256)
        self.train_split = 0.8
        self.val_split = 0.1
        self.num_workers = 4
        self.augmentation = True
        
        # Optimization
        self.optimizer = 'adam'  # 'adam' or 'sgd'
        self.scheduler = 'cosine'  # 'plateau' or 'cosine'
        self.gradient_clip = 1.0
        
        # Device
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        
        # Logging
        self.log_interval = 10
        self.save_interval = 5
        self.checkpoint_dir = Path('checkpoints')
        self.log_dir = Path('logs')
    
    def to_dict(self) -> dict:
        return {k: v for k, v in self.__dict__.items() 
                if not isinstance(v, (Path,))}
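

# Illustrative sketch (not part of the original training script).
# Config stores lambda_boundary as a single number. If that weight is meant to
# change over training, one hypothetical option is a linear ramp between two
# values, evaluated once per epoch. The function name and the start/end values
# are assumptions made for illustration; the direction of the ramp is left to
# the caller.
def linear_weight_schedule(epoch: int, num_epochs: int,
                           start: float, end: float) -> float:
    """Linearly interpolate from `start` (epoch 0) to `end` (last epoch)."""
    if num_epochs <= 1:
        return end
    clamped = min(max(epoch, 0), num_epochs - 1)
    t = clamped / (num_epochs - 1)
    return start + t * (end - start)

# Possible use inside a training loop (names as in Trainer, shown as a comment
# so that nothing runs on import):
#     self.criterion.lambda_boundary = linear_weight_schedule(
#         epoch, self.config.num_epochs, start=0.01, end=0.1)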


# ============================================================================
# 2. TRAINER CLASS
# ============================================================================

class Trainer:
    """
    Main training class for TBConvL-Net
    """
    def __init__(self, config: Config):
        """
        Initialize trainer
        
        Args:
            config: Configuration object
        """
        self.config = config
        
        # Create directories
        self.config.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.config.log_dir.mkdir(parents=True, exist_ok=True)
        
        # Initialize model
        self.model = TBConvLNet(
            in_channels=config.in_channels,
            num_classes=config.num_classes,
            num_filters=config.num_filters
        ).to(config.device)
        
        # Print model info
        self._print_model_info()
        
        # Initialize optimizer
        self.optimizer = self._create_optimizer()
        
        # Initialize scheduler
        self.scheduler = self._create_scheduler()
        
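        # Initialize loss function
        composite_loss = CompositeLoss(
            lambda_dice=config.lambda_dice,
            lambda_jaccard=config.lambda_jaccard,
            lambda_boundary=config.lambda_boundary,
            use_boundary=True,
            use_scipy=False  # Use fast PyTorch version
        )
        # TBConvLNet.forward() already ends in a sigmoid, while the losses
        # expect logits and apply their own sigmoid. Map the probabilities
        # back to logit space so the sigmoid is not applied twice.
        self.criterion = lambda probs, masks: composite_loss(
            torch.logit(probs, eps=1e-6), masks
        )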
        
        # Tensorboard writer
        self.writer = SummaryWriter(self.config.log_dir)
        
        # Training state
        self.best_val_loss = float('inf')
        self.best_epoch = 0
        self.patience_counter = 0
        self.global_step = 0
    
    def _print_model_info(self):
        """Print model information"""
        stats = self.model.get_parameter_count()
        print("\n" + "=" * 70)
        print(f"Model: {self.config.model_name}")
        print("=" * 70)
        print(f"Total Parameters: {stats['total_millions']:.2f}M")
        print(f"Trainable Parameters: {stats['trainable_millions']:.2f}M")
        print(f"Device: {self.config.device}")
        print("=" * 70 + "\n")
    
    def _create_optimizer(self) -> torch.optim.Optimizer:
        """Create optimizer"""
        if self.config.optimizer == 'adam':
            optimizer = optim.Adam(
                self.model.parameters(),
                lr=self.config.learning_rate,
                weight_decay=self.config.weight_decay,
                betas=(0.9, 0.999)
            )
        elif self.config.optimizer == 'sgd':
            optimizer = optim.SGD(
                self.model.parameters(),
                lr=self.config.learning_rate,
                weight_decay=self.config.weight_decay,
                momentum=0.9
            )
        else:
            raise ValueError(f"Unknown optimizer: {self.config.optimizer}")
        
        return optimizer
    
    def _create_scheduler(self) -> torch.optim.lr_scheduler._LRScheduler:
        """Create learning rate scheduler"""
        if self.config.scheduler == 'plateau':
            scheduler = ReduceLROnPlateau(
                self.optimizer,
                mode='min',
                factor=0.5,
                patience=10
            )
        elif self.config.scheduler == 'cosine':
            scheduler = CosineAnnealingLR(
                self.optimizer,
                T_max=self.config.num_epochs,
                eta_min=1e-6
            )
        else:
            raise ValueError(f"Unknown scheduler: {self.config.scheduler}")
        
        return scheduler
    
    def train_epoch(self, train_loader: DataLoader, epoch: int) -> Dict[str, float]:
        """
        Train for one epoch
        
        Args:
            train_loader: Training data loader
            epoch: Current epoch number
        
        Returns:
            Dictionary with epoch metrics
        """
        self.model.train()
        
        epoch_loss = 0.0
        epoch_metrics = {
            'dice': 0.0,
            'jaccard': 0.0,
            'accuracy': 0.0,
            'sensitivity': 0.0,
            'specificity': 0.0
        }
        
        num_batches = 0
        
        for batch_idx, batch in enumerate(train_loader):
            # Get data
            images = batch['image'].to(self.config.device)
            masks = batch['mask'].to(self.config.device)
            
            # Forward pass
            self.optimizer.zero_grad()
            predictions = self.model(images)
            
            # Compute loss
            loss_dict = self.criterion(predictions, masks)
            loss = loss_dict['total']
            
            # Backward pass
            loss.backward()
            
            # Gradient clipping
            if self.config.gradient_clip > 0:
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(),
                    self.config.gradient_clip
                )
            
            self.optimizer.step()
            
            # Accumulate metrics
            epoch_loss += loss.item()
            
            with torch.no_grad():
                metrics = compute_batch_metrics(predictions.detach(), masks.detach())
                for key in epoch_metrics:
                    epoch_metrics[key] += metrics.get(key, 0.0)
            
            num_batches += 1
            self.global_step += 1
            
            # Logging
            if (batch_idx + 1) % self.config.log_interval == 0:
                avg_loss = epoch_loss / num_batches
                print(f"Epoch {epoch+1}/{self.config.num_epochs} | "
                      f"Batch {batch_idx+1}/{len(train_loader)} | "
                      f"Loss: {avg_loss:.6f}")
        
        # Compute epoch averages
        avg_loss = epoch_loss / num_batches
        for key in epoch_metrics:
            epoch_metrics[key] /= num_batches
        
        return {'loss': avg_loss, **epoch_metrics}
    
    def validate(self, val_loader: DataLoader) -> Dict[str, float]:
        """
        Validation step
        
        Args:
            val_loader: Validation data loader
        
        Returns:
            Dictionary with validation metrics
        """
        self.model.eval()
        
        val_loss = 0.0
        val_metrics = MetricsAggregator(threshold=0.5)
        
        num_batches = 0
        
        with torch.no_grad():
            for batch in val_loader:
                # Get data
                images = batch['image'].to(self.config.device)
                masks = batch['mask'].to(self.config.device)
                
                # Forward pass
                predictions = self.model(images)
                
                # Compute loss
                loss_dict = self.criterion(predictions, masks)
                val_loss += loss_dict['total'].item()
                
                # Update metrics
                val_metrics.update(predictions, masks)
                
                num_batches += 1
        
        # Compute averages
        avg_loss = val_loss / num_batches
        results = val_metrics.get_results()
        
        # Extract key metrics
        val_dict = {
            'loss': avg_loss,
            'dice': results.get('dice_mean', 0.0),
            'jaccard': results.get('jaccard_mean', 0.0),
            'accuracy': results.get('accuracy_mean', 0.0),
            'sensitivity': results.get('sensitivity_mean', 0.0),
            'specificity': results.get('specificity_mean', 0.0)
        }
        
        return val_dict
    
    def save_checkpoint(self, epoch: int, is_best: bool = False):
        """
        Save model checkpoint
        
        Args:
            epoch: Current epoch
            is_best: Whether this is the best model so far
        """
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'config': self.config.to_dict(),
            'best_val_loss': self.best_val_loss
        }
        
        # Regular checkpoint
        if (epoch + 1) % self.config.save_interval == 0:
            path = self.config.checkpoint_dir / f'checkpoint_epoch_{epoch+1:03d}.pt'
            torch.save(checkpoint, path)
            print(f"Saved checkpoint: {path}")
        
        # Best model
        if is_best:
            path = self.config.checkpoint_dir / 'best_model.pt'
            torch.save(checkpoint, path)
            print(f"Saved best model: {path}")
    
    def train(self, train_loader: DataLoader, val_loader: DataLoader):
        """
        Complete training loop
        
        Args:
            train_loader: Training data loader
            val_loader: Validation data loader
        """
        print("\n" + "=" * 70)
        print("STARTING TRAINING")
        print("=" * 70 + "\n")
        
        start_time = time.time()
        
        for epoch in range(self.config.num_epochs):
            print(f"\nEpoch {epoch+1}/{self.config.num_epochs}")
            print("-" * 70)
            
            # Train
            train_metrics = self.train_epoch(train_loader, epoch)
            
            # Validate
            val_metrics = self.validate(val_loader)
            
            # Logging
            print(f"\nTrain Loss: {train_metrics['loss']:.6f} | "
                  f"Train Dice: {train_metrics['dice']:.4f} | "
                  f"Train Jaccard: {train_metrics['jaccard']:.4f}")
            
            print(f"Val Loss: {val_metrics['loss']:.6f} | "
                  f"Val Dice: {val_metrics['dice']:.4f} | "
                  f"Val Jaccard: {val_metrics['jaccard']:.4f}")
            
            # Tensorboard logging
            self.writer.add_scalar('train/loss', train_metrics['loss'], epoch)
            self.writer.add_scalar('train/dice', train_metrics['dice'], epoch)
            self.writer.add_scalar('val/loss', val_metrics['loss'], epoch)
            self.writer.add_scalar('val/dice', val_metrics['dice'], epoch)
            self.writer.add_scalar('learning_rate', 
                                 self.optimizer.param_groups[0]['lr'], epoch)
            
            # Update scheduler
            if self.config.scheduler == 'plateau':
                self.scheduler.step(val_metrics['loss'])
            else:
                self.scheduler.step()
            
            # Save checkpoint
            is_best = val_metrics['loss'] < self.best_val_loss
            if is_best:
                self.best_val_loss = val_metrics['loss']
                self.best_epoch = epoch
                self.patience_counter = 0
            else:
                self.patience_counter += 1
            
            self.save_checkpoint(epoch, is_best)
            
            # Early stopping
            if self.patience_counter >= self.config.early_stopping_patience:
                print(f"\nEarly stopping at epoch {epoch+1}")
                print(f"Best epoch: {self.best_epoch+1} with loss: {self.best_val_loss:.6f}")
                break
        
        # Training complete
        elapsed_time = time.time() - start_time
        print("\n" + "=" * 70)
        print(f"Training complete in {elapsed_time/3600:.2f} hours")
        print(f"Best model: Epoch {self.best_epoch+1} with loss {self.best_val_loss:.6f}")
        print("=" * 70)
        
        self.writer.close()
    
    def load_checkpoint(self, checkpoint_path: str):
        """
        Load model from checkpoint
        
        Args:
            checkpoint_path: Path to checkpoint file
        """
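        checkpoint = torch.load(checkpoint_path, map_location=self.config.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        # Restore the best validation loss so a resumed run keeps tracking it
        self.best_val_loss = checkpoint.get('best_val_loss', self.best_val_loss)
        
        print(f"Loaded checkpoint from epoch {checkpoint['epoch']+1}")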
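

# ----------------------------------------------------------------------------
# Illustrative sketch (an addition, not part of the original trainer).
# It shows the save/load round trip used by save_checkpoint() and
# load_checkpoint() on a tiny stand-in model. The function name
# `checkpoint_roundtrip_demo`, the nn.Linear model, and the stored values
# are hypothetical and chosen only for the demonstration.
# ----------------------------------------------------------------------------
import os
import tempfile

import torch
import torch.nn as nn


def checkpoint_roundtrip_demo():
    """Save a checkpoint dict, reload it, and confirm the weights match."""
    model = nn.Linear(4, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Same key layout as the checkpoint dict built in save_checkpoint()
    checkpoint = {
        'epoch': 4,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'best_val_loss': 0.123,
    }

    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, 'demo_checkpoint.pt')
        torch.save(checkpoint, path)
        restored = torch.load(path, map_location='cpu')

    # Load the restored weights into a freshly initialised model
    fresh_model = nn.Linear(4, 2)
    fresh_model.load_state_dict(restored['model_state_dict'])

    weights_match = all(
        torch.equal(a, b)
        for a, b in zip(model.state_dict().values(),
                        fresh_model.state_dict().values())
    )
    return restored['epoch'], weights_match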


# ============================================================================
# 3. MAIN SCRIPT
# ============================================================================

def main(args: argparse.Namespace):
    """Main training function"""
    
    # Create config
    config = Config()
    
    # Override with command-line arguments
    for key, value in vars(args).items():
        if hasattr(config, key) and value is not None:
            setattr(config, key, value)
    
    # Create data loaders
    print("\nLoading data...")
    
    if args.npz_file:
        # Load from NPZ file
        dataset = NumpyMedicalDataset(
            npz_file=args.npz_file,
            img_size=config.img_size,
            augmentation=config.augmentation,
            train=True
        )
        
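        # Manual 80/20 split. The validation subset reads from a second,
        # non-augmented view of the same file so that validation metrics are
        # not computed on augmented samples (this loads the arrays twice).
        from torch.utils.data import Subset
        val_source = NumpyMedicalDataset(
            npz_file=args.npz_file,
            img_size=config.img_size,
            augmentation=False,
            train=False
        )
        
        train_size = int(0.8 * len(dataset))
        indices = torch.randperm(len(dataset)).tolist()
        train_dataset = Subset(dataset, indices[:train_size])
        val_dataset = Subset(val_source, indices[train_size:])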
        
        train_loader = DataLoader(
            train_dataset, batch_size=config.batch_size, shuffle=True,
            num_workers=config.num_workers, pin_memory=True
        )
        val_loader = DataLoader(
            val_dataset, batch_size=config.batch_size, shuffle=False,
            num_workers=config.num_workers, pin_memory=True
        )
    else:
        # Create from image/mask directories
        train_loader, val_loader, _ = create_data_loaders(
            image_dir=args.image_dir,
            mask_dir=args.mask_dir,
            batch_size=config.batch_size,
            img_size=config.img_size,
            num_workers=config.num_workers,
            augmentation=config.augmentation
        )
    
    # Create trainer
    trainer = Trainer(config)
    
    # Load checkpoint if specified
    if args.checkpoint:
        trainer.load_checkpoint(args.checkpoint)
    
    # Train
    trainer.train(train_loader, val_loader)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Train TBConvL-Net')
    
    # Data arguments
    parser.add_argument('--image-dir', type=str, default='data/images',
                       help='Directory containing images')
    parser.add_argument('--mask-dir', type=str, default='data/masks',
                       help='Directory containing masks')
    parser.add_argument('--npz-file', type=str, default=None,
                       help='Alternative: NPZ file with preprocessed data')
    
    # Training arguments
    parser.add_argument('--batch-size', type=int, default=16)
    parser.add_argument('--learning-rate', type=float, default=1e-3)
    parser.add_argument('--num-epochs', type=int, default=100)
    parser.add_argument('--optimizer', type=str, default='adam', 
                       choices=['adam', 'sgd'])
    parser.add_argument('--scheduler', type=str, default='cosine',
                       choices=['plateau', 'cosine'])
    
    # Model arguments
    parser.add_argument('--num-filters', type=int, default=16)
    
    # Other arguments
    parser.add_argument('--checkpoint', type=str, default=None,
                       help='Path to checkpoint to resume from')
    parser.add_argument('--device', type=str, default=None,
                       help='Device: cuda or cpu')
    
    args = parser.parse_args()
    
    main(args)
"""
Data utilities for medical image segmentation

Provides:
- Custom DataLoader for medical images
- Data augmentation (rotation, flipping, contrast)
- Preprocessing and normalization
- Support for multiple image formats
"""

import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
import cv2
from typing import Tuple, Optional, List, Dict
from pathlib import Path
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2


# ============================================================================
# 1. IMAGE PREPROCESSING
# ============================================================================

class MedicalImagePreprocessor:
    """
    Preprocessing pipeline for medical images
    Includes normalization, resizing, and standardization
    """
    def __init__(self, img_size: Tuple[int, int] = (256, 256),
                 normalize: bool = True, to_tensor: bool = False):
        """
        Args:
            img_size: Target image size (H, W)
            normalize: Whether to normalize to [0, 1]
            to_tensor: Whether to convert to PyTorch tensor
        """
        self.img_size = img_size
        self.normalize = normalize
        self.to_tensor = to_tensor
    
    def __call__(self, image: np.ndarray) -> np.ndarray:
        """
        Preprocess image
        
        Args:
            image: Input image (H, W, C) or (H, W)
        
        Returns:
            Preprocessed image
        """
        # Ensure 3 channels
        if len(image.shape) == 2:
            image = np.stack([image] * 3, axis=2)
        
        # Resize (cv2.resize expects (width, height); img_size is (H, W))
        image = cv2.resize(image, (self.img_size[1], self.img_size[0]),
                           interpolation=cv2.INTER_LINEAR)
        
        # Normalize to [0, 1]
        if self.normalize:
            if image.max() > 1.0:
                image = image.astype(np.float32) / 255.0
            else:
                image = image.astype(np.float32)
        
        # Convert to tensor
        if self.to_tensor:
            image = torch.from_numpy(image)
            if len(image.shape) == 3:
                image = image.permute(2, 0, 1)
        
        return image


# ============================================================================
# 2. DATA AUGMENTATION
# ============================================================================

class MedicalImageAugmentor:
    """
    Data augmentation for medical images
    Conservative augmentation to preserve medical semantics
    """
    def __init__(self, img_size: Tuple[int, int] = (256, 256),
                 apply_augmentation: bool = True, augmentation_strength: float = 1.0):
        """
        Args:
            img_size: Target image size
            apply_augmentation: Whether to apply augmentation
            augmentation_strength: Strength of augmentation (0-1); stored but
                not yet used by the pipeline below
        """
        self.img_size = img_size
        self.apply_augmentation = apply_augmentation
        self.strength = augmentation_strength
        
        if apply_augmentation:
            # Conservative augmentation pipeline
            # Following paper's augmentation strategy
            self.augmentation = A.Compose([
                A.Resize(img_size[0], img_size[1]),
                
                # Geometric transformations
                A.HorizontalFlip(p=0.5),
                A.VerticalFlip(p=0.5),
                A.Rotate(limit=10, p=0.3),
                A.ElasticTransform(alpha=50, sigma=5, p=0.2),
                
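                # Brightness/Contrast (following paper: 0.9x to 1.1x)
                # RandomBrightness/RandomContrast are no longer available in
                # recent albumentations releases; RandomBrightnessContrast
                # covers both adjustments
                A.RandomBrightnessContrast(brightness_limit=0.1,
                                           contrast_limit=0.1, p=0.3),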
                
                # Noise (medical images can be noisy)
                A.GaussNoise(p=0.1),
                
                # Convert to tensor
                ToTensorV2()
            ])  # no bbox_params: only image and mask are transformed
        else:
            self.augmentation = A.Compose([
                A.Resize(img_size[0], img_size[1]),
                ToTensorV2()
            ])  # no bbox_params: only image and mask are transformed
    
    def __call__(self, image: np.ndarray, mask: np.ndarray) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Apply augmentation to image and mask
        
        Args:
            image: Input image (H, W, C) in [0, 1] or [0, 255]
            mask: Segmentation mask (H, W) with values 0 or 1
        
        Returns:
            Augmented image and mask as tensors
        """
        # Ensure image is float in [0, 1]
        if image.dtype == np.uint8:
            image = image.astype(np.float32) / 255.0
        elif image.max() > 1.0:
            image = image.astype(np.float32) / 255.0
        
        # Ensure mask is float
        mask = mask.astype(np.float32)
        
        # Convert image to uint8 for albumentations
        image_uint8 = (image * 255).astype(np.uint8)
        mask_uint8 = (mask * 255).astype(np.uint8)
        
        # Apply augmentation
        augmented = self.augmentation(
            image=image_uint8,
            mask=mask_uint8
        )
        
        image_tensor = augmented['image'].float() / 255.0  # back to [0, 1]
        mask_tensor = augmented['mask'].float() / 255.0
        mask_tensor = mask_tensor.unsqueeze(0)  # Add channel dimension
        
        return image_tensor, mask_tensor


# ============================================================================
# 3. MEDICAL IMAGE DATASET
# ============================================================================

class MedicalImageDataset(Dataset):
    """
    Generic dataset for medical image segmentation
    Supports formats readable by OpenCV (PNG, JPEG, TIFF);
    for NPZ archives see NumpyMedicalDataset
    """
    def __init__(self, image_dir: str, mask_dir: str, 
                 img_size: Tuple[int, int] = (256, 256),
                 augmentation: bool = True,
                 image_suffix: str = '.png',
                 mask_suffix: str = '_mask.png',
                 normalize: bool = True):
        """
        Args:
            image_dir: Directory containing images
            mask_dir: Directory containing segmentation masks
            img_size: Target image size
            augmentation: Apply data augmentation
            image_suffix: Suffix for image files
            mask_suffix: Suffix for mask files
            normalize: Normalize images
        """
        self.image_dir = Path(image_dir)
        self.mask_dir = Path(mask_dir)
        self.img_size = img_size
        self.normalize = normalize
        self.augmentation = augmentation
        
        # Get list of images
        image_files = list(self.image_dir.glob(f'*{image_suffix}'))
        self.image_paths = sorted(image_files)
        
        # Find corresponding masks. Images without a mask are dropped so that
        # image_paths and mask_paths stay index-aligned.
        paired_images = []
        self.mask_paths = []
        for img_path in self.image_paths:
            mask_path = self.mask_dir / (img_path.stem + mask_suffix)
            if not mask_path.exists():
                # Fall back to the default '<stem>_mask.png' naming
                mask_path = self.mask_dir / f"{img_path.stem}_mask.png"
            if mask_path.exists():
                paired_images.append(img_path)
                self.mask_paths.append(mask_path)
            else:
                print(f"Warning: No mask found for {img_path}, skipping")
        self.image_paths = paired_images
        
        # Augmentor
        if augmentation:
            self.augmentor = MedicalImageAugmentor(img_size, apply_augmentation=True)
        else:
            self.augmentor = MedicalImageAugmentor(img_size, apply_augmentation=False)
        
        self.preprocessor = MedicalImagePreprocessor(img_size, normalize=normalize)
        
        print(f"Loaded {len(self.image_paths)} images with masks")
    
    def __len__(self) -> int:
        return len(self.image_paths)
    
    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """
        Get image and mask
        
        Returns:
            Dictionary with 'image' and 'mask' tensors
        """
        # Load image
        img_path = self.image_paths[idx]
        image = cv2.imread(str(img_path))
        if image is None:
            raise ValueError(f"Could not load image: {img_path}")
        
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        # Load mask
        mask_path = self.mask_paths[idx]
        mask = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise ValueError(f"Could not load mask: {mask_path}")
        
        # Convert mask to binary (0 or 1)
        mask = (mask > 127).astype(np.uint8)
        
        # Apply augmentation
        if self.augmentation:
            image, mask = self.augmentor(image, mask)
        else:
            # Just resize without augmentation
            # (cv2.resize expects (width, height); img_size is (H, W))
            dsize = (self.img_size[1], self.img_size[0])
            image = cv2.resize(image, dsize)
            mask = cv2.resize(mask, dsize, interpolation=cv2.INTER_NEAREST)
            
            # Normalize image
            if self.normalize:
                image = image.astype(np.float32) / 255.0
            
            image = torch.from_numpy(image).permute(2, 0, 1).float()
            mask = torch.from_numpy(mask).float().unsqueeze(0)
        
        return {
            'image': image,
            'mask': mask,
            'path': str(img_path)
        }


# ============================================================================
# 4. NUMPY DATASET
# ============================================================================

class NumpyMedicalDataset(Dataset):
    """
    Dataset for medical images stored as numpy arrays
    Useful for pre-processed datasets
    """
    def __init__(self, npz_file: str, 
                 img_size: Tuple[int, int] = (256, 256),
                 augmentation: bool = True,
                 normalize: bool = True,
                 train: bool = True):
        """
        Args:
            npz_file: Path to NPZ file containing 'images' and 'masks'
            img_size: Target image size
            augmentation: Apply augmentation
            normalize: Normalize images
            train: Whether this is training set (affects augmentation)
        """
        # Load data
        data = np.load(npz_file)
        self.images = data['images']  # (N, H, W, C) or (N, H, W)
        self.masks = data['masks']    # (N, H, W)
        
        self.img_size = img_size
        self.normalize = normalize
        self.augmentation = augmentation and train
        
        self.augmentor = MedicalImageAugmentor(
            img_size, apply_augmentation=self.augmentation
        )
        
        print(f"Loaded {len(self.images)} images from {npz_file}")
    
    def __len__(self) -> int:
        return len(self.images)
    
    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Get image and mask"""
        image = self.images[idx]
        mask = self.masks[idx]
        
        # Ensure 3 channels
        if len(image.shape) == 2:
            image = np.stack([image] * 3, axis=2)
        
        # Convert to uint8
        if image.max() <= 1.0:
            image = (image * 255).astype(np.uint8)
        else:
            image = image.astype(np.uint8)
        
        # Ensure binary mask
        mask = (mask > 0.5).astype(np.uint8)
        
        # Apply augmentation
        if self.augmentation:
            image, mask = self.augmentor(image, mask)
        else:
            # cv2.resize expects (width, height); img_size is (H, W)
            dsize = (self.img_size[1], self.img_size[0])
            image = cv2.resize(image, dsize)
            mask = cv2.resize(mask, dsize, interpolation=cv2.INTER_NEAREST)
            
            image = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
            mask = torch.from_numpy(mask).float().unsqueeze(0)
        
        return {
            'image': image,
            'mask': mask
        }


# ============================================================================
# 5. DATA UTILITIES
# ============================================================================

def create_data_loaders(
    image_dir: str, mask_dir: str,
    batch_size: int = 16,
    img_size: Tuple[int, int] = (256, 256),
    train_split: float = 0.8,
    val_split: float = 0.1,
    num_workers: int = 4,
    normalize: bool = True,
    augmentation: bool = True
) -> Tuple[DataLoader, DataLoader, DataLoader]:
    """
    Create train, validation, and test data loaders
    
    Args:
        image_dir: Directory with images
        mask_dir: Directory with masks
        batch_size: Batch size
        img_size: Image size
        train_split: Fraction for training (default 0.8)
        val_split: Fraction for validation (default 0.1)
        num_workers: Number of workers for data loading
        normalize: Normalize images
        augmentation: Apply augmentation
    
    Returns:
        Tuple of (train_loader, val_loader, test_loader)
    """
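    # Create dataset (augmented view, used for training)
    dataset = MedicalImageDataset(
        image_dir=image_dir,
        mask_dir=mask_dir,
        img_size=img_size,
        augmentation=augmentation,
        normalize=normalize
    )
    
    # Non-augmented view of the same files for validation and testing
    eval_dataset = MedicalImageDataset(
        image_dir=image_dir,
        mask_dir=mask_dir,
        img_size=img_size,
        augmentation=False,
        normalize=normalize
    )
    
    # Split dataset
    total_size = len(dataset)
    train_size = int(train_split * total_size)
    val_size = int(val_split * total_size)
    test_size = total_size - train_size - val_size
    
    # Shuffle the indices once and share them across both views so the
    # three subsets stay disjoint
    from torch.utils.data import Subset
    indices = torch.randperm(total_size).tolist()
    train_dataset = Subset(dataset, indices[:train_size])
    val_dataset = Subset(eval_dataset, indices[train_size:train_size + val_size])
    test_dataset = Subset(eval_dataset, indices[train_size + val_size:])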
    
    # Create loaders
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, 
        num_workers=num_workers, pin_memory=True
    )
    
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True
    )
    
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=True
    )
    
    print(f"\nDataset splits:")
    print(f"  Train: {train_size} ({100*train_size/total_size:.1f}%)")
    print(f"  Val:   {val_size} ({100*val_size/total_size:.1f}%)")
    print(f"  Test:  {test_size} ({100*test_size/total_size:.1f}%)")
    
    return train_loader, val_loader, test_loader
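

# ----------------------------------------------------------------------------
# Illustrative sketch (an addition, not part of the original utilities).
# The split in create_data_loaders() is drawn from the global RNG, so it
# changes from run to run unless the RNG is seeded. One possible way to get a
# reproducible split is to pass a seeded generator to random_split().
# The helper name `reproducible_split` and its default seed are hypothetical.
# ----------------------------------------------------------------------------
import torch
from torch.utils.data import TensorDataset, random_split


def reproducible_split(dataset, fractions=(0.8, 0.1, 0.1), seed=42):
    """Split `dataset` into subsets whose membership depends only on `seed`."""
    total = len(dataset)
    # Integer sizes for all but the last subset; the last takes the remainder
    sizes = [int(fraction * total) for fraction in fractions[:-1]]
    sizes.append(total - sum(sizes))

    generator = torch.Generator().manual_seed(seed)
    return random_split(dataset, sizes, generator=generator)


# Usage (commented out so that importing this module has no side effects):
#   toy = TensorDataset(torch.arange(100))
#   train_set, val_set, test_set = reproducible_split(toy)
#   len(train_set), len(val_set), len(test_set)   # 80, 10, 10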


def get_image_statistics(data_loader: DataLoader) -> Dict[str, np.ndarray]:
    """
    Calculate mean and std of images for normalization
    
    Args:
        data_loader: DataLoader instance
    
    Returns:
        Dictionary with 'mean' and 'std' statistics
    """
    images = []
    
    for batch in data_loader:
        images.append(batch['image'].numpy())
    
    images = np.concatenate(images, axis=0)  # (N, C, H, W)
    # Move channels last before flattening so that each column is one channel
    images = images.transpose(0, 2, 3, 1).reshape(-1, images.shape[1])  # (N*H*W, C)
    
    mean = images.mean(axis=0)
    std = images.std(axis=0)
    
    return {'mean': mean, 'std': std}
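

# ----------------------------------------------------------------------------
# Illustrative sketch (an addition, not part of the original utilities).
# get_image_statistics() keeps every batch in memory before computing the
# statistics. For larger datasets, a running-sum variant may be preferable.
# The helper name `running_channel_stats` is hypothetical; it assumes each
# batch is a dict with an 'image' tensor of shape (B, C, H, W), as produced by
# the datasets in this file, and it returns the population standard deviation.
# ----------------------------------------------------------------------------
import torch


def running_channel_stats(batches):
    """Per-channel mean and std accumulated one batch at a time."""
    pixel_count = 0
    channel_sum = None
    channel_sq_sum = None

    for batch in batches:
        images = batch['image'].double()                    # (B, C, H, W)
        num_channels = images.shape[1]
        # Bring channels to the front, then flatten everything else
        flat = images.permute(1, 0, 2, 3).reshape(num_channels, -1)  # (C, B*H*W)

        if channel_sum is None:
            channel_sum = torch.zeros(num_channels, dtype=torch.float64)
            channel_sq_sum = torch.zeros(num_channels, dtype=torch.float64)

        channel_sum += flat.sum(dim=1)
        channel_sq_sum += (flat ** 2).sum(dim=1)
        pixel_count += flat.shape[1]

    if pixel_count == 0:
        raise ValueError("No batches were provided")

    mean = channel_sum / pixel_count
    # Var[x] = E[x^2] - E[x]^2; clamp guards against tiny negative round-off
    std = (channel_sq_sum / pixel_count - mean ** 2).clamp(min=0).sqrt()

    return {'mean': mean.numpy(), 'std': std.numpy()}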


# ============================================================================
# 6. VISUALIZATION UTILITIES
# ============================================================================

def visualize_batch(batch: Dict, num_samples: int = 4):
    """
    Visualize a batch of images and masks
    
    Args:
        batch: Dictionary with 'image' and 'mask' tensors
        num_samples: Number of samples to display
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError:
        print("Matplotlib not available for visualization")
        return
    
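    # Never request more rows than the batch actually contains
    num_samples = min(num_samples, len(batch['image']))
    images = batch['image'][:num_samples]
    masks = batch['mask'][:num_samples]
    
    # squeeze=False keeps `axes` two-dimensional even when num_samples == 1
    fig, axes = plt.subplots(num_samples, 2, figsize=(10, 4*num_samples),
                             squeeze=False)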
    
    for i in range(num_samples):
        # Image (convert from tensor)
        img = images[i].permute(1, 2, 0).numpy()
        img = np.clip(img, 0, 1)
        
        axes[i, 0].imshow(img)
        axes[i, 0].set_title('Image')
        axes[i, 0].axis('off')
        
        # Mask
        mask = masks[i, 0].numpy()
        axes[i, 1].imshow(mask, cmap='gray')
        axes[i, 1].set_title('Mask')
        axes[i, 1].axis('off')
    
    plt.tight_layout()
    plt.show()


if __name__ == "__main__":
    print("Medical Image Data Utilities")
    print("=" * 60)
    
    # Example: Create dummy dataset
    print("\nExample: Creating toy dataset structure...")
    
    # Note: In practice, you would have real image directories
    # This is just to demonstrate the API
    
    print("\nTo use with your own data:")
    print("1. Organize images in: data/images/")
    print("2. Organize masks in:  data/masks/")
    print("3. Create data loader:")
    print("""
    train_loader, val_loader, test_loader = create_data_loaders(
        image_dir='data/images',
        mask_dir='data/masks',
        batch_size=16,
        img_size=(256, 256),
        train_split=0.8,
        val_split=0.1
    )
    """)
"""
Evaluation Metrics for Medical Image Segmentation

Metrics:
- Jaccard Index (IoU)
- Dice Similarity Coefficient
- Accuracy
- Sensitivity (Recall, True Positive Rate)
- Specificity (True Negative Rate)
- Precision (Positive Predictive Value)
- F1-Score
"""

import torch
import torch.nn.functional as F
import numpy as np
from typing import Dict, Tuple
from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve, auc


# ============================================================================
# 1. CORE METRICS
# ============================================================================

class SegmentationMetrics:
    """
    Collection of segmentation evaluation metrics
    """
    
    @staticmethod
    def jaccard_index(predictions: torch.Tensor, targets: torch.Tensor,
                     threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        Jaccard Index (Intersection over Union)
        
        J = |X ∩ Y| / |X ∪ Y|
        
        Args:
            predictions: Model predictions (B, C, H, W) in [0, 1]
            targets: Ground truth masks (B, C, H, W) binary
            threshold: Threshold for binarizing predictions
            smooth: Smoothing constant
        
        Returns:
            Jaccard index (0-1, higher is better)
        """
        # Binarize predictions
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute intersection and union
        intersection = (pred_flat * target_flat).sum()
        union = pred_flat.sum() + target_flat.sum() - intersection
        
        # Jaccard index
        jaccard = (intersection + smooth) / (union + smooth)
        
        return jaccard.item()
    
    @staticmethod
    def dice_coefficient(predictions: torch.Tensor, targets: torch.Tensor,
                        threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        Dice Similarity Coefficient (F1 Score)
        
        DSC = 2|X ∩ Y| / (|X| + |Y|)
        
        Args:
            predictions: Model predictions (B, C, H, W)
            targets: Ground truth masks (B, C, H, W)
            threshold: Threshold for binarizing predictions
            smooth: Smoothing constant
        
        Returns:
            Dice coefficient (0-1, higher is better)
        """
        # Binarize
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute Dice
        intersection = (pred_flat * target_flat).sum()
        dice = (2 * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)
        
        return dice.item()
    
    @staticmethod
    def accuracy(predictions: torch.Tensor, targets: torch.Tensor,
                threshold: float = 0.5) -> float:
        """
        Pixel-wise accuracy
        
        Acc = (TP + TN) / (TP + TN + FP + FN)
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
        
        Returns:
            Accuracy (0-1)
        """
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute accuracy
        correct = (pred_flat == target_flat).float().sum()
        accuracy = correct / pred_flat.numel()
        
        return accuracy.item()
    
    @staticmethod
    def sensitivity(predictions: torch.Tensor, targets: torch.Tensor,
                   threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        Sensitivity (Recall, True Positive Rate)
        
        Sen = TP / (TP + FN)
        
        Measures ability to correctly identify positive cases.
        Important for detecting lesions.
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
            smooth: Smoothing constant
        
        Returns:
            Sensitivity (0-1)
        """
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute TP and FN
        tp = ((pred_flat == 1) & (target_flat == 1)).float().sum()
        fn = ((pred_flat == 0) & (target_flat == 1)).float().sum()
        
        # Sensitivity
        sensitivity = (tp + smooth) / (tp + fn + smooth)
        
        return sensitivity.item()
    
    @staticmethod
    def specificity(predictions: torch.Tensor, targets: torch.Tensor,
                   threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        Specificity (True Negative Rate)
        
        Spec = TN / (TN + FP)
        
        Measures ability to correctly identify negative cases.
        Important for avoiding false positives.
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
            smooth: Smoothing constant
        
        Returns:
            Specificity (0-1)
        """
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute TN and FP
        tn = ((pred_flat == 0) & (target_flat == 0)).float().sum()
        fp = ((pred_flat == 1) & (target_flat == 0)).float().sum()
        
        # Specificity
        specificity = (tn + smooth) / (tn + fp + smooth)
        
        return specificity.item()
    
    @staticmethod
    def precision(predictions: torch.Tensor, targets: torch.Tensor,
                 threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        Precision (Positive Predictive Value)
        
        Prec = TP / (TP + FP)
        
        Of predicted positives, how many are actually positive.
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
            smooth: Smoothing constant
        
        Returns:
            Precision (0-1)
        """
        predictions = (predictions > threshold).float()
        
        # Flatten
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        # Compute TP and FP
        tp = ((pred_flat == 1) & (target_flat == 1)).float().sum()
        fp = ((pred_flat == 1) & (target_flat == 0)).float().sum()
        
        # Precision
        precision = (tp + smooth) / (tp + fp + smooth)
        
        return precision.item()
    
    @staticmethod
    def f1_score(predictions: torch.Tensor, targets: torch.Tensor,
                threshold: float = 0.5, smooth: float = 1e-7) -> float:
        """
        F1 Score (Harmonic mean of Precision and Recall)
        
        F1 = 2 * (Precision * Recall) / (Precision + Recall)
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
            smooth: Smoothing constant
        
        Returns:
            F1 score (0-1)
        """
        precision = SegmentationMetrics.precision(predictions, targets, threshold, smooth)
        recall = SegmentationMetrics.sensitivity(predictions, targets, threshold, smooth)
        
        f1 = (2 * precision * recall) / (precision + recall + smooth)
        
        return f1
    
    @staticmethod
    def hausdorff_distance(predictions: torch.Tensor, targets: torch.Tensor,
                          threshold: float = 0.5) -> float:
        """
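        Hausdorff Distance (maximum distance between the two foreground sets)
        
        Measures the largest deviation between the predicted and true masks.
        Important for clinical applications requiring precise delineation.
        
        Args:
            predictions: Model predictions (single sample)
            targets: Ground truth masks (single sample)
            threshold: Binarization threshold
        
        Returns:
            Hausdorff distance in pixels (0.0 if both masks are empty,
            inf if exactly one of them is empty)
        """
        from scipy.ndimage import distance_transform_edt
        
        # Binarize (detach/cpu so GPU tensors and tensors with grad also work)
        pred = (predictions > threshold).detach().cpu().numpy()
        target = (targets > 0.5).detach().cpu().numpy()
        
        # The distance is undefined when a mask has no foreground pixels
        if not pred.any() and not target.any():
            return 0.0
        if not pred.any() or not target.any():
            return float('inf')
        
        # Distance of every pixel to the nearest foreground pixel of each mask
        pred_dist = distance_transform_edt(~pred)
        target_dist = distance_transform_edt(~target)
        
        # Symmetric Hausdorff distance: worst case over both directions
        h_dist = max(pred_dist[target].max(), target_dist[pred].max())
        
        return float(h_dist)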


# ============================================================================
# 2. CONFUSION MATRIX AND ROC CURVE
# ============================================================================

class AdvancedMetrics:
    """
    Advanced metrics for segmentation evaluation
    """
    
    @staticmethod
    def confusion_matrix_metrics(predictions: torch.Tensor, targets: torch.Tensor,
                                threshold: float = 0.5) -> Dict[str, int]:
        """
        Compute confusion matrix elements (TP, TN, FP, FN)
        
        Args:
            predictions: Model predictions
            targets: Ground truth masks
            threshold: Binarization threshold
        
        Returns:
            Dictionary with TP, TN, FP, FN
        """
        predictions = (predictions > threshold).float()
        
        pred_flat = predictions.view(-1)
        target_flat = targets.view(-1)
        
        tp = int(((pred_flat == 1) & (target_flat == 1)).sum())
        tn = int(((pred_flat == 0) & (target_flat == 0)).sum())
        fp = int(((pred_flat == 1) & (target_flat == 0)).sum())
        fn = int(((pred_flat == 0) & (target_flat == 1)).sum())
        
        return {'TP': tp, 'TN': tn, 'FP': fp, 'FN': fn}
    
    @staticmethod
    def compute_roc_auc(predictions: torch.Tensor, targets: torch.Tensor) -> float:
        """
        Compute ROC AUC score
        
        Measures performance across all classification thresholds.
        
        Args:
            predictions: Model predictions (B, 1, H, W) with values in [0, 1]
            targets: Ground truth masks (B, 1, H, W) binary
        
        Returns:
            ROC AUC score (0-1)
        """
        # Flatten
        pred_flat = predictions.view(-1).detach().cpu().numpy()
        target_flat = targets.view(-1).detach().cpu().numpy()
        
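        # AUC is undefined when only one class is present; fall back to 0.0
        if len(np.unique(target_flat)) < 2:
            return 0.0
        
        try:
            return float(roc_auc_score(target_flat, pred_flat))
        except ValueError:
            # roc_auc_score raises ValueError on inputs it cannot score
            return 0.0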


# ============================================================================
# 3. METRIC AGGREGATOR
# ============================================================================

class MetricsAggregator:
    """
    Aggregates metrics across multiple batches
    """
    def __init__(self, threshold: float = 0.5):
        """
        Args:
            threshold: Binarization threshold
        """
        self.threshold = threshold
        self.reset()
    
    def reset(self):
        """Reset all metrics"""
        self.metrics = {
            'jaccard': [],
            'dice': [],
            'accuracy': [],
            'sensitivity': [],
            'specificity': [],
            'precision': [],
            'f1': [],
            'auc': []
        }
    
    def update(self, predictions: torch.Tensor, targets: torch.Tensor):
        """
        Update metrics with batch
        
        Args:
            predictions: Model predictions (B, C, H, W)
            targets: Ground truth masks (B, C, H, W)
        """
        # Compute metrics for each sample
        for i in range(predictions.shape[0]):
            pred = predictions[i:i+1]
            target = targets[i:i+1]
            
            self.metrics['jaccard'].append(
                SegmentationMetrics.jaccard_index(pred, target, self.threshold)
            )
            self.metrics['dice'].append(
                SegmentationMetrics.dice_coefficient(pred, target, self.threshold)
            )
            self.metrics['accuracy'].append(
                SegmentationMetrics.accuracy(pred, target, self.threshold)
            )
            self.metrics['sensitivity'].append(
                SegmentationMetrics.sensitivity(pred, target, self.threshold)
            )
            self.metrics['specificity'].append(
                SegmentationMetrics.specificity(pred, target, self.threshold)
            )
            self.metrics['precision'].append(
                SegmentationMetrics.precision(pred, target, self.threshold)
            )
            self.metrics['f1'].append(
                SegmentationMetrics.f1_score(pred, target, self.threshold)
            )
            self.metrics['auc'].append(
                AdvancedMetrics.compute_roc_auc(pred, target)
            )
    
    def get_results(self) -> Dict[str, float]:
        """
        Get aggregated metrics
        
        Returns:
            Dictionary with mean metrics
        """
        results = {}
        for key, values in self.metrics.items():
            if values:
                results[f'{key}_mean'] = np.mean(values)
                results[f'{key}_std'] = np.std(values)
            else:
                results[f'{key}_mean'] = 0.0
                results[f'{key}_std'] = 0.0
        
        return results
    
    def get_summary(self) -> str:
        """
        Get human-readable summary
        
        Returns:
            Formatted string with metrics
        """
        results = self.get_results()
        
        summary = "\n" + "=" * 60 + "\n"
        summary += "SEGMENTATION METRICS SUMMARY\n"
        summary += "=" * 60 + "\n"
        
        metrics_order = ['jaccard', 'dice', 'accuracy', 'sensitivity', 
                         'specificity', 'precision', 'f1', 'auc']
        
        for metric in metrics_order:
            mean = results.get(f'{metric}_mean', 0.0)
            std = results.get(f'{metric}_std', 0.0)
            summary += f"{metric.upper():15s}: {mean:.4f} ± {std:.4f}\n"
        
        summary += "=" * 60 + "\n"
        
        return summary
    
    def __str__(self):
        return self.get_summary()


# ============================================================================
# 4. BATCH METRICS COMPUTATION
# ============================================================================

def compute_batch_metrics(predictions: torch.Tensor, targets: torch.Tensor,
                         threshold: float = 0.5) -> Dict[str, float]:
    """
    Compute all metrics for a batch
    
    Args:
        predictions: Model predictions (B, C, H, W)
        targets: Ground truth masks (B, C, H, W)
        threshold: Binarization threshold
    
    Returns:
        Dictionary with metrics
    """
    return {
        'jaccard': SegmentationMetrics.jaccard_index(predictions, targets, threshold),
        'dice': SegmentationMetrics.dice_coefficient(predictions, targets, threshold),
        'accuracy': SegmentationMetrics.accuracy(predictions, targets, threshold),
        'sensitivity': SegmentationMetrics.sensitivity(predictions, targets, threshold),
        'specificity': SegmentationMetrics.specificity(predictions, targets, threshold),
        'precision': SegmentationMetrics.precision(predictions, targets, threshold),
        'f1': SegmentationMetrics.f1_score(predictions, targets, threshold),
        'auc': AdvancedMetrics.compute_roc_auc(predictions, targets)
    }


if __name__ == "__main__":
    print("Segmentation Metrics Module")
    print("=" * 60)
    
    # Example usage
    print("\nExample: Computing metrics on dummy predictions...")
    
    # Create dummy predictions and targets
    batch_size, height, width = 2, 32, 32
    predictions = torch.sigmoid(torch.randn(batch_size, 1, height, width))
    targets = torch.randint(0, 2, (batch_size, 1, height, width)).float()
    
    # Compute batch metrics
    metrics = compute_batch_metrics(predictions, targets, threshold=0.5)
    
    print("\nBatch Metrics:")
    print("-" * 60)
    for key, value in metrics.items():
        print(f"{key.capitalize():15s}: {value:.4f}")
    
    # Aggregate metrics
    print("\n\nAggregating metrics over multiple batches...")
    aggregator = MetricsAggregator(threshold=0.5)
    
    # Simulate multiple batches
    for _ in range(5):
        predictions = torch.sigmoid(torch.randn(batch_size, 1, height, width))
        targets = torch.randint(0, 2, (batch_size, 1, height, width)).float()
        aggregator.update(predictions, targets)
    
    print(aggregator)
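
As a hand-checkable complement to the metrics module above, the following sketch recomputes precision, recall, F1, Dice, and Jaccard on a 4×4 toy mask with plain NumPy. The helper `toy_confusion_metrics` and the toy arrays are illustrative assumptions, not part of the module. The sketch omits the `smooth` term, so it assumes both masks contain at least one foreground pixel. It also shows that, for binary masks, F1 and Dice are the same quantity.

```python
import numpy as np


def toy_confusion_metrics(pred: np.ndarray, target: np.ndarray) -> dict:
    """Confusion-matrix metrics for two binary masks (hypothetical helper).

    No smoothing is applied, so both masks must contain foreground pixels.
    """
    pred = pred.astype(bool)
    target = target.astype(bool)

    tp = int(np.sum(pred & target))     # predicted 1, truth 1
    fp = int(np.sum(pred & ~target))    # predicted 1, truth 0
    fn = int(np.sum(~pred & target))    # predicted 0, truth 1
    tn = int(np.sum(~pred & ~target))   # predicted 0, truth 0

    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    dice = 2 * tp / (2 * tp + fp + fn)
    jaccard = tp / (tp + fp + fn)

    return {"tp": tp, "fp": fp, "fn": fn, "tn": tn,
            "precision": precision, "recall": recall,
            "f1": f1, "dice": dice, "jaccard": jaccard}


# Two 2x2 squares that overlap in a single column
toy_pred = np.array([[1, 1, 0, 0],
                     [1, 1, 0, 0],
                     [0, 0, 0, 0],
                     [0, 0, 0, 0]])
toy_target = np.array([[0, 1, 1, 0],
                       [0, 1, 1, 0],
                       [0, 0, 0, 0],
                       [0, 0, 0, 0]])

toy_results = toy_confusion_metrics(toy_pred, toy_target)
for name, value in toy_results.items():
    print(f"{name:10s}: {value}")
```

With these masks, TP = FP = FN = 2, so precision, recall, F1, and Dice all come out to 0.5, while Jaccard is 1/3.
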
"""
Inference script for TBConvL-Net

Provides functionality to:
- Load trained models
- Make predictions on new images
- Visualization of results
- Batch inference
"""

import torch
import torch.nn.functional as F
import cv2
import numpy as np
from pathlib import Path
from typing import Tuple, Optional, Union, List
import argparse
from PIL import Image
import matplotlib.pyplot as plt

from tbconvl_net_model import TBConvLNet


# ============================================================================
# 1. INFERENCE ENGINE
# ============================================================================

class TBConvLNetInference:
    """
    Inference engine for TBConvL-Net
    """
    def __init__(self, checkpoint_path: str, device: str = 'cuda'):
        """
        Initialize inference engine
        
        Args:
            checkpoint_path: Path to saved checkpoint
            device: Device to use ('cuda' or 'cpu')
        """
        self.device = device if torch.cuda.is_available() else 'cpu'
        
        # Load checkpoint
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        config = checkpoint['config']
        
        # Recreate model
        self.model = TBConvLNet(
            in_channels=config['in_channels'],
            num_classes=config['num_classes'],
            num_filters=config['num_filters']
        ).to(self.device)
        
        # Load weights
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.model.eval()
        
        self.img_size = tuple(config['img_size'])
        
        print(f"Model loaded from {checkpoint_path}")
        print(f"Device: {self.device}")
    
    @torch.no_grad()
    def predict(self, image: Union[np.ndarray, str],
                threshold: float = 0.5,
                return_probability: bool = False) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
        """
        Make prediction on single image
        
        Args:
            image: RGB image (H, W, C) in [0, 255] or path to image
            threshold: Threshold for binary segmentation
            return_probability: Whether to return probability map
        
        Returns:
            Segmentation mask (H, W) or tuple of (mask, probability)
        """
        # Load image if path is provided
        if isinstance(image, str):
            path = image
            image = cv2.imread(path)
            if image is None:
                raise ValueError(f"Could not load image: {path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        original_size = image.shape[:2]
        
        # Preprocess
        image_processed = self._preprocess(image)
        
        # Forward pass (gradients are already disabled by the decorator)
        output = self.model(image_processed)
        
        # Post-process
        probability_map = output.squeeze().cpu().numpy()
        segmentation_mask = (probability_map > threshold).astype(np.uint8) * 255
        
        # Resize to original size
        segmentation_mask = cv2.resize(segmentation_mask, 
                                       (original_size[1], original_size[0]),
                                       interpolation=cv2.INTER_NEAREST)
        
        if return_probability:
            probability_map = cv2.resize(probability_map,
                                        (original_size[1], original_size[0]),
                                        interpolation=cv2.INTER_LINEAR)
            return segmentation_mask, probability_map
        else:
            return segmentation_mask
    
    @torch.no_grad()
    def predict_batch(self, images: List[Union[np.ndarray, str]],
                     threshold: float = 0.5) -> List[np.ndarray]:
        """
        Make predictions on batch of images
        
        Args:
            images: List of images or paths
            threshold: Threshold for binary segmentation
        
        Returns:
            List of segmentation masks
        """
        results = []
        for image in images:
            mask = self.predict(image, threshold=threshold)
            results.append(mask)
        
        return results
    
    def _preprocess(self, image: np.ndarray) -> torch.Tensor:
        """
        Preprocess image for model
        
        Args:
            image: Input image (H, W, C) in [0, 255]
        
        Returns:
            Preprocessed tensor (1, 3, H, W)
        """
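        # Resize (cv2.resize expects (width, height); self.img_size is passed as-is,
        # so a non-square config must be stored in that order)
        image_resized = cv2.resize(image, self.img_size, 
                                   interpolation=cv2.INTER_LINEAR)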
        
        # Convert to float and normalize
        image_normalized = image_resized.astype(np.float32) / 255.0
        
        # Convert to tensor and add batch dimension
        image_tensor = torch.from_numpy(image_normalized)
        image_tensor = image_tensor.permute(2, 0, 1)  # (3, H, W)
        image_tensor = image_tensor.unsqueeze(0)      # (1, 3, H, W)
        
        return image_tensor.to(self.device)


# ============================================================================
# 2. VISUALIZATION UTILITIES
# ============================================================================

class SegmentationVisualizer:
    """
    Utilities for visualizing segmentation results
    """
    @staticmethod
    def overlay_mask_on_image(image: np.ndarray, mask: np.ndarray,
                             alpha: float = 0.5,
                             color: Tuple[int, int, int] = (0, 255, 0)) -> np.ndarray:
        """
        Overlay segmentation mask on image
        
        Args:
            image: Original image (H, W, 3) in [0, 255]
            mask: Binary mask (H, W) with values 0 or 255
            alpha: Transparency of overlay
            color: Color of overlay (BGR)
        
        Returns:
            Overlaid image
        """
        # Normalize mask to [0, 1]
        mask_normalized = mask.astype(np.float32) / 255.0
        
        # Create colored mask
        colored_mask = np.zeros_like(image)
        for c in range(3):
            colored_mask[:, :, c] = mask_normalized * color[c]
        
        # Blend
        result = image.copy()
        mask_indices = mask > 128
        result[mask_indices] = (alpha * colored_mask[mask_indices] + 
                               (1 - alpha) * image[mask_indices]).astype(np.uint8)
        
        return result
    
    @staticmethod
    def visualize_comparison(image: np.ndarray, 
                            ground_truth: Optional[np.ndarray] = None,
                            prediction: Optional[np.ndarray] = None,
                            probability: Optional[np.ndarray] = None,
                            save_path: Optional[str] = None):
        """
        Visualize image, ground truth, and prediction side-by-side
        
        Args:
            image: Original image
            ground_truth: Ground truth mask (optional)
            prediction: Predicted mask
            probability: Probability map (optional)
            save_path: Path to save figure
        """
        # Determine number of subplots
        num_plots = 1
        if ground_truth is not None:
            num_plots += 1
        if prediction is not None:
            num_plots += 1
        if probability is not None:
            num_plots += 1
        
        fig, axes = plt.subplots(1, num_plots, figsize=(5*num_plots, 5))
        
        if num_plots == 1:
            axes = [axes]
        
        idx = 0
        
        # Original image
        axes[idx].imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        axes[idx].set_title('Original Image')
        axes[idx].axis('off')
        idx += 1
        
        # Ground truth
        if ground_truth is not None:
            axes[idx].imshow(ground_truth, cmap='gray')
            axes[idx].set_title('Ground Truth')
            axes[idx].axis('off')
            idx += 1
        
        # Prediction
        if prediction is not None:
            axes[idx].imshow(prediction, cmap='gray')
            axes[idx].set_title('Prediction')
            axes[idx].axis('off')
            idx += 1
        
        # Probability map
        if probability is not None:
            im = axes[idx].imshow(probability, cmap='jet')
            axes[idx].set_title('Probability Map')
            axes[idx].axis('off')
            plt.colorbar(im, ax=axes[idx])
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=150, bbox_inches='tight')
            print(f"Saved visualization to {save_path}")
        
        plt.show()
        plt.close(fig)  # Release the figure so batch runs do not accumulate memory
    
    @staticmethod
    def compute_prediction_uncertainty(probability_map: np.ndarray,
                                      threshold: float = 0.5) -> Tuple[float, float]:
        """
        Compute uncertainty in prediction
        
        Args:
            probability_map: Probability map from model
            threshold: Decision threshold
        
        Returns:
            Tuple of (mean_entropy, boundary_uncertainty)
        """
        # Binary entropy: -p*log(p) - (1-p)*log(1-p)
        eps = 1e-7
        entropy = -probability_map * np.log(probability_map + eps) - \
                  (1 - probability_map) * np.log(1 - probability_map + eps)
        
        mean_entropy = entropy.mean()
        
        # Boundary uncertainty: entropy near decision boundary
        near_boundary = np.abs(probability_map - threshold) < 0.1
        if near_boundary.sum() > 0:
            boundary_uncertainty = entropy[near_boundary].mean()
        else:
            boundary_uncertainty = 0.0
        
        return float(mean_entropy), float(boundary_uncertainty)


# ============================================================================
# 3. BATCH INFERENCE PIPELINE
# ============================================================================

class BatchInferencePipeline:
    """
    Pipeline for batch processing of medical images
    """
    def __init__(self, checkpoint_path: str, output_dir: str = 'predictions',
                 device: str = 'cuda'):
        """
        Args:
            checkpoint_path: Path to checkpoint
            output_dir: Directory to save predictions
            device: Device to use
        """
        self.inferencer = TBConvLNetInference(checkpoint_path, device)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        self.visualizer = SegmentationVisualizer()
    
    def process_directory(self, image_dir: str, 
                         save_masks: bool = True,
                         save_visualizations: bool = True,
                         threshold: float = 0.5):
        """
        Process all images in directory
        
        Args:
            image_dir: Directory containing images
            save_masks: Whether to save predicted masks
            save_visualizations: Whether to save visualizations
            threshold: Segmentation threshold
        """
        image_dir = Path(image_dir)
        image_files = list(image_dir.glob('*.png')) + list(image_dir.glob('*.jpg'))
        
        print(f"\nProcessing {len(image_files)} images...")
        
        for i, image_path in enumerate(image_files):
            print(f"Processing {i+1}/{len(image_files)}: {image_path.name}")
            
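            # Load image (OpenCV returns BGR, or None if the file is unreadable)
            image = cv2.imread(str(image_path))
            if image is None:
                print(f"Skipping unreadable image: {image_path.name}")
                continue
            
            # Predict on an RGB copy, matching the path-loading branch of predict()
            mask, probability = self.inferencer.predict(
                cv2.cvtColor(image, cv2.COLOR_BGR2RGB),
                threshold=threshold, return_probability=True
            )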
            
            # Save mask
            if save_masks:
                mask_path = self.output_dir / f"{image_path.stem}_mask.png"
                cv2.imwrite(str(mask_path), mask)
            
            # Save visualization
            if save_visualizations:
                vis_path = self.output_dir / f"{image_path.stem}_viz.png"
                self.visualizer.visualize_comparison(
                    image, prediction=mask, probability=probability,
                    save_path=str(vis_path)
                )
        
        print(f"\nResults saved to {self.output_dir}")


# ============================================================================
# 4. MAIN FUNCTION
# ============================================================================

def main(args: argparse.Namespace):
    """Main inference function"""
    
    # Initialize inferencer
    print("Loading model...")
    inferencer = TBConvLNetInference(
        checkpoint_path=args.checkpoint,
        device=args.device
    )
    
    # Single image prediction
    if args.image:
        print(f"\nProcessing image: {args.image}")
        
        # Load image
        image = cv2.imread(args.image)
        if image is None:
            raise ValueError(f"Could not load image: {args.image}")
        
        # Predict on an RGB copy, matching the path-loading branch of predict()
        mask, probability = inferencer.predict(
            cv2.cvtColor(image, cv2.COLOR_BGR2RGB),
            threshold=args.threshold, return_probability=True
        )
        
        # Visualize
        visualizer = SegmentationVisualizer()
        visualizer.visualize_comparison(
            image, prediction=mask, probability=probability
        )
        
        # Save results
        if args.output:
            mask_path = Path(args.output) / f"{Path(args.image).stem}_mask.png"
            mask_path.parent.mkdir(parents=True, exist_ok=True)
            cv2.imwrite(str(mask_path), mask)
            print(f"Saved mask to {mask_path}")
    
    # Batch inference
    elif args.image_dir:
        print(f"\nProcessing directory: {args.image_dir}")
        
        pipeline = BatchInferencePipeline(
            checkpoint_path=args.checkpoint,
            output_dir=args.output or 'predictions',
            device=args.device
        )
        
        pipeline.process_directory(
            image_dir=args.image_dir,
            threshold=args.threshold
        )
    
    else:
        print("Please specify either --image or --image-dir")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Inference with TBConvL-Net')
    
    parser.add_argument('--checkpoint', type=str, required=True,
                       help='Path to model checkpoint')
    parser.add_argument('--image', type=str, default=None,
                       help='Path to single image for prediction')
    parser.add_argument('--image-dir', type=str, default=None,
                       help='Directory containing images for batch processing')
    parser.add_argument('--output', type=str, default='predictions',
                       help='Output directory for results')
    parser.add_argument('--threshold', type=float, default=0.5,
                       help='Segmentation threshold')
    parser.add_argument('--device', type=str, default='cuda',
                       help='Device to use (cuda or cpu)')
    
    args = parser.parse_args()
    
    main(args)
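
The uncertainty estimate in `compute_prediction_uncertainty` is easier to read with concrete numbers. The sketch below applies the same binary-entropy formula to a hypothetical 2×2 probability map using NumPy only. The helper name `binary_entropy` and the sample values are assumptions for illustration, not output from a trained TBConvL-Net.

```python
import numpy as np


def binary_entropy(p: np.ndarray, eps: float = 1e-7) -> np.ndarray:
    """Per-pixel binary entropy in nats; eps guards against log(0)."""
    return -p * np.log(p + eps) - (1 - p) * np.log(1 - p + eps)


# Hypothetical probability map: two confident pixels, two near the threshold
probability_map = np.array([[0.02, 0.45],
                            [0.55, 0.98]])
threshold = 0.5

entropy = binary_entropy(probability_map)
mean_entropy = float(entropy.mean())

# Same rule as the script: pixels within 0.1 of the decision threshold
near_boundary = np.abs(probability_map - threshold) < 0.1
boundary_uncertainty = float(entropy[near_boundary].mean())

print(f"Mean entropy:         {mean_entropy:.4f}")
print(f"Boundary uncertainty: {boundary_uncertainty:.4f}")
print(f"Upper bound (ln 2):   {np.log(2):.4f}")
```

Entropy peaks at ln 2 when p = 0.5 and falls toward zero as p approaches 0 or 1. The two near-threshold pixels therefore dominate the boundary uncertainty, while the two confident pixels pull the overall mean down.
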
