Skip to content

Commit 162c31b

Browse files
[AppConfig] Add lock to SyncTokenPolicy (#19395)
* creating async policy, adding lock around sync token portion * pylint * fixed import * reducing to two locks * pylint fix
1 parent 676b80e commit 162c31b

File tree

4 files changed

+111
-15
lines changed

4 files changed

+111
-15
lines changed

sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_azure_appconfiguration_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def __init__(self, base_url, credential, **kwargs):
8686
pipeline = kwargs.get("pipeline")
8787

8888
if pipeline is None:
89-
self._sync_token_policy = SyncTokenPolicy()
9089
aad_mode = not isinstance(credential, AppConfigConnectionStringCredential)
9190
pipeline = self._create_appconfig_pipeline(
9291
credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs

sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#
2525
# --------------------------------------------------------------------------
2626
from typing import Any, Dict
27+
from threading import Lock
2728
from azure.core.pipeline import PipelineRequest, PipelineResponse
2829
from azure.core.pipeline.policies import SansIOHTTPPolicy
2930

@@ -63,18 +64,20 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument
6364
# type: (**Any) -> None
6465
self._sync_token_header = "Sync-Token"
6566
self._sync_tokens = {} # type: Dict[str, Any]
67+
self._lock = Lock()
6668

6769
def on_request(self, request): # type: ignore # pylint: disable=arguments-differ
6870
# type: (PipelineRequest) -> None
6971
"""This is executed before sending the request to the next policy.
7072
:param request: The PipelineRequest object.
7173
:type request: ~azure.core.pipeline.PipelineRequest
7274
"""
73-
sync_token_header = ",".join(str(x) for x in self._sync_tokens.values())
74-
if sync_token_header:
75-
request.http_request.headers.update(
76-
{self._sync_token_header: sync_token_header}
77-
)
75+
with self._lock:
76+
sync_token_header = ",".join(str(x) for x in self._sync_tokens.values())
77+
if sync_token_header:
78+
request.http_request.headers.update(
79+
{self._sync_token_header: sync_token_header}
80+
)
7881

7982
def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ
8083
# type: (PipelineRequest, PipelineResponse) -> None
@@ -105,9 +108,10 @@ def _update_sync_token(self, sync_token):
105108
# type: (SyncToken) -> None
106109
if not sync_token:
107110
return
108-
existing_token = self._sync_tokens.get(sync_token.token_id, None)
109-
if not existing_token:
110-
self._sync_tokens[sync_token.token_id] = sync_token
111-
return
112-
if existing_token.sequence_number < sync_token.sequence_number:
113-
self._sync_tokens[sync_token.token_id] = sync_token
111+
with self._lock:
112+
existing_token = self._sync_tokens.get(sync_token.token_id, None)
113+
if not existing_token:
114+
self._sync_tokens[sync_token.token_id] = sync_token
115+
return
116+
if existing_token.sequence_number < sync_token.sequence_number:
117+
self._sync_tokens[sync_token.token_id] = sync_token

sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_azure_configuration_client_async.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
from .._azure_appconfiguration_credential import AppConfigConnectionStringCredential
3939
from .._generated.models import KeyValue
4040
from .._models import ConfigurationSetting
41-
from .._sync_token import SyncTokenPolicy
4241
from .._user_agent import USER_AGENT
42+
from ._sync_token_async import AsyncSyncTokenPolicy
4343

4444
try:
4545
from typing import TYPE_CHECKING
@@ -87,10 +87,9 @@ def __init__(self, base_url, credential, **kwargs):
8787
)
8888

8989
pipeline = kwargs.get("pipeline")
90-
self._sync_token_policy = SyncTokenPolicy()
90+
self._sync_token_policy = AsyncSyncTokenPolicy()
9191

9292
if pipeline is None:
93-
self._sync_token_policy = SyncTokenPolicy()
9493
aad_mode = not isinstance(credential, AppConfigConnectionStringCredential)
9594
pipeline = self._create_appconfig_pipeline(
9695
credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# --------------------------------------------------------------------------
2+
#
3+
# Copyright (c) Microsoft Corporation. All rights reserved.
4+
#
5+
# The MIT License (MIT)
6+
#
7+
# Permission is hereby granted, free of charge, to any person obtaining a copy
8+
# of this software and associated documentation files (the ""Software""), to
9+
# deal in the Software without restriction, including without limitation the
10+
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11+
# sell copies of the Software, and to permit persons to whom the Software is
12+
# furnished to do so, subject to the following conditions:
13+
#
14+
# The above copyright notice and this permission notice shall be included in
15+
# all copies or substantial portions of the Software.
16+
#
17+
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23+
# IN THE SOFTWARE.
24+
#
25+
# --------------------------------------------------------------------------
26+
from typing import Any, Dict
27+
from asyncio import Lock
28+
from azure.core.pipeline import PipelineRequest, PipelineResponse
29+
from azure.core.pipeline.policies import SansIOHTTPPolicy
30+
31+
from .._sync_token import SyncToken
32+
33+
34+
class AsyncSyncTokenPolicy(SansIOHTTPPolicy):
35+
"""A simple policy that enable the given callback
36+
with the response.
37+
:keyword callback raw_response_hook: Callback function. Will be invoked on response.
38+
"""
39+
40+
def __init__(self, **kwargs): # pylint: disable=unused-argument
41+
# type: (**Any) -> None
42+
self._sync_token_header = "Sync-Token"
43+
self._sync_tokens = {} # type: Dict[str, Any]
44+
self._lock = Lock()
45+
46+
async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ, invalid-overridden-method
47+
# type: (PipelineRequest) -> None
48+
"""This is executed before sending the request to the next policy.
49+
:param request: The PipelineRequest object.
50+
:type request: ~azure.core.pipeline.PipelineRequest
51+
"""
52+
async with self._lock:
53+
sync_token_header = ",".join(str(x) for x in self._sync_tokens.values())
54+
if sync_token_header:
55+
request.http_request.headers.update(
56+
{self._sync_token_header: sync_token_header}
57+
)
58+
59+
async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ, invalid-overridden-method
60+
# type: (PipelineRequest, PipelineResponse) -> None
61+
"""This is executed after the request comes back from the policy.
62+
:param request: The PipelineRequest object.
63+
:type request: ~azure.core.pipeline.PipelineRequest
64+
:param response: The PipelineResponse object.
65+
:type response: ~azure.core.pipeline.PipelineResponse
66+
"""
67+
sync_token_header = response.http_response.headers.get(self._sync_token_header)
68+
if not sync_token_header:
69+
return
70+
sync_token_strings = sync_token_header.split(",")
71+
if not sync_token_strings:
72+
return
73+
for sync_token_string in sync_token_strings:
74+
sync_token = SyncToken.from_sync_token_string(sync_token_string)
75+
await self._update_sync_token(sync_token)
76+
77+
async def add_token(self, full_raw_tokens):
78+
# type: (str) -> None
79+
raw_tokens = full_raw_tokens.split(",")
80+
for raw_token in raw_tokens:
81+
sync_token = SyncToken.from_sync_token_string(raw_token)
82+
await self._update_sync_token(sync_token)
83+
84+
async def _update_sync_token(self, sync_token):
85+
# type: (SyncToken) -> None
86+
if not sync_token:
87+
return
88+
async with self._lock:
89+
existing_token = self._sync_tokens.get(sync_token.token_id, None)
90+
if not existing_token:
91+
self._sync_tokens[sync_token.token_id] = sync_token
92+
return
93+
if existing_token.sequence_number < sync_token.sequence_number:
94+
self._sync_tokens[sync_token.token_id] = sync_token

0 commit comments

Comments
 (0)