Skip to content

Commit

Permalink
fix report API, and a few translations
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Aug 18, 2024
1 parent 9308d58 commit 0ec5189
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 65 deletions.
6 changes: 6 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
reFileHashMismatchError = regexp.MustCompile(` hash mismatch, expected ([0-9a-f]+), got ([0-9a-f]+)`)
)

const DefaultBMCLAPIServer = "https://openbmclapi.bangbang93.com"

type Cluster struct {
name string
opts config.ClusterOptions
Expand Down Expand Up @@ -77,7 +79,11 @@ func NewCluster(
for i, name := range opts.Storages {
storages[i] = storageManager.GetIndex(name)
}
if opts.Server == "" {
opts.Server = DefaultBMCLAPIServer
}
cr = &Cluster{
name: name,
opts: opts,
gcfg: gcfg,

Expand Down
14 changes: 11 additions & 3 deletions cluster/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/hmac"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
Expand Down Expand Up @@ -261,12 +262,18 @@ func (cr *Cluster) RequestCert(ctx context.Context) (ckp *CertKeyPair, err error
}

func (cr *Cluster) ReportDownload(ctx context.Context, response *http.Response, err error) error {
if errors.Is(err, context.Canceled) {
return nil
}

type ReportPayload struct {
Urls []string `json:"urls"`
Error utils.EmbedJSON[struct{ Message string }] `json:"error"`
Urls []string `json:"urls"`
Error utils.EmbedJSON[struct {
Message string `json:"message"`
}] `json:"error"`
}
var payload ReportPayload
redirects := utils.GetRedirects(response.Request)
redirects := utils.GetRedirects(response)
payload.Urls = make([]string, len(redirects))
for i, u := range redirects {
payload.Urls[i] = u.String()
Expand All @@ -280,6 +287,7 @@ func (cr *Cluster) ReportDownload(ctx context.Context, response *http.Response,
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := cr.client.Do(req)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cr *Cluster) Connect(ctx context.Context) error {
})
}
engio.OnConnect(func(s *engine.Socket) {
log.Info("Engine.IO %s connected for cluster %s", s.ID(), cr.ID())
log.Infof("Engine.IO %s connected for cluster %s", s.ID(), cr.ID())
})
engio.OnDisconnect(cr.onDisconnected)
engio.OnDialError(func(s *engine.Socket, err *engine.DialErrorContext) {
Expand Down Expand Up @@ -102,7 +102,7 @@ func (cr *Cluster) Connect(ctx context.Context) error {
log.Infof("[remote]: %v", data[0])
}
})
log.Info("Connecting to socket.io namespace")
log.Infof("Cluster %s is connecting to socket.io namespace", cr.Name())
if err := cr.socket.Connect(""); err != nil {
return fmt.Errorf("Namespace connect error: %w", err)
}
Expand Down
27 changes: 7 additions & 20 deletions cluster/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,6 @@ func storageIdSortFunc(a, b storage.Storage) int {
return 1
}

// func SyncFiles(ctx context.Context, manager *storage.Manager, files map[string]*StorageFileInfo, heavyCheck bool) bool {
// log.TrInfof("info.sync.prepare", len(files))

// slices.SortFunc(files, func(a, b *StorageFileInfo) int { return a.Size - b.Size })
// if cr.syncFiles(ctx, files, heavyCheck) != nil {
// return false
// }

// cr.filesetMux.Lock()
// for _, f := range files {
// cr.fileset[f.Hash] = f.Size
// }
// cr.filesetMux.Unlock()

// return true
// }

var emptyStr string

func checkFile(
Expand Down Expand Up @@ -393,12 +376,15 @@ func (c *HTTPClient) SyncFiles(
return err
}

totalFiles := len(files)
totalFiles := len(missingMap)

var stats syncStats
stats.pg = pg
stats.slots = limited.NewBufSlots(slots)
stats.totalFiles = totalFiles
for _, f := range missingMap {
stats.totalSize += f.Size
}

var barUnit decor.SizeB1024
stats.lastInc.Store(time.Now().UnixNano())
Expand Down Expand Up @@ -620,7 +606,8 @@ func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *Storage
return utils.ProxyPBReader(r, bar, stats.totalBar, &stats.lastInc)
}); err != nil {
reqInd = (reqInd + 1) % len(reqs)
if rerr, ok := err.(*utils.RedirectError); ok {
var rerr *utils.RedirectError
if errors.As(err, &rerr) {
go func() {
if err := rp.Cluster.ReportDownload(context.WithoutCancel(ctx), rerr.GetResponse(), rerr.Unwrap()); err != nil {
log.Warnf("Report API error: %v", err)
Expand All @@ -642,7 +629,7 @@ func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *Storage
bar.SetRefill(bar.Current())

c := tried.Add(1)
if c > maxRetryCount {
if c > maxRetryCount || errors.Is(err, context.Canceled) {
log.TrErrorf("error.sync.download.failed", f.Hash, err)
stats.failCount.Add(1)
return
Expand Down
5 changes: 2 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ import (
"gopkg.in/yaml.v3"

"github.com/LiterMC/go-openbmclapi/api"
"github.com/LiterMC/go-openbmclapi/cluster"
"github.com/LiterMC/go-openbmclapi/config"
"github.com/LiterMC/go-openbmclapi/log"
"github.com/LiterMC/go-openbmclapi/storage"
"github.com/LiterMC/go-openbmclapi/utils"
)

const DefaultBMCLAPIServer = "https://openbmclapi.bangbang93.com"

func migrateConfig(data []byte, cfg *config.Config) {
var oldConfig map[string]any
if err := yaml.Unmarshal(data, &oldConfig); err != nil {
Expand Down Expand Up @@ -97,7 +96,7 @@ func readAndRewriteConfig() (cfg *config.Config, err error) {
Id: "${CLUSTER_ID}",
Secret: "${CLUSTER_SECRET}",
PublicHosts: []string{},
Server: DefaultBMCLAPIServer,
Server: cluster.DefaultBMCLAPIServer,
SkipSignatureCheck: false,
},
}
Expand Down
10 changes: 5 additions & 5 deletions lang/en/us.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var areaUS = map[string]string{
"warn.exit.detected.windows.open.browser": "Detected that you are in windows environment, we are helping you to open the browser",
"warn.cluster.detected.hash.mismatch": "Detected hash mismatch error, removing bad file %s",

"info.filelist.fetching": "Fetching file list",
"info.filelist.fetching": "Fetching file list for %s",
"error.filelist.fetch.failed": "Cannot fetch cluster file list: %v",

"error.address.listen.failed": "Cannot listen address %s: %v",

"info.cert.requesting": "Requesting certificates, please wait ...",
"info.cert.requesting": "Requesting certificates for %s, please wait ...",
"info.cert.requested": "Requested certificate for %s",
"error.cert.not.set": "No certificates was set in the config",
"error.cert.parse.failed": "Cannot parse certificate key pair[%d]: %v",
Expand All @@ -27,7 +27,7 @@ var areaUS = map[string]string{
"info.wait.first.sync": "Waiting for the first sync ...",
"info.cluster.enable.sending": "Sending enable packet",
"info.cluster.enabled": "Cluster enabled",
"error.cluster.enable.failed": "Cannot enable cluster: %v",
"error.cluster.enable.failed": "Cannot enable cluster %s: %v",
"error.cluster.disconnected": "Cluster disconnected from remote. exit.",
"info.cluster.reconnect.keepalive": "Reconnecting due to keepalive failed",
"info.cluster.reconnecting": "Reconnecting ...",
Expand All @@ -49,8 +49,8 @@ var areaUS = map[string]string{
"warn.cluster.disabled": "Cluster disabled",
"warn.httpserver.closing": "Closing HTTP server ...",

"info.check.start": "Start checking files for %s, heavy = %v",
"info.check.done": "File check finished for %s, missing %d files",
"info.check.start": "Start checking files, heavy = %v",
"info.check.done": "File check finished, missing %d files",
"error.check.failed": "Failed to check %s: %v",
"hint.check.checking": "> Checking ",
"warn.check.modified.size": "Found modified file: size of %q is %d, expect %d",
Expand Down
10 changes: 5 additions & 5 deletions lang/zh/cn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var areaCN = map[string]string{
"warn.exit.detected.windows.open.browser": "检测到您是新手 Windows 用户. 我们正在帮助您打开浏览器 ...",
"warn.cluster.detected.hash.mismatch": "检测到文件哈希值不匹配, 正在删除 %s",

"info.filelist.fetching": "获取文件列表中",
"info.filelist.fetching": "为 %s 获取文件列表中",
"error.filelist.fetch.failed": "文件列表获取失败: %v",

"error.address.listen.failed": "无法监听地址 %s: %v",

"info.cert.requesting": "请求证书中, 请稍候 ...",
"info.cert.requesting": "正在为 %s 请求证书, 请稍候 ...",
"info.cert.requested": "证书请求完毕, 域名为 %s",
"error.cert.not.set": "配置文件内没有提供证书",
"error.cert.parse.failed": "无法解析证书密钥对[%d]: %v",
Expand All @@ -27,7 +27,7 @@ var areaCN = map[string]string{
"info.wait.first.sync": "正在等待第一次同步 ...",
"info.cluster.enable.sending": "正在发送启用数据包",
"info.cluster.enabled": "节点已启用",
"error.cluster.enable.failed": "无法启用节点: %v",
"error.cluster.enable.failed": "无法启用节点 %s: %v",
"error.cluster.disconnected": "节点从主控断开. exit.",
"info.cluster.reconnect.keepalive": "保活失败, 重连中 ...",
"info.cluster.reconnecting": "重连中 ...",
Expand All @@ -49,8 +49,8 @@ var areaCN = map[string]string{
"warn.cluster.disabled": "节点已禁用",
"warn.httpserver.closing": "正在关闭 HTTP 服务器 ...",

"info.check.start": "开始在 %s 检测文件. 强检查 = %v",
"info.check.done": "文件在 %s 检查完毕, 缺失 %d 个文件",
"info.check.start": "开始检测文件. 强检查 = %v",
"info.check.done": "文件检查完毕, 缺失 %d 个文件",
"error.check.failed": "无法检查 %s: %v",
"hint.check.checking": "> 检查中 ",
"warn.check.modified.size": "找到修改过的文件: %q 的大小为 %d, 预期 %d",
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func main() {

log.TrInfof("info.wait.first.sync")
r.InitSynchronizer(ctx)
if ctx.Err() != nil {
return
}

r.EnableClusterAll(ctx)
}(ctx)
Expand Down
26 changes: 8 additions & 18 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func NewRunner(cfg *config.Config) *Runner {
log.Errorf("Stat load failed: %v", err)
}
r.apiRateLimiter = limited.NewAPIRateMiddleWare(api.RealAddrCtxKey, "go-openbmclapi.cluster.logged.user" /* api/v0.loggedUserKey */)
r.certificates = make(map[string]*tls.Certificate)
return r
}

Expand Down Expand Up @@ -227,25 +228,11 @@ func (r *Runner) InitClusters(ctx context.Context) {
for name, opts := range r.Config.Clusters {
cr := cluster.NewCluster(name, opts, gcfg, r.storageManager, r.statManager, r.client)
if err := cr.Init(ctx); err != nil {
log.TrErrorf("error.init.failed", err)
} else {
r.clusters[name] = cr
log.TrErrorf("error.init.failed", cr.Name(), err)
continue
}
r.clusters[name] = cr
}

// r.cluster = NewCluster(ctx,
// ClusterServerURL,
// baseDir,
// config.PublicHost, r.getPublicPort(),
// config.ClusterId, config.ClusterSecret,
// config.Byoc, dialer,
// config.Storages,
// cache,
// )
// if err := r.cluster.Init(ctx); err != nil {
// log.TrErrorf("error.init.failed"), err)
// os.Exit(1)
// }
}

func (r *Runner) ListenSignals(ctx context.Context, cancel context.CancelFunc) int {
Expand Down Expand Up @@ -363,7 +350,6 @@ func (r *Runner) SetupLogger(ctx context.Context) error {
}

func (r *Runner) StopServer(ctx context.Context) {
r.tunnelCancel()
shutCtx, cancelShut := context.WithTimeout(context.Background(), time.Second*15)
defer cancelShut()
log.TrWarnf("warn.server.closing")
Expand All @@ -373,6 +359,7 @@ func (r *Runner) StopServer(ctx context.Context) {
defer cancelShut()
var wg sync.WaitGroup
for _, cr := range r.clusters {
wg.Add(1)
go func() {
defer wg.Done()
cr.Disable(shutCtx)
Expand All @@ -381,6 +368,9 @@ func (r *Runner) StopServer(ctx context.Context) {
wg.Wait()
log.TrWarnf("warn.httpserver.closing")
r.server.Shutdown(shutCtx)
if r.tunnelCancel != nil {
r.tunnelCancel()
}
r.listener.Close()
r.listener = nil
}()
Expand Down
2 changes: 1 addition & 1 deletion utils/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewHTTPStatusErrorFromResponse(res *http.Response) (e *HTTPStatusError) {
e.URL = res.Request.URL.String()
}
if res.Body != nil {
var buf [512]byte
var buf [1024]byte
n, _ := res.Body.Read(buf[:])
msg := (string)(buf[:n])
for _, b := range msg {
Expand Down
18 changes: 10 additions & 8 deletions utils/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package utils
import (
"bufio"
"bytes"
"context"

Check failure on line 25 in utils/http.go

View workflow job for this annotation

GitHub Actions / test

"context" imported and not used

Check failure on line 25 in utils/http.go

View workflow job for this annotation

GitHub Actions / request

"context" imported and not used

Check failure on line 25 in utils/http.go

View workflow job for this annotation

GitHub Actions / test

"context" imported and not used

Check failure on line 25 in utils/http.go

View workflow job for this annotation

GitHub Actions / build-windows

"context" imported and not used
"crypto/tls"
"errors"
"io"
Expand Down Expand Up @@ -506,15 +507,16 @@ func (c *connHeadReader) Read(buf []byte) (n int, err error) {
return c.Conn.Read(buf)
}

func GetRedirects(req *http.Request) []*url.URL {
func GetRedirects(resp *http.Response) []*url.URL {
redirects := make([]*url.URL, 0, 5)
for req != nil {
redirects = append(redirects, req.URL)
resp := req.Response
if resp == nil {
break
}
if u, _ := resp.Location(); u != nil {
redirects = append(redirects, u)
}
var req *http.Request
for resp != nil {
req = resp.Request
redirects = append(redirects, req.URL)
resp = req.Response
}
if len(redirects) == 0 {
return nil
Expand All @@ -531,7 +533,7 @@ type RedirectError struct {

func ErrorFromRedirect(err error, resp *http.Response) *RedirectError {
return &RedirectError{
Redirects: GetRedirects(resp.Request),
Redirects: GetRedirects(resp),
Response: resp,
Err: err,
}
Expand Down

0 comments on commit 0ec5189

Please sign in to comment.