diff --git a/NOTICE.txt b/NOTICE.txt index cd9662f0f..17e777e70 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6486,7 +6486,7 @@ POSSIBILITY OF SUCH DAMAGE. pycparser -2.22 +2.23 BSD License pycparser -- A C parser in Python @@ -8410,7 +8410,7 @@ POSSIBILITY OF SUCH DAMAGE. xmltodict -0.15.0 +0.14.2 MIT License Copyright (C) 2012 Martin Blech and individual contributors. diff --git a/connectors/preflight_check.py b/connectors/preflight_check.py index ee104c940..ff455cabb 100644 --- a/connectors/preflight_check.py +++ b/connectors/preflight_check.py @@ -108,7 +108,7 @@ async def _versions_compatible(self, es_version): logger.critical( f"Elasticsearch {es_version} and Connectors {self.version} are incompatible: Elasticsearch minor version is older than Connectors" ) - return False + return True if es_version_parts[1] > connector_version_parts[1]: logger.warning( diff --git a/connectors/sources/servicenow.py b/connectors/sources/servicenow.py index 75e92df3d..9380076ae 100644 --- a/connectors/sources/servicenow.py +++ b/connectors/sources/servicenow.py @@ -38,14 +38,16 @@ retryable, ) -RETRIES = 3 +RETRIES = 2 RETRY_INTERVAL = 2 -QUEUE_MEM_SIZE = 25 * 1024 * 1024 # Size in Megabytes -CONCURRENT_TASKS = 1000 # Depends on total number of services and size of each service -MAX_CONCURRENT_CLIENT_SUPPORT = 10 -TABLE_FETCH_SIZE = 50 -TABLE_BATCH_SIZE = 5 -ATTACHMENT_BATCH_SIZE = 10 +QUEUE_MEM_SIZE = 1024 * 1024 * 1024 # 1GB - Optimized for HUGE records +CONCURRENT_TASKS = 200 # Reduced from 1000 for better resource management +MAX_CONCURRENT_CLIENT_SUPPORT = 10 # Increase for better attachment download performance +SYS_DB_OBJECT_FETCH_SIZE = 500 # Agressive fetch size for sys_db_object table +SYS_DB_OBJECT_BATCH_SIZE = 10 # Batch size for sys_db_object table +TABLE_FETCH_SIZE = 100 # Number of records to grab per API call +TABLE_BATCH_SIZE = 1 # Batch size for all other tables +ATTACHMENT_BATCH_SIZE = 1 # Optimized for 10K+ attachments RUNNING_FTEST = ( "RUNNING_FTEST" in os.environ @@ -69,6 +71,7 @@ ], "incident": ["admin", "sn_incident_read", "ml_report_user", "ml_admin", "itil"], "kb_knowledge": ["admin", "knowledge", "knowledge_manager", "knowledge_admin"], + "sc_cat_item": ["admin", "catalog_admin", "catalog"], "change_request": ["admin", "sn_change_read", "itil"], } ACLS_QUERY = "sys_security_acl.operation=read^sys_security_acl.name={table_name}" @@ -124,7 +127,7 @@ def _get_session(self): """ self._logger.debug("Generating aiohttp client session") - connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_CLIENT_SUPPORT) + connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT_CLIENT_SUPPORT, ssl=False) basic_auth = aiohttp.BasicAuth( login=self.configuration["username"], password=self.configuration["password"], @@ -145,16 +148,40 @@ def _get_session(self): ) async def _read_response(self, response): - fetched_response = await response.read() + """Read and validate response from ServiceNow API. + + Args: + response: aiohttp response object + + Returns: + bytes: Valid response content + + Raises: + InvalidResponse: If response is invalid or malformed + """ + try: + fetched_response = await response.read() + except Exception as e: + msg = f"Failed to read response from ServiceNow: {e}" + raise InvalidResponse(msg) + if fetched_response == b"": msg = "Request to ServiceNow server returned an empty response." raise InvalidResponse(msg) - elif not response.headers["Content-Type"].startswith("application/json"): + + # Check for extremely large responses that might be truncated + if len(fetched_response) > 10 * 1024 * 1024: # 10MB limit + msg = f"Response too large ({len(fetched_response)} bytes), may be truncated" + self._logger.warning(msg) + + content_type = response.headers.get("Content-Type", "") + if not content_type.startswith("application/json"): if response.headers.get("Connection") == "close": msg = "Couldn't connect to ServiceNow instance" raise Exception(msg) - msg = f"Cannot proceed due to unexpected response type '{response.headers['Content-Type']}'; response type must begin with 'application/json'." + msg = f"Cannot proceed due to unexpected response type '{content_type}'; response type must begin with 'application/json'." raise InvalidResponse(msg) + return fetched_response @retryable( @@ -162,30 +189,49 @@ async def _read_response(self, response): interval=RETRY_INTERVAL, strategy=RetryStrategy.EXPONENTIAL_BACKOFF, ) - async def get_table_length(self, table_name): + async def get_table_length(self, table_name, custom_filter=None): + """Get the total count of records in a table. + + Args: + table_name (str): Name of the ServiceNow table + custom_filter (str, optional): Custom query filter to apply. Defaults to None. + + Returns: + int: Total count of records matching the criteria + """ try: url = ENDPOINTS["TABLE"].format(table=table_name) params = {"sysparm_limit": 1} + + # Add custom filter if provided + if custom_filter: + params["sysparm_query"] = custom_filter + response = await self._api_call( url=url, params=params, actions={}, method="get" ) await self._read_response(response=response) return int(response.headers.get("x-total-count", 0)) except Exception as exception: + filter_info = f" with filter '{custom_filter}'" if custom_filter else "" self._logger.warning( - f"Error while fetching {table_name} length. Exception: {exception}." + f"Error while fetching {table_name} length{filter_info}. Exception: {exception}." ) raise - def _prepare_url(self, url, params, offset): + def _prepare_url(self, url, params, offset, table_name=None): if not url.endswith("/file"): query = ORDER_BY_CREATION_DATE_QUERY if "sysparm_query" in params.keys(): query += params["sysparm_query"] + + # Use appropriate fetch size based on table type + fetch_size = SYS_DB_OBJECT_FETCH_SIZE if table_name == "sys_db_object" else TABLE_FETCH_SIZE + params.update( { "sysparm_query": query, - "sysparm_limit": TABLE_FETCH_SIZE, + "sysparm_limit": fetch_size, "sysparm_offset": offset, } ) @@ -196,26 +242,48 @@ def _prepare_url(self, url, params, offset): return full_url async def get_filter_apis(self, rules, mapping): - apis = [] + all_apis = [] + + # First, generate all individual API calls for all rules for rule in rules: params = {"sysparm_query": rule["query"]} table_name = mapping[rule["service"]] - total_count = await self.get_table_length(table_name) + # Use the updated get_table_length with custom filter + total_count = await self.get_table_length(table_name, rule["query"]) paginated_apis = self.get_record_apis( url=ENDPOINTS["TABLE"].format(table=table_name), params=params, total_count=total_count, + table_name=table_name, ) - apis.extend(paginated_apis) - return apis + all_apis.extend(paginated_apis) + + self._logger.debug(f"get_record_apis() generated {len(all_apis)} total paginated API calls") - def get_record_apis(self, url, params, total_count): + # Now batch the API calls based on TABLE_BATCH_SIZE + for batched_apis_index in range(0, len(all_apis), TABLE_BATCH_SIZE): + batched_apis = all_apis[ + batched_apis_index: ( + batched_apis_index + TABLE_BATCH_SIZE + ) # noqa + ] + self._logger.debug( + f"Created advanced rules batch with {len(batched_apis)} API calls (target: {TABLE_BATCH_SIZE})") + yield batched_apis + + # Uses TABLE_FETCH_SIZE or SYS_DB_OBJECT_FETCH_SIZE to create API calls with rows limited + # by sysparm_limit and offset by sysparm_offset + def get_record_apis(self, url, params, total_count, table_name=None): headers = [ {"name": "Content-Type", "value": "application/json"}, {"name": "Accept", "value": "application/json"}, ] + + # Use appropriate fetch size based on table type + fetch_size = SYS_DB_OBJECT_FETCH_SIZE if table_name == "sys_db_object" else TABLE_FETCH_SIZE + apis = [] - for page in range(math.ceil(total_count / TABLE_FETCH_SIZE)): + for page in range(math.ceil(total_count / fetch_size)): apis.append( { "id": str(uuid.uuid4()), @@ -224,7 +292,8 @@ def get_record_apis(self, url, params, total_count): "url": self._prepare_url( url=url, params=params.copy(), - offset=page * TABLE_FETCH_SIZE, + offset=page * fetch_size, + table_name=table_name, ), } ) @@ -243,12 +312,15 @@ def get_attachment_apis(self, url, ids): "id": str(uuid.uuid4()), "headers": headers, "method": "GET", - "url": self._prepare_url(url=url, params=params.copy(), offset=0), + "url": self._prepare_url(url=url, params=params.copy(), offset=0, table_name="attachment"), } ) return apis - async def get_data(self, batched_apis): + async def get_data(self, batched_apis, skip_debug_logging=False): + # Only log detailed debug info if not skipping (i.e., not for sys_db_object) + if not skip_debug_logging: + self._logger.debug(f"get_data(). Number of API calls in this batch: {len(batched_apis)} Batch API List: {batched_apis}") try: batch_data = self._prepare_batch(requests=batched_apis) async for response in self._batch_api_call(batch_data=batch_data): @@ -268,6 +340,17 @@ def _prepare_batch(self, requests): strategy=RetryStrategy.EXPONENTIAL_BACKOFF, ) async def _batch_api_call(self, batch_data): + """Execute batch API call with comprehensive error handling for JSON responses. + + Args: + batch_data (dict): Batch request data + + Yields: + dict: Parsed response data from each request in the batch + + Raises: + InvalidResponse: If response is malformed or contains errors + """ response = await self._api_call( url=ENDPOINTS["BATCH"], params={}, actions=batch_data, method="post" ) @@ -304,20 +387,24 @@ async def filter_services(self, configured_service): servicenow_mapping, invalid_services = {}, configured_service payload = {"sysparm_fields": "sys_id, label, name"} + # Always use get_table_length without custom filter for sys_db_object table_length = await self.get_table_length(table_name="sys_db_object") record_apis = self.get_record_apis( url=ENDPOINTS["TABLE"].format(table="sys_db_object"), params=payload, total_count=table_length, + table_name="sys_db_object", ) - for batched_apis_index in range(0, len(record_apis), TABLE_BATCH_SIZE): + batch_size = SYS_DB_OBJECT_BATCH_SIZE + for batched_apis_index in range(0, len(record_apis), batch_size): batched_apis = record_apis[ batched_apis_index : ( - batched_apis_index + TABLE_BATCH_SIZE + batched_apis_index + batch_size ) # noqa ] - async for table_data in self.get_data(batched_apis=batched_apis): + # Skip debug logging for sys_db_object operations + async for table_data in self.get_data(batched_apis=batched_apis, skip_debug_logging=True): for mapping in table_data: # pyright: ignore sys_id = mapping.get("sys_id") name = mapping.get("name") @@ -347,6 +434,7 @@ def _log_missing_sysparm_field(self, sys_id, field): self._logger.debug(msg) async def ping(self): + # Always use get_table_length without custom filter for sys_db_object await self.get_table_length(table_name="sys_db_object") async def close_session(self): @@ -643,7 +731,7 @@ def _format_doc(self, data): async def _fetch_attachment_metadata(self, batched_apis, table_access_control): try: async for attachments_metadata in self.servicenow_client.get_data( - batched_apis=batched_apis + batched_apis=batched_apis, skip_debug_logging=True ): for record in attachments_metadata: formatted_attachment_metadata = self._format_doc(data=record) @@ -702,19 +790,40 @@ async def _attachment_metadata_producer(self, record_ids, table_access_control): await self.queue.put(EndSignal.RECORD) async def _yield_table_data(self, batched_apis): + self._logger.debug(f"Yielding table data. Number of API calls in this batch: {len(batched_apis)}") try: + record_count = 0 async for table_data in self.servicenow_client.get_data( batched_apis=batched_apis ): + if not isinstance(table_data, list): + self._logger.warning(f"Expected list of records, got {type(table_data)}") + continue + for record in table_data: - formatted_table_data = self._format_doc(data=record) - serialized_table_data = self.serialize(doc=formatted_table_data) - yield serialized_table_data + if not isinstance(record, dict): + self._logger.warning(f"Skipping invalid record: not a dict ({type(record)})") + continue + + try: + formatted_table_data = self._format_doc(data=record) + serialized_table_data = self.serialize(doc=formatted_table_data) + record_count += 1 + yield serialized_table_data + except Exception as format_error: + self._logger.warning( + f"Failed to format/serialize record {record.get('sys_id', 'unknown')}: {format_error}" + ) + continue + + self._logger.debug(f"Successfully processed {record_count} records from batch") + except Exception as exception: self._logger.warning( - f"Skipping batch data for {batched_apis}. Exception: {exception}.", + f"Error processing batch data: {exception}", exc_info=True, ) + # Don't re-raise - let the caller handle the empty generator async def _fetch_table_data(self, batched_apis, table_access_control): try: @@ -798,22 +907,33 @@ async def _fetch_access_controls(self, table_name): ) return list(set(access_control)) + # uses TABLE_BATCH_SIZE or SYS_DB_OBJECT_BATCH_SIZE to create batches of API calls async def _get_batched_apis(self, service_name, params): - table_length = await self.servicenow_client.get_table_length( - table_name=service_name - ) + table_length = await self.servicenow_client.get_table_length(table_name=service_name) record_apis = self.servicenow_client.get_record_apis( url=ENDPOINTS["TABLE"].format(table=service_name), params=params, total_count=table_length, + table_name=service_name, ) - for batched_apis_index in range(0, len(record_apis), TABLE_BATCH_SIZE): + # Use appropriate batch size based on table type + batch_size = SYS_DB_OBJECT_BATCH_SIZE if service_name == "sys_db_object" else TABLE_BATCH_SIZE + + for batched_apis_index in range(0, len(record_apis), batch_size): batched_apis = record_apis[ batched_apis_index : ( - batched_apis_index + TABLE_BATCH_SIZE + batched_apis_index + batch_size ) # noqa ] + + self._logger.debug(f"Creating SNOW batch api call. \n" + f"Service Name: {service_name}\n" + f"Total Records: {table_length}\n" + f"Batch Target Size: {batch_size}\n" + f"Number APIs: {len(record_apis)}\n" + f"Number APIs in batch: {len(batched_apis)}\n") + yield batched_apis async def _table_data_generator(self, service_name, params): @@ -886,14 +1006,13 @@ async def get_docs(self, filtering=None): advanced_rules_index + TABLE_BATCH_SIZE ) # noqa ] - filter_apis = await self.servicenow_client.get_filter_apis( + async for filter_apis_batch in self.servicenow_client.get_filter_apis( rules=batched_advanced_rules, mapping=servicenow_mapping - ) - - await self.fetchers.put( - partial(self._fetch_table_data, filter_apis, []) - ) - self.task_count += 1 + ): + await self.fetchers.put( + partial(self._fetch_table_data, filter_apis_batch, []) + ) + self.task_count += 1 else: if ( @@ -952,3 +1071,4 @@ async def get_content(self, metadata, timestamp=None, doit=False): ), ), ) +