Skip to content

Commit

Permalink
api: add support of a batch insert request
Browse files Browse the repository at this point in the history
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW .

Closes #399
  • Loading branch information
dmyger committed Oct 7, 2024
1 parent 592db69 commit 4c52c2f
Show file tree
Hide file tree
Showing 10 changed files with 415 additions and 21 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
## [Unreleased]

### Added
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
connection and ctx is not canceled;
also added logs for error case of `ConnectionPool.tryConnect()` calls in
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
- Methods that are implemented but not included in the pooler interface (#395).
- Implemented stringer methods for pool.Role (#405).
- Support the IPROTO_INSERT_ARROW request (#399).

### Changed

Expand Down
59 changes: 59 additions & 0 deletions arrow/arrow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package arrow

import (
"fmt"
"reflect"

"github.com/vmihailenco/msgpack/v5"
)

// Arrow MessagePack extension type
const arrowExtId = 8

// Arrow struct wraps a raw arrow data buffer.
type Arrow struct {
data []byte
}

// MakeArrow returns a new arrow.Arrow object that contains
// wrapped a raw arrow data buffer.
func MakeArrow(arrow []byte) (Arrow, error) {
if len(arrow) == 0 {
return Arrow{}, fmt.Errorf("no Arrow data")
}
return Arrow{arrow}, nil
}

// ToArrow returns a []byte that contains Arrow raw data.
func (a *Arrow) ToArrow() []byte {
return a.data
}

func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
arrow := Arrow{
data: make([]byte, 0, extLen),
}
n, err := d.Buffered().Read(arrow.data)
if err != nil {
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
}
if n < extLen || n != len(arrow.data) {
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
}

v.Set(reflect.ValueOf(arrow))
return nil
}

func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
if v.IsValid() {
return v.Interface().(Arrow).data, nil
}

return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
}

func init() {
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
}
130 changes: 130 additions & 0 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package arrow_test

import (
"encoding/hex"
"log"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
"github.com/tarantool/go-tarantool/v2/test_helpers"
)

var isArrowSupported = false

var server = "127.0.0.1:3013"
var dialer = tarantool.NetDialer{
Address: server,
User: "test",
Password: "test",
}
var space = "testArrow"

var opts = tarantool.Opts{
Timeout: 5 * time.Second,
}

func skipIfArrowUnsupported(t *testing.T) {
t.Helper()
if !isArrowSupported {
t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
}
}

// TestInsert uses Arrow sequence from Tarantool's test .
// nolint:lll
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
func TestInsert_invalid(t *testing.T) {
skipIfArrowUnsupported(t)

arrows := []struct {
arrow string
expected string
}{
{
"",
"no Arrow data",
},
{
"00",
"Failed to decode Arrow IPC data",
},
{
"ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000",
"memtx does not support arrow format",
},
}

conn := test_helpers.ConnectWithValidation(t, dialer, opts)
defer conn.Close()

for i, a := range arrows {
t.Run(strconv.Itoa(i), func(t *testing.T) {
data, err := hex.DecodeString(a.arrow)
require.NoError(t, err)

arr, err := arrow.MakeArrow(data)
if err != nil {
require.ErrorContains(t, err, a.expected)
return
}
req := arrow.NewInsertRequest(space, arr)

_, err = conn.Do(req).Get()
require.ErrorContains(t, err, a.expected)
})
}

}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
// is a separate function, see
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
func runTestMain(m *testing.M) int {
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
if err != nil {
log.Fatalf("Failed to extract Tarantool version: %s", err)
}
isArrowSupported = !isLess

if !isArrowSupported {
log.Println("Skipping insert Arrow tests...")
return m.Run()
}

instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
Dialer: dialer,
InitScript: "config.lua",
Listen: server,
WaitStart: 100 * time.Millisecond,
ConnectRetry: 10,
RetryTimeout: 500 * time.Millisecond,
})
defer test_helpers.StopTarantoolWithCleanup(instance)

if err != nil {
log.Printf("Failed to prepare test Tarantool: %s", err)
return 1
}

return m.Run()
}

func TestMain(m *testing.M) {
code := runTestMain(m)
os.Exit(code)
}
36 changes: 36 additions & 0 deletions arrow/config.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- Do not set listen for now so connector won't be
-- able to send requests until everything is configured.
box.cfg {
work_dir = os.getenv("TEST_TNT_WORK_DIR")
}

box.schema.user.create('test', {
password = 'test',
if_not_exists = true
})
box.schema.user.grant('test', 'execute', 'universe', nil, {
if_not_exists = true
})

local s = box.schema.space.create('testArrow', {
id = 524,
if_not_exists = true
})
s:create_index('primary', {
type = 'tree',
parts = {{
field = 1,
type = 'integer'
}},
if_not_exists = true
})
s:truncate()

box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
if_not_exists = true
})

-- Set listen only when every other thing is configured.
box.cfg {
listen = os.getenv("TEST_TNT_LISTEN")
}
62 changes: 62 additions & 0 deletions arrow/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Run Tarantool instance before example execution:
//
// Terminal 1:
// $ cd arrow
// $ TEST_TNT_LISTEN=3013 TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config.lua
//
// Terminal 2:
// $ go test -v example_test.go
package arrow_test

import (
"context"
"encoding/hex"
"fmt"
"log"
"strings"
"time"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/arrow"
)

var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
"01000000000000003400000008000000000000000200000000000000000000000000" +
"00000000000000000000000000000800000000000000000000000100000001000000" +
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
"00000000000000000000")

func Example() {
dialer := tarantool.NetDialer{
Address: "127.0.0.1:3013",
User: "test",
Password: "test",
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
cancel()
if err != nil {
log.Fatalf("Failed to connect: %s", err)
}

arr, err := arrow.MakeArrow(arrowBinData)
if err != nil {
log.Fatalf("Failed prepare Arrow data: %s", err)
}

spaceNo := uint32(524)
req := arrow.NewInsertRequest(spaceNo, arr)

_, err = client.Do(req).Get()
if err != nil {
msg := strings.Split(err.Error(), "(")[0]
fmt.Printf("Failed insert Arrow: %s\n", msg)
}

//! Output:
// Failed insert Arrow: memtx does not support arrow format
}
Loading

0 comments on commit 4c52c2f

Please sign in to comment.