Skip to content

Commit

Permalink
update: fix scanning multi threading codes
Browse files Browse the repository at this point in the history
  • Loading branch information
Esonhugh committed Jan 16, 2025
1 parent fdec346 commit 903a9c2
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 89 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ test: build
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) dnssd --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) dnssd ptr --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) dnssd srv --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) metric --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) metrics --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) neighbor --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) neighbor pod --help
$(BUILD_DIR)/$(MAIN_PROGRAM_NAME) neighbor svc --help
Expand Down
19 changes: 1 addition & 18 deletions cmd/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,13 @@ var AllCmd = &cobra.Command{
}

var finalRecord define.Records
if command.Opts.MultiThreadingMode {
finalRecord = RunMultiThread(ipNets, podNets, command.Opts.ThreadingNum)
} else {
finalRecord = Run(ipNets, podNets)
}
finalRecord = RunMultiThread(ipNets, podNets, command.Opts.ThreadingNum)
printer.PrintResult(finalRecord, command.Opts.OutputFile)

PostRun(finalRecord)
},
}

func Run(net, pod *net.IPNet) (finalRecord define.Records) {
var records define.Records = scanner.ScanSubnet(net)
if records == nil || len(records) == 0 {
log.Warnf("ScanSubnet Found Nothing")
return
}
records = scanner.ScanSvcForPorts(records)
for r := range mutli.ScanNeighborSvc(pod, 1) {
finalRecord = append(finalRecord, r...)
}
return records
}

func RunMultiThread(net, pod *net.IPNet, count int) (finalRecord define.Records) {
scan := mutli.ScanAll(net, count)
scan2 := mutli.ScanNeighborSvc(pod, count)
Expand Down
15 changes: 1 addition & 14 deletions cmd/service/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,11 @@ var SubNetCmd = &cobra.Command{
return
}
var finalRecord define.Records
if command.Opts.MultiThreadingMode {
finalRecord = RunMultiThread(ipNets, command.Opts.ThreadingNum)
} else {
finalRecord = Run(ipNets)
}
finalRecord = RunMultiThread(ipNets, command.Opts.ThreadingNum)
printer.PrintResult(finalRecord, command.Opts.OutputFile)
},
}

func Run(net *net.IPNet) (records define.Records) {
records = scanner.ScanSubnet(net)
if records == nil || len(records) == 0 {
log.Warnf("ScanSubnet Found Nothing")
return
}
return
}

func RunMultiThread(net *net.IPNet, num int) (finalRecord define.Records) {
scan := mutli.NewSubnetScanner(num)
for r := range scan.ScanSubnet(net) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mutli/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ func ScanAll(subnet *net.IPNet, num int) (result <-chan []define.Record) {
}

func ScanNeighbor(namespace []string, subnet *net.IPNet, num int) <-chan []define.Record {
subs := NewNeighborScanner(ScanPods, num)
subs := NewNeighborScanner(num)
if len(namespace) == 1 {
return subs.ScanSingleNeighbor(namespace[0], subnet)
}
return subs.ScanMultiNeighbor(namespace, subnet)
}

func ScanNeighborSvc(subnet *net.IPNet, num int) <-chan []define.Record {
subs := NewNeighborScanner(ScanSvc, num)
subs := NewNeighborScanner(num)
return ScanServiceWithChan(subs.ScanSvcNeighbor(subnet))
}
67 changes: 32 additions & 35 deletions pkg/mutli/neigbhor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,25 @@ package mutli

import (
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/esonhugh/k8spider/define"
"github.com/esonhugh/k8spider/pkg"
"github.com/esonhugh/k8spider/pkg/post"
"github.com/esonhugh/k8spider/pkg/scanner"
log "github.com/sirupsen/logrus"
"net"
"strings"
"sync"
)

type NeighborScanner struct {
wg *sync.WaitGroup
count int
m ScanMode
}

type ScanMode string

const (
ScanPods ScanMode = "pods"
ScanSvc ScanMode = "svc"
)

func NewNeighborScanner(mode ScanMode, threading ...int) *NeighborScanner {
if len(threading) == 0 {
return &NeighborScanner{
wg: new(sync.WaitGroup),
}
} else {
return &NeighborScanner{
wg: new(sync.WaitGroup),
count: threading[0],
}
func NewNeighborScanner(threading int) *NeighborScanner {
return &NeighborScanner{
wg: new(sync.WaitGroup),
count: threading,
}
}

Expand All @@ -50,14 +34,21 @@ func (s *NeighborScanner) ScanSingleNeighbor(ns string, subnet *net.IPNet) <-cha
// if subnets, err := pkg.SubnetShift(subnet, 4); err != nil {
if subnets, err := pkg.SubnetInto(subnet, s.count); err != nil {
log.Errorf("Subnet split into %v failed, fallback to single mode, reason: %v", s.count, err)
go s.scan(ns, subnet, out)
s.wg.Add(1)
go func() {
s.scan(ns, subnet, out)
defer s.wg.Done()
}()
} else {
log.Debugf("Subnet split into %v success", len(subnets))
s.wg.Add(len(subnets))
for _, sn := range subnets {
go s.scan(ns, sn, out)
go func(sn *net.IPNet) {
s.scan(ns, sn, out)
defer s.wg.Done()
}(sn)
}
}
time.Sleep(10 * time.Millisecond) // wait for all goroutines to start
s.wg.Wait()
close(out)
}()
Expand All @@ -67,18 +58,20 @@ func (s *NeighborScanner) ScanSingleNeighbor(ns string, subnet *net.IPNet) <-cha
func (s *NeighborScanner) ScanMultiNeighbor(nss []string, subnet *net.IPNet) <-chan []define.Record {
out := make(chan []define.Record, 100)
go func() {
s.wg.Add(len(nss))
for _, ns := range nss {
go s.scan(ns, subnet, out)
go func(ns string) {
defer s.wg.Done()
s.scan(ns, subnet, out)
}(ns)
}
time.Sleep(10 * time.Millisecond) // wait for all goroutines to start
s.wg.Wait()
close(out)
}()
return out
}

func (s *NeighborScanner) scan(ns string, subnet *net.IPNet, to chan []define.Record) {
s.wg.Add(1)
// to <- scanner.ScanSubnet(subnet)
for _, ip := range pkg.ParseIPNetToIPs(subnet) {
if scanner.ScanPodExist(ip, ns) {
Expand All @@ -91,7 +84,6 @@ func (s *NeighborScanner) scan(ns string, subnet *net.IPNet, to chan []define.Re
continue
}
}
s.wg.Done()
}

func (s *NeighborScanner) ScanSvcNeighbor(subnet *net.IPNet) <-chan []define.Record {
Expand All @@ -104,22 +96,28 @@ func (s *NeighborScanner) ScanSvcNeighbor(subnet *net.IPNet) <-chan []define.Rec
// if subnets, err := pkg.SubnetShift(subnet, 4); err != nil {
if subnets, err := pkg.SubnetInto(subnet, s.count); err != nil {
log.Errorf("Subnet split into %v failed, fallback to single mode, reason: %v", s.count, err)
go s.scanSvc(subnet, out)
s.wg.Add(1)
go func() {
s.scanSvc(subnet, out)
defer s.wg.Done()
}()
} else {
log.Debugf("Subnet split into %v success", len(subnets))
s.wg.Add(len(subnets))
for _, sn := range subnets {
go s.scanSvc(sn, out)
go func(sn *net.IPNet) {
defer s.wg.Done()
s.scanSvc(sn, out)
}(sn)
}
}
time.Sleep(10 * time.Millisecond) // wait for all goroutines to start
s.wg.Wait()
close(out)
}()
return out
}

func (s *NeighborScanner) scanSvc(subnet *net.IPNet, to chan []define.Record) {
s.wg.Add(1)
for _, ip := range pkg.ParseIPNetToIPs(subnet) {
hostList := pkg.PTRRecord(ip)
for _, host := range hostList {
Expand All @@ -134,5 +132,4 @@ func (s *NeighborScanner) scanSvc(subnet *net.IPNet, to chan []define.Record) {
}
}
}
s.wg.Done()
}
35 changes: 16 additions & 19 deletions pkg/mutli/subnet.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,23 @@
package mutli

import (
"net"
"sync"
"time"

"github.com/esonhugh/k8spider/define"
"github.com/esonhugh/k8spider/pkg"
"github.com/esonhugh/k8spider/pkg/scanner"
log "github.com/sirupsen/logrus"
"net"
"sync"
)

type SubnetScanner struct {
wg *sync.WaitGroup
count int
}

func NewSubnetScanner(threading ...int) *SubnetScanner {
if len(threading) == 0 {
return &SubnetScanner{
wg: new(sync.WaitGroup),
}
} else {
return &SubnetScanner{
wg: new(sync.WaitGroup),
count: threading[0],
}
func NewSubnetScanner(threading int) *SubnetScanner {
return &SubnetScanner{
wg: new(sync.WaitGroup),
count: threading,
}
}

Expand All @@ -39,25 +31,30 @@ func (s *SubnetScanner) ScanSubnet(subnet *net.IPNet) <-chan []define.Record {
// if subnets, err := pkg.SubnetShift(subnet, 4); err != nil {
if subnets, err := pkg.SubnetInto(subnet, s.count); err != nil {
log.Errorf("Subnet split into %v failed, fallback to single mode, reason: %v", s.count, err)
go s.scan(subnet, out)
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.scan(subnet, out)
}()
} else {
log.Debugf("Subnet split into %v success", len(subnets))
s.wg.Add(len(subnets))
for _, sn := range subnets {
go s.scan(sn, out)
go func(sn *net.IPNet) {
defer s.wg.Done()
s.scan(sn, out)
}(sn)
}
}
time.Sleep(10 * time.Millisecond) // wait for all goroutines to start
s.wg.Wait()
close(out)
}()
return out
}

func (s *SubnetScanner) scan(subnet *net.IPNet, to chan []define.Record) {
s.wg.Add(1)
// to <- scanner.ScanSubnet(subnet)
for _, ip := range pkg.ParseIPNetToIPs(subnet) {
to <- scanner.ScanSingleIP(ip)
}
s.wg.Done()
}

0 comments on commit 903a9c2

Please sign in to comment.