From dad6bd9ac44a32110f08d26e7a1a8fc13fa0a5c5 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin Date: Tue, 22 Apr 2025 20:13:41 +0100 Subject: [PATCH] smp server: fix/text database import --- src/Simplex/Messaging/Server/Main.hs | 54 ++++++++++++------- .../Messaging/Server/QueueStore/Postgres.hs | 7 ++- tests/CoreTests/StoreLogTests.hs | 12 ++++- tests/Test.hs | 5 ++ 4 files changed, 57 insertions(+), 21 deletions(-) diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index b586fd0f9..19e80bada 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -26,6 +26,7 @@ import Data.Char (isAlpha, isAscii, toUpper) import Data.Either (fromRight) import Data.Functor (($>)) import Data.Ini (Ini, lookupValue, readIniFile) +import Data.Int (Int64) import Data.List (find, isPrefixOf) import qualified Data.List.NonEmpty as L import Data.Maybe (fromMaybe, isJust, isNothing) @@ -117,7 +118,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir) "Messages not imported" - ms <- newJournalMsgStore MQStoreCfg + ms <- newJournalMsgStore logPath MQStoreCfg readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration putStrLn "Import completed" @@ -135,7 +136,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath) "Journal not exported" - ms <- newJournalMsgStore MQStoreCfg + ms <- newJournalMsgStore logPath MQStoreCfg -- TODO [postgres] in case postgres configured, queues must be read from database readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms exportMessages True ms storeMsgsFilePath False @@ -178,14 +179,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema) "Queue records not imported" - ms <- newJournalMsgStore MQStoreCfg - sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms) - closeStoreLog sl - queues <- readTVarIO $ loadedQueues $ stmQueueStore ms - let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} - ps <- newJournalMsgStore $ PQStoreCfg storeCfg - qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps - renameFile storeLogFile $ storeLogFile <> ".bak" + qCnt <- importStoreLogToDatabase logPath storeLogFile dbOpts putStrLn $ "Import completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)" @@ -207,10 +201,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = confirmOrExit ("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath) "Queue records not exported" - let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini} - ps <- newJournalMsgStore $ PQStoreCfg storeCfg - sl <- openWriteStoreLog False storeLogFilePath - Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) + qCnt <- exportDatabaseToStoreLog logPath dbOpts storeLogFilePath putStrLn $ "Export completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." @@ -239,16 +230,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = (pure storeLogFile) (putStrLn ("Store log file " <> storeLogFile <> " not found") >> exitFailure) Nothing -> putStrLn "Store log disabled, see `[STORE_LOG] enable`" >> exitFailure - newJournalMsgStore :: QStoreCfg s -> IO (JournalMsgStore s) - newJournalMsgStore qsCfg = - let cfg = mkJournalStoreConfig qsCfg storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration - in newMsgStore cfg iniFile = combine cfgPath "smp-server.ini" serverVersion = "SMP server v" <> simplexMQVersion executableName = "smp-server" storeLogFilePath = combine logPath "smp-server-store.log" storeMsgsFilePath = combine logPath "smp-server-messages.log" - storeMsgsJournalDir = combine logPath "messages" + storeMsgsJournalDir = storeMsgsJournalDir' logPath storeNtfsFilePath = combine logPath "smp-server-ntfs.log" readStoreType :: Ini -> Either String AStoreType readStoreType ini = case (iniStoreQueues, iniStoreMessage) of @@ -567,8 +554,37 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = putStrLn $ "Error: both " <> storeLogFilePath <> " file and " <> B.unpack schema <> " schema are present (database: " <> B.unpack connstr <> ")." putStrLn "Configure queue storage." exitFailure + +importStoreLogToDatabase :: FilePath -> FilePath -> DBOpts -> IO Int64 +importStoreLogToDatabase logPath storeLogFile dbOpts = do + ms <- newJournalMsgStore logPath MQStoreCfg + sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms) + closeStoreLog sl + queues <- readTVarIO $ loadedQueues $ stmQueueStore ms + let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} + ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg + qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps + renameFile storeLogFile $ storeLogFile <> ".bak" + pure qCnt + +exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO Int +exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do + let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL} + ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg + sl <- openWriteStoreLog False storeLogFilePath + Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) + closeStoreLog sl + pure qCnt #endif +newJournalMsgStore :: FilePath -> QStoreCfg s -> IO (JournalMsgStore s) +newJournalMsgStore logPath qsCfg = + let cfg = mkJournalStoreConfig qsCfg (storeMsgsJournalDir' logPath) defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration + in newMsgStore cfg + +storeMsgsJournalDir' :: FilePath -> FilePath +storeMsgsJournalDir' logPath = combine logPath "messages" + data EmbeddedWebParams = EmbeddedWebParams { webStaticPath :: FilePath, webHttpPort :: Maybe Int, diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index f500a3f42..a4625b2f7 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -374,7 +374,12 @@ batchInsertQueues tty queues toStore = do let st = dbStore toStore count <- withConnection st $ \db -> do - DB.copy_ db "COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) FROM STDIN WITH (FORMAT CSV)" + DB.copy_ + db + [sql| + COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at, link_id, fixed_data, user_data) + FROM STDIN WITH (FORMAT CSV) + |] mapM_ (putQueue db) (zip [1..] qs) DB.putCopyEnd db Only qCnt : _ <- withConnection st (`DB.query_` "SELECT count(*) FROM msg_queues") diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 9556c6788..b1ab3cb9d 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NamedFieldPuns #-} @@ -21,6 +22,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol import Simplex.Messaging.Server.Env.STM (readWriteQueueStore) +import Simplex.Messaging.Server.Main import Simplex.Messaging.Server.MsgStore.Journal import Simplex.Messaging.Server.MsgStore.Types import Simplex.Messaging.Server.QueueStore @@ -53,7 +55,6 @@ deriving instance Eq StoreLogRecord deriving instance Eq NtfCreds --- TODO [short links] test store log with queue data storeLogTests :: Spec storeLogTests = forM_ [QMMessaging, QMContact] $ \qm -> do @@ -138,6 +139,15 @@ testSMPStoreLog testSuite tests = mapM_ (writeStoreLogRecord l) saved closeStoreLog l replicateM_ 3 $ testReadWrite t +#if defined(dbServerPostgres) + qCnt <- fromIntegral <$> importStoreLogToDatabase "tests/tmp/" testStoreLogFile testStoreDBOpts + qCnt `shouldBe` length (compacted t) + imported <- B.readFile $ testStoreLogFile <> ".bak" + qCnt' <- exportDatabaseToStoreLog "tests/tmp/" testStoreDBOpts testStoreLogFile + qCnt' `shouldBe` qCnt + exported <- B.readFile testStoreLogFile + imported `shouldBe` exported +#endif where testReadWrite SLTC {compacted, state} = do st <- newMsgStore $ testJournalStoreCfg MQStoreCfg diff --git a/tests/Test.hs b/tests/Test.hs index 674c1aa18..9ebdec8f7 100644 --- a/tests/Test.hs +++ b/tests/Test.hs @@ -81,7 +81,12 @@ main = do describe "Message store tests" msgStoreTests describe "Retry interval tests" retryIntervalTests describe "SOCKS settings tests" socksSettingsTests +#if defined(dbServerPostgres) + around_ (postgressBracket testServerDBConnectInfo) $ + describe "Store log tests" storeLogTests +#else describe "Store log tests" storeLogTests +#endif describe "TRcvQueues tests" tRcvQueuesTests describe "Util tests" utilTests describe "Agent core tests" agentCoreTests