Skip to content

Commit e20b174

Browse files
Acconutpcfreak30
andauthored
handler, s3store, filestore: Serve GET requests from storage provider (#1228)
* feat(handler, s3store): implement ContentServerDataStore for direct content serving, closes #1064 - Add ServerDataStore interface - Implement ContentServerDataStore in S3Store with streaming support - Add Range header support for partial content requests - Update StoreComposer to support ContentServer capability - Add tests for new ContentServerDataStore functionality - Update Go version to 1.22.1 * Add documentation * Set Content-Type and Content-Disposition in handler * Handle range request, conditional requests and errors better * Move implementation and test into own files * Use `store.GetUpload` to simulate more realistic usage * Return proper error for incomplete uploads * Rename variable * Remove debug logging * Log outgoing status code * Implement content server for filestore * Undo changes in `go.mod` --------- Co-authored-by: Derrick Hammer <[email protected]>
1 parent 9e5ba70 commit e20b174

File tree

8 files changed

+513
-1
lines changed

8 files changed

+513
-1
lines changed

pkg/filestore/filestore.go

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"fmt"
2020
"io"
21+
"net/http"
2122
"os"
2223
"path/filepath"
2324

@@ -50,6 +51,7 @@ func (store FileStore) UseIn(composer *handler.StoreComposer) {
5051
composer.UseTerminater(store)
5152
composer.UseConcater(store)
5253
composer.UseLengthDeferrer(store)
54+
composer.UseContentServer(store)
5355
}
5456

5557
func (store FileStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
@@ -156,6 +158,10 @@ func (store FileStore) AsConcatableUpload(upload handler.Upload) handler.Concata
156158
return upload.(*fileUpload)
157159
}
158160

161+
func (store FileStore) AsServableUpload(upload handler.Upload) handler.ServableUpload {
162+
return upload.(*fileUpload)
163+
}
164+
159165
// defaultBinPath returns the path to the file storing the binary data, if it is
160166
// not customized using the pre-create hook.
161167
func (store FileStore) defaultBinPath(id string) string {
@@ -268,6 +274,12 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) error {
268274
return nil
269275
}
270276

277+
func (upload *fileUpload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
278+
http.ServeFile(w, r, upload.binPath)
279+
280+
return nil
281+
}
282+
271283
// createFile creates the file with the content. If the corresponding directory does not exist,
272284
// it is created. If the file already exists, its content is removed.
273285
func createFile(path string, content []byte) error {

pkg/filestore/filestore_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package filestore
33
import (
44
"context"
55
"io"
6+
"net/http"
7+
"net/http/httptest"
68
"os"
79
"path/filepath"
810
"strings"
@@ -68,6 +70,21 @@ func TestFilestore(t *testing.T) {
6870
a.Equal("hello world", string(content))
6971
reader.(io.Closer).Close()
7072

73+
// Serve content
74+
w := httptest.NewRecorder()
75+
r := httptest.NewRequest("GET", "/", nil)
76+
r.Header.Set("Range", "bytes=0-4")
77+
78+
err = store.AsServableUpload(upload).ServeContent(context.Background(), w, r)
79+
a.Nil(err)
80+
81+
a.Equal(http.StatusPartialContent, w.Code)
82+
a.Equal("5", w.Header().Get("Content-Length"))
83+
a.Equal("text/plain; charset=utf-8", w.Header().Get("Content-Type"))
84+
a.Equal("bytes 0-4/11", w.Header().Get("Content-Range"))
85+
a.NotEqual("", w.Header().Get("Last-Modified"))
86+
a.Equal("hello", w.Body.String())
87+
7188
// Terminate upload
7289
a.NoError(store.AsTerminatableUpload(upload).Terminate(ctx))
7390

pkg/handler/composer.go

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ type StoreComposer struct {
1414
Concater ConcaterDataStore
1515
UsesLengthDeferrer bool
1616
LengthDeferrer LengthDeferrerDataStore
17+
ContentServer ContentServerDataStore
18+
UsesContentServer bool
1719
}
1820

1921
// NewStoreComposer creates a new and empty store composer.
@@ -85,3 +87,8 @@ func (store *StoreComposer) UseLengthDeferrer(ext LengthDeferrerDataStore) {
8587
store.UsesLengthDeferrer = ext != nil
8688
store.LengthDeferrer = ext
8789
}
90+
91+
func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
92+
store.UsesContentServer = ext != nil
93+
store.ContentServer = ext
94+
}

pkg/handler/datastore.go

+19
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package handler
33
import (
44
"context"
55
"io"
6+
"net/http"
67
)
78

89
type MetaData map[string]string
@@ -191,3 +192,21 @@ type Lock interface {
191192
// Unlock releases an existing lock for the given upload.
192193
Unlock() error
193194
}
195+
196+
type ServableUpload interface {
197+
// ServeContent serves the uploaded data as specified by the GET request.
198+
// It allows data stores to delegate the handling of range requests and conditional
199+
// requests to their underlying providers.
200+
// The tusd handler will set the Content-Type and Content-Disposition headers
201+
// before calling ServeContent, but the implementation can override them.
202+
// After calling ServeContent, the handler will not take any further action
203+
// other than handling a potential error.
204+
ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error
205+
}
206+
207+
// ContentServerDataStore is the interface for DataStores that can serve content directly.
208+
// When the handler serves a GET request, it will pass the request to ServeContent
209+
// and delegate its handling to the DataStore, instead of using GetReader to obtain the content.
210+
type ContentServerDataStore interface {
211+
AsServableUpload(upload Upload) ServableUpload
212+
}

pkg/handler/unrouted_handler.go

+48
Original file line numberDiff line numberDiff line change
@@ -1047,6 +1047,7 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
10471047
return
10481048
}
10491049

1050+
// Fall back to the existing GetReader implementation if ContentServerDataStore is not implemented
10501051
contentType, contentDisposition := filterContentType(info)
10511052
resp := HTTPResponse{
10521053
StatusCode: http.StatusOK,
@@ -1058,13 +1059,43 @@ func (handler *UnroutedHandler) GetFile(w http.ResponseWriter, r *http.Request)
10581059
Body: "", // Body is intentionally left empty, and we copy it manually in later.
10591060
}
10601061

1062+
// If the data store implements ContentServerDataStore, use delegate the handling
1063+
// of GET requests to the data store.
1064+
// Otherwise, we will use the existing GetReader implementation.
1065+
if handler.composer.UsesContentServer {
1066+
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
1067+
1068+
// Pass file type and name to the implementation, but it may override them.
1069+
w.Header().Set("Content-Type", resp.Header["Content-Type"])
1070+
w.Header().Set("Content-Disposition", resp.Header["Content-Disposition"])
1071+
1072+
// Use loggingResponseWriter to get the ResponseOutgoing log entry that
1073+
// normally handler.sendResp would produce.
1074+
loggingW := &loggingResponseWriter{ResponseWriter: w, logger: c.log}
1075+
1076+
err = servableUpload.ServeContent(c, loggingW, r)
1077+
if err != nil {
1078+
handler.sendError(c, err)
1079+
}
1080+
return
1081+
}
1082+
10611083
// If no data has been uploaded yet, respond with an empty "204 No Content" status.
10621084
if info.Offset == 0 {
10631085
resp.StatusCode = http.StatusNoContent
10641086
handler.sendResp(c, resp)
10651087
return
10661088
}
10671089

1090+
if handler.composer.UsesContentServer {
1091+
servableUpload := handler.composer.ContentServer.AsServableUpload(upload)
1092+
err = servableUpload.ServeContent(c, w, r)
1093+
if err != nil {
1094+
handler.sendError(c, err)
1095+
}
1096+
return
1097+
}
1098+
10681099
src, err := upload.GetReader(c)
10691100
if err != nil {
10701101
handler.sendError(c, err)
@@ -1679,3 +1710,20 @@ func validateUploadId(newId string) error {
16791710

16801711
return nil
16811712
}
1713+
1714+
// loggingResponseWriter is a wrapper around http.ResponseWriter that logs the
1715+
// final status code similar to UnroutedHandler.sendResp.
1716+
type loggingResponseWriter struct {
1717+
http.ResponseWriter
1718+
logger *slog.Logger
1719+
}
1720+
1721+
func (w *loggingResponseWriter) WriteHeader(statusCode int) {
1722+
if statusCode >= 200 {
1723+
w.logger.Info("ResponseOutgoing", "status", statusCode)
1724+
}
1725+
w.ResponseWriter.WriteHeader(statusCode)
1726+
}
1727+
1728+
// Unwrap provides access to the underlying http.ResponseWriter.
1729+
func (w *loggingResponseWriter) Unwrap() http.ResponseWriter { return w.ResponseWriter }

pkg/s3store/s3store.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ import (
100100
// considered valid into a header value according to RFC2616.
101101
var nonPrintableRegexp = regexp.MustCompile(`[^\x09\x20-\x7E]`)
102102

103+
// errIncompleteUpload is used when a client attempts to download an incomplete upload
104+
var errIncompleteUpload = handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
105+
103106
// See the handler.DataStore interface for documentation about the different
104107
// methods.
105108
type S3Store struct {
@@ -262,6 +265,7 @@ func (store S3Store) UseIn(composer *handler.StoreComposer) {
262265
composer.UseTerminater(store)
263266
composer.UseConcater(store)
264267
composer.UseLengthDeferrer(store)
268+
composer.UseContentServer(store)
265269
}
266270

267271
func (store S3Store) RegisterMetrics(registry prometheus.Registerer) {
@@ -758,7 +762,7 @@ func (upload s3Upload) GetReader(ctx context.Context) (io.ReadCloser, error) {
758762
})
759763
if err == nil {
760764
// The multipart upload still exists, which means we cannot download it yet
761-
return nil, handler.NewError("ERR_INCOMPLETE_UPLOAD", "cannot stream non-finished upload", http.StatusBadRequest)
765+
return nil, errIncompleteUpload
762766
}
763767

764768
// The AWS Go SDK v2 has a bug where types.NoSuchUpload is not returned,

pkg/s3store/serve_content.go

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package s3store
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
"net/http"
8+
"strconv"
9+
10+
"github.com/tus/tusd/v2/pkg/handler"
11+
12+
"github.com/aws/aws-sdk-go-v2/aws"
13+
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
14+
"github.com/aws/aws-sdk-go-v2/service/s3"
15+
)
16+
17+
func (store S3Store) AsServableUpload(upload handler.Upload) handler.ServableUpload {
18+
return upload.(*s3Upload)
19+
}
20+
21+
func (upload *s3Upload) ServeContent(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
22+
input := &s3.GetObjectInput{
23+
Bucket: aws.String(upload.store.Bucket),
24+
Key: upload.store.keyWithPrefix(upload.objectId),
25+
}
26+
27+
// Forward the Range, If-Match, If-None-Match, If-Unmodified-Since, If-Modified-Since headers if present
28+
if val := r.Header.Get("Range"); val != "" {
29+
input.Range = aws.String(val)
30+
}
31+
if val := r.Header.Get("If-Match"); val != "" {
32+
input.IfMatch = aws.String(val)
33+
}
34+
if val := r.Header.Get("If-None-Match"); val != "" {
35+
input.IfNoneMatch = aws.String(val)
36+
}
37+
if val := r.Header.Get("If-Modified-Since"); val != "" {
38+
t, err := http.ParseTime(val)
39+
if err == nil {
40+
input.IfModifiedSince = aws.Time(t)
41+
}
42+
}
43+
if val := r.Header.Get("If-Unmodified-Since"); val != "" {
44+
t, err := http.ParseTime(val)
45+
if err == nil {
46+
input.IfUnmodifiedSince = aws.Time(t)
47+
}
48+
}
49+
50+
// Let S3 handle the request
51+
result, err := upload.store.Service.GetObject(ctx, input)
52+
if err != nil {
53+
// Delete the headers set by tusd's handler. We don't need them for errors.
54+
w.Header().Del("Content-Type")
55+
w.Header().Del("Content-Disposition")
56+
57+
var respErr *awshttp.ResponseError
58+
if errors.As(err, &respErr) {
59+
if respErr.HTTPStatusCode() == http.StatusNotFound || respErr.HTTPStatusCode() == http.StatusForbidden {
60+
// If the object cannot be found, it means that the upload is not yet complete and we cannot serve it.
61+
// At this stage it is not possible that the upload itself does not exist, because the handler
62+
// alredy checked this case. Therefore, we can safely assume that the upload is still in progress.
63+
return errIncompleteUpload
64+
}
65+
66+
if respErr.HTTPStatusCode() == http.StatusNotModified {
67+
// Content-Location, Date, ETag, Vary, Cache-Control and Expires should be set
68+
// for 304 Not Modified responses. See https://httpwg.org/specs/rfc9110.html#status.304
69+
for _, header := range []string{"Content-Location", "Date", "ETag", "Vary", "Cache-Control", "Expires"} {
70+
if val := respErr.Response.Header.Get(header); val != "" {
71+
w.Header().Set(header, val)
72+
}
73+
}
74+
75+
w.WriteHeader(http.StatusNotModified)
76+
return nil
77+
}
78+
79+
if respErr.HTTPStatusCode() == http.StatusRequestedRangeNotSatisfiable {
80+
// Content-Range should be set for 416 Request Range Not Satisfiable responses.
81+
// See https://httpwg.org/specs/rfc9110.html#status.304
82+
// Note: AWS S3 does not seem to include this header in its response.
83+
if val := respErr.Response.Header.Get("Content-Range"); val != "" {
84+
w.Header().Set("Content-Range", val)
85+
}
86+
87+
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
88+
return nil
89+
}
90+
}
91+
return err
92+
}
93+
defer result.Body.Close()
94+
95+
// Add Accept-Ranges,Content-*, Cache-Control, ETag, Expires, Last-Modified headers if present in S3 response
96+
if result.AcceptRanges != nil {
97+
w.Header().Set("Accept-Ranges", *result.AcceptRanges)
98+
}
99+
if result.ContentDisposition != nil {
100+
w.Header().Set("Content-Disposition", *result.ContentDisposition)
101+
}
102+
if result.ContentEncoding != nil {
103+
w.Header().Set("Content-Encoding", *result.ContentEncoding)
104+
}
105+
if result.ContentLanguage != nil {
106+
w.Header().Set("Content-Language", *result.ContentLanguage)
107+
}
108+
if result.ContentLength != nil {
109+
w.Header().Set("Content-Length", strconv.FormatInt(*result.ContentLength, 10))
110+
}
111+
if result.ContentRange != nil {
112+
w.Header().Set("Content-Range", *result.ContentRange)
113+
}
114+
if result.ContentType != nil {
115+
w.Header().Set("Content-Type", *result.ContentType)
116+
}
117+
if result.CacheControl != nil {
118+
w.Header().Set("Cache-Control", *result.CacheControl)
119+
}
120+
if result.ETag != nil {
121+
w.Header().Set("ETag", *result.ETag)
122+
}
123+
if result.ExpiresString != nil {
124+
w.Header().Set("Expires", *result.ExpiresString)
125+
}
126+
if result.LastModified != nil {
127+
w.Header().Set("Last-Modified", result.LastModified.Format(http.TimeFormat))
128+
}
129+
130+
statusCode := http.StatusOK
131+
if result.ContentRange != nil {
132+
// Use 206 Partial Content for range requests
133+
statusCode = http.StatusPartialContent
134+
} else if result.ContentLength != nil && *result.ContentLength == 0 {
135+
statusCode = http.StatusNoContent
136+
}
137+
w.WriteHeader(statusCode)
138+
139+
_, err = io.Copy(w, result.Body)
140+
return err
141+
}

0 commit comments

Comments
 (0)