Skip to content

Commit

Permalink
Merge pull request #39 from reugn/develop
Browse files Browse the repository at this point in the history
v0.7.0
  • Loading branch information
reugn authored Nov 27, 2021
2 parents 47e7980 + 55e9e0e commit e98f8bb
Show file tree
Hide file tree
Showing 39 changed files with 644 additions and 336 deletions.
5 changes: 5 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
coverage:
status:
project:
default:
target: 80%
21 changes: 21 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: Build

on: [push, pull_request]

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.17.x]
steps:
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}
- name: Checkout code
uses: actions/checkout@v2
- name: Run coverage
run: go test ./... -coverprofile=coverage.out -covermode=atomic
- name: Upload coverage to Codecov
run: bash <(curl -s https://codecov.io/bash)
15 changes: 15 additions & 0 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name: golangci-lint
on:
push:
branches:
- master
pull_request:

jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
.vscode/
/vendor
coverage.out
41 changes: 41 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
run:
skip-dirs:
- examples

linters:
disable-all: true
enable:
- deadcode
- dupl
- errcheck
- exportloopref
- funlen
- goconst
- gocritic
- gocyclo
- gofmt
- goimports
- gosimple
- govet
- ineffassign
- lll
- misspell
- prealloc
- revive
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck

issues:
exclude-rules:
# Exclude some linters from running on tests files.
- path: _test\.go
linters:
- errcheck
- unparam
- prealloc
13 changes: 0 additions & 13 deletions .travis.yml

This file was deleted.

4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# go-streams
[![Build Status](https://travis-ci.org/reugn/go-streams.svg?branch=master)](https://travis-ci.org/reugn/go-streams)
[![Build](https://github.com/reugn/go-streams/actions/workflows/build.yml/badge.svg)](https://github.com/reugn/go-streams/actions/workflows/build.yml)
[![PkgGoDev](https://pkg.go.dev/badge/github.com/reugn/go-streams)](https://pkg.go.dev/github.com/reugn/go-streams)
[![Go Report Card](https://goreportcard.com/badge/github.com/reugn/go-streams)](https://goreportcard.com/report/github.com/reugn/go-streams)
[![codecov](https://codecov.io/gh/reugn/go-streams/branch/master/graph/badge.svg)](https://codecov.io/gh/reugn/go-streams)
Expand Down Expand Up @@ -27,6 +27,7 @@ Flow capabilities ([flow](https://github.com/reugn/go-streams/tree/master/flow)
* Throttler
* SlidingWindow
* TumblingWindow
* SessionWindow

Supported Connectors:
* Go channels
Expand All @@ -36,6 +37,7 @@ Supported Connectors:
* [Aerospike](https://www.aerospike.com/)
* [Apache Kafka](https://kafka.apache.org/)
* [Apache Pulsar](https://pulsar.apache.org/)
* [NATS](https://nats.io/) Streaming
* [Redis](https://redis.io/)

## Examples
Expand Down
38 changes: 22 additions & 16 deletions aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/reugn/go-streams/flow"
)

// AerospikeProperties represents the Aerospike connector configuration properties.
// AerospikeProperties represents configuration properties for an Aerospike connector.
type AerospikeProperties struct {
Policy *aero.ClientPolicy
Hostname string
Expand All @@ -24,15 +24,15 @@ type AerospikeProperties struct {
SetName string
}

// ChangeNotificationProperties holds the Aerospike cluster events polling configuration.
// ChangeNotificationProperties contains the configuration for polling Aerospike cluster events.
type ChangeNotificationProperties struct {
PollingInterval time.Duration
}

// AerospikeSource represents an Aerospike source connector.
type AerospikeSource struct {
client *aero.Client
records chan *aero.Result
recordsChannel chan *aero.Result
scanPolicy *aero.ScanPolicy
out chan interface{}
ctx context.Context
Expand All @@ -59,7 +59,7 @@ func NewAerospikeSource(ctx context.Context,
records := make(chan *aero.Result)
source := &AerospikeSource{
client: client,
records: records,
recordsChannel: records,
scanPolicy: scanPolicy,
out: make(chan interface{}),
ctx: ctx,
Expand All @@ -76,7 +76,7 @@ func (as *AerospikeSource) poll() {
if as.changeNotificationProperties == nil {
// scan the entire namespace/set
as.doScan()
close(as.records)
close(as.recordsChannel)
return
}

Expand All @@ -87,6 +87,7 @@ loop:
select {
case <-as.ctx.Done():
break loop

case t := <-ticker.C:
ts := t.UnixNano() - as.changeNotificationProperties.PollingInterval.Nanoseconds()
as.scanPolicy.PredExp = []aero.PredExp{
Expand All @@ -104,10 +105,10 @@ loop:
func (as *AerospikeSource) doScan() {
recordSet, err := as.client.ScanAll(as.scanPolicy, as.properties.Namespase, as.properties.SetName)
if err != nil {
log.Printf("Aerospike client.ScanAll failed with: %v", err)
log.Printf("Aerospike client.ScanAll failed with: %s", err)
} else {
for result := range recordSet.Results() {
as.records <- result
as.recordsChannel <- result
}
}
}
Expand All @@ -122,16 +123,18 @@ loop:
select {
case <-sigchan:
break loop

case <-as.ctx.Done():
break loop
case result, ok := <-as.records:

case result, ok := <-as.recordsChannel:
if !ok {
break loop
}
if result.Err == nil {
as.out <- result.Record
} else {
log.Printf("Scan record error %s", result.Err)
log.Printf("Aerospike scan record error %s", result.Err)
}
}
}
Expand All @@ -152,8 +155,8 @@ func (as *AerospikeSource) Out() <-chan interface{} {
return as.out
}

// AerospikeKeyBins is an Aerospike Key and BinMap container.
// Use it to stream records to the AerospikeSink.
// AerospikeKeyBins represents an Aerospike Key and BinMap container.
// Use it to stream records to an AerospikeSink.
type AerospikeKeyBins struct {
Key *aero.Key
Bins aero.BinMap
Expand Down Expand Up @@ -200,21 +203,24 @@ func (as *AerospikeSink) init() {
if err := as.client.Put(as.writePolicy, m.Key, m.Bins); err != nil {
log.Printf("Aerospike client.Put failed with: %s", err)
}

case aero.BinMap:
// use the sha256 checksum of the BinMap as a Key
jsonStr, err := json.Marshal(m)
if err == nil {
key, err := aero.NewKey(as.properties.Namespase,
var key *aero.Key
// use BinMap sha256 checksum as record key
key, err = aero.NewKey(as.properties.Namespase,
as.properties.SetName,
sha256.Sum256([]byte(jsonStr)))
sha256.Sum256(jsonStr))
if err == nil {
as.client.Put(as.writePolicy, key, m)
err = as.client.Put(as.writePolicy, key, m)
}
}

if err != nil {
log.Printf("Error on processing Aerospike message: %s", err)
log.Printf("Error processing Aerospike message: %s", err)
}

default:
log.Printf("Unsupported message type %v", m)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.15
require (
github.com/Shopify/sarama v1.29.1
github.com/aerospike/aerospike-client-go/v5 v5.6.0
github.com/apache/pulsar-client-go v0.5.0
github.com/apache/pulsar-client-go v0.7.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/gorilla/websocket v1.4.2
github.com/nats-io/stan.go v0.9.0
github.com/nats-io/stan.go v0.10.2
github.com/reugn/go-streams v0.6.3
github.com/reugn/go-streams/aerospike v0.0.0
github.com/reugn/go-streams/kafka v0.0.0
Expand Down
Loading

0 comments on commit e98f8bb

Please sign in to comment.