-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcolumnix.py
173 lines (136 loc) · 5.53 KB
/
columnix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
Python bindings for columnix.
Example write (Python):

    from columnix import Writer, Column, I64, I32, STR, LZ4, ZSTD

    columns = [Column(I64, "timestamp", compression=LZ4),
               Column(STR, "email", compression=ZSTD),
               Column(I32, "id")]

    rows = [(1400000000000, "foo@example.com", 23),
            (1400000001000, "bar@example.com", 45),
            (1400000002000, "baz@example.com", 67)]

    with Writer("example.cx", columns, row_group_size=2) as writer:
        for row in rows:
            writer.put(row)
Example read (C):

    #define __STDC_FORMAT_MACROS
    #include <assert.h>
    #include <inttypes.h>
    #include <stdio.h>
    #include <columnix/reader.h>

    int main()
    {
        struct cx_reader *reader = cx_reader_new("example.cx");
        assert(reader);

        int64_t timestamp;
        const struct cx_string *email;
        int32_t event;

        while (cx_reader_next(reader)) {
            assert(cx_reader_get_i64(reader, 0, &timestamp) &&
                   cx_reader_get_str(reader, 1, &email) &&
                   cx_reader_get_i32(reader, 2, &event));
            printf("{%" PRIi64 ", %s, %d}\n", timestamp, email->ptr, event);
        }

        assert(!cx_reader_error(reader));
        cx_reader_free(reader);
    }
"""
from ctypes import cdll, util
from ctypes import (c_char_p, c_size_t, c_void_p, c_int, c_int32, c_int64,
c_bool, c_float, c_double)
# Load the columnix shared library.
# NOTE(review): find_library returns None when it cannot locate the library;
# on POSIX, LoadLibrary(None) then opens the running program instead of
# failing, so a missing library only surfaces as an AttributeError on the
# first symbol lookup below.
lib = cdll.LoadLibrary(util.find_library("columnix"))


def _bind(name, argtypes, restype=None):
    """Fetch symbol *name* from ``lib`` and declare its ctypes prototype.

    The restype is only assigned when one is given; otherwise ctypes keeps
    its default return type (``c_int``).
    """
    fn = getattr(lib, name)
    fn.argtypes = argtypes
    if restype is not None:
        fn.restype = restype
    return fn


# Writer lifecycle.
cx_writer_new = _bind("cx_writer_new", [c_char_p, c_size_t], c_void_p)
cx_writer_free = _bind("cx_writer_free", [c_void_p])
cx_writer_add_column = _bind("cx_writer_add_column",
                             [c_void_p, c_char_p, c_int, c_int, c_int, c_int])

# Per-column value writers: (writer handle, column index, value).
cx_writer_put_null = _bind("cx_writer_put_null", [c_void_p, c_size_t])
cx_writer_put_bit = _bind("cx_writer_put_bit", [c_void_p, c_size_t, c_bool])
cx_writer_put_i32 = _bind("cx_writer_put_i32", [c_void_p, c_size_t, c_int32])
cx_writer_put_i64 = _bind("cx_writer_put_i64", [c_void_p, c_size_t, c_int64])
cx_writer_put_flt = _bind("cx_writer_put_flt", [c_void_p, c_size_t, c_float])
cx_writer_put_dbl = _bind("cx_writer_put_dbl", [c_void_p, c_size_t, c_double])
cx_writer_put_str = _bind("cx_writer_put_str", [c_void_p, c_size_t, c_char_p])

# Finalize the file; the second argument is the Writer's `sync` flag.
cx_writer_finish = _bind("cx_writer_finish", [c_void_p, c_bool])
# Column type codes passed to cx_writer_add_column. Writer indexes its
# per-type put-function table with these values, so they must remain 0..5
# in exactly this order.
BIT, I32, I64, FLT, DBL, STR = range(6)

# Compression codes for Column(compression=...). Column stores 0 when no
# compression is requested.
LZ4, LZ4HC, ZSTD = 1, 2, 3
class Column(object):
    """Schema entry describing one column of a columnix file.

    :param type: column type code (BIT, I32, I64, FLT, DBL or STR).
    :param name: column name.
    :param encoding: encoding code; any falsy value is stored as 0.
    :param compression: compression code (e.g. LZ4, LZ4HC, ZSTD); any falsy
        value is stored as 0.
    :param level: compression level.
    """

    def __init__(self, type, name, encoding=None, compression=None, level=1):
        # `type` shadows the builtin but is part of the public keyword API.
        self.type, self.name = type, name
        # Collapse None (or any other falsy value) to 0 so the C API always
        # receives an integer code.
        self.encoding = encoding if encoding else 0
        self.compression = compression if compression else 0
        self.level = level
class Writer(object):
    """Write rows to a columnix file through the C library.

    Use as a context manager: ``__enter__`` creates the native writer and
    registers every column; ``__exit__`` finishes the file (only when the
    ``with`` body raised no exception) and always frees the native writer.

    Text arguments (the path, column names and STR values) may be ``str`` or
    ``bytes``. Text is UTF-8 encoded because ctypes ``c_char_p`` only accepts
    bytes on Python 3.

    :param path: output file path.
    :param columns: sequence of :class:`Column`; row values are matched to
        columns by position.
    :param row_group_size: forwarded to ``cx_writer_new``.
    :param sync: forwarded to ``cx_writer_finish``.
    """

    def __init__(self, path, columns, row_group_size=100000, sync=True):
        self.path = path
        self.columns = columns
        self.row_group_size = row_group_size
        self.sync = sync
        # Indexed by the column type code (BIT=0, I32, I64, FLT, DBL, STR=5).
        put_fn = [self._put_bit, self._put_i32, self._put_i64, self._put_flt,
                  self._put_dbl, self._put_str]
        self.put_lookup = [put_fn[column.type] for column in columns]
        # Handle returned by cx_writer_new; None while outside the `with`.
        self.writer = None

    @staticmethod
    def _to_bytes(value):
        """Return *value* in a form ctypes accepts for a ``c_char_p``.

        Text is encoded as UTF-8. Bytes, and anything without an ``encode``
        method, are returned unchanged, so ctypes still reports bad types.
        """
        if not isinstance(value, bytes) and hasattr(value, "encode"):
            return value.encode("utf-8")
        return value

    def __enter__(self):
        """Create the native writer and add all columns.

        :raises RuntimeError: if the writer cannot be created or a column
            cannot be added.
        """
        assert self.writer is None
        writer = cx_writer_new(self._to_bytes(self.path), self.row_group_size)
        if not writer:
            raise RuntimeError("failed to create writer for %s" % self.path)
        try:
            for column in self.columns:
                if not cx_writer_add_column(writer,
                                            self._to_bytes(column.name),
                                            column.type, column.encoding,
                                            column.compression, column.level):
                    raise RuntimeError("failed to add column")
        except BaseException:
            # Python does not call __exit__ when __enter__ raises, so free
            # the native writer here instead of leaking it.
            cx_writer_free(writer)
            raise
        self.writer = writer
        return self

    def __exit__(self, err, value, traceback):
        """Finish the file if the ``with`` body succeeded; always free it.

        Returns None, so an exception from the ``with`` body propagates.

        :raises RuntimeError: if ``cx_writer_finish`` reports failure.
        """
        assert self.writer is not None
        writer, self.writer = self.writer, None
        try:
            # Finishing is skipped when the body raised. A failed finish is
            # reported rather than silently ignored.
            if not err and not cx_writer_finish(writer, self.sync):
                raise RuntimeError("failed to finish %s" % self.path)
        finally:
            cx_writer_free(writer)

    def put(self, row):
        """Append one row; ``None`` values are written as nulls.

        :param row: sequence of values, one per column, in column order.
        :raises RuntimeError: if the C library rejects a value.
        """
        assert self.writer is not None
        # Bind to locals: this is the per-row hot path.
        put_lookup = self.put_lookup
        put_null = self._put_null
        for column, value in enumerate(row):
            if value is None:
                put_null(column)
            else:
                put_lookup[column](column, value)

    def _put_null(self, column):
        if not cx_writer_put_null(self.writer, column):
            raise RuntimeError("put_null(%d)" % column)

    def _put_bit(self, column, value):
        if not cx_writer_put_bit(self.writer, column, value):
            raise RuntimeError("put_bit(%d, %r)" % (column, value))

    def _put_i32(self, column, value):
        if not cx_writer_put_i32(self.writer, column, value):
            raise RuntimeError("put_i32(%d, %r)" % (column, value))

    def _put_i64(self, column, value):
        if not cx_writer_put_i64(self.writer, column, value):
            raise RuntimeError("put_i64(%d, %r)" % (column, value))

    def _put_flt(self, column, value):
        if not cx_writer_put_flt(self.writer, column, value):
            raise RuntimeError("put_flt(%d, %r)" % (column, value))

    def _put_dbl(self, column, value):
        if not cx_writer_put_dbl(self.writer, column, value):
            raise RuntimeError("put_dbl(%d, %r)" % (column, value))

    def _put_str(self, column, value):
        # c_char_p needs bytes on Python 3; the error message keeps the
        # caller's original value.
        if not cx_writer_put_str(self.writer, column, self._to_bytes(value)):
            raise RuntimeError("put_str(%d, %r)" % (column, value))