@@ -24,6 +24,7 @@ import (
24
24
"net/url"
25
25
"path"
26
26
"strings"
27
+ "sync"
27
28
"time"
28
29
29
30
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
@@ -35,10 +36,9 @@ import (
35
36
"github.com/cs3org/reva/pkg/rgrpc/status"
36
37
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
37
38
"github.com/cs3org/reva/pkg/storage/utils/etag"
38
- "github.com/cs3org/reva/pkg/storage/utils/templates"
39
- "github.com/cs3org/reva/pkg/user"
40
39
"github.com/cs3org/reva/pkg/utils"
41
40
"github.com/dgrijalva/jwt-go"
41
+ "github.com/google/uuid"
42
42
"github.com/pkg/errors"
43
43
)
44
44
@@ -193,6 +193,7 @@ func (s *svc) getHome(_ context.Context) string {
193
193
// TODO(labkode): issue #601, /home will be hardcoded.
194
194
return "/home"
195
195
}
196
+
196
197
func (s * svc ) InitiateFileDownload (ctx context.Context , req * provider.InitiateFileDownloadRequest ) (* gateway.InitiateFileDownloadResponse , error ) {
197
198
log := appctx .GetLogger (ctx )
198
199
p , st := s .getPath (ctx , req .Ref )
@@ -366,6 +367,7 @@ func (s *svc) InitiateFileDownload(ctx context.Context, req *provider.InitiateFi
366
367
}
367
368
368
369
func (s * svc ) initiateFileDownload (ctx context.Context , req * provider.InitiateFileDownloadRequest ) (* gateway.InitiateFileDownloadResponse , error ) {
370
+ // TODO(ishank011): enable downloading references spread across storage providers, eg. /eos
369
371
c , err := s .find (ctx , req .Ref )
370
372
if err != nil {
371
373
return & gateway.InitiateFileDownloadResponse {
@@ -857,6 +859,7 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide
857
859
}
858
860
859
861
func (s * svc ) delete (ctx context.Context , req * provider.DeleteRequest ) (* provider.DeleteResponse , error ) {
862
+ // TODO(ishank011): enable deleting references spread across storage providers, eg. /eos
860
863
c , err := s .find (ctx , req .Ref )
861
864
if err != nil {
862
865
return & provider.DeleteResponse {
@@ -974,19 +977,20 @@ func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
974
977
}
975
978
976
979
func (s * svc ) move (ctx context.Context , req * provider.MoveRequest ) (* provider.MoveResponse , error ) {
977
- srcP , err := s .findProvider (ctx , req .Source )
980
+ srcList , err := s .findProviders (ctx , req .Source )
978
981
if err != nil {
979
982
return & provider.MoveResponse {
980
983
Status : status .NewStatusFromErrType (ctx , "move src=" + req .Source .String (), err ),
981
984
}, nil
982
985
}
983
986
984
- dstP , err := s .findProvider (ctx , req .Destination )
987
+ dstList , err := s .findProviders (ctx , req .Destination )
985
988
if err != nil {
986
989
return & provider.MoveResponse {
987
990
Status : status .NewStatusFromErrType (ctx , "move dst=" + req .Destination .String (), err ),
988
991
}, nil
989
992
}
993
+ srcP , dstP := srcList [0 ], dstList [0 ]
990
994
991
995
// if providers are not the same we do not implement cross storage copy yet.
992
996
if srcP .Address != dstP .Address {
@@ -1007,6 +1011,7 @@ func (s *svc) move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
1007
1011
}
1008
1012
1009
1013
func (s * svc ) SetArbitraryMetadata (ctx context.Context , req * provider.SetArbitraryMetadataRequest ) (* provider.SetArbitraryMetadataResponse , error ) {
1014
+ // TODO(ishank011): enable for references spread across storage providers, eg. /eos
1010
1015
c , err := s .find (ctx , req .Ref )
1011
1016
if err != nil {
1012
1017
return & provider.SetArbitraryMetadataResponse {
@@ -1023,6 +1028,7 @@ func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitra
1023
1028
}
1024
1029
1025
1030
func (s * svc ) UnsetArbitraryMetadata (ctx context.Context , req * provider.UnsetArbitraryMetadataRequest ) (* provider.UnsetArbitraryMetadataResponse , error ) {
1031
+ // TODO(ishank011): enable for references spread across storage providers, eg. /eos
1026
1032
c , err := s .find (ctx , req .Ref )
1027
1033
if err != nil {
1028
1034
return & provider.UnsetArbitraryMetadataResponse {
@@ -1142,14 +1148,89 @@ func (s *svc) statSharesFolder(ctx context.Context) (*provider.StatResponse, err
1142
1148
}
1143
1149
1144
1150
func (s * svc ) stat (ctx context.Context , req * provider.StatRequest ) (* provider.StatResponse , error ) {
1145
- c , err := s .find (ctx , req .Ref )
1151
+ providers , err := s .findProviders (ctx , req .Ref )
1146
1152
if err != nil {
1147
1153
return & provider.StatResponse {
1148
- Status : status .NewStatusFromErrType (ctx , "stat ref= " + req .Ref .String (), err ),
1154
+ Status : status .NewStatusFromErrType (ctx , "stat ref: " + req .Ref .String (), err ),
1149
1155
}, nil
1150
1156
}
1151
1157
1152
- return c .Stat (ctx , req )
1158
+ resPath := req .Ref .GetPath ()
1159
+ if len (providers ) == 1 && (resPath == "" || strings .HasPrefix (resPath , providers [0 ].ProviderPath )) {
1160
+ c , err := s .getStorageProviderClient (ctx , providers [0 ])
1161
+ if err != nil {
1162
+ return & provider.StatResponse {
1163
+ Status : status .NewInternal (ctx , err , "error connecting to storage provider=" + providers [0 ].Address ),
1164
+ }, nil
1165
+ }
1166
+ return c .Stat (ctx , req )
1167
+ }
1168
+
1169
+ infoFromProviders := make ([]* provider.ResourceInfo , len (providers ))
1170
+ errors := make ([]error , len (providers ))
1171
+ var wg sync.WaitGroup
1172
+
1173
+ for i , p := range providers {
1174
+ wg .Add (1 )
1175
+ go s .statOnProvider (ctx , req , infoFromProviders [i ], p , & errors [i ], & wg )
1176
+ }
1177
+ wg .Wait ()
1178
+
1179
+ var totalSize uint64
1180
+ for i := range providers {
1181
+ if errors [i ] != nil {
1182
+ return & provider.StatResponse {
1183
+ Status : status .NewStatusFromErrType (ctx , "stat ref: " + req .Ref .String (), errors [i ]),
1184
+ }, nil
1185
+ }
1186
+ if infoFromProviders [i ] != nil {
1187
+ totalSize += infoFromProviders [i ].Size
1188
+ }
1189
+ }
1190
+
1191
+ // TODO(ishank011): aggregrate other properties for references spread across storage providers, eg. /eos
1192
+ return & provider.StatResponse {
1193
+ Status : status .NewOK (ctx ),
1194
+ Info : & provider.ResourceInfo {
1195
+ Id : & provider.ResourceId {
1196
+ StorageId : "/" ,
1197
+ OpaqueId : uuid .New ().String (),
1198
+ },
1199
+ Type : provider .ResourceType_RESOURCE_TYPE_CONTAINER ,
1200
+ Path : resPath ,
1201
+ Size : totalSize ,
1202
+ },
1203
+ }, nil
1204
+ }
1205
+
1206
+ func (s * svc ) statOnProvider (ctx context.Context , req * provider.StatRequest , res * provider.ResourceInfo , p * registry.ProviderInfo , e * error , wg * sync.WaitGroup ) {
1207
+ defer wg .Done ()
1208
+ c , err := s .getStorageProviderClient (ctx , p )
1209
+ if err != nil {
1210
+ * e = errors .Wrap (err , "error connecting to storage provider=" + p .Address )
1211
+ return
1212
+ }
1213
+
1214
+ resPath := path .Clean (req .Ref .GetPath ())
1215
+ newPath := req .Ref .GetPath ()
1216
+ if resPath != "" && ! strings .HasPrefix (resPath , p .ProviderPath ) {
1217
+ newPath = p .ProviderPath
1218
+ }
1219
+ r , err := c .Stat (ctx , & provider.StatRequest {
1220
+ Ref : & provider.Reference {
1221
+ Spec : & provider.Reference_Path {
1222
+ Path : newPath ,
1223
+ },
1224
+ },
1225
+ })
1226
+ if err != nil {
1227
+ * e = errors .Wrap (err , "gateway: error calling ListContainer" )
1228
+ return
1229
+ }
1230
+ if res == nil {
1231
+ res = & provider.ResourceInfo {}
1232
+ }
1233
+ * res = * r .Info
1153
1234
}
1154
1235
1155
1236
func (s * svc ) Stat (ctx context.Context , req * provider.StatRequest ) (* provider.StatResponse , error ) {
@@ -1454,19 +1535,88 @@ func (s *svc) listSharesFolder(ctx context.Context) (*provider.ListContainerResp
1454
1535
}
1455
1536
1456
1537
func (s * svc ) listContainer (ctx context.Context , req * provider.ListContainerRequest ) (* provider.ListContainerResponse , error ) {
1457
- c , err := s .find (ctx , req .Ref )
1538
+ providers , err := s .findProviders (ctx , req .Ref )
1458
1539
if err != nil {
1459
1540
return & provider.ListContainerResponse {
1460
- Status : status .NewStatusFromErrType (ctx , "listContainer ref= " + req .Ref .String (), err ),
1541
+ Status : status .NewStatusFromErrType (ctx , "listContainer ref: " + req .Ref .String (), err ),
1461
1542
}, nil
1462
1543
}
1463
1544
1464
- res , err := c .ListContainer (ctx , req )
1545
+ resPath := path .Clean (req .Ref .GetPath ())
1546
+ infoFromProviders := make ([][]* provider.ResourceInfo , len (providers ))
1547
+ errors := make ([]error , len (providers ))
1548
+ var wg sync.WaitGroup
1549
+
1550
+ for i , p := range providers {
1551
+ wg .Add (1 )
1552
+ go s .listContainerOnProvider (ctx , req , & infoFromProviders [i ], p , & errors [i ], & wg )
1553
+ }
1554
+ wg .Wait ()
1555
+
1556
+ infos := []* provider.ResourceInfo {}
1557
+ indirects := make (map [string ][]* provider.ResourceInfo )
1558
+ for i := range providers {
1559
+ if errors [i ] != nil {
1560
+ return & provider.ListContainerResponse {
1561
+ Status : status .NewStatusFromErrType (ctx , "listContainer ref: " + req .Ref .String (), errors [i ]),
1562
+ }, nil
1563
+ }
1564
+ for _ , inf := range infoFromProviders [i ] {
1565
+ if parent := path .Dir (inf .Path ); resPath != "" && resPath != parent {
1566
+ parts := strings .Split (strings .TrimPrefix (inf .Path , resPath ), "/" )
1567
+ p := path .Join (resPath , parts [1 ])
1568
+ indirects [p ] = append (indirects [p ], inf )
1569
+ } else {
1570
+ infos = append (infos , inf )
1571
+ }
1572
+ }
1573
+ }
1574
+
1575
+ for k , v := range indirects {
1576
+ inf := & provider.ResourceInfo {
1577
+ Id : & provider.ResourceId {
1578
+ StorageId : "/" ,
1579
+ OpaqueId : uuid .New ().String (),
1580
+ },
1581
+ Type : provider .ResourceType_RESOURCE_TYPE_CONTAINER ,
1582
+ Etag : etag .GenerateEtagFromResources (nil , v ),
1583
+ Path : k ,
1584
+ Size : 0 ,
1585
+ }
1586
+ infos = append (infos , inf )
1587
+ }
1588
+
1589
+ return & provider.ListContainerResponse {
1590
+ Status : status .NewOK (ctx ),
1591
+ Infos : infos ,
1592
+ }, nil
1593
+ }
1594
+
1595
+ func (s * svc ) listContainerOnProvider (ctx context.Context , req * provider.ListContainerRequest , res * []* provider.ResourceInfo , p * registry.ProviderInfo , e * error , wg * sync.WaitGroup ) {
1596
+ defer wg .Done ()
1597
+ c , err := s .getStorageProviderClient (ctx , p )
1465
1598
if err != nil {
1466
- return nil , errors .Wrap (err , "gateway: error calling ListContainer" )
1599
+ * e = errors .Wrap (err , "error connecting to storage provider=" + p .Address )
1600
+ return
1467
1601
}
1468
1602
1469
- return res , nil
1603
+ resPath := path .Clean (req .Ref .GetPath ())
1604
+ newPath := req .Ref .GetPath ()
1605
+ if resPath != "" && ! strings .HasPrefix (resPath , p .ProviderPath ) {
1606
+ newPath = p .ProviderPath
1607
+ }
1608
+ r , err := c .ListContainer (ctx , & provider.ListContainerRequest {
1609
+ Ref : & provider.Reference {
1610
+ Spec : & provider.Reference_Path {
1611
+ Path : newPath ,
1612
+ },
1613
+ },
1614
+ })
1615
+ if err != nil {
1616
+ * e = errors .Wrap (err , "gateway: error calling ListContainer" )
1617
+ return
1618
+ }
1619
+ * res = r .Infos
1470
1620
}
1471
1621
1472
1622
func (s * svc ) ListContainer (ctx context.Context , req * provider.ListContainerRequest ) (* provider.ListContainerResponse , error ) {
@@ -1888,11 +2038,11 @@ func (s *svc) findByPath(ctx context.Context, path string) (provider.ProviderAPI
1888
2038
}
1889
2039
1890
2040
func (s * svc ) find (ctx context.Context , ref * provider.Reference ) (provider.ProviderAPIClient , error ) {
1891
- p , err := s .findProvider (ctx , ref )
2041
+ p , err := s .findProviders (ctx , ref )
1892
2042
if err != nil {
1893
2043
return nil , err
1894
2044
}
1895
- return s .getStorageProviderClient (ctx , p )
2045
+ return s .getStorageProviderClient (ctx , p [ 0 ] )
1896
2046
}
1897
2047
1898
2048
func (s * svc ) getStorageProviderClient (_ context.Context , p * registry.ProviderInfo ) (provider.ProviderAPIClient , error ) {
@@ -1905,37 +2055,13 @@ func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderIn
1905
2055
return c , nil
1906
2056
}
1907
2057
1908
- func (s * svc ) findProvider (ctx context.Context , ref * provider.Reference ) (* registry.ProviderInfo , error ) {
1909
- home := s .getHome (ctx )
1910
- if strings .HasPrefix (ref .GetPath (), home ) && s .c .HomeMapping != "" {
1911
- if u , ok := user .ContextGetUser (ctx ); ok {
1912
- layout := templates .WithUser (u , s .c .HomeMapping )
1913
- newRef := & provider.Reference {
1914
- Spec : & provider.Reference_Path {
1915
- Path : path .Join (layout , strings .TrimPrefix (ref .GetPath (), home )),
1916
- },
1917
- }
1918
- res , err := s .getStorageProvider (ctx , newRef )
1919
- if err != nil {
1920
- // if we get a NotFound error, default to the original reference
1921
- if _ , ok := err .(errtypes.IsNotFound ); ! ok {
1922
- return nil , err
1923
- }
1924
- } else {
1925
- return res , nil
1926
- }
1927
- }
1928
- }
1929
- return s .getStorageProvider (ctx , ref )
1930
- }
1931
-
1932
- func (s * svc ) getStorageProvider (ctx context.Context , ref * provider.Reference ) (* registry.ProviderInfo , error ) {
2058
+ func (s * svc ) findProviders (ctx context.Context , ref * provider.Reference ) ([]* registry.ProviderInfo , error ) {
1933
2059
c , err := pool .GetStorageRegistryClient (s .c .StorageRegistryEndpoint )
1934
2060
if err != nil {
1935
2061
return nil , errors .Wrap (err , "gateway: error getting storage registry client" )
1936
2062
}
1937
2063
1938
- res , err := c .GetStorageProvider (ctx , & registry.GetStorageProviderRequest {
2064
+ res , err := c .GetStorageProviders (ctx , & registry.GetStorageProvidersRequest {
1939
2065
Ref : ref ,
1940
2066
})
1941
2067
@@ -1958,11 +2084,11 @@ func (s *svc) getStorageProvider(ctx context.Context, ref *provider.Reference) (
1958
2084
}
1959
2085
}
1960
2086
1961
- if res .Provider == nil {
2087
+ if res .Providers == nil {
1962
2088
return nil , errors .New ("gateway: provider is nil" )
1963
2089
}
1964
2090
1965
- return res .Provider , nil
2091
+ return res .Providers , nil
1966
2092
}
1967
2093
1968
2094
type etagWithTS struct {
0 commit comments