Skip to content

Commit cea3fc0

Browse files
authored
Use proper data type for timestamps in Postgres 2 (#1862)
For some reason, the payments database was forgotten by #1778.
1 parent f8feb19 commit cea3fc0

File tree

2 files changed

+222
-138
lines changed

2 files changed

+222
-138
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala

+43-30
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import scodec.Attempt
3030
import scodec.bits.BitVector
3131
import scodec.codecs._
3232

33-
import java.sql.{ResultSet, Statement}
33+
import java.sql.{ResultSet, Statement, Timestamp}
34+
import java.time.Instant
3435
import java.util.UUID
3536
import javax.sql.DataSource
36-
import scala.concurrent.duration._
3737

3838
class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb with Logging {
3939

@@ -42,7 +42,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
4242
import lock._
4343

4444
val DB_NAME = "payments"
45-
val CURRENT_VERSION = 5
45+
val CURRENT_VERSION = 6
4646

4747
private val hopSummaryCodec = (("node_id" | CommonCodecs.publicKey) :: ("next_node_id" | CommonCodecs.publicKey) :: ("short_channel_id" | optional(bool, CommonCodecs.shortchannelid))).as[HopSummary]
4848
private val paymentRouteCodec = discriminated[List[HopSummary]].by(byte)
@@ -62,12 +62,21 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
6262
statement.executeUpdate("ALTER TABLE sent SET SCHEMA payments")
6363
}
6464

65+
def migration56(statement: Statement): Unit = {
66+
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
67+
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN expire_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + expire_at * interval '1 millisecond'")
68+
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN received_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + received_at * interval '1 millisecond'")
69+
70+
statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
71+
statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN completed_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + completed_at * interval '1 millisecond'")
72+
}
73+
6574
getVersion(statement, DB_NAME) match {
6675
case None =>
6776
statement.executeUpdate("CREATE SCHEMA payments")
6877

69-
statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at BIGINT NOT NULL, expire_at BIGINT NOT NULL, received_at BIGINT)")
70-
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at BIGINT NOT NULL, completed_at BIGINT)")
78+
statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, expire_at TIMESTAMP WITH TIME ZONE NOT NULL, received_at TIMESTAMP WITH TIME ZONE)")
79+
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at TIMESTAMP WITH TIME ZONE NOT NULL, completed_at TIMESTAMP WITH TIME ZONE)")
7180

7281
statement.executeUpdate("CREATE INDEX sent_parent_id_idx ON payments.sent(parent_id)")
7382
statement.executeUpdate("CREATE INDEX sent_payment_hash_idx ON payments.sent(payment_hash)")
@@ -76,6 +85,10 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
7685
case Some(v@4) =>
7786
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
7887
migration45(statement)
88+
migration56(statement)
89+
case Some(v@5) =>
90+
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
91+
migration56(statement)
7992
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
8093
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
8194
}
@@ -95,7 +108,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
95108
statement.setLong(6, sent.amount.toLong)
96109
statement.setLong(7, sent.recipientAmount.toLong)
97110
statement.setString(8, sent.recipientNodeId.value.toHex)
98-
statement.setLong(9, sent.createdAt)
111+
statement.setTimestamp(9, Timestamp.from(Instant.ofEpochMilli(sent.createdAt)))
99112
statement.setString(10, sent.paymentRequest.map(PaymentRequest.write).orNull)
100113
statement.executeUpdate()
101114
}
@@ -106,7 +119,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
106119
withLock { pg =>
107120
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
108121
paymentResult.parts.foreach(p => {
109-
statement.setLong(1, p.timestamp)
122+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(p.timestamp)))
110123
statement.setString(2, paymentResult.paymentPreimage.toHex)
111124
statement.setLong(3, p.feesPaid.toLong)
112125
statement.setBytes(4, paymentRouteCodec.encode(p.route.getOrElse(Nil).map(h => HopSummary(h)).toList).require.toByteArray)
@@ -121,7 +134,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
121134
override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Postgres) {
122135
withLock { pg =>
123136
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
124-
statement.setLong(1, paymentResult.timestamp)
137+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(paymentResult.timestamp)))
125138
statement.setBytes(2, paymentFailuresCodec.encode(paymentResult.failures.map(f => FailureSummary(f)).toList).require.toByteArray)
126139
statement.setString(3, paymentResult.id.toString)
127140
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to mark an outgoing payment as failed but already in final status (id=${paymentResult.id})")
@@ -134,7 +147,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
134147
rs.getByteVector32FromHexNullable("payment_preimage"),
135148
rs.getMilliSatoshiNullable("fees_msat"),
136149
rs.getBitVectorOpt("payment_route"),
137-
rs.getLongNullable("completed_at"),
150+
rs.getTimestampNullable("completed_at").map(_.getTime),
138151
rs.getBitVectorOpt("failures"))
139152

140153
OutgoingPayment(
@@ -146,7 +159,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
146159
MilliSatoshi(rs.getLong("amount_msat")),
147160
MilliSatoshi(rs.getLong("recipient_amount_msat")),
148161
PublicKey(rs.getByteVectorFromHex("recipient_node_id")),
149-
rs.getLong("created_at"),
162+
rs.getTimestamp("created_at").getTime,
150163
rs.getStringNullable("payment_request").map(PaymentRequest.read),
151164
status
152165
)
@@ -207,8 +220,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
207220
override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp", DbBackends.Postgres) {
208221
withLock { pg =>
209222
using(pg.prepareStatement("SELECT * FROM payments.sent WHERE created_at >= ? AND created_at < ? ORDER BY created_at")) { statement =>
210-
statement.setLong(1, from)
211-
statement.setLong(2, to)
223+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
224+
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
212225
statement.executeQuery().map { rs =>
213226
parseOutgoingPayment(rs)
214227
}.toSeq
@@ -223,8 +236,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
223236
statement.setString(2, preimage.toHex)
224237
statement.setString(3, paymentType)
225238
statement.setString(4, PaymentRequest.write(pr))
226-
statement.setLong(5, pr.timestamp.seconds.toMillis) // BOLT11 timestamp is in seconds
227-
statement.setLong(6, (pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong)).seconds.toMillis)
239+
statement.setTimestamp(5, Timestamp.from(Instant.ofEpochSecond(pr.timestamp))) // BOLT11 timestamp is in seconds
240+
statement.setTimestamp(6, Timestamp.from(Instant.ofEpochSecond(pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong))))
228241
statement.executeUpdate()
229242
}
230243
}
@@ -234,7 +247,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
234247
withLock { pg =>
235248
using(pg.prepareStatement("UPDATE payments.received SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update =>
236249
update.setLong(1, amount.toLong)
237-
update.setLong(2, receivedAt)
250+
update.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(receivedAt)))
238251
update.setString(3, paymentHash.toHex)
239252
val updated = update.executeUpdate()
240253
if (updated == 0) {
@@ -250,8 +263,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
250263
PaymentRequest.read(paymentRequest),
251264
rs.getByteVector32FromHex("payment_preimage"),
252265
rs.getString("payment_type"),
253-
rs.getLong("created_at"),
254-
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getLongNullable("received_at")))
266+
rs.getTimestamp("created_at").getTime,
267+
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getTimestampNullable("received_at").map(_.getTime)))
255268
}
256269

257270
private def buildIncomingPaymentStatus(amount_opt: Option[MilliSatoshi], serializedPaymentRequest_opt: Option[String], receivedAt_opt: Option[Long]): IncomingPaymentStatus = {
@@ -274,8 +287,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
274287
override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming", DbBackends.Postgres) {
275288
withLock { pg =>
276289
using(pg.prepareStatement("SELECT * FROM payments.received WHERE created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
277-
statement.setLong(1, from)
278-
statement.setLong(2, to)
290+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
291+
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
279292
statement.executeQuery().map(parseIncomingPayment).toSeq
280293
}
281294
}
@@ -284,8 +297,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
284297
override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received", DbBackends.Postgres) {
285298
withLock { pg =>
286299
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat > 0 AND created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
287-
statement.setLong(1, from)
288-
statement.setLong(2, to)
300+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
301+
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
289302
statement.executeQuery().map(parseIncomingPayment).toSeq
290303
}
291304
}
@@ -294,9 +307,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
294307
override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending", DbBackends.Postgres) {
295308
withLock { pg =>
296309
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at > ? ORDER BY created_at")) { statement =>
297-
statement.setLong(1, from)
298-
statement.setLong(2, to)
299-
statement.setLong(3, System.currentTimeMillis)
310+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
311+
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
312+
statement.setTimestamp(3, Timestamp.from(Instant.now()))
300313
statement.executeQuery().map(parseIncomingPayment).toSeq
301314
}
302315
}
@@ -305,9 +318,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
305318
override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired", DbBackends.Postgres) {
306319
withLock { pg =>
307320
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at < ? ORDER BY created_at")) { statement =>
308-
statement.setLong(1, from)
309-
statement.setLong(2, to)
310-
statement.setLong(3, System.currentTimeMillis)
321+
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
322+
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
323+
statement.setTimestamp(3, Timestamp.from(Instant.now()))
311324
statement.executeQuery().map(parseIncomingPayment).toSeq
312325
}
313326
}
@@ -366,9 +379,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
366379
val paymentType = rs.getString("payment_type")
367380
val paymentRequest_opt = rs.getStringNullable("payment_request")
368381
val amount_opt = rs.getMilliSatoshiNullable("final_amount")
369-
val createdAt = rs.getLong("created_at")
370-
val completedAt_opt = rs.getLongNullable("completed_at")
371-
val expireAt_opt = rs.getLongNullable("expire_at")
382+
val createdAt = rs.getTimestamp("created_at").getTime
383+
val completedAt_opt = rs.getTimestampNullable("completed_at").map(_.getTime)
384+
val expireAt_opt = rs.getTimestampNullable("expire_at").map(_.getTime)
372385

373386
if (rs.getString("type") == "received") {
374387
val status: IncomingPaymentStatus = buildIncomingPaymentStatus(amount_opt, paymentRequest_opt, completedAt_opt)

0 commit comments

Comments
 (0)