forked from eozmen410/mapreduce_wordcount
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwc1.hs
110 lines (82 loc) · 2.87 KB
/
wc1.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
{-# LANGUAGE TupleSections #-}
import Control.Parallel(pseq)
import Control.Parallel.Strategies
import Data.Char(isAlpha, toLower)
import Data.Map(Map, keys, fromListWith, toList, unionsWith)
import qualified Data.ByteString.Lazy.Char8 as B
import Data.List(sortBy)
import Data.Function(on)
import System.Environment(getArgs, getProgName)
import System.Exit(die)
{-
Name: Ecenaz Ozmen and Yefri Gaitan
Uni: eo2419 and yg2548
------------------------------
COMS 4995 003 Parallel Functional Programming
Final Project
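Word count built on a small map-reduce library: prints the ten most
frequent words of a text file, counted either sequentially or in parallel.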
to compile:
stack ghc -- -O2 -Wall -threaded -rtsopts -eventlog wc
to run:
./wc big.txt seq +RTS -N8 -ls (for sequential)
./wc big.txt par +RTS -N8 -ls (for parallel)
-----
Use lts-14.5 as the "resolver" for the Haskell Tool Stack.
Your code should load under GHCi 8.6.5 with no warnings under -Wall, e.g.
:set -Wall
:l hw4
-}
-- Entry point. Expects exactly two command-line arguments: the input file
-- path and the mode, "par" or "seq". Prints the ten words with the highest
-- counts; any other argument list exits with a usage message.
main :: IO ()
main = getArgs >>= dispatch
  where
    dispatch [path, "par"] = report wcpar path
    dispatch [path, "seq"] = report wcseq path
    dispatch _ = do
      self <- getProgName
      die $ "Usage: " ++ self ++ " <filename> <par/seq>"

    -- Read the file, count with the chosen implementation, show the top ten.
    report count path = do
      text <- B.readFile path
      print $ take 10 $ sort $ count text
-- Sequential word count: cut the input into chunks of 100000 cleaned words,
-- pair every word with 1, then merge all pairs into per-word totals.
wcseq :: B.ByteString -> [(B.ByteString, Int)]
wcseq input = seqMapReduce wcmap wcreduce (split 100000 input)
-- Parallel word count: cut the input into chunks of 100000 cleaned words,
-- build one count map per chunk in parallel, then merge the per-chunk maps.
--
-- Both stages use rdeepseq. parwcreduce builds a lazy Data.Map, so forcing
-- a chunk's map only to weak head normal form (rseq) would leave every
-- count as an unevaluated chain of (+) applications, and the additions
-- would be performed later by the sequential merge and sort instead of
-- inside the sparks. The result is the same either way.
wcpar :: B.ByteString -> [(B.ByteString, Int)]
wcpar =
  finalreduce
    . parMapReduce rdeepseq wcmap rdeepseq parwcreduce
    . split 100000
-- wc helper functions
--
-- Map step: tag every word with an initial count of 1.
wcmap :: [B.ByteString] -> [(B.ByteString, Int)]
wcmap ws = [(w, 1) | w <- ws]
-- Reduce step for a single chunk: collect (word, count) pairs into a map,
-- adding together the counts of pairs that share a word.
parwcreduce :: [(B.ByteString, Int)] -> Map B.ByteString Int
parwcreduce pairs = fromListWith (+) pairs
-- Final merge: union the per-chunk maps, summing the counts of words that
-- occur in more than one of them, and list the result in key order.
finalreduce :: [Map B.ByteString Int] -> [(B.ByteString, Int)]
finalreduce partials = toList (unionsWith (+) partials)
-- Sequential reduce: flatten the per-chunk pair lists and total the counts
-- per word; the result is listed in key order.
wcreduce :: [[(B.ByteString, Int)]] -> [(B.ByteString, Int)]
wcreduce groups = toList $ fromListWith (+) $ concat groups
-- map reduce library
--
-- Sequential map/reduce: apply the map function to every input, then hand
-- the complete list of mapped values to the reduce function.
seqMapReduce :: (a -> b) -> ([b] -> c) -> [a] -> c
seqMapReduce mf rf inputs = rf [mf x | x <- inputs]
-- Two-stage parallel map/reduce over a list.
--
-- The reduce function is applied to each mapped element on its own, so the
-- result holds one reduced value per input element; combining those values
-- is left to the caller. Each stage is evaluated with `parBuffer 200` and
-- the strategy supplied for that stage.
parMapReduce
  :: Strategy b -- evaluation strategy for each mapped element
  -> (a -> b)   -- map function
  -> Strategy c -- evaluation strategy for each reduced element
  -> (b -> c)   -- reduce function, applied per mapped element
  -> [a]        -- input list
  -> [c]
parMapReduce mstrat mf rstrat rf xs =
  let mapped = map mf xs `using` parBuffer 200 mstrat
      reduced = map rf mapped `using` parBuffer 200 rstrat
  -- pseq evaluates the mapped list to weak head normal form before the
  -- reduced list is returned.
  in mapped `pseq` reduced
-- Helper functions
--
-- Order pairs by their second component, largest first. Pairs with equal
-- second components keep their relative input order.
sort :: Ord b => [(a,b)] -> [(a,b)]
sort = sortBy bySndDescending
  where
    -- Arguments are swapped in the comparison to get descending order.
    bySndDescending (_, x) (_, y) = compare y x
-- Tokenise the input and group the tokens into chunks of at most n words.
--
-- The text is broken on whitespace and every token is passed through
-- removeNonLetters. Tokens that come back empty (they held no letters at
-- all, such as "--" or "1999") are dropped, so the empty string is never
-- counted as a word.
split :: Int -> B.ByteString -> [[B.ByteString]]
split n bs =
  chunk n $ filter (not . B.null) $ map removeNonLetters $ B.words bs
-- Break a list into consecutive pieces of n elements; the last piece may be
-- shorter. An empty list yields no pieces.
--
-- A non-positive n cannot make progress through the list (splitAt would
-- keep returning an empty prefix), so the whole input is returned as a
-- single piece instead of producing an endless list of empty chunks.
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs
  | n <= 0 = [xs]
  | otherwise =
      let (front, rest) = splitAt n xs
      in front : chunk n rest
-- Normalise a token: lower-case every character, then keep only the
-- characters for which isAlpha holds.
removeNonLetters :: B.ByteString -> B.ByteString
removeNonLetters word = B.filter isAlpha (B.map toLower word)