Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions ibis-server/app/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ async def dispatch(self, request: Request, call_next) -> Response:
with logger.contextualize(correlation_id=correlation_id):
logger.info("{method} {path}", method=request.method, path=request.url.path)
logger.info("Request params: {params}", params=dict(request.query_params))
# Redact sensitive headers before logging
sensitive = {
"authorization",
"proxy-authorization",
"cookie",
"set-cookie",
}
redacted_headers = {}
for k, v in request.headers.items():
kl = k.lower()
if kl in sensitive:
redacted_headers[k] = "REDACTED"
else:
redacted_headers[k] = v
logger.info("Request headers: {headers}", headers=redacted_headers)
body = await request.body()
if body:
json_obj = orjson.loads(body)
Expand Down
57 changes: 55 additions & 2 deletions ibis-server/tests/routers/v3/connector/postgres/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@
"name": "customer_access",
"requiredProperties": [
{
"name": "session_user",
# To test the name is case insensitive
"name": "Session_user",
"required": False,
}
],
"condition": "c_name = @session_user",
"condition": "c_name = @session_User",
},
],
"primaryKey": "c_custkey",
Expand Down Expand Up @@ -627,6 +628,58 @@ async def test_clac_query(client, manifest_str, connection_info):
assert len(result["data"]) == 1
assert len(result["data"][0]) == 2

manifest_with_required_properties = {
"catalog": "wren",
"schema": "public",
"models": [
{
"name": "customer",
"tableReference": {
"schema": "public",
"table": "customer",
},
"columns": [
{"name": "c_custkey", "type": "integer"},
{
"name": "c_name",
"type": "varchar",
"columnLevelAccessControl": {
"name": "c_name_access",
"requiredProperties": [
{
"name": "Session_level",
"required": True,
}
],
"operator": "EQUALS",
"threshold": "1",
},
},
],
"primaryKey": "c_custkey",
},
],
}

base64_manifest_with_required_properties = base64.b64encode(
orjson.dumps(manifest_with_required_properties)
).decode("utf-8")
response = await client.post(
url=f"{base_url}/query",
json={
"connectionInfo": connection_info,
"manifestStr": base64_manifest_with_required_properties,
"sql": "SELECT * FROM customer limit 1",
},
headers={
X_WREN_VARIABLE_PREFIX + "session_level": "2",
},
)
assert response.status_code == 200
result = response.json()
assert len(result["data"]) == 1
assert len(result["data"][0]) == 2


async def test_connection_timeout(
client, manifest_str, connection_info, connection_url
Expand Down
8 changes: 7 additions & 1 deletion ibis-server/tools/query_local_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,13 @@

print("### Starting the session context ###")
print("#")
session_context = SessionContext(encoded_str, function_list_path + f"/{data_source}.csv")
# Set the session properties here. It should be lowercase.
properties = {}
properties = frozenset(properties.items())
print("### Session Properties ###")
for key, value in properties:
print(f"# {key}: {value}")
session_context = SessionContext(encoded_str, function_list_path + f"/{data_source}.csv", properties)
planned_sql = session_context.transform_sql(sql)
print("# Planned SQL:\n", planned_sql)

Expand Down
47 changes: 25 additions & 22 deletions wren-core-py/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,32 +158,35 @@ impl PySessionContext {
};
let manifest = to_manifest(mdl_base64)?;
let properties_ref = Arc::new(properties_map);
let Ok(analyzed_mdl) = AnalyzedWrenMDL::analyze(
match AnalyzedWrenMDL::analyze(
manifest,
Arc::clone(&properties_ref),
mdl::context::Mode::Unparse,
) else {
return Err(CoreError::new("Failed to analyze manifest").into());
};

let analyzed_mdl = Arc::new(analyzed_mdl);
) {
Ok(analyzed_mdl) => {
let analyzed_mdl = Arc::new(analyzed_mdl);
// the headers won't be used in the context. Provide an empty map.
let ctx = runtime
.block_on(create_ctx_with_mdl(
&ctx,
Arc::clone(&analyzed_mdl),
Arc::clone(&properties_ref),
mdl::context::Mode::Unparse,
))
.map_err(CoreError::from)?;

// the headers won't be used in the context. Provide an empty map.
let ctx = runtime
.block_on(create_ctx_with_mdl(
&ctx,
Arc::clone(&analyzed_mdl),
Arc::new(HashMap::new()),
mdl::context::Mode::Unparse,
))
.map_err(CoreError::from)?;

Ok(Self {
ctx,
mdl: analyzed_mdl,
runtime: Arc::new(runtime),
properties: properties_ref,
})
Ok(Self {
ctx,
mdl: analyzed_mdl,
runtime: Arc::new(runtime),
properties: properties_ref,
})
}
Err(e) => Err(CoreError::new(
format!("Failed to analyze MDL: {}", e).as_str(),
)
.into()),
}
})
}

Expand Down