@@ -2,15 +2,16 @@ package outbox_test
2
2
3
3
import (
4
4
"context"
5
+ "database/sql"
5
6
"encoding/json"
6
7
"fmt"
7
- "log/slog"
8
8
"os"
9
9
"testing"
10
10
11
11
"github.com/google/go-cmp/cmp"
12
12
"github.com/jackc/pgx/v5"
13
13
"github.com/jackc/pgx/v5/pgxpool"
14
+ _ "github.com/jackc/pgx/v5/stdlib"
14
15
outbox "github.com/nikolayk812/pgx-outbox"
15
16
"github.com/nikolayk812/pgx-outbox/internal/containers"
16
17
"github.com/nikolayk812/pgx-outbox/internal/fakes"
@@ -29,6 +30,7 @@ var ctx = context.Background()
29
30
type WriterReaderTestSuite struct {
30
31
suite.Suite
31
32
pool * pgxpool.Pool
33
+ db * sql.DB
32
34
container testcontainers.Container
33
35
34
36
writer outbox.Writer
@@ -50,6 +52,9 @@ func (suite *WriterReaderTestSuite) SetupSuite() {
50
52
suite .pool , err = pgxpool .New (ctx , connStr )
51
53
suite .noError (err )
52
54
55
+ suite .db , err = sql .Open ("pgx" , connStr )
56
+ suite .noError (err )
57
+
53
58
suite .writer , err = outbox .NewWriter (outboxTable )
54
59
suite .noError (err )
55
60
@@ -61,55 +66,82 @@ func (suite *WriterReaderTestSuite) TearDownSuite() {
61
66
if suite .pool != nil {
62
67
suite .pool .Close ()
63
68
}
69
+ if suite .db != nil {
70
+ suite .NoError (suite .db .Close ())
71
+ }
64
72
if suite .container != nil {
65
- if err := suite .container .Terminate (ctx ); err != nil {
66
- slog .Error ("suite.container.Terminate" , slog .Any ("error" , err ))
67
- }
73
+ suite .NoError (suite .container .Terminate (ctx ))
68
74
}
69
75
70
76
goleak .VerifyNone (suite .T ())
71
77
}
72
78
73
- func (suite * WriterReaderTestSuite ) TestWriter_New () {
79
+ //nolint:dupl
80
+ func (suite * WriterReaderTestSuite ) TestWriter_WriteMessage () {
81
+ invalidMessage := fakes .FakeMessage ()
82
+ invalidMessage .Broker = ""
83
+
74
84
tests := []struct {
75
85
name string
76
- table string
77
- options []outbox.WriteOption
78
- wantErr error
86
+ in []types.Message
87
+ wantErr bool
79
88
}{
80
89
{
81
- name : "empty table" ,
82
- table : "" ,
83
- wantErr : outbox .ErrTableEmpty ,
90
+ name : "no messages" ,
91
+ in : []types.Message {},
84
92
},
85
93
{
86
- name : "non-empty table" ,
87
- table : "outbox_messages" ,
94
+ name : "single message" ,
95
+ in : []types.Message {
96
+ fakes .FakeMessage (),
97
+ },
88
98
},
89
99
{
90
- name : "with options" ,
91
- table : "outbox_messages" ,
92
- options : []outbox.WriteOption {outbox .WithDisablePreparedBatch ()},
100
+ name : "multiple in" ,
101
+ in : []types.Message {
102
+ fakes .FakeMessage (),
103
+ fakes .FakeMessage (),
104
+ },
93
105
},
106
+ {
107
+ name : "invalid message" ,
108
+ in : []types.Message {
109
+ invalidMessage ,
110
+ },
111
+ wantErr : true ,
112
+ },
113
+ // Add more test cases as needed
94
114
}
95
115
96
116
for _ , tt := range tests {
97
117
suite .Run (tt .name , func () {
98
118
t := suite .T ()
99
119
100
- writer , err := outbox .NewWriter (tt .table , tt .options ... )
101
- if tt .wantErr != nil {
102
- require .ErrorIs (t , err , tt .wantErr )
103
- return
120
+ // GIVEN
121
+ for _ , message := range tt .in {
122
+ id , err := suite .write (message )
123
+ if tt .wantErr {
124
+ require .Error (t , err )
125
+ return
126
+ }
127
+ require .NoError (t , err )
128
+ assert .Positive (t , id )
104
129
}
105
130
131
+ limit := maxInt (1 , len (tt .in ))
132
+
133
+ // THEN
134
+ actual , err := suite .reader .Read (ctx , limit )
106
135
require .NoError (t , err )
107
- assert .NotNil (t , writer )
136
+ assertEqualMessages (t , tt .in , actual )
137
+
138
+ suite .markAll ()
108
139
})
109
140
}
110
141
}
111
142
112
- func (suite * WriterReaderTestSuite ) TestWriter_WriteMessage () {
143
+ //nolint:dupl
144
+ func (suite * WriterReaderTestSuite ) TestWriter_WriteMessageStdLib () {
113
145
invalidMessage := fakes .FakeMessage ()
114
146
invalidMessage .Broker = ""
115
147
@@ -151,7 +183,7 @@ func (suite *WriterReaderTestSuite) TestWriter_WriteMessage() {
151
183
152
184
// GIVEN
153
185
for _ , message := range tt .in {
154
- id , err := suite .write (message )
186
+ id , err := suite .writeStdLib (message )
155
187
if tt .wantErr {
156
188
require .Error (t , err )
157
189
return
@@ -487,6 +519,34 @@ func (suite *WriterReaderTestSuite) beginTx(ctx context.Context) (pgx.Tx, func(e
487
519
return tx , commitFunc , nil
488
520
}
489
521
522
+ func (suite * WriterReaderTestSuite ) beginTxStdLib () (* sql.Tx , func (err error ) error , error ) {
523
+ emptyFunc := func (_ error ) error { return nil }
524
+
525
+ tx , err := suite .db .Begin ()
526
+ if err != nil {
527
+ return nil , emptyFunc , fmt .Errorf ("db.Begin: %w" , err )
528
+ }
529
+
530
+ commitFunc := func (execErr error ) error {
531
+ if execErr != nil {
532
+ rbErr := tx .Rollback ()
533
+ if rbErr != nil {
534
+ return fmt .Errorf ("tx.Rollback %v: %w" , execErr , rbErr ) //nolint:errorlint
535
+ }
536
+ return execErr
537
+ }
538
+
539
+ txErr := tx .Commit ()
540
+ if txErr != nil {
541
+ return fmt .Errorf ("tx.Commit: %w" , txErr )
542
+ }
543
+
544
+ return nil
545
+ }
546
+
547
+ return tx , commitFunc , nil
548
+ }
549
+
490
550
func (suite * WriterReaderTestSuite ) write (message types.Message ) (_ int64 , txErr error ) {
491
551
tx , commitFunc , err := suite .beginTx (ctx )
492
552
if err != nil {
@@ -506,6 +566,25 @@ func (suite *WriterReaderTestSuite) write(message types.Message) (_ int64, txErr
506
566
return id , nil
507
567
}
508
568
569
+ func (suite * WriterReaderTestSuite ) writeStdLib (message types.Message ) (_ int64 , txErr error ) {
570
+ tx , commitFunc , err := suite .beginTxStdLib ()
571
+ if err != nil {
572
+ return 0 , fmt .Errorf ("beginTx: %w" , err )
573
+ }
574
+ defer func () {
575
+ if err := commitFunc (txErr ); err != nil {
576
+ txErr = fmt .Errorf ("commitFunc: %w" , err )
577
+ }
578
+ }()
579
+
580
+ id , err := suite .writer .Write (ctx , tx , message )
581
+ if err != nil {
582
+ return 0 , fmt .Errorf ("writer.Write: %w" , err )
583
+ }
584
+
585
+ return id , nil
586
+ }
587
+
509
588
func (suite * WriterReaderTestSuite ) writeBatch (messages []types.Message ) (_ []int64 , txErr error ) {
510
589
tx , commitFunc , err := suite .beginTx (ctx )
511
590
if err != nil {
@@ -549,13 +628,54 @@ func (suite *WriterReaderTestSuite) noError(err error) {
549
628
suite .Require ().NoError (err )
550
629
}
551
630
631
+ //nolint:unparam
552
632
func maxInt (x , y int ) int {
553
633
if x > y {
554
634
return x
555
635
}
556
636
return y
557
637
}
558
638
639
+ // TestWriter_New is just to increase coverage.
640
+ func (suite * WriterReaderTestSuite ) TestWriter_New () {
641
+ tests := []struct {
642
+ name string
643
+ table string
644
+ options []outbox.WriteOption
645
+ wantErr error
646
+ }{
647
+ {
648
+ name : "empty table" ,
649
+ table : "" ,
650
+ wantErr : outbox .ErrTableEmpty ,
651
+ },
652
+ {
653
+ name : "non-empty table" ,
654
+ table : "outbox_messages" ,
655
+ },
656
+ {
657
+ name : "with options" ,
658
+ table : "outbox_messages" ,
659
+ options : []outbox.WriteOption {outbox .WithDisablePreparedBatch ()},
660
+ },
661
+ }
662
+
663
+ for _ , tt := range tests {
664
+ suite .Run (tt .name , func () {
665
+ t := suite .T ()
666
+
667
+ writer , err := outbox .NewWriter (tt .table , tt .options ... )
668
+ if tt .wantErr != nil {
669
+ require .ErrorIs (t , err , tt .wantErr )
670
+ return
671
+ }
672
+
673
+ require .NoError (t , err )
674
+ assert .NotNil (t , writer )
675
+ })
676
+ }
677
+ }
678
+
559
679
func (suite * WriterReaderTestSuite ) TestWriter_WriteWithNilTx () {
560
680
message := fakes .FakeMessage ()
561
681
@@ -599,3 +719,48 @@ func (suite *WriterReaderTestSuite) TestWriter_WriteWithNilTx() {
599
719
})
600
720
}
601
721
}
722
+
723
+ // TestReader_New is just to increase coverage.
724
+ func (suite * WriterReaderTestSuite ) TestReader_New () {
725
+ tests := []struct {
726
+ name string
727
+ table string
728
+ pool * pgxpool.Pool
729
+ option outbox.ReadOption
730
+ wantErr error
731
+ }{
732
+ {
733
+ name : "empty table" ,
734
+ table : "" ,
735
+ pool : suite .pool ,
736
+ wantErr : outbox .ErrTableEmpty ,
737
+ },
738
+ {
739
+ name : "nil pool" ,
740
+ table : "outbox_messages" ,
741
+ pool : nil ,
742
+ wantErr : outbox .ErrPoolNil ,
743
+ },
744
+ {
745
+ name : "with option" ,
746
+ table : "outbox_messages" ,
747
+ pool : suite .pool ,
748
+ option : outbox .WithReadFilter (types.MessageFilter {Brokers : []string {"broker1" }}),
749
+ },
750
+ }
751
+
752
+ for _ , tt := range tests {
753
+ suite .Run (tt .name , func () {
754
+ t := suite .T ()
755
+
756
+ reader , err := outbox .NewReader (tt .table , tt .pool , tt .option )
757
+ if tt .wantErr != nil {
758
+ require .ErrorIs (t , err , tt .wantErr )
759
+ return
760
+ }
761
+
762
+ require .NoError (t , err )
763
+ assert .NotNil (t , reader )
764
+ })
765
+ }
766
+ }
0 commit comments