Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Data.Char (isAlpha, isAscii, toUpper)
import Data.Either (fromRight)
import Data.Functor (($>))
import Data.Ini (Ini, lookupValue, readIniFile)
import Data.Int (Int64)
import Data.List (find, isPrefixOf)
import qualified Data.List.NonEmpty as L
import Data.Maybe (fromMaybe, isJust, isNothing)
Expand Down Expand Up @@ -117,7 +118,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
confirmOrExit
("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir)
"Messages not imported"
ms <- newJournalMsgStore MQStoreCfg
ms <- newJournalMsgStore logPath MQStoreCfg
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration
putStrLn "Import completed"
Expand All @@ -135,7 +136,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
confirmOrExit
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
"Journal not exported"
ms <- newJournalMsgStore MQStoreCfg
ms <- newJournalMsgStore logPath MQStoreCfg
-- TODO [postgres] in case postgres configured, queues must be read from database
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
exportMessages True ms storeMsgsFilePath False
Expand Down Expand Up @@ -178,14 +179,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
confirmOrExit
("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema)
"Queue records not imported"
ms <- newJournalMsgStore MQStoreCfg
sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms)
closeStoreLog sl
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
renameFile storeLogFile $ storeLogFile <> ".bak"
qCnt <- importStoreLogToDatabase logPath storeLogFile dbOpts
putStrLn $ "Import completed: " <> show qCnt <> " queues"
putStrLn $ case readStoreType ini of
Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
Expand All @@ -207,10 +201,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
confirmOrExit
("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath)
"Queue records not exported"
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
sl <- openWriteStoreLog False storeLogFilePath
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
qCnt <- exportDatabaseToStoreLog logPath dbOpts storeLogFilePath
putStrLn $ "Export completed: " <> show qCnt <> " queues"
putStrLn $ case readStoreType ini of
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
Expand Down Expand Up @@ -239,16 +230,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
(pure storeLogFile)
(putStrLn ("Store log file " <> storeLogFile <> " not found") >> exitFailure)
Nothing -> putStrLn "Store log disabled, see `[STORE_LOG] enable`" >> exitFailure
newJournalMsgStore :: QStoreCfg s -> IO (JournalMsgStore s)
newJournalMsgStore qsCfg =
let cfg = mkJournalStoreConfig qsCfg storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
in newMsgStore cfg
iniFile = combine cfgPath "smp-server.ini"
serverVersion = "SMP server v" <> simplexMQVersion
executableName = "smp-server"
storeLogFilePath = combine logPath "smp-server-store.log"
storeMsgsFilePath = combine logPath "smp-server-messages.log"
storeMsgsJournalDir = combine logPath "messages"
storeMsgsJournalDir = storeMsgsJournalDir' logPath
storeNtfsFilePath = combine logPath "smp-server-ntfs.log"
readStoreType :: Ini -> Either String AStoreType
readStoreType ini = case (iniStoreQueues, iniStoreMessage) of
Expand Down Expand Up @@ -567,8 +554,37 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
putStrLn $ "Error: both " <> storeLogFilePath <> " file and " <> B.unpack schema <> " schema are present (database: " <> B.unpack connstr <> ")."
putStrLn "Configure queue storage."
exitFailure

importStoreLogToDatabase :: FilePath -> FilePath -> DBOpts -> IO Int64
importStoreLogToDatabase logPath storeLogFile dbOpts = do
ms <- newJournalMsgStore logPath MQStoreCfg
sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms)
closeStoreLog sl
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
renameFile storeLogFile $ storeLogFile <> ".bak"
pure qCnt

exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO Int
exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg
sl <- openWriteStoreLog False storeLogFilePath
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
closeStoreLog sl
pure qCnt
#endif

newJournalMsgStore :: FilePath -> QStoreCfg s -> IO (JournalMsgStore s)
newJournalMsgStore logPath qsCfg =
let cfg = mkJournalStoreConfig qsCfg (storeMsgsJournalDir' logPath) defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
in newMsgStore cfg

storeMsgsJournalDir' :: FilePath -> FilePath
storeMsgsJournalDir' logPath = combine logPath "messages"

data EmbeddedWebParams = EmbeddedWebParams
{ webStaticPath :: FilePath,
webHttpPort :: Maybe Int,
Expand Down
7 changes: 6 additions & 1 deletion src/Simplex/Messaging/Server/QueueStore/Postgres.hs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,12 @@ batchInsertQueues tty queues toStore = do
let st = dbStore toStore
count <-
withConnection st $ \db -> do
DB.copy_ db "COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) FROM STDIN WITH (FORMAT CSV)"
DB.copy_
db
[sql|
COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at, link_id, fixed_data, user_data)
FROM STDIN WITH (FORMAT CSV)
|]
mapM_ (putQueue db) (zip [1..] qs)
DB.putCopyEnd db
Only qCnt : _ <- withConnection st (`DB.query_` "SELECT count(*) FROM msg_queues")
Expand Down
12 changes: 11 additions & 1 deletion tests/CoreTests/StoreLogTests.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
Expand All @@ -21,6 +22,7 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.Env.STM (readWriteQueueStore)
import Simplex.Messaging.Server.Main
import Simplex.Messaging.Server.MsgStore.Journal
import Simplex.Messaging.Server.MsgStore.Types
import Simplex.Messaging.Server.QueueStore
Expand Down Expand Up @@ -53,7 +55,6 @@ deriving instance Eq StoreLogRecord

deriving instance Eq NtfCreds

-- TODO [short links] test store log with queue data
storeLogTests :: Spec
storeLogTests =
forM_ [QMMessaging, QMContact] $ \qm -> do
Expand Down Expand Up @@ -138,6 +139,15 @@ testSMPStoreLog testSuite tests =
mapM_ (writeStoreLogRecord l) saved
closeStoreLog l
replicateM_ 3 $ testReadWrite t
#if defined(dbServerPostgres)
qCnt <- fromIntegral <$> importStoreLogToDatabase "tests/tmp/" testStoreLogFile testStoreDBOpts
qCnt `shouldBe` length (compacted t)
imported <- B.readFile $ testStoreLogFile <> ".bak"
qCnt' <- exportDatabaseToStoreLog "tests/tmp/" testStoreDBOpts testStoreLogFile
qCnt' `shouldBe` qCnt
exported <- B.readFile testStoreLogFile
imported `shouldBe` exported
#endif
where
testReadWrite SLTC {compacted, state} = do
st <- newMsgStore $ testJournalStoreCfg MQStoreCfg
Expand Down
5 changes: 5 additions & 0 deletions tests/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ main = do
describe "Message store tests" msgStoreTests
describe "Retry interval tests" retryIntervalTests
describe "SOCKS settings tests" socksSettingsTests
#if defined(dbServerPostgres)
around_ (postgressBracket testServerDBConnectInfo) $
describe "Store log tests" storeLogTests
#else
describe "Store log tests" storeLogTests
#endif
describe "TRcvQueues tests" tRcvQueuesTests
describe "Util tests" utilTests
describe "Agent core tests" agentCoreTests
Expand Down
Loading