@@ -26,7 +26,7 @@ var log = logrus.New()
26
26
27
27
var flag = true
28
28
29
- const subChunkSize = 512 * 1024 // 256 KB
29
+ const subChunkSize = 512 * 1024 // 512 KB
30
30
const maxRetries = 5
31
31
32
32
// Helper function to write sub-chunks to the stream
@@ -310,56 +310,59 @@ func StoreReceivedChunk(nodeID string, chunkIndex int, chunk []byte, h host.Host
310
310
nodeData .Received [chunkIndex ] = chunk
311
311
fmt .Printf ("Node %s received chunk %d\n " , nodeID , chunkIndex )
312
312
fmt .Println ("Length of received chunks:" , len (nodeData .Received ))
313
-
314
- if config .Counter == config .ExpectedChunks {
315
- log .WithFields (logrus.Fields {"nodeID" : nodeID }).Info ("Node complete received data" )
316
-
317
- var _ string
318
- var err error
319
- log .WithField ("codingMethod" , config .CodingMethod ).Info ("Node decoding data" )
320
- droplets := make ([][]byte , 0 , config .ExpectedChunks )
321
- if config .CodingMethod == "LT" {
322
- for _ , droplet := range config .ChunksRecByNode {
323
- if len (droplet ) > 0 {
324
- droplets = append (droplets , droplet )
313
+ if config .K != 1 {
314
+ if config .Counter == config .ExpectedChunks {
315
+ log .WithFields (logrus.Fields {"nodeID" : nodeID }).Info ("Node complete received data" )
316
+
317
+ var _ string
318
+ var err error
319
+ log .WithField ("codingMethod" , config .CodingMethod ).Info ("Node decoding data" )
320
+ droplets := make ([][]byte , 0 , config .ExpectedChunks )
321
+ if config .CodingMethod == "LT" {
322
+ for _ , droplet := range config .ChunksRecByNode {
323
+ if len (droplet ) > 0 {
324
+ droplets = append (droplets , droplet )
325
+ }
325
326
}
327
+ _ , err = lt .LTDecode (droplets )
328
+ } else if config .CodingMethod == "RS" {
329
+ _ , err = rs .RSDecode (config .ChunksRecByNode )
326
330
}
327
- _ , err = lt .LTDecode (droplets )
328
- } else if config .CodingMethod == "RS" {
329
- _ , err = rs .RSDecode (config .ChunksRecByNode )
330
- }
331
331
332
- if (err != nil ) && (config .CodingMethod == "LT" ) {
333
- log .WithFields (logrus.Fields {"nodeID" : nodeID , "Error" : err , "length of valid chunks:" : len (droplets )}).Error ("Node failed to decode data" )
334
- flag = false
335
- return
336
- } else if (err != nil ) && (config .CodingMethod == "RS" ) {
337
- log .WithFields (logrus.Fields {"nodeID" : nodeID , "Error" : err , "length of valid chunks:" : len (config .ChunksRecByNode )}).Error ("Node failed to decode data" )
338
- // flag = false
339
- return
340
- }
332
+ if (err != nil ) && (config .CodingMethod == "LT" ) {
333
+ log .WithFields (logrus.Fields {"nodeID" : nodeID , "Error" : err , "length of valid chunks:" : len (droplets )}).Error ("Node failed to decode data" )
334
+ flag = false
335
+ return
336
+ } else if (err != nil ) && (config .CodingMethod == "RS" ) {
337
+ log .WithFields (logrus.Fields {"nodeID" : nodeID , "Error" : err , "length of valid chunks:" : len (config .ChunksRecByNode )}).Error ("Node failed to decode data" )
338
+ // flag = false
339
+ return
340
+ }
341
341
342
- log .WithFields (logrus.Fields {"nodeID" : nodeID }).Info ("Node reconstructed data" )
343
-
344
- // outputFilePath := fmt.Sprintf("output/%s_out.txt", config.NodeID)
345
- // if err := os.WriteFile(outputFilePath, []byte(decodedData), 0644); err != nil {
346
- // log.WithFields(logrus.Fields{"nodeID": nodeID, "Error": err}).Error("Node failed to write reconstructed data to file")
347
- // return
348
- // }
349
- if config .Mode == "upload" {
350
- for _ , peerInfo := range config .ConnectedPeers {
351
- if peerInfo .ID .String () != nodeID {
352
- readyKey := fmt .Sprintf ("%s-ready" , peerInfo .ID .String ())
353
- if _ , ok := config .SentChunks .Load (readyKey ); ! ok {
354
- SendReady (context .Background (), h , peerInfo , nodeID )
355
- config .SentChunks .Store (readyKey , struct {}{})
342
+ log .WithFields (logrus.Fields {"nodeID" : nodeID }).Info ("Node reconstructed data" )
343
+
344
+ // outputFilePath := fmt.Sprintf("output/%s_out.txt", config.NodeID)
345
+ // if err := os.WriteFile(outputFilePath, []byte(decodedData), 0644); err != nil {
346
+ // log.WithFields(logrus.Fields{"nodeID": nodeID, "Error": err}).Error("Node failed to write reconstructed data to file")
347
+ // return
348
+ // }
349
+ if config .Mode == "upload" {
350
+ for _ , peerInfo := range config .ConnectedPeers {
351
+ if peerInfo .ID .String () != nodeID {
352
+ readyKey := fmt .Sprintf ("%s-ready" , peerInfo .ID .String ())
353
+ if _ , ok := config .SentChunks .Load (readyKey ); ! ok {
354
+ SendReady (context .Background (), h , peerInfo , nodeID )
355
+ config .SentChunks .Store (readyKey , struct {}{})
356
+ }
356
357
}
357
358
}
359
+ time .Sleep (10 * time .Second )
360
+ } else if config .Mode == "download" {
361
+ logrus .WithField ("Total time" , time .Since (config .StartTime )).Info ("Total time" )
358
362
}
359
- time .Sleep (10 * time .Second )
360
- } else if config .Mode == "download" {
361
- logrus .WithField ("Total time" , time .Since (config .StartTime )).Info ("Total time" )
362
363
}
364
+ } else {
365
+ logrus .WithField ("Total time" , time .Since (config .StartTime )).Info ("Total time" )
363
366
}
364
367
} else {
365
368
return
@@ -384,7 +387,11 @@ func HandleDownloadStream(s network.Stream, h host.Host, wg *sync.WaitGroup) {
384
387
385
388
originalFilePath := "eth_transactions.json"
386
389
originalData , _ := os .ReadFile (originalFilePath )
387
-
390
+ peerInfo := peer.AddrInfo {ID : s .Conn ().RemotePeer ()}
391
+ if config .K == 1 {
392
+ SendChunk (context .Background (), h , peerInfo , 0 , originalData )
393
+ return
394
+ }
388
395
var chunks [][]byte
389
396
390
397
if config .CodingMethod == "RS" {
@@ -397,8 +404,6 @@ func HandleDownloadStream(s network.Stream, h host.Host, wg *sync.WaitGroup) {
397
404
panic ("Invalid coding method" )
398
405
}
399
406
400
- peerInfo := peer.AddrInfo {ID : s .Conn ().RemotePeer ()}
401
-
402
407
SendChunk (context .Background (), h , peerInfo , int (chunkIndex ), chunks [chunkIndex ])
403
408
}
404
409
0 commit comments