Skip to content

Commit 1726fe5

Browse files
swathipiliscai-msft
authored andcommitted
[EventHub] update arm template with storage conn str (#20376)
* update test resources * fix * fix failing tests * adams comments * pylint * remove dateutil * adams comments * nit
1 parent b235c33 commit 1726fe5

File tree

3 files changed

+105
-28
lines changed

3 files changed

+105
-28
lines changed

sdk/eventhub/azure-eventhub-checkpointstoretable/azure/eventhub/extensions/checkpointstoretable/_tablestoragecs.py

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import time
88
import logging
99
import calendar
10-
import dateutil.parser
1110
from azure.core import MatchConditions
1211
from azure.eventhub import CheckpointStore # type: ignore # pylint: disable=no-name-in-module
1312
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
@@ -18,6 +17,8 @@
1817
)
1918
from ._vendor.data.tables import TableClient, UpdateMode
2019
from ._vendor.data.tables._base_client import parse_connection_str
20+
from ._vendor.data.tables._deserialize import clean_up_dotnet_timestamps
21+
from ._vendor.data.tables._common_conversion import TZ_UTC
2122

2223
logger = logging.getLogger(__name__)
2324

@@ -39,6 +40,19 @@ def _to_timestamp(date):
3940
timestamp += date.microsecond / 1e6
4041
return timestamp
4142

43+
def _timestamp_to_datetime(value):
44+
# Cosmos returns this with a decimal point that throws an error on deserialization
45+
cleaned_value = clean_up_dotnet_timestamps(value)
46+
try:
47+
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
48+
tzinfo=TZ_UTC
49+
)
50+
except ValueError:
51+
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%SZ").replace(
52+
tzinfo=TZ_UTC
53+
)
54+
return dt_obj
55+
4256

4357
class TableCheckpointStore(CheckpointStore):
4458
"""A CheckpointStore that uses Azure Table Storage to store the partition ownership and checkpoint data.
@@ -113,13 +127,13 @@ def _create_ownership_entity(cls, ownership):
113127
Create a dictionary with the `ownership` attributes.
114128
"""
115129
ownership_entity = {
116-
"PartitionKey": "{} {} {} Ownership".format(
130+
"PartitionKey": u"{} {} {} Ownership".format(
117131
ownership["fully_qualified_namespace"],
118132
ownership["eventhub_name"],
119133
ownership["consumer_group"],
120134
),
121-
"RowKey": ownership["partition_id"],
122-
"ownerid": ownership["owner_id"],
135+
"RowKey": u"{}".format(ownership["partition_id"]),
136+
"ownerid": u"{}".format(ownership["owner_id"]),
123137
}
124138
return ownership_entity
125139

@@ -129,21 +143,21 @@ def _create_checkpoint_entity(cls, checkpoint):
129143
Create a dictionary with `checkpoint` attributes.
130144
"""
131145
checkpoint_entity = {
132-
"PartitionKey": "{} {} {} Checkpoint".format(
146+
"PartitionKey": u"{} {} {} Checkpoint".format(
133147
checkpoint["fully_qualified_namespace"],
134148
checkpoint["eventhub_name"],
135149
checkpoint["consumer_group"],
136150
),
137-
"RowKey": checkpoint["partition_id"],
138-
"offset": checkpoint["offset"],
139-
"sequencenumber": checkpoint["sequence_number"],
151+
"RowKey": u"{}".format(checkpoint["partition_id"]),
152+
"offset": u"{}".format(checkpoint["offset"]),
153+
"sequencenumber": u"{}".format(checkpoint["sequence_number"]),
140154
}
141155
return checkpoint_entity
142156

143157
def _update_ownership(self, ownership, **kwargs):
144158
"""_update_ownership mutates the passed in ownership."""
159+
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
145160
try:
146-
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
147161
metadata = self._table_client.update_entity(
148162
mode=UpdateMode.REPLACE,
149163
entity=ownership_entity,
@@ -166,7 +180,7 @@ def _update_ownership(self, ownership, **kwargs):
166180
)
167181
ownership["etag"] = metadata["etag"]
168182
ownership["last_modified_time"] = _to_timestamp(
169-
dateutil.parser.isoparse(metadata["content"]["Timestamp"])
183+
_timestamp_to_datetime(metadata["content"]["Timestamp"])
170184
)
171185

172186
def _claim_one_partition(self, ownership, **kwargs):
@@ -289,7 +303,7 @@ def list_checkpoints(
289303
"eventhub_name": eventhub_name,
290304
"consumer_group": consumer_group,
291305
"partition_id": entity[u"RowKey"],
292-
"sequence_number": entity[u"sequencenumber"],
306+
"sequence_number": int(entity[u"sequencenumber"]),
293307
"offset": str(entity[u"offset"]),
294308
}
295309
checkpoints_list.append(checkpoint)

sdk/eventhub/azure-eventhub-checkpointstoretable/tests/test_storage_table_partition_manager.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@
1313
from azure.eventhub.extensions.checkpointstoretable import TableCheckpointStore
1414
from azure.eventhub.exceptions import OwnershipLostError
1515

16-
STORAGE_CONN_STR = [
17-
#os.environ.get("AZURE_STORAGE_CONN_STR", "Azure Storage Connection String"),
18-
os.environ.get("AZURE_COSMOS_CONN_STR", "Azure Storage Connection String"),
16+
STORAGE_ENV_KEYS = [
17+
"AZURE_TABLES_CONN_STR",
18+
"AZURE_COSMOS_CONN_STR"
1919
]
2020

2121

22-
def get_live_storage_table_client(storage_connection_str):
22+
def get_live_storage_table_client(conn_str_env_key):
2323
try:
24+
storage_connection_str = os.environ[conn_str_env_key]
2425
table_name = "table{}".format(uuid.uuid4().hex)
2526
table_service_client = TableServiceClient.from_connection_string(
2627
storage_connection_str
@@ -176,35 +177,35 @@ def _update_and_list_checkpoint(storage_connection_str, table_name):
176177
assert checkpoint_list[0]["offset"] == "30"
177178

178179

179-
@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
180-
@pytest.mark.skip("update after adding conn str env var")
181-
def test_claim_ownership_exception(storage_connection_str):
180+
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
181+
@pytest.mark.liveTest
182+
def test_claim_ownership_exception(conn_str_env_key):
182183
storage_connection_str, table_name = get_live_storage_table_client(
183-
storage_connection_str
184+
conn_str_env_key
184185
)
185186
try:
186187
_claim_ownership_exception_test(storage_connection_str, table_name)
187188
finally:
188189
remove_live_storage_table_client(storage_connection_str, table_name)
189190

190191

191-
@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
192-
@pytest.mark.skip("update after adding conn str env var")
193-
def test_claim_and_list_ownership(storage_connection_str):
192+
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
193+
@pytest.mark.liveTest
194+
def test_claim_and_list_ownership(conn_str_env_key):
194195
storage_connection_str, table_name = get_live_storage_table_client(
195-
storage_connection_str
196+
conn_str_env_key
196197
)
197198
try:
198199
_claim_and_list_ownership(storage_connection_str, table_name)
199200
finally:
200201
remove_live_storage_table_client(storage_connection_str, table_name)
201202

202203

203-
@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
204-
@pytest.mark.skip("update after adding conn str env var")
205-
def test_update_checkpoint(storage_connection_str):
204+
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
205+
@pytest.mark.liveTest
206+
def test_update_checkpoint(conn_str_env_key):
206207
storage_connection_str, table_name = get_live_storage_table_client(
207-
storage_connection_str
208+
conn_str_env_key
208209
)
209210
try:
210211
_update_and_list_checkpoint(storage_connection_str, table_name)

sdk/eventhub/test-resources.json

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,14 @@
7070
"eventHubsNamespace": "[concat('eh-', parameters('baseName'))]",
7171
"eventHubName": "[concat('eh-', parameters('baseName'), '-hub')]",
7272
"eventHubAuthRuleName": "[concat('eh-', parameters('baseName'), '-hub-auth-rule')]",
73-
"storageAccount": "[concat('blb', parameters('baseName'))]",
73+
"storageAccount": "[concat('storage', parameters('baseName'))]",
7474
"containerName": "your-blob-container-name",
7575
"defaultSASKeyName": "RootManageSharedAccessKey",
7676
"eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]",
7777
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]",
78+
"tablesMgmtApiVersion": "2019-04-01",
79+
"tablesAuthorizationApiVersion": "2018-09-01-preview",
80+
"tableDataContributorRoleId": "0a9a7e1f-b9d0-4cc4-a60d-0319b160aaa3"
7881
},
7982
"resources": [
8083
{
@@ -140,6 +143,48 @@
140143
}
141144
]
142145
},
146+
{
147+
"type": "Microsoft.DocumentDB/databaseAccounts",
148+
"apiVersion": "2020-04-01",
149+
"name": "[variables('storageAccount')]",
150+
"location": "[parameters('location')]",
151+
"tags": {
152+
"defaultExperience": "Azure Table",
153+
"hidden-cosmos-mmspecial": "",
154+
"CosmosAccountType": "Non-Production"
155+
},
156+
"kind": "GlobalDocumentDB",
157+
"properties": {
158+
"publicNetworkAccess": "Enabled",
159+
"enableAutomaticFailover": false,
160+
"enableMultipleWriteLocations": false,
161+
"isVirtualNetworkFilterEnabled": false,
162+
"virtualNetworkRules": [],
163+
"disableKeyBasedMetadataWriteAccess": false,
164+
"enableFreeTier": false,
165+
"enableAnalyticalStorage": false,
166+
"databaseAccountOfferType": "Standard",
167+
"consistencyPolicy": {
168+
"defaultConsistencyLevel": "BoundedStaleness",
169+
"maxIntervalInSeconds": 86400,
170+
"maxStalenessPrefix": 1000000
171+
},
172+
"locations": [
173+
{
174+
"locationName": "[parameters('location')]",
175+
"provisioningState": "Succeeded",
176+
"failoverPriority": 0,
177+
"isZoneRedundant": false
178+
}
179+
],
180+
"capabilities": [
181+
{
182+
"name": "EnableTable"
183+
}
184+
],
185+
"ipRules": []
186+
}
187+
},
143188
{
144189
"type": "Microsoft.Authorization/roleAssignments",
145190
"apiVersion": "2019-04-01-preview",
@@ -159,6 +204,15 @@
159204
"principalId": "[parameters('testApplicationOid')]",
160205
"scope": "[resourceGroup().id]"
161206
}
207+
},
208+
{
209+
"type": "Microsoft.Authorization/roleAssignments",
210+
"apiVersion": "[variables('tablesAuthorizationApiVersion')]",
211+
"name": "[guid(concat('tableDataContributorRoleId', resourceGroup().id))]",
212+
"properties": {
213+
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('tableDataContributorRoleId'))]",
214+
"principalId": "[parameters('testApplicationOid')]"
215+
}
162216
}
163217
],
164218
"outputs": {
@@ -197,6 +251,14 @@
197251
"AZURE_STORAGE_ACCESS_KEY":{
198252
"type": "string",
199253
"value": "[listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value]"
254+
},
255+
"AZURE_TABLES_CONN_STR": {
256+
"type": "string",
257+
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
258+
},
259+
"AZURE_COSMOS_CONN_STR": {
260+
"type": "string",
261+
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(resourceId('Microsoft.DocumentDB/databaseAccounts', variables('storageAccount')), '2020-04-01').primaryMasterKey, ';TableEndpoint=https://', variables('storageAccount'), '.table.cosmos.azure.com:443/')]"
200262
}
201263
}
202264
}

0 commit comments

Comments
 (0)