Skip to content

Commit fd78218

Browse files
authored
Merge pull request #30 from reugn/develop
v0.5.0
2 parents ea97a47 + 2f3cb8c commit fd78218

File tree

17 files changed

+528
-84
lines changed

17 files changed

+528
-84
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Supported Sources and Sinks (ext package):
3232
* Go channels
3333
* File system
3434
* Network (TCP, UDP)
35+
* [Aerospike](https://www.aerospike.com/)
3536
* [Apache Kafka](https://kafka.apache.org/)
3637
* [Apache Pulsar](https://pulsar.apache.org/)
3738
* [Redis](https://redis.io/)

examples/aerospike/.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
aerospike

examples/aerospike/main.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
aero "github.com/aerospike/aerospike-client-go"
9+
"github.com/reugn/go-streams"
10+
ext "github.com/reugn/go-streams/extension"
11+
"github.com/reugn/go-streams/flow"
12+
)
13+
14+
func main() {
15+
properties := &ext.AerospikeProperties{
16+
Policy: nil,
17+
Hostname: "localhost",
18+
Port: 3000,
19+
Namespase: "test",
20+
SetName: "streams",
21+
}
22+
ctx, cancelFunc := context.WithCancel(context.Background())
23+
24+
timer := time.NewTimer(time.Minute)
25+
go func() {
26+
select {
27+
case <-timer.C:
28+
cancelFunc()
29+
}
30+
}()
31+
32+
cnProperties := &ext.ChangeNotificationProperties{PollingInterval: time.Second * 3}
33+
source, err := ext.NewAerospikeSource(ctx, properties, nil, cnProperties)
34+
streams.Check(err)
35+
flow1 := flow.NewMap(transform, 1)
36+
sink, err := ext.NewAerospikeSink(ctx, properties, nil)
37+
streams.Check(err)
38+
39+
source.Via(flow1).To(sink)
40+
}
41+
42+
var transform = func(in interface{}) interface{} {
43+
msg := in.(*aero.Record)
44+
fmt.Println(msg.Bins)
45+
msg.Bins["ts"] = streams.NowNano()
46+
return ext.AerospikeKeyBins{Key: msg.Key, Bins: msg.Bins}
47+
}

examples/net/main.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
56
"strings"
7+
"time"
68

79
"github.com/reugn/go-streams"
810
ext "github.com/reugn/go-streams/extension"
@@ -12,7 +14,17 @@ import (
1214
// Test producer: nc -u 127.0.0.1 3434
1315
// Test consumer: nc -u -l 3535
1416
func main() {
15-
source, err := ext.NewNetSource(ext.UDP, "127.0.0.1:3434")
17+
ctx, cancelFunc := context.WithCancel(context.Background())
18+
19+
timer := time.NewTimer(time.Minute)
20+
go func() {
21+
select {
22+
case <-timer.C:
23+
cancelFunc()
24+
}
25+
}()
26+
27+
source, err := ext.NewNetSource(ctx, ext.UDP, "127.0.0.1:3434")
1628
streams.Check(err)
1729
flow1 := flow.NewMap(toUpper, 1)
1830
sink, err := ext.NewNetSink(ext.UDP, "127.0.0.1:3535")
@@ -23,6 +35,6 @@ func main() {
2335

2436
var toUpper = func(in interface{}) interface{} {
2537
msg := in.(string)
26-
fmt.Printf("Got: %s\n", msg)
38+
fmt.Printf("Got: %s", msg)
2739
return strings.ToUpper(msg)
2840
}

examples/redis/main.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package main
22

33
import (
4+
"context"
45
"strings"
6+
"time"
57

68
"github.com/reugn/go-streams"
79

@@ -13,12 +15,23 @@ import (
1315
//docker exec -it pubsub bash
1416
//https://redis.io/topics/pubsub
1517
func main() {
18+
ctx, cancelFunc := context.WithCancel(context.Background())
19+
20+
timer := time.NewTimer(time.Minute)
21+
go func() {
22+
select {
23+
case <-timer.C:
24+
cancelFunc()
25+
}
26+
}()
27+
1628
config := &redis.Options{
1729
Addr: "localhost:6379", // use default Addr
1830
Password: "", // no password set
1931
DB: 0, // use default DB
2032
}
21-
source, err := ext.NewRedisSource(config, "test")
33+
34+
source, err := ext.NewRedisSource(ctx, config, "test")
2235
streams.Check(err)
2336
flow1 := flow.NewMap(toUpper, 1)
2437
sink := ext.NewRedisSink(config, "test2")

extension/aerospike.go

+228
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package ext
2+
3+
import (
4+
"context"
5+
"crypto/md5"
6+
"encoding/json"
7+
"log"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
"time"
12+
13+
aero "github.com/aerospike/aerospike-client-go"
14+
"github.com/reugn/go-streams"
15+
"github.com/reugn/go-streams/flow"
16+
)
17+
18+
// AerospikeProperties is the Aerospike connector configuration properties
19+
type AerospikeProperties struct {
20+
Policy *aero.ClientPolicy
21+
Hostname string
22+
Port int
23+
Namespase string
24+
SetName string
25+
}
26+
27+
// ChangeNotificationProperties holds the changes polling configuration
28+
type ChangeNotificationProperties struct {
29+
PollingInterval time.Duration
30+
}
31+
32+
// AerospikeSource connector
33+
type AerospikeSource struct {
34+
client *aero.Client
35+
records chan *aero.Result
36+
scanPolicy *aero.ScanPolicy
37+
out chan interface{}
38+
ctx context.Context
39+
properties *AerospikeProperties
40+
changeNotificationProperties *ChangeNotificationProperties
41+
}
42+
43+
// NewAerospikeSource returns a new AerospikeSource instance
44+
// set changeNotificationProperties to nil to scan the entire namespace/set
45+
func NewAerospikeSource(ctx context.Context,
46+
properties *AerospikeProperties,
47+
scanPolicy *aero.ScanPolicy,
48+
changeNotificationProperties *ChangeNotificationProperties) (*AerospikeSource, error) {
49+
50+
client, err := aero.NewClientWithPolicy(properties.Policy, properties.Hostname, properties.Port)
51+
if err != nil {
52+
return nil, err
53+
}
54+
55+
if scanPolicy == nil {
56+
scanPolicy = aero.NewScanPolicy()
57+
}
58+
59+
records := make(chan *aero.Result)
60+
source := &AerospikeSource{
61+
client: client,
62+
records: records,
63+
scanPolicy: scanPolicy,
64+
out: make(chan interface{}),
65+
ctx: ctx,
66+
properties: properties,
67+
changeNotificationProperties: changeNotificationProperties,
68+
}
69+
70+
go source.poll()
71+
go source.init()
72+
return source, nil
73+
}
74+
75+
func (as *AerospikeSource) poll() {
76+
if as.changeNotificationProperties == nil {
77+
// scan the entire namespace/set
78+
as.doScan()
79+
close(as.records)
80+
return
81+
}
82+
83+
// get change notifications by polling
84+
ticker := time.NewTicker(as.changeNotificationProperties.PollingInterval)
85+
loop:
86+
for {
87+
select {
88+
case <-as.ctx.Done():
89+
break loop
90+
case t := <-ticker.C:
91+
ts := t.UnixNano() - as.changeNotificationProperties.PollingInterval.Nanoseconds()
92+
as.scanPolicy.PredExp = []aero.PredExp{
93+
aero.NewPredExpRecLastUpdate(),
94+
aero.NewPredExpIntegerValue(ts),
95+
aero.NewPredExpIntegerGreater(),
96+
}
97+
log.Printf("Polling records %v", as.scanPolicy.PredExp)
98+
99+
as.doScan()
100+
}
101+
}
102+
}
103+
104+
func (as *AerospikeSource) doScan() {
105+
recordSet, err := as.client.ScanAll(as.scanPolicy, as.properties.Namespase, as.properties.SetName)
106+
if err != nil {
107+
log.Printf("Aerospike client.ScanAll failed with: %v", err)
108+
} else {
109+
for result := range recordSet.Results() {
110+
as.records <- result
111+
}
112+
}
113+
}
114+
115+
// init starts the main loop
116+
func (as *AerospikeSource) init() {
117+
sigchan := make(chan os.Signal, 1)
118+
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
119+
120+
loop:
121+
for {
122+
select {
123+
case <-sigchan:
124+
break loop
125+
case <-as.ctx.Done():
126+
break loop
127+
case result, ok := <-as.records:
128+
if !ok {
129+
break loop
130+
}
131+
if result.Err == nil {
132+
as.out <- result.Record
133+
} else {
134+
log.Printf("Scan record error %s", result.Err)
135+
}
136+
}
137+
}
138+
139+
log.Printf("Closing Aerospike consumer")
140+
close(as.out)
141+
as.client.Close()
142+
}
143+
144+
// Via streams data through the given flow
145+
func (as *AerospikeSource) Via(_flow streams.Flow) streams.Flow {
146+
flow.DoStream(as, _flow)
147+
return _flow
148+
}
149+
150+
// Out returns an output channel for sending data
151+
func (as *AerospikeSource) Out() <-chan interface{} {
152+
return as.out
153+
}
154+
155+
// AerospikeKeyBins is an Aerospike Key and BinMap container
156+
// use it to stream records to the AerospikeSink
157+
type AerospikeKeyBins struct {
158+
Key *aero.Key
159+
Bins aero.BinMap
160+
}
161+
162+
// AerospikeSink connector
163+
type AerospikeSink struct {
164+
client *aero.Client
165+
in chan interface{}
166+
ctx context.Context
167+
properties *AerospikeProperties
168+
writePolicy *aero.WritePolicy
169+
}
170+
171+
// NewAerospikeSink returns a new AerospikeSink instance
172+
func NewAerospikeSink(ctx context.Context,
173+
properties *AerospikeProperties, writePolicy *aero.WritePolicy) (*AerospikeSink, error) {
174+
client, err := aero.NewClientWithPolicy(properties.Policy, properties.Hostname, properties.Port)
175+
if err != nil {
176+
return nil, err
177+
}
178+
179+
if writePolicy == nil {
180+
writePolicy = aero.NewWritePolicy(0, 0)
181+
}
182+
183+
source := &AerospikeSink{
184+
client: client,
185+
in: make(chan interface{}),
186+
ctx: ctx,
187+
properties: properties,
188+
writePolicy: writePolicy,
189+
}
190+
191+
go source.init()
192+
return source, nil
193+
}
194+
195+
// init starts the main loop
196+
func (as *AerospikeSink) init() {
197+
for msg := range as.in {
198+
switch m := msg.(type) {
199+
case AerospikeKeyBins:
200+
if err := as.client.Put(as.writePolicy, m.Key, m.Bins); err != nil {
201+
log.Printf("Aerospike client.Put failed with: %s", err)
202+
}
203+
case aero.BinMap:
204+
// use the md5 hash of a BinMap as a Key
205+
jsonStr, err := json.Marshal(m)
206+
if err == nil {
207+
key, err := aero.NewKey(as.properties.Namespase,
208+
as.properties.SetName,
209+
md5.Sum([]byte(jsonStr)))
210+
if err == nil {
211+
as.client.Put(as.writePolicy, key, m)
212+
}
213+
}
214+
215+
if err != nil {
216+
log.Printf("Error on processing Aerospike message: %s", err)
217+
}
218+
default:
219+
log.Printf("Unsupported message type %v", m)
220+
}
221+
}
222+
as.client.Close()
223+
}
224+
225+
// In returns an input channel for receiving data
226+
func (as *AerospikeSink) In() chan<- interface{} {
227+
return as.in
228+
}

0 commit comments

Comments
 (0)