Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/connectors_service/NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7566,7 +7566,7 @@ made under the terms of *both* these licenses.


soupsieve
2.8
2.8.1
MIT License
MIT License

Expand Down
8 changes: 8 additions & 0 deletions app/connectors_service/connectors/kibana.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ async def prepare(service_type, index_name, config, connector_definition=None):
schema=connector_configuration,
)

if connector_definition is not None and "filtering" in connector_definition:
logger.info(f"Updating filtering for '{connector_name}' connector")
filtering = connector_definition["filtering"]
await connector_index.api.client.connector.update_filtering(
connector_id=config["connectors"][0]["connector_id"],
filtering=filtering,
)

logger.info(f"Prepare {index_name}")
await upsert_index(es, index_name)
logger.info("Done")
Expand Down
11 changes: 4 additions & 7 deletions app/connectors_service/connectors/sources/mssql/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,10 @@ async def get_docs(self, filtering=None):
tables = rule.get("tables")
id_columns = rule.get("id_columns", [])

id_columns_str = ""
for i, table in enumerate(tables):
if i == 0:
id_columns_str = f"{self.schema}_{table}"
else:
id_columns_str = f"{id_columns_str}_{table}"
id_columns = [f"{id_columns_str}_{column}" for column in id_columns]
if id_columns:
id_columns = map_column_names(
column_names=id_columns, schema=self.schema, tables=tables
)

async for row in self.fetch_documents_from_query(
tables=tables, query=query, id_columns=id_columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,9 @@ async def get_docs(self, filtering=None):
tables = rule.get("tables")
id_columns = rule.get("id_columns")
if id_columns:
id_columns = [
f"{self.schema}_{'_'.join(sorted(tables))}_{column}"
for column in id_columns
]
id_columns = map_column_names(
column_names=id_columns, schema=self.schema, tables=tables
)
async for row in self.fetch_documents_from_query(
tables=tables, query=query, id_columns=id_columns
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,68 @@
"type": "str",
"value": ""
}
}
},
"filtering": [
{
"domain": "DEFAULT",
"draft": {
"advanced_snippet": {
"updated_at": "2023-01-31T16:41:27.341Z",
"created_at": "2023-01-31T16:38:49.244Z",
"value": [
{"tables": ["customers_0"], "query": "SELECT * FROM customers_0"},
{"tables": ["customers_1"], "query": "SELECT * FROM customers_1"},
{"tables": ["customers_2"], "query": "SELECT * FROM customers_2"},
{
"tables": ["MixedCase_Table"],
"query": "SELECT * FROM \"MixedCase_Table\"",
"id_columns": ["ID"]
}
]
},
"rules": [
{
"field": "_",
"updated_at": "2023-01-31T16:41:27.341Z",
"created_at": "2023-01-31T16:38:49.244Z",
"rule": "regex",
"id": "DEFAULT",
"value": ".*",
"order": 1,
"policy": "include"
}
],
"validation": {"state": "valid", "errors": []}
},
"active": {
"advanced_snippet": {
"updated_at": "2023-01-31T16:41:27.341Z",
"created_at": "2023-01-31T16:38:49.244Z",
"value": [
{"tables": ["customers_0"], "query": "SELECT * FROM customers_0"},
{"tables": ["customers_1"], "query": "SELECT * FROM customers_1"},
{"tables": ["customers_2"], "query": "SELECT * FROM customers_2"},
{
"tables": ["MixedCase_Table"],
"query": "SELECT * FROM \"MixedCase_Table\"",
"id_columns": ["ID"]
}
]
},
"rules": [
{
"field": "_",
"updated_at": "2023-01-31T16:41:27.341Z",
"created_at": "2023-01-31T16:38:49.244Z",
"rule": "regex",
"id": "DEFAULT",
"value": ".*",
"order": 1,
"policy": "include"
}
],
"validation": {"state": "valid", "errors": []}
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@

RECORDS_TO_DELETE = 10

# Mixed-case table for testing id_columns case sensitivity.
# This table uses uppercase column names ("ID", "Name", "Age") and is synced
# via advanced sync rules with id_columns: ["ID"] (uppercase).
MIXED_CASE_TABLE = "MixedCase_Table"
MIXED_CASE_RECORD_COUNT = 10

event_loop = asyncio.get_event_loop()


def get_num_docs():
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE))
print(NUM_TABLES * (RECORD_COUNT - RECORDS_TO_DELETE) + MIXED_CASE_RECORD_COUNT)


async def load():
Expand Down Expand Up @@ -91,8 +97,24 @@ async def load_rows():
await inject_lines(table, connect, RECORD_COUNT)
await connect.close()

async def load_mixed_case_table():
"""Create a table with mixed-case name to test id_columns case sensitivity.
This table is used with advanced sync rules to verify that id_columns
with different casing (e.g., 'ID' vs 'id') correctly match column names.
"""
connect = await asyncpg.connect(CONNECTION_STRING)
print(f"Adding data to mixed-case table '{MIXED_CASE_TABLE}'...")
sql_query = f'CREATE TABLE IF NOT EXISTS "{MIXED_CASE_TABLE}" ("ID" SERIAL PRIMARY KEY, "Name" VARCHAR(255), "Age" int)'
await connect.execute(sql_query)
rows = [(f"User_{i}", i) for i in range(MIXED_CASE_RECORD_COUNT)]
sql_query = f'INSERT INTO "{MIXED_CASE_TABLE}" ("Name", "Age") VALUES ($1, $2)'
await connect.executemany(sql_query, rows)
print(f"Inserted {MIXED_CASE_RECORD_COUNT} rows into '{MIXED_CASE_TABLE}'")
await connect.close()

await create_readonly_user()
await load_rows()
await load_mixed_case_table()


async def remove():
Expand Down
38 changes: 38 additions & 0 deletions app/connectors_service/tests/sources/test_mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,44 @@ async def test_advanced_rules_validation_when_id_in_source_available(
},
],
),
(
# Configured with id_columns using mixed case - tests case sensitivity
Filter(
{
ADVANCED_SNIPPET: {
"value": [
{
"tables": ["emp_table"],
"query": "select * from emp_table",
"id_columns": [
"IDS"
], # uppercase to test case insensitivity
},
]
}
}
),
[
{
"dbo_emp_table_ids": 1,
"dbo_emp_table_names": "abcd",
"_id": "xe_dbo_emp_table_1",
"_timestamp": "2023-02-21T08:37:15+00:00",
"database": "xe",
"table": ["emp_table"],
"schema": "dbo",
},
{
"dbo_emp_table_ids": 2,
"dbo_emp_table_names": "xyz",
"_id": "xe_dbo_emp_table_2",
"_timestamp": "2023-02-21T08:37:15+00:00",
"database": "xe",
"table": ["emp_table"],
"schema": "dbo",
},
],
),
],
)
@pytest.mark.asyncio
Expand Down
38 changes: 38 additions & 0 deletions app/connectors_service/tests/sources/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,44 @@ async def test_get_docs():
},
],
),
(
# Configured with id_columns using mixed case - tests case sensitivity
Filter(
{
ADVANCED_SNIPPET: {
"value": [
{
"tables": ["emp_table"],
"query": "select * from emp_table",
"id_columns": [
"IDS"
], # uppercase to test case insensitivity
},
]
}
}
),
[
{
"public_emp_table_ids": 1,
"public_emp_table_names": "abcd",
"_id": "xe_public_emp_table_1",
"_timestamp": "2023-02-21T08:37:15+00:00",
"database": "xe",
"table": ["emp_table"],
"schema": "public",
},
{
"public_emp_table_ids": 2,
"public_emp_table_names": "xyz",
"_id": "xe_public_emp_table_2",
"_timestamp": "2023-02-21T08:37:15+00:00",
"database": "xe",
"table": ["emp_table"],
"schema": "public",
},
],
),
],
)
@pytest.mark.asyncio
Expand Down