Skip to content

Commit 8d0212c

Browse files
committed
Add mapreduce example
1 parent 0e07c51 commit 8d0212c

File tree

6 files changed

+85
-14
lines changed

6 files changed

+85
-14
lines changed

benchmarking/queuebenchmark.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55
producerAmounts = 4
66
vals = 100
77

8-
print("Mutex")
8+
print("Junctions:")
99
for i in range(producerAmounts):
1010
prodConAmount = 10 ** i
1111
print("Producer/Consumer Amount: " + str(prodConAmount))
1212
for j in range(amount):
13-
subprocess.call(["./go_build_queuebenchmarking_go.exe","mutex",str(prodConAmount), str(prodConAmount), str(vals)])
13+
subprocess.call(["./go_build_queuebenchmarking_go.exe","junction",str(prodConAmount), str(prodConAmount), str(vals)])
1414

15-
print("Junctions:")
15+
print("Mutex")
1616
for i in range(producerAmounts):
1717
prodConAmount = 10 ** i
1818
print("Producer/Consumer Amount: " + str(prodConAmount))
1919
for j in range(amount):
20-
subprocess.call(["./go_build_queuebenchmarking_go.exe","junction",str(prodConAmount), str(prodConAmount), str(vals)])
20+
subprocess.call(["./go_build_queuebenchmarking_go.exe","mutex",str(prodConAmount), str(prodConAmount), str(vals)])

examples/mapreduce.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package main
2+
3+
import (
4+
"../gojo/junction"
5+
"../gojo/types"
6+
"constraints"
7+
"fmt"
8+
)
9+
10+
func createMapReduce[T constraints.Integer](vals []T) func(types.Unit) ([]T, error) {
11+
j := junction.NewJunction()
12+
13+
mapPort, mapSignal := junction.NewAsyncSignal[T](j)
14+
reducePort, reduceSignal := junction.NewAsyncSignal[T](j)
15+
listPort, listSignal := junction.NewAsyncSignal[[]T](j)
16+
finalPort, finalSignal := junction.NewAsyncSignal[[]T](j)
17+
getPort, getSignal := junction.NewSyncSignal[types.Unit, []T](j)
18+
19+
// map
20+
junction.NewUnaryAsyncJoinPattern[T](mapPort).Action(func(val T) {
21+
fmt.Println("Mapping: ", val)
22+
reduceSignal(val * 2)
23+
})
24+
25+
// reduce
26+
junction.NewBinaryAsyncJoinPattern[[]T, T](listPort, reducePort).Action(func(list []T, val T) {
27+
fmt.Printf("Reducing: %v %d \n", list, val)
28+
list = append(list, val)
29+
30+
if len(list) == len(vals) {
31+
finalSignal(list)
32+
} else {
33+
listSignal(list)
34+
}
35+
})
36+
37+
// final
38+
junction.NewBinarySyncJoinPattern[[]T, types.Unit, []T](finalPort, getPort).Action(func(list []T, a types.Unit) []T {
39+
return list
40+
})
41+
42+
listSignal(make([]T, 0))
43+
44+
for _, val := range vals {
45+
go mapSignal(val)
46+
}
47+
48+
return getSignal
49+
}
50+
51+
func main() {
52+
arr := []int{1, 2, 3, 5}
53+
get := createMapReduce[int](arr)
54+
55+
ret, _ := get(types.Unit{})
56+
57+
fmt.Printf("Result: %v", ret)
58+
}

examples/queue/benchmarking/queuebenchmarking.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,38 @@ func test(producerCount int, consumerCount int, vals int, enqueue func(int), deq
1919
enqueue(i)
2020
}
2121

22-
func() {
22+
wg.Add(1)
23+
24+
go func() {
25+
defer wg.Done()
26+
2327
for i := 1; i <= producerCount; i++ {
2428
wg.Add(1)
2529
go func(num int) {
2630
defer wg.Done()
2731

2832
for j := 1; j <= vals; j++ {
2933
enqueue(j * num)
30-
time.Sleep(time.Duration(j * 10))
3134
}
3235

3336
}(i)
3437
}
3538
}()
3639

40+
wg.Add(1)
41+
3742
// Consumer
38-
func() {
43+
go func() {
44+
defer wg.Done()
45+
3946
for i := 0; i < consumerCount; i++ {
4047
wg.Add(1)
48+
4149
go func() {
4250
defer wg.Done()
4351

4452
for j := 0; j < vals; j++ {
4553
dequeue(types.Unit{})
46-
time.Sleep(time.Duration(j * 10))
4754
}
4855

4956
}()

examples/queue/queue.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ Problem when reading and writing async:
2525

2626
func NewQueue[T any]() (func(T), func(types.Unit) (T, error)) {
2727
j := junction.NewJunction()
28+
j1 := junction.NewJunction()
2829

2930
firstPort, firstSignal := junction.NewAsyncSignal[QueueElement[T]](j)
30-
lastPort, lastSignal := junction.NewAsyncSignal[QueueElement[T]](j)
31+
lastPort, lastSignal := junction.NewAsyncSignal[QueueElement[T]](j1)
3132

32-
enqueuePort, enqueueSignal := junction.NewAsyncSignal[T](j)
33+
enqueuePort, enqueueSignal := junction.NewAsyncSignal[T](j1)
3334
dequeuePort, dequeueSignal := junction.NewSyncSignal[types.Unit, T](j)
3435

3536
junction.NewBinaryAsyncJoinPattern[QueueElement[T], T](lastPort, enqueuePort).Action(func(last QueueElement[T], value T) {
@@ -43,10 +44,10 @@ func NewQueue[T any]() (func(T), func(types.Unit) (T, error)) {
4344

4445
junction.NewBinarySyncJoinPattern[QueueElement[T], types.Unit, T](firstPort, dequeuePort).Action(func(first QueueElement[T], a types.Unit) T {
4546
nextSignal, _ := first.getNextSignal(types.Unit{})
46-
val, _ := first.getValueSignal(types.Unit{})
47-
4847
firstSignal(nextSignal)
4948

49+
val, _ := first.getValueSignal(types.Unit{})
50+
5051
return val
5152
})
5253

@@ -73,6 +74,8 @@ func newQueueElement[T any]() QueueElement[T] {
7374

7475
junction.NewBinarySyncJoinPattern[T, types.Unit, T](setValuePort, getValuePort).
7576
Action(func(val T, a types.Unit) T {
77+
// Added for benchmarking
78+
time.Sleep(time.Duration(10))
7679
return val
7780
})
7881

examples/queue/queueMutex.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package queue
33
import (
44
"../../gojo/types"
55
"sync"
6+
"time"
67
)
78

89
type QueueElementMutex[T any] struct {
@@ -48,7 +49,7 @@ func NewQueueMutex[T any]() (func(T), func(types.Unit) (T, error)) {
4849

4950
firstElem := queue.head
5051
queue.head = queue.head.next
51-
52+
time.Sleep(time.Duration(10))
5253
c.L.Unlock()
5354

5455
return (*firstElem).val, nil

gojo/junction/controller/controllerdata.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package controller
22

3-
import "../../types"
3+
import (
4+
"../../types"
5+
)
46

57
type JoinPatterns struct {
68
portIds int

0 commit comments

Comments
 (0)