diff --git a/app/Console/Kernel.php b/app/Console/Kernel.php index 809698a6..6c32ab7e 100644 --- a/app/Console/Kernel.php +++ b/app/Console/Kernel.php @@ -4,6 +4,7 @@ use App\Jobs\CheckCleanup; use App\Jobs\CheckQueueFailedJobs; +use App\Utils\ThirdPartyJob; use Carbon\Carbon; use Illuminate\Console\Scheduling\Event; use Illuminate\Console\Scheduling\Schedule; @@ -41,6 +42,7 @@ protected function schedule(Schedule $schedule) $schedule->command('meilisearch:import')->weeklyOn(1, "03:00")->withoutOverlapping(); $schedule->command('torrent:load_pieces_hash')->dailyAt("01:00")->withoutOverlapping(); $schedule->job(new CheckQueueFailedJobs())->everySixHours()->withoutOverlapping(); + $schedule->job(new ThirdPartyJob())->everyMinute()->withoutOverlapping(); $this->registerScheduleCleanup($schedule); } diff --git a/app/Jobs/BuyTorrent.php b/app/Jobs/BuyTorrent.php new file mode 100644 index 00000000..258a221b --- /dev/null +++ b/app/Jobs/BuyTorrent.php @@ -0,0 +1,68 @@ +userId = $userId; + $this->torrentId = $torrentId; + } + + /** + * Execute the job. + * + * @return void + */ + public function handle() + { + $logPrefix = sprintf("user: %s, torrent: %s", $this->userId, $this->torrentId); + $torrentRep = new TorrentRepository(); + $userId = $this->userId; + $torrentId = $this->torrentId; + + $hasBuy = TorrentBuyLog::query() + ->where("uid", $userId) + ->where("torrent_id", $torrentId) + ->exists() + ; + if ($hasBuy) { + //标记购买成功 + do_log("$logPrefix, already bought"); + $torrentRep->addBuySuccessCache($userId, $torrentId); + return; + } + try { + $bonusRep = new BonusRepository(); + $bonusRep->consumeToBuyTorrent($this->userId, $this->torrentId); + //标记购买成功 + do_log("$logPrefix, buy torrent success"); + $torrentRep->addBuySuccessCache($userId, $torrentId); + } catch (\Throwable $throwable) { + //标记购买失败,缓存 3600 秒,这个时间内不能再次购买 + do_log("$logPrefix, buy torrent fail: " . $throwable->getMessage(), "error"); + $torrentRep->addBuyFailCache($userId, $torrentId); + } + } +} diff --git a/app/Repositories/TorrentRepository.php b/app/Repositories/TorrentRepository.php index 83f7c67a..64507426 100644 --- a/app/Repositories/TorrentRepository.php +++ b/app/Repositories/TorrentRepository.php @@ -49,6 +49,7 @@ class TorrentRepository extends BaseRepository const BUY_STATUS_SUCCESS = 0; const BUY_STATUS_NOT_YET = -1; + const BUY_STATUS_UNKNOWN = -2; @@ -807,16 +808,8 @@ public function getBuyStatus($uid, $torrentId): int //根据失败次数,禁用下载权限并做提示等 return $buyFailCount; } - //购买失败缓存失效后,再重新查询数据库确定最终状态 - $hasBuyFromDB = TorrentBuyLog::query()->where("uid", $uid)->where("torrent_id", $torrentId)->exists(); - if ($hasBuyFromDB) { - //标记购买成功, 返回已购买 - $this->addBuySuccessCache($uid, $torrentId); - return self::BUY_STATUS_SUCCESS; - } else { - //返回未购买,前端可执行购买逻辑 - return self::BUY_STATUS_NOT_YET; - } + //不是成功或失败,直接返回未知 + return self::BUY_STATUS_UNKNOWN; } /** diff --git a/app/Utils/ThirdPartyJob.php b/app/Utils/ThirdPartyJob.php new file mode 100644 index 00000000..00251ce7 --- /dev/null +++ b/app/Utils/ThirdPartyJob.php @@ -0,0 +1,70 @@ +get()) { + do_log("can not get lock: $lockName, return ..."); + return; + } + $list = NexusDB::redis()->lRange(self::$queueKey, 0, self::$size); + $successCount = 0; + foreach ($list as $item) { + $data = json_decode($item, true); + if (!empty($data['name'])) { + $successCount++; + match ($data['name']) { + self::JOB_BUY_TORRENT => self::enqueueJobBuyTorrent($data), + default => throw new InvalidArgumentException("invalid name: {$data['name']}") + }; + } else { + do_log(sprintf("%s no name, skip", $item), "error"); + } + NexusDB::redis()->lRem(self::$queueKey, $item); + } + do_log(sprintf("success dispatch %s jobs", $successCount)); + $lock->release(); + } + + public static function addBuyTorrent(int $userId, int $torrentId): void + { + $key = sprintf("%s:%s_%s_%s", self::$queueKey, convertNamespaceToSnake(__METHOD__), $userId, $torrentId); + if (NexusDB::redis()->set($key, now()->toDateTimeString(), ['nx', 'ex' => 3600])) { + $value = [ + 'name' => self::JOB_BUY_TORRENT, + 'userId' => $userId, + 'torrentId' => $torrentId, + ]; + NexusDB::redis()->rPush(self::$queueKey, json_encode($value)); + do_log("success addBuyTorrent: $key", "debug"); + } else { + do_log("no need to addBuyTorrent: $key", "debug"); + } + } + + private static function enqueueJobBuyTorrent(array $params): void + { + if (!empty($params['userId']) && !empty($params['torrentId'])) { + $job = new BuyTorrent($params['userId'], $params['torrentId']); + Queue::push($job); + } else { + do_log("no userId or torrentId: " . json_encode($params), "error"); + } + } +} diff --git a/include/constants.php b/include/constants.php index bda539d3..9962a699 100644 --- a/include/constants.php +++ b/include/constants.php @@ -1,6 +1,6 @@ \n"); print("