-
Notifications
You must be signed in to change notification settings - Fork 0
/
QueueDeployment.hs
156 lines (129 loc) · 4.2 KB
/
QueueDeployment.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
{-# LANGUAGE GADTs #-}
module QueueDeployment where
import Control.Exception (finally)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import GHC.Num.Natural
import Test.QuickCheck.Monadic
import ModelIO
------------------------------------------------------------------------
qUEUE_SIZE :: Natural
qUEUE_SIZE = 2^16
deploy :: P a b -> TBQueue a -> IO (TBQueue b)
deploy Id xs = return xs
deploy (f :>>> g) xs = deploy g =<< deploy f xs
deploy (Map f) xs = deploy (MapM (return . f)) xs
deploy (MapM f) xs = do
ys <- newTBQueueIO qUEUE_SIZE
_pid <- forkIO $ forever $ do
x <- atomically (readTBQueue xs)
y <- f x
atomically (writeTBQueue ys y)
return ys
deploy (f :&&& g) xs = do
xs1 <- newTBQueueIO qUEUE_SIZE
xs2 <- newTBQueueIO qUEUE_SIZE
_pid <- forkIO $ forever $ do
x <- atomically (readTBQueue xs)
atomically $ do
writeTBQueue xs1 x
writeTBQueue xs2 x
ys <- deploy f xs1
zs <- deploy g xs2
yzs <- newTBQueueIO qUEUE_SIZE
_pid <- forkIO $ forever $ do
y <- atomically (readTBQueue ys)
z <- atomically (readTBQueue zs)
atomically (writeTBQueue yzs (y, z))
return yzs
deploy (Shard f) xs = do
xsEven <- newTBQueueIO qUEUE_SIZE
xsOdd <- newTBQueueIO qUEUE_SIZE
_pid <- forkIO $ shard xs xsEven xsOdd
ysEven <- deploy f xsEven
ysOdd <- deploy f xsOdd
ys <- newTBQueueIO qUEUE_SIZE
_pid <- forkIO $ merge ysEven ysOdd ys
return ys
where
shard :: TBQueue a -> TBQueue a -> TBQueue a -> IO ()
shard qIn qEven qOdd = do
atomically (readTBQueue qIn >>= writeTBQueue qEven)
shard qIn qOdd qEven
merge :: TBQueue a -> TBQueue a -> TBQueue a -> IO ()
merge qEven qOdd qOut = do
atomically (readTBQueue qEven >>= writeTBQueue qOut)
merge qOdd qEven qOut
deploy _ _ = error "not implemented"
example' :: [Int] -> IO [(Int, Bool)]
example' xs0 = do
xs <- newTBQueueIO qUEUE_SIZE
mapM_ (atomically . writeTBQueue xs) xs0
ys <- deploy (Id :&&& Map even) xs
replicateM (length xs0) (atomically (readTBQueue ys))
prop_commute :: Eq b => P a b -> [a] -> PropertyM IO ()
prop_commute p xs = do
ys <- run $ do
qxs <- newTBQueueIO qUEUE_SIZE
mapM_ (atomically . writeTBQueue qxs) xs
qys <- deploy p qxs
replicateM (length xs) (atomically (readTBQueue qys))
ys' <- run (model p xs)
assert (ys == ys')
------------------------------------------------------------------------
queueSleepSeq :: P () ()
queueSleepSeq =
MapM $ \() -> do
() <- threadDelay 250000
((), ()) <- (,) <$> threadDelay 250000 <*> threadDelay 250000
() <- threadDelay 250000
return ()
queueSleep :: P () ()
queueSleep =
MapM (const (threadDelay 250000)) :&&& MapM (const (threadDelay 250000)) :>>>
MapM (const (threadDelay 250000)) :>>>
MapM (const (threadDelay 250000))
queueSleepSharded :: P () ()
queueSleepSharded = Shard queueSleep
runP :: P a b -> [a] -> IO [b]
runP p xs0 = do
xs <- newTBQueueIO qUEUE_SIZE
pid <- forkIO $ mapM_ (atomically . writeTBQueue xs) xs0
ys <- deploy p xs
replicateM (length xs0) (atomically (readTBQueue ys))
`finally` killThread pid
runP_ :: P () b -> Int -> IO ()
runP_ p n = do
xs <- newTBQueueIO qUEUE_SIZE
pid <- forkIO $ replicateM_ n (atomically (writeTBQueue xs ()))
ys <- deploy p xs
replicateM_ n (atomically (readTBQueue ys))
`finally` killThread pid
runQueueSleepSeq :: IO ()
runQueueSleepSeq = void (runP_ queueSleepSeq 5)
runQueueSleep :: IO ()
runQueueSleep = void (runP_ queueSleep 5)
runQueueSleepSharded :: IO ()
runQueueSleepSharded = void (runP_ queueSleepSharded 5)
copyP :: P () ()
copyP =
Id :&&& Id :&&& Id :&&& Id :&&& Id
:>>> Map (const ())
copy10P :: P () ()
copy10P =
Id :&&& Id :&&& Id :&&& Id :&&& Id :&&&
Id :&&& Id :&&& Id :&&& Id :&&& Id
:>>> Map (const ())
copyPSharded :: P () ()
copyPSharded = Shard copyP
noCopyP :: P () ()
noCopyP = Map (const ())
runQueueCopying :: Int -> IO ()
runQueueCopying n = void (runP_ copyP n)
runQueueCopyingSharded :: Int -> IO ()
runQueueCopyingSharded n = void (runP_ copyPSharded n)
runQueueNoCopying :: Int -> IO ()
runQueueNoCopying n = void (runP_ noCopyP n)
runQueueCopying10 :: Int -> IO ()
runQueueCopying10 n = void (runP_ copy10P n)