-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathbackend.py
2338 lines (2096 loc) · 96.4 KB
/
backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pluggable Back-ends for Container Server
"""
import errno
import os
from uuid import uuid4
import six
from six.moves import range
from six.moves.urllib.parse import unquote
import sqlite3
from eventlet import tpool
from swift.common.constraints import CONTAINER_LISTING_LIMIT
from swift.common.exceptions import LockTimeout
from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
filter_shard_ranges, ShardRangeList
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
# Directory name passed to storage_directory() when building the on-disk path
# of a container DB (see ContainerBroker.create_broker).
DATADIR = 'containers'
# Record type labels. They are not referenced in the visible part of this
# module -- presumably used to select object vs shard listings (TODO confirm).
RECORD_TYPE_OBJECT = 'object'
RECORD_TYPE_SHARD = 'shard'
# Name of the table created by ContainerBroker.create_shard_range_table().
SHARD_RANGE_TABLE = 'shard_range'

# DB states reported by ContainerBroker.get_db_state().
NOTFOUND = 'not_found'
UNSHARDED = 'unsharded'
SHARDING = 'sharding'
SHARDED = 'sharded'
COLLAPSED = 'collapsed'

# Groups of ShardRange states. The names suggest the context each group is
# used in (stats, listings, updates); the consumers are outside this section
# of the file -- verify before relying on that reading.
SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
                      ShardRange.SHRINKING]
SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
                       ShardRange.ACTIVE, ShardRange.SHARDING]
# when auditing, a shard gets its own shard range, which could be in any state
# except FOUND, and any potential acceptors excluding FOUND ranges that may be
# unwanted overlaps
SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
                         ShardRange.ACTIVE, ShardRange.SHARDING,
                         ShardRange.SHARDED, ShardRange.SHRINKING,
                         ShardRange.SHRUNK]
# attribute names in order used when transforming shard ranges from dicts to
# tuples and vice-versa
SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count',
                    'bytes_used', 'meta_timestamp', 'deleted', 'state',
                    'state_timestamp', 'epoch', 'reported', 'tombstones')
# SQL that creates the policy_stat table: one row per storage policy index
# holding that policy's object count and byte total (both default to 0).
# Executed by ContainerBroker.create_policy_stat_table().
POLICY_STAT_TABLE_CREATE = '''
CREATE TABLE policy_stat (
storage_policy_index INTEGER PRIMARY KEY,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0
);
'''
# SQL that installs the triggers keeping policy_stat and the container hash in
# step with the object table. Appended to the object table script in
# ContainerBroker.create_object_table().
#
# * AFTER INSERT: adds the new row's contribution to the matching policy_stat
#   row; if that UPDATE changed no rows (changes() == 0) a policy_stat row is
#   inserted for the policy instead; finally the container hash is folded
#   with the row's name and created_at via chexor().
# * AFTER DELETE: subtracts the old row's contribution from policy_stat and
#   folds the same name/created_at pair into the hash again.
POLICY_STAT_TRIGGER_SCRIPT = '''
CREATE TRIGGER object_insert_policy_stat AFTER INSERT ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count + (1 - new.deleted),
bytes_used = bytes_used + new.size
WHERE storage_policy_index = new.storage_policy_index;
INSERT INTO policy_stat (
storage_policy_index, object_count, bytes_used)
SELECT new.storage_policy_index,
(1 - new.deleted),
new.size
WHERE NOT EXISTS(
SELECT changes() as change
FROM policy_stat
WHERE change <> 0
);
UPDATE container_info
SET hash = chexor(hash, new.name, new.created_at);
END;
CREATE TRIGGER object_delete_policy_stat AFTER DELETE ON object
BEGIN
UPDATE policy_stat
SET object_count = object_count - (1 - old.deleted),
bytes_used = bytes_used - old.size
WHERE storage_policy_index = old.storage_policy_index;
UPDATE container_info
SET hash = chexor(hash, old.name, old.created_at);
END;
'''
# SQL that creates the container_info table holding the container's global
# attributes (names, timestamps, reported stats, hash, id, status, metadata,
# sync points and storage policy index). Executed together with
# CONTAINER_STAT_VIEW_SCRIPT by ContainerBroker.create_container_info_table().
CONTAINER_INFO_TABLE_SCRIPT = '''
CREATE TABLE container_info (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1,
storage_policy_index INTEGER DEFAULT 0,
reconciler_sync_point INTEGER DEFAULT -1
);
'''
# SQL that creates the container_stat compatibility view and its trigger.
#
# * The view exposes every container_info column plus object_count and
#   bytes_used taken from the policy_stat row whose storage_policy_index
#   matches the container's (0 when there is no such row, via coalesce on a
#   LEFT JOIN).
# * The INSTEAD OF UPDATE trigger redirects updates made against the view to
#   the underlying container_info table; object_count and bytes_used are not
#   written back.
CONTAINER_STAT_VIEW_SCRIPT = '''
CREATE VIEW container_stat
AS SELECT ci.account, ci.container, ci.created_at,
ci.put_timestamp, ci.delete_timestamp,
ci.reported_put_timestamp, ci.reported_delete_timestamp,
ci.reported_object_count, ci.reported_bytes_used, ci.hash,
ci.id, ci.status, ci.status_changed_at, ci.metadata,
ci.x_container_sync_point1, ci.x_container_sync_point2,
ci.reconciler_sync_point,
ci.storage_policy_index,
coalesce(ps.object_count, 0) AS object_count,
coalesce(ps.bytes_used, 0) AS bytes_used
FROM container_info ci LEFT JOIN policy_stat ps
ON ci.storage_policy_index = ps.storage_policy_index;
CREATE TRIGGER container_stat_update
INSTEAD OF UPDATE ON container_stat
BEGIN
UPDATE container_info
SET account = NEW.account,
container = NEW.container,
created_at = NEW.created_at,
put_timestamp = NEW.put_timestamp,
delete_timestamp = NEW.delete_timestamp,
reported_put_timestamp = NEW.reported_put_timestamp,
reported_delete_timestamp = NEW.reported_delete_timestamp,
reported_object_count = NEW.reported_object_count,
reported_bytes_used = NEW.reported_bytes_used,
hash = NEW.hash,
id = NEW.id,
status = NEW.status,
status_changed_at = NEW.status_changed_at,
metadata = NEW.metadata,
x_container_sync_point1 = NEW.x_container_sync_point1,
x_container_sync_point2 = NEW.x_container_sync_point2,
storage_policy_index = NEW.storage_policy_index,
reconciler_sync_point = NEW.reconciler_sync_point;
END;
'''
def update_new_item_from_existing(new_item, existing):
    """
    Reconcile an incoming object update with the record already stored.

    The data, content-type and metadata timestamps of ``new_item`` are each
    compared with those of ``existing``. Wherever the existing record is at
    least as recent, its attributes (and timestamp) replace those of
    ``new_item``. The three resulting timestamps are then encoded into one
    string and stored in ``new_item['created_at']``, which is the form kept
    in the 'created_at' column of the object table.

    :param new_item: A dict of object update attributes; modified in place.
    :param existing: A dict of existing object attributes, or a false value
                     if there is no existing record.
    :return: True if any of the data, content-type or metadata timestamps of
             the new item is newer than the existing record's (always True
             when there is no existing record); False if the merged item is
             equal to the existing record.
    """
    # 'created_at' is overwritten below, so stash the original value under
    # 'data_timestamp' in case this item is processed a second time
    new_item.setdefault('data_timestamp', new_item['created_at'])

    # content-type and metadata timestamps may be packed into the data
    # timestamp string, or may have been supplied explicitly
    new_data_ts, new_ctype_ts, new_meta_ts = decode_timestamps(
        new_item['data_timestamp'])
    explicit_ctype_ts = new_item.get('ctype_timestamp')
    if explicit_ctype_ts:
        new_ctype_ts = Timestamp(explicit_ctype_ts)
        new_meta_ts = new_ctype_ts
    explicit_meta_ts = new_item.get('meta_timestamp')
    if explicit_meta_ts:
        new_meta_ts = Timestamp(explicit_meta_ts)

    if not existing:
        # nothing to merge with: just pack the timestamps for the db record
        new_item['created_at'] = encode_timestamps(
            new_data_ts, new_ctype_ts, new_meta_ts)
        return True

    # unpack the stored record's combined timestamp
    old_data_ts, old_ctype_ts, old_meta_ts = decode_timestamps(
        existing['created_at'])

    # swift_bytes shares a column with content_type for historical reasons,
    # but it must follow the newest *data* timestamp whereas content_type
    # follows the newest *content-type* timestamp, so split them apart while
    # merging.
    for record in (new_item, existing):
        record['content_type'], record['swift_bytes'] = \
            extract_swift_bytes(record['content_type'])

    data_is_newer = ctype_is_newer = meta_is_newer = True
    if old_data_ts >= new_data_ts:
        # existing data attributes win
        new_item.update({key: existing[key] for key in
                         ('size', 'etag', 'deleted', 'swift_bytes')})
        new_data_ts = old_data_ts
        data_is_newer = False
    if old_ctype_ts >= new_ctype_ts:
        # existing content-type wins
        new_item['content_type'] = existing['content_type']
        new_ctype_ts = old_ctype_ts
        ctype_is_newer = False
    if old_meta_ts >= new_meta_ts:
        # existing metadata timestamp wins
        new_meta_ts = old_meta_ts
        meta_is_newer = False

    # pack the merged timestamps for the db record
    new_item['created_at'] = encode_timestamps(
        new_data_ts, new_ctype_ts, new_meta_ts)

    # re-attach swift_bytes to content_type in both dicts; for ``existing``
    # this restores its original state
    for record in (new_item, existing):
        if record['swift_bytes']:
            record['content_type'] += ';swift_bytes=%s' % record['swift_bytes']
        del record['swift_bytes']
    return data_is_newer or ctype_is_newer or meta_is_newer
def merge_shards(shard_data, existing):
    """
    Merge an existing shard range record into an incoming one.

    ``shard_data`` is updated in place with every item of ``existing`` that
    takes precedence over the corresponding incoming item.

    :param shard_data: a dict representation of shard range that may be
        modified by this method.
    :param existing: a dict representation of shard range, or a false value
        if there is no existing record.
    :returns: True if ``shard_data`` has any item(s) that are considered to
        take precedence over the corresponding item in ``existing``
    """
    if not existing:
        return True
    if existing['timestamp'] < shard_data['timestamp']:
        # A newer created time trumps everything: no meta or state is rolled
        # forward from the older record, and the reported latch is reset.
        shard_data['reported'] = 0
        return True
    if existing['timestamp'] > shard_data['timestamp']:
        return False

    # Same created time from here on: bounds and deleted flag never change.
    shard_data.update(
        {key: existing[key] for key in ('lower', 'upper', 'deleted')})

    # Object stats follow the most recent meta_timestamp.
    meta_is_newer = not (
        existing['meta_timestamp'] >= shard_data['meta_timestamp'])
    if not meta_is_newer:
        shard_data.update(
            {key: existing[key]
             for key in ('object_count', 'bytes_used', 'meta_timestamp')})
        shard_data['tombstones'] = existing.get('tombstones', -1)

    # The reported flag stays latched only while nothing reportable differs
    # from what the existing record already reported.
    still_reported = (
        existing['reported']
        and existing['object_count'] == shard_data['object_count']
        and existing['bytes_used'] == shard_data['bytes_used']
        and existing.get('tombstones', -1) == shard_data['tombstones']
        and existing['state'] == shard_data['state']
        and existing['epoch'] == shard_data['epoch'])
    if still_reported:
        shard_data['reported'] = 1
    else:
        shard_data.setdefault('reported', 0)
    newly_reported = shard_data['reported'] and not existing['reported']

    # State follows the most recent state_timestamp; on a tie the higher
    # state value wins.
    same_state_time = (
        existing['state_timestamp'] == shard_data['state_timestamp'])
    if same_state_time and shard_data['state'] > existing['state']:
        state_is_newer = True
    elif existing['state_timestamp'] >= shard_data['state_timestamp']:
        shard_data.update(
            {key: existing[key]
             for key in ('state', 'state_timestamp', 'epoch')})
        state_is_newer = False
    else:
        state_is_newer = True

    return bool(meta_is_newer or newly_reported or state_is_newer)
class ContainerBroker(DatabaseBroker):
    """
    Encapsulates working with a container database.

    Note that this may involve multiple on-disk DB files if the container
    becomes sharded:

      * :attr:`_db_file` is the path to the legacy container DB name, i.e.
        ``<hash>.db``. This file should exist for an initialised broker that
        has never been sharded, but will not exist once a container has been
        sharded.
      * :attr:`db_files` is a list of existing db files for the broker. This
        list should have at least one entry for an initialised broker, and
        should have two entries while a broker is in SHARDING state.
      * :attr:`db_file` is the path to whichever db is currently authoritative
        for the container. Depending on the container's state, this may not be
        the same as the ``db_file`` argument given to :meth:`~__init__`, unless
        ``force_db_file`` is True in which case :attr:`db_file` is always equal
        to the ``db_file`` argument given to :meth:`~__init__`.
      * :attr:`pending_file` is always equal to :attr:`_db_file` extended with
        ``.pending``, i.e. ``<hash>.db.pending``.
    """
    # Class-level settings. None of them is read in the visible part of this
    # class -- presumably they are consumed by the DatabaseBroker base class
    # (TODO confirm against swift.common.db).
    db_type = 'container'
    db_contains_type = 'object'
    db_reclaim_timestamp = 'created_at'
    # NOTE(review): looks like the metadata keys that survive a container
    # delete -- confirm against the code that reads this list.
    delete_meta_whitelist = ['x-container-sysmeta-shard-quoted-root',
                             'x-container-sysmeta-shard-root']
def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None,
             account=None, container=None, pending_timeout=None,
             stale_reads_ok=False, skip_commits=False,
             force_db_file=False):
    """
    Set up a broker for the container db at ``db_file``.

    The path exactly as given is remembered in ``_init_db_file``; the base
    class is initialised with the epoch-less form of that path (or with
    ``':memory:'`` unchanged).

    :param db_file: path to a container db file, or ``':memory:'``
    :param force_db_file: if True, :attr:`db_file` always returns
        ``db_file`` as given here
    All other arguments are passed through to the base class initialiser.
    """
    self._init_db_file = db_file
    # the base class always works with the legacy (epoch-less) file name
    legacy_db_file = (db_file if db_file == ':memory:'
                      else make_db_file_path(db_file, None))
    super(ContainerBroker, self).__init__(
        legacy_db_file, timeout, logger, account, container,
        pending_timeout, stale_reads_ok, skip_commits=skip_commits)
    # root account/container are resolved lazily, on first use
    self._root_account = None
    self._root_container = None
    self._force_db_file = force_db_file
    self._db_files = None
@classmethod
def create_broker(cls, device_path, part, account, container, logger=None,
                  epoch=None, put_timestamp=None,
                  storage_policy_index=None):
    """
    Build a ContainerBroker for the given account/container, initialising
    its db file first if that file does not yet exist.

    :param device_path: device path
    :param part: partition number
    :param account: account name string
    :param container: container name string
    :param logger: a logger instance
    :param epoch: a timestamp to include in the db filename
    :param put_timestamp: initial timestamp if broker needs to be
        initialized
    :param storage_policy_index: the storage policy index
    :return: a :class:`swift.container.backend.ContainerBroker` instance
    """
    container_hash = hash_path(account, container)
    legacy_path = os.path.join(
        device_path,
        storage_directory(DATADIR, part, container_hash),
        container_hash + '.db')
    broker = ContainerBroker(
        make_db_file_path(legacy_path, epoch),
        account=account, container=container, logger=logger)
    if os.path.exists(broker.db_file):
        return broker
    try:
        broker.initialize(put_timestamp, storage_policy_index)
    except DatabaseAlreadyExists:
        # the db appeared between the existence check and initialize();
        # the broker is still returned as-is
        pass
    return broker
def get_db_state(self):
    """
    Work out the broker's state from its on-disk db files.

    :returns: one of ``UNSHARDED``, ``NOTFOUND``, ``SHARDING``,
        ``COLLAPSED`` or ``SHARDED``
    """
    if self._db_file == ':memory:':
        return UNSHARDED
    found_files = self.db_files
    if not found_files:
        return NOTFOUND
    if len(found_files) > 1:
        return SHARDING
    epoch = self.db_epoch
    # no epoch in the filename means the db has never been sharded; an epoch
    # that differs from the own shard range's epoch is also treated as
    # unsharded
    if epoch is None or epoch != self._own_shard_range().epoch:
        return UNSHARDED
    if not self.get_shard_ranges():
        return COLLAPSED
    return SHARDED
def sharding_initiated(self):
    """
    Tell whether the broker's shard range state is what it would need to be
    for sharding to have been initiated.

    :returns: True if the own shard range is in one of the SHARDING,
        SHRINKING, SHARDED or SHRUNK states and the broker has shard
        ranges, False otherwise.
    """
    own_state = self.get_own_shard_range().state
    initiated_states = (ShardRange.SHARDING,
                        ShardRange.SHRINKING,
                        ShardRange.SHARDED,
                        ShardRange.SHRUNK)
    if own_state not in initiated_states:
        return False
    return bool(self.get_shard_ranges())
def sharding_required(self):
    """
    Tell whether sharding has been initiated but has not yet completed.

    :returns: True if the db state is SHARDING, or if it is UNSHARDED while
        :meth:`sharding_initiated` holds; False otherwise.
    """
    current_state = self.get_db_state()
    if current_state == SHARDING:
        return True
    return current_state == UNSHARDED and self.sharding_initiated()
def is_sharded(self):
    """
    :returns: True if :meth:`get_db_state` reports SHARDED, False otherwise.
    """
    current_state = self.get_db_state()
    return current_state == SHARDED
def reload_db_files(self):
"""
Reloads the cached list of valid on disk db files for this broker.
"""
if self._db_file == ':memory:':
return
# reset connection so the next access will use the correct DB file
self.conn = None
self._db_files = get_db_files(self._init_db_file)
@property
def db_files(self):
"""
Gets the cached list of valid db files that exist on disk for this
broker.
The cached list may be refreshed by calling
:meth:`~swift.container.backend.ContainerBroker.reload_db_files`.
:return: A list of paths to db files ordered by ascending epoch;
the list may be empty.
"""
if not self._db_files:
self.reload_db_files()
return self._db_files
@property
def db_file(self):
"""
Get the path to the primary db file for this broker. This is typically
the db file for the most recent sharding epoch. However, if no db files
exist on disk, or if ``force_db_file`` was True when the broker was
constructed, then the primary db file is the file passed to the broker
constructor.
:return: A path to a db file; the file does not necessarily exist.
"""
if self._force_db_file:
return self._init_db_file
if self.db_files:
return self.db_files[-1]
return self._init_db_file
@property
def db_epoch(self):
    """
    The epoch component parsed from the name of the primary db file.
    """
    _hash, file_epoch, _ext = parse_db_filename(self.db_file)
    return file_epoch
@property
def storage_policy_index(self):
if not hasattr(self, '_storage_policy_index'):
self._storage_policy_index = \
self.get_info()['storage_policy_index']
return self._storage_policy_index
@property
def path(self):
self._populate_instance_cache()
return '%s/%s' % (self.account, self.container)
def _initialize(self, conn, put_timestamp, storage_policy_index):
"""
Create a brand new container database (tables, indices, triggers, etc.)
"""
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
if not self.container:
raise ValueError(
'Attempting to create a new database with no container set')
if storage_policy_index is None:
storage_policy_index = 0
self.create_object_table(conn)
self.create_policy_stat_table(conn, storage_policy_index)
self.create_container_info_table(conn, put_timestamp,
storage_policy_index)
self.create_shard_range_table(conn)
self._db_files = None
def create_object_table(self, conn):
    """
    Create the object table which is specific to the container DB, along
    with its (deleted, name) index, a trigger that rejects UPDATEs on the
    table, and the policy_stat triggers.
    Not a part of Pluggable Back-ends, internal to the baseline code.

    :param conn: DB connection object
    """
    object_table_script = """
CREATE TABLE object (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
created_at TEXT,
size INTEGER,
content_type TEXT,
etag TEXT,
deleted INTEGER DEFAULT 0,
storage_policy_index INTEGER DEFAULT 0
);
CREATE INDEX ix_object_deleted_name ON object (deleted, name);
CREATE TRIGGER object_update BEFORE UPDATE ON object
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
"""
    conn.executescript(object_table_script + POLICY_STAT_TRIGGER_SCRIPT)
def create_container_info_table(self, conn, put_timestamp,
                                storage_policy_index):
    """
    Create the container_info table which is specific to the container DB,
    and insert its row for this container.
    Not a part of Pluggable Back-ends, internal to the baseline code.
    Also creates the container_stat view.

    :param conn: DB connection object
    :param put_timestamp: put timestamp; None means ``Timestamp(0)``
    :param storage_policy_index: storage policy index
    """
    if put_timestamp is None:
        put_timestamp = Timestamp(0).internal
    # Why a container_stat *view*: old versions of Swift expected a
    # container_stat table with "object_count" and "bytes_used" columns.
    # Those stats became per-storage-policy and moved to the policy_stat
    # table, so the columns are no longer created there. The view, together
    # with its trigger, behaves like the old table, which lets an old
    # version of Swift keep working with a database that has the new
    # schema.
    #
    # That situation can arise during a rolling Swift upgrade if a DB gets
    # rsynced from an old node to a new one, so the view is needed for
    # availability during upgrades. Enabling downgrades is a nice bonus.
    conn.executescript(CONTAINER_INFO_TABLE_SCRIPT +
                       CONTAINER_STAT_VIEW_SCRIPT)
    insert_sql = """
INSERT INTO container_info (account, container, created_at, id,
put_timestamp, status_changed_at, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?);
"""
    # status_changed_at starts out equal to put_timestamp
    row_values = (self.account, self.container, Timestamp.now().internal,
                  str(uuid4()), put_timestamp, put_timestamp,
                  storage_policy_index)
    conn.execute(insert_sql, row_values)
def create_policy_stat_table(self, conn, storage_policy_index=0):
    """
    Create the policy_stat table and insert its first row.

    :param conn: DB connection object
    :param storage_policy_index: the policy_index the container is
                                 being created with
    """
    conn.executescript(POLICY_STAT_TABLE_CREATE)
    # counts are left to the column defaults (0)
    insert_sql = """
INSERT INTO policy_stat (storage_policy_index)
VALUES (?)
"""
    conn.execute(insert_sql, (storage_policy_index,))
def create_shard_range_table(self, conn):
    """
    Create the shard_range table which is specific to the container DB,
    plus a trigger that rejects UPDATEs on that table.

    :param conn: DB connection object
    """
    create_table_sql = """
CREATE TABLE %s (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
timestamp TEXT,
lower TEXT,
upper TEXT,
object_count INTEGER DEFAULT 0,
bytes_used INTEGER DEFAULT 0,
meta_timestamp TEXT,
deleted INTEGER DEFAULT 0,
state INTEGER,
state_timestamp TEXT,
epoch TEXT,
reported INTEGER DEFAULT 0,
tombstones INTEGER DEFAULT -1
);
""" % SHARD_RANGE_TABLE
    create_trigger_sql = """
CREATE TRIGGER shard_range_update BEFORE UPDATE ON %s
BEGIN
SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT');
END;
""" % SHARD_RANGE_TABLE
    # Deliberately execute(), not executescript(): that way we get the
    # benefits of our GreenDBConnection. Creating a table requires a
    # whole-DB lock; *any* in-progress cursor will otherwise trip a
    # "database is locked" error.
    conn.execute(create_table_sql)
    conn.execute(create_trigger_sql)
def get_db_version(self, conn):
if self._db_version == -1:
self._db_version = 0
for row in conn.execute('''
SELECT name FROM sqlite_master
WHERE name = 'ix_object_deleted_name' '''):
self._db_version = 1
return self._db_version
def _get_deleted_key(self, connection):
if self.get_db_version(connection) < 1:
return '+deleted'
return 'deleted'
def _newid(self, conn):
conn.execute('''
UPDATE container_stat
SET reported_put_timestamp = 0, reported_delete_timestamp = 0,
reported_object_count = 0, reported_bytes_used = 0''')
def _commit_puts_load(self, item_list, entry):
"""See :func:`swift.common.db.DatabaseBroker._commit_puts_load`"""
(name, timestamp, size, content_type, etag, deleted) = entry[:6]
if len(entry) > 6:
storage_policy_index = entry[6]
else:
storage_policy_index = 0
content_type_timestamp = meta_timestamp = None
if len(entry) > 7:
content_type_timestamp = entry[7]
if len(entry) > 8:
meta_timestamp = entry[8]
item_list.append({'name': name,
'created_at': timestamp,
'size': size,
'content_type': content_type,
'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index,
'ctype_timestamp': content_type_timestamp,
'meta_timestamp': meta_timestamp})
def _empty(self):
    """
    Check whether this broker's own db has no objects in any policy.

    The highest object_count across policy_stat rows is checked; for a db
    without policy_stat support the container_stat object_count is used
    instead.

    :returns: the result of ``zero_like`` applied to that count
    """
    self._commit_puts_stale_ok()
    legacy_schema_errors = ("no such column: storage_policy_index",
                            "no such table: policy_stat")
    with self.get() as conn:
        try:
            row = conn.execute(
                'SELECT max(object_count) from policy_stat').fetchone()
        except sqlite3.OperationalError as err:
            message = str(err)
            if not any(text in message for text in legacy_schema_errors):
                raise
            row = conn.execute(
                'SELECT object_count from container_stat').fetchone()
        return zero_like(row[0])
def empty(self):
    """
    Check if container DB is empty.

    This method uses more stringent checks on object count than
    :meth:`is_deleted`: this method checks that there are no objects in any
    policy; if the container is in the process of sharding then both fresh
    and retiring databases are checked to be empty; if a root container has
    shard ranges then they are checked to be empty.

    :returns: True if the database has no active objects, False otherwise
    """
    for broker in self.get_brokers():
        if not broker._empty():
            return False
    if self.is_root_container() and self.sharding_initiated():
        # only roots consult shard usage: sharded shards don't get updates
        # from their shards so their shard usage should not be relied upon
        return self.get_shard_usage()['object_count'] <= 0
    return True
def delete_object(self, name, timestamp, storage_policy_index=0):
"""
Mark an object deleted.
:param name: object name to be deleted
:param timestamp: timestamp when the object was marked as deleted
:param storage_policy_index: the storage policy index for the object
"""
self.put_object(name, timestamp, 0, 'application/deleted', 'noetag',
deleted=1, storage_policy_index=storage_policy_index)
def make_tuple_for_pickle(self, record):
return (record['name'], record['created_at'], record['size'],
record['content_type'], record['etag'], record['deleted'],
record['storage_policy_index'],
record['ctype_timestamp'],
record['meta_timestamp'])
def put_object(self, name, timestamp, size, content_type, etag, deleted=0,
storage_policy_index=0, ctype_timestamp=None,
meta_timestamp=None):
"""
Creates an object in the DB with its metadata.
:param name: object name to be created
:param timestamp: timestamp of when the object was created
:param size: object size
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deleted_at timestamp to timestamp
:param storage_policy_index: the storage policy index for the object
:param ctype_timestamp: timestamp of when content_type was last
updated
:param meta_timestamp: timestamp of when metadata was last updated
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
'content_type': content_type, 'etag': etag,
'deleted': deleted,
'storage_policy_index': storage_policy_index,
'ctype_timestamp': ctype_timestamp,
'meta_timestamp': meta_timestamp}
self.put_record(record)
def remove_objects(self, lower, upper, max_row=None):
    """
    Delete object rows whose names fall in the given namespace range.

    Note that objects are removed regardless of their storage_policy_index.

    :param lower: defines the lower bound of object names that will be
        removed; names greater than this value will be removed; names less
        than or equal to this value will not be removed. A false value
        means no lower bound.
    :param upper: defines the upper bound of object names that will be
        removed; names less than or equal to this value will be removed;
        names greater than this value will not be removed. The empty string
        is interpreted as there being no upper bound.
    :param max_row: if specified only rows less than or equal to max_row
        will be removed
    """
    # (clause, bound value) pairs, in the order they appear in the query
    filters = []
    if max_row is not None:
        filters.append(('ROWID <= ?', str(max_row)))
    if lower:
        filters.append(('name > ?', lower))
    if upper:
        filters.append(('name <= ?', upper))

    query = 'DELETE FROM object WHERE deleted in (0, 1)'
    if filters:
        query += ' AND ' + ' AND '.join(clause for clause, _ in filters)
    query_args = [value for _, value in filters]
    with self.get() as conn:
        conn.execute(query, query_args)
        conn.commit()
def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
                     **kwargs):
    """
    Apply delete logic to database info.

    The container counts as deleted when it has no objects and its
    delete_timestamp is greater than its put_timestamp. Any extra keyword
    arguments are ignored.

    :returns: True if the DB is considered to be deleted, False otherwise
    """
    has_no_objects = zero_like(object_count)
    return has_no_objects and (
        Timestamp(delete_timestamp) > Timestamp(put_timestamp))
def _is_deleted(self, conn):
    """
    Check if the DB is considered to be deleted.

    The object count used in this check is the same as the container
    object count that would be returned in the result of :meth:`get_info`
    and exposed to a client i.e. it is based on the container_stat view for
    the current storage policy index or relevant shard range usage.

    :param conn: database conn
    :returns: True if the DB is considered to be deleted, False otherwise
    """
    row = conn.execute('''
SELECT put_timestamp, delete_timestamp, object_count
FROM container_stat''').fetchone()
    stats = dict(row)
    # alternate stats, when there are any, override the view's object count
    _state, alternate_stats = self._get_alternate_object_stats()
    stats.update(alternate_stats)
    return self._is_deleted_info(**stats)
def is_old_enough_to_reclaim(self, now, reclaim_age):
    """
    Check whether the container was deleted after its last put and more
    than ``reclaim_age`` before ``now``.

    :param now: the current time
    :param reclaim_age: age that a deletion must exceed
    :returns: True if ``now - reclaim_age`` > delete_timestamp >
        put_timestamp, False otherwise
    """
    with self.get() as conn:
        row = conn.execute('''
SELECT put_timestamp, delete_timestamp
FROM container_stat''').fetchone()
    reclaim_cutoff = Timestamp(now - reclaim_age)
    deleted_at = Timestamp(row['delete_timestamp'])
    return reclaim_cutoff > deleted_at > Timestamp(row['put_timestamp'])
def is_empty_enough_to_reclaim(self):
    """
    Check whether the container holds little enough to be reclaimed.

    :returns: False for a root container that has shard ranges or is in
        SHARDING db state; otherwise the result of :meth:`empty`.
    """
    if self.is_root_container():
        if self.get_shard_ranges() or self.get_db_state() == SHARDING:
            return False
    return self.empty()
def is_reclaimable(self, now, reclaim_age):
return self.is_old_enough_to_reclaim(now, reclaim_age) and \
self.is_empty_enough_to_reclaim()
def get_info_is_deleted(self):
"""
Get the is_deleted status and info for the container.
:returns: a tuple, in the form (info, is_deleted) info is a dict as
returned by get_info and is_deleted is a boolean.
"""
if self.db_file != ':memory:' and not os.path.exists(self.db_file):
return {}, True
info = self.get_info()
return info, self._is_deleted_info(**info)
def get_replication_info(self):
    """
    Get the base class replication info, extended with ``shard_max_row``,
    the max row of the shard range table.

    :returns: dict of replication info
    """
    replication_info = super(ContainerBroker, self).get_replication_info()
    replication_info['shard_max_row'] = self.get_max_row(SHARD_RANGE_TABLE)
    return replication_info
def _do_get_info_query(self, conn):
data = None
trailing_sync = 'x_container_sync_point1, x_container_sync_point2'
trailing_pol = 'storage_policy_index'
errors = set()
while not data:
try:
data = conn.execute(('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, status_changed_at,
object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s, %s
FROM container_stat
''') % (trailing_sync, trailing_pol)).fetchone()
except sqlite3.OperationalError as err:
err_msg = str(err)
if err_msg in errors:
# only attempt migration once
raise
errors.add(err_msg)
if 'no such column: storage_policy_index' in err_msg:
trailing_pol = '0 AS storage_policy_index'
elif 'no such column: x_container_sync_point' in err_msg:
trailing_sync = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
# populate instance cache
self._storage_policy_index = data['storage_policy_index']
self.account = data['account']
self.container = data['container']
return data
def _get_info(self):
    """
    Commit pending puts (stale reads permitting), then read this db's
    container info row.

    :returns: dict as returned by :meth:`_do_get_info_query`
    """
    self._commit_puts_stale_ok()
    with self.get() as conn:
        info = self._do_get_info_query(conn)
        return info
def _populate_instance_cache(self, conn=None):
# load cached instance attributes from the database if necessary
if self.container is None:
with self.maybe_get(conn) as conn:
self._do_get_info_query(conn)
def _get_alternate_object_stats(self):
    """
    Find object stats that should override those of this broker's own db.

    * SHARDING: the object_count and bytes_used of the first broker
      returned by ``get_brokers()``.
    * SHARDED root container: the shard usage.
    * anything else: no override.

    :returns: a tuple of (db state, dict of overriding stats); the dict is
        empty when there is nothing to override
    """
    db_state = self.get_db_state()
    if db_state == SHARDING:
        first_broker_info = self.get_brokers()[0]._get_info()
        return db_state, {
            'object_count': first_broker_info['object_count'],
            'bytes_used': first_broker_info['bytes_used']}
    if db_state == SHARDED and self.is_root_container():
        return db_state, self.get_shard_usage()
    return db_state, {}
def get_info(self):
    """
    Get global data for the container.

    The stats read from this broker's db are overridden by any alternate
    object stats that apply to the current db state.

    :returns: dict with keys: account, container, created_at,
              put_timestamp, delete_timestamp, status_changed_at,
              object_count, bytes_used, reported_put_timestamp,
              reported_delete_timestamp, reported_object_count,
              reported_bytes_used, hash, id, x_container_sync_point1,
              x_container_sync_point2, and storage_policy_index,
              db_state.
    """
    info = self._get_info()
    db_state, alternate_stats = self._get_alternate_object_stats()
    info.update(alternate_stats)
    info['db_state'] = db_state
    return info
def set_x_container_sync_points(self, sync_point1, sync_point2):
    """
    Store the container sync points and commit.

    If the db has no sync point columns yet, they are added by
    ``_migrate_add_container_sync_points`` and the update is retried once.

    :param sync_point1: new value for x_container_sync_point1, or None to
        leave it unchanged
    :param sync_point2: new value for x_container_sync_point2, or None to
        leave it unchanged
    """
    with self.get() as conn:
        try:
            self._set_x_container_sync_points(
                conn, sync_point1, sync_point2)
        except sqlite3.OperationalError as err:
            missing_columns = (
                'no such column: x_container_sync_point' in str(err))
            if not missing_columns:
                raise
            self._migrate_add_container_sync_points(conn)
            self._set_x_container_sync_points(
                conn, sync_point1, sync_point2)
        conn.commit()
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def get_policy_stats(self):
    """
    Get the object stats of every storage policy in this db.

    For a db without policy_stat support, the container_stat counts are
    reported under policy index 0.

    :returns: dict mapping storage policy index to a dict with keys
        object_count and bytes_used
    """
    legacy_schema_errors = ("no such column: storage_policy_index",
                            "no such table: policy_stat")
    with self.get() as conn:
        try:
            rows = conn.execute('''
SELECT storage_policy_index, object_count, bytes_used
FROM policy_stat
''').fetchall()
        except sqlite3.OperationalError as err:
            message = str(err)
            if not any(text in message for text in legacy_schema_errors):
                raise
            rows = conn.execute('''
SELECT 0 as storage_policy_index, object_count, bytes_used
FROM container_stat
''').fetchall()
    stats_by_policy = {}
    for row in rows:
        row_stats = dict(row)
        policy_index = row_stats.pop('storage_policy_index')
        stats_by_policy[policy_index] = row_stats
    return stats_by_policy
def has_multiple_policies(self):
with self.get() as conn:
try:
curs = conn.execute('''
SELECT count(storage_policy_index)
FROM policy_stat
''').fetchone()
except sqlite3.OperationalError as err:
if 'no such table: policy_stat' not in str(err):
raise
# no policy_stat row
return False
if curs and curs[0] > 1:
return True