Downtime Monitoring Service

This tutorial walks through building a decentralized, verifiable downtime monitoring system step-by-step.

Starting from a simple centralized service, we progressively add decentralized replication, cryptographic proof aggregation, and full BLS-signed downtime reporting across multiple monitoring nodes.

By the end, you will have a complete monitoring system where downtime events and measurements are cryptographically verifiable, enabling any third party to independently audit downtime proofs without relying on trust.

01 - Centralized Monitoring Service

In the first version, a single centralized server monitors node states by periodically querying their health endpoints.

The server tracks the current state (up or down) of each node and logs events whenever a state change is detected.

📄 File: downtime_monitoring/01_centralized_monitoring_service.py

Key Concepts and Implementation Details

  • Monitoring Nodes: A dictionary MONITORED_NODES maps each monitored node's address to the base URL used for its health checks.

    # Node URL configuration
    MONITORED_NODES = {
        "0xNodeA123": "http://localhost:8001",
        "0xNodeB456": "http://localhost:8002",
        "0xNodeC789": "http://localhost:8003",
    }
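
    The monitored nodes themselves are not part of the tutorial files; they only need to answer HTTP health checks at the configured URLs. For local experimentation, a stand-in node can be as small as the following sketch (the file name and port are illustrative, not part of the tutorial code):

    # monitored_node_stub.py (hypothetical helper for local testing)
    import uvicorn
    from fastapi import FastAPI

    app = FastAPI()

    @app.get("/health")
    def health():
        # Any 200 response marks this node as "up" for the monitoring service.
        return {"status": "ok"}

    if __name__ == "__main__":
        # Run one instance per entry in MONITORED_NODES, e.g. on port 8001.
        uvicorn.run(app, host="0.0.0.0", port=8001)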
    
  • Node State Tracking: Two in-memory structures are maintained:

    • nodes_state: Current up/down status of each node.

    • nodes_events: Historical list of state change events for each node.

    nodes_state: dict[str, str] = {addr: "up" for addr in MONITORED_NODES}
    
    nodes_events: dict[str, list[dict[str, Any]]] = {
        addr: [{"state": "up", "timestamp": 0}] for addr in MONITORED_NODES
    }
    
  • Health Checking: The check_node_state function periodically queries each node's /health endpoint to determine if it is up or down.

    def check_node_state(node_address: str, node_url: str) -> str:
        try:
            response = requests.get(f"{node_url}/health", timeout=REQUEST_TIMEOUT)
            return "up" if response.status_code == 200 else "down"
        except requests.RequestException:
            return "down"
    
    
    
  • Change Detection: If a node's state has changed since the last check, the change is logged, stored, and timestamped.

    def monitor_loop():
        while True:
            node_address = random.choice(list(MONITORED_NODES.keys()))
            node_url = MONITORED_NODES[node_address]
    
            new_state = check_node_state(node_address, node_url)
            last_state = nodes_state.get(node_address)
    
            if last_state != new_state:
                nodes_state[node_address] = new_state
                event = {"state": new_state, "timestamp": int(time.time())}
                nodes_events[node_address].append(event)
                logger.info(f"{node_address} βž” {new_state}")
            else:
                logger.info(f"No change: {node_address} is {new_state}")
    
            time.sleep(POLL_INTERVAL_SECONDS)
    
    
    
  • Downtime Calculation: The calculate_downtime function computes total downtime for a node between two timestamps based on recorded events.

    def calculate_downtime(events: list[dict[str, Any]], from_ts: int, to_ts: int) -> int:
        interval_events = [e for e in events if from_ts <= e["timestamp"] <= to_ts]
    
        if not interval_events:
            starting_state = max(
                (e for e in events if e["timestamp"] < from_ts),
                key=lambda e: e["timestamp"],
            )["state"]
            return to_ts - from_ts if starting_state == "down" else 0
    
        downtime = 0
        down_since = from_ts
    
        for event in interval_events:
            if event["state"] == "down":
                down_since = event["timestamp"]
            elif event["state"] == "up":
                downtime += event["timestamp"] - down_since
    
        if interval_events[-1]["state"] == "down":
            downtime += to_ts - down_since
    
        return downtime
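
    As a quick sanity check, here is how the calculation plays out on a small made-up event history (timestamps are illustrative):

    # Illustrative example: one down/up cycle inside the queried window.
    events = [
        {"state": "up", "timestamp": 0},
        {"state": "down", "timestamp": 1714131000},
        {"state": "up", "timestamp": 1714132000},
    ]
    # The node was down from 1714131000 to 1714132000, i.e. 1000 seconds
    # of the window [1714130000, 1714140000].
    assert calculate_downtime(events, 1714130000, 1714140000) == 1000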
    
    
    
  • FastAPI Endpoint: A /downtime endpoint allows external clients to query a node's total downtime for a given period.

    @app.get("/downtime")
    def get_downtime(address: str, from_timestamp: int, to_timestamp: int):
        if address not in nodes_events:
            raise HTTPException(status_code=404, detail="Address not found")
    
        events = nodes_events[address]
        total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)
        return JSONResponse(
            {
                "address": address,
                "from_timestamp": from_timestamp,
                "to_timestamp": to_timestamp,
                "total_downtime_seconds": total_downtime,
            }
        )
    
    
    
  • Background Monitoring Loop: The monitor_loop function runs in a background thread, randomly selecting nodes to check at fixed intervals.

    if __name__ == "__main__":
        threading.Thread(target=monitor_loop, daemon=True).start()
        uvicorn.run(app, host="0.0.0.0", port=5000)
    

Endpoints

  • GET /downtime?address=<address>&from_timestamp=<from>&to_timestamp=<to>: Retrieve the total downtime (in seconds) for a specific node over a given time period.

Example Usage

  1. Check Downtime

curl "http://localhost:5000/downtime?address=0xNodeA123&from_timestamp=1714130000&to_timestamp=1714140000"

Limitations

  • Single-node, centralized architecture

  • No replication or failover

  • No cryptographic guarantees on downtime proofs

  • Trust is placed entirely in the monitoring server

In the next step, we introduce replicated monitoring using Zellular to distribute and synchronize downtime tracking across multiple nodes.

02 - Replicated Monitoring Service

In this version, the downtime monitoring service is decentralized across multiple nodes using Zellular.

Instead of only storing downtime events locally, nodes now send and retrieve events through the Zellular network.

This ensures that all replicas observe and apply the same downtime updates, providing resilience and consistency even if some nodes temporarily fail.

📄 File: downtime_monitoring/02_replicated_monitoring_service.py

Key Concepts and Implementation Details

  • Decentralized Event Synchronization: Downtime events (state changes) are sent to Zellular, making them available to all participating nodes.

            if last_state != new_state:
                event = {
                    "address": node_address,
                    "state": new_state,
                    "timestamp": int(time.time()),
                }
                zellular.send([event], blocking=False)
                logger.info(
                    f"Sent state change event to Zellular: {node_address} βž” {new_state}"
                )
            else:
                logger.info(f"No change: {node_address} is {new_state}")
    
  • Eigenlayer Network Configuration: Nodes connect to a shared Eigenlayer network and configure the threshold percentage of the network required to finalize batches of updates.

    # Initialize Zellular client
    network = EigenlayerNetwork(
        subgraph_url="https://api.studio.thegraph.com/query/95922/avs-subgraph/version/latest",
        threshold_percent=40,
    )
    zellular = Zellular("downtime-monitoring", network)
    
  • Two Separate Loops:

    • monitor_loop: Periodically checks node health and sends downtime events if a change is detected.

    • process_loop: Continuously pulls downtime events from Zellular and applies them to the local event log (a minimal sketch follows the apply_event code below).

    if __name__ == "__main__":
        threading.Thread(target=monitor_loop, daemon=True).start()
        threading.Thread(target=process_loop, daemon=True).start()
        uvicorn.run(app, host="0.0.0.0", port=5000)
    
  • Event-based Updates: Local node state and history are updated only after receiving and applying a downtime event.

    def apply_event(event: dict[str, Any]):
        address = event["address"]
        state = event["state"]
        timestamp = event["timestamp"]
    
        last_state = nodes_state.get(address)
        if last_state != state:
            nodes_state[address] = state
            nodes_events[address].append({"state": state, "timestamp": timestamp})
            logger.info(f"Applied event: {address} βž” {state}")
        else:
            logger.warning(f"Duplicate state for {address}, event ignored")
    
    
    

Limitations

  • Still no cryptographic proof of downtime (events are trusted as-is)

  • Zellular ensures consistency but not verifiability

  • Anyone who controls event submission could falsify downtime data

In the next step, we introduce BLS signatures and decentralized attestation to verify downtime claims cryptographically.

03 - Proof Aggregating Monitoring Service

In this version, downtime events are no longer trusted blindly. Before accepting a node's state change, the monitoring node gathers signed confirmations from other nodes.

These confirmations are aggregated into a single, compact BLS proof, ensuring that a majority agrees on the state change before it is accepted and broadcast through Zellular.

This upgrade introduces cryptographic verifiability to downtime monitoring.

📄 File: downtime_monitoring/03_proof_aggregating_monitoring_service.py

Key Concepts and Implementation Details

  • BLS-Based State Confirmation: Each downtime monitoring node signs the current state (up/down) of a node together with a timestamp.

        message = f"Address: {address}, State: {state}, Timestamp: {timestamp}".encode(
            "utf-8"
        )
        signature = PopSchemeMPL.sign(sk, message)
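
    The snippet above runs inside the handler that peer monitoring nodes call to obtain this node's signed observation. The exact route is not shown in this excerpt; below is a minimal sketch of such an endpoint, assuming a hypothetical /state path and response fields, and that sk is this node's BLS private key:

    # Hypothetical route and field names; the tutorial file may differ.
    @app.get("/state")
    def get_signed_state(address: str, timestamp: int):
        if address not in nodes_state:
            raise HTTPException(status_code=404, detail="Address not found")

        state = nodes_state[address]
        message = f"Address: {address}, State: {state}, Timestamp: {timestamp}".encode(
            "utf-8"
        )
        # Sign the observed state with this node's BLS key (proof-of-possession scheme).
        signature = PopSchemeMPL.sign(sk, message)
        return {"state": state, "signature": str(signature)}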
    
  • Asynchronous State Gathering: When a change is detected, the monitoring node queries other nodes for their signed state observations asynchronously.

        async with aiohttp.ClientSession() as session:
            tasks = [
                fetch_state(session, node, info, address, timestamp)
                for node, info in MONITORING_NODES.items()
                if node != SELF_NODE_ID
            ]
            results = await asyncio.gather(*tasks)
    
  • Signature Aggregation and Threshold Enforcement: Individual BLS signatures are aggregated into a single proof only when at least two thirds of the monitoring nodes agree on the observed state.

    def aggregate_signatures(
        message: bytes, expected_value: Any, results: list[tuple[str, Any, str]]
    ):
        valid_signatures = []
        non_signers = []
    
        for node_name, value, signature_hex in results:
            if value != expected_value or signature_hex is None:
                non_signers.append(node_name)
                continue
    
            try:
                pubkey = G1Element.from_bytes(
                    bytes.fromhex(MONITORING_NODES[node_name]["pubkey"])
                )
                signature = G2Element.from_bytes(bytes.fromhex(signature_hex))
                if PopSchemeMPL.verify(pubkey, message, signature):
                    valid_signatures.append(signature)
                else:
                    non_signers.append(node_name)
            except Exception:
                non_signers.append(node_name)
    
        if len(valid_signatures) < 2 * len(MONITORING_NODES) / 3:
            raise ValueError("Not enough valid signatures to reach threshold")
    
        aggregated_signature = PopSchemeMPL.aggregate(valid_signatures)
        return aggregated_signature, non_signers
    
    
    
  • Verifiable Event Broadcasting: The aggregated proof is attached to the downtime event and sent to Zellular. All replicas verify the proof before accepting and applying the event.

    def process_loop():
        for batch, index in zellular.batches():
            events = json.loads(batch)
            for event in events:
                try:
                    verified = verify_event(event)
                except Exception as e:
                    logger.warning(
                        f"Error in event verification: {event}, error: {type(e)} {e}"
                    )
                    continue
                if verified:
                    apply_event(event)
                else:
                    logger.error(f"Invalid proof for event {event['address']}, ignored")
    
    
    

Limitations

  • Still relies on a single node to calculate and report downtime

  • Cryptographic proofs exist for state changes, but not for total downtime

In the next step, we extend BLS verification to cover downtime calculations, ensuring majority agreement on reported downtime values.

04 - Verifiable Downtime Monitoring Service

In a decentralized monitoring system, it's not enough to track downtime; the correctness of the reported downtime must also be verifiable.

When other services (such as reward managers, slashing modules, or external analytics) rely on this monitoring system, they must be able to trust the downtime data. Verifiable downtime proofs allow these external systems to independently confirm that nodes report downtime accurately and consistently, without relying purely on trust.

By signing each downtime response with a BLS key:

  • The monitoring node attests to the specific downtime value it calculated.

  • The signature can be independently verified or aggregated with signatures from other nodes.

  • Clients can detect misreporting or inconsistencies across different monitoring nodes.

This verifiability forms the foundation for trustless reward distribution, slashing mechanisms, and interoperable decentralized monitoring, ensuring resilience against dishonest participants.

📄 File: downtime_monitoring/04_verifiable_monitoring_service.py

Key Concepts and Implementation Details

  • Signed Downtime Calculation: Every node signs its calculated downtime for a given address and time period.

@app.get("/downtime")
def get_downtime(address: str, from_timestamp: int, to_timestamp: int):
    if address not in nodes_events:
        raise HTTPException(status_code=404, detail="Address not found")

    events = nodes_events[address]
    total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)

    message = f"Address: {address}, Downtime: {total_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    signature = PopSchemeMPL.sign(sk, message)

    return JSONResponse(
        {
            "address": address,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
            "total_downtime_seconds": total_downtime,
            "signature": str(signature),
        }
    )
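
A client can check a single node's answer directly, before any aggregation, by rebuilding the same message and verifying it against that node's public key. A minimal sketch, assuming blspy and the hex serialization used throughout the tutorial:

from blspy import G1Element, G2Element, PopSchemeMPL

def verify_single_downtime_response(report: dict, pubkey_hex: str) -> bool:
    # Rebuild the message exactly as the /downtime handler signs it.
    message = (
        f"Address: {report['address']}, "
        f"Downtime: {report['total_downtime_seconds']}, "
        f"From: {report['from_timestamp']}, To: {report['to_timestamp']}"
    ).encode("utf-8")
    pubkey = G1Element.from_bytes(bytes.fromhex(pubkey_hex))
    signature = G2Element.from_bytes(bytes.fromhex(report["signature"]))
    return PopSchemeMPL.verify(pubkey, message, signature)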


  • Downtime Aggregation: A node can request downtime values from other monitoring nodes, aggregate valid signatures, and verify majority consensus.

async def fetch_downtime(
    session: aiohttp.ClientSession,
    node_name: str,
    node_info: dict[str, str],
    address: str,
    from_timestamp: int,
    to_timestamp: int,
):
    try:
        async with session.get(
            f"{node_info['url']}/downtime",
            params={
                "address": address,
                "from_timestamp": from_timestamp,
                "to_timestamp": to_timestamp,
            },
            timeout=REQUEST_TIMEOUT,
        ) as response:
            data = await response.json()
            return node_name, data["total_downtime_seconds"], data["signature"]
    except Exception:
        return node_name, None, None


async def query_monitoring_nodes_for_downtime(
    address: str, from_timestamp: int, to_timestamp: int
) -> tuple[list[tuple[str, int, str]], int]:
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_downtime(session, node, info, address, from_timestamp, to_timestamp)
            for node, info in MONITORING_NODES.items()
            if node != SELF_NODE_ID
        ]
        results = await asyncio.gather(*tasks)

    events = nodes_events.get(address)
    if not events:
        raise HTTPException(status_code=404, detail="Address not found")

    total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)

    message = f"Address: {address}, Downtime: {total_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    signature = PopSchemeMPL.sign(sk, message)
    results.append((SELF_NODE_ID, total_downtime, str(signature)))

    return results, total_downtime


  • New `/aggregate_downtime` Endpoint: A new FastAPI route triggers aggregation of downtime proofs and returns a cryptographically verifiable result.

@app.get("/aggregate_downtime")
async def aggregate_downtime(address: str, from_timestamp: int, to_timestamp: int):
    results, target_downtime = await query_monitoring_nodes_for_downtime(
        address, from_timestamp, to_timestamp
    )

    message = f"Address: {address}, Downtime: {target_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    try:
        aggregated_signature, non_signers = aggregate_signatures(
            message, target_downtime, results
        )
    except ValueError:
        raise HTTPException(
            status_code=424,  # 424 Failed Dependency
            detail="Not enough valid signatures to aggregate downtime proof.",
        )

    return {
        "address": address,
        "from_timestamp": from_timestamp,
        "to_timestamp": to_timestamp,
        "total_downtime_seconds": target_downtime,
        "aggregated_signature": str(aggregated_signature),
        "non_signing_nodes": non_signers,
    }
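
With the aggregated signature and the list of non-signing nodes in hand, any third party that knows the monitoring nodes' public keys can check a reported downtime value without trusting any single node. A minimal client-side sketch, assuming blspy, the hex serialization used throughout the tutorial, and an address-to-public-key registry equivalent to MONITORING_NODES:

from blspy import G1Element, G2Element, PopSchemeMPL


def verify_downtime_proof(report: dict, pubkeys_hex: dict[str, str]) -> bool:
    """Check an /aggregate_downtime response against known node public keys."""
    # Rebuild the exact message format every monitoring node signed.
    message = (
        f"Address: {report['address']}, "
        f"Downtime: {report['total_downtime_seconds']}, "
        f"From: {report['from_timestamp']}, To: {report['to_timestamp']}"
    ).encode("utf-8")

    # Only nodes outside the non-signing list contributed to the aggregate.
    signers = [n for n in pubkeys_hex if n not in report["non_signing_nodes"]]
    if len(signers) < 2 * len(pubkeys_hex) / 3:
        return False

    pubkeys = [G1Element.from_bytes(bytes.fromhex(pubkeys_hex[n])) for n in signers]
    signature = G2Element.from_bytes(bytes.fromhex(report["aggregated_signature"]))
    # All signers signed the same message, so fast aggregate verification applies.
    return PopSchemeMPL.fast_aggregate_verify(pubkeys, message, signature)

If verification succeeds and the signer set meets the two-thirds threshold, the reported downtime can be consumed by reward or slashing logic without placing further trust in the reporting node.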