Skip to content

Commit

Permalink
修复广播问题
Browse files Browse the repository at this point in the history
  • Loading branch information
vnt-dev committed Jun 26, 2023
1 parent bc9d831 commit 69238df
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/service/igmp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use std::time::Duration;
use crate::error::*;

lazy_static::lazy_static! {
//组播缓存 10分钟 (token,group_address) -> members
//组播缓存 30分钟 (token,group_address) -> members
static ref MULTICAST:Cache<(String,Ipv4Addr), Arc<RwLock<Multicast>>> = Cache::builder()
.time_to_idle(Duration::from_secs(10*60)).build();
.time_to_idle(Duration::from_secs(30*60)).build();
// (token,group_address,member_ip)
static ref MULTICAST_MEMBER:Cache<(String,Ipv4Addr,Ipv4Addr), ()> = Cache::builder()
.time_to_idle(Duration::from_secs(10*60)).eviction_listener(|k:Arc<(String,Ipv4Addr,Ipv4Addr)>,_,cause|{
.time_to_idle(Duration::from_secs(20*60)).eviction_listener(|k:Arc<(String,Ipv4Addr,Ipv4Addr)>,_,cause|{
if cause==moka::notification::RemovalCause::Replaced{
return;
}
Expand Down
33 changes: 22 additions & 11 deletions src/service/udp_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn handle(udp: &UdpSocket, buf: &mut [u8], addr: SocketAddr, config: &ConfigInfo
Ok(())
}

fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr]) -> Result<()> {
fn broadcast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr]) -> Result<()> {
if let Some(v) = VIRTUAL_NETWORK.get(&context.token) {
let lock = v.read();
let ips: Vec<u32> = lock
Expand All @@ -297,7 +297,9 @@ fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr
for ip in ips {
if !exclude.contains(&Ipv4Addr::from(ip)) {
if let Some(peer) = DEVICE_ADDRESS.get(&(context.token.clone(), ip)) {
let _ = udp.send_to(buf, peer);
if peer != source_addr {
let _ = udp.send_to(buf, peer);
}
}
}
}
Expand All @@ -306,7 +308,7 @@ fn broadcast(udp: &UdpSocket, context: &Context, buf: &[u8], exclude: &[Ipv4Addr
Ok(())
}

fn multicast(udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &RwLock<Multicast>, exclude: &[Ipv4Addr]) -> Result<()> {
fn multicast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &RwLock<Multicast>, exclude: &[Ipv4Addr]) -> Result<()> {
if let Some(v) = VIRTUAL_NETWORK.get(&context.token) {
let lock = v.read();
let ips: Vec<u32> = lock
Expand All @@ -321,7 +323,9 @@ fn multicast(udp: &UdpSocket, context: &Context, buf: &[u8], multicast_info: &Rw
let ipv4 = Ipv4Addr::from(ip);
if !exclude.contains(&ipv4) && info.is_send(&ipv4) {
if let Some(peer) = DEVICE_ADDRESS.get(&(context.token.clone(), ip)) {
let _ = udp.send_to(buf, peer);
if peer != source_addr {
let _ = udp.send_to(buf, peer);
}
}
}
}
Expand All @@ -347,27 +351,34 @@ fn handle_(
match ip_turn_packet::Protocol::from(net_packet.transport_protocol()) {
ip_turn_packet::Protocol::Ipv4Broadcast => {
net_packet.set_transport_protocol(ip_turn_packet::Protocol::Ipv4.into());
return change_broadcast(udp, &context, config.broadcast, destination, net_packet.buffer());
return change_broadcast(addr, udp, &context, config.broadcast, destination, net_packet.buffer());
}
ip_turn_packet::Protocol::Icmp => {}
ip_turn_packet::Protocol::Igmp => {
let ipv4 = IpV4Packet::new(net_packet.payload())?;
if ipv4.protocol() == ipv4::protocol::Protocol::Igmp {
crate::service::igmp_server::handle(ipv4.payload(), &context.token, source)?;
//Igmp数据也会广播出去,让大家都知道谁加入什么组播
broadcast(udp, &context, net_packet.buffer(), &[])?;
broadcast(addr, udp, &context, net_packet.buffer(), &[])?;
}
return Ok(());
}
ip_turn_packet::Protocol::Ipv4 => {
//处理广播
if destination.is_broadcast() || config.broadcast == destination {
broadcast(udp, &context, net_packet.buffer(), &[])?;
broadcast(addr, udp, &context, net_packet.buffer(), &[])?;
return Ok(());
} else if destination.is_multicast() {
let ipv4 = IpV4Packet::new(net_packet.payload())?;
if ipv4.protocol() == ipv4::protocol::Protocol::Igmp {
crate::service::igmp_server::handle(ipv4.payload(), &context.token, source)?;
//Igmp数据也会广播出去,让大家都知道谁加入什么组播
broadcast(addr, udp, &context, net_packet.buffer(), &[])?;
return Ok(());
}
if let Some(multicast_info) = crate::service::igmp_server
::load(&context.token, destination) {
multicast(udp, &context, net_packet.buffer(), &multicast_info, &[])?;
multicast(addr, udp, &context, net_packet.buffer(), &multicast_info, &[])?;
}
return Ok(());
}
Expand Down Expand Up @@ -489,17 +500,17 @@ fn handle_(
}

/// 选择性转发广播/组播,并且去除尾部
fn change_broadcast(udp: &UdpSocket, context: &Context, broadcast_addr: Ipv4Addr, destination: Ipv4Addr, buf: &[u8]) -> Result<()> {
fn change_broadcast(source_addr: SocketAddr, udp: &UdpSocket, context: &Context, broadcast_addr: Ipv4Addr, destination: Ipv4Addr, buf: &[u8]) -> Result<()> {
let packet_end = BroadcastPacketEnd::new(buf)?;
let end_len = packet_end.len();
let exclude = packet_end.addresses();
let buf = &buf[..buf.len() - end_len];
if destination.is_broadcast() || broadcast_addr == destination {
broadcast(udp, context, buf, &exclude)?;
broadcast(source_addr, udp, context, buf, &exclude)?;
} else if destination.is_multicast() {
if let Some(multicast_info) = crate::service::igmp_server
::load(&context.token, destination) {
multicast(udp, context, buf, &multicast_info, &exclude)?;
multicast(source_addr, udp, context, buf, &multicast_info, &exclude)?;
}
}
Ok(())
Expand Down

0 comments on commit 69238df

Please sign in to comment.