Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: add support of a batch insert request #411

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:

jobs:
luacheck:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand All @@ -32,7 +32,7 @@ jobs:
run: ./.rocks/bin/luacheck .

golangci-lint:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand All @@ -57,7 +57,7 @@ jobs:
args: --out-${NO_FUTURE}format colored-line-number --config=.golangci.yaml

codespell:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request' &&
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ issues:
exclude-rules:
- linters:
- lll
source: "\t?// *(see )?https://"
source: "^\\s*//\\s*(\\S+\\s){0,3}https?://\\S+$"
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
56 changes: 56 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type.
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
return Arrow{arrow}, nil
}

// Raw returns a []byte that contains Arrow raw data.
func (a Arrow) Raw() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
if v.IsValid() {
return v.Interface().(Arrow).data, nil
}

return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
100 changes: 100 additions & 0 deletions arrow/arrow_test.go
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package arrow_test

import (
"bytes"
"encoding/hex"
"testing"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/vmihailenco/msgpack/v5"
)

var longArrow, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

var tests = []struct {
name string
arr []byte
enc []byte
}{
{
"abc",
[]byte{'a', 'b', 'c'},
[]byte{0xc7, 0x3, 0x8, 'a', 'b', 'c'},
},
{
"empty",
[]byte{},
[]byte{0xc7, 0x0, 0x8},
},
{
"one",
[]byte{1},
[]byte{0xd4, 0x8, 0x1},
},
{
"long",
longArrow,
[]byte{
0xc8, 0x1, 0x10, 0x8, 0xff, 0xff, 0xff, 0xff, 0x70, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0,
0x0, 0x9e, 0xff, 0xff, 0xff, 0x4, 0x0, 0x1, 0x0, 0x4, 0x0, 0x0, 0x0, 0xb6, 0xff, 0xff,
0xff, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xda, 0xff, 0xff, 0xff, 0x14, 0x0, 0x0, 0x0, 0x2, 0x2, 0x0, 0x0,
0x4, 0x0, 0x0, 0x0, 0xf0, 0xff, 0xff, 0xff, 0x40, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0,
0x61, 0x0, 0x0, 0x0, 0x6, 0x0, 0x8, 0x0, 0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0x4, 0x0, 0x8,
0x0, 0x9, 0x0, 0xc, 0x0, 0xc, 0x0, 0xc, 0x0, 0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8, 0x0,
0xa, 0x0, 0xc, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xff, 0xff, 0xff, 0xff, 0x88, 0x0,
0x0, 0x0, 0x4, 0x0, 0x0, 0x0, 0x8a, 0xff, 0xff, 0xff, 0x4, 0x0, 0x3, 0x0, 0x10, 0x0,
0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xac, 0xff, 0xff,
0xff, 0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x34, 0x0, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x8, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0xa, 0x0, 0x14, 0x0,
0x4, 0x0, 0xc, 0x0, 0x10, 0x0, 0xc, 0x0, 0x14, 0x0, 0x4, 0x0, 0x6, 0x0, 0x8, 0x0, 0xc,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
},
},
}

func TestEncodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := bytes.NewBuffer([]byte{})
enc := msgpack.NewEncoder(buf)

arr, err := arrow.MakeArrow(tt.arr)
require.NoError(t, err)

err = enc.Encode(arr)
require.NoError(t, err)

require.Equal(t, tt.enc, buf.Bytes())
})

}
}

func TestDecodeArrow(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

buf := bytes.NewBuffer(tt.enc)
dec := msgpack.NewDecoder(buf)

var arr arrow.Arrow
err := dec.Decode(&arr)
require.NoError(t, err)

require.Equal(t, tt.arr, arr.Raw())
})
}
}
Comment on lines +68 to +100
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no negative scenarios here, only positive ones. Please, add cases with encoding (if possible)/decoding (should be possible) errors.

31 changes: 31 additions & 0 deletions arrow/config-ee.lua
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename config.lua -> config-memtx.lua, config-ee.lua -> config-memcs.lua? Up to you.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg {
work_dir = os.getenv("TEST_TNT_WORK_DIR")
}

box.schema.user.create('test', {
password = 'test',
if_not_exists = true
})
box.schema.user.grant('test', 'execute', 'universe', nil, {
if_not_exists = true
})

local s = box.schema.space.create('testArrow', {
engine = 'memcs',
field_count = 1,
format = {{'a', 'uint64'}},
if_not_exists = true
})
s:create_index('primary')
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
if_not_exists = true
})

-- Set listen only when every other thing is configured.
box.cfg {
listen = 3013
}
35 changes: 35 additions & 0 deletions arrow/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg {
work_dir = os.getenv("TEST_TNT_WORK_DIR")
}

box.schema.user.create('test', {
password = 'test',
if_not_exists = true
})
box.schema.user.grant('test', 'execute', 'universe', nil, {
if_not_exists = true
})

local s = box.schema.space.create('testArrow', {
if_not_exists = true
})
s:create_index('primary', {
type = 'tree',
parts = {{
field = 1,
type = 'integer'
}},
if_not_exists = true
})
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
if_not_exists = true
})

-- Set listen only when every other thing is configured.
box.cfg {
listen = os.getenv("TEST_TNT_LISTEN")
}
59 changes: 59 additions & 0 deletions arrow/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Run Tarantool Enterprise Edition instance before example execution:
//
// Terminal 1:
// $ cd arrow
// $ TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config-ee.lua
//
// Terminal 2:
// $ go test -v example_test.go
package arrow_test

import (
"context"
"encoding/hex"
"fmt"
"log"
"time"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
)

var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

func Example() {
dialer := tarantool.NetDialer{
Address: "127.0.0.1:3013",
User: "test",
Password: "test",
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err)
}

arr, err := arrow.MakeArrow(arrowBinData)
if err != nil {
log.Fatalf("Failed prepare Arrow data: %s", err)
}

req := arrow.NewInsertRequest("testArrow").Arrow(arr)

resp, err := client.Do(req).Get()
if err != nil {
log.Fatalf("Failed insert Arrow: %s", err)
}
if len(resp) == 0 {
fmt.Printf("Batch arrow inserted")
}
Comment on lines +56 to +58
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So our intention is a little more clear.

Suggested change
if len(resp) == 0 {
fmt.Printf("Batch arrow inserted")
}
if len(resp) > 0 {
log.Fatalf("Unexpected response")
}
fmt.Printf("Batch arrow inserted")

}
Loading
Loading