diff --git a/.github/renovate.json5 b/.github/renovate.json5 new file mode 100644 index 0000000..c960bfa --- /dev/null +++ b/.github/renovate.json5 @@ -0,0 +1,21 @@ +{ + "$schema": "https://docs.renovatebot.com/renovate-schema.json", + "extends": [ + "config:base", + ":timezone(Asia/Tokyo)", + ":combinePatchMinorReleases", + ":prHourlyLimitNone", + ":prConcurrentLimit10", + "group:recommended", + "group:allNonMajor", + "schedule:weekly" + ], + "dependencyDashboard": false, + "packageRules": [ + { + "matchUpdateTypes": ["minor", "patch", "pin", "digest"], + "platformAutomerge": true, + "automerge": true + } + ] +} \ No newline at end of file diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..b8a495c --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,30 @@ +name: ci +on: + push: + branches-ignore: + - "master" + tags-ignore: + - "*" + +jobs: + build: + name: ci + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: setup go + uses: actions/setup-go@v4 + with: + go-version-file: ./go.mod + cache: true + cache-dependency-path: ./go.sum + - run: go version + - run: go fmt . 
+ - uses: dominikh/staticcheck-action@v1 + with: + version: "2023.1.5" + install-go: false + - name: Test + run: make test + - name: Build + run: make diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..71ab422 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,31 @@ +name: release + +on: + push: + tags: + - "*" + +jobs: + release: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: ./go.mod + cache: false + + - run: go install github.com/tcnksm/ghr@latest + + - name: Build + run: | + GOOS=linux GOARCH=amd64 go build -o dist/sora-archive-uploader_linux_amd64 cmd/sora-archive-uploader/main.go + GOOS=darwin GOARCH=amd64 go build -o dist/sora-archive-uploader_darwin_amd64 cmd/sora-archive-uploader/main.go + GOOS=darwin GOARCH=arm64 go build -o dist/sora-archive-uploader_darwin_arm64 cmd/sora-archive-uploader/main.go + gzip dist/* + + - name: Release + run: | + ghr -t "${{ secrets.GITHUB_TOKEN }}" -u "${{ github.repository_owner }}" -r "sora-archive-uploader" --replace "${GITHUB_REF##*/}" dist/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dc62d1b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +bin/* +config.ini +*.jsonl diff --git a/CHANGES.md b/CHANGES.md new file mode 100644 index 0000000..a970540 --- /dev/null +++ b/CHANGES.md @@ -0,0 +1,16 @@ +# 変更履歴 + +- CHANGE + - 下位互換のない変更 +- UPDATE + - 下位互換がある変更 +- ADD + - 下位互換がある追加 +- FIX + - バグ修正 + +## develop + +## 2023.1.0 + +**祝いリリース** diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2bb9ad2 --- /dev/null +++ b/LICENSE @@ -0,0 +1,176 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..25d8e79 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +.PHONY: all test + +all: + go build -o bin/sora-archive-uploader cmd/sora-archive-uploader/main.go + +test: + go test -race -v ./s3 \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..31c5f78 --- /dev/null +++ b/README.md @@ -0,0 +1,101 @@ +# Sora Archive Uploader + + + +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) + + + +## About Shiguredo's open source software + +We will not respond to PRs or issues that have not been discussed on Discord. Also, Discord is only available in Japanese. + +Please read https://github.com/shiguredo/oss/blob/master/README.en.md before use. + +## 時雨堂のオープンソースソフトウェアについて + +利用前に https://github.com/shiguredo/oss をお読みください。 + +## Sora Archive Uploader について + +Sora が出力する録画関連のファイルを S3 または S3 互換オブジェクトストレージにアップロードするツールです。 +systemd タイマーユニットを利用しての定期実行を想定しています。 + +[Sora Cloud](https://sora-cloud.shiguredo.jp/) で実際に利用している仕組みからツールとして切り出して公開しています。 + +## 目的 + +Sora は録画を行った場合、録画ファイルを WebM 、録画メタデータ JSON ファイルで出力します。 +Sora Cloud では出力されたファイルをオブジェクトストレージにアップロードする仕組みが必要となり開発しました。 + +## 特徴 + +- systemd の設定だけで利用できます +- 並列でオブジェクトストレージにアップロードできます +- アップロード完了時に指定された URL にウェブフックリクエストを通知します +- ウェブフックにはベーシック認証や mTLS が利用可能です +- アップロードに失敗した場合は設定ファイルで指定した隔離ディレクトリに移動します +- アップロードの帯域制限を設定できます + +### 対応オブジェクトストレージ + +- AWS S3 +- MinIO +- GCP GCS +- Vultr Object Storage +- Linode Object Storage +- DigitalOcean Spaces +- Cloudflare R2 + +## まずは使ってみる + +config.ini に必要な情報を設定してください。 + +```bash +$ cp config_example.ini config.ini +``` + +make でビルドして実行します。 + +```bash +$ make +$ ./bin/sora-archive-uploader -C config.ini +``` + +## Discord + +最新の状況などは Discord で共有しています。質問や相談も Discord でのみ受け付けています。 + +https://discord.gg/shiguredo + +## 有償での優先実装が可能な機能一覧 + +**詳細は Discord 
またはメールにてお問い合わせください** + +- オープンソースでの公開が前提 +- 可能であれば企業名の公開 + - 公開が難しい場合は `企業名非公開` と書かせていただきます + +### 機能 + +- [Amazon S3 SSE-S3](https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/userguide/UsingServerSideEncryption.html) への対応 +- [Azure Blob Storage](https://azure.microsoft.com/ja-jp/products/storage/blobs/) への対応 + +## ライセンス + +``` +Copyright 2022-2023, Takeshi Namao (Original Author) +Copyright 2022-2023, Shiguredo Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +``` diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..f970d5b --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +2023.1.0 \ No newline at end of file diff --git a/cmd/sora-archive-uploader/main.go b/cmd/sora-archive-uploader/main.go new file mode 100644 index 0000000..fcdb834 --- /dev/null +++ b/cmd/sora-archive-uploader/main.go @@ -0,0 +1,26 @@ +package main + +import ( + "flag" + "fmt" + "log" + + archive "github.com/shiguredo/sora-archive-uploader" +) + +func main() { + // /bin/sora-archive-uploader -V + showVersion := flag.Bool("V", false, "バージョン") + + // /bin/sora-archive-uploader -C ./config.ini + configFilePath := flag.String("C", "./config.ini", "Config file path") + flag.Parse() + + if *showVersion { + fmt.Printf("Sora Archive Uploader version %s\n", archive.Version) + return + } + + log.Printf("config file path: %s", *configFilePath) + archive.Run(configFilePath) +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..a16a3d9 --- /dev/null +++ b/config.go @@ -0,0 +1,79 @@ +package archive + 
+import ( + _ "embed" + + "gopkg.in/ini.v1" +) + +//go:embed VERSION +var Version string + +const ( + DefaultLogDir = "." + DefaultLogName = "sora-archive-uploader.jsonl" + + // megabytes + DefaultLogRotateMaxSize = 200 + DefaultLogRotateMaxBackups = 7 + // days + DefaultLogRotateMaxAge = 30 +) + +type Config struct { + Debug bool `ini:"debug"` + + LogDir string `ini:"log_dir"` + LogName string `ini:"log_name"` + LogStdout bool `ini:"log_stdout"` + + LogRotateMaxSize int `ini:"log_rotate_max_size"` + LogRotateMaxBackups int `ini:"log_rotate_max_backups"` + LogRotateMaxAge int `ini:"log_rotate_max_age"` + LogRotateCompress bool `ini:"log_rotate_compress"` + + ObjectStorageEndpoint string `ini:"object_storage_endpoint"` + ObjectStorageBucketName string `ini:"object_storage_bucket_name"` + ObjectStorageAccessKeyID string `ini:"object_storage_access_key_id"` + ObjectStorageSecretAccessKey string `ini:"object_storage_secret_access_key"` + + SoraArchiveDirFullPath string `ini:"archive_dir_full_path"` + SoraEvacuateDirFullPath string `ini:"evacuate_dir_full_path"` + + UploadWorkers int `ini:"upload_workers"` + + // 1 ファイルあたりのアップロードレート制限 + UploadFileRateLimitMbps int `ini:"upload_file_rate_limit_mbps"` + + UploadedFileCacheSize int `ini:"uploaded_file_cache_size"` + + WebhookEndpointURL string `ini:"webhook_endpoint_url"` + WebhookEndpointHealthCheckURL string `ini:"webhook_endpoint_health_check_url"` + + WebhookTypeHeaderName string `ini:"webhook_type_header_name"` + WebhookTypeArchiveUploaded string `ini:"webhook_type_archive_uploaded"` + WebhookTypeSplitArchiveUploaded string `ini:"webhook_type_split_archive_uploaded"` + WebhookTypeSplitArchiveEndUploaded string `ini:"webhook_type_split_archive_end_uploaded"` + WebhookTypeReportUploaded string `ini:"webhook_type_report_uploaded"` + + WebhookBasicAuthUsername string `ini:"webhook_basic_auth_username"` + WebhookBasicAuthPassword string `ini:"webhook_basic_auth_password"` + + WebhookRequestTimeoutS int32 
`ini:"webhook_request_timeout_s"` + + WebhookTLSVerifyCacertPath string `ini:"webhook_tls_verify_cacert_path"` + WebhookTLSFullchainPath string `ini:"webhook_tls_fullchain_path"` + WebhookTLSPrivkeyPath string `ini:"webhook_tls_privkey_path"` +} + +func newConfig(configFilePath string) (*Config, error) { + config := new(Config) + iniConfig, err := ini.InsensitiveLoad(configFilePath) + if err != nil { + return nil, err + } + if err := iniConfig.StrictMapTo(config); err != nil { + return nil, err + } + return config, nil +} diff --git a/config_example.ini b/config_example.ini new file mode 100644 index 0000000..0934b9a --- /dev/null +++ b/config_example.ini @@ -0,0 +1,64 @@ +debug = false + +# Sora の録画アーカイブディレクトリのフルパス +# archive_dir_full_path = /path/to/archive +# アップロードに失敗した際の待避ディレクトリのフルパス +# evacuate_dir_full_path = /path/to/evacuate + +# 同時アップロード数 +upload_workers = 4 + +# 起動中にアップロード処理を行なったファイルを記録するキャッシュの上限で、 +# 重複してアップロードしてしまうのを避けます。 +# 起動時にアップロードする録画ファイルが大量にある場合は、 +# 大きめのキャッシュサイズを設定することをおすすめします。 +uploaded_file_cache_size = 32 + +# 1 ファイルあたりのアップロード速度制限 +# 0 の場合は制限しません +# upload_file_rate_limit_mbps = 0 + +# ログ +log_dir = . 
+log_name = sora-archive-uploader.jsonl +log_stdout = true + +# MB +log_rotate_max_size = 200 +log_rotate_max_backups = 7 +# day +log_rotate_max_age = 30 +log_rotate_compress = false + +# アップロード先の S3 または S3 互換オブジェクトストレージの設定 +# object_storage_endpoint = https://s3.example.com +# object_storage_bucket_name = bucket-name +# object_storage_access_key_id = access-key-id +# object_storage_secret_access_key = secret-access-key + +# オブジェクトストレージにアップロードが完了した際に通知するウェブフック + +# 空文字列の場合はウェブフックは飛ばさない +# webhook_endpoint_url = https://example.com/webhook + +# ウェブフックリクエストのタイムアウト時間 (秒) +webhook_request_timeout_s = 30 + +# ウェブフックタイプが入ってくるヘッダー名 +webhook_type_header_name = "sora-archive-uploader-webhook-type" +webhook_type_archive_uploaded = "archive.uploaded" +webhook_type_split_archive_uploaded = "split-archive.uploaded" +webhook_type_split_archive_end_uploaded = "split-archive-end.uploaded" +webhook_type_report_uploaded = "recording-report.uploaded" + +# ウェブフックのベーシック認証 +# 空文字はベーシック認証を行わない +# webhook_basic_auth_username = username +# webhook_basic_auth_password = password + +# webhook で HTTPS を利用する場合にサーバーの証明書をベリファイする場合に指定 +# 指定しない場合は OS のものを利用し、サーバー名までは検証しません +# webhook_tls_verify_cacert_path = /path/to/cacert.pem +# webhook で mTLS を利用する場合に指定します +# webhook_tls_fullchain_path = /path/to/fullchain.pem +# webhook_tls_privkey_path = /path/to/privkey.pem diff --git a/finder.go b/finder.go new file mode 100644 index 0000000..32210a8 --- /dev/null +++ b/finder.go @@ -0,0 +1,78 @@ +package archive + +import ( + "os" + "path/filepath" + "regexp" + "strings" + + zlog "github.com/rs/zerolog/log" +) + +func runFileFinder(archiveDir string) ([]string, error) { + var result []string + zlog.Debug().Str("archive-dir", archiveDir).Msg("START-SCRAPE-DIRECTORY") + files, err := os.ReadDir(archiveDir) + if err != nil { + zlog.Err(err).Msg("ERROR-RUN-FILE-FINDER") + return result, err + } + replaceWebmPattern := regexp.MustCompile(`.json$`) + for _, f := range files { + if !f.IsDir() { + continue + } + 
dirPath := filepath.Join(archiveDir, f.Name()) + archiveFiles, err := os.ReadDir(dirPath) + if err != nil { + zlog.Err(err).Msg("ERROR-READ-DIRECTORY") + continue + } + var reportFile *string + for _, archiveFile := range archiveFiles { + fullpath := filepath.Join(dirPath, archiveFile.Name()) + filename := archiveFile.Name() + if !(strings.HasSuffix(filename, ".json")) { + zlog.Debug(). + Str("file_path", fullpath). + Msg("IGNORE-FILE-TYPE") + continue + } + // 以下の処理は .json ファイルであることが保証される + if strings.HasPrefix(filename, "report-") { + zlog.Debug(). + Str("file_path", fullpath). + Msg("FOUND-AT-FINDER") + reportFile = &fullpath + } else if strings.HasPrefix(filename, "split-archive-end-") { + zlog.Debug(). + Str("file_path", fullpath). + Msg("FOUND-AT-FINDER") + result = append(result, fullpath) + } else if strings.HasPrefix(filename, "archive-") || strings.HasPrefix(filename, "split-archive-") { + // webm ファイルの存在を確認し、ファイルが存在したら後続の処理にファイルパスを渡す + // webm ファイルが存在しない場合は、次回のスクレイピングのタイミングで処理する + webmFilename := replaceWebmPattern.ReplaceAllString(filename, ".webm") + webmFullpath := filepath.Join(dirPath, webmFilename) + if info, err := os.Stat(webmFullpath); err != nil || info.IsDir() { + continue + } + zlog.Debug(). + Str("file_path", fullpath). + Str("webm_file_path", webmFullpath). + Msg("FOUND-AT-FINDER") + result = append(result, fullpath) + } else { + zlog.Debug(). + Str("file_path", fullpath). 
+ Msg("IGNORE-FILE") + } + } + // ディレクトリ内に report json ファイルが見つかった場合は、最後に流す + if reportFile != nil { + result = append(result, *reportFile) + } + } + zlog.Debug().Str("archive-dir", archiveDir).Msg("END-SCRAPE-DIRECTORY") + return result, nil +} diff --git a/gatekeeper.go b/gatekeeper.go new file mode 100644 index 0000000..459a740 --- /dev/null +++ b/gatekeeper.go @@ -0,0 +1,233 @@ +package archive + +import ( + "context" + "os" + "path/filepath" + "strings" + "sync" + "sync/atomic" + + zlog "github.com/rs/zerolog/log" +) + +type RecordingUnit struct { + mutex sync.RWMutex + recordingID string + counter int32 + reportFile string +} + +func newRecordingUnit(recordingID string) *RecordingUnit { + return &RecordingUnit{ + recordingID: recordingID, + counter: 0, + } +} + +func (ru *RecordingUnit) run() { + ru.mutex.Lock() + defer ru.mutex.Unlock() + atomic.AddInt32(&ru.counter, 1) + zlog.Debug(). + Str("recording_id", ru.recordingID). + Int32("counter", ru.counter). + Msg("RUN-RECORDING-UNIT") +} + +func (ru *RecordingUnit) done() { + atomic.AddInt32(&ru.counter, -1) + zlog.Debug(). + Str("recording_id", ru.recordingID). + Int32("counter", ru.counter). 
+ Msg("DONE-RECORDING-UNIT") +} + +func (ru *RecordingUnit) canProcessAndSetReportFile(reportFile string) bool { + ru.mutex.Lock() + defer ru.mutex.Unlock() + if atomic.LoadInt32(&ru.counter) == 0 { + return true + } + ru.reportFile = reportFile + return false +} + +func (ru *RecordingUnit) canProcessAndGetReportFile() (*string, bool) { + ru.mutex.Lock() + defer ru.mutex.Unlock() + if atomic.LoadInt32(&ru.counter) == 0 && ru.reportFile != "" { + return &ru.reportFile, true + } + return nil, false +} + +type GateKeeper struct { + mutex sync.RWMutex + config *Config + ctx context.Context + processingList sync.Map + processingCounter int64 + out chan string +} + +func newGateKeeper(config *Config) *GateKeeper { + g := &GateKeeper{ + config: config, + processingList: sync.Map{}, + out: make(chan string, 50), + } + return g +} + +func (g *GateKeeper) stop() { + close(g.out) + zlog.Debug().Msg("STOPPED-GATE-KEEPER") +} + +func (g *GateKeeper) run(ctx context.Context, infiles []string) <-chan string { + g.ctx = ctx + go func() { + defer g.stop() + + processArchiveFile := func(infile string) { + filename := filepath.Base(infile) + recordingID := filepath.Base(filepath.Dir(infile)) + atomic.AddInt64(&g.processingCounter, 1) + if strings.HasPrefix(filename, "report-") { + g.mutex.Lock() + ru, ok := g.getRecordingUnit(recordingID) + if !ok { + // report-* の前に他のファイルが処理されてない + g.mutex.Unlock() + go func() { + select { + case <-g.ctx.Done(): + case g.out <- infile: + } + }() + return + } + g.mutex.Unlock() + + if ru.canProcessAndSetReportFile(infile) { + go func() { + select { + case <-g.ctx.Done(): + case g.out <- infile: + } + }() + return + } + return + } + if strings.HasPrefix(filename, "split-archive-end-") { + g.processRun(recordingID) + select { + case <-g.ctx.Done(): + case g.out <- infile: + } + return + } + if strings.HasPrefix(filename, "archive-") || strings.HasPrefix(filename, "split-archive-") { + archiveID := strings.Split(filename, ".")[0] + 
g.processRun(recordingID) + select { + case <-g.ctx.Done(): + return + case g.out <- infile: + zlog.Debug().Str("archive_id", archiveID).Msg("RUN-ARCHIVE-FILE-PROCESS") + } + return + } + } + + for _, infile := range infiles { + processArchiveFile(infile) + } + <-g.ctx.Done() + }() + return g.out +} + +func (g *GateKeeper) getRecordingUnit(recordingID string) (*RecordingUnit, bool) { + ru, ok := g.processingList.Load(recordingID) + if ok { + return ru.(*RecordingUnit), ok + } else { + return nil, ok + } +} + +func (g *GateKeeper) processRun(recordingID string) { + g.mutex.Lock() + defer g.mutex.Unlock() + ru, ok := g.getRecordingUnit(recordingID) + if !ok { + ru = newRecordingUnit(recordingID) + g.processingList.Store(recordingID, ru) + } + ru.run() +} + +func (g *GateKeeper) processDone(infile string) { + g.mutex.Lock() + defer g.mutex.Unlock() + zlog.Debug().Str("infile", infile).Msg("PROCESS-DONE") + + recordingID := filepath.Base(filepath.Dir(infile)) + ru, ok := g.getRecordingUnit(recordingID) + if !ok { + atomic.AddInt64(&g.processingCounter, -1) + zlog.Error().Str("infile", infile).Msg("WAIT-GROUP-NOT-FOUND") + return + } + ru.done() + if reportFile, ok := ru.canProcessAndGetReportFile(); ok { + go func() { + select { + case <-g.ctx.Done(): + case g.out <- *reportFile: + } + }() + } + atomic.AddInt64(&g.processingCounter, -1) +} + +func (g *GateKeeper) recordingDone(infile string) { + zlog.Debug().Str("infile", infile).Msg("RECORDING-DONE") + + // Recording ID のディレクトリを削除するべきだが、いきなり削除せず、mv で監視対象外のパスに移動しておく + dirname := filepath.Dir(infile) + newDirPath := filepath.Join(g.config.SoraEvacuateDirFullPath, filepath.Base(dirname)) + var evacuatePath = g.config.SoraEvacuateDirFullPath + _, err := os.Stat(g.config.SoraEvacuateDirFullPath) + if err != nil { + err = os.Mkdir(evacuatePath, 0755) + if err != nil { + zlog.Error(). + Str("evacuate_dir_path", evacuatePath). + Str("old_path", dirname). + Str("new_path", newDirPath). 
+ Msg("EVACUATE-DIRECTORY-CREATE-ERROR") + } + } + err = os.Rename(dirname, newDirPath) + if err != nil { + zlog.Error(). + Err(err). + Str("old_path", dirname). + Str("new_path", newDirPath). + Msg("RECORDING-DIRECTORY-MOVE-ERROR") + } else { + zlog.Debug(). + Str("old_path", dirname). + Str("new_path", newDirPath). + Msg("RECORDING-DIRECTORY-MOVE-SUCCESSFULLY") + } + atomic.AddInt64(&g.processingCounter, -1) +} + +func (g *GateKeeper) isFileUploadFinished() bool { + return atomic.LoadInt64(&g.processingCounter) == 0 +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..04837a3 --- /dev/null +++ b/go.mod @@ -0,0 +1,37 @@ +module github.com/shiguredo/sora-archive-uploader + +go 1.21.3 + +require ( + github.com/conduitio/bwlimit v0.1.0 + github.com/google/uuid v1.3.1 + github.com/minio/minio-go/v7 v7.0.63 + github.com/rs/zerolog v1.31.0 + github.com/shogo82148/go-clockwork-base32 v1.1.0 + github.com/stretchr/testify v1.8.4 + gopkg.in/ini.v1 v1.67.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.1 // indirect + github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rs/xid v1.5.0 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/time v0.3.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff 
--git a/go.sum b/go.sum new file mode 100644 index 0000000..d56410b --- /dev/null +++ b/go.sum @@ -0,0 +1,76 @@ +github.com/conduitio/bwlimit v0.1.0 h1:x3ijON0TSghQob4tFKaEvKixFmYKfVJQeSpXluC2JvE= +github.com/conduitio/bwlimit v0.1.0/go.mod h1:E+ASZ1/5L33MTb8hJTERs5Xnmh6Ulq3jbRh7LrdbXWU= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= +github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= +github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= 
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.63 h1:GbZ2oCvaUdgT5640WJOpyDhhDxvknAJU2/T3yurwcbQ= +github.com/minio/minio-go/v7 v7.0.63/go.mod h1:Q6X7Qjb7WMhvG65qKf4gUgA5XaiSox74kR1uAEjxRS4= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= +github.com/rs/zerolog v1.31.0/go.mod 
h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/shogo82148/go-clockwork-base32 v1.1.0 h1:PEqSTiyVEKdl5ar5RHlUQjFJyJTx9XZD5dUVCLG08K0= +github.com/shogo82148/go-clockwork-base32 v1.1.0/go.mod h1:iMSFoMZCgiXZbDyIpEjQxr4T9dH5q8agqxt9HECoiEQ= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.3.0 
h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logging.go b/logging.go new file mode 100644 index 0000000..d090f56 --- /dev/null +++ b/logging.go @@ -0,0 +1,122 @@ +package archive + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "gopkg.in/natefinch/lumberjack.v2" +) + +func initLogger(config *Config) error { + if f, err := os.Stat(config.LogDir); os.IsNotExist(err) || !f.IsDir() { + return err + } + + logPath := fmt.Sprintf("%s/%s", config.LogDir, config.LogName) + + // https://github.com/rs/zerolog/issues/77 + zerolog.TimestampFunc = func() time.Time { + return time.Now().UTC() + } + + zerolog.TimeFieldFormat = "2006-01-02T15:04:05.000000Z" + + if config.Debug { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + } else { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } + + if config.Debug && config.LogStdout { + writer := zerolog.ConsoleWriter{ + Out: os.Stdout, + FormatTimestamp: func(i interface{}) string { + darkGray := "\x1b[90m" + reset := "\x1b[0m" + return strings.Join([]string{darkGray, 
i.(string), reset}, "") + }, + NoColor: false, + } + prettyFormat(&writer) + log.Logger = zerolog.New(writer).With().Caller().Timestamp().Logger() + } else if config.LogStdout { + writer := os.Stdout + log.Logger = zerolog.New(writer).With().Caller().Timestamp().Logger() + } else { + var logRotateMaxSize, logRotateMaxBackups, logRotateMaxAge int + if config.LogRotateMaxSize == 0 { + logRotateMaxSize = DefaultLogRotateMaxSize + } + if config.LogRotateMaxBackups == 0 { + logRotateMaxBackups = DefaultLogRotateMaxBackups + } + if config.LogRotateMaxAge == 0 { + logRotateMaxAge = DefaultLogRotateMaxAge + } + + writer := &lumberjack.Logger{ + Filename: logPath, + MaxSize: logRotateMaxSize, + MaxBackups: logRotateMaxBackups, + MaxAge: logRotateMaxAge, + Compress: false, + } + log.Logger = zerolog.New(writer).With().Caller().Timestamp().Logger() + } + + return nil +} + +// 現時点での prettyFormat +// 2023-04-17 12:51:56.333485Z [INFO] config.go:102 > CONF | debug=true +func prettyFormat(w *zerolog.ConsoleWriter) { + const Reset = "\x1b[0m" + + w.FormatLevel = func(i interface{}) string { + var color, level string + // TODO: 各色を定数に置き換える + // TODO: 他の logLevel が必要な場合は追加する + switch i.(string) { + case "info": + color = "\x1b[32m" + case "error": + color = "\x1b[31m" + case "warn": + color = "\x1b[33m" + case "debug": + color = "\x1b[34m" + default: + color = "\x1b[37m" + } + + level = strings.ToUpper(i.(string)) + return fmt.Sprintf("%s[%s]%s", color, level, Reset) + } + w.FormatCaller = func(i interface{}) string { + return fmt.Sprintf("[%s]", filepath.Base(i.(string))) + } + // TODO: Caller をファイル名と行番号だけの表示で出力する + // 以下のようなフォーマットにしたい + // 2023-04-17 12:50:09.334758Z [INFO] [config.go:102] CONF | debug=true + // TODO: name=value が無い場合に | を消す方法がわからなかった + w.FormatMessage = func(i interface{}) string { + if i == nil { + return "" + } else { + return fmt.Sprintf("%s |", i) + } + } + w.FormatFieldName = func(i interface{}) string { + const Cyan = "\x1b[36m" + return 
fmt.Sprintf("%s%s=%s", Cyan, i, Reset) + } + // TODO: カンマ区切りを同実現するかわからなかった + w.FormatFieldValue = func(i interface{}) string { + return fmt.Sprintf("%s", i) + } +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..882fb41 --- /dev/null +++ b/runner.go @@ -0,0 +1,186 @@ +package archive + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + zlog "github.com/rs/zerolog/log" +) + +type Main struct { + config *Config +} + +func newMain(config *Config) *Main { + return &Main{ + config: config, + } +} + +func (m *Main) run(ctx context.Context, cancel context.CancelFunc) error { + var archiveDir = m.config.SoraArchiveDirFullPath + zlog.Debug().Str("path", archiveDir).Msg("WATCHING-ROOT-DIR") + fileInfo, err := os.Stat(archiveDir) + if err != nil { + // 対象のディレクトリが存在しなければ終わる + zlog.Fatal().Err(err).Str("path", archiveDir).Msg("NOT-FOUND-TARGET-PATH") + } + if !fileInfo.IsDir() { + // 対象のパスが Directory でなければ終わる + zlog.Fatal().Str("path", archiveDir).Msg("TARGET-PATH-DOES-NOT-DIRECTORY") + } + // TODO: パーミッションチェック + + // ディレクトリ退避先を作成する + var evacuatePath = m.config.SoraEvacuateDirFullPath + _, err = os.Stat(m.config.SoraEvacuateDirFullPath) + if err != nil { + err = os.Mkdir(evacuatePath, 0755) + if err != nil { + zlog.Fatal(). + Str("evacuate_dir_path", evacuatePath). 
+ Err(err). + Msg("CANT-CREATE-DIRECTORY") + } + } + + foundFiles, err := runFileFinder(archiveDir) + if err != nil { + return err + } + if len(foundFiles) == 0 { + // no files to process were found, so exit + cancel() + zlog.Debug().Msg("ARCHIVE-FILE-NOT-FOUND") + return nil + } + + processContext, processContextCancel := context.WithCancel(context.Background()) + gateKeeper := newGateKeeper(m.config) + recordingFileStream := gateKeeper.run(processContext, foundFiles) + + uploaderManager := newUploaderManager() + _, err = uploaderManager.run(processContext, m.config, recordingFileStream) + if err != nil { + processContextCancel() + return err + } + + for { + select { + case <-ctx.Done(): + processContextCancel() + // wait 500ms before stopping so pending shutdown logs can be flushed + <-time.After(500 * time.Millisecond) + return nil + case archiveFileResult := <-uploaderManager.ArchiveStream: + if !archiveFileResult.Success { + zlog.Warn(). + Str("archive_file", archiveFileResult.Filepath). + Msg("FAILED-UPLOAD-ARCHIVE-FILE") + } + // zlog.Info(). + // Str("archive_file", archiveFileResult.Filepath). + // Msg("UPLOADED-ARCHIVE-FILE") + gateKeeper.processDone(archiveFileResult.Filepath) + if gateKeeper.isFileUploadFinished() { + cancel() + } + case archiveEndFileResult := <-uploaderManager.ArchiveEndStream: + if !archiveEndFileResult.Success { + zlog.Warn(). + Str("archive_end_file", archiveEndFileResult.Filepath). + Msg("FAILED-UPLOAD-ARCHIVE-END") + } + // zlog.Info(). + // Str("archive_end_file", archiveEndFileResult.Filepath). + // Msg("UPLOADED-ARCHIVE-END-FILE") + gateKeeper.processDone(archiveEndFileResult.Filepath) + if gateKeeper.isFileUploadFinished() { + cancel() + } + case reportFileResult := <-uploaderManager.ReportStream: + if !reportFileResult.Success { + zlog.Warn(). + Str("report_file", reportFileResult.Filepath). + Msg("FAILED-UPLOAD-REPORT-FILE") + } + // zlog.Info(). + // Str("report_file", reportFileResult.Filepath).
+ // Msg("UPLOADED-REPORT-FILE") + gateKeeper.recordingDone(reportFileResult.Filepath) + if gateKeeper.isFileUploadFinished() { + cancel() + } + } + } +} + +func Run(configFilePath *string) { + // parse the INI config file + config, err := newConfig(*configFilePath) + if err != nil { + // exit with Fatal when parsing fails + log.Fatal("cannot parse config file, err=", err) + } + + // initialize the logger + err = initLogger(config) + if err != nil { + // exit with Fatal when logger initialization fails + log.Fatal("cannot initialize logger, err=", err) + } + + // if configured, verify the mTLS settings and health-check the webhook endpoint + if config.WebhookEndpointHealthCheckURL != "" { + client, err := createHTTPClient(config) + if err != nil { + zlog.Fatal().Err(err).Msg("FAILED-CREATE-RPC-CLIENT") + } + // confirm the server is up via the health check URL + resp, err := client.Get(config.WebhookEndpointHealthCheckURL) + if err != nil { + zlog.Fatal().Err(err).Msg("WEBHOOK-SERVER-CONNECT-ERROR") + } + if resp.StatusCode != 200 { + // err is nil here; report the offending status code instead + zlog.Fatal().Int("status_code", resp.StatusCode).Msg("WEBHOOK-SERVER-UNHEALTHY") + } + resp.Body.Close() + } + + zlog.Debug().Msg("STARTED-SORA-ARCHIVE-UPLOADER") + + // catch signals for shutdown handling + trapSignals := []os.Signal{ + syscall.SIGINT, + syscall.SIGTERM, + } + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, trapSignals...)
+ ctx, cancel := context.WithCancel(context.Background()) + doneShutdown := make(chan interface{}) + defer close(doneShutdown) + go func() { + sig := <-signalChannel + zlog.Debug().Str("signal", sig.String()).Msg("RECEIVED-SIGNAL") + + cancel() + doneShutdown <- struct{}{} + }() + + // watch the directory and run the upload processing + m := newMain(config) + if err := m.run(ctx, cancel); err != nil { + zlog.Error().Err(err).Msg("FAILED-RUN") + os.Exit(1) + } else { + go func() { + doneShutdown <- struct{}{} + }() + } + <-doneShutdown + zlog.Debug().Msg("STOPPED-SORA-ARCHIVE-UPLOADER") +} diff --git a/s3.go b/s3.go new file mode 100644 index 0000000..1464f97 --- /dev/null +++ b/s3.go @@ -0,0 +1,198 @@ +package archive + +import ( + "context" + "fmt" + "net" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + zlog "github.com/rs/zerolog/log" + "github.com/shiguredo/sora-archive-uploader/s3" + + "github.com/conduitio/bwlimit" +) + +func uploadJSONFile( + ctx context.Context, + osConfig *s3.S3CompatibleObjectStorage, + dst, filePath string, +) (string, error) { + var creds *credentials.Credentials + if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") { + creds = credentials.NewStaticV4( + osConfig.AccessKeyID, + osConfig.SecretAccessKey, + "", + ) + } else if (len(os.Getenv("AWS_ACCESS_KEY_ID")) > 0) && (len(os.Getenv("AWS_SECRET_ACCESS_KEY")) > 0) { + creds = credentials.NewEnvAWS() + } else { + creds = credentials.NewIAM("") + } + + s3Client, err := s3.NewClient(osConfig.Endpoint, creds) + if err != nil { + return "", err + } + + n, err := s3Client.FPutObject(ctx, + osConfig.BucketName, dst, filePath, + minio.PutObjectOptions{ContentType: "application/octet-stream"}, + ) + if err != nil { + return "", err + } + zlog.Debug(). + Str("dst", dst). + Int64("size", n.Size). + Msg("UPLOAD-SUCCESSFULLY") + + reqParams := make(url.Values) + filename := filepath.Base(dst) + zlog.Debug().
+ Str("filename", filename). + Msg("CREATE-CONTENT-DISPOSITION-FILENAME") + reqParams.Set( + "response-content-disposition", + fmt.Sprintf("attachment; filename=\"%s\"", filename), + ) + + objectURL := fmt.Sprintf("s3://%s/%s", n.Bucket, n.Key) + return objectURL, nil +} + +func uploadWebMFile(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, dst, filePath string) (string, error) { + var creds *credentials.Credentials + if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") { + creds = credentials.NewStaticV4( + osConfig.AccessKeyID, + osConfig.SecretAccessKey, + "", + ) + } else if (len(os.Getenv("AWS_ACCESS_KEY_ID")) > 0) && (len(os.Getenv("AWS_SECRET_ACCESS_KEY")) > 0) { + creds = credentials.NewEnvAWS() + } else { + creds = credentials.NewIAM("") + } + s3Client, err := s3.NewClient(osConfig.Endpoint, creds) + if err != nil { + return "", err + } + + zlog.Info(). + Str("dst", dst). + Msg("WEB-UPLOAD-START") + n, err := s3Client.FPutObject(ctx, + osConfig.BucketName, dst, filePath, + minio.PutObjectOptions{ContentType: "application/octet-stream"}, + ) + if err != nil { + return "", err + } + zlog.Info(). + Str("dst", dst). + Int64("size", n.Size). + Msg("UPLOAD-WEBM-SUCCESSFULLY") + + reqParams := make(url.Values) + filename := filepath.Base(dst) + zlog.Debug(). + Str("filename", filename). 
+ Msg("create content-disposition filename") + reqParams.Set( + "response-content-disposition", + fmt.Sprintf("attachment; filename=\"%s\"", filename), + ) + + objectURL := fmt.Sprintf("s3://%s/%s", n.Bucket, n.Key) + return objectURL, nil +} + +// minio のエラーをレスポンスに復元して、リトライするためファイルを残すか対象のファイルを削除するか判断する +func isFileContinuous(err error) bool { + errResp := minio.ToErrorResponse(err) + switch errResp.Code { + case "NoSuchBucket": + return false + case "AccessDenied": + return false + case "InvalidRegion": + return false + } + return true +} + +func uploadWebMFileWithRateLimit(ctx context.Context, osConfig *s3.S3CompatibleObjectStorage, dst, filePath string, + rateLimitMpbs int) (string, error) { + var creds *credentials.Credentials + if (osConfig.AccessKeyID != "") || (osConfig.SecretAccessKey != "") { + creds = credentials.NewStaticV4( + osConfig.AccessKeyID, + osConfig.SecretAccessKey, + "", + ) + } else if (len(os.Getenv("AWS_ACCESS_KEY_ID")) > 0) && (len(os.Getenv("AWS_SECRET_ACCESS_KEY")) > 0) { + creds = credentials.NewEnvAWS() + } else { + creds = credentials.NewIAM("") + } + + // bit を byte にする + rateLimitMByteps := (bwlimit.Byte(rateLimitMpbs) * bwlimit.MiB) / 8 + + // 受信には制限をかけない + dialer := bwlimit.NewDialer(&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }, rateLimitMByteps, 0) + + transport, err := s3.DefaultTransport(osConfig.Endpoint) + if err != nil { + return "", err + } + transport.DialContext = dialer.DialContext + + s3Client, err := s3.NewClientWithTransport(osConfig.Endpoint, creds, transport) + if err != nil { + return "", err + } + + fileReader, err := os.Open(filePath) + if err != nil { + return "", err + } + defer fileReader.Close() + + // Save the file stat. + fileStat, err := fileReader.Stat() + if err != nil { + return "", err + } + + // Save the file size. + fileSize := fileStat.Size() + + zlog.Info(). + Str("dst", dst). 
+ Msg("WEB-UPLOAD-START") + + // 制限時にはマルチパートアップロードを行わない + n, err := s3Client.PutObject(ctx, osConfig.BucketName, dst, fileReader, fileSize, + minio.PutObjectOptions{ContentType: "application/octet-stream", DisableMultipart: true}) + if err != nil { + return "", err + } + + zlog.Info(). + Str("dst", dst). + Int64("size", n.Size). + Msg("UPLOAD-WEBM-SUCCESSFULLY") + + objectURL := fmt.Sprintf("s3://%s/%s", n.Bucket, n.Key) + return objectURL, nil +} diff --git a/s3/s3.go b/s3/s3.go new file mode 100644 index 0000000..bc2cad7 --- /dev/null +++ b/s3/s3.go @@ -0,0 +1,67 @@ +package s3 + +import ( + "net/http" + "net/url" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +type S3CompatibleObjectStorage struct { + Endpoint string + AccessKeyID string + SecretAccessKey string + BucketName string +} + +func maybeEndpointURL(endpoint string) (string, bool) { + // もし endpoint に指定されたのが endpoint_url だった場合、 + // scheme をチェックして http ならば secure = false にする + // さらに host だけを取り出して endpoint として扱う + var secure = false + u, err := url.Parse(endpoint) + // エラーがあっても無視してそのまま文字列として扱う + // エラーがないときだけ scheme チェックする + if err == nil { + switch u.Scheme { + case "http": + return u.Host, secure + case "https": + // https なので secure を true にする + secure = true + return u.Host, secure + case "": + // scheme なしの場合は secure を true にする + secure = true + return endpoint, secure + default: + // サポート外の scheme の場合はタダの文字列として扱う + } + } + return endpoint, secure +} + +func NewClient(endpoint string, credentials *credentials.Credentials) (*minio.Client, error) { + transport, err := DefaultTransport(endpoint) + if err != nil { + return nil, err + } + return NewClientWithTransport(endpoint, credentials, transport) +} + +func NewClientWithTransport(endpoint string, credentials *credentials.Credentials, transport http.RoundTripper) (*minio.Client, error) { + newEndpoint, secure := maybeEndpointURL(endpoint) + return minio.New( + newEndpoint, + &minio.Options{ + Creds: 
credentials, + Secure: secure, + Transport: transport, + }) +} + +func DefaultTransport(endpoint string) (*http.Transport, error) { + _, secure := maybeEndpointURL(endpoint) + return minio.DefaultTransport(secure) +} diff --git a/s3/s3_test.go b/s3/s3_test.go new file mode 100644 index 0000000..55e8d70 --- /dev/null +++ b/s3/s3_test.go @@ -0,0 +1,25 @@ +package s3 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMaybeEndpointURL(t *testing.T) { + endpoint, secure := maybeEndpointURL("s3.example.com") + assert.Equal(t, "s3.example.com", endpoint) + assert.True(t, secure) + + endpoint, secure = maybeEndpointURL("http://s3.example.com") + assert.Equal(t, "s3.example.com", endpoint) + assert.False(t, secure) + + endpoint, secure = maybeEndpointURL("https://s3.example.com") + assert.Equal(t, "s3.example.com", endpoint) + assert.True(t, secure) + + endpoint, secure = maybeEndpointURL("ldap://s3.example.com") + assert.Equal(t, "ldap://s3.example.com", endpoint) + assert.False(t, secure) +} diff --git a/script/sora-archive-uploader.service b/script/sora-archive-uploader.service new file mode 100644 index 0000000..ff23c29 --- /dev/null +++ b/script/sora-archive-uploader.service @@ -0,0 +1,21 @@ +[Unit] +Description=Sora Archive Uploader Service +RefuseManualStart=no +RefuseManualStop=yes +After=network-online.target + +[Service] +Type=oneshot +User=sora +Group=sora +PermissionsStartOnly=true +Restart=no + +WorkingDirectory=/home/sora/sora-archive-uploader +ExecStartPre=/bin/mkdir -p /var/log/sora-archive-uploader +ExecStartPre=/bin/chown -R sora:sora /var/log/sora-archive-uploader + +ExecStart=/home/sora/sora-archive-uploader/bin/sora-archive-uploader -C /home/sora/sora-archive-uploader/config.ini + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/script/sora-archive-uploader.timer b/script/sora-archive-uploader.timer new file mode 100644 index 0000000..21909be --- /dev/null +++ 
b/script/sora-archive-uploader.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Sora Archive Uploader Wake Timer + +[Timer] +OnBootSec=2min +OnUnitActiveSec=1min + +[Install] +WantedBy=timers.target \ No newline at end of file diff --git a/staticcheck.conf b/staticcheck.conf new file mode 100644 index 0000000..5f0a348 --- /dev/null +++ b/staticcheck.conf @@ -0,0 +1 @@ +checks = ["all", "-ST1000"] diff --git a/uploader.go b/uploader.go new file mode 100644 index 0000000..994a704 --- /dev/null +++ b/uploader.go @@ -0,0 +1,711 @@ +package archive + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/google/uuid" + "github.com/shiguredo/sora-archive-uploader/s3" + base32 "github.com/shogo82148/go-clockwork-base32" + + zlog "github.com/rs/zerolog/log" +) + +type RecordingReport struct { + RecordingID string `json:"recording_id"` + ChannelID string `json:"channel_id"` + FilePath string `json:"file_path"` + Filename string `json:"filename"` +} + +type UploaderManager struct { + ArchiveStream chan UploaderResult + ArchiveEndStream chan UploaderResult + ReportStream chan UploaderResult + uploaders []Uploader +} + +type ArchiveMetadata struct { + RecordingID string `json:"recording_id"` + ChannelID string `json:"channel_id"` + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + FilePath string `json:"file_path"` + Filename string `json:"filename"` + MetadataFilePath string `json:"metadata_file_path"` + MetadataFilename string `json:"metadata_filename"` +} + +type ArchiveEndMetadata struct { + RecordingID string `json:"recording_id"` + ChannelID string `json:"channel_id"` + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ConnectionID string `json:"connection_id"` + FilePath string `json:"file_path"` + Filename string `json:"filename"` +} + +func newUploaderManager() *UploaderManager { + var uploaders []Uploader + archiveStream := 
make(chan UploaderResult) + archiveEndStream := make(chan UploaderResult) + reportStream := make(chan UploaderResult) + return &UploaderManager{ + ArchiveStream: archiveStream, + ArchiveEndStream: archiveEndStream, + ReportStream: reportStream, + uploaders: uploaders, + } +} + +func (um *UploaderManager) run(ctx context.Context, config *Config, fileStream <-chan string) (*UploaderManager, error) { + for i := 0; i < config.UploadWorkers; i++ { + uploader, err := newUploader(i+1, config) + if err != nil { + return nil, err + } + uploader.run(fileStream, um.ArchiveStream, um.ArchiveEndStream, um.ReportStream) + um.uploaders = append(um.uploaders, *uploader) + } + go func() { + defer func() { + close(um.ArchiveStream) + close(um.ArchiveEndStream) + close(um.ReportStream) + }() + + <-ctx.Done() + zlog.Debug().Msg("STOP-UPLOADER-MANAGER") + for _, u := range um.uploaders { + u.Stop() + } + zlog.Debug().Msg("STOPPED-UPLOADER-MANAGER") + }() + return um, nil +} + +type UploaderResult struct { + Success bool + Filepath string +} + +type Uploader struct { + id int + config *Config + ctx context.Context + cancel context.CancelFunc + base32Encoder *base32.Encoding +} + +func newUploader(id int, config *Config) (*Uploader, error) { + u := &Uploader{ + id: id, + config: config, + base32Encoder: base32.NewEncoding(), + } + u.ctx, u.cancel = context.WithCancel(context.Background()) + return u, nil +} + +func (u Uploader) run( + fileStream <-chan string, + outArchive chan UploaderResult, + outArchiveEnd chan UploaderResult, + outReport chan UploaderResult, +) { + go func() { + for { + select { + case <-u.ctx.Done(): + zlog.Debug(). + Int("uploader_id", u.id). + Msg("STOPPED-UPLOADER") + return + case inputFilepath, ok := <-fileStream: + if !ok { + continue + } + filename := filepath.Base(inputFilepath) + if strings.HasPrefix(filename, "report-") { + zlog.Debug(). + Int("uploader_id", u.id). + Str("json_file_path", inputFilepath). 
+ Msg("FOUND-AT-STARTUP") + ok := u.handleReport(inputFilepath) + select { + case <-u.ctx.Done(): + return + case outReport <- UploaderResult{ + Success: ok, + Filepath: inputFilepath, + }: + } + } else if strings.HasPrefix(filename, "split-archive-end-") { + zlog.Debug(). + Int("uploader_id", u.id). + Str("json_file_path", inputFilepath). + Msg("FOUND-AT-STARTUP") + ok := u.handleArchiveEnd(inputFilepath) + select { + case <-u.ctx.Done(): + return + case outArchiveEnd <- UploaderResult{ + Success: ok, + Filepath: inputFilepath, + }: + } + } else if strings.HasPrefix(filename, "archive-") { + zlog.Debug(). + Int("uploader_id", u.id). + Str("file_path", inputFilepath). + Msg("FOUND-AT-STARTUP") + ok := u.handleArchive(inputFilepath, false) + select { + case <-u.ctx.Done(): + return + case outArchive <- UploaderResult{ + Success: ok, + Filepath: inputFilepath, + }: + } + } else if strings.HasPrefix(filename, "split-archive-") { + zlog.Debug(). + Int("uploader_id", u.id). + Str("file_path", inputFilepath). + Msg("FOUND-AT-STARTUP") + ok := u.handleArchive(inputFilepath, true) + select { + case <-u.ctx.Done(): + return + case outArchive <- UploaderResult{ + Success: ok, + Filepath: inputFilepath, + }: + } + } + } + } + }() +} + +func (u Uploader) Stop() { + u.cancel() +} + +func (u Uploader) handleArchive(archiveJSONFilePath string, split bool) bool { + fileInfo, err := os.Stat(archiveJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Msg("JSON-NOT-ACCESSIBLE") + return false + } + + // json をパースする + raw, err := os.ReadFile(archiveJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveJSONFilePath). + Msg("ARCHIVE-JSON-FILE-READ-ERROR") + return false + } + + var am ArchiveMetadata + if err := json.Unmarshal(raw, &am); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveJSONFilePath). 
+ Msg("ARCHIVE-JSON-PARSE-ERROR") + return false + } + + // ここで s3 ファイルをアップロード + // json がくればファイルパスもわかる + zlog.Debug(). + Int("uploader_id", u.id). + Str("recording_id", am.RecordingID). + Str("channel_id", am.ChannelID). + Str("connection_id", am.ConnectionID). + Msg("ARCHIVE-METADATA-INFO") + + // webm ファイルのパスを作っておく + webmFilename := filepath.Base(am.Filename) + webmFilepath := filepath.Join(filepath.Dir(archiveJSONFilePath), webmFilename) + + // metadata ファイル (json) をアップロード + metadataFilename := fileInfo.Name() + metadataObjectKey := fmt.Sprintf("%s/%s", am.RecordingID, metadataFilename) + osConfig := &s3.S3CompatibleObjectStorage{ + Endpoint: u.config.ObjectStorageEndpoint, + BucketName: u.config.ObjectStorageBucketName, + AccessKeyID: u.config.ObjectStorageAccessKeyID, + SecretAccessKey: u.config.ObjectStorageSecretAccessKey, + } + + // webm ファイルを開いておく + f, err := os.Open(webmFilepath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveJSONFilePath). + Str("webm_filename", am.FilePath). + Msg("WEBM-FILE-OPEN-ERROR") + return false + } + defer f.Close() + + zlog.Info(). + Str("path", webmFilepath). + Msg("WEBM-FILE-PATH") + + metadataFileURL, err := uploadJSONFile( + u.ctx, + osConfig, + metadataObjectKey, + archiveJSONFilePath, + ) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveJSONFilePath). + Str("metadata_filename", metadataFilename). + Str("metadata_object_key", metadataObjectKey). + Msg("METADATA-FILE-UPLOAD-ERROR") + if !isFileContinuous(err) { + // リトライしないエラーの場合は、ファイルを削除 + u.removeArchiveJSONFile(archiveJSONFilePath, webmFilepath) + u.removeArchiveWEBMFile(archiveJSONFilePath, webmFilepath) + } + return false + } + zlog.Debug(). + Int("uploader_id", u.id). + Str("uploaded_matadata", am.MetadataFilename). 
+ Msg("UPLOAD-METADATA-FILE-SUCCESSFULLY") + + webmObjectKey := fmt.Sprintf("%s/%s", am.RecordingID, webmFilename) + + var fileURL string + if u.config.UploadFileRateLimitMbps == 0 { + fileURL, err = uploadWebMFile(u.ctx, osConfig, webmObjectKey, webmFilepath) + } else { + fileURL, err = uploadWebMFileWithRateLimit(u.ctx, osConfig, webmObjectKey, webmFilepath, u.config.UploadFileRateLimitMbps) + } + + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveJSONFilePath). + Str("webm_filename", webmFilename). + Str("webm_object_key", webmObjectKey). + Msg("WEBM-FILE-UPLOAD-ERROR") + if !isFileContinuous(err) { + // リトライしないエラーの場合は、ファイルを削除 + u.removeArchiveJSONFile(archiveJSONFilePath, webmFilepath) + u.removeArchiveWEBMFile(archiveJSONFilePath, webmFilepath) + } + return false + } + zlog.Debug(). + Int("uploader_id", u.id). + Str("uploaded_webm", am.Filename). + Msg("UPLOAD-WEBM-FILE-SUCCESSFULLY") + + if u.config.WebhookEndpointURL != "" { + var archiveUploadedType string + if split { + archiveUploadedType = u.config.WebhookTypeSplitArchiveUploaded + } else { + archiveUploadedType = u.config.WebhookTypeArchiveUploaded + } + webhookID, err := u.generateWebhookID() + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("uploaded_webm", am.Filename). + Msg("WEBHOOK-ID-GENERATE-ERROR") + return false + } + var w = WebhookArchiveUploaded{ + ID: webhookID, + Type: archiveUploadedType, + Timestamp: time.Now().UTC(), + SessionID: am.SessionID, + ClientID: am.ClientID, + RecordingID: am.RecordingID, + ChannelID: am.ChannelID, + ConnectionID: am.ConnectionID, + Filename: webmFilename, + FileURL: fileURL, + MetadataFilename: metadataFilename, + MetadataFileURL: metadataFileURL, + } + buf, err := json.Marshal(w) + if err != nil { + zlog.Error(). + Int("uploader_id", u.id). + Err(err). 
+ Msg("ARCHIVE-UPLOADED-WEBHOOK-MARSHAL-ERROR") + return false + } + if err := u.postWebhook( + archiveUploadedType, + buf, + ); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("recording_id", w.RecordingID). + Str("channel_id", w.ChannelID). + Str("filename", w.Filename). + Str("metadata_filename", w.MetadataFilename). + Msg("ARCHIVE-UPLOADED-WEBHOOK-SEND-ERROR") + return false + } + } + + // 処理し終わったファイルを削除 + jsonError := u.removeArchiveJSONFile(archiveJSONFilePath, webmFilepath) + webmError := u.removeArchiveWEBMFile(archiveJSONFilePath, webmFilepath) + return jsonError == nil && webmError == nil +} + +func (u Uploader) handleReport(reportJSONFilePath string) bool { + fileInfo, err := os.Stat(reportJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Msg("JSON-NOT-ACCESSABLE") + return false + } + + // report- ファイルのアップロード + // json をパースする + raw, err := os.ReadFile(reportJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", reportJSONFilePath). + Msg("REPORT-JSON-FILE-READ-ERROR") + return false + } + var rr RecordingReport + if err := json.Unmarshal(raw, &rr); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", reportJSONFilePath). + Msg("REPORT-JSON-FILE-UNMARSHAL-ERROR") + return false + } + + // report ファイル (json) をアップロード + filename := fileInfo.Name() + reportObjectKey := fmt.Sprintf("%s/%s", rr.RecordingID, filename) + osConfig := &s3.S3CompatibleObjectStorage{ + Endpoint: u.config.ObjectStorageEndpoint, + BucketName: u.config.ObjectStorageBucketName, + AccessKeyID: u.config.ObjectStorageAccessKeyID, + SecretAccessKey: u.config.ObjectStorageSecretAccessKey, + } + + fileURL, err := uploadJSONFile( + u.ctx, + osConfig, + reportObjectKey, + reportJSONFilePath, + ) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("filename", filename). + Str("report_object_key", reportObjectKey). 
+ Msg("REPORT-FILE-UPLOAD-ERROR") + if !isFileContinuous(err) { + // リトライしないエラーの場合は、ファイルを削除 + u.removeReportFile(reportJSONFilePath) + } + return false + } + zlog.Debug(). + Int("uploader_id", u.id). + Str("uplaoded_report", filename). + Msg("UPLOAD-REPORT-JSON-SUCCESSFULLY") + + if u.config.WebhookEndpointURL != "" { + webhookID, err := u.generateWebhookID() + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("uploaded_report", filename). + Msg("WEBHOOK-ID-GENERATE-ERROR") + return false + } + var w = WebhookReportUploaded{ + ID: webhookID, + Type: u.config.WebhookTypeReportUploaded, + Timestamp: time.Now().UTC(), + RecordingID: rr.RecordingID, + ChannelID: rr.ChannelID, + Filename: filename, + FileURL: fileURL, + } + buf, err := json.Marshal(w) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("recording_id", w.RecordingID). + Str("channel_id", w.ChannelID). + Str("filename", w.Filename). + Msg("REPORT-UPLOAD-WEBHOOK-MARSHAL-ERROR") + return false + } + if err := u.postWebhook( + u.config.WebhookTypeReportUploaded, + buf, + ); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("recording_id", w.RecordingID). + Str("channel_id", w.ChannelID). + Str("filename", w.Filename). + Msg("REPORT-UPLOADED-WEBHOOK-SEND-ERROR") + return false + } + zlog.Debug(). + Int("uploader_id", u.id). + Str("recording_id", w.RecordingID). + Str("channel_id", w.ChannelID). + Str("filename", w.Filename). + Msg("REPORT-UPLOADED-WEBHOOK-SEND-SUCCESSFULLY") + } + + // 処理し終わったファイルを削除 + if err = u.removeReportFile(reportJSONFilePath); err != nil { + return false + } + return true +} + +func (u Uploader) handleArchiveEnd(archiveEndJSONFilePath string) bool { + fileInfo, err := os.Stat(archiveEndJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Msg("JSON-NOT-ACCESSIBLE") + return false + } + + // json をパースする + raw, err := os.ReadFile(archiveEndJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). 
+ Int("uploader_id", u.id). + Str("path", archiveEndJSONFilePath). + Msg("archive json file read error") + return false + } + + var aem ArchiveEndMetadata + if err := json.Unmarshal(raw, &aem); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveEndJSONFilePath). + Msg("ARCHIVE-END-JSON-FILE-PARSE-ERROR") + return false + } + + zlog.Debug(). + Int("uploader_id", u.id). + Str("recording_id", aem.RecordingID). + Str("channel_id", aem.ChannelID). + Str("connection_id", aem.ConnectionID). + Msg("ARCHIVE-END-METADATA-INFO") + + // metadata ファイル (json) をアップロード + filename := fileInfo.Name() + objectKey := fmt.Sprintf("%s/%s", aem.RecordingID, filename) + osConfig := &s3.S3CompatibleObjectStorage{ + Endpoint: u.config.ObjectStorageEndpoint, + BucketName: u.config.ObjectStorageBucketName, + AccessKeyID: u.config.ObjectStorageAccessKeyID, + SecretAccessKey: u.config.ObjectStorageSecretAccessKey, + } + + archiveEndURL, err := uploadJSONFile( + u.ctx, + osConfig, + objectKey, + archiveEndJSONFilePath, + ) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("path", archiveEndJSONFilePath). + Str("filename", filename). + Str("object_key", objectKey). + Msg("METADATA-FILE-UPLOAD-ERROR") + if !isFileContinuous(err) { + u.removeArchiveEndFile(archiveEndJSONFilePath) + } + return false + } + zlog.Debug(). + Int("uploader_id", u.id). + Str("uploaded_archive_end", aem.Filename). + Str("archive_end_presigned_url", archiveEndURL). + Msg("UPLOAD-ARCHIVE-END-FILE-SUCCESSFULLY") + + if u.config.WebhookEndpointURL != "" { + webhookID, err := u.generateWebhookID() + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("uploaded_archive_end", aem.Filename). + Str("archive_end_presigned_url", archiveEndURL). 
+ Msg("WEBHOOK-ID-GENERATE-ERROR") + return false + } + var w = WebhookArchiveEndUploaded{ + ID: webhookID, + Type: u.config.WebhookTypeSplitArchiveEndUploaded, + Timestamp: time.Now().UTC(), + RecordingID: aem.RecordingID, + SessionID: aem.SessionID, + ClientID: aem.ClientID, + ChannelID: aem.ChannelID, + ConnectionID: aem.ConnectionID, + Filename: filename, + FileURL: archiveEndURL, + } + buf, err := json.Marshal(w) + if err != nil { + zlog.Error(). + Int("uploader_id", u.id). + Err(err). + Msg("ARCHIVE-UPLOADED-WEBHOOK-MARSHAL-ERROR") + return false + } + if err := u.postWebhook( + u.config.WebhookTypeSplitArchiveEndUploaded, + buf, + ); err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("recording_id", w.RecordingID). + Str("channel_id", w.ChannelID). + Str("filename", w.Filename). + Msg("ARCHIVE-END-UPLOADED-WEBHOOK-SEND-ERROR") + return false + } + } + + if err = u.removeArchiveEndFile(archiveEndJSONFilePath); err != nil { + return false + } + return true +} + +func (u Uploader) removeArchiveJSONFile(metadataFilePath, webmFilepath string) error { + err := os.Remove(metadataFilePath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("metadata_filepath", metadataFilePath). + Str("archive_filepath", webmFilepath). + Msg("FAILED-REMOVE-METADATA-JSON-FILE") + } else { + zlog.Debug(). + Int("uploader_id", u.id). + Str("metadata_filepath", metadataFilePath). + Str("archive_filepath", webmFilepath). + Msg("REMOVED-METADATA-JSON-FILE") + } + return err +} + +func (u Uploader) removeArchiveWEBMFile(metadataFilePath, webmFilepath string) error { + err := os.Remove(webmFilepath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("archive_filepath", webmFilepath). + Msg("FAILED-REMOVE-ARCHIVE-WEBM-FILE") + } else { + zlog.Debug(). + Int("uploader_id", u.id). + Str("archive_filepath", webmFilepath). 
+ Msg("remove archive webm file successfully.") + } + return err +} + +func (u Uploader) removeReportFile(reportJSONFilePath string) error { + err := os.Remove(reportJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("filepath", reportJSONFilePath). + Msg("FAILED-REMOVE-REPORT-JSON-FILE") + } else { + zlog.Debug(). + Int("uploader_id", u.id). + Str("filepath", reportJSONFilePath). + Msg("REMOVED-REPORT-JSON-FILE") + + } + return err +} + +func (u Uploader) removeArchiveEndFile(archiveEndJSONFilePath string) error { + err := os.Remove(archiveEndJSONFilePath) + if err != nil { + zlog.Error(). + Err(err). + Int("uploader_id", u.id). + Str("filepath", archiveEndJSONFilePath). + Msg("FAILED-REMOVE-ARCHIVE-END-FILE") + } else { + zlog.Debug(). + Int("uploader_id", u.id). + Str("filepath", archiveEndJSONFilePath). + Msg("REMOVED-ARCHIVE-END-FILE") + } + return err +} + +func (u Uploader) generateWebhookID() (string, error) { + id := uuid.New() + binaryUUID, err := id.MarshalBinary() + if err != nil { + return "", err + } + return u.base32Encoder.EncodeToString(binaryUUID), nil +} diff --git a/webhook.go b/webhook.go new file mode 100644 index 0000000..d7108da --- /dev/null +++ b/webhook.go @@ -0,0 +1,144 @@ +package archive + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "net/url" + "os" + "time" +) + +type WebhookReportUploaded struct { + ID string `json:"id"` + Type string `json:"type"` + Timestamp time.Time `json:"timestamp"` + RecordingID string `json:"recording_id"` + ChannelID string `json:"channel_id"` + Filename string `json:"filename"` + FileURL string `json:"file_url"` +} + +type WebhookArchiveUploaded struct { + ID string `json:"id"` + Type string `json:"type"` + Timestamp time.Time `json:"timestamp"` + RecordingID string `json:"recording_id"` + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ChannelID string `json:"channel_id"` + ConnectionID string 
`json:"connection_id"` + Filename string `json:"filename"` + FileURL string `json:"file_url"` + MetadataFilename string `json:"metadata_filename"` + MetadataFileURL string `json:"metadata_file_url"` +} + +type WebhookArchiveEndUploaded struct { + ID string `json:"id"` + Type string `json:"type"` + Timestamp time.Time `json:"timestamp"` + RecordingID string `json:"recording_id"` + SessionID string `json:"session_id"` + ClientID string `json:"client_id"` + ChannelID string `json:"channel_id"` + ConnectionID string `json:"connection_id"` + Filename string `json:"filename"` + FileURL string `json:"file_url"` +} + +// mTLS を組み込んだ http.Client を構築する +func createHTTPClient(config *Config) (*http.Client, error) { + e, err := url.Parse(config.WebhookEndpointURL) + if err != nil { + return nil, err + } + + // http または VerifyCacertPath 指定していない場合はそのまま投げる + if e.Scheme != "https" || config.WebhookTLSVerifyCacertPath == "" { + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + Timeout: time.Duration(config.WebhookRequestTimeoutS) * time.Second, + } + + return client, nil + } + + CaCert, err := os.ReadFile(config.WebhookTLSVerifyCacertPath) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(CaCert) + + var certificates []tls.Certificate + if config.WebhookTLSFullchainPath != "" && config.WebhookTLSPrivkeyPath != "" { + pair, err := tls.LoadX509KeyPair(config.WebhookTLSFullchainPath, config.WebhookTLSPrivkeyPath) + if err != nil { + return nil, err + } + certificates = append(certificates, pair) + } + + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + + }, + Timeout: time.Duration(config.WebhookRequestTimeoutS) * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + // hostname はチェックする + ServerName: e.Hostname(), + RootCAs: caCertPool, + 
Certificates: certificates, + }, + // TODO: config へ + // ForceAttemptHTTP2: true, + }, + } + + return client, nil +} + +func (u Uploader) httpClientDo(client *http.Client, webhookType string, buf []byte) error { + req, err := http.NewRequest("POST", u.config.WebhookEndpointURL, bytes.NewBuffer(buf)) + if err != nil { + return err + } + + // 固有ヘッダーを追加する + req.Header.Set("Content-Type", "application/json") + req.Header.Add(u.config.WebhookTypeHeaderName, webhookType) + + // 設定があれば Basic 認証に対応する + if u.config.WebhookBasicAuthUsername != "" && u.config.WebhookBasicAuthPassword != "" { + req.SetBasicAuth(u.config.WebhookBasicAuthUsername, u.config.WebhookBasicAuthPassword) + } + + resp, err := client.Do(req) + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("status_code: %d", resp.StatusCode) + } + + return nil +} + +func (u Uploader) postWebhook(webhookType string, buf []byte) error { + client, err := createHTTPClient(u.config) + if err != nil { + return err + } + if err := u.httpClientDo(client, webhookType, buf); err != nil { + return err + } + + return nil +}