 const fs = require('fs');
 const path = require('path');
 const extension = process.env.ARROW_JS_DEBUG === 'src' ? '.ts' : '.cjs';
-const { RecordBatch, AsyncMessageReader } = require(`../index${extension}`);
 const { VectorLoader } = require(`../targets/apache-arrow/visitor/vectorloader`);
+const { RecordBatch, AsyncMessageReader, makeData, Struct, Schema, Field } = require(`../index${extension}`);

 (async () => {

     const readable = process.argv.length < 3 ? process.stdin : fs.createReadStream(path.resolve(process.argv[2]));
     const reader = new AsyncMessageReader(readable);

-    let schema, recordBatchIndex = 0, dictionaryBatchIndex = 0;
+    let schema, metadataLength, message;
+    let byteOffset = 0;
+    let recordBatchCount = 0;
+    let dictionaryBatchCount = 0;

-    for await (const message of reader) {
-
-        let bufferRegions = [];
+    while (1) {
+        if ((metadataLength = (await reader.readMetadataLength())).done) { break; }
+        if (metadataLength.value === -1) {
+            if ((metadataLength = (await reader.readMetadataLength())).done) { break; }
+        }
+        if ((message = (await reader.readMetadata(metadataLength.value))).done) { break; }

-        if (message.isSchema()) {
-            schema = message.header();
-            continue;
-        } else if (message.isRecordBatch()) {
-            const header = message.header();
-            bufferRegions = header.buffers;
-            const body = await reader.readMessageBody(message.bodyLength);
+        if (message.value.isSchema()) {
+            console.log(
+                `Schema:`,
+                {
+                    byteOffset,
+                    metadataLength: metadataLength.value,
+                });
+            schema = message.value.header();
+            byteOffset += metadataLength.value;
+        } else if (message.value.isRecordBatch()) {
+            const header = message.value.header();
+            const bufferRegions = header.buffers;
+            const body = await reader.readMessageBody(message.value.bodyLength);
             const recordBatch = loadRecordBatch(schema, header, body);
-            console.log(`record batch ${++recordBatchIndex}: ${JSON.stringify({
-                offset: body.byteOffset,
-                length: body.byteLength,
-                numRows: recordBatch.length,
-            })}`);
-        } else if (message.isDictionaryBatch()) {
-            const header = message.header();
-            bufferRegions = header.data.buffers;
+            console.log(
+                `RecordBatch ${++recordBatchCount}:`,
+                {
+                    numRows: recordBatch.numRows,
+                    byteOffset,
+                    metadataLength: metadataLength.value,
+                    bodyByteLength: body.byteLength,
+                });
+            byteOffset += metadataLength.value;
+            bufferRegions.forEach(({ offset, length: byteLength }, i) => {
+                console.log(`\tbuffer ${i + 1}:`, { byteOffset: byteOffset + offset, byteLength });
+            });
+            byteOffset += body.byteLength;
+        } else if (message.value.isDictionaryBatch()) {
+            const header = message.value.header();
+            const bufferRegions = header.data.buffers;
             const type = schema.dictionaries.get(header.id);
-            const body = await reader.readMessageBody(message.bodyLength);
+            const body = await reader.readMessageBody(message.value.bodyLength);
             const recordBatch = loadDictionaryBatch(header.data, body, type);
-            console.log(`dictionary batch ${++dictionaryBatchIndex}: ${JSON.stringify({
-                offset: body.byteOffset,
-                length: body.byteLength,
-                numRows: recordBatch.length,
-                dictionaryId: header.id,
-            })}`);
+            console.log(
+                `DictionaryBatch ${++dictionaryBatchCount}:`,
+                {
+                    id: header.id,
+                    numRows: recordBatch.numRows,
+                    byteOffset,
+                    metadataLength: metadataLength.value,
+                    bodyByteLength: body.byteLength,
+                });
+            byteOffset += metadataLength.value;
+            bufferRegions.forEach(({ offset, length: byteLength }, i) => {
+                console.log(`\tbuffer ${i + 1}:`, { byteOffset: byteOffset + offset, byteLength });
+            });
+            byteOffset += body.byteLength;
         }
-
-        bufferRegions.forEach(({ offset, length }, i) => {
-            console.log(`\tbuffer ${i + 1}: { offset: ${offset}, length: ${length} }`);
-        });
     }

     await reader.return();

 })().catch((e) => { console.error(e); process.exit(1); });
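
Note: the rewritten loop walks the IPC stream framing by hand instead of iterating the reader, so it can track a running byte offset. In the Arrow IPC format each message starts with a 4-byte little-endian metadata length, optionally preceded by the continuation marker 0xFFFFFFFF, which compares equal to -1 through a signed int32 read; that is why the loop calls readMetadataLength() a second time when it sees -1. A minimal sketch of that framing logic over a plain Node Buffer (the function and variable names here are illustrative, not part of the Arrow JS API):

    // Sketch: parse the length prefix of one Arrow IPC message from a Node
    // Buffer. Illustrative only; the script above uses AsyncMessageReader.
    function readMessageFraming(buf, offset = 0) {
        let metadataLength = buf.readInt32LE(offset);
        offset += 4;
        if (metadataLength === -1) {
            // 0xFFFFFFFF continuation marker: the real length is in the next 4 bytes
            metadataLength = buf.readInt32LE(offset);
            offset += 4;
        }
        // `metadataLength` bytes of flatbuffer metadata follow, then the body,
        // whose length is recorded inside the metadata itself.
        return { metadataLength, metadataStart: offset };
    }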

 function loadRecordBatch(schema, header, body) {
-    return new RecordBatch(schema, header.length, new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields));
+    const children = new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany(schema.fields);
+    return new RecordBatch(
+        schema,
+        makeData({
+            type: new Struct(schema.fields),
+            length: header.length,
+            children: children
+        })
+    );
 }
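
Note: loadRecordBatch changes because the RecordBatch constructor changed in the Arrow JS v7-era refactor: instead of taking a length and an array of child vectors, it now takes the schema plus a single Struct-typed Data produced by makeData, with the loaded children nested inside it. A self-contained sketch of the same pattern against the published apache-arrow package (the field name and values are made up for illustration):

    const { RecordBatch, Schema, Field, Int32, Struct, makeData } = require('apache-arrow');

    // One child Data per schema field, then wrap them all in a Struct Data.
    const schema = new Schema([new Field('ids', new Int32())]);
    const ids = makeData({ type: new Int32(), length: 3, data: new Int32Array([1, 2, 3]) });
    const batch = new RecordBatch(
        schema,
        makeData({ type: new Struct(schema.fields), length: 3, children: [ids] })
    );
    console.log(batch.numRows); // 3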

 function loadDictionaryBatch(header, body, dictionaryType) {
-    return RecordBatch.new(new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]));
+    const schema = new Schema([new Field('', dictionaryType)]);
+    const children = new VectorLoader(body, header.nodes, header.buffers, new Map()).visitMany([dictionaryType]);
+    return new RecordBatch(
+        schema,
+        makeData({
+            type: new Struct(schema.fields),
+            length: header.length,
+            children: children
+        })
+    );
 }
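
Note: the dictionary case is wrapped the same way, with the loaded values placed under a single anonymous field, since the static RecordBatch.new helper used before no longer exists. The per-buffer byteOffset printed by the loop is the script's running offset at the start of the message body plus each buffer's relative offset (buffer regions in the metadata are relative to the body start). The IPC spec requires buffers to be 8-byte aligned, with 64-byte alignment recommended, and surfacing misalignment is this script's purpose; a trivial check over the printed offsets might look like:

    // Sketch: the alignment property the printed byteOffsets should satisfy.
    const isAligned = (byteOffset, alignment = 8) => byteOffset % alignment === 0;
    console.log(isAligned(64)); // true
    console.log(isAligned(66)); // false: a misaligned buffer worth investigating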