
Commit

Merge pull request #1 from jbenet/master
merging
llSourcell committed Sep 15, 2014
2 parents 0b5447a + ab9baf9 commit 0276c9b
Showing 10 changed files with 254 additions and 145 deletions.
3 changes: 1 addition & 2 deletions blocks/blocks.go
@@ -5,8 +5,7 @@ import (
 	u "github.com/jbenet/go-ipfs/util"
 )

-// Block is the ipfs blocks service. It is the way
-// to retrieve blocks by the higher level ipfs modules
+// Block is a singular block of data in ipfs
 type Block struct {
 	Multihash mh.Multihash
 	Data      []byte
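
Note: the test file added below calls NewBlock, which this commit leaves untouched. For orientation, here is a minimal sketch of such a constructor, assuming it derives the Multihash from Data via the u (util) hash helper already imported in blocks.go; the actual definition is not part of this diff:

// Hypothetical sketch of the constructor exercised by blocks_test.go;
// it would sit in blocks/blocks.go alongside the struct above.
func NewBlock(data []byte) (*Block, error) {
	h, err := u.Hash(data) // assumed util helper returning a mh.Multihash
	if err != nil {
		return nil, err
	}
	return &Block{Multihash: h, Data: data}, nil
}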
25 changes: 25 additions & 0 deletions blocks/blocks_test.go
@@ -0,0 +1,25 @@
+package blocks
+
+import "testing"
+
+func TestBlocksBasic(t *testing.T) {
+
+	// Test empty data
+	empty := []byte{}
+	_, err := NewBlock(empty)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Test nil case
+	_, err = NewBlock(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Test some data
+	_, err = NewBlock([]byte("Hello world!"))
+	if err != nil {
+		t.Fatal(err)
+	}
+}
2 changes: 2 additions & 0 deletions doc.go
@@ -0,0 +1,2 @@
+// IPFS is a global, versioned, peer-to-peer filesystem
+package ipfs
7 changes: 4 additions & 3 deletions importer/importer.go
@@ -20,12 +20,13 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
 // NewDagFromReader constructs a Merkle DAG from the given io.Reader.
 // size required for block construction.
 func NewDagFromReader(r io.Reader) (*dag.Node, error) {
-	return NewDagFromReaderWithSplitter(r, SplitterBySize(1024*512))
+	return NewDagFromReaderWithSplitter(r, &SizeSplitter{1024 * 512})
 }

 func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, error) {
-	blkChan := spl(r)
-	root := &dag.Node{Data: dag.FilePBData()}
+	blkChan := spl.Split(r)
+	first := <-blkChan
+	root := &dag.Node{Data: dag.FilePBData(first)}

 	for blk := range blkChan {
 		child := &dag.Node{Data: dag.WrapData(blk)}
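
This hunk replaces a function-typed splitter (SplitterBySize) with a BlockSplitter interface whose Split method returns a channel of chunks. Neither definition appears in this commit view; a minimal sketch consistent with the usage above (&SizeSplitter{1024 * 512}, spl.Split(r)) might look like this, with names and behavior inferred rather than taken from the commit:

package importer

import "io"

// Hypothetical reconstruction of the splitter types used above; the
// committed definitions live elsewhere in the importer package.
type BlockSplitter interface {
	Split(r io.Reader) chan []byte
}

// SizeSplitter cuts the input into fixed-size chunks; the final chunk
// may be shorter.
type SizeSplitter struct {
	Size int
}

func (ss *SizeSplitter) Split(r io.Reader) chan []byte {
	out := make(chan []byte)
	go func() {
		defer close(out)
		for {
			chunk := make([]byte, ss.Size)
			n, err := io.ReadFull(r, chunk)
			if n > 0 {
				out <- chunk[:n]
			}
			if err != nil {
				return // io.EOF or io.ErrUnexpectedEOF: input exhausted
			}
		}
	}()
	return out
}

Pulling first := <-blkChan before constructing the root lets dag.FilePBData embed the first chunk directly in the root node rather than leaving it empty.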
74 changes: 58 additions & 16 deletions importer/importer_test.go
@@ -3,42 +3,51 @@ package importer
 import (
 	"bytes"
 	"crypto/rand"
+	"fmt"
 	"io"
 	"io/ioutil"
+	"os"
 	"testing"

 	dag "github.com/jbenet/go-ipfs/merkledag"
 )

-func TestFileConsistency(t *testing.T) {
-	buf := new(bytes.Buffer)
-	io.CopyN(buf, rand.Reader, 512*32)
-	should := buf.Bytes()
-	nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(512))
+func TestBuildDag(t *testing.T) {
+	td := os.TempDir()
+	fi, err := os.Create(td + "/tmpfi")
 	if err != nil {
 		t.Fatal(err)
 	}
-	r, err := dag.NewDagReader(nd, nil)
+
+	_, err = io.CopyN(fi, rand.Reader, 1024*1024)
 	if err != nil {
 		t.Fatal(err)
 	}

-	out, err := ioutil.ReadAll(r)
+	fi.Close()
+
+	_, err = NewDagFromFile(td + "/tmpfi")
 	if err != nil {
 		t.Fatal(err)
 	}
-
-	if !bytes.Equal(out, should) {
-		t.Fatal("Output not the same as input.")
-	}
 }

 //Test where calls to read are smaller than the chunk size
-func TestFileConsistencyLargeBlocks(t *testing.T) {
+func TestSizeBasedSplit(t *testing.T) {
+	bs := &SizeSplitter{512}
+	testFileConsistency(t, bs, 32*512)
+	bs = &SizeSplitter{4096}
+	testFileConsistency(t, bs, 32*4096)
+
+	// Uneven offset
+	testFileConsistency(t, bs, 31*4095)
+}
+
+func testFileConsistency(t *testing.T, bs BlockSplitter, nbytes int) {
 	buf := new(bytes.Buffer)
-	io.CopyN(buf, rand.Reader, 4096*32)
+	io.CopyN(buf, rand.Reader, int64(nbytes))
 	should := buf.Bytes()
-	nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(4096))
+	nd, err := NewDagFromReaderWithSplitter(buf, bs)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -52,7 +61,40 @@ func TestFileConsistencyLargeBlocks(t *testing.T) {
 		t.Fatal(err)
 	}

-	if !bytes.Equal(out, should) {
-		t.Fatal("Output not the same as input.")
+	err = arrComp(out, should)
+	if err != nil {
+		t.Fatal(err)
 	}
 }
+
+func arrComp(a, b []byte) error {
+	if len(a) != len(b) {
+		return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
+	}
+	for i, v := range a {
+		if v != b[i] {
+			return fmt.Errorf("Arrays differ at index: %d", i)
+		}
+	}
+	return nil
+}
+
+func TestMaybeRabinConsistency(t *testing.T) {
+	testFileConsistency(t, NewMaybeRabin(4096), 256*4096)
+}
+
+func TestRabinBlockSize(t *testing.T) {
+	buf := new(bytes.Buffer)
+	nbytes := 1024 * 1024
+	io.CopyN(buf, rand.Reader, int64(nbytes))
+	rab := NewMaybeRabin(4096)
+	blkch := rab.Split(buf)
+
+	var blocks [][]byte
+	for b := range blkch {
+		blocks = append(blocks, b)
+	}
+
+	fmt.Printf("Avg block size: %d\n", nbytes/len(blocks))
+
+}
134 changes: 134 additions & 0 deletions importer/rabin.go
@@ -0,0 +1,134 @@
+package importer
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"math"
+)
+
+type MaybeRabin struct {
+	mask         int
+	windowSize   int
+	MinBlockSize int
+	MaxBlockSize int
+}
+
+func NewMaybeRabin(avgBlkSize int) *MaybeRabin {
+	blkbits := uint(math.Log2(float64(avgBlkSize)))
+	rb := new(MaybeRabin)
+	rb.mask = (1 << blkbits) - 1
+	rb.windowSize = 16 // probably a good number...
+	rb.MinBlockSize = avgBlkSize / 2
+	rb.MaxBlockSize = (avgBlkSize / 2) * 3
+	return rb
+}
+
+func (mr *MaybeRabin) Split(r io.Reader) chan []byte {
+	out := make(chan []byte, 16)
+	go func() {
+		inbuf := bufio.NewReader(r)
+		blkbuf := new(bytes.Buffer)
+
+		// some bullshit numbers i made up
+		a := 10         // honestly, no idea what this is
+		MOD := 33554383 // randomly chosen (seriously)
+		an := 1
+		rollingHash := 0
+
+		// Window is a circular buffer
+		window := make([]byte, mr.windowSize)
+		push := func(i int, val byte) (outval int) {
+			outval = int(window[i%len(window)])
+			window[i%len(window)] = val
+			return
+		}
+
+		// Duplicate byte slice
+		dup := func(b []byte) []byte {
+			d := make([]byte, len(b))
+			copy(d, b)
+			return d
+		}
+
+		// Fill up the window
+		i := 0
+		for ; i < mr.windowSize; i++ {
+			b, err := inbuf.ReadByte()
+			if err != nil {
+				fmt.Println(err)
+				return
+			}
+			blkbuf.WriteByte(b)
+			push(i, b)
+			rollingHash = (rollingHash*a + int(b)) % MOD
+			an = (an * a) % MOD
+		}
+
+		for ; true; i++ {
+			b, err := inbuf.ReadByte()
+			if err != nil {
+				break
+			}
+			outval := push(i, b)
+			blkbuf.WriteByte(b)
+			rollingHash = (rollingHash*a + int(b) - an*outval) % MOD
+			if (rollingHash&mr.mask == mr.mask && blkbuf.Len() > mr.MinBlockSize) ||
+				blkbuf.Len() >= mr.MaxBlockSize {
+				out <- dup(blkbuf.Bytes())
+				blkbuf.Reset()
+			}
+
+			// Check if there are enough remaining
+			peek, err := inbuf.Peek(mr.windowSize)
+			if err != nil || len(peek) != mr.windowSize {
+				break
+			}
+		}
+		io.Copy(blkbuf, inbuf)
+		out <- blkbuf.Bytes()
+		close(out)
+	}()
+	return out
+}
+
+/*
+func WhyrusleepingCantImplementRabin(r io.Reader) chan []byte {
+	out := make(chan []byte, 4)
+	go func() {
+		buf := bufio.NewReader(r)
+		blkbuf := new(bytes.Buffer)
+		window := make([]byte, 16)
+		var val uint64
+		prime := uint64(61)
+		get := func(i int) uint64 {
+			return uint64(window[i%len(window)])
+		}
+		set := func(i int, val byte) {
+			window[i%len(window)] = val
+		}
+		for i := 0; ; i++ {
+			curb, err := buf.ReadByte()
+			if err != nil {
+				break
+			}
+			set(i, curb)
+			blkbuf.WriteByte(curb)
+			hash := md5.Sum(window)
+			if hash[0] == 0 && hash[1] == 0 {
+				out <- blkbuf.Bytes()
+				blkbuf.Reset()
+			}
+		}
+		out <- blkbuf.Bytes()
+		close(out)
+	}()
+	return out
+}
+*/
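
MaybeRabin is a content-defined chunker: it keeps a Rabin-Karp style rolling hash over a 16-byte window, updated per byte as hash = (hash*a + in - a^16*out) mod MOD, and cuts a block whenever the low bits of the hash all match the mask (subject to MinBlockSize/MaxBlockSize). Because boundaries follow content rather than byte offsets, an insertion near the start of a file only reshuffles nearby blocks instead of shifting every later one. A brief usage sketch against the API added above; the input data and sizes are illustrative only:

package main

import (
	"bytes"
	"fmt"

	importer "github.com/jbenet/go-ipfs/importer"
)

func main() {
	// Any io.Reader works; repeated data keeps the example self-contained.
	data := bytes.NewReader(bytes.Repeat([]byte("hello ipfs "), 100000))

	rab := importer.NewMaybeRabin(4096) // aim for ~4 KiB average blocks
	var n, total int
	for blk := range rab.Split(data) {
		n++
		total += len(blk)
	}
	fmt.Printf("%d blocks, avg %d bytes\n", n, total/n)
}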
48 changes: 0 additions & 48 deletions importer/split_test.go

This file was deleted.

