# test_s3_crt.py
# Source: a fork of piskvorky/smart_open (repository view: 135 lines, 123 loc, 4.78 KB).
import botocore
import smart_open
import torch
from torch.utils.data import Dataset, DataLoader
from functools import partial
import boto3
from smart_open.s3 import CRTclient
import time
import io
from multiprocessing.pool import Pool
class S3Dataset(torch.utils.data.IterableDataset):
    """Iterable dataset that streams one fixed byte range from each URL.

    For every URL the object is opened with ``smart_open.open`` in binary
    mode, the stream is positioned at offset 512, and up to 19247284 bytes
    are read and yielded as a single ``bytes`` item.

    Args:
        urls: Iterable of object URLs to read.
        crt: Mapping passed unchanged to ``smart_open.open`` as
            ``transport_params``.
    """

    def __init__(self, urls, crt):
        super().__init__()
        self.urls = urls
        self.crt = crt

    def _worker_urls(self):
        """Return the URLs the current process should read.

        In the main process (no worker info) this is ``self.urls`` itself.
        Inside a ``DataLoader`` worker the URLs are split round-robin by
        worker id; without this every worker would iterate the full list
        and each object would be yielded once per worker.
        """
        info = torch.utils.data.get_worker_info()
        if info is None:
            return self.urls
        return [
            url
            for index, url in enumerate(self.urls)
            if index % info.num_workers == info.id
        ]

    def __iter__(self):
        for url in self._worker_urls():
            with smart_open.open(
                url, mode="rb", transport_params=self.crt
            ) as fp:
                s1 = time.time()
                fp.seek(512)
                t1 = time.time()
                buf = fp.read(19247284)
            # The stream is closed before yielding so it is not held open
            # while the consumer processes the buffer.
            # Only the duration of the seek call is reported here.
            print(url, 'crt', t1 - s1)
            yield buf
def torch_test():
    """Time reading ten objects through a two-worker ``DataLoader``.

    Builds ten ``.tess`` URLs, a botocore session with ``tcp_keepalive``
    set, and a ``CRTclient`` for the session's configured region, then
    iterates an ``S3Dataset`` over those URLs. The length of every loaded
    item and the total elapsed wall-clock time are printed.
    """
    prefix = "s3://waabi-live-training-datasets-us-east-1/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-00000000000"
    urls = [prefix + str(i) + ".tess" for i in range(10)]

    boto_session = botocore.session.Session()
    boto_session.set_config_variable("tcp_keepalive", "true")
    client = CRTclient(boto_session.get_config_variable('region'))
    # Transport parameters handed to smart_open by the dataset.
    crt = {"session": boto_session, "client": client}

    dataset = S3Dataset(urls, crt)
    # The DataLoader keyword is ``num_workers``; the previous spelling
    # ``number_workers`` is rejected with a TypeError.
    loader = DataLoader(dataset, num_workers=2)

    t = time.time()
    for item in loader:
        print(len(item))
    print('time took', time.time() - t)
# def testCRT(url):
# crt={}
# prefix = "s3://waabi-live-training-datasets-us-east-1/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-00000000000"
# urls = [prefix+str(i)+".tess" for i in range(10)]
# boto_session = botocore.session.Session()
# boto_session.set_config_variable("tcp_keepalive", "true")
#
# crt['session'] = boto_session
# client = CRTclient(boto_session.get_config_variable('region'))
# crt["client"] = client
# with smart_open.open(
# url, mode="rb", transport_params=crt
# ) as fp:
# s1 = time.time()
# fp.seek(512)
# t1 = time.time()
# buf = fp.read(19247284)
# t2 = time.time()
# fp.seek(20247284)
# t3 = time.time()
# buf = fp.read(12247284)
# t4 = time.time()
# print(url,'crt',t1-s1,t2-t1,t3-t2,t4-t1)
def testGet(key):
    """Issue a single ``get_object`` call for ``key`` with a new CRT client.

    A fresh botocore session supplies the ``region`` config variable used
    to construct the ``CRTclient``. The result of ``get_object`` is
    discarded and nothing is returned.

    Args:
        key: Object key within the ``waabi-live-training-datasets-us-east-1``
            bucket.
    """
    region = botocore.session.Session().get_config_variable('region')
    crt_client = CRTclient(region)
    crt_client.get_object('waabi-live-training-datasets-us-east-1', key)
def testMultiGet():
    """Run ``testGet`` over ten keys on a pool of five processes and time it.

    Prints the total elapsed wall-clock time for the whole ``map`` call.
    """
    # NOTE(review): these keys start with "/", unlike the key used in
    # testDownload -- confirm that CRTclient.get_object expects that.
    prefix = "/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-00000000000"
    keys = [f"{prefix}{i}.tess" for i in range(10)]

    started = time.time()
    with Pool(5) as pool:
        pool.map(testGet, keys)
    print('testMultiGet total_time:', time.time() - started)
# def testMulti():
# crt={}
# prefix = "s3://waabi-live-training-datasets-us-east-1/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-00000000000"
# urls = [prefix+str(i)+".tess" for i in range(10)]
# s1 = time.time()
# with Pool(5) as p:
# p.map(testCRT, urls)
# print('total_time:',time.time()-s1)
def testboto():
    """Time two seek/read pairs on one object with empty transport params.

    Opens the object through ``smart_open.open`` with an empty
    ``transport_params`` dict, then measures: seek to 512, read of
    19247284 bytes, seek to 20247284, read of 12247284 bytes. The four
    deltas are printed with the ``'s3'`` label, followed by the total
    elapsed time since the first measurement.
    """
    url = "s3://waabi-live-training-datasets-us-east-1/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-000000000020.tess"
    s3_config = {}

    with smart_open.open(url, mode="rb", transport_params=s3_config) as fp:
        start = time.time()
        fp.seek(512)
        after_first_seek = time.time()
        _ = fp.read(19247284)
        after_first_read = time.time()
        fp.seek(20247284)
        after_second_seek = time.time()
        _ = fp.read(12247284)
        after_second_read = time.time()
        # NOTE(review): the last value is measured from the end of the
        # first seek, not from the second seek -- confirm this is intended.
        print(
            's3',
            after_first_seek - start,
            after_first_read - after_first_seek,
            after_second_seek - after_first_read,
            after_second_read - after_first_seek,
        )

    finished = time.time()
    print('original_time_used', finished - start)
def testRange(url=None, s3_config=None):
    """Time three ``get_object_range`` calls on one object.

    The original body read ``url`` and ``s3_config`` without defining them,
    which raises ``NameError`` when the function is called at module level.
    They are now optional parameters whose defaults match the values used
    by ``testboto``.

    Args:
        url: Object URL to open. Defaults to the ``...000000000020.tess``
            object used by ``testboto``.
        s3_config: Mapping passed to ``smart_open.open`` as
            ``transport_params``. Defaults to an empty dict.
    """
    if url is None:
        url = "s3://waabi-live-training-datasets-us-east-1/staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-000000000020.tess"
    if s3_config is None:
        s3_config = {}

    with smart_open.open(
        url, mode="rb", transport_params=s3_config
    ) as fp:
        # NOTE(review): get_object_range is not part of the standard file
        # API; presumably it is provided by this fork's reader -- confirm.
        s2 = time.time()
        fp.get_object_range(512, 512 + 19247284)
        t1 = time.time()
        fp.get_object_range(20247284, 20247284 + 12247284)
        t2 = time.time()
        fp.get_object_range(30247284, 30247284 + 19247284)
        t3 = time.time()
        print('s3 get_object_range', t1 - s2, t2 - t1, t3 - t2)
def testDownload():
    """Time one boto3 ``download_fileobj`` of an object into memory.

    Downloads the ``...000000000020.tess`` object into an ``io.BytesIO``
    through the boto3 S3 resource's underlying client and prints the
    elapsed wall-clock time with the ``'boto3'`` label.
    """
    # Single source for the bucket name; the previously created but unused
    # ``Bucket`` resource object has been dropped.
    bucket_name = "waabi-live-training-datasets-us-east-1"
    s3r = boto3.resource('s3')

    def do_boto3_download(key: str):
        """Download ``key`` from the bucket and return the filled buffer."""
        bytes_buffer = io.BytesIO()
        s3r.meta.client.download_fileobj(
            Bucket=bucket_name, Key=key, Fileobj=bytes_buffer
        )
        return bytes_buffer

    b1 = time.time()
    do_boto3_download('staging/pandar_hwy101_curated_v1_human_labelled_train_2023_07_07/00000000-0000-0000-0000-000000000020.tess')
    print('boto3', time.time() - b1)
# Run the benchmark only when executed as a script. Without this guard,
# importing the module starts the benchmark, and process pools using the
# spawn start method re-import the module and would re-run it in each child.
if __name__ == "__main__":
    testMultiGet()