Skip to content

Commit

Permalink
Use unique key per peer
Browse files Browse the repository at this point in the history
This should make that naiive polling already a bit more robust.
  • Loading branch information
ch1bo committed Sep 13, 2024
1 parent 755e6ea commit 4a68949
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions hydra-node/src/Hydra/Network/Etcd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Data.ByteString.Base16 qualified as Base16
import Data.ByteString.Base16.Lazy qualified as LBase16
import Data.ByteString.Base64 qualified as Base64
import Hydra.Logging (Tracer)
import Hydra.Network (Host (..), Network (..), NetworkCallback (..), NetworkComponent)
import Hydra.Network (Host (..), Network (..), NetworkCallback (..), NetworkComponent, PortNumber)
import Hydra.Node.Network (NetworkConfiguration (..))
import System.FilePath ((</>))
import System.Posix (Handler (Catch), installHandler, sigTERM)
Expand All @@ -40,7 +40,7 @@ withEtcdNetwork _tracer config callback action = do
race_ (waitMessages clientUrl callback) $ do
action
Network
{ broadcast = putMessage clientUrl
{ broadcast = putMessage clientUrl port
}
where
-- TODO: use TLS to secure peer connections
Expand Down Expand Up @@ -83,16 +83,17 @@ withEtcdNetwork _tracer config callback action = do
putMessage ::
(ToCBOR msg, MonadIO m) =>
String ->
PortNumber ->
msg ->
m ()
putMessage endpoint msg = do
putMessage endpoint port msg = do
-- XXX: error handling
runProcess_ $
proc "etcdctl" ["--endpoints", endpoint, "put", key]
& setStdin (byteStringInput hexMsg)
where
-- FIXME: use different keys per message types? per peer?
key = "foo"
-- FIXME: use different keys per message types?
key = "foo-" <> show port

hexMsg = LBase16.encode $ serialize msg

Expand All @@ -105,44 +106,44 @@ waitMessages ::
m ()
waitMessages endpoint NetworkCallback{deliver} = do
forever $ do
threadDelay 0.1
threadDelay 0.001
-- TODO: use revisions? and use compaction to limit storage
-- FIXME: use watch instead of poll
try (getKey endpoint "foo") >>= \case
Left (e :: SomeException) -> fail $ "etcd get error" <> show e
Right entry -> do
-- HACK: lenient decoding
case decodeFull' $ Base16.decodeLenient (value entry) of
Left err -> fail $ "Failed to decode etcd entry: " <> show err
Right msg ->
deliver msg
Left (e :: SomeException) -> putStrLn $ "etcd get error" <> show e
Right EtcdEntry{entries} -> do
forM_ entries $ \(_, value) -> do
-- HACK: lenient decoding
case decodeFull' $ Base16.decodeLenient value of
Left err -> fail $ "Failed to decode etcd entry: " <> show err
Right msg ->
deliver msg

getKey :: MonadIO m => String -> String -> m EtcdEntry
getKey endpoint key = do
-- XXX: error handling
out <- readProcessStdout_ $ proc "etcdctl" ["--endpoints", endpoint, "-w", "json", "get", key]
out <- readProcessStdout_ $ proc "etcdctl" ["--endpoints", endpoint, "-w", "json", "get", "--prefix", key]
case Aeson.eitherDecode out >>= parseEither parseEtcdEntry of
Left err -> die $ "Failed to parse etcd entry: " <> err
Right entry -> pure entry

data EtcdEntry = EtcdEntry
{ revision :: Natural
, key :: ByteString
, value :: ByteString
, entries :: [(ByteString, ByteString)]
}
deriving (Show)

parseEtcdEntry :: Value -> Parser EtcdEntry
parseEtcdEntry = withObject "EtcdEntry" $ \o -> do
-- TODO: use header revision or mod_revision of entry?
revision <- o .: "header" >>= (.: "revision")
entries <- o .: "kvs"
case entries of
[entry] -> do
key <- parseBase64 =<< entry .: "key"
value <- parseBase64 =<< entry .: "value"
pure EtcdEntry{revision, key, value}
_ -> fail "expected exactly one entry"
entries <- o .: "kvs" >>= mapM parseEntry
pure EtcdEntry{revision, entries}
where
parseEntry = withObject "EtcdEntry[]" $ \o -> do
key <- parseBase64 =<< o .: "key"
value <- parseBase64 =<< o .: "value"
pure (key, value)

-- HACK: lenient decoding
parseBase64 :: Text -> Parser ByteString
Expand Down

0 comments on commit 4a68949

Please sign in to comment.