Skip to content

Commit

Permalink
Merge branch 'master' of bitbucket.org:aldor007/go-img
Browse files Browse the repository at this point in the history
  • Loading branch information
aldor007 committed Nov 7, 2017
2 parents 9add6cb + 0681183 commit 072104b
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 54 deletions.
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ ADD . /go/src/mort

# when dep will be ready
RUN cd /go/src/mort && dep ensure -vendor-only
RUN go get -u go.uber.org/zap
# RUN goinstall
RUN cd /go/src/mort; go build cmd/mort.go; cp mort /go/mort; cp -r /go/src/mort/configuration /go/
# clean up
Expand Down
30 changes: 27 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions cmd/mort.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ func main() {
logger, _ := zap.NewDevelopment()
zap.ReplaceGlobals(logger)
log.RegisterLogger(logger.Sugar())
rp := mort.NewRequestProcessor(5)

imgConfig := config.GetInstance()
imgConfig.Load(*configPath)
// Echo instance
e := echo.New()

e.Use(mort.S3AuthMiddleware(imgConfig))
// TODO: change echo to pressly/chi

// Route => handler
e.Any ("/*", func(ctx echo.Context) error {
Expand All @@ -39,13 +41,16 @@ func main() {
return ctx.NoContent(400)
}

// dodac placeholder
res := mort.Process(ctx, obj)
res := rp.Process(ctx, obj)
res.SetDebug(ctx.Request().Header.Get("X-Mort-Debug"))
res.WriteHeaders(ctx.Response())
defer res.Close()
defer logger.Sync() // flushes buffer, if any
if res.HasError() {
log.Log().Warnw("Mort process error", "error", res.Error())
}

return res.Write(ctx)

return ctx.Stream(res.StatusCode, res.ContentType, res.Stream)
})


Expand Down
1 change: 1 addition & 0 deletions config/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Storage struct {
Region string `yaml:"region",omitempty`
Endpoint string `yaml:"endpoint",omitempty`
PathPrefix string `yaml:"pathPrefix",omitempty"`
AllowMetadata bool `yaml:"allowMetadata",omitempty"`
}

type StorageTypes map[string]Storage
Expand Down
22 changes: 11 additions & 11 deletions configuration/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ buckets:
mode: line
storages:
basic:
kind: "local"
kind: "local-meta"
rootPath: "/Users/aldor/workspace/mkaciubacom/web"
transform:
kind: "local"
kind: "local-meta"
rootPath: "/Users/aldor/workspace/mkaciubacom/web"
media2:
media:
keys:
- accessKey: "acc"
secretAccessKey: "sec"
Expand Down Expand Up @@ -150,13 +150,13 @@ buckets:
headers:
"x-security-key": "123qwe123qwe"
transform:
# kind: "local"
# rootPath: "/Users/aldor/workspace/mkaciubacom/web"
# pathPrefix: "transforms"
kind: "http"
url: "https://mkaciuba.pl/download<item>"
headers:
"x-security-key": "123qwe123qwe"
kind: "local-meta"
rootPath: "/Users/aldor/workspace/mkaciubacom/web"
pathPrefix: "transforms"
# kind: "http"
# url: "https://mkaciuba.pl/download<item>"
# headers:
# "x-security-key": "123qwe123qwe"
basic:
kind: "local"
kind: "local-meta"
rootPath: "/Users/aldor/workspace/mkaciubacom/web"
11 changes: 7 additions & 4 deletions engine/image_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"mort/transforms"
"mort/response"
"mort/object"
//Logger "github.com/labstack/gommon/log"
"mort/log"
)

type ImageEngine struct {
Expand All @@ -35,19 +35,22 @@ func (self *ImageEngine) Process(obj *object.FileObject, trans []transforms.Tran
}
}

hash := murmur3.New32()
hash := murmur3.New64()
hash.Write([]byte(obj.Key))
//hash.Write([]byte(len(trans)))

res := response.NewBuf(200, buf)
res.SetContentType("image/" + bimg.DetermineImageTypeName(buf))
res.Set("cache-control", "max-age=6000, public")
res.Set("last-modified", time.Now().Format(time.RFC1123))
res.Set("etag", strconv.FormatInt(int64(hash.Sum32()), 16))
res.Set("Last-Modified", time.Now().Format(time.RFC1123))
res.Set("ETag", strconv.FormatInt(int64(hash.Sum64()), 16))
meta, err := bimg.Metadata(buf)
if err == nil {
res.Set("x-amz-public-width", strconv.Itoa(meta.Size.Width))
res.Set("x-amz-public-height", strconv.Itoa(meta.Size.Height))

} else {
log.Log().Warnw("ImageEngine/process unable to process", "obj.key", obj.Key, "err", err)
}

return res, nil
Expand Down
2 changes: 1 addition & 1 deletion object/file_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewFileObject(uri string, mortConfig *config.Config) (*FileObject, error) {
obj.Uri = uri

err := obj.decode(mortConfig)
log.Log().Infow("New FileObject", "path", uri, "key", obj.Key, "bucket", obj.Bucket, "storage", obj.Storage,
log.Log().Infow("New FileObject", "path", uri, "key", obj.Key, "bucket", obj.Bucket, "storage", obj.Storage.Kind,
"hasTransforms", obj.HasTransform(), "hasParent" , obj.HasParent())
return &obj, err
}
Expand Down
106 changes: 94 additions & 12 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,59 @@ import (
"mort/transforms"
"mort/log"
"strconv"
"time"
)

const S3_LOCATION_STR = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><LocationConstraint xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">EU</LocationConstraint>"

func Process(ctx echo.Context, obj *object.FileObject) *response.Response {
func NewRequestProcessor(max int) RequestProcessor{
rp := RequestProcessor{}
rp.Init(max)
return rp
}

type requestMessage struct {
responseChan chan *response.Response
ctx echo.Context
obj *object.FileObject
}

type RequestProcessor struct {
queue chan requestMessage
}

func (r *RequestProcessor) Init(max int) {
r.queue = make(chan requestMessage, max)
}

func (r *RequestProcessor) Process(ctx echo.Context, obj *object.FileObject) *response.Response{

msg := requestMessage{}
msg.ctx = ctx
msg.obj = obj
msg.responseChan = make(chan *response.Response)

go r.processChan()
r.queue <- msg

select {
//case <-ctx.Done():
// return response.NewBuf(504, "timeout")
case res := <-msg.responseChan:
return res
case <-time.After(time.Second * 60):
return response.NewBuf(504, []byte("timeout"))
}
}

func (r *RequestProcessor) processChan() {
msg := <- r.queue
res := r.process(msg.ctx, msg.obj)
msg.responseChan <- res
}


func (r *RequestProcessor) process(ctx echo.Context, obj *object.FileObject) *response.Response {
switch ctx.Request().Method {
case "GET":
return hanldeGET(ctx, obj)
Expand Down Expand Up @@ -57,25 +106,58 @@ func hanldeGET(ctx echo.Context, obj *object.FileObject) *response.Response {
}
}

resChan := make(chan *response.Response)
parentChan := make(chan *response.Response)

go func(o *object.FileObject) {
resChan <- storage.Get(o)
}(obj)

// get parent from storage
if parentObj != nil {
parentRes = updateHeaders(storage.Get(parentObj))

if parentRes.StatusCode != 200 {
return parentRes
}
go func(p *object.FileObject) {
parentChan <- storage.Head(p)
}(parentObj)
}

// check if object is on storage
res = updateHeaders(storage.Get(obj))
if res.StatusCode == 200 {
return res
}
resLoop:
for {
select {
case res = <-resChan:
if parentObj != nil && (parentRes == nil || parentRes.StatusCode == 0) {
go func () {
resChan <- res
}()

} else {
if res.StatusCode == 200 && parentRes.StatusCode == 200 {
return updateHeaders(res)
}

defer parentRes.Close()
if res.StatusCode == 404 {
break resLoop
} else {
return updateHeaders(res)
}
}
case parentRes = <-parentChan:
if parentRes.StatusCode == 404 {
return updateHeaders(parentRes)
}
default:

}
}

if obj.HasTransform() && strings.Contains(parentRes.ContentType, "image/") {
defer res.Close()
parentRes = updateHeaders(storage.Get(parentObj))

if parentRes.StatusCode != 200 {
return updateHeaders(parentRes)
}

defer parentRes.Close()

// revers order of transforms
for i := 0; i < len(transforms)/2; i++ {
Expand Down
Loading

0 comments on commit 072104b

Please sign in to comment.