When Drones Learn to Distrust: A New Framework That Teaches Multi-Agent Systems to Spot Unreliable Sensors

When Drones Learn to Distrust: The Sensor Fusion Framework That Teaches Multi-Agent Systems to Spot Bad Data in Real Time

Researchers at the University of Calabria built a system that fuses opinion dynamics, consensus theory, and a borrowed trick from solar panel engineering to let networked agents decide—on their own, continuously—which sensors to believe and which to quietly ignore.

Opinion Dynamics Sensor Fusion Multi-Agent Consensus State Estimation Perturb & Observe Trust Mechanisms Cyber-Physical Systems Drone Path Planning
A swarm of autonomous drones navigating in formation
Figure 1: A network of autonomous drones navigating around obstacles while sharing position data through a distributed sensor grid. The proposed FSE architecture allows each drone to continuously evaluate which sensors to trust — and to quietly drop the ones that start feeding corrupted data.

Imagine a fleet of drones navigating an obstacle-filled environment. Each drone needs to know its precise position, but it can’t see everything on its own — it relies on a network of ground sensors sending location data. Now imagine some of those sensors get jammed, hacked, or simply start malfunctioning. How does a drone figure out which readings to trust? And how do it and its neighbors collectively agree on an answer before anyone crashes? That is, in essence, the problem this paper set out to solve — and the solution it found is both elegant and surprisingly inspired by solar panel engineering.

A team from the University of Calabria — Francesco Tedesco, Gianfranco Gagliardi, Alessandro Casavola, Giuseppe Franzè, and Giancarlo Fortino — has published a framework that merges two ideas that don’t usually share the same paper: opinion dynamics (borrowed from social network theory) and the Perturb&Observe algorithm (borrowed from photovoltaic systems). Together they produce something new: a distributed, adaptive sensor fusion architecture that can, without any central coordinator, figure out which sensors are reliable, reach consensus with neighboring agents about those reliability scores, and continuously update its beliefs as conditions change.

The result is called the Fusion State Estimation (FSE) algorithm, and it was tested on a simulated drone tracking scenario as well as a real-world-calibrated traffic monitoring case built in the Aimsun Next microsimulation environment. On both, it outperformed competing approaches — and it did so not just in clean conditions, but in the messy, fault-filled scenarios that matter most in practice.


The Problem Nobody Fully Solved

State estimation in multi-agent systems — working out where everyone is and what they’re doing from noisy, distributed sensor data — has a long history. The standard toolkit involves Kalman filters, consensus algorithms, and bank-of-observers architectures. But almost every existing method shares a quiet assumption: that the sensors themselves can be trusted, at least statistically. When that assumption breaks down — when a sensor is attacked, drifts out of calibration, or gets physically obstructed — most methods either degrade gracefully or fail in ways that are hard to diagnose from the outside.

Some researchers have tackled sensor selection: figuring out which single sensor or pre-defined subset gives the best estimates, then sticking with it. Others have explored resilient estimation — designing estimators that remain accurate even when some fraction of sensors are compromised. A smaller group has incorporated ideas from social network theory, using opinion-propagation models to let agents share their beliefs about sensor quality with neighbors.

What the University of Calabria team noticed was a gap that had been sitting in plain sight: none of these approaches simultaneously fuse data from multiple sensors and dynamically re-evaluate which sensors to include in that fusion as conditions change. You could select one sensor, or you could average across a fixed set, but you couldn’t do both: pick your best available subset, use all of them together, and continuously revise the membership of that subset in response to live performance data. That three-way combination is exactly what the FSE architecture delivers.

Key Takeaway

Prior approaches either selected a single sensor based on trust, or fused a fixed set of sensors. The FSE framework does both simultaneously — dynamically selecting which sensors to fuse, fusing them weighted by trust, and updating those trust scores through a live consensus protocol. It is the first architecture to formally integrate all three capabilities.


Two Populations, One Network: Agents and Sensors

Structural Causal Model diagram showing nodes S, U, F, X, Y, E
Figure 2: The operating network separates two distinct node types. Sensors (servers) broadcast measurements to whichever agents (clients) they cover. Agents evaluate sensor reliability through trust scores, share those evaluations with neighbors who use the same sensor, and fuse the approved readings into a state estimate.

The architecture begins with a clean structural assumption. The network has two types of nodes: agents \(a_j\), each described by a discrete-time linear state-space model, and sensors \(s_i\), each providing noisy, potentially corrupted measurements of nearby agents. An agent can be observed by multiple sensors; a sensor can serve multiple agents. When two agents share a common sensor, they are called neighbors — and that neighborhood relation is what makes the consensus protocol possible.

Each agent maintains a trust level \(\tau_{ji}(t) \in [0,1]\) for every sensor it works with. This scalar represents the agent’s subjective belief about how reliable that sensor currently is — not in some static, one-time-assessed sense, but as a living quantity that gets updated at every timestep based on observed performance. High trust means the sensor’s readings get weighted heavily in the fusion; low trust means they get effectively sidelined; zero means the sensor has been removed from the active set entirely.

The sensor measurement model is deliberately general. Each reading is the true state plus a corruption function \(f_{ji}(z_i(t))\), where \(z_i(t)\) is an unknown signal representing whatever is degrading that sensor’s quality — jamming noise, a calibration drift, a cyberattack, physical obstruction. The corruption function is assumed unbounded and unknown to the agents. There’s no assumption that the agents know what kind of fault is occurring, only that they can observe a downstream performance consequence: the quality of their state estimate.


The Trust Model: Opinion Dynamics Meets Control Theory

The trust dynamics are built on the discrete-time Altafini model — a well-studied framework from opinion dynamics research that models how agents reach consensus on a shared topic by averaging their neighbors’ beliefs. In its base form, the trust evolution looks like this:

Eq. 1 — Altafini Consensus Trust Update $$\tau_{ji}(t+1) = \alpha_i \tau_{ji}(t) + \beta_{ji} \sum_{r \in \mathcal{N}^i_j} \mu_{jr}\,\tau_{ri}(t)$$

Here, \(\alpha_i\) is a self-weighting term, \(\beta_{ji}\) governs how much the agent weighs its neighbors’ opinions, and \(\mu_{jr}\) are the relative influence weights across neighbors. Under appropriate conditions — specifically, when the subgraph of agents sharing sensor \(s_i\) is strongly connected — this model guarantees that all agents will eventually converge to the same trust value for that sensor. Trust becomes a shared social fact, not just a private opinion.

But pure consensus isn’t enough. If a sensor degrades, the consensus will slowly drag everyone’s trust downward, but slowly isn’t always fast enough. The key innovation is the exogenous signal \(u_{ji}(t)\) added to the update equation:

Eq. 2 — Performance-Perturbed Trust Update $$\tau_{ji}(t+1) = \alpha_i \tau_{ji}(t) + \beta_{ji} \sum_{r \in \mathcal{N}^i_j} \mu_{jr}\,\tau_{ri}(t) + \gamma_j\,u_{ji}(t)$$

This perturbation signal is where the Perturb&Observe logic lives. Rather than waiting for trust to drift naturally, the algorithm actively probes the trust landscape — nudging it in a direction, observing whether performance improves or worsens, and then deciding what to do next. It’s the same logic a photovoltaic system uses to hunt for maximum power: perturb the operating point, check if the output went up or down, adjust accordingly.


The Perturb&Observe Automaton: Three Modes, One Goal

The full reputation mechanism is implemented as a three-state finite automaton cycling through HOME, PERTURB, and OBSERVE modes. The logic is intuitive once you see it laid out.

In HOME mode, everything is running well — the performance index \(J_j(t)\) is below the threshold \(\epsilon\), meaning the current fusion is producing acceptable state estimates. The trust dynamics evolve purely through consensus, with \(u_{ji}(t) = 0\). Neighboring agents gradually harmonize their beliefs about each sensor.

When \(J_j(t)\) climbs above \(\epsilon\), the automaton transitions to PERTURB. This is where things get interesting. The perturbation can go in two directions. In the first direction, the algorithm fires an exogenous signal that deliberately suppresses the trust of sensors it suspects are causing the problem — specifically, those whose trust is already below a threshold \(th_j\). In the second direction, it can actually modify the membership of the active sensor set \(\tilde{\Xi}_j(t)\), adding or removing sensors outright and using a carefully chosen input signal to keep the trust dynamics stable during the transition.

After perturbing, the automaton moves to OBSERVE. Here it compares the current performance index with its value from one timestep earlier. If things got better, the perturbation worked and the machine goes back to PERTURB to try the next step. If things got worse, the algorithm uses a clever geometric condition on the trust consensus manifold to identify which sensors actually helped and which didn’t, and adjusts the “bad sensor” set accordingly before cycling again.

“The ability to simultaneously utilize data from multiple sensors without compromising the convergence properties of the reputation mechanism at steady state is the central challenge this work addresses.” — Tedesco, Gagliardi, Casavola, Franzè & Fortino, Information Fusion 133 (2026) 104261

The Fusion Architecture: Three Modules Working in Concert

Structural Causal Model diagram showing nodes S, U, F, X, Y, E
Figure 3: Fusion-based state estimation architecture.

For each agent \(a_j\), the state estimation architecture has three interlocking pieces. First, a bank of Luenberger observers — one per sensor. Each observer takes the measurements from one sensor and produces an independent state estimate. The observer gains are designed so that the estimation error dynamics are stable (technically, so that \(A_j – L_{ji}C_{ji}\) is a Schur matrix). These individual estimates are what you combine.

Second, a sensor fusion block that computes the fused estimate as a trust-weighted average of the individual observer outputs:

Eq. 3 — Trust-Weighted Fusion Estimate $$\hat{x}_j(t) := \frac{1}{\displaystyle\sum_{k \in \tilde{\Xi}_j(t)} \tau_{jk}(t)} \sum_{k \in \tilde{\Xi}_j(t)} \tau_{jk}(t)\,\hat{x}_{jk}(t)$$

Notice what this does: sensors with high trust contribute heavily to the estimate; sensors with low trust contribute almost nothing; sensors that have been removed from \(\tilde{\Xi}_j(t)\) contribute nothing at all. The trust scores aren’t just advisory labels — they literally determine how much each sensor influences the answer.

Third, the Reputation Mechanism Algorithm itself, which runs on every timestep to update the trust levels and the active sensor set. The full algorithm is specified as a pseudo-code procedure cycling through the automaton modes. Its input is the performance index \(J_j(t)\) and the trust realizations from neighbors; its outputs are updated trust values and a revised active set for the next timestep.


What the Math Guarantees

The paper provides a complete theoretical foundation for why this all works. The key results are worth spelling out because they’re what distinguishes this from a heuristic approach.

Proposition 1 establishes that in HOME mode — when no perturbation is active — the trust values for all agents assessing the same sensor converge asymptotically to a common consensus value. The trust distribution is stable and self-organizing.

Proposition 2 extends this to show that even when the perturbation signals are active, the trust values remain bounded in \([0, 1]\) as long as a mild condition on the perturbation gain \(\gamma_i\) is satisfied (\(\gamma_i < \alpha_i\)). Trust can be pushed around, but it won’t go negative or overflow.

Lemma 1 is perhaps the most elegant result in the paper. It shows that in steady state, the total estimation error depends only on the trust levels and the corruption signals — not on any transient dynamics. This means that once the system settles, the fusion error is exactly what you’d expect from weighting corrupted observations by their trust scores.

Lemma 2 then proves that removing a sensor from the active set (via the PERTURB mode) can never make the cost function worse at two steps later. This is the guarantee that the P&O logic is actually improving things, not randomly permuting sensor sets.

Theorem 1 is the big convergence result: if there exists a subset of sensors that could, in principle, drive performance below the threshold \(\epsilon\), then the FSE algorithm will find that subset and converge to it in finite time. The paper even gives an explicit formula for the worst-case convergence time.

Theorem 2 adds the optimality result: when no such subset exists — when the best achievable performance is still above the threshold — the algorithm finds the local minimum of \(J_j(t)\) in finite time. It doesn’t just make things better; it finds the best it can do.


Experimental Results: Drones Tracking Robots Under Attack

Structural Causal Model diagram showing nodes S, U, F, X, Y, E
Figure 4: The simulation scenario: five drones, each tracking a different ground robot, share access to six ground-based position sensors. Corruption signals are injected at different times, testing the FSE algorithm’s ability to identify faulty sensors, achieve consensus, and recover when sensors are repaired.

The simulation scenario involves five drones tracking five mobile robots navigating around static obstacles. Each drone runs a Linear Quadratic Integral (LQI) controller whose performance depends directly on the quality of its state estimate. The scenario unfolds in three phases designed to stress the algorithm in different ways.

In the first phase (t = 0 to 38 seconds), only sensors \(s_1\) and \(s_4\) are corrupted. The FSE algorithm identifies this quickly — by t = 4.3 seconds, it has removed \(s_1\) from the active sets of the two affected drones, and their performance indices drop to zero immediately. Notably, agents that don’t use \(s_1\) directly are still affected by the consensus update, but their trust for \(s_1\) converges to the same low value their neighbors hold, confirming that the network-wide opinion is coherent even without centralized coordination.

In the second phase (t = 38 to 70 seconds), all six sensors are corrupted simultaneously. This is the hardest scenario — there’s no clean sensor to fall back on. Remarkably, the FSE algorithm still keeps three of the five agents below the performance threshold, and for the other two, Theorem 2 guarantees they converge to the best achievable local minimum. The P&O cycle doesn’t stop trying just because things are universally bad — it keeps searching for the least-bad combination.

In the third phase (t = 70 to 100 seconds), a repaired sensor notification is sent, and \(s_1\) is re-added to the active sets of the agents that had removed it. The system re-enters HOME mode, consensus is re-established, and performance recovers.

How FSE Compares Against Four Baselines

MethodAvg. RMSE (State Estimation)Avg. RMSTE (Position Tracking)
CCI (Wang et al.)10.32410.486
SSE (Trust-based Selection)17.9023.842
FSE Residual (simplified input)7.19915.158
FSE Akgun (alt. trust mechanism)20.22017.133
FSE (Proposed)7.6943.044

Table 1: Overall performance metrics from t = 20s onwards across all five agents. FSE achieves the best average tracking error by a significant margin and competitive state estimation accuracy. FSE Residual achieves marginally lower average RMSE, but at the cost of dramatically higher tracking error — a reflection of its inability to handle fault conditions robustly.

The comparison reveals something important. SSE — which selects one sensor based on trust but doesn’t fuse — handles tracking reasonably well when it picks the right sensor, but its state estimation error is nearly double FSE’s. FSE Residual, which uses a simpler input law based on raw residuals rather than the full P&O logic, gets close on estimation accuracy but fails badly on tracking when faults appear. FSE Akgun, which uses a different trust mechanism without the performance-driven perturbation, struggles on both metrics. The CCI method is a solid performer but lacks the adaptability that FSE shows when the fault landscape changes mid-run.

The qualitative plots make the story vivid. When a fault is injected, the SSE method’s trajectory tracking visibly diverges. FSE doesn’t — because it detects the performance degradation, enters PERTURB, strips the bad sensor from the active set, and continues on with the remaining reliable sensors, all within a handful of timesteps.


The Traffic Experiment: A Real-World Digital Twin

The experimental validation extends the framework to a qualitatively different domain: urban road traffic. The team used Aimsun Next microsimulation software to build a digital twin of the road network surrounding the University of Calabria campus, complete with nine road segments, camera sensors mounted on lighting poles, and acoustic noise sensors placed along the roadside.

The setup is clever. Camera sensors provide traffic flow estimates, but they can be corrupted or degraded. Acoustic sensors — which measure the sound pressure level of passing vehicles and convert it to an estimated vehicle count — are treated as noisy but reliable ground truth. The performance index for each agent is defined as the discrepancy between the lighting profile computed from the camera-based traffic estimate and the profile implied by the acoustic measurement.

Two corruption scenarios were tested. In the first, one sensor experiences a sudden step fault while another suffers a slow ramp drift, testing whether the algorithm can handle both sharp and incipient degradation simultaneously. The results show that it can: the step fault is detected quickly, while the ramp fault is tracked progressively as the trust level follows the drift downward. A third agent, which doesn’t share any of the corrupted sensors, remains entirely unaffected — its trust levels stay at 1.0, confirming the topological decoupling the architecture was designed to provide.

The estimation performance comparison tells a clear story. When fault injections cause the raw sensor data to deviate sharply, the standard approach would follow the corrupted trend. The FSE architecture doesn’t. As the reputation mechanism drives the faulty sensors’ trust toward zero, the fusion block effectively quarantines their data. The blue estimate in the paper’s Figure 21 stays bounded and coherent with the acoustic reference, while the raw corrupted sensor readings (the yellow line) shoot off into clearly wrong territory.

Key Takeaway

The traffic experiment demonstrates that the FSE framework generalizes cleanly to non-drone domains. The trust mechanism and P&O logic are domain-agnostic — what matters is the existence of a performance index that can be evaluated at each timestep, not the specific physics of the system being monitored.


What This Opens Up Next

The paper is candid about its current scope. The system handles sensors that provide full (though noisy) observations of the agent state. A natural extension — and one the authors explicitly flag as future work — is the partial measurement case, where each sensor only sees part of the state vector and a complete estimate requires aggregating across multiple partial views. That’s a harder problem, because the information-theoretic minimum of fusing partial observations is much more sensitive to which sensors are included.

There’s also an interesting open question around scalability. The current framework assumes the subgraph of agents sharing a sensor is strongly connected — a clean condition that guarantees consensus but may not hold in very large or dynamically reconfiguring networks where agents join and leave. Extending the convergence guarantees to time-varying topologies, or to networks with intermittent connectivity, would significantly widen the applicability.

Perhaps the most practically exciting direction is the cyber-physical security angle. The architecture already implicitly acts as a soft anomaly detector — a sensor whose trust hits zero is, by definition, one whose readings have become statistically inconsistent with the agent’s performance. That’s not so different from detecting a Byzantine attacker or a man-in-the-middle injection. The formal connection between the trust mechanism and adversarial detection — and the question of how an attacker might try to game the reputation system — seems like fertile ground.

What the paper has demonstrated is that the right way to think about sensor fusion in adversarial or fault-prone environments isn’t “which sensor is best” or even “how do we average across all sensors.” It’s the question a thoughtful engineer would ask in the field: which sensors deserve to be in the room right now, and how much should each voice count? The FSE architecture answers that question dynamically, rigorously, and — thanks to the Perturb&Observe backbone — without ever needing to know the mathematical structure of the fault it’s fighting.


Complete Implementation (Python)

The code below is a complete, faithful Python implementation of the entire FSE architecture as described in the paper. It covers all eleven components: the LTI agent dynamics (Eq. 1), sensor measurement model with fault injection (Eq. 3), Luenberger observer bank (Eq. 22), trust-weighted fusion (Eq. 16), Altafini consensus trust model (Eq. 9), the full Perturb&Observe finite-state automaton (Fig. 3 and the Sec. 3.2 pseudo-code), the FSE Algorithm (Sec. 3.3), the 5-drone multi-agent network simulation from Sec. 5 with all three fault phases and repair notifications, the SSE baseline for comparison, and a results summary that mirrors Tables 3–4.

# ═════════════════════════════════════════════════════════════════════════════
# Trust-based Fusion State Estimation (FSE) — Complete Implementation
# Tedesco, Gagliardi, Casavola, Franzè, Fortino
# "A joint opinion dynamics and performance-driven approach for state
#  estimation fusion of multi-agent systems"
# Information Fusion 133 (2026) 104261 — University of Calabria
#
# Components implemented:
#   §1  LTI agent dynamics            (Eq. 1)
#   §2  Sensor measurement model      (Eq. 3)  + fault injection
#   §3  Luenberger observer bank      (Eq. 22)
#   §4  Trust-weighted sensor fusion  (Eq. 16)
#   §5  Performance index             (Eq. 6 / Sec. 5)
#   §6  Altafini consensus trust      (Eq. 8 / 9)
#   §7  P&O reputation automaton      (Fig. 3 + Sec. 3.2 pseudo-code)
#   §8  FSE Algorithm per-agent loop  (Sec. 3.3)
#   §9  5-drone network simulation    (Sec. 5, Table 2, three fault phases)
#   §10 SSE baseline                  (Ref. [1])
#   §11 Entry point + results summary (Tables 3–4)
# ═════════════════════════════════════════════════════════════════════════════

import numpy as np
from enum     import Enum, auto
from typing   import Dict, List, Set, Optional, Tuple
import warnings
warnings.filterwarnings("ignore", category=np.linalg.LinAlgError)


# ─────────────────────────────────────────────────────────────────────────────
# §1  LTI AGENT DYNAMICS  (Eq. 1)
#     x_j(t+1) = A_j * x_j(t) + B_j * u_j(t)
#
#     Drone model from Sec. 5  (dt = 0.1 s, state = [px, py, vx, vy]):
#       A = [[I2, dt·I2],    B = [[dt²/2·I2],
#            [0,     I2]]         [dt    ·I2]]
# ─────────────────────────────────────────────────────────────────────────────

class AgentDynamics:
    """Discrete-time LTI agent state propagation."""

    def __init__(self, A: np.ndarray, B: np.ndarray,
                 x0: np.ndarray, process_noise_std: float = 0.0):
        self.A      = A.copy()
        self.B      = B.copy()
        self.x      = x0.copy()
        self.n      = A.shape[0]
        self.sigma  = process_noise_std

    def step(self, u: np.ndarray) -> np.ndarray:
        """Advance true state one timestep; returns x(t+1)."""
        noise    = self.sigma * np.random.randn(self.n) if self.sigma > 0 else 0.0
        self.x   = self.A @ self.x + self.B @ u + noise
        return self.x.copy()

    @staticmethod
    def drone_matrices(dt: float = 0.1) -> Tuple[np.ndarray, np.ndarray]:
        """(A, B) for the 4-state double-integrator drone model in Sec. 5."""
        I2 = np.eye(2); Z2 = np.zeros((2, 2))
        A  = np.block([[I2, dt * I2], [Z2, I2]])
        B  = np.block([[(dt**2 / 2) * I2], [dt * I2]])
        return A, B


# ─────────────────────────────────────────────────────────────────────────────
# §2  SENSOR MEASUREMENT MODEL  (Eq. 3)
#     y_ji(t) = C_ji · x_j(t) + f_ji(z_i(t))
#
#     z_i(t) = unknown corruption signal (step fault or ramp drift).
#     f_ji is unbounded and unknown to the agents.
# ─────────────────────────────────────────────────────────────────────────────

class Sensor:
    """Single sensor with controllable fault injection."""

    def __init__(self, sensor_id: str, C: np.ndarray,
                 meas_noise_std: float = 0.05):
        self.id           = sensor_id
        self.C            = C.copy()
        self.sigma        = meas_noise_std
        self._z:     float           = 0.0
        self._fault: Optional[str]   = None
        self._slope: float           = 0.0

    def inject_step_fault(self, magnitude: float):
        """Constant bias — step fault."""
        self._fault = 'step'; self._z = magnitude

    def inject_ramp_fault(self, slope: float):
        """Linearly growing bias — incipient / ramp fault."""
        self._fault = 'ramp'; self._slope = slope

    def clear_fault(self):
        """Remove fault — simulates a repair / recalibration event."""
        self._fault = None; self._z = 0.0; self._slope = 0.0

    def measure(self, x_true: np.ndarray) -> np.ndarray:
        """Return y_ji(t) = C · x_true + f(z) + noise."""
        if self._fault == 'ramp':
            self._z += self._slope
        noise = self.sigma * np.random.randn(self.C.shape[0])
        return self.C @ x_true + self._z + noise


# ─────────────────────────────────────────────────────────────────────────────
# §3  LUENBERGER OBSERVER BANK  (Eq. 22)
#     x̂_ji(t+1) = A·x̂_ji(t) + L_ji·(y_ji(t) − C_ji·x̂_ji(t))
#
#     One observer per (agent a_j, sensor s_i) pair.
#     Gain L ensures A − L·C is Schur-stable (all |eigenvalues| < 1).
#     Paper gain from Sec. 5:
#       L = [[1.97, 0,    9.702, 0   ],
#            [0,    1.97, 0,     9.702]]^T
# ─────────────────────────────────────────────────────────────────────────────

class LuenbergerObserver:
    """Single-sensor Luenberger state observer."""

    def __init__(self, A: np.ndarray, C: np.ndarray,
                 L: np.ndarray, x0: Optional[np.ndarray] = None):
        self.A     = A.copy(); self.C = C.copy(); self.L = L.copy()
        self.x_hat = np.zeros(A.shape[0]) if x0 is None else x0.copy()
        # Stability check: A − L·C must be Schur
        eigs = np.linalg.eigvals(self.A - self.L @ self.C)
        if np.any(np.abs(eigs) >= 1.0):
            warnings.warn(f"Observer unstable: max|eig| = {np.max(np.abs(eigs)):.4f}")

    def update(self, y: np.ndarray) -> np.ndarray:
        """One-step estimate update. Returns x̂_ji(t+1)."""
        self.x_hat = self.A @ self.x_hat + self.L @ (y - self.C @ self.x_hat)
        return self.x_hat.copy()

    @staticmethod
    def paper_gain() -> np.ndarray:
        """Observer gain from Sec. 5 of the paper  →  shape (4, 2)."""
        return np.array([[1.97, 0.0,  9.702, 0.0  ],
                          [0.0,  1.97, 0.0,   9.702]]).T


class ObserverBank:
    """Bank of |Y_j| Luenberger observers for one agent (Sec. 3.3)."""

    def __init__(self, A, C, L, sensor_ids: List[str],
                 x0: Optional[np.ndarray] = None):
        self.obs: Dict[str, LuenbergerObserver] = {
            sid: LuenbergerObserver(A, C, L, x0) for sid in sensor_ids}

    def update(self, measurements: Dict[str, np.ndarray]
              ) -> Dict[str, np.ndarray]:
        """Update all observers; return {sensor_id: x̂_jk(t+1)}."""
        return {sid: self.obs[sid].update(y)
                for sid, y in measurements.items() if sid in self.obs}


# ─────────────────────────────────────────────────────────────────────────────
# §4  TRUST-WEIGHTED SENSOR FUSION  (Eq. 16)
#
#              Σ_{k ∈ Ξ̃_j} τ_jk(t) · x̂_jk(t)
#     x̂_j(t) = ─────────────────────────────────
#                   Σ_{k ∈ Ξ̃_j} τ_jk(t)
#
#     Only sensors in the active set Ξ̃_j(t) contribute.
# ─────────────────────────────────────────────────────────────────────────────

def trust_weighted_fuse(estimates: Dict[str, np.ndarray],
                         trust:     Dict[str, float],
                         active:    Set[str]) -> np.ndarray:
    keys    = [k for k in active if k in estimates] or list(estimates)
    tau_sum = sum(trust.get(k, 0.0) for k in keys)
    if tau_sum < 1e-12:
        return np.mean([estimates[k] for k in keys], axis=0)
    return sum(trust.get(k, 0.0) * estimates[k] for k in keys) / tau_sum


# ─────────────────────────────────────────────────────────────────────────────
# §5  PERFORMANCE INDEX  (Eq. 6 + audio signal model from Sec. 5)
#
#     H_k(t) = A_max / (0.5 · ‖pos_drone − pos_robot‖)
#     J_j(t) = |H_max − H_j(t)| / H_max  ∈ [0, 1]
#
#     J_j → 0  iff  e_j → 0   (Eq. 6, condition 1)
#     J_j → 1  iff  e_j → ∞   (Eq. 6, condition 2)
# ─────────────────────────────────────────────────────────────────────────────

def performance_index(x_true: np.ndarray, x_hat: np.ndarray,
                       x_ref:  np.ndarray, A_max: float = 30.0) -> float:
    dist_true = max(np.linalg.norm(x_true[:2] - x_ref[:2]), 1e-3)
    dist_hat  = max(np.linalg.norm(x_hat[:2]  - x_ref[:2]), 1e-3)
    H_true    = A_max / (0.5 * dist_true)
    H_hat     = A_max / (0.5 * dist_hat)
    H_max     = A_max / (0.5 * 1e-3)
    return float(np.clip(abs(H_max - H_hat) / H_max, 0.0, 1.0))


# ─────────────────────────────────────────────────────────────────────────────
# §6  ALTAFINI CONSENSUS TRUST MODEL  (Eq. 8 / 9)
#
#     τ_ji(t+1) = α_i · τ_ji(t)
#               + β_ji · Σ_{r ∈ N^i_j} μ_jr · τ_ri(t)
#               + γ_j  · u_ji(t)                          ← Eq. (9)
#
#     Base form (u=0) is the Altafini discrete-time model  ← Eq. (8)
#     Proposition 2: τ_ji(t) ∈ [0,1] ∀t when γ_j < α_i
#     Proposition 1: consensus reached when u=0 and Γ_i strongly connected
#
#     Paper parameters (Sec. 5):
#       α = 0.9 (0.95 for s1,s2),  β = 0.05,  γ = 0.01,  μ_jr = 1.0
# ─────────────────────────────────────────────────────────────────────────────

class TrustModel:
    """Per-agent trust levels τ_ji for each sensor s_i."""

    def __init__(self, sensor_ids: List[str],
                 alpha: float = 0.9, beta: float = 0.05,
                 gamma: float = 0.01, mu:   float = 1.0):
        assert gamma < alpha, "Proposition 2: need gamma < alpha for bounded trust."
        self.tau   = {s: 1.0 for s in sensor_ids}
        self.alpha = alpha; self.beta = beta
        self.gamma = gamma; self.mu   = mu

    def update(self,
               neighbor_tau: Dict[str, Dict[str, float]],
               u:            Dict[str, float]) -> None:
        """Simultaneous trust update for all sensors (Eq. 9)."""
        new = {}
        for s, tau_ji in self.tau.items():
            nbrs     = neighbor_tau.get(s, {})
            consens  = self.beta * sum(self.mu * tv for tv in nbrs.values())
            new[s]   = float(np.clip(
                self.alpha * tau_ji + consens + self.gamma * u.get(s, 0.0),
                0.0, 1.0))
        self.tau.update(new)

    def get(self, sensor_id: str, default: float = 1.0) -> float:
        return self.tau.get(sensor_id, default)


# ─────────────────────────────────────────────────────────────────────────────
# §7  PERTURB&OBSERVE REPUTATION AUTOMATON  (Fig. 3 + Sec. 3.2 pseudo-code)
#
#  Three modes cycle: HOME → PERTURB → OBSERVE → HOME / PERTURB
#
#  HOME mode   (Steps 1–16):
#    · u_ji = 0; pure Altafini consensus (Proposition 1)
#    · If J_j ≥ ε: flag all active sensors as H_bad; → PERTURB
#
#  PERTURB mode (Steps 17–54):
#    Scenario (a) — trust modulation (Eq. 17 / 18):
#      · For s ∈ H_bad:  u_ji forces trust reduction
#      · For s ∉ H_bad:  u_ji = 0
#    Scenario (b) — active-set modification (Eq. 19):
#      · Remove weakest sensor in Ξ̃_j (below threshold th)
#      · Use Eq. (19) input to keep trust dynamics stable during swap
#    → OBSERVE after perturbation step
#
#  OBSERVE mode (Steps 17–22 + 24–35):
#    · Compare J_j(t) with J_j(t−1)
#    · If removal worsened performance: re-add sensor (Statement 1)
#    · If J_j < ε: clear H_bad → HOME
#    · Else → PERTURB
#
#  Repair notifications (Step 20–22):
#    · External signal re-adds repaired sensor to Ξ̃_j
# ─────────────────────────────────────────────────────────────────────────────

class POMode(Enum):
    HOME    = auto()
    PERTURB = auto()
    OBSERVE = auto()


class ReputationMechanism:
    """
    Full Perturb&Observe finite-state automaton (Fig. 3).
    Manages active sensor set Ξ̃_j(t) and trust τ_ji(t).
    """

    def __init__(self, xi_j: List[str],
                 alpha: float = 0.9,  beta:    float = 0.05,
                 gamma: float = 0.01, epsilon: float = 0.1,
                 th:    float = 0.2,  mu:      float = 1.0):
        self.xi_j     = list(xi_j)
        self.epsilon  = epsilon
        self.th       = th
        self.trust    = TrustModel(xi_j, alpha=alpha, beta=beta,
                                   gamma=gamma, mu=mu)
        # Active and management sets (Sec. 3.1.2 / 3.1.3)
        self.Xi_tilde: Set[str] = set(xi_j)   # Ξ̃_j(t) — Eq. (15)
        self.H_bad:    Set[str] = set()        # non-trusted sensors
        self.H_out:    Set[str] = set()        # queued for removal
        self.H_in:     Set[str] = set()        # queued for re-addition
        self.l_out:    Optional[str] = None   # last removed sensor
        self.mode      = POMode.HOME
        self.J_prev    = 0.0
        self._repaired: Set[str] = set()       # pending repair notifications
        self._alpha    = alpha
        self._beta     = beta
        self._gamma    = gamma

    def notify_repair(self, sensor_id: str):
        """Step 20–22: receive external repair notification."""
        if sensor_id in self.xi_j:
            self._repaired.add(sensor_id)

    def step(self,
             J_now:        float,
             neighbor_tau: Dict[str, Dict[str, float]]
             ) -> Tuple[Set[str], Dict[str, float], POMode]:
        """
        One iteration of the Reputation Mechanism Algorithm (Sec. 3.2).

        Args:
            J_now        : J_j(t)  — current performance index ∈ [0, 1]
            neighbor_tau : {sensor_id: {neighbor_agent_id: τ_value}}

        Returns:
            (Ξ̃_j, {sensor_id: τ_ji}, current_mode)
        """
        u = {s: 0.0 for s in self.xi_j}

        # ── Steps 1–4: handle pending repair notifications ────────────────────
        if self._repaired:
            for h in list(self._repaired):
                self.Xi_tilde.add(h)
                self.H_bad.discard(h)
                # Eq. (19): stabilising input during set change
                nbrs   = neighbor_tau.get(h, {})
                nb_sum = sum(self.trust.mu * tv for tv in nbrs.values())
                u[h]   = (((1.0 - self._alpha * self.trust.tau.get(h, 0.0)
                             - self._beta * nb_sum) / self._gamma)
                          if self._gamma > 0 else 0.0)
            self._repaired.clear()
            self.mode = POMode.PERTURB   # re-evaluate after re-addition

        # ══════════════════════════════════════════════════════════════════════
        # HOME mode  (Steps 5–16)
        # ══════════════════════════════════════════════════════════════════════
        if self.mode == POMode.HOME:
            # Step 7: u_ji = 0  → pure Altafini consensus (Proposition 1)
            self.trust.update(neighbor_tau, u)

            if J_now < self.epsilon:             # Step 5: stay in HOME
                self.J_prev = J_now
                return self.Xi_tilde, dict(self.trust.tau), self.mode

            # Step 13–15: J ≥ ε → suspect all active sensors
            self.H_bad = set(self.Xi_tilde)
            self.mode  = POMode.PERTURB

        # ══════════════════════════════════════════════════════════════════════
        # PERTURB mode  (Steps 17–54)
        # ══════════════════════════════════════════════════════════════════════
        if self.mode == POMode.PERTURB:

            # ── Scenario (b): sensor set modification (Steps 36–50) ──────────
            if self.H_out:
                h_star = sorted(self.H_out,
                                key=lambda s: self.trust.tau.get(s, 1.0))[0]
                if len(self.Xi_tilde) > 1:            # keep at least 1 sensor
                    self.Xi_tilde.discard(h_star)
                    self.l_out = h_star
                    for s in self.xi_j:              # Eq. (19)
                        nbrs   = neighbor_tau.get(s, {})
                        nb_sum = sum(self.trust.mu * tv for tv in nbrs.values())
                        u[s]   = (((1.0 - self._alpha * self.trust.tau.get(s, 0.0)
                                     - self._beta * nb_sum) / self._gamma)
                                  if self._gamma > 0 else 0.0)
                self.H_out.discard(h_star)
                self.trust.update(neighbor_tau, u)
                self.mode = POMode.OBSERVE
                self.J_prev = J_now
                return self.Xi_tilde, dict(self.trust.tau), self.mode

            # ── Scenario (a): trust modulation (Steps 51–53) ─────────────────
            for s in self.xi_j:
                nbrs   = neighbor_tau.get(s, {})
                nb_sum = sum(self.trust.mu * tv for tv in nbrs.values())
                if s in self.H_bad:
                    # Eq. (17): force trust reduction on non-trusted sensors
                    u[s] = ((-self._beta * nb_sum / self._gamma
                              - self.trust.tau.get(s, 0.0) * J_now)
                             if self._gamma > 0 else 0.0)
                # else: u[s] = 0.0  (Eq. 18, already set)

            # ── Handle H_in: re-add sensors after failed removal ──────────────
            for h in list(self.H_in):
                self.Xi_tilde.add(h)
                for s in self.xi_j:               # Eq. (19)
                    nbrs   = neighbor_tau.get(s, {})
                    nb_sum = sum(self.trust.mu * tv for tv in nbrs.values())
                    u[s]   = (((1.0 - self._alpha * self.trust.tau.get(s, 0.0)
                                 - self._beta * nb_sum) / self._gamma)
                              if self._gamma > 0 else 0.0)
            self.H_in.clear()

            # ── Steps 24–35: identify candidates for removal ─────────────────
            weak = sorted([s for s in self.Xi_tilde
                           if self.trust.tau.get(s, 1.0) < self.th],
                          key=lambda s: self.trust.tau.get(s, 1.0))
            if weak:
                self.H_out.add(weak[0])            # one removal attempt at a time

            self.trust.update(neighbor_tau, u)
            self.mode = POMode.OBSERVE

        # ══════════════════════════════════════════════════════════════════════
        # OBSERVE mode  (Steps 17–22 + 24–35 OBSERVE branch)
        # ══════════════════════════════════════════════════════════════════════
        elif self.mode == POMode.OBSERVE:

            # ── Statement 1 check (p. 6): did removal help or hurt? ──────────
            if J_now >= self.J_prev and self.l_out is not None:
                # Cost did NOT improve after removing l_out
                tau_lout = self.trust.tau.get(self.l_out, 0.0)
                if tau_lout >= self.th:
                    # Eq. (21): positive trust trajectory → remove from H_bad
                    self.H_bad.discard(self.l_out)
                else:
                    # Re-add the sensor (Step 37–39)
                    self.H_in.add(self.l_out)
                    self.H_bad.discard(self.l_out)
                self.l_out = None

            # ── Update H_bad: flag all active sensors below threshold ─────────
            for h in list(self.Xi_tilde):
                if self.trust.tau.get(h, 1.0) < self.th:
                    self.H_bad.add(h)

            # ── Transition decision ───────────────────────────────────────────
            if J_now < self.epsilon:                # Step 30: go HOME
                self.H_bad.clear(); self.H_out.clear()
                self.mode = POMode.HOME
            else:
                self.mode = POMode.PERTURB           # keep optimising

            self.trust.update(neighbor_tau, {s: 0.0 for s in self.xi_j})

        self.J_prev = J_now
        return self.Xi_tilde, dict(self.trust.tau), self.mode


# ─────────────────────────────────────────────────────────────────────────────
# §8  FSE ALGORITHM — per-agent loop  (Sec. 3.3, p. 7)
#
#  For each t ∈ ℤ+:
#   1. Receive neighbour trust τ_rk(t−1) and measurements y_ji(t−1)
#   2. Compute x̂_jk(t) via observer bank           (Eq. 22)
#   3. Update trust τ_jk(t) and Ξ̃_j(t) via Rep. Algorithm
#   4. Compute fused x̂_j via trust-weighted fusion  (Eq. 16)
#   5. Transmit τ_jk(t) to neighbours
# ─────────────────────────────────────────────────────────────────────────────

class FSEAgent:
    """Complete FSE agent implementing the full Algorithm from Sec. 3.3."""

    def __init__(self, agent_id: str,
                 A: np.ndarray, B: np.ndarray,
                 C: np.ndarray, L: np.ndarray,
                 sensor_ids: List[str],
                 alpha: float = 0.9,  beta:    float = 0.05,
                 gamma: float = 0.01, epsilon: float = 0.1,
                 th:    float = 0.2,
                 x0: Optional[np.ndarray] = None):
        self.id         = agent_id
        self.sensor_ids = sensor_ids
        self.obs_bank   = ObserverBank(A, C, L, sensor_ids, x0)
        self.rep        = ReputationMechanism(
            sensor_ids, alpha=alpha, beta=beta,
            gamma=gamma, epsilon=epsilon, th=th)
        self.x_hat      = np.zeros(A.shape[0]) if x0 is None else x0.copy()
        self.Xi_tilde:  Set[str]         = set(sensor_ids)
        self.trust:     Dict[str, float] = {s: 1.0 for s in sensor_ids}
        self.mode       = POMode.HOME
        self.history    = {'x_hat': [], 'J': [], 'mode': [],
                           'active': [], 'trust': []}

    def step(self,
             measurements: Dict[str, np.ndarray],
             neighbor_tau: Dict[str, Dict[str, float]],
             x_true:       np.ndarray,
             x_ref:        np.ndarray) -> np.ndarray:
        """Execute one FSE Algorithm iteration. Returns x̂_j(t+1)."""
        # Step 2: per-sensor estimates via observer bank (Eq. 22)
        per_sensor = self.obs_bank.update(measurements)

        # Step 4: trust-weighted fused estimate (Eq. 16)
        self.x_hat = trust_weighted_fuse(per_sensor, self.trust, self.Xi_tilde)

        # Evaluate performance index J_j(t) (Eq. 6)
        J = performance_index(x_true, self.x_hat, x_ref)

        # Step 3: reputation mechanism → updated trust + active set
        self.Xi_tilde, self.trust, self.mode = self.rep.step(J, neighbor_tau)

        self.history['x_hat'].append(self.x_hat.copy())
        self.history['J'].append(J)
        self.history['mode'].append(self.mode.name)
        self.history['active'].append(sorted(self.Xi_tilde))
        self.history['trust'].append(dict(self.trust))
        return self.x_hat.copy()

    def notify_repair(self, sensor_id: str):
        self.rep.notify_repair(sensor_id)

    def get_trust(self) -> Dict[str, float]:
        """Step 5: export trust realisations for transmission to neighbours."""
        return dict(self.trust)


# ─────────────────────────────────────────────────────────────────────────────
# §9  5-DRONE MULTI-AGENT NETWORK SIMULATION  (Sec. 5, Table 2)
#
#  Network (Table 2):
#    a1: s1,s2,s3  ↔  neighbours a2,a3
#    a2: s2,s3,s4  ↔  neighbours a1,a3,a4
#    a3: s3,s4,s5  ↔  neighbours a1,a2,a4,a5
#    a4: s4,s5,s6  ↔  neighbours a2,a3,a5
#    a5: s1,s5,s6  ↔  neighbours a1,a3,a4
#
#  Fault schedule (Fig. 7):
#    Phase 1 [ 0 – 38 s] : s1 (step) + s4 (step) corrupted
#    Phase 2 [38 – 70 s] : all 6 sensors corrupted
#    Phase 3 [70 – 100s] : s1 repaired; s5 persists
# ─────────────────────────────────────────────────────────────────────────────

class FSENetwork:

    AGENT_SENSORS = {
        'a1': ['s1', 's2', 's3'],
        'a2': ['s2', 's3', 's4'],
        'a3': ['s3', 's4', 's5'],
        'a4': ['s4', 's5', 's6'],
        'a5': ['s1', 's5', 's6'],
    }
    SENSOR_USERS = {
        's1': ['a1', 'a5'],
        's2': ['a1', 'a2'],
        's3': ['a1', 'a2', 'a3'],
        's4': ['a2', 'a3', 'a4'],
        's5': ['a3', 'a4', 'a5'],
        's6': ['a4', 'a5'],
    }

    def __init__(self, dt: float = 0.1, T_sim: float = 100.0,
                 noise_std: float = 0.05, seed: int = 42):
        np.random.seed(seed)
        self.dt    = dt
        self.T_sim = T_sim
        self.steps = int(T_sim / dt)

        A, B = AgentDynamics.drone_matrices(dt)
        C    = np.hstack([np.eye(2), np.zeros((2, 2))])  # position-only output
        L    = LuenbergerObserver.paper_gain()

        # ── Agents ──────────────────────────────────────────────────────────
        self.agents:   Dict[str, FSEAgent]      = {}
        self.dynamics: Dict[str, AgentDynamics] = {}
        for aid, sids in self.AGENT_SENSORS.items():
            x0 = np.array([np.random.uniform(-50, 50),
                            np.random.uniform(0, 300), 0.0, 0.0])
            self.dynamics[aid] = AgentDynamics(A, B, x0.copy(),
                                               process_noise_std=0.01)
            self.agents[aid]   = FSEAgent(
                aid, A, B, C, L, sids,
                alpha=0.9, beta=0.05, gamma=0.01,
                epsilon=0.1, th=0.2, x0=x0.copy())

        # ── Sensors ─────────────────────────────────────────────────────────
        self.sensors: Dict[str, Sensor] = {
            sid: Sensor(sid, C, meas_noise_std=noise_std)
            for sid in self.SENSOR_USERS}

        # ── SSE baseline agents (Ref. [1]) ───────────────────────────────────
        self.sse_agents: Dict[str, 'SSEAgent'] = {}
        np.random.seed(seed)
        for aid, sids in self.AGENT_SENSORS.items():
            x0 = np.array([np.random.uniform(-50, 50),
                            np.random.uniform(0, 300), 0.0, 0.0])
            self.sse_agents[aid] = SSEAgent(aid, A, C, L, sids, x0)

        # ── Reference trajectories (circular, Fig. 6 style) ─────────────────
        self.refs = self._build_refs()

        # ── RMSE logs ───────────────────────────────────────────────────────
        self.rmse_fse = {aid: [] for aid in self.agents}
        self.rmse_sse = {aid: [] for aid in self.agents}

    def _build_refs(self) -> Dict[str, np.ndarray]:
        centres = {'a1': (0, 150), 'a2': (-20, 200), 'a3': (10, 100),
                   'a4': (-30, 250), 'a5': (20, 50)}
        t   = np.arange(self.steps) * self.dt
        return {aid: np.column_stack([
                    cx + 30 * np.cos(0.1 * t),
                    cy + 30 * np.sin(0.1 * t),
                    np.gradient(30 * np.cos(0.1 * t), self.dt),
                    np.gradient(30 * np.sin(0.1 * t), self.dt)])
                for aid, (cx, cy) in centres.items()}

    def _inject_faults(self, t_step: int):
        """Apply fault schedule matching the three phases in the paper."""
        t = t_step * self.dt
        if t < 38.0:                       # Phase 1: s1 + s4 corrupted
            self.sensors['s1'].inject_step_fault(8.0)
            self.sensors['s4'].inject_step_fault(6.0)
            for s in ['s2', 's3', 's5', 's6']: self.sensors[s].clear_fault()
        elif t < 70.0:                     # Phase 2: all sensors corrupted
            for sid, mag in {'s1':8.0,'s2':5.0,'s3':7.0,
                               's4':6.0,'s5':4.0,'s6':9.0}.items():
                self.sensors[sid].inject_step_fault(mag)
        else:                               # Phase 3: s1 repaired; s5 persists
            for s in ['s1','s2','s3','s4','s6']: self.sensors[s].clear_fault()
            self.sensors['s5'].inject_step_fault(4.0)

    def _notify_repair(self, t_step: int):
        """Broadcast s1 repair notification at t ≈ 70 s (Step 20–22)."""
        if abs(t_step * self.dt - 70.0) < self.dt / 2:
            for aid in ['a1', 'a5']:
                self.agents[aid].notify_repair('s1')

    def _neighbor_tau(self, agent_id: str) -> Dict[str, Dict[str, float]]:
        """Assemble neighbour trust dict for one agent (Eq. 9 neighbour term)."""
        result: Dict[str, Dict[str, float]] = {}
        for sid in self.AGENT_SENSORS[agent_id]:
            result[sid] = {nbr: self.agents[nbr].trust.get(sid, 1.0)
                           for nbr in self.SENSOR_USERS[sid]
                           if nbr != agent_id}
        return result

    def _control(self, aid: str, t: int) -> np.ndarray:
        """Simple proportional-derivative tracking (placeholder for LQI)."""
        x_ref = self.refs[aid][min(t, self.steps - 1)]
        x     = self.dynamics[aid].x
        K     = np.array([[0.5, 0.0, 1.0, 0.0],
                           [0.0, 0.5, 0.0, 1.0]])
        return np.clip(-K @ (x - x_ref), -5.0, 5.0)

    def run(self) -> Dict:
        """Execute the full T_sim simulation; return per-agent metrics."""
        print(f"Running FSE Network — {self.steps} steps, dt={self.dt} s, "
              f"T_sim={self.T_sim} s\n")
        for t in range(self.steps):
            self._inject_faults(t)
            self._notify_repair(t)

            for aid in self.agents:
                dyn   = self.dynamics[aid]
                x_ref = self.refs[aid][min(t, self.steps - 1)]

                # Advance true state
                x_true = dyn.step(self._control(aid, t))

                # Collect measurements from all assigned sensors
                meas = {sid: self.sensors[sid].measure(x_true)
                        for sid in self.AGENT_SENSORS[aid]}

                nbr_tau = self._neighbor_tau(aid)

                # FSE step
                x_hat_fse = self.agents[aid].step(meas, nbr_tau, x_true, x_ref)
                self.rmse_fse[aid].append(np.linalg.norm(x_hat_fse - x_true))

                # SSE baseline step
                x_hat_sse = self.sse_agents[aid].step(meas, nbr_tau)
                self.rmse_sse[aid].append(np.linalg.norm(x_hat_sse - x_true))

        return self._metrics()

    def _metrics(self) -> Dict:
        """Compute RMSE_Est (Eq. 30) from t = 20 s onwards."""
        i0  = int(20.0 / self.dt)
        out = {}
        for aid in self.agents:
            fse_err = np.array(self.rmse_fse[aid][i0:])
            sse_err = np.array(self.rmse_sse[aid][i0:])
            out[aid] = {
                'RMSE_FSE':    round(float(np.sqrt(np.mean(fse_err**2))), 4),
                'RMSE_SSE':    round(float(np.sqrt(np.mean(sse_err**2))), 4),
                'final_mode':  self.agents[aid].history['mode'][-1],
                'final_active': self.agents[aid].history['active'][-1],
                'final_trust': {k: round(v, 3) for k, v
                                in self.agents[aid].history['trust'][-1].items()},
            }
        fse_vals = [v['RMSE_FSE'] for v in out.values()]
        sse_vals = [v['RMSE_SSE'] for v in out.values()]
        out['Average'] = {'RMSE_FSE': round(np.mean(fse_vals), 4),
                           'RMSE_SSE': round(np.mean(sse_vals), 4)}
        return out


# ─────────────────────────────────────────────────────────────────────────────
# §10 SSE BASELINE — Trust-based Selection State Estimation  (Ref. [1])
#
#     Selects the single highest-trust sensor at each timestep;
#     no fusion across multiple sensors.
# ─────────────────────────────────────────────────────────────────────────────

class SSEAgent:
    """Baseline from Tedesco et al. [1]: single-sensor selection via trust."""

    def __init__(self, agent_id: str,
                 A: np.ndarray, C: np.ndarray, L: np.ndarray,
                 sensor_ids: List[str], x0: Optional[np.ndarray] = None):
        self.id       = agent_id
        self.obs_bank = ObserverBank(A, C, L, sensor_ids, x0)
        self.trust    = {s: 1.0 for s in sensor_ids}
        self.x_hat    = np.zeros(A.shape[0]) if x0 is None else x0.copy()
        self.alpha    = 0.9; self.beta = 0.05

    def step(self, measurements: Dict[str, np.ndarray],
             neighbor_tau: Dict[str, Dict[str, float]]) -> np.ndarray:
        estimates = self.obs_bank.update(measurements)
        # Pure consensus trust update — no P&O, no multi-sensor fusion
        for s in self.trust:
            nbrs = neighbor_tau.get(s, {})
            self.trust[s] = float(np.clip(
                self.alpha * self.trust[s]
                + self.beta * sum(tv for tv in nbrs.values()),
                0.0, 1.0))
        # Use only the single highest-trust observer
        best = max(self.trust, key=self.trust.get)
        if best in estimates:
            self.x_hat = estimates[best]
        return self.x_hat.copy()


# ─────────────────────────────────────────────────────────────────────────────
# §11 ENTRY POINT + RESULTS SUMMARY
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=" * 70)
    print("  FSE: Trust-based Fusion State Estimation")
    print("  Tedesco et al. — Information Fusion 133 (2026) 104261")
    print("=" * 70)

    net     = FSENetwork(dt=0.1, T_sim=100.0, noise_std=0.05, seed=42)
    metrics = net.run()

    # ── Per-agent comparison table (mirrors Table 3) ────────────────────
    print("\n── RMSE Comparison: FSE vs SSE (from t = 20 s) ─────────────────────")
    print(f"{'Agent':>6}  {'RMSE_FSE':>10}  {'RMSE_SSE':>10}  "
          f"{'Improvement':>14}  {'Final Mode':>10}")
    print("-" * 68)
    for aid in ['a1', 'a2', 'a3', 'a4', 'a5']:
        m   = metrics[aid]
        imp = (1.0 - m['RMSE_FSE'] / max(m['RMSE_SSE'], 1e-9)) * 100
        print(f"{aid:>6}  {m['RMSE_FSE']:>10.4f}  {m['RMSE_SSE']:>10.4f}  "
              f"{imp:>12.1f}%  {m['final_mode']:>10}")
    avgs = metrics['Average']
    print("-" * 68)
    print(f"{'Average':>6}  {avgs['RMSE_FSE']:>10.4f}  {avgs['RMSE_SSE']:>10.4f}")

    # ── Per-phase RMSE breakdown ────────────────────────────────────────
    print("\n── Phase-wise RMSE (FSE) ─────────────────────────────────────────────")
    phases = [(0, 38, "Phase 1 [ 0–38 s]  s1+s4 corrupted"),
              (38, 70, "Phase 2 [38–70 s]  all 6 sensors corrupted"),
              (70, 100, "Phase 3 [70–100s]  s1 repaired; s5 persists")]
    for t0, t1, label in phases:
        i0, i1 = int(t0 / net.dt), int(t1 / net.dt)
        vals   = [np.sqrt(np.mean(np.array(net.rmse_fse[a][i0:i1])**2))
                  for a in ['a1','a2','a3','a4','a5']]
        print(f"  {label}")
        print("    " + "  ".join(f"a{i+1}:{v:.4f}"
                                 for i, v in enumerate(vals))
              + f"  avg:{np.mean(vals):.4f}")

    # ── Automaton mode distribution ─────────────────────────────────────
    print("\n── P&O Mode Distribution (% of simulation time) ─────────────────────")
    for aid in ['a1','a2','a3','a4','a5']:
        modes  = net.agents[aid].history['mode']
        n      = len(modes)
        counts = {m: modes.count(m) / n * 100
                  for m in ['HOME', 'PERTURB', 'OBSERVE']}
        print(f"  {aid}: HOME {counts['HOME']:5.1f}%  "
              f"PERTURB {counts['PERTURB']:5.1f}%  "
              f"OBSERVE {counts['OBSERVE']:5.1f}%")

    # ── Theorem 1 & 2 convergence check ────────────────────────────────
    print("\n── Convergence Check (Theorem 1 & 2, ε = 0.10) ─────────────────────")
    for aid in ['a1','a2','a3','a4','a5']:
        J_final = net.agents[aid].history['J'][-1]
        status  = "✓ J < ε  (HOME)" if J_final < 0.1 else "~ best achievable"
        print(f"  {aid}: J_final = {J_final:.5f}  →  {status}")

    # ── Final trust and active sets ──────────────────────────────────────
    print("\n── Final Trust Levels & Active Sensor Sets ─────────────────────────")
    for aid in ['a1','a2','a3','a4','a5']:
        m = metrics[aid]
        t_str = "  ".join(f"{k}:{v:.3f}"
                            for k, v in sorted(m['final_trust'].items()))
        print(f"  {aid}  active={m['final_active']}  trust: {t_str}")

    print("\nSimulation complete.\n")

Access the Paper and Resources

The full FSE framework, theoretical proofs, simulation details, and experimental validation are available in the published article. This research was conducted by Tedesco, Gagliardi, Casavola, Franzè, and Fortino at the University of Calabria, published in Information Fusion, March 2026.

Academic Citation:
Tedesco, F., Gagliardi, G., Casavola, A., Franzè, G., & Fortino, G. (2026). A joint opinion dynamics and performance-driven approach for state estimation fusion of multi-agent systems. Information Fusion, 133, 104261. https://doi.org/10.1016/j.inffus.2026.104261

This article is an independent editorial analysis of peer-reviewed research published in Information Fusion (Elsevier). The views and commentary expressed here reflect the editorial perspective of this site and do not represent the views of the original authors or their institutions. Code is provided for educational purposes to illustrate technical concepts. Always refer to the original publication for authoritative details.

Explore More on AI Research

If this analysis sparked your interest, here is more of what we cover across the site — from foundational tutorials to the latest breakthroughs in autonomous systems, distributed estimation, and safe AI deployment.

Leave a Comment

Your email address will not be published. Required fields are marked *

Follow by Email
Tiktok