Skip to content

Commit

Permalink
feat(integration-slack): store request error counts and disable on br…
Browse files Browse the repository at this point in the history
…oken (#52994)

Continuing #51126
Milestone 1 of [Notify on Disabled Integration project
](https://www.notion.so/sentry/Tech-Spec-Notify-on-Disabled-Integration-Spec-e7ea0f86ccd6419cb3e564067cf4a2ef?pvs=4
)

Implementing `IntegrationRequestBuffer` using Redis for logging errors
and detecting broken integrations to be disabled. Request buffer stores
daily aggregate error, fatal and success counts for an integration for
30 days. If an integration consistently fails for 7 days or has a fatal
response, then the integration is broken. Later users will be alerted of
broken integrations with an email or in-app.

Disabling implemented under feature flag `"disable-on-broken"`.
Conditions for logging Implemented in `BaseApiClient` and `SlackClient`.

---------

Co-authored-by: Colleen O'Rourke <[email protected]>
  • Loading branch information
chloeho7 and ceorourke committed Jul 25, 2023
1 parent c091732 commit e73144d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
# Enable Opsgenie integration
"organizations:integrations-opsgenie": False,
# Allow disabling integrations when broken is detected
"organization:disable-on-broken": False,
"organizations:slack-disable-on-broken": False,
# Enable the 'discover' interface.
"organizations:discover": False,
# Enables events endpoint rate limit
Expand Down
3 changes: 2 additions & 1 deletion src/sentry/features/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,10 @@
default_manager.add("organizations:pr-comment-bot", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("organizations:ds-org-recalibration", OrganizationFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("organizations:slack-use-new-lookup", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
default_manager.add("organizations:disable-on-broken", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
default_manager.add("organizations:slack-disable-on-broken", OrganizationFeature, FeatureHandlerStrategy.REMOTE)
default_manager.add("organizations:sourcemaps-bundle-flat-file-indexing", OrganizationFeature, FeatureHandlerStrategy.REMOTE)


# Project scoped features
default_manager.add("projects:alert-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
default_manager.add("projects:custom-inbound-filters", ProjectFeature, FeatureHandlerStrategy.INTERNAL)
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/integrations/discord/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
org_integration_id = infer_org_integration(
integration_id=integration_id, ctx_logger=logger
)
super().__init__(org_integration_id, verify_ssl, logging_context)
super().__init__(integration_id, org_integration_id, verify_ssl, logging_context)

@control_silo_function
def authorize_request(self, prepared_request: PreparedRequest) -> PreparedRequest:
Expand Down
17 changes: 4 additions & 13 deletions src/sentry/integrations/request_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def __init__(self, key):

cluster_id = settings.SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER
self.client = redis.redis_clusters.get(cluster_id)
self.is_fatal = False

def _convert_obj_to_dict(self, redis_object):
"""
Expand Down Expand Up @@ -51,15 +50,11 @@ def is_integration_broken(self):
Integration is broken if we have 7 consecutive days of errors and no successes OR have a fatal error
"""
# fast shutoff
# if self.is_fatal:
# return True

data = [
datetime.strptime(item.get("date"), "%Y-%m-%d").date()
for item in self._get()
if item.get("fatal_count", 0) != 0 and item.get("date")
][0 : IS_BROKEN_RANGE - 1]
if item.get("fatal_count", 0) > 0 and item.get("date")
][0:IS_BROKEN_RANGE]

if len(data) > 0:
return True
Expand All @@ -70,12 +65,12 @@ def is_integration_broken(self):
if item.get("error_count", 0) > 0
and item.get("success_count", 0) == 0
and item.get("date")
][0 : IS_BROKEN_RANGE - 1]
][0:IS_BROKEN_RANGE]

if not len(data):
return False

if len(data) < IS_BROKEN_RANGE - 1:
if len(data) < IS_BROKEN_RANGE:
return False

date_set = {data[0] - timedelta(x) for x in range((data[0] - data[-1]).days)}
Expand Down Expand Up @@ -130,8 +125,4 @@ def record_success(self):
self.add("success")

def record_fatal(self):
# skip to uninstall or call is_integration_broken?
self.is_fatal = True
self.add("fatal")
# if self.is_integration_broken():
# call uninstal
9 changes: 5 additions & 4 deletions src/sentry/integrations/slack/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ def authorize_request(self, prepared_request: PreparedRequest) -> PreparedReques
prepared_request.headers["Authorization"] = f"Bearer {token}"
return prepared_request

def is_response_fatal(self, response: BaseApiResponse) -> bool:
if not response.json.get("ok"):
if "account_inactive" == response.json.get("error", ""):
def is_response_fatal(self, response: Response) -> bool:
resp_json = response.json()
if not resp_json.get("ok"):
if "account_inactive" == resp_json.get("error", ""):
return True
return False

def track_response_data(
self,
Expand All @@ -84,7 +86,6 @@ def track_response_data(
# if no span was passed, create a dummy to which to add data to avoid having to wrap every
# span call in `if span`
span = span or Span()
# print("track_response_data")
try:
span.set_http_status(int(code))
except ValueError:
Expand Down
79 changes: 52 additions & 27 deletions src/sentry/shared_integrations/client/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,35 +88,30 @@ def finalize_request(self, prepared_request: PreparedRequest) -> PreparedRequest

def _get_redis_key(self):
"""
Returns the redis key for the integration or False if cannot make key
Returns the redis key for the integration or empty str if cannot make key
"""
if not hasattr(self, "integration_id"):
return ""
if not self.integration_id:
return ""
return f"sentry-integration-error:{self.integration_id}"

def is_error(self, e: Exception) -> bool:
if type(e) is ConnectionError:
return True
if type(e) is Timeout:
return True
if type(e) is HTTPError:
return True
def is_considered_error(self, e: Exception) -> bool:
return True

return False

def is_response_fatal(self, resp: BaseApiResponse) -> bool:
def is_response_fatal(self, resp: Response) -> bool:
return False

def is_response_error(self, resp: Response) -> bool:
if resp.status_code >= 400 and resp.status_code != 429 and resp.status_code < 500:
return True
if resp.status_code:
if resp.status_code >= 400 and resp.status_code != 429 and resp.status_code < 500:
return True
return False

def is_response_success(self, resp: Response) -> bool:
if resp.status_code < 300:
return True
if resp.status_code:
if resp.status_code < 300:
return True
return False

@overload
Expand Down Expand Up @@ -305,11 +300,7 @@ def _request(
raise e

self.track_response_data(resp.status_code, span, None, resp)
self.record_response(
BaseApiResponse.from_response(
resp, allow_text=allow_text, ignore_webhook_errors=ignore_webhook_errors
)
)
self.record_response(resp)

if resp.status_code == 204:
return {}
Expand Down Expand Up @@ -394,8 +385,8 @@ def record_error(self, error: Exception):
redis_key = self._get_redis_key()
if not len(redis_key):
return
if not self.is_error(error):
return False
if not self.is_considered_error(error):
return
buffer = IntegrationRequestBuffer(redis_key)
buffer.record_error()
if buffer.is_integration_broken():
Expand Down Expand Up @@ -427,19 +418,53 @@ def record_request_fatal(self, resp: Response):
self.disable_integration()

def disable_integration(self) -> None:

rpc_integration, rpc_org_integration = integration_service.get_organization_contexts(
integration_id=self.integration_id
)
if (
integration_service.get_integration(integration_id=rpc_integration.id).status
== ObjectStatus.DISABLED
):
return
oi = OrganizationIntegration.objects.filter(integration_id=self.integration_id)[0]
org = Organization.objects.get(id=oi.organization_id)
if features.has("organizations:disable-on-broken", org):
if (
features.has("organizations:slack-disable-on-broken", org)
and rpc_integration.provider == "slack"
):
integration_service.update_integration(
integration_id=rpc_integration.id, status=ObjectStatus.DISABLED
)
integration = integration_service.get_integration(integration_id=self.integration_id)
notify_disable(org, integration, self._get_redis_key())

if len(rpc_org_integration) == 0 and rpc_integration is None:
self.logger.info(
"integration.disabled",
extra={
"integration_id": self.integration_id,
"provider": "provider is None",
"organization_id": "rpc_org_integration is empty",
},
)
return
if len(rpc_org_integration) == 0:
self.logger.info(
"integration.disabled",
extra={
"integration_id": self.integration_id,
"provider": rpc_integration.provider,
"organization_id": "rpc_org_integration is empty",
},
)
return
if rpc_integration is None:
self.logger.info(
"integration.disabled",
extra={
"integration_id": self.integration_id,
"provider": "provider is None",
"organization_id": rpc_org_integration[0].organization_id,
},
)
return
self.logger.info(
"integration.disabled",
extra={
Expand Down
Empty file.
41 changes: 31 additions & 10 deletions tests/sentry/integrations/slack/test_disable.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ def tearDown(self):
self.resp.__exit__(None, None, None)

@responses.activate
@with_feature("organizations:disable-on-broken")
@with_feature("organizations:slack-disable-on-broken")
def test_fatal_and_disable_integration(self):
"""
fatal fast shut off with disable flag on, integration should be broken and disabled
"""
bodydict = {"ok": False, "error": "account_inactive"}
self.resp.add(
method=responses.POST,
Expand Down Expand Up @@ -98,6 +101,9 @@ def test_fatal_integration(self):

@responses.activate
def test_error_integration(self):
"""
recieve two errors and errors are recorded, integration is not broken yet so no disable
"""
bodydict = {"ok": False, "error": "The requested resource does not exist"}
self.resp.add(
method=responses.POST,
Expand All @@ -106,16 +112,28 @@ def test_error_integration(self):
content_type="application/json",
body=json.dumps(bodydict),
)
self.resp.add(
method=responses.POST,
url="https://slack.com/api/chat.postMessage",
status=404,
content_type="application/json",
body=json.dumps(bodydict),
)
client = SlackClient(integration_id=self.integration.id)
with pytest.raises(ApiError):
client.post("/chat.postMessage", data=self.payload)
with pytest.raises(ApiError):
client.post("/chat.postMessage", data=self.payload)
buffer = IntegrationRequestBuffer(client._get_redis_key())
assert (buffer._get()[0]["error_count"]) >= 1
assert (buffer._get()[0]["error_count"]) == 2
assert buffer.is_integration_broken() is False

# fake slow test w disable off
@responses.activate
def test_integration_is_broken(self):
def slow_test_integration_is_broken(self):
"""
slow shut off with disable flag off
put errors in buffer for 10 days, assert integration is broken but not disabled
"""
bodydict = {"ok": False, "error": "The requested resource does not exist"}
self.resp.add(
method=responses.POST,
Expand All @@ -130,17 +148,20 @@ def test_integration_is_broken(self):
for i in reversed(range(10)):
with freeze_time(now - timedelta(days=i)):
buffer.record_error()
buffer.record_success()

with pytest.raises(ApiError):
client.post("/chat.postMessage", data=self.payload)
assert buffer.is_integration_broken() is True
integration = Integration.objects.get(id=self.integration.id)
assert integration.status == ObjectStatus.ACTIVE
assert buffer.is_integration_broken() is False
assert Integration.objects.filter(id=self.integration.id).status == ObjectStatus.DISABLED

# fake slow test w disable on
@responses.activate
@with_feature("organizations:disable-on-broken")
def test_integration_is_broken_and_disabled(self):
@with_feature("organizations:slack-disable-on-broken")
def slow_test_integration_is_not_broken_or_disabled(self):
"""
slow test with disable flag on
put errors and success in buffer for 10 days, assert integration is not broken or disabled
"""
bodydict = {"ok": False, "error": "The requested resource does not exist"}
self.resp.add(
method=responses.POST,
Expand Down

0 comments on commit e73144d

Please sign in to comment.