-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathwarc.go
365 lines (335 loc) · 9.31 KB
/
warc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
// Package warc provides primitives for reading and writing WARC files.
package warc
import (
	"bufio"
	"bytes"
	"compress/bzip2"
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strconv"
	"strings"
	"time"
)
// Mode defines the way Reader will generate Records.
type Mode int

// String returns the name of the mode constant. Values that do not
// correspond to a defined mode are rendered as "Mode(N)" so they remain
// visible in logs and error messages instead of printing as an empty string.
func (m Mode) String() string {
	switch m {
	case SequentialMode:
		return "SequentialMode"
	case AsynchronousMode:
		return "AsynchronousMode"
	}
	// Convert to int so fmt does not recurse into this String method.
	return fmt.Sprintf("Mode(%d)", int(m))
}

const (
	// SequentialMode means Records have to be consumed one by one and a call to
	// ReadRecord() invalidates the previous record. The benefit is that
	// Records have almost no overhead since they wrap around
	// the underlying Reader.
	SequentialMode Mode = iota
	// AsynchronousMode means calls to ReadRecord don't affect previously
	// returned Records. This mode copies the Record's content into
	// separate memory, thus bears memory overhead.
	AsynchronousMode
	// DefaultMode defines the reading mode used in NewReader().
	DefaultMode = AsynchronousMode
)
// Reader reads WARC records from WARC files.
//
// A Reader is created with NewReader or NewReaderMode, hands out one Record
// per ReadRecord call, and closes its decompressed source when Close is
// called. Whether previously returned Records stay valid depends on its Mode.
type Reader struct {
// Unexported fields.
// mode selects whether record content is copied (AsynchronousMode) or read
// in place from the stream (SequentialMode).
mode Mode
// source is the decompressed input stream; Close closes it.
source io.ReadCloser
// reader buffers source and is what header lines and content are read from.
reader *bufio.Reader
// record is the most recently returned record; nil before the first
// ReadRecord call and after seekRecord has skipped past it.
record *Record
// buffer is scratch space used when discarding unread content in
// SequentialMode.
buffer []byte
}
// Writer writes WARC records to WARC files.
//
// Create one with NewWriter; each WriteRecord call serializes one record to
// the target writer.
type Writer struct {
// Unexported fields.
// target receives the serialized records.
target io.Writer
}
// Header provides information about the WARC record. It stores WARC record
// field names and their values. Since WARC field names are case-insensitive,
// the Header methods are case-insensitive as well.
//
// Set, Get and Del lower-case the key before touching the map, so code that
// indexes the map directly must use lower-case field names
// (e.g. "content-length").
type Header map[string]string
// Record represents a WARC record.
type Record struct {
// Header holds the record's named fields. Records produced by ReadRecord
// have their keys lower-cased (they are filled via Header.Set).
Header Header
// Content yields the record block. For records returned by a Reader in
// SequentialMode it reads directly from the Reader's stream and is
// invalidated by the next ReadRecord call; in AsynchronousMode it is an
// independent in-memory copy.
Content io.Reader
}
// Compression formats recognized by guessCompression.
const (
	compressionNone = iota // plain, uncompressed data
	compressionBZIP        // bzip2 stream
	compressionGZIP        // gzip stream
)

// guessCompression peeks at the first two bytes of b, without consuming them,
// and reports which compression format they announce: 0x1f 0x8b for gzip,
// "BZ" (0x42 0x5a) for bzip2, anything else for uncompressed data. A stream
// shorter than two bytes is treated as uncompressed; any other read error is
// returned to the caller.
func guessCompression(b *bufio.Reader) (int, error) {
	head, err := b.Peek(2)
	switch {
	case err == io.EOF:
		return compressionNone, nil
	case err != nil:
		return compressionNone, err
	case bytes.Equal(head, []byte{0x1f, 0x8b}):
		return compressionGZIP, nil
	case bytes.Equal(head, []byte{0x42, 0x5a}):
		return compressionBZIP, nil
	}
	return compressionNone, nil
}
// decompress wraps r so that callers always receive an io.ReadCloser over the
// decompressed bytes, whether r carries gzip, bzip2 or uncompressed data.
// Every path yields a Closer (gzip's own reader, or a no-op wrapper), so
// callers can unconditionally defer Close on the result.
func decompress(r io.Reader) (io.ReadCloser, error) {
	// Buffering lets guessCompression peek at the magic number without
	// consuming it from the stream.
	peekable := bufio.NewReader(r)
	kind, err := guessCompression(peekable)
	if err != nil {
		return nil, err
	}
	switch kind {
	case compressionGZIP:
		gz, gzErr := gzip.NewReader(peekable)
		if gzErr != nil {
			return nil, gzErr
		}
		return gz, nil
	case compressionBZIP:
		return ioutil.NopCloser(bzip2.NewReader(peekable)), nil
	case compressionNone:
		return ioutil.NopCloser(peekable), nil
	}
	return nil, nil
}
// sliceReader returns an io.Reader limited to the next size bytes of source.
//
// With clone set, those bytes are read out of source immediately and the
// returned reader serves an independent in-memory copy, so reading it never
// touches source again. Without clone, nothing is read up front: the result
// is a view onto source, and every Read on it advances source.
//
// If source holds fewer than size bytes, the result simply yields fewer
// bytes; no error is reported for the shortfall.
func sliceReader(source io.Reader, size int, clone bool) (io.Reader, error) {
	limited := io.LimitReader(source, int64(size))
	if clone {
		buf, err := ioutil.ReadAll(limited)
		if err != nil {
			return nil, err
		}
		return bytes.NewReader(buf), nil
	}
	return limited, nil
}
// splitKeyValue splits a WARC header line of the form "Name: value" at its
// first colon. The name is returned untouched and the value with surrounding
// whitespace trimmed. A line without any colon yields two empty strings.
func splitKeyValue(line string) (string, string) {
	colon := strings.Index(line, ":")
	if colon == -1 {
		return "", ""
	}
	name, rest := line[:colon], line[colon+1:]
	return name, strings.TrimSpace(rest)
}
// NewHeader creates a new, empty WARC header.
func NewHeader() Header {
	return make(Header)
}

// Set sets the header field associated with key to value, replacing any
// existing value. The key is stored lower-cased, so lookups are
// case-insensitive.
func (h Header) Set(key, value string) {
	h[strings.ToLower(key)] = value
}

// Get returns the value associated with the given key, matched
// case-insensitively. If there is no value associated with the key, Get
// returns "".
func (h Header) Get(key string) string {
	return h[strings.ToLower(key)]
}

// Del deletes the value associated with key, matched case-insensitively.
func (h Header) Del(key string) {
	delete(h, strings.ToLower(key))
}
// NewRecord creates a new WARC record with an empty header. Content is left
// nil for the caller to fill in.
func NewRecord() *Record {
	return &Record{
		// Use the package's Header constructor rather than a raw map literal so
		// all headers are created the same way.
		Header: NewHeader(),
	}
}
// NewReader creates a new WARC reader over reader using DefaultMode.
// It is shorthand for NewReaderMode(reader, DefaultMode).
func NewReader(reader io.Reader) (*Reader, error) {
	mode := DefaultMode
	return NewReaderMode(reader, mode)
}
// NewReaderMode is like NewReader, but specifies the mode instead of
// assuming DefaultMode.
//
// The input may be uncompressed, gzip- or bzip2-compressed; the format is
// detected from the stream's leading magic bytes. An error is returned if
// mode is neither SequentialMode nor AsynchronousMode, or if setting up
// decompression fails (for example an unreadable gzip header).
func NewReaderMode(reader io.Reader, mode Mode) (*Reader, error) {
	// ReadRecord only copies content for AsynchronousMode and seekRecord only
	// drains it for SequentialMode; any other value would do neither and leave
	// the stream misaligned after the first record.
	if mode != SequentialMode && mode != AsynchronousMode {
		return nil, fmt.Errorf("unsupported reader mode %d", int(mode))
	}
	source, err := decompress(reader)
	if err != nil {
		return nil, err
	}
	return &Reader{
		mode:   mode,
		source: source,
		reader: bufio.NewReader(source),
		buffer: make([]byte, 4096),
	}, nil
}
// Close closes the decompressed source stream and drops the Reader's
// internal state. Calling it again is harmless: once the source is nil there
// is nothing left to do.
func (r *Reader) Close() {
	if r.source == nil {
		return
	}
	// Close has no error result, so a failure from the underlying Close
	// cannot be reported and is deliberately discarded.
	_ = r.source.Close()
	r.source, r.reader, r.record = nil, nil, nil
}
// readLine returns the next line of the opened WARC file without its line
// terminator. bufio.Reader.ReadLine delivers lines longer than its buffer in
// several fragments (more == true); those fragments are stitched back
// together here. Header lines are not expected to be that long, but they are
// still handled rather than truncated.
func (r *Reader) readLine() (string, error) {
	var line []byte
	for {
		fragment, more, err := r.reader.ReadLine()
		if err != nil {
			return "", err
		}
		// Copy the fragment: ReadLine's slice is only valid until the next read.
		line = append(line, fragment...)
		if !more {
			return string(line), nil
		}
	}
}
// ReadRecord reads the next record from the opened WARC file.
//
// It returns io.EOF when the stream ends at a record boundary. In
// SequentialMode the returned Record's Content reads straight from the
// underlying stream and is invalidated by the next call; in AsynchronousMode
// the content is copied into memory and stays valid.
func (r *Reader) ReadRecord() (*Record, error) {
	// Go to the position of the next record in the file. A failure here
	// (unreadable content, missing or malformed record separator) means the
	// stream is no longer aligned on a record boundary; carrying on would
	// parse content bytes as if they were a record header.
	if err := r.seekRecord(); err != nil {
		return nil, err
	}
	// Skip the record version line (e.g. "WARC/1.0").
	if _, err := r.readLine(); err != nil {
		return nil, err
	}
	// Parse the named fields up to the empty line that ends the header.
	header := NewHeader()
	for {
		line, err := r.readLine()
		if err != nil {
			return nil, err
		}
		if line == "" {
			break
		}
		// Lines that yield no field name (no colon, or nothing before it)
		// are skipped.
		if key, value := splitKeyValue(line); key != "" {
			header.Set(key, value)
		}
	}
	// Determine the content length and then retrieve the record content.
	length, err := strconv.Atoi(header.Get("Content-Length"))
	if err != nil {
		return nil, fmt.Errorf("failed to parse field Content-Length: %v", err)
	}
	if length < 0 {
		// A negative length cannot describe a block; accepting it would
		// silently produce an empty record.
		return nil, fmt.Errorf("invalid field Content-Length: %d", length)
	}
	content, err := sliceReader(r.reader, length, r.mode == AsynchronousMode)
	if err != nil {
		return nil, err
	}
	r.record = &Record{
		Header:  header,
		Content: content,
	}
	return r.record, nil
}
// seekRecord moves the Reader to the position of the next WARC record
// in the opened WARC file.
//
// In SequentialMode it first discards whatever the caller left unread of the
// previous record's content. In AsynchronousMode that content was already
// copied out of the stream, so nothing needs discarding. It then consumes the
// two empty lines that terminate every record. A read error, or a non-empty
// separator line, is returned because the stream can no longer be trusted to
// sit on a record boundary.
func (r *Reader) seekRecord() error {
	// No record was read yet? The r.reader must be at a start of the file and
	// thus the start of a record.
	if r.record == nil {
		return nil
	}
	// In SequentialMode the record's Content is a length-limited view over
	// r.reader, positioned wherever the user stopped reading. Draining it to
	// EOF skips exactly the rest of the block. io.CopyBuffer keeps reading
	// through (0, nil) results and reports genuine read errors.
	if r.mode == SequentialMode {
		if _, err := io.CopyBuffer(ioutil.Discard, r.record.Content, r.buffer); err != nil {
			return err
		}
	}
	// Set to nil so it's safe to call this function several times without
	// destroying stuff.
	r.record = nil
	for i := 0; i < 2; i++ {
		line, err := r.readLine()
		if err != nil {
			return err
		}
		if line != "" {
			return fmt.Errorf("expected empty line, got %q", line)
		}
	}
	return nil
}
// Mode reports the Mode the Reader was created with.
func (r *Reader) Mode() Mode {
	m := r.mode
	return m
}
// NewWriter returns a Writer that serializes WARC records to writer.
func NewWriter(writer io.Writer) *Writer {
	w := &Writer{target: writer}
	return w
}
// WriteRecord writes a record to the underlying WARC file and returns the
// number of bytes written to the target.
//
// The record content is read completely into memory so its length is known
// before the header is written. r.Header is updated in place:
//   - Content-Length is always overwritten with the actual content size.
//   - A missing WARC-Date is set to the current time in UTC.
//   - A missing WARC-Type defaults to "resource".
//
// A nil Header is replaced with an empty one, and a nil Content is written as
// an empty block. Header fields are emitted in sorted key order, so the
// output is deterministic.
func (w *Writer) WriteRecord(r *Record) (int, error) {
	var data []byte
	if r.Content != nil {
		var err error
		if data, err = ioutil.ReadAll(r.Content); err != nil {
			return 0, err
		}
	}
	if r.Header == nil {
		r.Header = make(Header)
	}
	// Content-Length is the number of octets in the content. If no content is
	// present, a value of '0' (zero) shall be used.
	r.Header["content-length"] = strconv.Itoa(len(data))
	// If the values for WARC-Date and WARC-Type are missing, add them
	// because the standard says they're mandatory. WARC-Date must be a UTC
	// timestamp of the form "YYYY-MM-DDThh:mm:ssZ"; RFC 3339 formatting of a
	// UTC time yields exactly that shape.
	if r.Header["warc-date"] == "" {
		r.Header["warc-date"] = time.Now().UTC().Format(time.RFC3339)
	}
	if r.Header["warc-type"] == "" {
		r.Header["warc-type"] = "resource"
	}
	total := 0
	// write is a helper function to count the total number of
	// written bytes to w.target.
	write := func(format string, args ...interface{}) error {
		written, err := fmt.Fprintf(w.target, format, args...)
		total += written
		return err
	}
	// A record consists of a version string, the record header followed by a
	// record content block and two newlines:
	//   Version CRLF
	//   Header-Key: Header-Value CRLF
	//   CRLF
	//   Content
	//   CRLF
	//   CRLF
	if err := write("%s\r\n", "WARC/1.0"); err != nil {
		return total, err
	}
	// Map iteration order is random; sort the keys for reproducible output.
	keys := make([]string, 0, len(r.Header))
	for key := range r.Header {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	for _, key := range keys {
		if err := write("%s: %s\r\n", strings.Title(key), r.Header[key]); err != nil {
			return total, err
		}
	}
	if err := write("\r\n%s\r\n\r\n", data); err != nil {
		return total, err
	}
	return total, nil
}