diff --git a/src/service/igmp_server.rs b/src/service/igmp_server.rs index 3a4b6e2..c4e54f5 100644 --- a/src/service/igmp_server.rs +++ b/src/service/igmp_server.rs @@ -10,12 +10,12 @@ use std::time::Duration; use crate::error::*; lazy_static::lazy_static! { - //组播缓存 10分钟 (token,group_address) -> members + //组播缓存 30分钟 (token,group_address) -> members static ref MULTICAST:Cache<(String,Ipv4Addr), Arc>> = Cache::builder() - .time_to_idle(Duration::from_secs(10*60)).build(); + .time_to_idle(Duration::from_secs(30*60)).build(); // (token,group_address,member_ip) static ref MULTICAST_MEMBER:Cache<(String,Ipv4Addr,Ipv4Addr), ()> = Cache::builder() - .time_to_idle(Duration::from_secs(10*60)).eviction_listener(|k:Arc<(String,Ipv4Addr,Ipv4Addr)>,_,cause|{ + .time_to_idle(Duration::from_secs(20*60)).eviction_listener(|k:Arc<(String,Ipv4Addr,Ipv4Addr)>,_,cause|{ if cause==moka::notification::RemovalCause::Replaced{ return; } diff --git a/src/service/udp_service.rs b/src/service/udp_service.rs index e430d77..8528536 100644 --- a/src/service/udp_service.rs +++ b/src/service/udp_service.rs @@ -284,7 +284,7 @@ fn handle(udp: &UdpSocket, buf: &mut [u8], addr: SocketAddr, config: &ConfigInfo Ok(()) } -fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr]) -> Result<()> { +fn broadcast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr]) -> Result<()> { if let Some(v) = VIRTUAL_NETWORK.get(&context.token) { let lock = v.read(); let ips: Vec = lock @@ -297,7 +297,9 @@ fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr for ip in ips { if !exclude.contains(&Ipv4Addr::from(ip)) { if let Some(peer) = DEVICE_ADDRESS.get(&(context.token.clone(), ip)) { - let _ = udp.send_to(buf, peer); + if peer != source_addr { + let _ = udp.send_to(buf, peer); + } } } } @@ -306,7 +308,7 @@ fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr Ok(()) } -fn multicast(udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &RwLock, exclude: &[Ipv4Addr]) -> Result<()> { +fn multicast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &RwLock, exclude: &[Ipv4Addr]) -> Result<()> { if let Some(v) = VIRTUAL_NETWORK.get(&context.token) { let lock = v.read(); let ips: Vec = lock @@ -321,7 +323,9 @@ fn multicast(udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &Rw let ipv4 = Ipv4Addr::from(ip); if !exclude.contains(&ipv4) && info.is_send(&ipv4) { if let Some(peer) = DEVICE_ADDRESS.get(&(context.token.clone(), ip)) { - let _ = udp.send_to(buf, peer); + if peer != source_addr { + let _ = udp.send_to(buf, peer); + } } } } @@ -347,7 +351,7 @@ fn handle_( match ip_turn_packet::Protocol::from(net_packet.transport_protocol()) { ip_turn_packet::Protocol::Ipv4Broadcast => { net_packet.set_transport_protocol(ip_turn_packet::Protocol::Ipv4.into()); - return change_broadcast(udp, &context, config.broadcast, destination, net_packet.buffer()); + return change_broadcast(addr, udp, &context, config.broadcast, destination, net_packet.buffer()); } ip_turn_packet::Protocol::Icmp => {} ip_turn_packet::Protocol::Igmp => { @@ -355,19 +359,26 @@ fn handle_( if ipv4.protocol() == ipv4::protocol::Protocol::Igmp { crate::service::igmp_server::handle(ipv4.payload(), &context.token, source)?; //Igmp数据也会广播出去,让大家都知道谁加入什么组播 - broadcast(udp, &context, net_packet.buffer(), &[])?; + broadcast(addr, udp, &context, net_packet.buffer(), &[])?; } return Ok(()); } ip_turn_packet::Protocol::Ipv4 => { //处理广播 if destination.is_broadcast() || config.broadcast == destination { - broadcast(udp, &context, net_packet.buffer(), &[])?; + broadcast(addr, udp, &context, net_packet.buffer(), &[])?; return Ok(()); } else if destination.is_multicast() { + let ipv4 = IpV4Packet::new(net_packet.payload())?; + if ipv4.protocol() == ipv4::protocol::Protocol::Igmp { + crate::service::igmp_server::handle(ipv4.payload(), &context.token, source)?; + //Igmp数据也会广播出去,让大家都知道谁加入什么组播 + broadcast(addr, udp, &context, net_packet.buffer(), &[])?; + return Ok(()); + } if let Some(multicast_info) = crate::service::igmp_server ::load(&context.token, destination) { - multicast(udp, &context, net_packet.buffer(), &multicast_info, &[])?; + multicast(addr, udp, &context, net_packet.buffer(), &multicast_info, &[])?; } return Ok(()); } @@ -489,17 +500,17 @@ fn handle_( } /// 选择性转发广播/组播,并且去除尾部 -fn change_broadcast(udp: &UdpSocket, context: &Context, broadcast_addr: Ipv4Addr, destination: Ipv4Addr, buf: &[u8]) -> Result<()> { +fn change_broadcast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, broadcast_addr: Ipv4Addr, destination: Ipv4Addr, buf: &[u8]) -> Result<()> { let packet_end = BroadcastPacketEnd::new(buf)?; let end_len = packet_end.len(); let exclude = packet_end.addresses(); let buf = &buf[..buf.len() - end_len]; if destination.is_broadcast() || broadcast_addr == destination { - broadcast(udp, context, buf, &exclude)?; + broadcast(source_addr, udp, context, buf, &exclude)?; } else if destination.is_multicast() { if let Some(multicast_info) = crate::service::igmp_server ::load(&context.token, destination) { - multicast(udp, context, buf, &multicast_info, &exclude)?; + multicast(source_addr, udp, context, buf, &multicast_info, &exclude)?; } } Ok(())