
Commit 807b69b

make all '' to "" in cursor

1 parent dd573c5 commit 807b69b
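The interesting part of this change is not the quote style: _extract_data_cb now serializes the cursor with json.dumps() instead of str(), and __init__ parses incoming tokens with json.loads() instead of eval(). A str()-serialized dict is single-quoted Python repr, not valid JSON, and eval() on a caller-supplied token can execute arbitrary code. A minimal sketch of the difference (illustrative only; the cursor fields mirror the ones this file builds):

import json

# Illustrative cursor shaped like the one ChangeFeed builds below.
cursor = {"CursorVersion": 1, "UrlHost": "account.blob.core.windows.net", "EndTime": ""}

token = json.dumps(cursor)              # double-quoted, valid JSON
assert json.loads(token) == cursor      # safe round-trip, no eval() needed

legacy_token = str(cursor)              # single-quoted repr, not valid JSON
try:
    json.loads(legacy_token)
except json.JSONDecodeError:
    print("str()-serialized cursors cannot be re-parsed with json.loads")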

4 files changed (+27559, -367 lines changed)

sdk/storage/azure-storage-blob-changefeed/azure/storage/blob/changefeed/_models.py

Lines changed: 36 additions & 36 deletions
@@ -27,7 +27,7 @@ class ChangeFeedPaged(PageIterator):
 
     :ivar int results_per_page:
         The maximum number of results retrieved per API call.
-    :ivar dict continuation_token:
+    :ivar str continuation_token:
         The continuation token to retrieve the next page of results.
     :ivar current_page:
         The current page of listed results.
@@ -42,7 +42,7 @@ class ChangeFeedPaged(PageIterator):
         Filters the results to return only events which happened after this time.
     :param datetime end_time:
         Filters the results to return only events which happened before this time.
-    :param dict continuation_token:
+    :param str continuation_token:
         An continuation token with which to start listing events from the previous position.
     """
     def __init__(
@@ -58,12 +58,12 @@ def __init__(
             extract_data=self._extract_data_cb,
             continuation_token=continuation_token or ""
         )
-        continuation_token = eval(continuation_token) if continuation_token else None
+        continuation_token = json.loads(continuation_token) if continuation_token else None
 
-        if continuation_token and container_client.primary_hostname != continuation_token['UrlHost']:
-            raise ValueError('The token is not for the current storage account.')
-        if continuation_token and continuation_token['CursorVersion'] != 1:
-            raise ValueError('The CursorVersion is not supported by the current SDK.')
+        if continuation_token and (container_client.primary_hostname != continuation_token["UrlHost"]):
+            raise ValueError("The token is not for the current storage account.")
+        if continuation_token and (continuation_token["CursorVersion"] != 1):
+            raise ValueError("The CursorVersion is not supported by the current SDK.")
         self.results_per_page = results_per_page or 5000
         self.current_page = None
         self._change_feed = ChangeFeed(container_client, self.results_per_page, start_time=start_time,
@@ -81,11 +81,11 @@ def _extract_data_cb(self, event_list):
         self.current_page = event_list
         try:
             cursor = copy.deepcopy(self._change_feed.cursor)
-            shard_cursors = cursor['CurrentSegmentCursor']['ShardCursors']
-            cursor['CurrentSegmentCursor']['ShardCursors'] = [v for v in shard_cursors.values()]
+            shard_cursors = cursor["CurrentSegmentCursor"]["ShardCursors"]
+            cursor["CurrentSegmentCursor"]["ShardCursors"] = list(shard_cursors.values())
         except AttributeError:
             pass
-        return str(cursor), self.current_page
+        return json.dumps(cursor), self.current_page
 
 
 class ChangeFeed(object):
@@ -97,17 +97,17 @@ def __init__(self, client, page_size, start_time=None, end_time=None, cf_cursor=
         self.start_time = start_time
 
         # the end time is in str format
-        end_time_in_cursor = cf_cursor['EndTime'] if cf_cursor else None
+        end_time_in_cursor = cf_cursor["EndTime"] if cf_cursor else None
         # convert the end time in str format to a datetime object
         end_time_in_cursor_obj = \
-            datetime.strptime(end_time_in_cursor, '%Y-%m-%dT%H:%M:%S+00:00') if end_time_in_cursor else None
+            datetime.strptime(end_time_in_cursor, "%Y-%m-%dT%H:%M:%S+00:00") if end_time_in_cursor else None
         # self.end_time is in datetime format
         self.end_time = end_time or end_time_in_cursor_obj
 
-        cur_segment_cursor = cf_cursor['CurrentSegmentCursor'] if cf_cursor else None
+        cur_segment_cursor = cf_cursor["CurrentSegmentCursor"] if cf_cursor else None
 
         self.cursor = {"CursorVersion": 1,
-                       "EndTime": self.end_time.strftime('%Y-%m-%dT%H:%M:%S+00:00') if self.end_time else "",
+                       "EndTime": self.end_time.strftime("%Y-%m-%dT%H:%M:%S+00:00") if self.end_time else "",
                        "UrlHost": self.client.primary_hostname}
         self._initialize(cur_segment_cursor=cur_segment_cursor)
 
@@ -147,7 +147,7 @@ def _initialize(self, cur_segment_cursor=None):
             start_year = self.start_time.year
         except AttributeError:
             try:
-                start_date = self._parse_datetime_from_segment_path(cur_segment_cursor.get('SegmentPath'))
+                start_date = self._parse_datetime_from_segment_path(cur_segment_cursor.get("SegmentPath"))
                 start_year = start_date.year
             except AttributeError:
                 start_year = ""
@@ -163,7 +163,7 @@ def _initialize(self, cur_segment_cursor=None):
 
         # if change_feed_cursor is specified, start from the specified segment
         if cur_segment_cursor:
-            while next_segment_path and next_segment_path != cur_segment_cursor['SegmentPath']:
+            while next_segment_path and next_segment_path != cur_segment_cursor["SegmentPath"]:
                 next_segment_path = next(self._segment_paths_generator)
 
         self.current_segment = self._get_next_segment(
@@ -185,7 +185,7 @@ def _get_segment_paths(self, start_year=""):
             for path in paths:
                 yield path.name
 
-            # if not searching by prefix, all paths would have been iterated already, so it's time to yield None
+            # if not searching by prefix, all paths would have been iterated already, so it"s time to yield None
             if not start_year:
                 break
             # search the segment prefixed with next year.
@@ -221,9 +221,10 @@ def __init__(self, client, segment_path, page_size, segment_cursor=None):
         self.segment_path = segment_path
         self.page_size = page_size
         self.shards = collections.deque()
-        self.cursor = {'ShardCursors': {}, 'SegmentPath': self.segment_path}
+        self.cursor = {"ShardCursors": {}, "SegmentPath": self.segment_path}
         self._initialize(segment_cursor=segment_cursor)
-        # cursor is in this format {"segment_path", path, "CurrentShardPath": shard_path, "segment_cursor": ShardCursors dict}
+        # cursor is in this format:
+        # {"segment_path", path, "CurrentShardPath": shard_path, "segment_cursor": ShardCursors dict}
 
     def __iter__(self):
         return self
@@ -240,8 +241,8 @@ def __next__(self):
                 pass
 
             # update cursor
-            self.cursor['ShardCursors'][shard.shard_path] = shard.cursor
-            self.cursor['CurrentShardPath'] = shard.shard_path
+            self.cursor["ShardCursors"][shard.shard_path] = shard.cursor
+            self.cursor["CurrentShardPath"] = shard.shard_path
 
         if not segment_events:
             raise StopIteration
@@ -255,20 +256,20 @@ def _initialize(self, segment_cursor=None):
         segment_content = segment_content.decode()
         segment_dict = json.loads(segment_content)
 
-        raw_shard_paths = segment_dict['chunkFilePaths']
+        raw_shard_paths = segment_dict["chunkFilePaths"]
         shard_paths = []
         # to strip the overhead of all raw shard paths
         for raw_shard_path in raw_shard_paths:
-            shard_paths.append(raw_shard_path.replace('$blobchangefeed/', '', 1))
+            shard_paths.append(raw_shard_path.replace("$blobchangefeed/", "", 1))
 
         # TODO: we can optimize to initiate shards in parallel
         if not segment_cursor:
             for shard_path in shard_paths:
                 self.shards.append(Shard(self.client, shard_path))
         else:
-            start_shard_path = segment_cursor['CurrentShardPath']
-            shard_cursors = {shard_cursor['CurrentChunkPath'][:-10]: shard_cursor
-                             for shard_cursor in segment_cursor['ShardCursors']}
+            start_shard_path = segment_cursor["CurrentShardPath"]
+            shard_cursors = {shard_cursor["CurrentChunkPath"][:-10]: shard_cursor
+                             for shard_cursor in segment_cursor["ShardCursors"]}
 
         if shard_paths:
             # Initialize all shards using the shard cursors
@@ -318,7 +319,7 @@ def _initialize(self, shard_cursor=None):
         # move cursor to the expected chunk
         if shard_cursor:
             while self.unprocessed_chunk_path_props and \
-                    self.unprocessed_chunk_path_props[0].name != shard_cursor.get('CurrentChunkPath'):
+                    self.unprocessed_chunk_path_props[0].name != shard_cursor.get("CurrentChunkPath"):
                 self.unprocessed_chunk_path_props.popleft()
             self.current_chunk = self._get_next_chunk(chunk_cursor=shard_cursor)
         else:
@@ -336,7 +337,7 @@ def __init__(self, client, chunk_path, chunk_cursor=None):
         self.client = client
         self.chunk_path = chunk_path
         self.file_reader = None
-        self.cursor = {'CurrentChunkPath': chunk_path} # to track the current position in avro file
+        self.cursor = {"CurrentChunkPath": chunk_path} # to track the current position in avro file
         self._data_stream = None
         self._initialize(chunk_cursor=chunk_cursor)
 
@@ -346,12 +347,12 @@ def __iter__(self):
     def __next__(self):
         try:
             event = next(self.file_reader)
-            self.cursor['EventIndex'] = self._data_stream.event_index
-            self.cursor['BlockOffset'] = self._data_stream.object_position
+            self.cursor["EventIndex"] = self._data_stream.event_index
+            self.cursor["BlockOffset"] = self._data_stream.object_position
             return event
         except StopIteration:
-            self.cursor['EventIndex'] = self._data_stream.event_index
-            self.cursor['BlockOffset'] = self._data_stream.object_position
+            self.cursor["EventIndex"] = self._data_stream.event_index
+            self.cursor["BlockOffset"] = self._data_stream.object_position
             raise StopIteration
 
     next = __next__ # Python 2 compatibility.
@@ -360,15 +361,15 @@ def _initialize(self, chunk_cursor=None):
         # To get all events in a chunk
        blob_client = self.client.get_blob_client(self.chunk_path)
 
-        file_offset = chunk_cursor.get('BlockOffset') if chunk_cursor else 0
+        file_offset = chunk_cursor.get("BlockOffset") if chunk_cursor else 0
 
         # An offset means the avro data doesn't have avro header,
         # so only when the data stream has a offset we need header stream to help
         header_stream = ChangeFeedStreamer(blob_client) if file_offset else None
         self._data_stream = ChangeFeedStreamer(blob_client, chunk_file_start=file_offset)
         self.file_reader = DataFileReader(self._data_stream, DatumReader(), header_reader=header_stream)
 
-        event_index = chunk_cursor.get('EventIndex') if chunk_cursor else 0
+        event_index = chunk_cursor.get("EventIndex") if chunk_cursor else 0
         for _ in range(0, event_index):
             next(self.file_reader)
 
@@ -385,7 +386,7 @@ def __init__(self, blob_client, chunk_file_start=0):
         self.event_index = 0
         self._point = self._chunk_file_start # file cursor position relative to the whole chunk file, not the buffered
         self._chunk_size = 4 * 1024 * 1024
-        self._buf = b''
+        self._buf = b""
         self._buf_start = self._chunk_file_start # the start position of the chunk file to buffer
         self._chunk_size_snapshot = blob_client.get_blob_properties().size
         length = self._chunk_size_snapshot - self._chunk_file_start
@@ -456,4 +457,3 @@ def track_object_position(self):
 
     def set_object_index(self, event_index):
         self.event_index = event_index
-
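Taken together, the token handed back to callers is now a json.dumps() string that can be persisted and passed back to resume listing. A usage sketch under assumptions: ChangeFeedClient.from_connection_string, list_changes, and the pager's continuation_token attribute come from this package's public API, while the connection string and page size are placeholders:

import json
from azure.storage.blob.changefeed import ChangeFeedClient

cf_client = ChangeFeedClient.from_connection_string("<connection-string>")  # placeholder

# Read one page of change feed events, then keep the cursor for the next run.
pages = cf_client.list_changes(results_per_page=100).by_page()
events = list(next(pages))

token = pages.continuation_token    # with this commit: a json.dumps()-serialized cursor
assert json.loads(token)["CursorVersion"] == 1  # parses cleanly, no eval() required

# Resume from the saved position on a later run.
resumed_pages = cf_client.list_changes(results_per_page=100).by_page(continuation_token=token)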
