Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ lazy_static = "1.5"
log = "0.4"

tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["rt"] }

# Health check dependencies
axum = "0.8"
Expand Down
23 changes: 12 additions & 11 deletions src/dht/core_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use tokio::sync::{RwLock, oneshot};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

/// DHT key type (256-bit)
Expand Down Expand Up @@ -522,8 +522,8 @@ pub struct DhtCoreEngine {
/// Optional trust-aware peer selector for combining distance with trust scores
trust_peer_selector: Option<TrustAwarePeerSelector<EigenTrustEngine>>,

/// Shutdown flag for background maintenance tasks
shutdown: Arc<AtomicBool>,
/// Shutdown token for background maintenance tasks
shutdown: CancellationToken,
}

impl DhtCoreEngine {
Expand Down Expand Up @@ -586,7 +586,7 @@ impl DhtCoreEngine {
transport: None,
pending_requests: Arc::new(RwLock::new(HashMap::new())),
trust_peer_selector: None,
shutdown: Arc::new(AtomicBool::new(false)),
shutdown: CancellationToken::new(),
})
}

Expand Down Expand Up @@ -698,7 +698,7 @@ impl DhtCoreEngine {

/// Signal background maintenance tasks to stop
pub fn signal_shutdown(&self) {
self.shutdown.store(true, Ordering::Relaxed);
self.shutdown.cancel();
}

/// Start background maintenance tasks for security and health
Expand All @@ -707,16 +707,17 @@ impl DhtCoreEngine {
let eviction_manager = self.eviction_manager.clone();
let close_group_validator = self.close_group_validator.clone();
let security_metrics = self.security_metrics.clone();
let shutdown = Arc::clone(&self.shutdown);
let shutdown = self.shutdown.clone();

tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;

if shutdown.load(Ordering::Relaxed) {
tracing::info!("DHT core maintenance task shutting down");
break;
tokio::select! {
_ = interval.tick() => {}
() = shutdown.cancelled() => {
tracing::info!("DHT core maintenance task shutting down");
break;
}
}

// 1. Run Bucket Refresh Logic with Validation Integration
Expand Down
Loading