Downtime Monitoring Service
This tutorial walks through building a decentralized, verifiable downtime monitoring system step-by-step.
Starting from a simple centralized service, we progressively add decentralized replication, cryptographic proof aggregation, and full BLS-signed downtime reporting across multiple monitoring nodes.
By the end, you will have a complete monitoring system where downtime events and measurements are cryptographically verifiable, enabling any third party to independently audit downtime proofs without relying on trust.
01 - Centralized Monitoring Service
In the first version, a single centralized server monitors node states by periodically querying their health endpoints.
The server tracks the current state (up or down) of each node and logs events whenever a state change is detected.
File: downtime_monitoring/01_centralized_monitoring_service.py
Key Concepts and Implementation Details
Monitoring Nodes: A dictionary MONITORED_NODES defines the list of nodes being monitored, each identified by an address and a URL.

# Node URL configuration
MONITORED_NODES = {
    "0xNodeA123": "http://localhost:8001",
    "0xNodeB456": "http://localhost:8002",
    "0xNodeC789": "http://localhost:8003",
}
Node State Tracking: Two in-memory structures are maintained:
nodes_state: Current up/down status of each node.
nodes_events: Historical list of state change events for each node.

nodes_state: dict[str, str] = {addr: "up" for addr in MONITORED_NODES}
nodes_events: dict[str, list[dict[str, Any]]] = {
    addr: [{"state": "up", "timestamp": 0}] for addr in MONITORED_NODES
}
Health Checking: The check_node_state function periodically queries each node's /health endpoint to determine if it is up or down.

def check_node_state(node_address: str, node_url: str) -> str:
    try:
        response = requests.get(f"{node_url}/health", timeout=REQUEST_TIMEOUT)
        return "up" if response.status_code == 200 else "down"
    except requests.RequestException:
        return "down"
Change Detection: If a node's state has changed since the last check, the change is logged, stored, and timestamped.

def monitor_loop():
    while True:
        node_address = random.choice(list(MONITORED_NODES.keys()))
        node_url = MONITORED_NODES[node_address]
        new_state = check_node_state(node_address, node_url)
        last_state = nodes_state.get(node_address)
        if last_state != new_state:
            nodes_state[node_address] = new_state
            event = {"state": new_state, "timestamp": int(time.time())}
            nodes_events[node_address].append(event)
            logger.info(f"{node_address} → {new_state}")
        else:
            logger.info(f"No change: {node_address} is {new_state}")
        time.sleep(POLL_INTERVAL_SECONDS)
Downtime Calculation: The calculate_downtime function computes total downtime for a node between two timestamps based on recorded events.

def calculate_downtime(events: list[dict[str, Any]], from_ts: int, to_ts: int) -> int:
    interval_events = [e for e in events if from_ts <= e["timestamp"] <= to_ts]
    if not interval_events:
        starting_state = max(
            (e for e in events if e["timestamp"] < from_ts),
            key=lambda e: e["timestamp"],
        )["state"]
        return to_ts - from_ts if starting_state == "down" else 0
    downtime = 0
    down_since = from_ts
    for event in interval_events:
        if event["state"] == "down":
            down_since = event["timestamp"]
        elif event["state"] == "up":
            downtime += event["timestamp"] - down_since
    if interval_events[-1]["state"] == "down":
        downtime += to_ts - down_since
    return downtime
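A quick worked example with made-up timestamps: given the events below, the node is down from t=100 to t=250 inside the query window [50, 300], so 150 seconds of downtime are reported.

# Illustrative event history (values are made up for this example).
events = [
    {"state": "up", "timestamp": 0},
    {"state": "down", "timestamp": 100},
    {"state": "up", "timestamp": 250},
]
assert calculate_downtime(events, from_ts=50, to_ts=300) == 150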
FastAPI Endpoint: A /downtime endpoint allows external clients to query a node's total downtime for a given period.

@app.get("/downtime")
def get_downtime(address: str, from_timestamp: int, to_timestamp: int):
    if address not in nodes_events:
        raise HTTPException(status_code=404, detail="Address not found")
    events = nodes_events[address]
    total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)
    return JSONResponse(
        {
            "address": address,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
            "total_downtime_seconds": total_downtime,
        }
    )
Background Monitoring Loop: The monitor_loop function runs in a background thread, randomly selecting nodes to check at fixed intervals.

if __name__ == "__main__":
    threading.Thread(target=monitor_loop, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=5000)
Endpoints
GET /downtime?address=<address>&from_timestamp=<from>&to_timestamp=<to>: Retrieve the total downtime (in seconds) for a specific node over a given time period.
Example Usage
Check Downtime
curl "http://localhost:5000/downtime?address=0xNodeA123&from_timestamp=1714130000&to_timestamp=1714140000"
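The response is a JSON object with the fields returned by get_downtime; the downtime value below is illustrative.

{
  "address": "0xNodeA123",
  "from_timestamp": 1714130000,
  "to_timestamp": 1714140000,
  "total_downtime_seconds": 320
}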
Limitations
Single-node, centralized architecture
No replication or failover
No cryptographic guarantees on downtime proofs
Trust is placed entirely in the monitoring server
In the next step, we introduce replicated monitoring using Zellular to distribute and synchronize downtime tracking across multiple nodes.
02 - Replicated Monitoring Service
In this version, the downtime monitoring service is decentralized across multiple nodes using Zellular.
Instead of only storing downtime events locally, nodes now send and retrieve events through the Zellular network.
This ensures that all replicas observe and apply the same downtime updates, providing resilience and consistency even if some nodes temporarily fail.
File: downtime_monitoring/02_replicated_monitoring_service.py
Key Concepts and Implementation Details
Decentralized Event Synchronization: Downtime events (state changes) are sent to Zellular, making them available to all participating nodes.
if last_state != new_state:
    event = {
        "address": node_address,
        "state": new_state,
        "timestamp": int(time.time()),
    }
    zellular.send([event], blocking=False)
    logger.info(
        f"Sent state change event to Zellular: {node_address} → {new_state}"
    )
else:
    logger.info(f"No change: {node_address} is {new_state}")
EigenLayer Network Configuration: Nodes connect to a shared EigenLayer network and configure a threshold percentage for batching updates.
# Initialize Zellular client
network = EigenlayerNetwork(
    subgraph_url="https://api.studio.thegraph.com/query/95922/avs-subgraph/version/latest",
    threshold_percent=40,
)
zellular = Zellular("downtime-monitoring", network)
Two Separate Loops:
monitor_loop: Periodically checks node health and sends downtime events if a change is detected.
process_loop: Continuously pulls downtime events from Zellular and applies them to the local event log (a minimal sketch follows the snippet below).
if __name__ == "__main__":
    threading.Thread(target=monitor_loop, daemon=True).start()
    threading.Thread(target=process_loop, daemon=True).start()
    uvicorn.run(app, host="0.0.0.0", port=5000)
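For reference, a minimal sketch of what process_loop could look like, assuming the same zellular.batches() iteration used later in step 03 and that json and the zellular client above are in scope; the actual implementation lives in the source file.

def process_loop():
    # Pull finalized batches from Zellular and replay each event locally.
    for batch, index in zellular.batches():
        events = json.loads(batch)
        for event in events:
            apply_event(event)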
Event-based Updates: Local node state and history are updated only after receiving and applying a downtime event.
def apply_event(event: dict[str, Any]):
    address = event["address"]
    state = event["state"]
    timestamp = event["timestamp"]
    last_state = nodes_state.get(address)
    if last_state != state:
        nodes_state[address] = state
        nodes_events[address].append({"state": state, "timestamp": timestamp})
        logger.info(f"Applied event: {address} → {state}")
    else:
        logger.warning(f"Duplicate state for {address}, event ignored")
Limitations
Still no cryptographic proof of downtime (events are trusted as-is)
Zellular ensures consistency but not verifiability
Anyone who controls event submission could falsify downtime data
In the next step, we introduce BLS signatures and decentralized attestation to verify downtime claims cryptographically.
03 - Proof Aggregating Monitoring Service
In this version, downtime events are no longer trusted blindly. Before accepting a node's state change, the monitoring node gathers signed confirmations from other nodes.
These confirmations are aggregated into a single compact BLS proof, ensuring that a two-thirds majority agrees on the state change before it is accepted and broadcast through Zellular.
This upgrade introduces cryptographic verifiability to downtime monitoring.
File: downtime_monitoring/03_proof_aggregating_monitoring_service.py
Key Concepts and Implementation Details
BLS-Based State Confirmation: Each downtime monitoring node signs the current state (up/down) of a node together with a timestamp.
message = f"Address: {address}, State: {state}, Timestamp: {timestamp}".encode(
    "utf-8"
)
signature = PopSchemeMPL.sign(sk, message)
Asynchronous State Gathering: When a change is detected, the monitoring node queries other nodes for their signed state observations asynchronously.
async with aiohttp.ClientSession() as session:
    tasks = [
        fetch_state(session, node, info, address, timestamp)
        for node, info in MONITORING_NODES.items()
        if node != SELF_NODE_ID
    ]
    results = await asyncio.gather(*tasks)
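fetch_state itself is not shown here; the sketch below illustrates one possible shape, mirroring the fetch_downtime helper from step 04. The /state path and the returned field names are assumptions for illustration; the real endpoint is defined in the source file.

async def fetch_state(
    session: aiohttp.ClientSession,
    node_name: str,
    node_info: dict[str, str],
    address: str,
    timestamp: int,
):
    try:
        # NOTE: the "/state" path and response field names are illustrative assumptions.
        async with session.get(
            f"{node_info['url']}/state",
            params={"address": address, "timestamp": timestamp},
            timeout=REQUEST_TIMEOUT,
        ) as response:
            data = await response.json()
            return node_name, data["state"], data["signature"]
    except Exception:
        # Unreachable or misbehaving nodes end up as non-signers during aggregation.
        return node_name, None, None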
Signature Aggregation and Threshold Enforcement: Individual BLS signatures are aggregated into a single proof only if at least two-thirds of the monitoring nodes agree on the observed state.
def aggregate_signatures(
    message: bytes, expected_value: Any, results: list[tuple[str, Any, str]]
):
    valid_signatures = []
    non_signers = []
    for node_name, value, signature_hex in results:
        if value != expected_value or signature_hex is None:
            non_signers.append(node_name)
            continue
        try:
            pubkey = G1Element.from_bytes(
                bytes.fromhex(MONITORING_NODES[node_name]["pubkey"])
            )
            signature = G2Element.from_bytes(bytes.fromhex(signature_hex))
            if PopSchemeMPL.verify(pubkey, message, signature):
                valid_signatures.append(signature)
            else:
                non_signers.append(node_name)
        except Exception:
            non_signers.append(node_name)
    if len(valid_signatures) < 2 * len(MONITORING_NODES) / 3:
        raise ValueError("Not enough valid signatures to reach threshold")
    aggregated_signature = PopSchemeMPL.aggregate(valid_signatures)
    return aggregated_signature, non_signers
Verifiable Event Broadcasting: The aggregated proof is attached to the downtime event and sent to Zellular. All replicas verify the proof before accepting and applying the event.
def process_loop():
    for batch, index in zellular.batches():
        events = json.loads(batch)
        for event in events:
            try:
                verified = verify_event(event)
            except Exception as e:
                logger.warning(
                    f"Error in event verification: {event}, error: {type(e)} {e}"
                )
                continue
            if verified:
                apply_event(event)
            else:
                logger.error(f"Invalid proof for event {event['address']}, ignored")
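verify_event is referenced above but not shown. A minimal sketch follows, assuming the event carries the aggregated signature and the non-signer list under the field names signature and non_signers (the actual field names are in the source file), and that the BLS bindings expose PopSchemeMPL.fast_aggregate_verify for verifying an aggregate signature over a single shared message.

def verify_event(event: dict[str, Any]) -> bool:
    # Rebuild the exact message the monitoring nodes signed.
    message = f"Address: {event['address']}, State: {event['state']}, Timestamp: {event['timestamp']}".encode(
        "utf-8"
    )
    # Public keys of every node that contributed to the aggregate
    # (the "non_signers" and "signature" field names are assumptions).
    signer_pubkeys = [
        G1Element.from_bytes(bytes.fromhex(info["pubkey"]))
        for name, info in MONITORING_NODES.items()
        if name not in event["non_signers"]
    ]
    # Enforce the same 2/3 threshold used during aggregation.
    if len(signer_pubkeys) < 2 * len(MONITORING_NODES) / 3:
        return False
    signature = G2Element.from_bytes(bytes.fromhex(event["signature"]))
    return PopSchemeMPL.fast_aggregate_verify(signer_pubkeys, message, signature)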
Limitations
Still relies on a single node to calculate and report downtime
Cryptographic proofs exist for state changes, but not for total downtime
In the next step, we extend BLS verification to cover downtime calculations, ensuring majority agreement on reported downtime values.
04 - Verifiable Downtime Monitoring Service
In a decentralized monitoring system, it's not enough to track downtime: the correctness of the reported downtime must also be verifiable.
When other services (such as reward managers, slashing modules, or external analytics) rely on this monitoring system, they must be able to trust the downtime data. Verifiable downtime proofs allow these external systems to independently confirm that nodes report downtime accurately and consistently, without relying purely on trust.
By signing each downtime response with a BLS key:
The monitoring node attests to the specific downtime value it calculated.
The signature can be independently verified or aggregated with signatures from other nodes.
Clients can detect misreporting or inconsistencies across different monitoring nodes.
This verifiability forms the foundation for trustless reward distribution, slashing mechanisms, and interoperable decentralized monitoring, ensuring resilience against dishonest participants.
File: downtime_monitoring/04_verifiable_monitoring_service.py
Key Concepts and Implementation Details
Signed Downtime Calculation: Every node signs its calculated downtime for a given address and time period.
@app.get("/downtime")
def get_downtime(address: str, from_timestamp: int, to_timestamp: int):
    if address not in nodes_events:
        raise HTTPException(status_code=404, detail="Address not found")
    events = nodes_events[address]
    total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)
    message = f"Address: {address}, Downtime: {total_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    signature = PopSchemeMPL.sign(sk, message)
    return JSONResponse(
        {
            "address": address,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
            "total_downtime_seconds": total_downtime,
            "signature": str(signature),
        }
    )
Downtime Aggregation: A node can request downtime values from other monitoring nodes, aggregate valid signatures, and verify majority consensus.
async def fetch_downtime(
    session: aiohttp.ClientSession,
    node_name: str,
    node_info: dict[str, str],
    address: str,
    from_timestamp: int,
    to_timestamp: int,
):
    try:
        async with session.get(
            f"{node_info['url']}/downtime",
            params={
                "address": address,
                "from_timestamp": from_timestamp,
                "to_timestamp": to_timestamp,
            },
            timeout=REQUEST_TIMEOUT,
        ) as response:
            data = await response.json()
            return node_name, data["total_downtime_seconds"], data["signature"]
    except Exception:
        return node_name, None, None


async def query_monitoring_nodes_for_downtime(
    address: str, from_timestamp: int, to_timestamp: int
) -> tuple[list[tuple[str, int, str]], int]:
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_downtime(session, node, info, address, from_timestamp, to_timestamp)
            for node, info in MONITORING_NODES.items()
            if node != SELF_NODE_ID
        ]
        results = await asyncio.gather(*tasks)
    events = nodes_events.get(address)
    if not events:
        raise HTTPException(status_code=404, detail="Address not found")
    total_downtime = calculate_downtime(events, from_timestamp, to_timestamp)
    message = f"Address: {address}, Downtime: {total_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    signature = PopSchemeMPL.sign(sk, message)
    results.append((SELF_NODE_ID, total_downtime, str(signature)))
    return results, total_downtime
New `/aggregate_downtime` Endpoint: A new FastAPI route triggers aggregation of downtime proofs and returns a cryptographically verifiable result.
@app.get("/aggregate_downtime")
async def aggregate_downtime(address: str, from_timestamp: int, to_timestamp: int):
    results, target_downtime = await query_monitoring_nodes_for_downtime(
        address, from_timestamp, to_timestamp
    )
    message = f"Address: {address}, Downtime: {target_downtime}, From: {from_timestamp}, To: {to_timestamp}".encode(
        "utf-8"
    )
    try:
        aggregated_signature, non_signers = aggregate_signatures(
            message, target_downtime, results
        )
    except ValueError:
        raise HTTPException(
            status_code=424,  # 424 Failed Dependency
            detail="Not enough valid signatures to aggregate downtime proof.",
        )
    return {
        "address": address,
        "from_timestamp": from_timestamp,
        "to_timestamp": to_timestamp,
        "total_downtime_seconds": target_downtime,
        "aggregated_signature": str(aggregated_signature),
        "non_signing_nodes": non_signers,
    }
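To close the loop on verifiability, here is a sketch of how an external client could check the aggregated proof, assuming it knows the monitoring nodes' public keys (e.g., from the same MONITORING_NODES configuration), uses blspy for the BLS operations, and that the hex encoding matches the str(signature) convention used elsewhere in this tutorial.

from typing import Any

from blspy import G1Element, G2Element, PopSchemeMPL


def verify_downtime_proof(
    response: dict[str, Any], monitoring_nodes: dict[str, dict[str, str]]
) -> bool:
    # Rebuild the exact message the monitoring nodes signed.
    message = f"Address: {response['address']}, Downtime: {response['total_downtime_seconds']}, From: {response['from_timestamp']}, To: {response['to_timestamp']}".encode(
        "utf-8"
    )
    # Public keys of every node that contributed to the aggregate signature.
    signer_pubkeys = [
        G1Element.from_bytes(bytes.fromhex(info["pubkey"]))
        for name, info in monitoring_nodes.items()
        if name not in response["non_signing_nodes"]
    ]
    # Require the same 2/3 participation threshold the service enforces.
    if len(signer_pubkeys) < 2 * len(monitoring_nodes) / 3:
        return False
    signature = G2Element.from_bytes(bytes.fromhex(response["aggregated_signature"]))
    return PopSchemeMPL.fast_aggregate_verify(signer_pubkeys, message, signature)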