forked from ipfs/go-ipfs-chunker
-
Notifications
You must be signed in to change notification settings - Fork 3
/
splitting.go
189 lines (160 loc) · 4.09 KB
/
splitting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Package chunk implements streaming block splitters.
// Splitters read data from a reader and provide byte slices (chunks)
// The size and contents of these slices depend on the splitting method
// used.
package chunk
import (
"io"
logging "github.com/ipfs/go-log"
pool "github.com/libp2p/go-buffer-pool"
)
var log = logging.Logger("chunk")
// A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
// that can be used to build DAG nodes.
type Splitter interface {
Reader() io.Reader
NextBytes() ([]byte, error)
ChunkSize() uint64
MetaData() interface{}
SetIsDir(bool)
}
// A MultiSplitter encapsulates multiple splitters useful for concurrent
// reading of chunks and also specialized dag building schemas.
// Each MultiSplitter also provides Splitter-compatible interface
// to read sequentially (the Splitter-default way).
type MultiSplitter interface {
Splitter
Splitters() []Splitter
}
type MetaSplitter struct {
r io.Reader
size uint64
err error
}
// SplitterGen is a splitter generator, given a reader.
type SplitterGen func(r io.Reader) Splitter
// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize.
func DefaultSplitter(r io.Reader) Splitter {
return NewSizeSplitter(r, DefaultBlockSize)
}
// SizeSplitterGen returns a SplitterGen function which will create
// a splitter with the given size when called.
func SizeSplitterGen(size int64) SplitterGen {
return func(r io.Reader) Splitter {
return NewSizeSplitter(r, size)
}
}
func MetaSplitterGen(size int64) SplitterGen {
return func(r io.Reader) Splitter {
return NewMetaSplitter(r, uint64(size))
}
}
// Chan returns a channel that receives each of the chunks produced
// by a splitter, along with another one for errors.
func Chan(s Splitter) (<-chan []byte, <-chan error) {
out := make(chan []byte)
errs := make(chan error, 1)
go func() {
defer close(out)
defer close(errs)
// all-chunks loop (keep creating chunks)
for {
b, err := s.NextBytes()
if err != nil {
errs <- err
return
}
out <- b
}
}()
return out, errs
}
type sizeSplitterv2 struct {
r io.Reader
size uint32
err error
}
// NewSizeSplitter returns a new size-based Splitter with the given block size.
func NewSizeSplitter(r io.Reader, size int64) Splitter {
return &sizeSplitterv2{
r: r,
size: uint32(size),
}
}
// NextBytes produces a new chunk.
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
if ss.err != nil {
return nil, ss.err
}
full := pool.Get(int(ss.size))
n, err := io.ReadFull(ss.r, full)
switch err {
case io.ErrUnexpectedEOF:
ss.err = io.EOF
small := make([]byte, n)
copy(small, full)
pool.Put(full)
return small, nil
case nil:
return full, nil
default:
pool.Put(full)
return nil, err
}
}
// Reader returns the io.Reader associated to this Splitter.
func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
// ChunkSize returns the chunk size of this Splitter.
func (ss *sizeSplitterv2) ChunkSize() uint64 {
return uint64(ss.size)
}
// MetaData returns metadata object from this chunker (none).
func (ss *sizeSplitterv2) MetaData() interface{} {
return nil
}
func (rss *sizeSplitterv2) SetIsDir(v bool) {
}
func NewMetaSplitter(r io.Reader, size uint64) Splitter {
return &MetaSplitter{
r: r,
size: size,
}
}
// NextBytes produces a new chunk.
func (ms *MetaSplitter) NextBytes() ([]byte, error) {
if ms.err != nil {
return nil, ms.err
}
// Return a new metadata chunk
buf := make([]byte, ms.size)
n, err := io.ReadFull(ms.r, buf)
switch err {
case io.ErrUnexpectedEOF:
ms.err = io.EOF
small := make([]byte, n)
copy(small, buf)
buf = nil
return small, nil
case nil:
return buf, nil
default:
buf = nil
return nil, err
}
}
// Reader returns the io.Reader associated to this Splitter.
func (ms *MetaSplitter) Reader() io.Reader {
return ms.r
}
// ChunkSize returns the chunk size of this Splitter.
func (ms *MetaSplitter) ChunkSize() uint64 {
return uint64(ms.size)
}
// MetaData returns metadata object from this chunker (none).
func (ms *MetaSplitter) MetaData() interface{} {
return nil
}
func (rss *MetaSplitter) SetIsDir(v bool) {
}