Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve hole punch code stability and code quality #542

Merged
merged 4 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions firedbg/target.json

This file was deleted.

1 change: 0 additions & 1 deletion firedbg/version.toml

This file was deleted.

11 changes: 3 additions & 8 deletions neptune/mega/tunnel_punch/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ export default function ({ app, mesh, punch }) {
if (ep === app.endpoint.id) {
return getLocalConfig().then(
config => {
var inbound = config.inbound.find(
config.inbound.find(
i => i.protocol === protocol && i.name === name
) || null
var hole = punch.findHole(ep)
}
)
} else {
Expand Down Expand Up @@ -227,8 +226,6 @@ export default function ({ app, mesh, punch }) {
currentListens = []
currentTargets = {}

console.info(`Applying config: ${JSON.encode(config)}`)

config.inbound.forEach(i => {
var protocol = i.protocol
var name = i.name
Expand All @@ -245,10 +242,10 @@ export default function ({ app, mesh, punch }) {
punch.createInboundHole($selectedEP)
var hole = punch.findHole($selectedEP)
if(hole && hole.ready()) {
console.info("Using direct session")
app.log("Using direct session")
return hole.directSession()
}
console.info("Using hub forwarded session")
app.log("Using hub forwarded session")
return pipeline($=>$
.muxHTTP().to($=>$
.pipe(mesh.connect($selectedEP))
Expand Down Expand Up @@ -341,7 +338,6 @@ export default function ({ app, mesh, punch }) {
return new StreamEnd
})
.onEnd(() => {
console.info('Answers in api: ', $response)
return $response
})
).spawn()
Expand Down Expand Up @@ -397,7 +393,6 @@ export default function ({ app, mesh, punch }) {
var tunnelHole = null
var makeRespTunnel = pipeline($=>$
.onStart(ctx => {
console.info("Making resp tunnel: ", ctx)
var ep = ctx.peer.id
tunnelHole = punch.findHole(ep)
if(!tunnelHole) throw `Invalid Hole State for ${ep}`
Expand Down
5 changes: 2 additions & 3 deletions neptune/mega/tunnel_punch/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ export default function ({ app, mesh, utils }) {
var ip = $ctx.peer.ip
var port = $ctx.peer.port

console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
app.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
switch(action) {
case 'leave':
api.deleteHole(ep, true)
Expand All @@ -178,8 +178,7 @@ export default function ({ app, mesh, utils }) {
var ip = $ctx.peer.ip
var port = $ctx.peer.port

console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
console.log("Punch req: ", obj)
app.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`)
switch(action) {
case 'request':
api.createHole(ep, 'server')
Expand Down
80 changes: 42 additions & 38 deletions neptune/mega/tunnel_punch/punch.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export default function ({ app, mesh }) {
// Only available for symmetric NAT
function Hole(ep) {
// (idle) (handshake) (punching connected closed) (left fail)
// (idle) (handshake) (punching connected) (left fail)
var state = 'idle'
var bound = '0.0.0.0:' + randomPort()
var destIP = null
Expand All @@ -21,7 +21,7 @@ export default function ({ app, mesh }) {


// Check if ep is self.
console.info(`Creating hole to peer ${ep}, bound ${bound}`)
app.log(`Creating hole to peer ${ep}, bound ${bound}`)
if (ep === app.endpoint.id) {
throw 'Must not create a hole to self'
}
Expand Down Expand Up @@ -56,7 +56,7 @@ export default function ({ app, mesh }) {
.connectTLS({
...tlsOptions,
onState: tls => {
console.info('TLS State: ', tls)
app.log(`TLS State: ${tls.state}`)
if($connection.state === 'connected' && tls.state === 'connected') {
app.log(`Connected TLS to peer ${destIP}:${destPort}`)
state = 'connected'
Expand All @@ -72,8 +72,7 @@ export default function ({ app, mesh }) {
.connect(() => `${destIP}:${destPort}`, {
bind: bound,
onState: function (conn) {
console.info("Conn Info: ", conn)

app.log(`Connection State: ${conn.state}`)
if (conn.state === 'open') {
conn.socket.setRawOption(1, 15, new Data([1, 0, 0, 0]))
} else if (conn.state === 'connected') {
Expand All @@ -82,19 +81,19 @@ export default function ({ app, mesh }) {
} else if (conn.state === 'closed') {
app.log(`Disconnected from peer ${destIP}:${destPort}`)
$connection = null
state = 'closed'
state = 'left'
retryTimes += 1
}

// Max Retry set to 10
if (retryTimes > 10 || state === 'fail') {
console.info(`Retry limit exceeded, punch failed.`)
app.log(`Retry limit exceeded, punch failed.`)
state = 'fail'
updateHoles()
}
},
})
.handleStreamEnd(evt => console.info('Hole connection end, retry: ', retryTimes + 1, ' reason: ', evt?.error))
.handleStreamEnd(evt => app.log(`Hole connection end, retry: ${retryTimes + 1}, reason: ${evt?.error}`))
)
)
)
Expand Down Expand Up @@ -127,17 +126,15 @@ export default function ({ app, mesh }) {
var listen = pipeline($ => $
.acceptTLS({
...tlsOptions,
onState: tls => console.info('TLS State: ', tls)
onState: tls => app.log(`TLS State: ${tls.state}`)
}).to($ => $
.handleMessage(msg => {
console.info('Server Received: ', msg)
$msg = msg
return new Data
}).pipe(() => svc(buildCtx())), () => $msg
)
)

console.info("Direct Server Listening...")
pipy.listen(bound, 'tcp', listen)

session = pipeline($ => $
Expand Down Expand Up @@ -168,7 +165,6 @@ export default function ({ app, mesh }) {
return new StreamEnd
})
.onEnd(() => {
console.info('Answers in hole: ', $response, store)
if (callback)
callback($response)
return $response
Expand All @@ -182,7 +178,7 @@ export default function ({ app, mesh }) {
state = 'handshake'
var start = Date.now()

console.info("Requesting punch")
app.log("Requesting punch")
request(new Message({
method: 'POST',
path: '/api/punch/request',
Expand All @@ -192,7 +188,7 @@ export default function ({ app, mesh }) {
})), (resp) => {
var end = Date.now()
rtt = (end - start) / 2000
console.info('Estimated RTT: ', rtt)
app.log(`Estimated RTT: ${2 * rtt}`)

if (resp.head.status != 200) {
app.log(`Failed on requesting`)
Expand All @@ -208,7 +204,7 @@ export default function ({ app, mesh }) {
state = 'handshake'
var start = Date.now()

console.info("Accepting punch")
app.log("Accepting punch")
request(new Message({
method: 'POST',
path: '/api/punch/accept',
Expand All @@ -218,7 +214,7 @@ export default function ({ app, mesh }) {
})), (resp) => {
var end = Date.now()
rtt = (end - start) / 2000
console.info('Estimated RTT: ', rtt)
app.log(`Estimated RTT: ${2 * rtt}`)

if (!resp || resp.head.status != 200) {
app.log(`Failed on accepting`)
Expand Down Expand Up @@ -255,12 +251,11 @@ export default function ({ app, mesh }) {

function addPeerCert(cert) {
var peerCert = new crypto.Certificate(cert)
console.info("TLS: ", tlsOptions)
tlsOptions['trusted'] = [peerCert]
}

function updateNatInfo(ip, port) {
console.info(`Peer NAT Info: ${ip}:${port}`)
app.log(`Peer NAT Info: ${ip}:${port}`)
destIP = ip
destPort = port
}
Expand All @@ -269,24 +264,29 @@ export default function ({ app, mesh }) {
// 1. Server accept message got 200 OK
// 2. Client receive accept
function punch() {
state = 'punching'

console.info(`Punching to ${destIP}:${destPort} (${ep})`)
app.log(`Punching to ${destIP}:${destPort} (${ep})`)
if (role === 'server') {
makeFakeCall(destIP, destPort)
}
directSession()

try {
directSession()
} catch (err) {
app.log("Punching failed, abandon this hole.")
state = 'fail'
updateHoles()
}
}

function makeRespTunnel() {
console.info("Created Resp Tunnel")
app.log("Created Resp Tunnel")
state = 'connected'

return pipeline($ => $
.acceptHTTPTunnel(() => new Message({ status: 200 })).to($ => $
.onStart(new Data)
.swap(() => pHub)
.onEnd(() => console.info(`Direct Connection from ${ep} lost`))
.onEnd(() => app.log(`Direct Connection from ${ep} lost`))
)
)
}
Expand All @@ -297,7 +297,7 @@ export default function ({ app, mesh }) {
// The hole has been released.
return
} else if (state != 'connected') {
console.info(`Current state ${state}, made the hole failed`)
app.log(`Current state ${state}, force the hole failed`)
state = 'fail'
updateHoles()
}
Expand All @@ -306,7 +306,7 @@ export default function ({ app, mesh }) {
// send a SYN to dest, expect no return.
// this will cheat the firewall to allow inbound connection from peer.
function makeFakeCall(destIP, destPort) {
console.info("Making fake call")
app.log("Making fake call")
pipeline($ => $
.onStart(new Data).connect(`${destIP}:${destPort}`, {
bind: bound,
Expand All @@ -316,7 +316,6 @@ export default function ({ app, mesh }) {

// abort this connection.
if (conn.state === 'connecting') {
console.info('Performing early close')
conn.close()
}
}
Expand All @@ -339,7 +338,10 @@ export default function ({ app, mesh }) {
method: 'GET',
path: '/api/ping'
}))
.pipe(session)
.pipe(() => {
if (session) return session
return pipeline($=>$.dummy())
})
.replaceMessage(res => {
if (res.head.status != 200 && !pacemaker)
app.log("Cardiac Arrest happens, hole: ", ep)
Expand All @@ -353,24 +355,27 @@ export default function ({ app, mesh }) {

// if not called from pacemaker
// the heart should beat automatically :)
heart.spawn()
new Timeout(10).wait().then(() => heartbeat(false))
try {
heart.spawn()
new Timeout(10).wait().then(() => heartbeat(false))
} catch (err) {
app.log("Heartbeat interrupted...")
}
}

// Used on direct connection setup.
// To urge the connect filter try to call the peer
function pacemaker() {
rtt ??= 0.02

var timeout = [rtt, rtt, rtt, rtt, rtt, 2 * rtt, 3 * rtt, 5 * rtt, 8 * rtt, 13 * rtt]
var timeout = [rtt, rtt, 2 * rtt, 3 * rtt, 5 * rtt]
var round = 0
var cont = true

console.info('Pacemaking......')
pipeline($ => $
.onStart(new Data)
.repeat(() => {
if(round < 10)
if(round < 5 && state === 'connecting')
return new Timeout(timeout[round]).wait().then(() => cont)
return false
})
Expand All @@ -382,7 +387,6 @@ export default function ({ app, mesh }) {
cont = false
heartbeat(false)
}
console.info('Pacemaker: ', resp)
return new StreamEnd
})
)
Expand Down Expand Up @@ -435,17 +439,17 @@ export default function ({ app, mesh }) {
fails[key] += 1
}
})
console.info(`Holes after updating: `, holes)
app.log(`Holes after updating: ${holes.size}`)
}

function createInboundHole(ep) {
updateHoles()
if (findHole(ep)) return
if (fails[ep] && fails[ep] >= 3) {
console.info(`Won't create hole to ${ep}, too many fails!`)
app.log(`Won't create hole to ${ep}, too many fails!`)
return
}
console.info(`Creating Inbound Hole to ${ep}`)
app.log(`Creating Inbound Hole to ${ep}`)
try {
var hole = Hole(ep)
hole.requestPunch()
Expand All @@ -461,7 +465,7 @@ export default function ({ app, mesh }) {
function createOutboundHole(ep, natIp, natPort) {
updateHoles()
if (findHole(ep)) return
console.info(`Creating Outbound Hole to ${ep}`)
app.log(`Creating Outbound Hole to ${ep}`)
try {
var hole = Hole(ep)
hole.acceptPunch()
Expand Down
Loading