Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from flow to flow_id and from groups to group_ids for ES queries #1089

Merged
merged 2 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions contactql/es/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,36 @@ import (
"github.com/olivere/elastic/v7"
)

// AssetMapper is used to map engine assets to however ES identifies them
type AssetMapper interface {
Flow(assets.Flow) int64
Group(assets.Group) int64
}

// ToElasticQuery converts a contactql query to an Elastic query
func ToElasticQuery(env envs.Environment, query *contactql.ContactQuery) elastic.Query {
func ToElasticQuery(env envs.Environment, mapper AssetMapper, query *contactql.ContactQuery) elastic.Query {
if query.Resolver() == nil {
panic("can only convert queries parsed with a resolver")
}

return nodeToElastic(env, query.Resolver(), query.Root())
return nodeToElastic(env, query.Resolver(), mapper, query.Root())
}

func nodeToElastic(env envs.Environment, resolver contactql.Resolver, node contactql.QueryNode) elastic.Query {
func nodeToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, node contactql.QueryNode) elastic.Query {
switch n := node.(type) {
case *contactql.BoolCombination:
return boolCombinationToElastic(env, resolver, n)
return boolCombinationToElastic(env, resolver, mapper, n)
case *contactql.Condition:
return conditionToElastic(env, resolver, n)
return conditionToElastic(env, resolver, mapper, n)
default:
panic(fmt.Sprintf("unsupported node type: %T", n))
}
}

func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver, combination *contactql.BoolCombination) elastic.Query {
func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, combination *contactql.BoolCombination) elastic.Query {
queries := make([]elastic.Query, len(combination.Children()))
for i, child := range combination.Children() {
queries[i] = nodeToElastic(env, resolver, child)
queries[i] = nodeToElastic(env, resolver, mapper, child)
}

if combination.Operator() == contactql.BoolOperatorAnd {
Expand All @@ -45,12 +51,12 @@ func boolCombinationToElastic(env envs.Environment, resolver contactql.Resolver,
return elastic.NewBoolQuery().Should(queries...)
}

func conditionToElastic(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query {
func conditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query {
switch c.PropertyType() {
case contactql.PropertyTypeField:
return fieldConditionToElastic(env, resolver, c)
case contactql.PropertyTypeAttribute:
return attributeConditionToElastic(env, resolver, c)
return attributeConditionToElastic(env, resolver, mapper, c)
case contactql.PropertyTypeScheme:
return schemeConditionToElastic(env, c)
default:
Expand Down Expand Up @@ -181,7 +187,7 @@ func fieldConditionToElastic(env envs.Environment, resolver contactql.Resolver,
panic(fmt.Sprintf("unsupported field type: %s", fieldType))
}

func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) elastic.Query {
func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) elastic.Query {
key := c.PropertyKey()
value := strings.ToLower(c.Value())
var query elastic.Query
Expand Down Expand Up @@ -303,16 +309,16 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv

switch c.Operator() {
case contactql.OpEqual:
return elastic.NewTermQuery("groups", group.UUID())
return elastic.NewTermQuery("group_ids", mapper.Group(group))
case contactql.OpNotEqual:
return not(elastic.NewTermQuery("groups", group.UUID()))
return not(elastic.NewTermQuery("group_ids", mapper.Group(group)))
default:
panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator()))
}
case contactql.AttributeFlow:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query = elastic.NewExistsQuery("flow")
query = elastic.NewExistsQuery("flow_id")
if c.Operator() == contactql.OpEqual {
query = not(query)
}
Expand All @@ -323,9 +329,9 @@ func attributeConditionToElastic(env envs.Environment, resolver contactql.Resolv

switch c.Operator() {
case contactql.OpEqual:
return elastic.NewTermQuery("flow", flow.UUID())
return elastic.NewTermQuery("flow_id", mapper.Flow(flow))
case contactql.OpNotEqual:
return not(elastic.NewTermQuery("flow", flow.UUID()))
return not(elastic.NewTermQuery("flow_id", mapper.Flow(flow)))
default:
panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator()))
}
Expand Down
34 changes: 30 additions & 4 deletions contactql/es/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ import (
"github.com/stretchr/testify/require"
)

type MockMapper struct {
flows map[assets.FlowUUID]int64
groups map[assets.GroupUUID]int64
}

func (m *MockMapper) Flow(f assets.Flow) int64 {
return m.flows[f.UUID()]
}

func (m *MockMapper) Group(g assets.Group) int64 {
return m.groups[g.UUID()]
}

func newMockMapper(flows map[assets.FlowUUID]int64, groups map[assets.GroupUUID]int64) *MockMapper {
return &MockMapper{flows, groups}
}

func newMockResolver() contactql.Resolver {
return contactql.NewMockResolver(
[]assets.Field{
Expand All @@ -29,18 +46,27 @@ func newMockResolver() contactql.Resolver {
static.NewField("54c72635-d747-4e45-883c-099d57dd998e", "district", "District", assets.FieldTypeDistrict),
static.NewField("fde8f740-c337-421b-8abb-83b954897c80", "ward", "Ward", assets.FieldTypeWard),
},
[]assets.Flow{
static.NewFlow("c261165a-f5b0-40ba-b916-76fb49667a4f", "Registration", []byte(`{}`)),
},
[]assets.Group{
static.NewGroup("8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf", "U-Reporters", ""),
static.NewGroup("cf51cf8d-94da-447a-b27e-a42a900c37a6", "Testers", ""),
},
[]assets.Flow{
static.NewFlow("c261165a-f5b0-40ba-b916-76fb49667a4f", "Registration", []byte(`{}`)),
},
)
}

func TestElasticQuery(t *testing.T) {
resolver := newMockResolver()
mapper := newMockMapper(
map[assets.FlowUUID]int64{
"c261165a-f5b0-40ba-b916-76fb49667a4f": 234, // Registration
},
map[assets.GroupUUID]int64{
"8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf": 345, // U-Reporters
"cf51cf8d-94da-447a-b27e-a42a900c37a6": 456, // Testers
},
)

type testCase struct {
Description string `json:"description"`
Expand All @@ -67,7 +93,7 @@ func TestElasticQuery(t *testing.T) {
parsed, err := contactql.ParseQuery(env, tc.Query, resolver)
require.NoError(t, err)

query := es.ToElasticQuery(env, parsed)
query := es.ToElasticQuery(env, mapper, parsed)
assert.NotNil(t, query, tc.Description)

source, err := query.Source()
Expand Down
12 changes: 6 additions & 6 deletions contactql/es/testdata/to_query.json
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,7 @@
"query": "group = \"U-Reporters\"",
"elastic": {
"term": {
"groups": "8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf"
"group_ids": 345
}
}
},
Expand All @@ -1685,7 +1685,7 @@
"bool": {
"must_not": {
"term": {
"groups": "8de30b78-d9ef-4db2-b2e8-4f7b6aef64cf"
"group_ids": 345
}
}
}
Expand All @@ -1696,7 +1696,7 @@
"query": "flow = \"registration\"",
"elastic": {
"term": {
"flow": "c261165a-f5b0-40ba-b916-76fb49667a4f"
"flow_id": 234
}
}
},
Expand All @@ -1707,7 +1707,7 @@
"bool": {
"must_not": {
"term": {
"flow": "c261165a-f5b0-40ba-b916-76fb49667a4f"
"flow_id": 234
}
}
}
Expand All @@ -1718,7 +1718,7 @@
"query": "flow != \"\"",
"elastic": {
"exists": {
"field": "flow"
"field": "flow_id"
}
}
},
Expand All @@ -1729,7 +1729,7 @@
"bool": {
"must_not": {
"exists": {
"field": "flow"
"field": "flow_id"
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion contactql/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ func TestEvaluateQuery(t *testing.T) {
static.NewField("023f733d-ce00-4a61-96e4-b411987028ea", "empty", "Empty", assets.FieldTypeText),
static.NewField("81e25783-a1d8-42b9-85e4-68c7ab2df39d", "xyz", "XYZ", assets.FieldTypeText),
},
[]assets.Group{},
[]assets.Flow{
static.NewFlow("ea351bf8-3c49-46dd-935c-5b20e2a00b7a", "Registration", []byte(`{}`)),
static.NewFlow("1b73528f-6e4e-4c64-b393-78088449fb49", "Catch All", []byte(`{}`)),
},
[]assets.Group{},
)

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion contactql/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestInspect(t *testing.T) {
static.NewField(assets.FieldUUID("3810a485-3fda-4011-a589-7320c0b8dbef"), "dob", "DOB", assets.FieldTypeDatetime),
static.NewField(assets.FieldUUID("d66a7823-eada-40e5-9a3a-57239d4690bf"), "gender", "Gender", assets.FieldTypeText),
},
[]assets.Flow{},
[]assets.Group{
static.NewGroup(assets.GroupUUID("4eeca453-f474-4767-bdd0-434b180223db"), "U-Reporters", ""),
},
[]assets.Flow{},
)

tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions contactql/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (

type mockResolver struct {
fields []assets.Field
groups []assets.Group
flows []assets.Flow
groups []assets.Group
}

// NewMockResolver creates a new mock resolver for fields and groups
func NewMockResolver(fields []assets.Field, groups []assets.Group, flows []assets.Flow) Resolver {
func NewMockResolver(fields []assets.Field, flows []assets.Flow, groups []assets.Group) Resolver {
return &mockResolver{
fields: fields,
groups: groups,
flows: flows,
groups: groups,
}
}

Expand Down
8 changes: 4 additions & 4 deletions contactql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func TestParseQuery(t *testing.T) {
static.NewField("165def68-3216-4ebf-96bc-f6f1ee5bd966", "state", "State", assets.FieldTypeState),
static.NewField("85baf5e1-b57a-46dc-a726-a84e8c4229c7", "dob", "DOB", assets.FieldTypeDatetime),
},
[]assets.Group{
static.NewGroup("a9b5b0a0-1098-4bc2-8384-eea09ae43e6b", "U-Reporters", ""),
},
[]assets.Flow{
static.NewFlow("f87fd7cd-e501-4394-9cff-62309af85138", "Registration", []byte(`{}`)),
},
[]assets.Group{
static.NewGroup("a9b5b0a0-1098-4bc2-8384-eea09ae43e6b", "U-Reporters", ""),
},
)

tests := []struct {
Expand Down Expand Up @@ -358,8 +358,8 @@ func TestParsingErrors(t *testing.T) {
static.NewField("3810a485-3fda-4011-a589-7320c0b8dbef", "dob", "DOB", assets.FieldTypeDatetime),
static.NewField("d66a7823-eada-40e5-9a3a-57239d4690bf", "gender", "Gender", assets.FieldTypeText),
},
[]assets.Group{},
[]assets.Flow{},
[]assets.Group{},
)

for _, tc := range tests {
Expand Down