Skip to content

Commit f069ddc

Browse files
committed
[WIP] initial attempt to add postgres
1 parent 1a4b332 commit f069ddc

File tree

8 files changed

+1172
-1
lines changed

8 files changed

+1172
-1
lines changed

format/all/all.go

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
_ "github.com/wader/fq/format/opus"
2525
_ "github.com/wader/fq/format/pcap"
2626
_ "github.com/wader/fq/format/png"
27+
_ "github.com/wader/fq/format/postgres"
2728
_ "github.com/wader/fq/format/protobuf"
2829
_ "github.com/wader/fq/format/raw"
2930
_ "github.com/wader/fq/format/tar"

format/format.go

+4
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ const (
7373
OPUS_PACKET = "opus_packet"
7474
PCAP = "pcap"
7575
PCAPNG = "pcapng"
76+
PGWAL = "pgwal"
77+
PGWALPAGE = "pgwal_page"
78+
PGMULTIXACTOFF = "pgmultixact_offsets"
79+
PGMULTIXACTMEM = "pgmultixact_members"
7680
PNG = "png"
7781
PROTOBUF = "protobuf"
7882
PROTOBUF_WIDEVINE = "protobuf_widevine"

format/postgres/pgmultixact.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package postgres
2+
3+
import (
4+
"github.com/wader/fq/format"
5+
"github.com/wader/fq/format/registry"
6+
"github.com/wader/fq/pkg/decode"
7+
"github.com/wader/fq/pkg/scalar"
8+
)
9+
10+
const BLCKSZ = 8192
11+
12+
func init() {
13+
registry.MustRegister(decode.Format{
14+
Name: format.PGMULTIXACTOFF,
15+
Description: "PostgreSQL multixact offset file",
16+
DecodeFn: mxOffsetDecode,
17+
})
18+
registry.MustRegister(decode.Format{
19+
Name: format.PGMULTIXACTMEM,
20+
Description: "PostgreSQL multixact members file",
21+
DecodeFn: mxMembersDecode,
22+
})
23+
}
24+
25+
func mxOffsetDecode(d *decode.D, in interface{}) interface{} {
26+
d.Endian = decode.LittleEndian
27+
28+
d.FieldArray("offsets", func(d *decode.D) {
29+
for {
30+
if d.End() {
31+
break
32+
}
33+
d.FieldU32("offset", scalar.Hex)
34+
35+
}
36+
})
37+
return nil
38+
}
39+
40+
var flags = scalar.UToScalar{
41+
0: {Sym: "ForKeyShare", Description: "For Key Share"},
42+
1: {Sym: "ForShare", Description: "For Share"},
43+
2: {Sym: "ForNoKeyUpdate", Description: "For No Key Update"},
44+
3: {Sym: "ForUpdate", Description: "For Update"},
45+
4: {Sym: "NoKeyUpdate", Description: "No Key Update"},
46+
5: {Sym: "Update", Description: "Update"},
47+
}
48+
49+
func mxMembersDecode(d *decode.D, in interface{}) interface{} {
50+
var xidLen uint = 4
51+
var groupLen uint = 4 * (1 + xidLen)
52+
d.Endian = decode.LittleEndian
53+
54+
m := d.FieldArrayValue("members")
55+
p := d.FieldArrayValue("paddings")
56+
57+
for {
58+
var xacts []*decode.D = make([]*decode.D, 4)
59+
60+
for i := 0; i < 4; i++ {
61+
xacts[i] = m.FieldStructValue("xact")
62+
xacts[i].FieldU8("status", flags)
63+
}
64+
65+
for i := 0; i < 4; i++ {
66+
xacts[i].FieldU32("xid")
67+
}
68+
69+
// Check if rest of bytes are padding before EOF
70+
if d.BitsLeft() < int64(groupLen*8) && d.BitsLeft() > 0 {
71+
p.FieldRawLen("padding", d.BitsLeft())
72+
break
73+
}
74+
75+
// Check on EOF
76+
if d.End() {
77+
break
78+
}
79+
80+
// Not EOF, let's check on block boundary
81+
if blkLeft := BLCKSZ - (uint(d.Pos())>>3)%BLCKSZ; blkLeft < groupLen {
82+
p.FieldRawLen("padding", int64(blkLeft*8))
83+
}
84+
}
85+
return nil
86+
}

format/postgres/pgwal.go

+180
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package postgres
2+
3+
import (
4+
"github.com/wader/fq/format"
5+
"github.com/wader/fq/format/registry"
6+
"github.com/wader/fq/pkg/decode"
7+
"github.com/wader/fq/pkg/scalar"
8+
)
9+
10+
func init() {
11+
registry.MustRegister(decode.Format{
12+
Name: format.PGWAL,
13+
Description: "PostgreSQL write-ahead log file",
14+
DecodeFn: pgwalDecode,
15+
})
16+
}
17+
18+
const XLOG_BLCKSZ = 8192
19+
20+
const XLP_LONG_HEADER = 2
21+
22+
const (
23+
BKPBLOCK_FORK_MASK = 0x0F
24+
/* block data is an XLogRecordBlockImage */
25+
BKPBLOCK_HAS_IMAGE = 0x10
26+
BKPBLOCK_HAS_DATA = 0x20
27+
/* redo will re-init the page */
28+
BKPBLOCK_WILL_INIT = 0x40
29+
/* RelFileNode omitted, same as previous */
30+
BKPBLOCK_SAME_REL = 0x80
31+
)
32+
33+
/* Information stored in bimg_info */
34+
const (
35+
/* page image has "hole" */
36+
BKPIMAGE_HAS_HOLE = 0x01
37+
/* page image is compressed */
38+
BKPIMAGE_IS_COMPRESSED = 0x02
39+
/* page image should be restored during replay */
40+
BKPIMAGE_APPLY = 0x04
41+
)
42+
43+
var expected_rem_len uint64 = 0
44+
45+
var rmgrIds = scalar.UToScalar{
46+
0: {Sym: "XLOG", Description: "RM_XLOG_ID"},
47+
1: {Sym: "Transaction", Description: "RM_XACT_ID"},
48+
2: {Sym: "Storage", Description: "RM_SMGR_ID"},
49+
3: {Sym: "CLOG", Description: "RM_CLOG_ID"},
50+
4: {Sym: "Database", Description: "RM_DBASE_ID"},
51+
5: {Sym: "Tablespace", Description: "RM_TBLSPC_ID"},
52+
6: {Sym: "MultiXact", Description: "RM_MULTIXACT_ID"},
53+
7: {Sym: "RelMap", Description: "RM_RELMAP_ID"},
54+
8: {Sym: "Standby", Description: "RM_STANDBY_ID"},
55+
9: {Sym: "Heap2", Description: "RM_HEAP2_ID"},
56+
10: {Sym: "Heap", Description: "RM_HEAP_ID"},
57+
11: {Sym: "Btree", Description: "RM_BTREE_ID"},
58+
12: {Sym: "Hash", Description: "RM_HASH_ID"},
59+
13: {Sym: "Gin", Description: "RM_GIN_ID"},
60+
14: {Sym: "Gist", Description: "RM_GIST_ID"},
61+
15: {Sym: "Sequence", Description: "RM_SEQ_ID"},
62+
16: {Sym: "SPGist", Description: "RM_SPGIST_ID"},
63+
17: {Sym: "BRIN", Description: "RM_BRIN_ID"},
64+
18: {Sym: "CommitTs", Description: "RM_COMMIT_TS_ID"},
65+
19: {Sym: "ReplicationOrigin", Description: "RM_REPLORIGIN_ID"},
66+
20: {Sym: "Generic", Description: "RM_GENERIC_ID"},
67+
21: {Sym: "LogicalMessage", Description: "RM_LOGICALMSG_ID"},
68+
}
69+
70+
func decodeXLogPageHeaderData(d *decode.D) {
71+
var info uint64
72+
73+
d.FieldU16("xlp_magic", d.AssertU(0xd106))
74+
info = d.FieldU16("xlp_info")
75+
d.FieldU32("xlp_timeline")
76+
d.FieldU64("xlp_pageaddr")
77+
d.FieldU32("xlp_rem_len")
78+
d.FieldRawLen("padding", int64(d.AlignBits(64)))
79+
if info&XLP_LONG_HEADER != 0 {
80+
// Long header
81+
d.FieldStruct("XLogLongPageHeaderData", func(d *decode.D) {
82+
d.FieldU64("xlp_sysid")
83+
d.FieldU32("xlp_seg_size")
84+
d.FieldU32("xlp_xlog_blcksz")
85+
})
86+
}
87+
}
88+
89+
func pgwalDecode(d *decode.D, in interface{}) interface{} {
90+
91+
d.Endian = decode.LittleEndian
92+
93+
pageHeaders := d.FieldArrayValue("XLogPageHeaders")
94+
header := pageHeaders.FieldStruct("XLogPageHeaderData", decodeXLogPageHeaderData)
95+
96+
d.FieldRawLen("prev_file_rec", int64(header.FieldGet("xlp_rem_len").V.(uint32)*8))
97+
d.FieldRawLen("prev_file_rec_padding", int64(d.AlignBits(64)))
98+
99+
d.FieldArray("XLogRecords", func(d *decode.D) {
100+
for {
101+
d.FieldStruct("XLogRecord", func(d *decode.D) {
102+
record_pos := uint64(d.Pos()) >> 3
103+
record_len := d.FieldU32("xl_tot_len")
104+
record_end := record_pos + record_len
105+
header_pos := record_end - record_end%XLOG_BLCKSZ
106+
d.FieldU32("xl_xid")
107+
d.FieldU64("xl_prev", scalar.Hex)
108+
d.FieldU8("xl_info")
109+
d.FieldU8("xl_rmid", rmgrIds)
110+
d.FieldRawLen("padding", int64(d.AlignBits(32)))
111+
d.FieldU32("xl_crc", scalar.Hex)
112+
113+
var lenghts []uint64 = []uint64{}
114+
115+
d.FieldArray("XLogRecordBlockHeader", func(d *decode.D) {
116+
for blkheaderid := uint64(0); d.PeekBits(8) == blkheaderid; blkheaderid++ {
117+
d.FieldStruct("XlogRecordBlockHeader", func(d *decode.D) {
118+
/* block reference ID */
119+
d.FieldU8("id", d.AssertU(blkheaderid))
120+
/* fork within the relation, and flags */
121+
fork_flags := d.FieldU8("fork_flags")
122+
/* number of payload bytes (not including page image) */
123+
lenghts = append(lenghts, d.FieldU16("data_length"))
124+
if fork_flags&BKPBLOCK_HAS_IMAGE != 0 {
125+
d.FieldStruct("XLogRecordBlockImageHeader", func(d *decode.D) {
126+
/* number of page image bytes */
127+
d.FieldU16("length")
128+
/* number of bytes before "hole" */
129+
d.FieldU16("hole_offset")
130+
/* flag bits, see below */
131+
bimg_info := d.FieldU8("bimg_info")
132+
d.FieldRawLen("padding", int64(d.AlignBits(16)))
133+
if bimg_info&BKPIMAGE_HAS_HOLE != 0 &&
134+
bimg_info&BKPIMAGE_IS_COMPRESSED != 0 {
135+
d.FieldU16("hole_length")
136+
}
137+
})
138+
}
139+
if fork_flags&BKPBLOCK_SAME_REL == 0 {
140+
d.FieldStruct("RelFileNode", func(d *decode.D) {
141+
/* tablespace */
142+
d.FieldU32("spcNode")
143+
/* database */
144+
d.FieldU32("dbNode")
145+
/* relation */
146+
d.FieldU32("relNode")
147+
})
148+
d.FieldU32("BlockNum")
149+
}
150+
})
151+
}
152+
})
153+
if d.PeekBits(8) == 0xff {
154+
d.FieldStruct("XLogRecordDataHeaderShort", func(d *decode.D) {
155+
d.FieldU8("id", d.AssertU(0xff))
156+
lenghts = append(lenghts, d.FieldU8("data_length"))
157+
})
158+
}
159+
160+
d.FieldArray("data", func(d *decode.D) {
161+
for _, x := range lenghts {
162+
pos := uint64(d.Pos()) >> 3
163+
if pos < header_pos && (header_pos < pos+x) {
164+
d.FieldRawLen("data", int64((header_pos-pos)*8))
165+
header := pageHeaders.FieldStruct("XLogPageHeaderData", decodeXLogPageHeaderData)
166+
header.FieldGet("xlp_rem_len").TryScalarFn(d.ValidateU(record_end - header_pos))
167+
d.FieldRawLen("data", int64((x+pos-header_pos)*8))
168+
} else {
169+
d.FieldRawLen("data", int64(x*8))
170+
}
171+
}
172+
})
173+
174+
d.FieldRawLen("ending_padding", int64(d.AlignBits(64)))
175+
})
176+
}
177+
})
178+
179+
return nil
180+
}

format/postgres/pgwal_page.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package postgres
2+
3+
import (
4+
"github.com/wader/fq/format"
5+
"github.com/wader/fq/format/registry"
6+
"github.com/wader/fq/pkg/decode"
7+
_ "github.com/wader/fq/pkg/scalar"
8+
)
9+
10+
func init() {
11+
registry.MustRegister(decode.Format{
12+
Name: format.PGWALPAGE,
13+
Description: "PostgreSQL write-ahead page",
14+
DecodeFn: walpageDecode,
15+
})
16+
}
17+
18+
//const XLOG_BLCKSZ = 8192
19+
20+
func walpageDecode(d *decode.D, in interface{}) interface{} {
21+
22+
d.Endian = decode.LittleEndian
23+
24+
pageHeaders := d.FieldArrayValue("XLogPageHeaders")
25+
_ = pageHeaders.FieldStruct("XLogPageHeaderData", decodeXLogPageHeaderData)
26+
27+
return nil
28+
}

go.mod

+23-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,28 @@ require (
2929
)
3030

3131
require (
32+
github.com/cilium/ebpf v0.7.0 // indirect
33+
github.com/cosiner/argv v0.1.0 // indirect
34+
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
35+
github.com/derekparker/trie v0.0.0-20200317170641-1fdf38b7b0e9 // indirect
36+
github.com/go-delve/delve v1.8.0 // indirect
37+
github.com/google/go-dap v0.6.0 // indirect
38+
github.com/hashicorp/golang-lru v0.5.4 // indirect
39+
github.com/inconshreveable/mousetrap v1.0.0 // indirect
3240
github.com/itchyny/timefmt-go v0.1.3 // indirect
33-
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 // indirect
41+
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
42+
github.com/mattn/go-colorable v0.1.12 // indirect
43+
github.com/mattn/go-isatty v0.0.14 // indirect
44+
github.com/mattn/go-runewidth v0.0.13 // indirect
45+
github.com/peterh/liner v1.2.1 // indirect
46+
github.com/rivo/uniseg v0.2.0 // indirect
47+
github.com/russross/blackfriday/v2 v2.1.0 // indirect
48+
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
49+
github.com/sirupsen/logrus v1.8.1 // indirect
50+
github.com/spf13/cobra v1.3.0 // indirect
51+
github.com/spf13/pflag v1.0.5 // indirect
52+
go.starlark.net v0.0.0-20211203141949-70c0e40ae128 // indirect
53+
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
54+
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
55+
gopkg.in/yaml.v2 v2.4.0 // indirect
3456
)

0 commit comments

Comments
 (0)