Skip to content

Commit 043b4c2

Browse files
authored
feat: introduce tx idle timeout in Postgres connections (#4598)
1 parent 9a89780 commit 043b4c2

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

utils/misc/dbutils.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net/url"
77
"os"
8+
"time"
89

910
"github.com/lib/pq"
1011

@@ -19,15 +20,20 @@ func GetConnectionString(c *config.Config, componentName string) string {
1920
port := c.GetInt("DB.port", 5432)
2021
password := c.GetString("DB.password", "ubuntu") // Reading secrets from
2122
sslmode := c.GetString("DB.sslMode", "disable")
23+
idleTxTimeout := c.GetDuration("DB.IdleTxTimeout", 5, time.Minute)
24+
2225
// Application Name can be any string of less than NAMEDATALEN characters (64 characters in a standard PostgreSQL build).
2326
// There is no need to truncate the string on our own though since PostgreSQL auto-truncates this identifier and issues a relevant notice if necessary.
2427
appName := DefaultString("rudder-server").OnError(os.Hostname())
2528
if len(componentName) > 0 {
2629
appName = fmt.Sprintf("%s-%s", componentName, appName)
2730
}
2831
return fmt.Sprintf("host=%s port=%d user=%s "+
29-
"password=%s dbname=%s sslmode=%s application_name=%s",
30-
host, port, user, password, dbname, sslmode, appName)
32+
"password=%s dbname=%s sslmode=%s application_name=%s "+
33+
" options='-c idle_in_transaction_session_timeout=%d'",
34+
host, port, user, password, dbname, sslmode, appName,
35+
idleTxTimeout.Milliseconds(),
36+
)
3137
}
3238

3339
// SetAppNameInDBConnURL sets application name in db connection url

utils/misc/dbutils_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package misc_test
22

33
import (
4+
"database/sql"
5+
"fmt"
46
"testing"
7+
"time"
58

9+
"github.com/ory/dockertest/v3"
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/rudderlabs/rudder-go-kit/config"
13+
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"
614
"github.com/rudderlabs/rudder-server/utils/misc"
715
)
816

@@ -58,3 +66,65 @@ func TestSetApplicationNameInDBConnectionURL(t *testing.T) {
5866
})
5967
}
6068
}
69+
70+
func TestIdleTxTimeout(t *testing.T) {
71+
pool, err := dockertest.NewPool("")
72+
require.NoError(t, err)
73+
postgresContainer, err := postgres.Setup(pool, t)
74+
require.NoError(t, err)
75+
76+
conf := config.New()
77+
conf.Set("DB.host", postgresContainer.Host)
78+
conf.Set("DB.user", postgresContainer.User)
79+
conf.Set("DB.name", postgresContainer.Database)
80+
conf.Set("DB.port", postgresContainer.Port)
81+
conf.Set("DB.password", postgresContainer.Password)
82+
83+
txTimeout := 2 * time.Millisecond
84+
85+
conf.Set("DB.IdleTxTimeout", txTimeout)
86+
87+
dsn := misc.GetConnectionString(conf, "test")
88+
89+
db, err := sql.Open("postgres", dsn)
90+
require.NoError(t, err)
91+
92+
var sessionTimeout string
93+
err = db.QueryRow("SHOW idle_in_transaction_session_timeout;").Scan(&sessionTimeout)
94+
require.NoError(t, err)
95+
require.Equal(t, txTimeout.String(), sessionTimeout)
96+
97+
t.Run("timeout tx", func(t *testing.T) {
98+
tx, err := db.Begin()
99+
require.NoError(t, err)
100+
101+
var pid int
102+
err = tx.QueryRow(`select pg_backend_pid();`).Scan(&pid)
103+
require.NoError(t, err)
104+
105+
_, err = tx.Exec("select 1")
106+
require.NoError(t, err)
107+
t.Log("sleep double the timeout to close connection")
108+
time.Sleep(2 * txTimeout)
109+
110+
err = tx.Commit()
111+
require.EqualError(t, err, "driver: bad connection")
112+
113+
var count int
114+
err = db.QueryRow(`SELECT count(*) FROM pg_stat_activity WHERE pid = $1`, pid).Scan(&count)
115+
require.NoError(t, err)
116+
117+
require.Zero(t, count)
118+
})
119+
120+
t.Run("successful tx", func(t *testing.T) {
121+
tx, err := db.Begin()
122+
require.NoError(t, err)
123+
_, err = tx.Exec("select 1")
124+
require.NoError(t, err)
125+
_, err = tx.Exec(fmt.Sprintf("select pg_sleep(%f)", txTimeout.Seconds()))
126+
require.NoError(t, err)
127+
128+
require.NoError(t, tx.Commit())
129+
})
130+
}

0 commit comments

Comments
 (0)