Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 183 additions & 16 deletions redisearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package redisearch
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"

"time"

Expand Down Expand Up @@ -40,9 +42,7 @@ var DefaultOptions = Options{
// Cleint is an interface to redisearch's redis commands
type Client struct {
pool *redis.Pool

schema *Schema
name string
name string
}

var maxConns = 500
Expand All @@ -55,8 +55,7 @@ func NewClient(addr, name string) *Client {
// TODO: Add timeouts. and 2 separate pools for indexing and querying, with different timeouts
return redis.Dial("tcp", addr)
}, maxConns),
schema: nil,
name: name,
name: name,
}

ret.pool.TestOnBorrow = func(c redis.Conn, t time.Time) (err error) {
Expand All @@ -73,7 +72,6 @@ func NewClient(addr, name string) *Client {

// CreateIndex configues the index and creates it on redis
func (i *Client) CreateIndex(s *Schema) error {
i.schema = s
args := redis.Args{i.name}
// Set flags based on options
if s.Options.NoFieldFlags {
Expand All @@ -93,7 +91,7 @@ func (i *Client) CreateIndex(s *Schema) error {
}

args = append(args, "SCHEMA")
for _, f := range i.schema.Fields {
for _, f := range s.Fields {

switch f.Type {
case TextField:
Expand All @@ -110,7 +108,9 @@ func (i *Client) CreateIndex(s *Schema) error {
if opts.Sortable {
args = append(args, "SORTABLE")
}

if opts.NoStem {
args = append(args, "NOSTEM")
}
}

case NumericField:
Expand Down Expand Up @@ -148,6 +148,16 @@ type IndexingOptions struct {
Replace bool
}

// MultiError Represents one or more errors
type MultiError map[int]error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather have a slice here where each sub error correlates to the index of the operation that triggered it


func (e MultiError) Error() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker, but wouldn't you in this case rather chain all error strings somehow?

Copy link
Contributor Author

@mnunberg mnunberg Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that require line breaks to make it readable, and wouldn't line breaks confuse a lot of things?

We can't really anticipate how many documents the user is putting in here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put newlines, we can always make it look like an array. It's just for debug printing etc.

for _, err := range e {
return err.Error()
}
return ""
}

// DefaultIndexingOptions are the default options for document indexing
var DefaultIndexingOptions = IndexingOptions{
Language: "",
Expand All @@ -156,15 +166,16 @@ var DefaultIndexingOptions = IndexingOptions{
}

// Index indexes multiple documents on the index, with optional Options passed to options
func (i *Client) IndexOptions(opts IndexingOptions, docs ...Document) error {
func (i *Client) IndexOptions(opts IndexingOptions, docs ...Document) (errors MultiError) {

conn := i.pool.Get()
defer conn.Close()

n := 0
errors = make(map[int]error)

for _, doc := range docs {
args := make(redis.Args, 0, len(i.schema.Fields)*2+6)
for ii, doc := range docs {
args := make(redis.Args, 0, 6+len(doc.Properties))
args = append(args, i.name, doc.Id, doc.Score)
// apply options
if opts.NoSave {
Expand All @@ -188,23 +199,29 @@ func (i *Client) IndexOptions(opts IndexingOptions, docs ...Document) error {
}

if err := conn.Send("FT.ADD", args...); err != nil {
return err
errors[ii] = err
return
}
n++
}

if err := conn.Flush(); err != nil {
return err
errors[-1] = err
return
}

for n > 0 {
if _, err := conn.Receive(); err != nil {
return err
errors[n-1] = err
}
n--
}

return nil
if len(errors) == 0 {
return nil
}

return
}

// convert the result from a redis query to a proper Document object
Expand Down Expand Up @@ -243,7 +260,7 @@ func loadDocument(arr []interface{}, idIdx, scoreIdx, payloadIdx, fieldsIdx int)
return doc, nil
}

func (i *Client) Index(docs ...Document) error {
func (i *Client) Index(docs ...Document) map[int]error {
return i.IndexOptions(DefaultIndexingOptions, docs...)
}

Expand Down Expand Up @@ -307,3 +324,153 @@ func (i *Client) Drop() error {
return err

}

// IndexInfo - Structure showing information about an existing index
type IndexInfo struct {
Schema Schema
Name string `redis:"index_name"`
DocCount uint64 `redis:"num_docs"`
RecordCount uint64 `redis:"num_records"`
TermCount uint64 `redis:"num_terms"`
MaxDocID uint64 `redis:"max_doc_id"`
InvertedIndexSizeMB float64 `redis:"inverted_sz_mb"`
OffsetVectorSizeMB float64 `redis:"offset_vector_sz_mb"`
DocTableSizeMB float64 `redis:"doc_table_size_mb"`
KeyTableSizeMB float64 `redis:"key_table_size_mb"`
RecordsPerDocAvg float64 `redis:"records_per_doc_avg"`
BytesPerRecordAvg float64 `redis:"bytes_per_record_avg"`
OffsetsPerTermAvg float64 `redis:"offsets_per_term_avg"`
OffsetBitsPerTermAvg float64 `redis:"offset_bits_per_record_avg"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will soon need the gc stats as well

}

func (info *IndexInfo) setTarget(key string, value interface{}) error {
v := reflect.ValueOf(info).Elem()
for i := 0; i < v.NumField(); i++ {
tag := v.Type().Field(i).Tag.Get("redis")
if tag == key {
targetInfo := v.Field(i)
switch targetInfo.Kind() {
case reflect.String:
s, _ := redis.String(value, nil)
targetInfo.SetString(s)
case reflect.Uint64:
u, _ := redis.Uint64(value, nil)
targetInfo.SetUint(u)
case reflect.Float64:
f, _ := redis.Float64(value, nil)
targetInfo.SetFloat(f)
default:
panic("Tag set without handler")
}
return nil
}
}
return errors.New("setTarget: No handler defined for :" + key)
}

func sliceIndex(haystack []string, needle string) int {
for pos, elem := range haystack {
if elem == needle {
return pos
}
}
return -1
}

func (info *IndexInfo) loadSchema(values []interface{}, options []string) {
// Values are a list of fields
scOptions := Options{}
for _, opt := range options {
switch strings.ToUpper(opt) {
case "NOFIELDS":
scOptions.NoFieldFlags = true
case "NOFREQS":
scOptions.NoFrequencies = true
case "NOOFFSETS":
scOptions.NoOffsetVectors = true
}
}
sc := NewSchema(scOptions)
for _, specTmp := range values {
// spec, isArr := specTmp.([]string)
// if !isArr {
// panic("Value is not an array of strings!")
// }
spec, err := redis.Strings(specTmp, nil)
if err != nil {
panic(err)
}
// Name, Type,
if len(spec) < 3 {
panic("Invalid spec")
}
var options []string
if len(spec) > 3 {
options = spec[3:]
} else {
options = []string{}
}

f := Field{Name: spec[0]}
switch strings.ToUpper(spec[2]) {
case "NUMERIC":
f.Type = NumericField
nfOptions := NumericFieldOptions{}
f.Options = nfOptions
if sliceIndex(options, "SORTABLE") != -1 {
nfOptions.Sortable = true
}
case "TEXT":
f.Type = TextField
tfOptions := TextFieldOptions{}
f.Options = tfOptions
if sliceIndex(options, "SORTABLE") != -1 {
tfOptions.Sortable = true
}
if wIdx := sliceIndex(options, "WEIGHT"); wIdx != -1 && wIdx+1 != len(spec) {
weightString := options[wIdx+1]
weight64, _ := strconv.ParseFloat(weightString, 32)
tfOptions.Weight = float32(weight64)
}
}
sc = sc.AddField(f)
}
info.Schema = *sc
}

// Info - Get information about the index. This can also be used to check if the
// index exists
func (i *Client) Info() (*IndexInfo, error) {
conn := i.pool.Get()
defer conn.Close()

res, err := redis.Values(conn.Do("FT.INFO", i.name))
if err != nil {
return nil, err
}

ret := IndexInfo{}
var schemaFields []interface{}
var indexOptions []string

// Iterate over the values
for ii := 0; ii < len(res); ii += 2 {
key, _ := redis.String(res[ii], nil)
if err := ret.setTarget(key, res[ii+1]); err == nil {
continue
}

switch key {
case "index_options":
indexOptions, _ = redis.Strings(res[ii+1], nil)
case "fields":
schemaFields, _ = redis.Values(res[ii+1], nil)
}
}

if schemaFields != nil {
ret.loadSchema(schemaFields, indexOptions)
}

return &ret, nil
}
26 changes: 21 additions & 5 deletions redisearch/redisearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@ package redisearch_test
import (
"fmt"
"log"
"os"
"testing"
"time"

"github.com/RedisLabs/redisearch-go/redisearch"
)

func createClient(indexName string) *redisearch.Client {
value, exists := os.LookupEnv("REDISEARCH_TEST_HOST")
host := "localhost:6379"
if exists && value != "" {
host = value
}
return redisearch.NewClient(host, indexName)
}

func TestClient(t *testing.T) {

c := redisearch.NewClient("localhost:6379", "testung")
c := createClient("testung")

sc := redisearch.NewSchema(redisearch.DefaultOptions).
AddField(redisearch.NewTextField("foo"))
Expand All @@ -25,10 +35,17 @@ func TestClient(t *testing.T) {
docs[i] = redisearch.NewDocument(fmt.Sprintf("doc%d", i), float32(i)/float32(100)).Set("foo", "hello world")
}

if err := c.Index(docs, redisearch.DefaultIndexingOptions); err != nil {
if err := c.IndexOptions(redisearch.DefaultIndexingOptions, docs...); err != nil {
t.Fatal(err)
}

// Test it again
if err := c.IndexOptions(redisearch.DefaultIndexingOptions, docs...); err == nil {
t.Fatal("Expected error for duplicate document")
} else if len(err) != 100 {
t.Fatal("Not enough errors received")
}

docs, total, err := c.Search(redisearch.NewQuery("hello world"))
fmt.Println(docs, total, err)
}
Expand All @@ -37,7 +54,7 @@ func ExampleClient() {

// Create a client. By default a client is schemaless
// unless a schema is provided when creating the index
c := redisearch.NewClient("localhost:6379", "myIndex")
c := createClient("myIndex")

// Create a schema
sc := redisearch.NewSchema(redisearch.DefaultOptions).
Expand All @@ -60,8 +77,7 @@ func ExampleClient() {
Set("date", time.Now().Unix())

// Index the document. The API accepts multiple documents at a time
if err := c.Index([]redisearch.Document{doc},
redisearch.DefaultIndexingOptions); err != nil {
if err := c.IndexOptions(redisearch.DefaultIndexingOptions, doc); err != nil {
log.Fatal(err)
}

Expand Down
1 change: 1 addition & 0 deletions redisearch/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Field struct {
type TextFieldOptions struct {
Weight float32
Sortable bool
NoStem bool
}

// NumericFieldOptions Options for numeric fields
Expand Down