Skip to content

Commit

Permalink
Merge pull request #1089 from nyaruka/es_flow_id
Browse files Browse the repository at this point in the history
Switch from `flow` to `flow_id` and from `groups` to `group_ids` for ES queries
  • Loading branch information
rowanseymour authored Apr 4, 2022
2 parents df7dc9f + dde8230 commit 2463bbb
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 34 deletions.
36 changes: 21 additions & 15 deletions contactql/es/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,36 @@ import (
"github.com/olivere/elastic/v7"
)

// AssetMapper is used to map engine assets to however ES identifies them
type AssetMapper interface {
Flow(assets.Flow) int64
Group(assets.Group) int64
}

// ToElasticQuery converts a contactql query to an Elastic query
func ToElasticQuery(env envs.Environment, query *contactql.ContactQuery) elastic.Query {
func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) elastic.Query {
if query.Resolver() == nil {
panic("can only convert queries parsed with a resolver")
}

return nodeToElastic(env, query.Resolver(), query.Root())
return nodeToElastic(env, query.Resolver(), mapper, query.Root())
}

func nodeToElastic(env envs.Environment, resolver contactql.Resolver, node contactql.QueryNode) elastic.Query {
func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) elastic.Query {
switch n := node.(type) {
case *contactql.BoolCombination:
return boolCombinationToElastic(env, resolver, n)
return boolCombinationToElastic(env, resolver, mapper, n)
case *contactql.Condition:
return conditionToElastic(env, resolver, n)
return conditionToElastic(env, resolver, mapper, n)
default:
panic(fmt.Sprintf("unsupported node type: %T", n))
}
}

func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver, combination *contactql.BoolCombination) elastic.Query {
func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) elastic.Query {
queries := make([]elastic.Query, len(combination.Children()))
for i, child := range combination.Children() {
queries[i] = nodeToElastic(env, resolver, child)
queries[i] = nodeToElastic(env, resolver, mapper, child)
}

if combination.Operator() == contactql.BoolOperatorAnd {
Expand All @@ -45,12 +51,12 @@ func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver,
return elastic.NewBoolQuery().Should(queries...)
}

func conditionToElastic(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query {
func conditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query {
switch c.PropertyType() {
case contactql.PropertyTypeField:
return fieldConditionToElastic(env, resolver, c)
case contactql.PropertyTypeAttribute:
return attributeConditionToElastic(env, resolver, c)
return attributeConditionToElastic(env, resolver, mapper, c)
case contactql.PropertyTypeScheme:
return schemeConditionToElastic(env, c)
default:
Expand Down Expand Up @@ -181,7 +187,7 @@ func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver,
panic(fmt.Sprintf("unsupported field type: %s", fieldType))
}

func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query {
func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query {
key := c.PropertyKey()
value := strings.ToLower(c.Value())
var query elastic.Query
Expand Down Expand Up @@ -303,16 +309,16 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv

switch c.Operator() {
case contactql.OpEqual:
return elastic.NewTermQuery("groups", group.UUID())
return elastic.NewTermQuery("group_ids", mapper.Group(group))
case contactql.OpNotEqual:
return not(elastic.NewTermQuery("groups", group.UUID()))
return not(elastic.NewTermQuery("group_ids", mapper.Group(group)))
default:
panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator()))
}
case contactql.AttributeFlow:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query = elastic.NewExistsQuery("flow")
query = elastic.NewExistsQuery("flow_id")
if c.Operator() == contactql.OpEqual {
query = not(query)
}
Expand All @@ -323,9 +329,9 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv

switch c.Operator() {
case contactql.OpEqual:
return elastic.NewTermQuery("flow", flow.UUID())
return elastic.NewTermQuery("flow_id", mapper.Flow(flow))
case contactql.OpNotEqual:
return not(elastic.NewTermQuery("flow", flow.UUID()))
return not(elastic.NewTermQuery("flow_id", mapper.Flow(flow)))
default:
panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator()))
}
Expand Down
34 changes: 30 additions & 4 deletions contactql/es/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ import (
"github.com/stretchr/testify/require"
)

type MockMapper struct {
flows map[assets.FlowUUID]int64
groups map[assets.GroupUUID]int64
}

func (m *MockMapper) Flow(f assets.Flow) int64 {
return m.flows[f.UUID()]
}

func (m *MockMapper) Group(g assets.Group) int64 {
return m.groups[g.UUID()]
}

func newMockMapper(flows map[assets.FlowUUID]int64, groups map[assets.GroupUUID]int64) *MockMapper {
return &MockMapper{flows, groups}
}

func newMockResolver() contactql.Resolver {
return contactql.NewMockResolver(
[]assets.Field{
Expand All @@ -29,18 +46,27 @@ func newMockResolver() contactql.Resolver {
static.NewField("54c72635-d747-4e45-883c-099d57dd998e", "district", "District", assets.FieldTypeDistrict),
static.NewField("fde8f740-c337-421b-8abb-83b954897c80", "ward", "Ward", assets.FieldTypeWard),
},
[]assets.Flow{
static.NewFlow("c261165a-f5b0-40ba-b916-76fb49667a4f", "Registration", []byte(`{}`)),
},
[]assets.Group{
static.NewGroup("8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf", "U-Reporters", ""),
static.NewGroup("cf51cf8d-94da-447a-b27e-a42a900c37a6", "Testers", ""),
},
[]assets.Flow{
static.NewFlow("c261165a-f5b0-40ba-b916-76fb49667a4f", "Registration", []byte(`{}`)),
},
)
}

func TestElasticQuery(t *testing.T) {
resolver := newMockResolver()
mapper := newMockMapper(
map[assets.FlowUUID]int64{
"c261165a-f5b0-40ba-b916-76fb49667a4f": 234, // Registration
},
map[assets.GroupUUID]int64{
"8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf": 345, // U-Reporters
"cf51cf8d-94da-447a-b27e-a42a900c37a6": 456, // Testers
},
)

type testCase struct {
Description string `json:"description"`
Expand All @@ -67,7 +93,7 @@ func TestElasticQuery(t *testing.T) {
parsed, err := contactql.ParseQuery(env, tc.Query, resolver)
require.NoError(t, err)

query := es.ToElasticQuery(env, parsed)
query := es.ToElasticQuery(env, mapper, parsed)
assert.NotNil(t, query, tc.Description)

source, err := query.Source()
Expand Down
12 changes: 6 additions & 6 deletions contactql/es/testdata/to_query.json
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,7 @@
"query": "group = \"U-Reporters\"",
"elastic": {
"term": {
"groups": "8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf"
"group_ids": 345
}
}
},
Expand All @@ -1685,7 +1685,7 @@
"bool": {
"must_not": {
"term": {
"groups": "8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf"
"group_ids": 345
}
}
}
Expand All @@ -1696,7 +1696,7 @@
"query": "flow = \"registration\"",
"elastic": {
"term": {
"flow": "c261165a-f5b0-40ba-b916-76fb49667a4f"
"flow_id": 234
}
}
},
Expand All @@ -1707,7 +1707,7 @@
"bool": {
"must_not": {
"term": {
"flow": "c261165a-f5b0-40ba-b916-76fb49667a4f"
"flow_id": 234
}
}
}
Expand All @@ -1718,7 +1718,7 @@
"query": "flow != \"\"",
"elastic": {
"exists": {
"field": "flow"
"field": "flow_id"
}
}
},
Expand All @@ -1729,7 +1729,7 @@
"bool": {
"must_not": {
"exists": {
"field": "flow"
"field": "flow_id"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion contactql/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ func TestEvaluateQuery(t *testing.T) {
static.NewField("023f733d-ce00-4a61-96e4-b411987028ea", "empty", "Empty", assets.FieldTypeText),
static.NewField("81e25783-a1d8-42b9-85e4-68c7ab2df39d", "xyz", "XYZ", assets.FieldTypeText),
},
[]assets.Group{},
[]assets.Flow{
static.NewFlow("ea351bf8-3c49-46dd-935c-5b20e2a00b7a", "Registration", []byte(`{}`)),
static.NewFlow("1b73528f-6e4e-4c64-b393-78088449fb49", "Catch All", []byte(`{}`)),
},
[]assets.Group{},
)

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion contactql/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestInspect(t *testing.T) {
static.NewField(assets.FieldUUID("3810a485-3fda-4011-a589-7320c0b8dbef"), "dob", "DOB", assets.FieldTypeDatetime),
static.NewField(assets.FieldUUID("d66a7823-eada-40e5-9a3a-57239d4690bf"), "gender", "Gender", assets.FieldTypeText),
},
[]assets.Flow{},
[]assets.Group{
static.NewGroup(assets.GroupUUID("4eeca453-f474-4767-bdd0-434b180223db"), "U-Reporters", ""),
},
[]assets.Flow{},
)

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions contactql/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (

type mockResolver struct {
fields []assets.Field
groups []assets.Group
flows []assets.Flow
groups []assets.Group
}

// NewMockResolver creates a new mock resolver for fields and groups
func NewMockResolver(fields []assets.Field, groups []assets.Group, flows []assets.Flow) Resolver {
func NewMockResolver(fields []assets.Field, flows []assets.Flow, groups []assets.Group) Resolver {
return &mockResolver{
fields: fields,
groups: groups,
flows: flows,
groups: groups,
}
}

Expand Down
8 changes: 4 additions & 4 deletions contactql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func TestParseQuery(t *testing.T) {
static.NewField("165def68-3216-4ebf-96bc-f6f1ee5bd966", "state", "State", assets.FieldTypeState),
static.NewField("85baf5e1-b57a-46dc-a726-a84e8c4229c7", "dob", "DOB", assets.FieldTypeDatetime),
},
[]assets.Group{
static.NewGroup("a9b5b0a0-1098-4bc2-8384-eea09ae43e6b", "U-Reporters", ""),
},
[]assets.Flow{
static.NewFlow("f87fd7cd-e501-4394-9cff-62309af85138", "Registration", []byte(`{}`)),
},
[]assets.Group{
static.NewGroup("a9b5b0a0-1098-4bc2-8384-eea09ae43e6b", "U-Reporters", ""),
},
)

tests := []struct {
Expand Down Expand Up @@ -358,8 +358,8 @@ func TestParsingErrors(t *testing.T) {
static.NewField("3810a485-3fda-4011-a589-7320c0b8dbef", "dob", "DOB", assets.FieldTypeDatetime),
static.NewField("d66a7823-eada-40e5-9a3a-57239d4690bf", "gender", "Gender", assets.FieldTypeText),
},
[]assets.Group{},
[]assets.Flow{},
[]assets.Group{},
)

for _, tc := range tests {
Expand Down

0 comments on commit 2463bbb

Please sign in to comment.