Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 102 additions & 51 deletions wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import {
toIbisConnectionInfo,
toMultipleIbisConnectionInfos,
} from '../dataSource';
import { DialectSQL, WrenSQL } from '../models/adaptor';

export type { WrenSQL };

const logger = getLogger('IbisAdaptor');
logger.level = 'debug';
Expand Down Expand Up @@ -90,6 +93,7 @@ const dataSourceUrlMap: Record<SupportedDataSource, string> = {
[SupportedDataSource.CLICK_HOUSE]: 'clickhouse',
[SupportedDataSource.TRINO]: 'trino',
};

export interface TableResponse {
tables: CompactTable[];
}
Expand All @@ -114,11 +118,13 @@ export interface IbisQueryOptions extends IbisBaseOptions {
export interface IbisDryPlanOptions {
dataSource: DataSourceName;
mdl: Manifest;
// TODO: replace sql type with WrenSQL
sql: string;
}

export interface IIbisAdaptor {
query: (
// TODO: replace query type with WrenSQL
query: string,
options: IbisQueryOptions,
) => Promise<IbisQueryResponse>;
Expand All @@ -140,6 +146,12 @@ export interface IIbisAdaptor {
mdl: Manifest,
parameters: Record<string, any>,
) => Promise<ValidationResponse>;
modelSubstitute: (
dataSource: DataSourceName,
connectionInfo: WREN_AI_CONNECTION_INFO,
mdl: Manifest,
sql: DialectSQL,
) => Promise<WrenSQL>;
}

export interface IbisResponse {
Expand All @@ -162,6 +174,7 @@ enum IBIS_API_TYPE {
METADATA = 'METADATA',
VALIDATION = 'VALIDATION',
ANALYSIS = 'ANALYSIS',
MODEL_SUBSTITUTE = 'MODEL_SUBSTITUTE',
}

export class IbisAdaptor implements IIbisAdaptor {
Expand All @@ -183,11 +196,8 @@ export class IbisAdaptor implements IIbisAdaptor {
);
return res.data;
} catch (e) {
logger.debug(`Got error when dry plan with ibis: ${e.response.data}`);
throw Errors.create(Errors.GeneralErrorCodes.DRY_PLAN_ERROR, {
customMessage: e.response.data,
originalError: e,
});
logger.debug(`Dry plan error: ${e.response?.data || e.message}`);
this.throwError(e, 'Error during dry plan execution');
}
}

Expand Down Expand Up @@ -219,19 +229,8 @@ export class IbisAdaptor implements IIbisAdaptor {
processTime: res.headers['x-process-time'],
};
} catch (e) {
logger.debug(
`Got error when querying ibis: ${e.response?.data || e.message}`,
);

throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, {
customMessage:
e.response?.data || e.message || 'Error querying ibis server',
originalError: e,
other: {
correlationId: e.response?.headers['x-correlation-id'],
processTime: e.response?.headers['x-process-time'],
},
});
logger.debug(`Query error: ${e.response?.data || e.message}`);
this.throwError(e, 'Error querying ibis server');
}
}

Expand Down Expand Up @@ -259,15 +258,8 @@ export class IbisAdaptor implements IIbisAdaptor {
processTime: response.headers['x-process-time'],
};
} catch (err) {
logger.info(`Got error when dry running ibis`);
throw Errors.create(Errors.GeneralErrorCodes.DRY_RUN_ERROR, {
customMessage: err.response?.data || err.message,
originalError: err,
other: {
correlationId: err.response?.headers['x-correlation-id'],
processTime: err.response?.headers['x-process-time'],
},
});
logger.debug(`Dry run error: ${err.response?.data || err.message}`);
this.throwError(err, 'Error during dry run execution');
}
}

Expand Down Expand Up @@ -310,16 +302,8 @@ export class IbisAdaptor implements IIbisAdaptor {
);
return await getTablesByConnectionInfo(ibisConnectionInfo);
} catch (e) {
logger.debug(
`Got error when getting table: ${e.response?.data || e.message}`,
);
throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, {
customMessage:
e.response?.data ||
e.message ||
'Error getting table from ibis server',
originalError: e,
});
logger.debug(`Get tables error: ${e.response?.data || e.message}`);
this.throwError(e, 'Error getting table from ibis server');
}
}

Expand All @@ -340,17 +324,8 @@ export class IbisAdaptor implements IIbisAdaptor {
);
return res.data;
} catch (e) {
logger.debug(
`Got error when getting constraint: ${e.response?.data || e.message}`,
);

throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, {
customMessage:
e.response?.data ||
e.message ||
'Error getting constraint from ibis server',
originalError: e,
});
logger.debug(`Get constraints error: ${e.response?.data || e.message}`);
this.throwError(e, 'Error getting constraint from ibis server');
}
}

Expand All @@ -375,12 +350,41 @@ export class IbisAdaptor implements IIbisAdaptor {
body,
);
return { valid: true, message: null };
} catch (e) {
logger.debug(`Validation error: ${e.response?.data || e.message}`);
return { valid: false, message: e.response?.data || e.message };
}
}
Comment on lines +357 to +361

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Inconsistent error handling approach in validate method.

The validate method returns an error response instead of throwing exceptions, which is inconsistent with other methods that use the throwError function. Consider making the error handling approach consistent across all methods.

  public async validate(
    dataSource: DataSourceName,
    validationRule: ValidationRules,
    connectionInfo: WREN_AI_CONNECTION_INFO,
    mdl: Manifest,
    parameters: Record<string, any>,
  ): Promise<ValidationResponse> {
    connectionInfo = this.updateConnectionInfo(connectionInfo);
    const ibisConnectionInfo = toIbisConnectionInfo(dataSource, connectionInfo);
    const body = {
      connectionInfo: ibisConnectionInfo,
      manifestStr: Buffer.from(JSON.stringify(mdl)).toString('base64'),
      parameters,
    };
    try {
      logger.debug(`Run validation rule "${validationRule}" with ibis`);
      await axios.post(
        `${this.ibisServerEndpoint}/${this.getIbisApiVersion(IBIS_API_TYPE.VALIDATION)}/connector/${dataSourceUrlMap[dataSource]}/validate/${snakeCase(validationRule)}`,
        body,
      );
      return { valid: true, message: null };
-    } catch (e) {
-      logger.debug(`Validation error: ${e.response?.data || e.message}`);
-      return { valid: false, message: e.response?.data || e.message };
-    }
+    } catch (e) {
+      logger.debug(`Validation error: ${e.response?.data || e.message}`);
+      // Either convert all methods to return errors like this, or make this one throw like the others
+      return { valid: false, message: e.response?.data || e.message };
+    }
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (e) {
logger.debug(`Validation error: ${e.response?.data || e.message}`);
return { valid: false, message: e.response?.data || e.message };
}
}
public async validate(
dataSource: DataSourceName,
validationRule: ValidationRules,
connectionInfo: WREN_AI_CONNECTION_INFO,
mdl: Manifest,
parameters: Record<string, any>,
): Promise<ValidationResponse> {
connectionInfo = this.updateConnectionInfo(connectionInfo);
const ibisConnectionInfo = toIbisConnectionInfo(dataSource, connectionInfo);
const body = {
connectionInfo: ibisConnectionInfo,
manifestStr: Buffer.from(JSON.stringify(mdl)).toString('base64'),
parameters,
};
try {
logger.debug(`Run validation rule "${validationRule}" with ibis`);
await axios.post(
`${this.ibisServerEndpoint}/${this.getIbisApiVersion(IBIS_API_TYPE.VALIDATION)}/connector/${dataSourceUrlMap[dataSource]}/validate/${snakeCase(validationRule)}`,
body,
);
return { valid: true, message: null };
} catch (e) {
logger.debug(`Validation error: ${e.response?.data || e.message}`);
// Either convert all methods to return errors like this, or make this one throw like the others
return { valid: false, message: e.response?.data || e.message };
}
}


public async modelSubstitute(
dataSource: DataSourceName,
connectionInfo: WREN_AI_CONNECTION_INFO,
mdl: Manifest,
sql: DialectSQL,
): Promise<WrenSQL> {
connectionInfo = this.updateConnectionInfo(connectionInfo);
const ibisConnectionInfo = toIbisConnectionInfo(dataSource, connectionInfo);
const body = {
sql,
connectionInfo: ibisConnectionInfo,
manifestStr: Buffer.from(JSON.stringify(mdl)).toString('base64'),
};
try {
logger.debug(`Running model substitution with ibis`);
const res = await axios.post(
`${this.ibisServerEndpoint}/${this.getIbisApiVersion(IBIS_API_TYPE.MODEL_SUBSTITUTE)}/connector/${dataSourceUrlMap[dataSource]}/model-substitute`,
body,
);
return res.data as WrenSQL;
} catch (e) {
logger.debug(
`Got error when validating connection: ${e.response?.data || e.message}`,
`Model substitution error: ${e.response?.data || e.message}`,
);
this.throwError(
e,
'Error running model substitution with ibis server',
this.modelSubstituteErrorMessageBuilder,
);

return { valid: false, message: e.response?.data || e.message };
}
}

Expand Down Expand Up @@ -437,8 +441,55 @@ export class IbisAdaptor implements IIbisAdaptor {
IBIS_API_TYPE.DRY_RUN,
IBIS_API_TYPE.DRY_PLAN,
IBIS_API_TYPE.VALIDATION,
IBIS_API_TYPE.MODEL_SUBSTITUTE,
].includes(apiType);
if (useV3) logger.debug('Using ibis v3 api');
return useV3 ? 'v3' : 'v2';
}

private throwError(
e: any,
defaultMessage: string,
errorMessageBuilder?: CallableFunction,
) {
const customMessage =
e.response?.data?.message ||
e.response?.data ||
e.message ||
defaultMessage;
throw Errors.create(Errors.GeneralErrorCodes.IBIS_SERVER_ERROR, {
customMessage: errorMessageBuilder
? errorMessageBuilder(customMessage)
: customMessage,
originalError: e,
other: {
correlationId: e.response?.headers['x-correlation-id'],
processTime: e.response?.headers['x-process-time'],
},
});
}

private modelSubstituteErrorMessageBuilder(message: string) {
const ModelSubstituteErrorEnum = {
MODEL_NOT_FOUND: () => {
return message.includes('Model not found');
},
PARSING_EXCEPTION: () => {
return message.includes('sql.parser.ParsingException');
},
};
if (ModelSubstituteErrorEnum.MODEL_NOT_FOUND()) {
const modelName = message.split(': ')[1];
return (
message +
`. Try to add catalog and schema in front of your table. eg: my_database.public.${modelName}`
);
} else if (ModelSubstituteErrorEnum.PARSING_EXCEPTION()) {
return (
message +
'. Please check your selected column and make sure its quoted for columns with non-alphanumeric characters.'
);
}
return message;
}
}
Loading