Skip to content

Commit 8e70da9

Browse files
committed
Unignore internal pg2s3 dir
1 parent d968626 commit 8e70da9

File tree

5 files changed

+580
-1
lines changed

5 files changed

+580
-1
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pg2s3
1+
/pg2s3
22
dist/
33

44
# Binaries for programs and plugins

internal/pg2s3/client.go

+280
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
package pg2s3
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"io"
8+
"os/exec"
9+
"strings"
10+
11+
"filippo.io/age"
12+
"github.com/djherbis/buffer"
13+
"github.com/jackc/pgx/v4"
14+
"github.com/minio/minio-go/v7"
15+
"github.com/minio/minio-go/v7/pkg/credentials"
16+
17+
"github.com/theandrew168/pg2s3/internal/config"
18+
)
19+
20+
type Client struct {
21+
cfg config.Config
22+
}
23+
24+
func NewClient(cfg config.Config) (*Client, error) {
25+
// TODO: better use of context here? timeouts?
26+
ctx := context.Background()
27+
28+
// instantiate a pg2s3 client
29+
client := &Client{
30+
cfg: cfg,
31+
}
32+
33+
// validate connection to PG
34+
connPG, err := pgx.Connect(ctx, cfg.PGURL)
35+
if err != nil {
36+
return nil, err
37+
}
38+
defer connPG.Close(ctx)
39+
40+
if err = connPG.Ping(ctx); err != nil {
41+
return nil, err
42+
}
43+
44+
// validate connection to S3
45+
connS3, err := client.connectS3()
46+
if err != nil {
47+
return nil, err
48+
}
49+
50+
if _, err = connS3.ListBuckets(ctx); err != nil {
51+
return nil, err
52+
}
53+
54+
// validate public keys (if provided)
55+
for _, pubkey := range cfg.Encryption.PublicKeys {
56+
_, err = age.ParseX25519Recipient(pubkey)
57+
if err != nil {
58+
return nil, err
59+
}
60+
}
61+
62+
return client, nil
63+
}
64+
65+
func (c *Client) CreateBackup() (io.Reader, error) {
66+
args := []string{
67+
"-Fc", // custom output format (compressed and flexible)
68+
c.cfg.PGURL,
69+
}
70+
cmd := exec.Command("pg_dump", args...)
71+
72+
// buffer 32MB to memory, after that buffer to 64MB chunked files
73+
backup := buffer.NewUnboundedBuffer(32*1024*1024, 64*1024*1024)
74+
cmd.Stdout = backup
75+
76+
var capture bytes.Buffer
77+
cmd.Stderr = &capture
78+
79+
err := cmd.Run()
80+
if err != nil {
81+
return nil, errors.New(capture.String())
82+
}
83+
84+
return backup, nil
85+
}
86+
87+
func (c *Client) RestoreBackup(backup io.Reader) error {
88+
args := []string{
89+
"-c", // clean DB object before recreating them
90+
"-d", // database to be restored
91+
c.cfg.PGURL,
92+
}
93+
for _, schema := range c.cfg.Restore.Schemas {
94+
// specify which schemas should be restored
95+
args = append(args, "-n", schema)
96+
}
97+
cmd := exec.Command("pg_restore", args...)
98+
99+
cmd.Stdin = backup
100+
101+
var capture bytes.Buffer
102+
cmd.Stderr = &capture
103+
104+
err := cmd.Run()
105+
if err != nil {
106+
return errors.New(capture.String())
107+
}
108+
109+
return nil
110+
}
111+
112+
func (c *Client) EncryptBackup(backup io.Reader, publicKeys []string) (io.Reader, error) {
113+
var recipients []age.Recipient
114+
for _, pubkey := range publicKeys {
115+
recipient, err := age.ParseX25519Recipient(pubkey)
116+
if err != nil {
117+
return nil, err
118+
}
119+
120+
recipients = append(recipients, recipient)
121+
}
122+
123+
// setup encryption pipeline
124+
var encrypted bytes.Buffer
125+
w, err := age.Encrypt(&encrypted, recipients...)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
// apply encryption by copying data through
131+
if _, err = io.Copy(w, backup); err != nil {
132+
return nil, err
133+
}
134+
135+
// explicit close to flush encryption
136+
if err = w.Close(); err != nil {
137+
return nil, err
138+
}
139+
140+
return &encrypted, nil
141+
}
142+
143+
func (c *Client) DecryptBackup(encrypted io.Reader, privateKey string) (io.Reader, error) {
144+
identity, err := age.ParseX25519Identity(privateKey)
145+
if err != nil {
146+
return nil, err
147+
}
148+
149+
// setup decryption pipeline
150+
r, err := age.Decrypt(encrypted, identity)
151+
if err != nil {
152+
return nil, err
153+
}
154+
155+
// apply decryption by copying data through
156+
var backup bytes.Buffer
157+
if _, err = io.Copy(&backup, r); err != nil {
158+
return nil, err
159+
}
160+
161+
return &backup, nil
162+
}
163+
164+
func (c *Client) ListBackups() ([]string, error) {
165+
client, err := c.connectS3()
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
ctx := context.Background()
171+
objects := client.ListObjects(
172+
ctx,
173+
c.cfg.S3.BucketName,
174+
minio.ListObjectsOptions{},
175+
)
176+
177+
var backups []string
178+
for object := range objects {
179+
if object.Err != nil {
180+
return nil, object.Err
181+
}
182+
backups = append(backups, object.Key)
183+
}
184+
185+
err = sortBackups(backups)
186+
if err != nil {
187+
return nil, err
188+
}
189+
190+
return backups, nil
191+
}
192+
193+
func (c *Client) UploadBackup(name string, backup io.Reader) error {
194+
client, err := c.connectS3()
195+
if err != nil {
196+
return err
197+
}
198+
199+
ctx := context.Background()
200+
_, err = client.PutObject(
201+
ctx,
202+
c.cfg.S3.BucketName,
203+
name,
204+
backup,
205+
-1,
206+
minio.PutObjectOptions{},
207+
)
208+
if err != nil {
209+
return err
210+
}
211+
212+
return nil
213+
}
214+
215+
func (c *Client) DownloadBackup(name string) (io.Reader, error) {
216+
client, err := c.connectS3()
217+
if err != nil {
218+
return nil, err
219+
}
220+
221+
ctx := context.Background()
222+
backup, err := client.GetObject(
223+
ctx,
224+
c.cfg.S3.BucketName,
225+
name,
226+
minio.GetObjectOptions{},
227+
)
228+
if err != nil {
229+
return nil, err
230+
}
231+
232+
return backup, nil
233+
}
234+
235+
func (c *Client) DeleteBackup(name string) error {
236+
client, err := c.connectS3()
237+
if err != nil {
238+
return err
239+
}
240+
241+
ctx := context.Background()
242+
err = client.RemoveObject(
243+
ctx,
244+
c.cfg.S3.BucketName,
245+
name,
246+
minio.RemoveObjectOptions{},
247+
)
248+
if err != nil {
249+
return err
250+
}
251+
252+
return nil
253+
}
254+
255+
func (c *Client) connectS3() (*minio.Client, error) {
256+
creds := credentials.NewStaticV4(
257+
c.cfg.S3.AccessKeyID,
258+
c.cfg.S3.SecretAccessKey,
259+
"",
260+
)
261+
262+
// disable HTTPS requirement for local development / testing
263+
secure := true
264+
if strings.Contains(c.cfg.S3.Endpoint, "localhost") {
265+
secure = false
266+
}
267+
if strings.Contains(c.cfg.S3.Endpoint, "127.0.0.1") {
268+
secure = false
269+
}
270+
271+
client, err := minio.New(c.cfg.S3.Endpoint, &minio.Options{
272+
Creds: creds,
273+
Secure: secure,
274+
})
275+
if err != nil {
276+
return nil, err
277+
}
278+
279+
return client, nil
280+
}

0 commit comments

Comments
 (0)