diff --git a/.markdownlint-cli2.jsonc b/.markdownlint-cli2.jsonc
index 1446dcf06017..4bf28699dab7 100644
--- a/.markdownlint-cli2.jsonc
+++ b/.markdownlint-cli2.jsonc
@@ -158,6 +158,7 @@
"StarRocks",
"ThriftServer",
"TiKV",
+ "Trino",
"UID",
"UUID",
"Ubuntu",
diff --git a/cmd/clone.go b/cmd/clone.go
index 2d041478a3fe..d4fac8781358 100644
--- a/cmd/clone.go
+++ b/cmd/clone.go
@@ -66,7 +66,6 @@ func clone(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("abs of %s: %s", srcPath, err)
}
- srcParent := filepath.Dir(srcAbsPath)
srcIno, err := utils.GetFileInode(srcPath)
if err != nil {
return fmt.Errorf("lookup inode for %s: %s", srcPath, err)
@@ -84,6 +83,22 @@ func clone(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("abs of %s: %s", dst, err)
}
+
+ srcMp, err := findMountpoint(srcAbsPath)
+ if err != nil {
+ return err
+ }
+ dstMp, err := findMountpoint(filepath.Dir(dstAbsPath))
+ if err != nil {
+ return err
+ }
+ if srcMp != dstMp {
+ return fmt.Errorf("the clone DST path should be at the same mount point as the SRC path")
+ }
+ if dstAbsPath == srcAbsPath || strings.HasPrefix(dstAbsPath, srcAbsPath+"/") {
+ return fmt.Errorf("the clone DST path should not be under the SRC path")
+ }
+
dstParent := filepath.Dir(dstAbsPath)
dstName := filepath.Base(dstAbsPath)
dstParentIno, err := utils.GetFileInode(dstParent)
@@ -107,9 +122,9 @@ func clone(ctx *cli.Context) error {
wb.Put([]byte(dstName))
wb.Put16(uint16(umask))
wb.Put8(cmode)
- f := openController(srcParent)
- if f == nil {
- return fmt.Errorf("%s is not inside JuiceFS", srcPath)
+ f, err := openController(srcMp)
+ if err != nil {
+ return err
}
defer f.Close()
if _, err = f.Write(wb.Bytes()); err != nil {
@@ -127,3 +142,16 @@ func clone(ctx *cli.Context) error {
}
return nil
}
+
+func findMountpoint(fpath string) (string, error) {
+ for p := fpath; p != "/"; p = filepath.Dir(p) {
+ inode, err := utils.GetFileInode(p)
+ if err != nil {
+ return "", fmt.Errorf("get inode of %s: %s", p, err)
+ }
+ if inode == uint64(meta.RootInode) {
+ return p, nil
+ }
+ }
+ return "", fmt.Errorf("%s is not inside JuiceFS", fpath)
+}
diff --git a/cmd/info.go b/cmd/info.go
index 7b8b3195c353..b831388c4118 100644
--- a/cmd/info.go
+++ b/cmd/info.go
@@ -111,9 +111,9 @@ func info(ctx *cli.Context) error {
if inode < uint64(meta.RootInode) {
logger.Fatalf("inode number shouldn't be less than %d", meta.RootInode)
}
- f := openController(d)
- if f == nil {
- logger.Errorf("%s is not inside JuiceFS", path)
+ f, err := openController(d)
+ if err != nil {
+ logger.Errorf("Open control file for %s: %s", d, err)
continue
}
@@ -244,21 +244,19 @@ func ltypeToString(t uint32) string {
}
func legacyInfo(d, path string, inode uint64, recursive, raw uint8) {
- f := openController(d)
- defer f.Close()
- if f == nil {
- logger.Errorf("%s is not inside JuiceFS", path)
- // continue to next path
+ f, err := openController(d)
+ if err != nil {
+ logger.Errorf("Open control file for %s: %s", d, err)
return
}
-
+ defer f.Close()
wb := utils.NewBuffer(8 + 10)
wb.Put32(meta.LegacyInfo)
wb.Put32(10)
wb.Put64(inode)
wb.Put8(recursive)
wb.Put8(raw)
- _, err := f.Write(wb.Bytes())
+ _, err = f.Write(wb.Bytes())
if err != nil {
logger.Fatalf("write message: %s", err)
}
diff --git a/cmd/main.go b/cmd/main.go
index ef048e095709..8911f68916ee 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -55,6 +55,7 @@ func Main(args []string) error {
Commands: []*cli.Command{
cmdFormat(),
cmdConfig(),
+ cmdQuota(),
cmdDestroy(),
cmdGC(),
cmdFsck(),
diff --git a/cmd/quota.go b/cmd/quota.go
new file mode 100644
index 000000000000..398a18f0851e
--- /dev/null
+++ b/cmd/quota.go
@@ -0,0 +1,133 @@
+/*
+ * JuiceFS, Copyright 2023 Juicedata, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cmd
+
+import (
+ "fmt"
+
+ "github.com/juicedata/juicefs/pkg/meta"
+
+ "github.com/urfave/cli/v2"
+)
+
+func cmdQuota() *cli.Command {
+ return &cli.Command{
+ Name: "quota",
+ Category: "ADMIN",
+ Usage: "Manage directory quotas",
+ ArgsUsage: "META-URL",
+ HideHelpCommand: true,
+ Description: `
+Examples:
+$ juicefs quota set redis://localhost --path /dir1 --capacity 1 --inodes 100
+$ juicefs quota get redis://localhost --path /dir1
+$ juicefs quota del redis://localhost --path /dir1
+$ juicefs quota ls redis://localhost`,
+ Subcommands: []*cli.Command{
+ {
+ Name: "set",
+ Usage: "Set quota to a directory",
+ ArgsUsage: "META-URL",
+ Action: quota,
+ },
+ {
+ Name: "get",
+ Usage: "Get quota of a directory",
+ ArgsUsage: "META-URL",
+ Action: quota,
+ },
+ {
+ Name: "del",
+ Usage: "Delete quota of a directory",
+ ArgsUsage: "META-URL",
+ Action: quota,
+ },
+ {
+ Name: "ls",
+ Usage: "List all directory quotas",
+ ArgsUsage: "META-URL",
+ Action: quota,
+ },
+ {
+ Name: "check",
+ Usage: "Check quota consistency of a directory",
+ ArgsUsage: "META-URL",
+ Action: quota,
+ },
+ },
+ Flags: []cli.Flag{
+ &cli.StringFlag{
+ Name: "path",
+ Usage: "full path of the directory within the volume",
+ },
+ &cli.Uint64Flag{
+ Name: "capacity",
+ Usage: "hard quota of the directory limiting its usage of space in GiB",
+ },
+ &cli.Uint64Flag{
+ Name: "inodes",
+ Usage: "hard quota of the directory limiting its number of inodes",
+ },
+ },
+ }
+}
+
+func quota(c *cli.Context) error {
+ setup(c, 1)
+ var cmd uint8
+ switch c.Command.Name {
+ case "set":
+ cmd = meta.QuotaSet
+ case "get":
+ cmd = meta.QuotaGet
+ case "del":
+ cmd = meta.QuotaDel
+ case "ls":
+ cmd = meta.QuotaList
+ case "check":
+ cmd = meta.QuotaCheck
+ default:
+ logger.Fatalf("Invalid quota command: %s", c.Command.Name)
+ }
+ dpath := c.String("path")
+ if dpath == "" && cmd != meta.QuotaList {
+ logger.Fatalf("Please specify the directory with `--path` option")
+ }
+ removePassword(c.Args().Get(0))
+
+ m := meta.NewClient(c.Args().Get(0), nil)
+ qs := make(map[string]*meta.Quota)
+ if cmd == meta.QuotaSet {
+ q := &meta.Quota{MaxSpace: -1, MaxInodes: -1} // negative means no change
+ if c.IsSet("capacity") {
+ q.MaxSpace = int64(c.Uint64("capacity")) << 30
+ }
+ if c.IsSet("inodes") {
+ q.MaxInodes = int64(c.Uint64("inodes"))
+ }
+ qs[dpath] = q
+ }
+ if err := m.HandleQuota(meta.Background, cmd, dpath, qs); err != nil {
+ return err
+ }
+
+ for p, q := range qs {
+ // FIXME: need a better way to do print
+ fmt.Printf("%s: %+v\n", p, *q)
+ }
+ return nil
+}
diff --git a/cmd/rmr.go b/cmd/rmr.go
index e5be848b30f1..4b8c7605aab2 100644
--- a/cmd/rmr.go
+++ b/cmd/rmr.go
@@ -42,25 +42,12 @@ $ juicefs rmr /mnt/jfs/foo`,
}
}
-func openController(mp string) *os.File {
- st, err := os.Stat(mp)
+func openController(path string) (*os.File, error) {
+ mp, err := findMountpoint(path)
if err != nil {
- logger.Fatal(err)
+ return nil, err
}
- if !st.IsDir() {
- mp = filepath.Dir(mp)
- }
- for ; mp != "/"; mp = filepath.Dir(mp) {
- f, err := os.OpenFile(filepath.Join(mp, ".control"), os.O_RDWR, 0)
- if err == nil {
- return f
- }
- if !os.IsNotExist(err) {
- logger.Fatal(err)
- }
- }
- logger.Fatalf("Path %s is not inside JuiceFS", mp)
- panic("unreachable")
+ return os.OpenFile(filepath.Join(mp, ".control"), os.O_RDWR, 0)
}
func rmr(ctx *cli.Context) error {
@@ -84,9 +71,9 @@ func rmr(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("lookup inode for %s: %s", d, err)
}
- f := openController(d)
- if f == nil {
- logger.Errorf("%s is not inside JuiceFS", path)
+ f, err := openController(d)
+ if err != nil {
+ logger.Errorf("Open control file for %s: %s", d, err)
continue
}
wb := utils.NewBuffer(8 + 8 + 1 + uint32(len(name)))
diff --git a/cmd/warmup.go b/cmd/warmup.go
index 054ae6cb9a21..de79daae82f9 100644
--- a/cmd/warmup.go
+++ b/cmd/warmup.go
@@ -178,22 +178,15 @@ func warmup(ctx *cli.Context) error {
// find mount point
first := paths[0]
- controller := openController(first)
- if controller == nil {
- logger.Fatalf("open control file for %s", first)
+ mp, err := findMountpoint(first)
+ if err != nil {
+ return err
}
- defer controller.Close()
-
- mp := first
- for ; mp != "/"; mp = filepath.Dir(mp) {
- inode, err := utils.GetFileInode(mp)
- if err != nil {
- logger.Fatalf("lookup inode for %s: %s", mp, err)
- }
- if inode == uint64(meta.RootInode) {
- break
- }
+ controller, err := openController(mp)
+ if err != nil {
+ return fmt.Errorf("open control file for %s: %s", first, err)
}
+ defer controller.Close()
threads := ctx.Uint("threads")
if threads == 0 {
diff --git a/docs/en/deployment/hadoop_java_sdk.md b/docs/en/deployment/hadoop_java_sdk.md
index e1fe326c971f..de8c8dbb02f9 100644
--- a/docs/en/deployment/hadoop_java_sdk.md
+++ b/docs/en/deployment/hadoop_java_sdk.md
@@ -33,6 +33,16 @@ If you want to use JuiceFS in a distributed environment, when creating a file sy
Depending on the read and write load of computing tasks (such as Spark executor), JuiceFS Hadoop Java SDK may require an additional 4 * [`juicefs.memory-size`](#io-configurations) off-heap memory to speed up read and write performance. By default, it is recommended to configure at least 1.2GB of off-heap memory for compute tasks.
+### 5. Java runtime version
+
+JuiceFS Hadoop Java SDK is compiled with JDK 8 by default. If it needs to be used in a higher version of Java runtime (such as Java 17), the following options need to be added to the JVM parameters to allow the use of reflection API:
+
+```shell
+--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
+```
+
+For more information on the above option, please refer to [official documentation](https://docs.oracle.com/en/java/javase/17/migrate/migrating-jdk-8-later-jdk-releases.html#GUID-7BB28E4D-99B3-4078-BDC4-FC24180CE82B).
+
## Install and compile the client
### Install the pre-compiled client
@@ -132,6 +142,7 @@ It is recommended to place the JAR file in a fixed location, and the other locat
|-----------|---------------------------------------------------------------------------|
| Spark | `${SPARK_HOME}/jars` |
| Presto | `${PRESTO_HOME}/plugin/hive-hadoop2` |
+| Trino | `${TRINO_HOME}/plugin/hive` |
| Flink | `${FLINK_HOME}/lib` |
| StarRocks | `${StarRocks_HOME}/fe/lib/`, `${StarRocks_HOME}/be/lib/hadoop/common/lib` |
diff --git a/docs/en/guide/how_to_set_up_metadata_engine.md b/docs/en/guide/how_to_set_up_metadata_engine.md
index a4b63b360b95..dcb10904c6bb 100644
--- a/docs/en/guide/how_to_set_up_metadata_engine.md
+++ b/docs/en/guide/how_to_set_up_metadata_engine.md
@@ -14,6 +14,17 @@ import TabItem from '@theme/TabItem';
JuiceFS is a decoupled structure that separates data and metadata. Metadata can be stored in any supported database (called Metadata Engine). Many databases are supported and they all comes with different performance and intended scenarios, refer to [our docs](../benchmark/metadata_engines_benchmark.md) for comparison.
+## The storage usage of metadata {#storage-usage}
+
+The storage space required for metadata is related to the length of the file name, the type and length of the file, and extended attributes. It is difficult to accurately estimate the metadata storage space requirements of a file system. For simplicity, we can approximate based on the storage space required for a single small file without extended attributes.
+
+- **Key-Value Database** (e.g. Redis, TiKV): 300 bytes/file
+- **Relational Database** (e.g. SQLite, MySQL, PostgreSQL): 600 bytes/file
+
+When the average file is larger (over 64MB), or the file is frequently modified and has a lot of fragments, or there are many extended attributes, or the average file name is long (over 50 bytes), more storage space is needed.
+
+When you need to migrate between two types of metadata engines, you can use this method to estimate the required storage space. For example, if you want to migrate the metadata engine from a relational database (MySQL) to a key-value database (Redis), and the current usage of MySQL is 30GB, then the target Redis needs to prepare at least 15GB or more of memory. The reverse is also true.
+
## Redis
JuiceFS requires Redis version 4.0 and above. Redis Cluster is also supported, but in order to avoid transactions across different Redis instances, JuiceFS puts all metadata for one file system on a single Redis instance.
diff --git a/docs/en/release_notes.md b/docs/en/release_notes.md
index d4b039bd4285..179ef21b8938 100644
--- a/docs/en/release_notes.md
+++ b/docs/en/release_notes.md
@@ -4,11 +4,21 @@
For all versions, please see [GitHub Releases](https://github.com/juicedata/juicefs/releases).
:::
-## Upgrade to JuiceFS v1.0.0 Beta3
+## Version number {#version-number}
+
+JuiceFS Community Edition uses [semantic versioning](https://semver.org) to label its releases. Each version number consists of three numbers in the format `x.y.z`, representing the major version number (x), the minor version number (y), and the patch number (z).
+
+1. **Major version number (x)**: When the major version number is greater than or equal to `1`, it indicates that the version is suitable for production environments. When the major version number changes, it indicates that this version may have added major features, architectural changes, or data format changes that are not backward compatible. For example, `v0.8.3` → `v1.0.0` means production-ready, `v1.0.0` → `v2.0.0` represents an architectural or functional change.
+2. **Minor version number (y)**: The minor version number indicates that the version adds some new features, performance optimizations, bug fixes, etc. that can be backward compatible. For example, `v1.0.0` → `v1.1.0`.
+3. **Patch version number (z)**: The patch version number indicates a minor update or bug fix for the software, which is only some minor changes or fixes to existing features and will not affect the compatibility of the software. For example, `v1.0.3` → `v1.0.4`.
+
+## Changes {#changes}
+
+### JuiceFS v1.0.0 Beta3
JuiceFS client has only one binary file, so you only need to replace the old version with the new one when upgrading JuiceFS.
-### SQL: Update table schema to support encoding other than UTF-8
+#### SQL: Update table schema to support encoding other than UTF-8
JuiceFS v1.0.0 Beta3 has changed the table schema to support encoding other than UTF-8. For existing file systems, you need to upgrade the table schema manually to support that. It's recommended to upgrade all clients first and then the table schema.
@@ -16,7 +26,7 @@ JuiceFS v1.0.0 Beta3 has changed the table schema to support encoding other than
Table schema upgrades are optional, and they are required only if you need to use non-UTF-8 characters. In addition, database performance may degrade when upgrading SQL table schemas, affecting running services.
:::
-#### MySQL/MariaDB
+##### MySQL/MariaDB
```sql
alter table jfs_edge
@@ -25,7 +35,7 @@ alter table jfs_symlink
modify target varbinary(4096) not null;
```
-#### PostgreSQL
+##### PostgreSQL
```sql
alter table jfs_edge
@@ -34,10 +44,10 @@ alter table jfs_symlink
alter column target type bytea using target::bytea;
```
-#### SQLite
+##### SQLite
SQLite does not support modifying columns, but you can migrate columns by `dump` and `load` commands, refer to [JuiceFS Metadata Backup and Recovery](administration/metadata_dump_load.md) for details.
-### New session management format
+#### New session management format
JuiceFS v1.0.0 Beta3 uses a new session management format. The previous versions of clients cannot see the sessions generated by v1.0.0 Beta3 clients via `juicefs status` or `juicefs destroy`, whereas the new versions are able to see all the sessions.
diff --git a/docs/zh_cn/deployment/hadoop_java_sdk.md b/docs/zh_cn/deployment/hadoop_java_sdk.md
index 03faae27b106..d85edfd09c8c 100644
--- a/docs/zh_cn/deployment/hadoop_java_sdk.md
+++ b/docs/zh_cn/deployment/hadoop_java_sdk.md
@@ -1,5 +1,5 @@
---
-sidebar_label: Hadoop 使用 JuiceFS
+title: 在 Hadoop 生态使用 JuiceFS
sidebar_position: 5
slug: /hadoop_java_sdk
---
@@ -7,8 +7,6 @@ slug: /hadoop_java_sdk
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
-# 在 Hadoop 生态使用 JuiceFS 存储
-
JuiceFS 提供与 HDFS 接口[高度兼容](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/introduction.html)的 Java 客户端,Hadoop 生态中的各种应用都可以在不改变代码的情况下,平滑地使用 JuiceFS 存储数据。
## 环境要求
@@ -35,6 +33,16 @@ JuiceFS 默认使用本地的「用户/UID」及「用户组/GID」映射,
根据计算任务(如 Spark executor)的读写负载,JuiceFS Hadoop Java SDK 可能需要额外使用 4 * [`juicefs.memory-size`](#io-配置) 的堆外内存用来加速读写性能。默认情况下,建议为计算任务至少配置 1.2GB 的堆外内存。
+### 5. Java 运行时版本
+
+JuiceFS Hadoop Java SDK 默认使用 JDK 8 编译,如果需要在高版本的 Java 运行时中使用(如 Java 17),需在 JVM 参数中增加以下选项以允许使用反射 API:
+
+```shell
+--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
+```
+
+更多关于以上选项的说明请参考[官方文档](https://docs.oracle.com/en/java/javase/17/migrate/migrating-jdk-8-later-jdk-releases.html#GUID-7BB28E4D-99B3-4078-BDC4-FC24180CE82B)。
+
## 安装与编译客户端
### 安装预编译客户端
@@ -134,6 +142,7 @@ make win
|-----------|---------------------------------------------------------------------------|
| Spark | `${SPARK_HOME}/jars` |
| Presto | `${PRESTO_HOME}/plugin/hive-hadoop2` |
+| Trino | `${TRINO_HOME}/plugin/hive` |
| Flink | `${FLINK_HOME}/lib` |
| StarRocks | `${StarRocks_HOME}/fe/lib/`, `${StarRocks_HOME}/be/lib/hadoop/common/lib` |
diff --git a/docs/zh_cn/guide/how_to_set_up_metadata_engine.md b/docs/zh_cn/guide/how_to_set_up_metadata_engine.md
index 7a0ce5742ab6..e8d1c5ef08cc 100644
--- a/docs/zh_cn/guide/how_to_set_up_metadata_engine.md
+++ b/docs/zh_cn/guide/how_to_set_up_metadata_engine.md
@@ -14,6 +14,17 @@ import TabItem from '@theme/TabItem';
JuiceFS 采用数据和元数据分离的存储架构,元数据可以存储在任意支持的数据库中,称为「元数据存储引擎」。JuiceFS 支持众多元数据存储引擎,各个数据库性能、易用性、场景均有区别,具体性能对比可参考[该文档](../benchmark/metadata_engines_benchmark.md)。
+## 元数据存储用量 {#storage-usage}
+
+元数据所需的存储空间跟文件名的长度、文件的类型和长度以及扩展属性等相关,无法准确地估计一个文件系统的元数据存空间需求。简单起见,我们可以根据没有扩展属性的单个小文件所需的存储空间来做近似:
+
+- **键值(Key-Value)数据库**(如 Redis、TiKV):300 字节/文件
+- **关系型数据库**(如 SQLite、MySQL、PostgreSQL):600 字节/文件
+
+当平均文件更大(超过 64MB),或者文件被频繁修改导致有很多碎片,或者有很多扩展属性,或者平均文件名很长(超过 50 字节),都会导致需要更多的存储空间。
+
+当你需要在两种类型的元数据引擎之间迁移时,就可以据此来估算所需的存储空间。例如,假设你希望将元数据引擎从一个关系型数据库(MySQL)迁移到键值数据库(Redis),如果当前 MySQL 的用量为 30GB,那么目标 Redis 至少需要准备 15GB 以上的内存。反之亦然。
+
## Redis
JuiceFS 要求使用 4.0 及以上版本的 Redis。JuiceFS 也支持使用 Redis Cluster 作为元数据引擎,但为了避免在 Redis 集群中执行跨节点事务,同一个文件系统的元数据总会坐落于单个 Redis 实例中。
diff --git a/docs/zh_cn/release_notes.md b/docs/zh_cn/release_notes.md
index 202afbe1a296..6d336e0cfccd 100644
--- a/docs/zh_cn/release_notes.md
+++ b/docs/zh_cn/release_notes.md
@@ -4,11 +4,21 @@
所有历史版本请查看 [GitHub Releases](https://github.com/juicedata/juicefs/releases) 页面
:::
-## 升级到 JuiceFS v1.0.0 Beta3
+## 版本号 {#version-number}
+
+JuiceFS 社区版采用[语义化版本号](https://semver.org/lang/zh-CN)标记方式,每个版本号都由三个数字组成 `x.y.z`,分别是主版本号(x)、次版本号(y)和修订号(z)。
+
+1. **主版本号(x)**:主版本号大于等于 `1` 时,表示该版本已经适用于生产环境。当主版本号发生变化时,表明这个版本可能增加了不能向后兼容的重大功能、架构变化或数据格式变化。例如,`v0.8.3` → `v1.0.0` 代表生产就绪,`v1.0.0` → `v2.0.0` 代表架构或功能变化。
+2. **次版本号(y)**:次版本号表示该版本增加了一些能够向后兼容的新功能、性能优化和 bug 修复等。例如,`v1.0.0` → `v1.1.0`。
+3. **修订号(z)**:修订号表示软件的小更新或者 bug 修复,只是对现有功能的一些小的改动或者修复,不会影响软件兼容性。例如,`v1.0.3` → `v1.0.4`。
+
+## 版本变化 {#changes}
+
+### JuiceFS v1.0.0 Beta3
JuiceFS 的客户端只有一个二进制文件,升级时只需要将用新版替换旧版即可。
-### 调整 SQL 表结构以支持非 UTF-8 字符
+#### 调整 SQL 表结构以支持非 UTF-8 字符
JuiceFS v1.0.0 Beta3 改进了 SQL 引擎对非 UTF-8 字符集的支持。对于已有的文件系统,需要手动调整表结构才能支持非 UTF-8 字符集,建议在升级完所有客户端后再选择访问压力比较低的时候进行操作。
@@ -16,7 +26,7 @@ JuiceFS v1.0.0 Beta3 改进了 SQL 引擎对非 UTF-8 字符集的支持。对
调整 SQL 表结构时数据库性能可能会下降,影响正在运行的服务。
:::
-#### MySQL/MariaDB
+##### MySQL/MariaDB
```sql
alter table jfs_edge
@@ -25,7 +35,7 @@ alter table jfs_symlink
modify target varbinary(4096) not null;
```
-#### PostgreSQL
+##### PostgreSQL
```sql
alter table jfs_edge
@@ -34,10 +44,10 @@ alter table jfs_symlink
alter column target type bytea using target::bytea;
```
-#### SQLite
+##### SQLite
由于 SQLite 不支持修改字段,可以通过 dump 和 load 命令进行迁移,详情参考:[JuiceFS 元数据备份和恢复](administration/metadata_dump_load.md)。
-### 会话管理格式变更
+#### 会话管理格式变更
JuiceFS v1.0.0 Beta3 使用了新的会话管理格式,历史版本客户端通过 `juicefs status` 或者 `juicefs destroy` 将无法看到 v1.0.0 Beta3 客户端产生的会话,新版客户端可以看到所有会话。
diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go
index 956b07641f1d..fe9d091259a8 100644
--- a/pkg/fs/fs.go
+++ b/pkg/fs/fs.go
@@ -1188,6 +1188,6 @@ func (f *File) Summary(ctx meta.Context) (s *meta.Summary, err syscall.Errno) {
f.fs.log(l, "Summary (%s): %s (%d,%d,%d,%d)", f.path, errstr(err), s.Length, s.Size, s.Files, s.Dirs)
}()
s = &meta.Summary{}
- err = meta.GetSummary(f.fs.m, ctx, f.inode, s, true)
+ err = f.fs.m.GetSummary(ctx, f.inode, s, true)
return
}
diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go
index 362c07a9966a..50b65fa9a093 100644
--- a/pkg/gateway/gateway.go
+++ b/pkg/gateway/gateway.go
@@ -584,17 +584,20 @@ func (n *jfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, o
etag, _ = n.fs.GetXattr(mctx, n.path(bucket, object), s3Etag)
}
size := fi.Size()
+ var contentType string
if fi.IsDir() {
size = 0
+ contentType = "application/octet-stream"
}
return minio.ObjectInfo{
- Bucket: bucket,
- Name: object,
- ModTime: fi.ModTime(),
- Size: size,
- IsDir: fi.IsDir(),
- AccTime: fi.ModTime(),
- ETag: string(etag),
+ Bucket: bucket,
+ Name: object,
+ ModTime: fi.ModTime(),
+ Size: size,
+ IsDir: fi.IsDir(),
+ AccTime: fi.ModTime(),
+ ETag: string(etag),
+ ContentType: contentType,
}, nil
}
diff --git a/pkg/meta/base.go b/pkg/meta/base.go
index 5ee0e962c5c9..d2c614c53c97 100644
--- a/pkg/meta/base.go
+++ b/pkg/meta/base.go
@@ -73,12 +73,18 @@ type engine interface {
doFindDetachedNodes(t time.Time) []Ino
doCleanupDetachedNode(ctx Context, detachedNode Ino) syscall.Errno
+ doGetQuota(ctx Context, inode Ino) (*Quota, error)
+ doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error
+ doDelQuota(ctx Context, inode Ino) error
+ doLoadQuotas(ctx Context) (map[Ino]*Quota, error)
+ doFlushQuota(ctx Context, inode Ino, space, inodes int64) error
+
doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno
doLookup(ctx Context, parent Ino, name string, inode *Ino, attr *Attr) syscall.Errno
doMknod(ctx Context, parent Ino, name string, _type uint8, mode, cumask uint16, rdev uint32, path string, inode *Ino, attr *Attr) syscall.Errno
doLink(ctx Context, inode, parent Ino, name string, attr *Attr) syscall.Errno
doUnlink(ctx Context, parent Ino, name string, attr *Attr, skipCheckTrash ...bool) syscall.Errno
- doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ...bool) syscall.Errno
+ doRmdir(ctx Context, parent Ino, name string, inode *Ino, skipCheckTrash ...bool) syscall.Errno
doReadlink(ctx Context, inode Ino) ([]byte, error)
doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry, limit int) syscall.Errno
doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, flags uint32, inode *Ino, attr *Attr) syscall.Errno
@@ -89,7 +95,7 @@ type engine interface {
doGetParents(ctx Context, inode Ino) map[Ino]int
doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error
// @trySync: try sync dir stat if broken or not existed
- doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, error)
+ doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, syscall.Errno)
doSyncDirStat(ctx Context, ino Ino) (*dirStat, syscall.Errno)
scanTrashSlices(Context, trashSliceScan) error
@@ -152,6 +158,11 @@ type baseMeta struct {
dirStats map[Ino]dirStat
*fsStat
+ parentMu sync.Mutex // protect dirParents
+ quotaMu sync.RWMutex // protect dirQuotas
+ dirParents map[Ino]Ino // directory inode -> parent inode
+ dirQuotas map[Ino]*Quota // directory inode -> quota
+
freeMu sync.Mutex
freeInodes freeID
freeSlices freeID
@@ -178,6 +189,8 @@ func newBaseMeta(addr string, conf *Config) *baseMeta {
symlinks: &sync.Map{},
fsStat: new(fsStat),
dirStats: make(map[Ino]dirStat),
+ dirParents: make(map[Ino]Ino),
+ dirQuotas: make(map[Ino]*Quota),
msgCallbacks: &msgCallbacks{
callbacks: make(map[uint32]MsgCallback),
},
@@ -302,13 +315,13 @@ func (m *baseMeta) calcDirStat(ctx Context, ino Ino) (*dirStat, syscall.Errno) {
return stat, 0
}
-func (m *baseMeta) GetDirStat(ctx Context, inode Ino) (stat *dirStat, err error) {
- stat, err = m.en.doGetDirStat(ctx, m.checkRoot(inode), !m.conf.ReadOnly)
- if err != nil {
+func (m *baseMeta) GetDirStat(ctx Context, inode Ino) (stat *dirStat, st syscall.Errno) {
+ stat, st = m.en.doGetDirStat(ctx, m.checkRoot(inode), !m.conf.ReadOnly)
+ if st != 0 {
return
}
if stat == nil {
- stat, err = m.calcDirStat(ctx, inode)
+ stat, st = m.calcDirStat(ctx, inode)
}
return
}
@@ -324,16 +337,18 @@ func (m *baseMeta) updateDirStat(ctx Context, ino Ino, length, space, inodes int
}
func (m *baseMeta) updateParentStat(ctx Context, inode, parent Ino, length, space int64) {
- if space == 0 {
+ if length == 0 && space == 0 {
return
}
m.en.updateStats(space, 0)
if parent > 0 {
m.updateDirStat(ctx, parent, length, space, 0)
+ m.updateDirQuota(ctx, parent, space, 0)
} else {
go func() {
for p := range m.en.doGetParents(ctx, inode) {
m.updateDirStat(ctx, p, length, space, 0)
+ m.updateDirQuota(ctx, p, space, 0)
}
}()
}
@@ -441,8 +456,10 @@ func (m *baseMeta) NewSession() error {
}
logger.Infof("Create session %d OK with version: %s", m.sid, version.Version())
+ m.loadQuotas()
go m.en.flushStats()
go m.flushDirStat()
+ go m.flushQuotas() // TODO: improve it in Redis?
for i := 0; i < m.conf.MaxDeletes; i++ {
go m.deleteSlices()
}
@@ -505,6 +522,7 @@ func (m *baseMeta) refresh() {
} else {
logger.Warnf("Get counter %s: %s", totalInodes, err)
}
+ m.loadQuotas()
if m.conf.ReadOnly || m.conf.NoBGJob {
continue
@@ -545,11 +563,232 @@ func (m *baseMeta) CloseSession() error {
return nil
}
-func (m *baseMeta) checkQuota(size, inodes int64) bool {
- if size > 0 && m.fmt.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+size > int64(m.fmt.Capacity) {
+func (m *baseMeta) checkQuota(ctx Context, space, inodes int64, parent Ino) bool {
+ if space > 0 && m.fmt.Capacity > 0 && atomic.LoadInt64(&m.usedSpace)+atomic.LoadInt64(&m.newSpace)+space > int64(m.fmt.Capacity) {
+ return true
+ }
+ if inodes > 0 && m.fmt.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(m.fmt.Inodes) {
return true
}
- return inodes > 0 && m.fmt.Inodes > 0 && atomic.LoadInt64(&m.usedInodes)+atomic.LoadInt64(&m.newInodes)+inodes > int64(m.fmt.Inodes)
+ if parent == 0 { // FIXME: check all parents of the file
+ logger.Warnf("Quota check is skipped for hardlinked files")
+ return false
+ }
+ return m.checkDirQuota(ctx, parent, space, inodes)
+}
+
+func (m *baseMeta) loadQuotas() {
+ quotas, err := m.en.doLoadQuotas(Background)
+ if err == nil {
+ m.quotaMu.Lock()
+ for ino := range m.dirQuotas {
+ if _, ok := quotas[ino]; !ok {
+ logger.Infof("Quota for inode %d is deleted", ino)
+ delete(m.dirQuotas, ino)
+ }
+ }
+ for ino, q := range quotas {
+ logger.Debugf("Load quotas got %d -> %+v", ino, q)
+ if _, ok := m.dirQuotas[ino]; !ok {
+ m.dirQuotas[ino] = q
+ }
+ }
+ m.quotaMu.Unlock()
+
+ // skip lock since I'm the only one updating the m.dirQuotas
+ for ino, q := range quotas {
+ quota := m.dirQuotas[ino]
+ atomic.SwapInt64(&quota.MaxSpace, q.MaxSpace)
+ atomic.SwapInt64(&quota.MaxInodes, q.MaxInodes)
+ atomic.SwapInt64(&quota.UsedSpace, q.UsedSpace)
+ atomic.SwapInt64(&quota.UsedInodes, q.UsedInodes)
+ }
+ } else {
+ logger.Warnf("Load quotas: %s", err)
+ }
+}
+
+func (m *baseMeta) getDirParent(ctx Context, inode Ino) (Ino, syscall.Errno) {
+ m.parentMu.Lock()
+ parent, ok := m.dirParents[inode]
+ m.parentMu.Unlock()
+ if ok {
+ return parent, 0
+ }
+ logger.Debugf("Get directory parent of inode %d: cache miss", inode)
+ var attr Attr
+ st := m.GetAttr(ctx, inode, &attr)
+ return attr.Parent, st
+}
+
+func (m *baseMeta) hasDirQuota(ctx Context, inode Ino) bool {
+ var q *Quota
+ var st syscall.Errno
+ for {
+ m.quotaMu.RLock()
+ q = m.dirQuotas[inode]
+ m.quotaMu.RUnlock()
+ if q != nil {
+ return true
+ }
+ if inode <= RootInode {
+ break
+ }
+ if inode, st = m.getDirParent(ctx, inode); st != 0 {
+ logger.Warnf("Get directory parent of inode %d: %s", inode, st)
+ break
+ }
+ }
+ return false
+}
+
+func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bool {
+ var q *Quota
+ var st syscall.Errno
+ for {
+ m.quotaMu.RLock()
+ q = m.dirQuotas[inode]
+ m.quotaMu.RUnlock()
+ if q != nil && q.check(space, inodes) {
+ return true
+ }
+ if inode <= RootInode {
+ break
+ }
+ if inode, st = m.getDirParent(ctx, inode); st != 0 {
+ logger.Warnf("Get directory parent of inode %d: %s", inode, st)
+ break
+ }
+ }
+ return false
+}
+
+func (m *baseMeta) updateDirQuota(ctx Context, inode Ino, space, inodes int64) {
+ var q *Quota
+ var st syscall.Errno
+ for {
+ m.quotaMu.RLock()
+ q = m.dirQuotas[inode]
+ m.quotaMu.RUnlock()
+ if q != nil {
+ q.update(space, inodes)
+ }
+ if inode <= RootInode {
+ break
+ }
+ if inode, st = m.getDirParent(ctx, inode); st != 0 {
+ logger.Warnf("Get directory parent of inode %d: %s", inode, st)
+ break
+ }
+ }
+}
+
+func (m *baseMeta) flushQuotas() {
+ quotas := make(map[Ino]*Quota)
+ var newSpace, newInodes int64
+ for {
+ time.Sleep(time.Second * 3)
+ m.quotaMu.RLock()
+ for ino, q := range m.dirQuotas {
+ newSpace = atomic.SwapInt64(&q.newSpace, 0)
+ newInodes = atomic.SwapInt64(&q.newInodes, 0)
+ if newSpace != 0 || newInodes != 0 {
+ quotas[ino] = &Quota{newSpace: newSpace, newInodes: newInodes}
+ }
+ }
+ m.quotaMu.RUnlock()
+ // FIXME: merge
+ for ino, q := range quotas {
+ if err := m.en.doFlushQuota(Background, ino, q.newSpace, q.newInodes); err != nil {
+ logger.Warnf("Flush quota of inode %d: %s", ino, err)
+ m.quotaMu.RLock()
+ cur := m.dirQuotas[ino]
+ m.quotaMu.RUnlock()
+ if cur != nil {
+ cur.update(q.newSpace, q.newInodes)
+ }
+ }
+ }
+ for ino := range quotas {
+ delete(quotas, ino)
+ }
+ }
+}
+
+func (m *baseMeta) HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error {
+ var inode Ino
+ if cmd != QuotaList {
+ if st := m.resolve(ctx, dpath, &inode); st != 0 {
+ return st
+ }
+ if isTrash(inode) {
+ return errors.New("no quota for any trash directory")
+ }
+ }
+
+ switch cmd {
+ case QuotaSet:
+ q, err := m.en.doGetQuota(ctx, inode)
+ if err != nil {
+ return err
+ }
+ quota := quotas[dpath]
+ if q == nil {
+ var sum Summary
+ if st := m.FastGetSummary(ctx, inode, &sum, true); st != 0 {
+ return st
+ }
+ quota.UsedSpace = int64(sum.Size) - align4K(0)
+ quota.UsedInodes = int64(sum.Dirs+sum.Files) - 1
+ if quota.MaxSpace < 0 {
+ quota.MaxSpace = 0
+ }
+ if quota.MaxInodes < 0 {
+ quota.MaxInodes = 0
+ }
+ return m.en.doSetQuota(ctx, inode, quota, true)
+ } else {
+ quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
+ if quota.MaxSpace < 0 {
+ quota.MaxSpace = q.MaxSpace
+ }
+ if quota.MaxInodes < 0 {
+ quota.MaxInodes = q.MaxInodes
+ }
+ if quota.MaxSpace == q.MaxSpace && quota.MaxInodes == q.MaxInodes {
+ return nil // nothing to update
+ }
+ return m.en.doSetQuota(ctx, inode, quota, false)
+ }
+ case QuotaGet:
+ q, err := m.en.doGetQuota(ctx, inode)
+ if err != nil {
+ return err
+ }
+ if q == nil {
+ return fmt.Errorf("no quota for inode %d path %s", inode, dpath)
+ }
+ quotas[dpath] = q
+ case QuotaDel:
+ return m.en.doDelQuota(ctx, inode)
+ case QuotaList:
+ quotaMap, err := m.en.doLoadQuotas(ctx)
+ if err != nil {
+ return err
+ }
+ var p string
+ for ino, quota := range quotaMap {
+ if ps := m.GetPaths(ctx, ino); len(ps) > 0 {
+ p = ps[0]
+ } else {
+ p = fmt.Sprintf("inode:%d", ino)
+ }
+ quotas[p] = quota
+ }
+ default: // FIXME: QuotaCheck
+ return fmt.Errorf("invalid quota command: %d", cmd)
+ }
+ return nil
}
func (m *baseMeta) cleanupDeletedFiles() {
@@ -693,6 +932,11 @@ func (m *baseMeta) Lookup(ctx Context, parent Ino, name string, inode *Ino, attr
}
}
}
+ if st == 0 && attr.Typ == TypeDirectory && !isTrash(parent) {
+ m.parentMu.Lock()
+ m.dirParents[*inode] = parent
+ m.parentMu.Unlock()
+ }
return st
}
@@ -832,6 +1076,11 @@ func (m *baseMeta) GetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno {
}
if err == 0 {
m.of.Update(inode, attr)
+ if attr.Typ == TypeDirectory && inode != RootInode && !isTrash(attr.Parent) {
+ m.parentMu.Lock()
+ m.dirParents[inode] = attr.Parent
+ m.parentMu.Unlock()
+ }
}
return err
}
@@ -871,13 +1120,16 @@ func (m *baseMeta) Mknod(ctx Context, parent Ino, name string, _type uint8, mode
}
defer m.timeit(time.Now())
- if m.checkQuota(4<<10, 1) {
+ parent = m.checkRoot(parent)
+ var space, inodes int64 = align4K(0), 1
+ if m.checkQuota(ctx, space, inodes, parent) {
return syscall.ENOSPC
}
- err := m.en.doMknod(ctx, m.checkRoot(parent), name, _type, mode, cumask, rdev, path, inode, attr)
+ err := m.en.doMknod(ctx, parent, name, _type, mode, cumask, rdev, path, inode, attr)
if err == 0 {
- m.en.updateStats(align4K(0), 1)
- m.updateDirStat(ctx, parent, 0, align4K(0), 1)
+ m.en.updateStats(space, inodes)
+ m.updateDirStat(ctx, parent, 0, space, inodes)
+ m.updateDirQuota(ctx, parent, space, inodes)
}
return err
}
@@ -897,7 +1149,13 @@ func (m *baseMeta) Create(ctx Context, parent Ino, name string, mode uint16, cum
}
func (m *baseMeta) Mkdir(ctx Context, parent Ino, name string, mode uint16, cumask uint16, copysgid uint8, inode *Ino, attr *Attr) syscall.Errno {
- return m.Mknod(ctx, parent, name, TypeDirectory, mode, cumask, 0, "", inode, attr)
+ st := m.Mknod(ctx, parent, name, TypeDirectory, mode, cumask, 0, "", inode, attr)
+ if st == 0 {
+ m.parentMu.Lock()
+ m.dirParents[*inode] = parent
+ m.parentMu.Unlock()
+ }
+ return st
}
func (m *baseMeta) Symlink(ctx Context, parent Ino, name string, path string, inode *Ino, attr *Attr) syscall.Errno {
@@ -920,14 +1178,22 @@ func (m *baseMeta) Link(ctx Context, inode, parent Ino, name string, attr *Attr)
}
defer m.timeit(time.Now())
- parent = m.checkRoot(parent)
- defer func() { m.of.InvalidateChunk(inode, invalidateAttrOnly) }()
if attr == nil {
attr = &Attr{}
}
+ parent = m.checkRoot(parent)
+ if st := m.GetAttr(ctx, inode, attr); st != 0 {
+ return st
+ }
+ if m.checkQuota(ctx, align4K(attr.Length), 1, parent) {
+ return syscall.ENOSPC
+ }
+
+ defer func() { m.of.InvalidateChunk(inode, invalidateAttrOnly) }()
err := m.en.doLink(ctx, inode, parent, name, attr)
if err == 0 {
m.updateDirStat(ctx, parent, int64(attr.Length), align4K(attr.Length), 1)
+ m.updateDirQuota(ctx, parent, align4K(attr.Length), 1)
}
return err
}
@@ -959,14 +1225,16 @@ func (m *baseMeta) Unlink(ctx Context, parent Ino, name string, skipCheckTrash .
}
defer m.timeit(time.Now())
+ parent = m.checkRoot(parent)
var attr Attr
- err := m.en.doUnlink(ctx, m.checkRoot(parent), name, &attr, skipCheckTrash...)
+ err := m.en.doUnlink(ctx, parent, name, &attr, skipCheckTrash...)
if err == 0 {
var diffLength uint64
if attr.Typ == TypeFile {
diffLength = attr.Length
}
m.updateDirStat(ctx, parent, -int64(diffLength), -align4K(diffLength), -1)
+ m.updateDirQuota(ctx, parent, -align4K(diffLength), -1)
}
return err
}
@@ -986,11 +1254,19 @@ func (m *baseMeta) Rmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
}
defer m.timeit(time.Now())
- err := m.en.doRmdir(ctx, m.checkRoot(parent), name, skipCheckTrash...)
- if err == 0 {
+ parent = m.checkRoot(parent)
+ var inode Ino
+ st := m.en.doRmdir(ctx, parent, name, &inode, skipCheckTrash...)
+ if st == 0 {
+ if !isTrash(parent) {
+ m.parentMu.Lock()
+ delete(m.dirParents, inode)
+ m.parentMu.Unlock()
+ }
m.updateDirStat(ctx, parent, 0, -align4K(0), -1)
+ m.updateDirQuota(ctx, parent, -align4K(0), -1)
}
- return err
+ return st
}
func (m *baseMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst Ino, nameDst string, flags uint32, inode *Ino, attr *Attr) syscall.Errno {
@@ -1015,19 +1291,72 @@ func (m *baseMeta) Rename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
defer m.timeit(time.Now())
+ if inode == nil {
+ inode = new(Ino)
+ }
if attr == nil {
attr = &Attr{}
}
- err := m.en.doRename(ctx, m.checkRoot(parentSrc), nameSrc, m.checkRoot(parentDst), nameDst, flags, inode, attr)
- if err == 0 {
+ parentSrc = m.checkRoot(parentSrc)
+ parentDst = m.checkRoot(parentDst)
+ var quotaSrc bool = !isTrash(parentSrc) && m.hasDirQuota(ctx, parentSrc)
+ var quotaDst bool
+ if parentSrc == parentDst {
+ quotaDst = quotaSrc
+ } else {
+ quotaDst = m.hasDirQuota(ctx, parentDst)
+ }
+ var space, inodes int64
+ if parentSrc != parentDst && (quotaSrc || quotaDst) {
+ if st := m.Lookup(ctx, parentSrc, nameSrc, inode, attr); st != 0 {
+ return st
+ }
+ if attr.Typ == TypeDirectory {
+ m.quotaMu.RLock()
+ q := m.dirQuotas[*inode]
+ m.quotaMu.RUnlock()
+ if q != nil {
+ space, inodes = q.UsedSpace+align4K(0), q.UsedInodes+1
+ } else {
+ var sum Summary
+ logger.Debugf("Start to get summary of inode %d", *inode)
+ if st := m.FastGetSummary(ctx, *inode, &sum, true); st != 0 {
+ logger.Warnf("Get summary of inode %d: %s", *inode, st)
+ return st
+ }
+ space, inodes = int64(sum.Size), int64(sum.Dirs+sum.Files)
+ }
+ } else {
+ space, inodes = align4K(attr.Length), 1
+ }
+ // FIXME: dst exists and is replaced or exchanged
+ if quotaDst && m.checkDirQuota(ctx, parentDst, space, inodes) {
+ return syscall.ENOSPC
+ }
+ }
+ st := m.en.doRename(ctx, parentSrc, nameSrc, parentDst, nameDst, flags, inode, attr)
+ if st == 0 {
var diffLengh uint64
- if attr.Typ == TypeFile {
+ if attr.Typ == TypeDirectory {
+ m.parentMu.Lock()
+ m.dirParents[*inode] = parentDst
+ m.parentMu.Unlock()
+ } else if attr.Typ == TypeFile {
diffLengh = attr.Length
}
- m.updateDirStat(ctx, parentSrc, -int64(diffLengh), -align4K(diffLengh), -1)
- m.updateDirStat(ctx, parentDst, int64(diffLengh), align4K(diffLengh), 1)
+ if parentSrc != parentDst {
+ // FIXME: dst exists and is replaced or exchanged
+ m.updateDirStat(ctx, parentSrc, -int64(diffLengh), -align4K(diffLengh), -1)
+ m.updateDirStat(ctx, parentDst, int64(diffLengh), align4K(diffLengh), 1)
+ if quotaSrc {
+ m.updateDirQuota(ctx, parentSrc, -space, -inodes)
+ }
+ if quotaDst {
+ m.updateDirQuota(ctx, parentDst, space, inodes)
+ }
+ }
}
- return err
+ return st
}
func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) syscall.Errno {
@@ -1380,9 +1709,9 @@ func (m *baseMeta) Check(ctx Context, fpath string, repair bool, recursive bool,
}
}
- stat, err := m.en.doGetDirStat(ctx, inode, false)
- if err != nil {
- logger.Errorf("get dir stat for inode %d: %v", inode, err)
+ stat, st := m.en.doGetDirStat(ctx, inode, false)
+ if st != 0 {
+ logger.Errorf("get dir stat for inode %d: %v", inode, st)
continue
}
if stat == nil || stat.space < 0 || stat.inodes < 0 {
@@ -1395,7 +1724,7 @@ func (m *baseMeta) Check(ctx Context, fpath string, repair bool, recursive bool,
if _, st := m.en.doSyncDirStat(ctx, inode); st == 0 {
logger.Debugf("Stat of path %s (inode %d) is successfully synced", path, inode)
} else {
- logger.Errorf("Sync stat of path %s inode %d: %s", path, inode, err)
+ logger.Errorf("Sync stat of path %s inode %d: %s", path, inode, st)
}
}
} else if statBroken {
@@ -1434,6 +1763,27 @@ func (m *baseMeta) Chroot(ctx Context, subdir string) syscall.Errno {
return 0
}
+func (m *baseMeta) resolve(ctx Context, dpath string, inode *Ino) syscall.Errno {
+ var attr Attr
+ *inode = RootInode
+ for dpath != "" {
+ ps := strings.SplitN(dpath, "/", 2)
+ if ps[0] != "" {
+ if st := m.en.doLookup(ctx, *inode, ps[0], inode, &attr); st != 0 {
+ return st
+ }
+ if attr.Typ != TypeDirectory {
+ return syscall.ENOTDIR
+ }
+ }
+ if len(ps) == 1 {
+ break
+ }
+ dpath = ps[1]
+ }
+ return 0
+}
+
func (m *baseMeta) GetFormat() Format {
return *m.fmt
}
@@ -1624,7 +1974,7 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
}
for _, se := range subEntries {
if se.Attr.Typ == TypeDirectory {
- st = m.en.doRmdir(ctx, e.Inode, string(se.Name))
+ st = m.en.doRmdir(ctx, e.Inode, string(se.Name), nil)
} else {
st = m.en.doUnlink(ctx, e.Inode, string(se.Name), nil)
}
@@ -1643,7 +1993,7 @@ func (m *baseMeta) CleanupTrashBefore(ctx Context, edge time.Time, increProgress
}
}
if rmdir {
- if st = m.en.doRmdir(ctx, TrashInode, string(e.Name)); st != 0 {
+ if st = m.en.doRmdir(ctx, TrashInode, string(e.Name), nil); st != 0 {
logger.Warnf("rmdir subTrash %s: %s", e.Name, st)
}
}
diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go
index 8a19ca5fa92f..76c72b0f5abf 100644
--- a/pkg/meta/base_test.go
+++ b/pkg/meta/base_test.go
@@ -117,6 +117,7 @@ func testMeta(t *testing.T, m Meta) {
testCloseSession(t, m)
testConcurrentDir(t, m)
testAttrFlags(t, m)
+ testQuota(t, m)
base := m.getBase()
base.conf.OpenCache = time.Second
base.of.expire = time.Second
@@ -536,7 +537,7 @@ func testMetaClient(t *testing.T, m Meta) {
}
}
var summary Summary
- if st := GetSummary(m, ctx, parent, &summary, false); st != 0 {
+ if st := m.GetSummary(ctx, parent, &summary, false); st != 0 {
t.Fatalf("summary: %s", st)
}
expected := Summary{Length: 0, Size: 4096, Files: 0, Dirs: 1}
@@ -544,14 +545,14 @@ func testMetaClient(t *testing.T, m Meta) {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
summary = Summary{}
- if st := GetSummary(m, ctx, 1, &summary, true); st != 0 {
+ if st := m.GetSummary(ctx, 1, &summary, true); st != 0 {
t.Fatalf("summary: %s", st)
}
expected = Summary{Length: 400, Size: 20480, Files: 3, Dirs: 2}
if summary != expected {
t.Fatalf("summary %+v not equal to expected: %+v", summary, expected)
}
- if st := GetSummary(m, ctx, inode, &summary, true); st != 0 {
+ if st := m.GetSummary(ctx, inode, &summary, true); st != 0 {
t.Fatalf("summary: %s", st)
}
expected = Summary{Length: 600, Size: 24576, Files: 4, Dirs: 2}
@@ -1878,14 +1879,14 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("mkdir: %s", st)
}
- st, err := m.GetDirStat(Background, testInode)
+ stat, st := m.GetDirStat(Background, testInode)
checkResult := func(length, space, inodes int64) {
- if err != nil {
- t.Fatalf("get dir usage: %s", err)
+ if st != 0 {
+ t.Fatalf("get dir usage: %s", st)
}
expect := dirStat{length, space, inodes}
- if *st != expect {
- t.Fatalf("test dir usage: expect %+v, but got %+v", expect, st)
+ if *stat != expect {
+ t.Fatalf("test dir usage: expect %+v, but got %+v", expect, stat)
}
}
checkResult(0, 0, 0)
@@ -1896,7 +1897,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("create: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(0, align4K(0), 1)
// test dir with file and fallocate
@@ -1904,7 +1905,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("fallocate: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(4097, align4K(4097), 1)
// test dir with file and truncate
@@ -1912,7 +1913,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("truncate: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(0, align4K(0), 1)
// test dir with file and write
@@ -1920,7 +1921,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("write: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(4097, align4K(4097), 1)
// test dir with file and link
@@ -1928,7 +1929,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("link: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(2*4097, 2*align4K(4097), 2)
// test dir with subdir
@@ -1937,7 +1938,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("mkdir: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(2*4097, align4K(0)+2*align4K(4097), 3)
// test rename
@@ -1945,9 +1946,9 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("rename: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(4097, align4K(0)+align4K(4097), 2)
- st, err = m.GetDirStat(Background, subInode)
+ stat, st = m.GetDirStat(Background, subInode)
checkResult(4097, align4K(4097), 1)
// test unlink
@@ -1958,9 +1959,9 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("unlink: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(0, align4K(0), 1)
- st, err = m.GetDirStat(Background, subInode)
+ stat, st = m.GetDirStat(Background, subInode)
checkResult(0, 0, 0)
// test rmdir
@@ -1968,7 +1969,7 @@ func testDirStat(t *testing.T, m Meta) {
t.Fatalf("rmdir: %s", st)
}
time.Sleep(1100 * time.Millisecond)
- st, err = m.GetDirStat(Background, testInode)
+ stat, st = m.GetDirStat(Background, testInode)
checkResult(0, 0, 0)
}
@@ -2256,3 +2257,102 @@ func checkEntry(t *testing.T, m Meta, srcEntry, dstEntry *Entry, dstParentIno In
}
}
}
+
+func testQuota(t *testing.T, m Meta) {
+ if err := m.NewSession(); err != nil {
+ t.Fatalf("New session: %s", err)
+ }
+ defer m.CloseSession()
+ ctx := Background
+ var inode, parent Ino
+ var attr Attr
+ if st := m.Mkdir(ctx, RootInode, "quota", 0755, 0, 0, &parent, &attr); st != 0 {
+ t.Fatalf("Mkdir quota: %s", st)
+ }
+ p := "/quota"
+ if err := m.HandleQuota(ctx, QuotaSet, p, map[string]*Quota{p: {MaxSpace: 2 << 30, MaxInodes: 6}}); err != nil {
+ t.Fatalf("HandleQuota set %s: %s", p, err)
+ }
+ m.getBase().loadQuotas()
+ if st := m.Mkdir(ctx, parent, "d1", 0755, 0, 0, &inode, &attr); st != 0 {
+ t.Fatalf("Mkdir quota/d1: %s", st)
+ }
+ p = "/quota/d1"
+ if err := m.HandleQuota(ctx, QuotaSet, p, map[string]*Quota{p: {MaxSpace: 1 << 30, MaxInodes: 5}}); err != nil {
+ t.Fatalf("HandleQuota %s: %s", p, err)
+ }
+ m.getBase().loadQuotas()
+ if st := m.Create(ctx, inode, "f1", 0644, 0, 0, nil, &attr); st != 0 {
+ t.Fatalf("Create quota/d1/f1: %s", st)
+ }
+ if st := m.Mkdir(ctx, parent, "d2", 0755, 0, 0, &parent, &attr); st != 0 {
+ t.Fatalf("Mkdir quota/d2: %s", st)
+ }
+ if st := m.Mkdir(ctx, parent, "d22", 0755, 0, 0, &inode, &attr); st != 0 {
+ t.Fatalf("Mkdir quota/d2/d22: %s", st)
+ }
+ p = "/quota/d2/d22"
+ if err := m.HandleQuota(ctx, QuotaSet, p, map[string]*Quota{p: {MaxSpace: 1 << 30, MaxInodes: 5}}); err != nil {
+ t.Fatalf("HandleQuota %s: %s", p, err)
+ }
+ m.getBase().loadQuotas()
+ // parent -> d2, inode -> d22
+ if st := m.Create(ctx, parent, "f2", 0644, 0, 0, nil, &attr); st != 0 {
+ t.Fatalf("Create quota/d2/f2: %s", st)
+ }
+ if st := m.Create(ctx, inode, "f22", 0644, 0, 0, nil, &attr); st != 0 {
+ t.Fatalf("Create quota/d2/d22/f22: %s", st)
+ }
+ time.Sleep(time.Second * 5)
+
+ qs := make(map[string]*Quota)
+ p = "/quota"
+ if err := m.HandleQuota(ctx, QuotaGet, p, qs); err != nil {
+ t.Fatalf("HandleQuota get %s: %s", p, err)
+ } else if q := qs[p]; q.MaxSpace != 2<<30 || q.MaxInodes != 6 || q.UsedSpace != 6*4<<10 || q.UsedInodes != 6 {
+ t.Fatalf("HandleQuota get %s: %+v", p, q)
+ }
+ delete(qs, p)
+ p = "/quota/d1"
+ if err := m.HandleQuota(ctx, QuotaGet, p, qs); err != nil {
+ t.Fatalf("HandleQuota get %s: %s", p, err)
+ } else if q := qs[p]; q.MaxSpace != 1<<30 || q.MaxInodes != 5 || q.UsedSpace != 4<<10 || q.UsedInodes != 1 {
+ t.Fatalf("HandleQuota get %s: %+v", p, q)
+ }
+ delete(qs, p)
+ p = "/quota/d2/d22"
+ if err := m.HandleQuota(ctx, QuotaGet, p, qs); err != nil {
+ t.Fatalf("HandleQuota get %s: %s", p, err)
+ } else if q := qs[p]; q.MaxSpace != 1<<30 || q.MaxInodes != 5 || q.UsedSpace != 4<<10 || q.UsedInodes != 1 {
+ t.Fatalf("HandleQuota get %s: %+v", p, q)
+ }
+ delete(qs, p)
+
+ if err := m.HandleQuota(ctx, QuotaList, "", qs); err != nil {
+ t.Fatalf("HandleQuota list: %s", err)
+ } else {
+ if len(qs) != 3 {
+ t.Fatalf("HandleQuota list bad result: %d", len(qs))
+ }
+ }
+
+ if err := m.HandleQuota(ctx, QuotaDel, "/quota/d1", nil); err != nil {
+ t.Fatalf("HandleQuota del /quota/d1: %s", err)
+ }
+ if err := m.HandleQuota(ctx, QuotaDel, "/quota/d2", nil); err != nil {
+ t.Fatalf("HandleQuota del /quota/d2: %s", err)
+ }
+
+ qs = make(map[string]*Quota)
+ if err := m.HandleQuota(ctx, QuotaList, "", qs); err != nil {
+ t.Fatalf("HandleQuota list: %s", err)
+ } else {
+ if len(qs) != 2 {
+ t.Fatalf("HandleQuota list bad result: %d", len(qs))
+ }
+ }
+ m.getBase().loadQuotas()
+ if st := m.Create(ctx, parent, "f3", 0644, 0, 0, nil, &attr); st != syscall.ENOSPC {
+ t.Fatalf("Create quota/d2/f3: %s", st)
+ }
+}
diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go
index 0f718e240f34..ae8450a8687c 100644
--- a/pkg/meta/interface.go
+++ b/pkg/meta/interface.go
@@ -23,6 +23,7 @@ import (
"net/url"
"os"
"strings"
+ "sync/atomic"
"syscall"
"time"
@@ -86,6 +87,14 @@ const (
FlagAppend
)
+const (
+ QuotaSet uint8 = iota
+ QuotaGet
+ QuotaDel
+ QuotaList
+ QuotaCheck
+)
+
const MaxName = 255
const RootInode Ino = 1
const TrashInode Ino = 0x7FFFFFFF10000000 // larger than vfs.minInternalNode
@@ -250,6 +259,34 @@ type Session struct {
Plocks []Plock `json:",omitempty"`
}
+type Quota struct {
+ MaxSpace, MaxInodes int64
+ UsedSpace, UsedInodes int64
+ newSpace, newInodes int64
+}
+
+// check reports whether adding the given space/inodes would exceed the quota limit
+func (q *Quota) check(space, inodes int64) bool {
+ if space > 0 {
+ max := atomic.LoadInt64(&q.MaxSpace)
+ if max > 0 && atomic.LoadInt64(&q.UsedSpace)+atomic.LoadInt64(&q.newSpace)+space > max {
+ return true
+ }
+ }
+ if inodes > 0 {
+ max := atomic.LoadInt64(&q.MaxInodes)
+ if max > 0 && atomic.LoadInt64(&q.UsedInodes)+atomic.LoadInt64(&q.newInodes)+inodes > max {
+ return true
+ }
+ }
+ return false
+}
+
+func (q *Quota) update(space, inodes int64) {
+ atomic.AddInt64(&q.newSpace, space)
+ atomic.AddInt64(&q.newInodes, inodes)
+}
+
// Meta is a interface for a meta service for file system.
type Meta interface {
// Name of database
@@ -339,7 +376,7 @@ type Meta interface {
// GetParents returns a map of node parents (> 1 parents if hardlinked)
GetParents(ctx Context, inode Ino) map[Ino]int
// GetDirStat returns the space and inodes usage of a directory.
- GetDirStat(ctx Context, inode Ino) (st *dirStat, err error)
+ GetDirStat(ctx Context, inode Ino) (stat *dirStat, st syscall.Errno)
// GetXattr returns the value of extended attribute for given name.
GetXattr(ctx Context, inode Ino, name string, vbuff *[]byte) syscall.Errno
@@ -362,7 +399,11 @@ type Meta interface {
ListSlices(ctx Context, slices map[Ino][]Slice, delete bool, showProgress func()) syscall.Errno
// Remove all files and directories recursively.
Remove(ctx Context, parent Ino, name string, count *uint64) syscall.Errno
- //Clone a file or directory
+ // GetSummary gets the summary of a node; for a directory it accumulates all its child nodes
+ GetSummary(ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno
+ // FastGetSummary gets the summary of a node; for a directory it uses the recorded dirStats
+ FastGetSummary(ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno
+ // Clone a file or directory
Clone(ctx Context, srcIno, dstParentIno Ino, dstName string, cmode uint8, cumask uint16, count, total *uint64) syscall.Errno
// GetPaths returns all paths of an inode
GetPaths(ctx Context, inode Ino) []string
@@ -378,6 +419,8 @@ type Meta interface {
// OnReload register a callback for any change founded after reloaded.
OnReload(func(new *Format))
+ HandleQuota(ctx Context, cmd uint8, dpath string, quotas map[string]*Quota) error
+
// Dump the tree under root, which may be modified by checkRoot
DumpMeta(w io.Writer, root Ino, keepSecret bool) error
LoadMeta(r io.Reader) error
diff --git a/pkg/meta/load_dump_test.go b/pkg/meta/load_dump_test.go
index 7ad244a53bc4..201718a7942a 100644
--- a/pkg/meta/load_dump_test.go
+++ b/pkg/meta/load_dump_test.go
@@ -131,9 +131,9 @@ func testLoad(t *testing.T, uri, fname string) Meta {
}
}
- stat, err := m.(engine).doGetDirStat(ctx, 1, false)
- if err != nil {
- t.Fatalf("get dir stat: %s", err)
+ stat, st := m.(engine).doGetDirStat(ctx, 1, false)
+ if st != 0 {
+ t.Fatalf("get dir stat: %s", st)
}
if stat == nil {
t.Fatalf("get dir stat: nil")
diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go
index 8d6418f87724..3a2fa9b79ba9 100644
--- a/pkg/meta/redis.go
+++ b/pkg/meta/redis.go
@@ -22,6 +22,7 @@ package meta
import (
"bufio"
"context"
+ "crypto/tls"
"encoding/binary"
"encoding/json"
"fmt"
@@ -49,25 +50,28 @@ import (
)
/*
- Node: i$inode -> Attribute{type,mode,uid,gid,atime,mtime,ctime,nlink,length,rdev}
- Dir: d$inode -> {name -> {inode,type}}
- Parent: p$inode -> {parent -> count} // for hard links
- File: c$inode_$indx -> [Slice{pos,id,length,off,len}]
- Symlink: s$inode -> target
- Xattr: x$inode -> {name -> value}
- Flock: lockf$inode -> { $sid_$owner -> ltype }
+ Node: i$inode -> Attribute{type,mode,uid,gid,atime,mtime,ctime,nlink,length,rdev}
+ Dir: d$inode -> {name -> {inode,type}}
+ Parent: p$inode -> {parent -> count} // for hard links
+ File: c$inode_$indx -> [Slice{pos,id,length,off,len}]
+ Symlink: s$inode -> target
+ Xattr: x$inode -> {name -> value}
+ Flock: lockf$inode -> { $sid_$owner -> ltype }
POSIX lock: lockp$inode -> { $sid_$owner -> Plock(pid,ltype,start,end) }
- Sessions: sessions -> [ $sid -> heartbeat ]
- sustained: session$sid -> [$inode]
- locked: locked$sid -> { lockf$inode or lockp$inode }
+ Sessions: sessions -> [ $sid -> heartbeat ]
+ sustained: session$sid -> [$inode]
+ locked: locked$sid -> { lockf$inode or lockp$inode }
Removed files: delfiles -> [$inode:$length -> seconds]
detached nodes: detachedNodes -> [$inode -> seconds]
Slices refs: k$sliceId_$size -> refcount
- Dir data length: dirDataLength -> { $inode -> length }
- Dir used space: dirUsedSpace -> { $inode -> usedSpace }
- Dir used inodes: dirUsedInodes -> { $inode -> usedInodes }
+ Dir data length: dirDataLength -> { $inode -> length }
+ Dir used space: dirUsedSpace -> { $inode -> usedSpace }
+ Dir used inodes: dirUsedInodes -> { $inode -> usedInodes }
+ Quota: dirQuota -> { $inode -> {maxSpace, maxInodes} }
+ Quota used space: dirQuotaUsedSpace -> { $inode -> usedSpace }
+ Quota used inodes: dirQuotaUsedInodes -> { $inode -> usedInodes }
Redis features:
Sorted Set: 1.2+
@@ -108,6 +112,8 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
writeTimeout := query.duration("write-timeout", "write_timeout", time.Second*5)
routeRead := query.pop("route-read")
skipVerify := query.pop("insecure-skip-verify")
+ certFile := query.pop("tls-cert-file")
+ keyFile := query.pop("tls-key-file")
u.RawQuery = values.Encode()
hosts := u.Host
@@ -118,6 +124,13 @@ func newRedisMeta(driver, addr string, conf *Config) (Meta, error) {
if opt.TLSConfig != nil {
opt.TLSConfig.ServerName = "" // use the host of each connection as ServerName
opt.TLSConfig.InsecureSkipVerify = skipVerify != ""
+ if certFile != "" {
+ cert, err := tls.LoadX509KeyPair(certFile, keyFile)
+ if err != nil {
+ return nil, fmt.Errorf("get certificate error certFile:%s keyFile:%s error:%s", certFile, keyFile, err)
+ }
+ opt.TLSConfig.Certificates = []tls.Certificate{cert}
+ }
}
if opt.Password == "" {
opt.Password = os.Getenv("REDIS_PASSWORD")
@@ -546,6 +559,18 @@ func (m *redisMeta) dirUsedInodesKey() string {
return m.prefix + "dirUsedInodes"
}
+func (m *redisMeta) dirQuotaUsedSpaceKey() string {
+ return m.prefix + "dirQuotaUsedSpace"
+}
+
+func (m *redisMeta) dirQuotaUsedInodesKey() string {
+ return m.prefix + "dirQuotaUsedInodes"
+}
+
+func (m *redisMeta) dirQuotaKey() string {
+ return m.prefix + "dirQuota"
+}
+
func (m *redisMeta) totalInodesKey() string {
return m.prefix + totalInodes
}
@@ -574,6 +599,25 @@ func (m *redisMeta) sliceRefs() string {
return m.prefix + "sliceRef"
}
+func (m *redisMeta) packQuota(space, inodes int64) []byte {
+ wb := utils.NewBuffer(16)
+ wb.Put64(uint64(space))
+ wb.Put64(uint64(inodes))
+ return wb.Bytes()
+}
+
+func (m *redisMeta) parseQuota(buf []byte) (space, inodes int64) {
+ if len(buf) == 0 {
+ return 0, 0
+ }
+ if len(buf) != 16 {
+ logger.Errorf("Invalid quota value: %v", buf)
+ return 0, 0
+ }
+ rb := utils.ReadBuffer(buf)
+ return int64(rb.Get64()), int64(rb.Get64())
+}
+
func (m *redisMeta) packEntry(_type uint8, inode Ino) []byte {
wb := utils.NewBuffer(9)
wb.Put8(_type)
@@ -791,8 +835,10 @@ func (m *redisMeta) txn(ctx Context, txf func(tx *redis.Tx) error, keys ...strin
var khash = fnv.New32()
_, _ = khash.Write([]byte(keys[0]))
h := uint(khash.Sum32())
+
start := time.Now()
defer func() { m.txDist.Observe(time.Since(start).Seconds()) }()
+
m.txLock(h)
defer m.txUnlock(h)
// TODO: enable retry for some of idempodent transactions
@@ -853,7 +899,7 @@ func (m *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64,
}
newLength = int64(length) - int64(t.Length)
newSpace = align4K(length) - align4K(t.Length)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, t.Parent) {
return syscall.ENOSPC
}
var zeroChunks []uint32
@@ -981,7 +1027,7 @@ func (m *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, si
old := t.Length
newLength = int64(length) - int64(old)
newSpace = align4K(length) - align4K(old)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, t.Parent) {
return syscall.ENOSPC
}
t.Length = length
@@ -1376,7 +1422,7 @@ func (m *redisMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, s
return errno(err)
}
-func (m *redisMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ...bool) syscall.Errno {
+func (m *redisMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skipCheckTrash ...bool) syscall.Errno {
var trash Ino
if !(len(skipCheckTrash) == 1 && skipCheckTrash[0]) {
if st := m.checkTrash(parent, &trash); st != 0 {
@@ -1399,6 +1445,9 @@ func (m *redisMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash
if typ != TypeDirectory {
return syscall.ENOTDIR
}
+ if pinode != nil {
+ *pinode = inode
+ }
if err = tx.Watch(ctx, m.inodeKey(inode), m.entryKey(inode)).Err(); err != nil {
return err
}
@@ -1459,10 +1508,13 @@ func (m *redisMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash
pipe.Decr(ctx, m.totalInodesKey())
}
- field := strconv.FormatUint(uint64(inode), 10)
+ field := inode.String()
pipe.HDel(ctx, m.dirDataLengthKey(), field)
pipe.HDel(ctx, m.dirUsedSpaceKey(), field)
pipe.HDel(ctx, m.dirUsedInodesKey(), field)
+ pipe.HDel(ctx, m.dirQuotaKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
return nil
})
return err
@@ -1701,6 +1753,12 @@ func (m *redisMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentD
pipe.Del(ctx, m.parentKey(dino))
}
}
+ if dtyp == TypeDirectory {
+ field := dino.String()
+ pipe.HDel(ctx, m.dirQuotaKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
+ }
}
}
if parentDst != parentSrc {
@@ -2011,11 +2069,10 @@ func (m *redisMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
return err
}
m.parseAttr(a, &attr)
- var newSpace int64
+ newSpace := -align4K(attr.Length)
_, err = m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.ZAdd(ctx, m.delfiles(), redis.Z{Score: float64(time.Now().Unix()), Member: m.toDelete(inode, attr.Length)})
pipe.Del(ctx, m.inodeKey(inode))
- newSpace = -align4K(attr.Length)
pipe.IncrBy(ctx, m.usedSpaceKey(), newSpace)
pipe.Decr(ctx, m.totalInodesKey())
pipe.SRem(ctx, m.sustained(sid), strconv.Itoa(int(inode)))
@@ -2024,6 +2081,7 @@ func (m *redisMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
if err == nil {
m.updateStats(newSpace, -1)
m.tryDeleteFileData(inode, attr.Length, false)
+ m.updateDirQuota(Background, attr.Parent, newSpace, -1)
}
return err
}
@@ -2083,7 +2141,7 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice
newSpace = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
- if m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, attr.Parent) {
return syscall.ENOSPC
}
now := time.Now()
@@ -2164,7 +2222,7 @@ func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino,
newSpace = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
- if m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, attr.Parent) {
return syscall.ENOSPC
}
now := time.Now()
@@ -2345,31 +2403,31 @@ func (m *redisMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
return nil
}
-func (m *redisMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, error) {
+func (m *redisMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, syscall.Errno) {
field := ino.String()
dataLength, errLength := m.rdb.HGet(ctx, m.dirDataLengthKey(), field).Int64()
if errLength != nil && errLength != redis.Nil {
- return nil, errLength
+ return nil, errno(errLength)
}
usedSpace, errSpace := m.rdb.HGet(ctx, m.dirUsedSpaceKey(), field).Int64()
if errSpace != nil && errSpace != redis.Nil {
- return nil, errSpace
+ return nil, errno(errSpace)
}
usedInodes, errInodes := m.rdb.HGet(ctx, m.dirUsedInodesKey(), field).Int64()
if errInodes != nil && errSpace != redis.Nil {
- return nil, errInodes
+ return nil, errno(errInodes)
}
if errLength != redis.Nil && errSpace != redis.Nil && errInodes != redis.Nil {
if trySync && (dataLength < 0 || usedSpace < 0 || usedInodes < 0) {
return m.doSyncDirStat(ctx, ino)
}
- return &dirStat{dataLength, usedSpace, usedInodes}, nil
+ return &dirStat{dataLength, usedSpace, usedInodes}, 0
}
if trySync {
return m.doSyncDirStat(ctx, ino)
}
- return nil, nil
+ return nil, 0
}
// For now only deleted files
@@ -2475,9 +2533,13 @@ func (m *redisMeta) cleanupLeakedChunks(delete bool) {
rs = append(rs, p.Exists(ctx, m.inodeKey(Ino(ino))))
}
if len(rs) > 0 {
- _, err := p.Exec(ctx)
+ cmds, err := p.Exec(ctx)
if err != nil {
- logger.Errorf("check inodes: %s", err)
+ for _, c := range cmds {
+ if c.Err() != nil {
+ logger.Errorf("Check inodes with command %s: %s", c.String(), c.Err())
+ }
+ }
return err
}
for i, rr := range rs {
@@ -2819,7 +2881,11 @@ func (m *redisMeta) scanAllChunks(ctx Context, ch chan<- cchunk, bar *utils.Bar)
}
cmds, err := p.Exec(ctx)
if err != nil {
- logger.Warnf("list slices: %s", err)
+ for _, c := range cmds {
+ if c.Err() != nil {
+ logger.Warnf("Scan chunks with command %s: %s", c.String(), c.Err())
+ }
+ }
return err
}
for i, cmd := range cmds {
@@ -2956,7 +3022,11 @@ func (m *redisMeta) ListSlices(ctx Context, slices map[Ino][]Slice, delete bool,
}
cmds, err := p.Exec(ctx)
if err != nil {
- logger.Warnf("list slices: %s", err)
+ for _, c := range cmds {
+ if c.Err() != nil {
+ logger.Warnf("List slices with command %s: %s", c.String(), c.Err())
+ }
+ }
return err
}
for _, cmd := range cmds {
@@ -3242,6 +3312,107 @@ func (m *redisMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.E
}
}
+func (m *redisMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
+ field := inode.String()
+ cmds, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+ pipe.HGet(ctx, m.dirQuotaKey(), field)
+ pipe.HGet(ctx, m.dirQuotaUsedSpaceKey(), field)
+ pipe.HGet(ctx, m.dirQuotaUsedInodesKey(), field)
+ return nil
+ })
+ if err == redis.Nil {
+ return nil, nil
+ } else if err != nil {
+ return nil, err
+ }
+
+ buf, _ := cmds[0].(*redis.StringCmd).Bytes()
+ if len(buf) != 16 {
+ return nil, fmt.Errorf("invalid quota value: %v", buf)
+ }
+ var quota Quota
+ quota.MaxSpace, quota.MaxInodes = m.parseQuota(buf)
+ if quota.UsedSpace, err = cmds[1].(*redis.StringCmd).Int64(); err != nil {
+ return nil, err
+ }
+ if quota.UsedInodes, err = cmds[2].(*redis.StringCmd).Int64(); err != nil {
+ return nil, err
+ }
+	return &quota, nil
+}
+
+func (m *redisMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
+ return m.txn(ctx, func(tx *redis.Tx) error {
+ field := inode.String()
+ if create {
+ _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+ pipe.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes))
+ pipe.HSet(ctx, m.dirQuotaUsedSpaceKey(), field, quota.UsedSpace)
+ pipe.HSet(ctx, m.dirQuotaUsedInodesKey(), field, quota.UsedInodes)
+ return nil
+ })
+ return err
+ } else {
+ return tx.HSet(ctx, m.dirQuotaKey(), field, m.packQuota(quota.MaxSpace, quota.MaxInodes)).Err()
+ }
+ }, m.dirQuotaKey())
+}
+
+func (m *redisMeta) doDelQuota(ctx Context, inode Ino) error {
+ field := inode.String()
+ _, err := m.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
+ pipe.HDel(ctx, m.dirQuotaKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedSpaceKey(), field)
+ pipe.HDel(ctx, m.dirQuotaUsedInodesKey(), field)
+ return nil
+ })
+ return err
+}
+
+func (m *redisMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
+ quotas := make(map[Ino]*Quota)
+ return quotas, m.hscan(ctx, m.dirQuotaKey(), func(keys []string) error {
+ for i := 0; i < len(keys); i += 2 {
+ key, val := keys[i], []byte(keys[i+1])
+ inode, err := strconv.ParseUint(key, 10, 64)
+ if err != nil {
+ logger.Errorf("invalid inode: %s", key)
+ continue
+ }
+ if len(val) != 16 {
+ logger.Errorf("invalid quota: %s=%s", key, val)
+ continue
+ }
+ maxSpace, maxInodes := m.parseQuota(val)
+ usedSpace, err := m.rdb.HGet(ctx, m.dirQuotaUsedSpaceKey(), key).Int64()
+ if err != nil && err != redis.Nil {
+ return err
+ }
+ usedInodes, err := m.rdb.HGet(ctx, m.dirQuotaUsedInodesKey(), key).Int64()
+ if err != nil && err != redis.Nil {
+ return err
+ }
+ quotas[Ino(inode)] = &Quota{
+ MaxSpace: int64(maxSpace),
+ MaxInodes: int64(maxInodes),
+ UsedSpace: usedSpace,
+ UsedInodes: usedInodes,
+ }
+ }
+ return nil
+ })
+}
+
+func (m *redisMeta) doFlushQuota(ctx Context, inode Ino, space, inodes int64) error {
+ field := inode.String()
+ _, err := m.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
+ p.HIncrBy(ctx, m.dirQuotaUsedSpaceKey(), field, space)
+ p.HIncrBy(ctx, m.dirQuotaUsedInodesKey(), field, inodes)
+ return nil
+ })
+ return err
+}
+
func (m *redisMeta) checkServerConfig() {
rawInfo, err := m.rdb.Info(Background).Result()
if err != nil {
@@ -3996,7 +4167,7 @@ func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParent
}
// copy chunks
if srcAttr.Length != 0 {
- if m.checkQuota(align4K(srcAttr.Length), 0) {
+ if m.checkQuota(ctx, align4K(srcAttr.Length), 0, dstParentIno) {
return syscall.ENOSPC
}
p := tx.Pipeline()
@@ -4066,7 +4237,7 @@ func (m *redisMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParent
}
func (m *redisMeta) mkNodeWithAttr(ctx Context, tx *redis.Tx, srcIno Ino, srcAttr *Attr, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, attach bool) error {
- if m.checkQuota(4<<10, 1) {
+ if m.checkQuota(ctx, align4K(0), 1, dstParentIno) {
return syscall.ENOSPC
}
srcAttr.Parent = dstParentIno
diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go
index 476585df7894..57cc2d9f1edb 100644
--- a/pkg/meta/sql.go
+++ b/pkg/meta/sql.go
@@ -171,6 +171,14 @@ type detachedNode struct {
Expire int64 `xorm:"notnull"`
}
+type dirQuota struct {
+ Inode Ino `xorm:"pk"`
+ MaxSpace int64 `xorm:"notnull"`
+ MaxInodes int64 `xorm:"notnull"`
+ UsedSpace int64 `xorm:"notnull"`
+ UsedInodes int64 `xorm:"notnull"`
+}
+
type dbMeta struct {
*baseMeta
db *xorm.Engine
@@ -285,8 +293,8 @@ func (m *dbMeta) Init(format *Format, force bool) error {
if err := m.syncTable(new(session2), new(sustained), new(delfile)); err != nil {
return fmt.Errorf("create table session2, sustaind, delfile: %s", err)
}
- if err := m.syncTable(new(flock), new(plock)); err != nil {
- return fmt.Errorf("create table flock, plock: %s", err)
+ if err := m.syncTable(new(flock), new(plock), new(dirQuota)); err != nil {
+ return fmt.Errorf("create table flock, plock, dirQuota: %s", err)
}
if err := m.syncTable(new(dirStats)); err != nil {
return fmt.Errorf("create table dirStats: %s", err)
@@ -377,7 +385,7 @@ func (m *dbMeta) Reset() error {
&node{}, &edge{}, &symlink{}, &xattr{},
&chunk{}, &sliceRef{}, &delslices{},
&session{}, &session2{}, &sustained{}, &delfile{},
- &flock{}, &plock{}, &dirStats{}, &detachedNode{})
+		&flock{}, &plock{}, &dirStats{}, &dirQuota{}, &detachedNode{})
}
func (m *dbMeta) doLoad() (data []byte, err error) {
@@ -399,9 +407,9 @@ func (m *dbMeta) doLoad() (data []byte, err error) {
func (m *dbMeta) doNewSession(sinfo []byte) error {
// add new table
- err := m.syncTable(new(session2), new(delslices), new(dirStats))
+ err := m.syncTable(new(session2), new(delslices), new(dirStats), new(dirQuota))
if err != nil {
- return fmt.Errorf("update table session2, delslices, dirstats: %s", err)
+ return fmt.Errorf("update table session2, delslices, dirstats, dirQuota: %s", err)
}
// add primary key
if err = m.syncTable(new(edge), new(chunk), new(xattr), new(sustained)); err != nil {
@@ -418,7 +426,7 @@ func (m *dbMeta) doNewSession(sinfo []byte) error {
}); err == nil {
break
}
- if strings.Contains(err.Error(), "UNIQUE constraint failed") {
+ if isDuplicateEntryErr(err) {
logger.Warnf("session id %d is already used", m.sid)
if v, e := m.incrCounter("nextSession", 1); e == nil {
m.sid = uint64(v)
@@ -978,7 +986,7 @@ func (m *dbMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, at
}
newLength = int64(length) - int64(nodeAttr.Length)
newSpace = align4K(length) - align4K(nodeAttr.Length)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, nodeAttr.Parent) {
return syscall.ENOSPC
}
var zeroChunks []chunk
@@ -1084,7 +1092,7 @@ func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size
old := nodeAttr.Length
newLength = int64(length) - int64(old)
newSpace = align4K(length) - align4K(old)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, nodeAttr.Parent) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
@@ -1403,14 +1411,14 @@ func (m *dbMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
m.fileDeleted(opened, isTrash(parent), n.Inode, n.Length)
}
m.updateStats(newSpace, newInode)
- if attr != nil {
- m.parseAttr(&n, attr)
- }
+ }
+ if err == nil && attr != nil {
+ m.parseAttr(&n, attr)
}
return errno(err)
}
-func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ...bool) syscall.Errno {
+func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skipCheckTrash ...bool) syscall.Errno {
var trash Ino
if !(len(skipCheckTrash) == 1 && skipCheckTrash[0]) {
if st := m.checkTrash(parent, &trash); st != 0 {
@@ -1451,6 +1459,9 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
if e.Type != TypeDirectory {
return syscall.ENOTDIR
}
+ if pinode != nil {
+ *pinode = e.Inode
+ }
var n = node{Inode: e.Inode}
ok, err = s.ForUpdate().Get(&n)
if err != nil {
@@ -1486,6 +1497,9 @@ func (m *dbMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
if _, err := s.Delete(&dirStats{Inode: e.Inode}); err != nil {
logger.Warnf("remove dir usage of ino(%d): %s", e.Inode, err)
}
+ if _, err = s.Delete(&dirQuota{Inode: e.Inode}); err != nil {
+ return err
+ }
if trash > 0 {
if _, err = s.Cols("ctime", "parent").Update(&n, &node{Inode: n.Inode}); err != nil {
@@ -1761,6 +1775,11 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
if _, err := s.Delete(&edge{Parent: parentDst, Name: de.Name}); err != nil {
return err
}
+ if de.Type == TypeDirectory {
+ if _, err = s.Delete(&dirQuota{Inode: dino}); err != nil {
+ return err
+ }
+ }
}
if err = mustInsert(s, &edge{Parent: parentDst, Name: de.Name, Inode: se.Inode, Type: se.Type}); err != nil {
return err
@@ -1999,9 +2018,7 @@ func (m *dbMeta) doRefreshSession() error {
func (m *dbMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
var n = node{Inode: inode}
- var newSpace int64
err := m.txn(func(s *xorm.Session) error {
- newSpace = 0
n = node{Inode: inode}
ok, err := s.ForUpdate().Get(&n)
if err != nil {
@@ -2017,13 +2034,14 @@ func (m *dbMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
if err != nil {
return err
}
- newSpace = -align4K(n.Length)
_, err = s.Delete(&node{Inode: inode})
return err
})
if err == nil {
+ newSpace := -align4K(n.Length)
m.updateStats(newSpace, -1)
m.tryDeleteFileData(inode, n.Length, false)
+ m.updateDirQuota(Background, n.Parent, newSpace, -1)
}
return err
}
@@ -2089,7 +2107,7 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
newSpace = align4K(newleng) - align4K(nodeAttr.Length)
nodeAttr.Length = newleng
}
- if m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, nodeAttr.Parent) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
@@ -2171,7 +2189,7 @@ func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off
newSpace = align4K(newleng) - align4K(nout.Length)
nout.Length = newleng
}
- if m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, nout.Parent) {
return syscall.ENOSPC
}
now := time.Now().UnixNano() / 1e3
@@ -2330,7 +2348,7 @@ func (m *dbMeta) doSyncDirStat(ctx Context, ino Ino) (*dirStat, syscall.Errno) {
return syscall.ENOENT
}
_, err = s.Insert(&dirStats{ino, stat.length, stat.space, stat.inodes})
- if err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") {
+ if err != nil && isDuplicateEntryErr(err) {
// other client synced
err = nil
}
@@ -2339,7 +2357,7 @@ func (m *dbMeta) doSyncDirStat(ctx Context, ino Ino) (*dirStat, syscall.Errno) {
return stat, errno(err)
}
-func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, error) {
+func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, syscall.Errno) {
st := dirStats{Inode: ino}
var exist bool
var err error
@@ -2347,13 +2365,13 @@ func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, err
exist, err = s.Get(&st)
return err
}); err != nil {
- return nil, err
+ return nil, errno(err)
}
if !exist {
if trySync {
return m.doSyncDirStat(ctx, ino)
}
- return nil, nil
+ return nil, 0
}
if trySync && (st.UsedSpace < 0 || st.UsedInodes < 0) {
@@ -2377,7 +2395,7 @@ func (m *dbMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, err
logger.Warn(e)
}
}
- return &dirStat{st.DataLength, st.UsedSpace, st.UsedInodes}, nil
+ return &dirStat{st.DataLength, st.UsedSpace, st.UsedInodes}, 0
}
func (m *dbMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) {
@@ -2998,6 +3016,93 @@ func (m *dbMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.Errn
}))
}
+func (m *dbMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
+ var quota *Quota
+ return quota, m.roTxn(func(s *xorm.Session) error {
+ q := dirQuota{Inode: inode}
+ ok, e := s.Get(&q)
+ if e == nil && ok {
+ quota = &Quota{
+ MaxSpace: q.MaxSpace,
+ MaxInodes: q.MaxInodes,
+ UsedSpace: q.UsedSpace,
+ UsedInodes: q.UsedInodes}
+ }
+ return e
+ })
+}
+
+func (m *dbMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
+ return m.txn(func(s *xorm.Session) error {
+ q := dirQuota{Inode: inode}
+ ok, e := s.ForUpdate().Get(&q)
+ if e != nil {
+ return e
+ }
+ q.MaxSpace, q.MaxInodes = quota.MaxSpace, quota.MaxInodes
+ if ok {
+ if create {
+ q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
+ _, e = s.Cols("max_space", "max_inodes", "used_space", "used_inodes").Update(&q, &dirQuota{Inode: inode})
+ } else {
+ quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
+ _, e = s.Cols("max_space", "max_inodes").Update(&q, &dirQuota{Inode: inode})
+ }
+ } else {
+ q.UsedSpace, q.UsedInodes = quota.UsedSpace, quota.UsedInodes
+ e = mustInsert(s, &q)
+ }
+ return e
+ })
+}
+
+func (m *dbMeta) doDelQuota(ctx Context, inode Ino) error {
+ return m.txn(func(s *xorm.Session) error {
+ _, e := s.Delete(&dirQuota{Inode: inode})
+ return e
+ })
+}
+
+func (m *dbMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
+ var rows []dirQuota
+ err := m.roTxn(func(s *xorm.Session) error {
+ rows = rows[:0]
+ return s.Find(&rows)
+ })
+ if err != nil || len(rows) == 0 {
+ return nil, err
+ }
+
+ quotas := make(map[Ino]*Quota, len(rows))
+ for _, row := range rows {
+ quotas[row.Inode] = &Quota{
+ MaxSpace: row.MaxSpace,
+ MaxInodes: row.MaxInodes,
+ UsedSpace: row.UsedSpace,
+ UsedInodes: row.UsedInodes,
+ }
+ }
+ return quotas, nil
+}
+
+func (m *dbMeta) doFlushQuota(ctx Context, inode Ino, space, inodes int64) error {
+ return m.txn(func(s *xorm.Session) error {
+ q := dirQuota{Inode: inode}
+ // FIXME: use Update
+ ok, err := s.Get(&q)
+ if err == nil && !ok {
+ logger.Warnf("No quota for inode %d, skip flushing", inode)
+ return nil
+ }
+ if err == nil {
+ q.UsedSpace += space
+ q.UsedInodes += inodes
+ _, err = s.Cols("used_space", "used_inodes").Update(&q, &dirQuota{Inode: inode})
+ }
+ return err
+ })
+}
+
func (m *dbMeta) dumpEntry(s *xorm.Session, inode Ino, typ uint8) (*DumpedEntry, error) {
e := &DumpedEntry{}
n := &node{Inode: inode}
@@ -3452,8 +3557,8 @@ func (m *dbMeta) LoadMeta(r io.Reader) error {
if err = m.syncTable(new(session2), new(sustained), new(delfile)); err != nil {
return fmt.Errorf("create table session2, sustaind, delfile: %s", err)
}
- if err = m.syncTable(new(flock), new(plock)); err != nil {
- return fmt.Errorf("create table flock, plock: %s", err)
+ if err = m.syncTable(new(flock), new(plock), new(dirQuota)); err != nil {
+ return fmt.Errorf("create table flock, plock, dirQuota: %s", err)
}
if err := m.syncTable(new(dirStats)); err != nil {
return fmt.Errorf("create table dirStats: %s", err)
@@ -3754,7 +3859,7 @@ func (m *dbMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
}
// copy chunks
if srcNode.Length != 0 {
- if m.checkQuota(align4K(srcNode.Length), 0) {
+ if m.checkQuota(ctx, align4K(srcNode.Length), 0, dstParentIno) {
return syscall.ENOSPC
}
var cs []chunk
@@ -3827,7 +3932,7 @@ func (m *dbMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
}
func (m *dbMeta) mkNodeWithAttr(ctx Context, s *xorm.Session, srcIno Ino, srcNode *node, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, attach bool) error {
- if m.checkQuota(4<<10, 1) {
+ if m.checkQuota(ctx, align4K(0), 1, dstParentIno) {
return syscall.ENOSPC
}
srcNode.Parent = dstParentIno
diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go
index 429eef9761df..d40f8f3e118d 100644
--- a/pkg/meta/tkv.go
+++ b/pkg/meta/tkv.go
@@ -184,8 +184,9 @@ All keys:
SHssssssss session heartbeat // for legacy client
SIssssssss session info
SSssssssssiiiiiiii sustained inode
- Uiiiiiiii data length, space and inodes usage in directory
Niiiiiiii detached inodes
+ Uiiiiiiii data length, space and inodes usage in directory
+ QDiiiiiiii directory quota
*/
func (m *kvMeta) inodeKey(inode Ino) []byte {
@@ -244,6 +245,10 @@ func (m *kvMeta) detachedKey(inode Ino) []byte {
return m.fmtKey("N", inode)
}
+func (m *kvMeta) dirQuotaKey(inode Ino) []byte {
+ return m.fmtKey("QD", inode)
+}
+
func (m *kvMeta) parseSid(key string) uint64 {
buf := []byte(key[2:]) // "SE" or "SH"
if len(buf) != 8 {
@@ -335,6 +340,25 @@ func (m *kvMeta) parseDirStat(buf []byte) *dirStat {
return &dirStat{int64(b.Get64()), int64(b.Get64()), int64(b.Get64())}
}
+func (m *kvMeta) packQuota(q *Quota) []byte {
+ b := utils.NewBuffer(32)
+ b.Put64(uint64(q.MaxSpace))
+ b.Put64(uint64(q.MaxInodes))
+ b.Put64(uint64(q.UsedSpace))
+ b.Put64(uint64(q.UsedInodes))
+ return b.Bytes()
+}
+
+func (m *kvMeta) parseQuota(buf []byte) *Quota {
+ b := utils.FromBuffer(buf)
+ return &Quota{
+ MaxSpace: int64(b.Get64()),
+ MaxInodes: int64(b.Get64()),
+ UsedSpace: int64(b.Get64()),
+ UsedInodes: int64(b.Get64()),
+ }
+}
+
func (m *kvMeta) get(key []byte) ([]byte, error) {
var value []byte
err := m.client.txn(func(tx *kvTxn) error {
@@ -922,7 +946,7 @@ func (m *kvMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, at
}
newLength = int64(length) - int64(t.Length)
newSpace = align4K(length) - align4K(t.Length)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, t.Parent) {
return syscall.ENOSPC
}
var left, right = t.Length, length
@@ -1018,7 +1042,7 @@ func (m *kvMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size
old := t.Length
newLength = int64(length) - int64(t.Length)
newSpace = align4K(length) - align4K(t.Length)
- if newSpace > 0 && m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, t.Parent) {
return syscall.ENOSPC
}
t.Length = length
@@ -1310,7 +1334,7 @@ func (m *kvMeta) doUnlink(ctx Context, parent Ino, name string, attr *Attr, skip
return errno(err)
}
-func (m *kvMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ...bool) syscall.Errno {
+func (m *kvMeta) doRmdir(ctx Context, parent Ino, name string, pinode *Ino, skipCheckTrash ...bool) syscall.Errno {
var trash Ino
if !(len(skipCheckTrash) == 1 && skipCheckTrash[0]) {
if st := m.checkTrash(parent, &trash); st != 0 {
@@ -1332,6 +1356,9 @@ func (m *kvMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
if _type != TypeDirectory {
return syscall.ENOTDIR
}
+ if pinode != nil {
+ *pinode = inode
+ }
rs := tx.gets(m.inodeKey(parent), m.inodeKey(inode))
if rs[0] == nil {
return syscall.ENOENT
@@ -1374,6 +1401,7 @@ func (m *kvMeta) doRmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
}
tx.delete(m.entryKey(parent, name))
tx.delete(m.dirStatKey(inode))
+ tx.delete(m.dirQuotaKey(inode))
if trash > 0 {
tx.set(m.inodeKey(inode), m.marshal(&attr))
tx.set(m.entryKey(trash, m.trashEntry(parent, inode, name)), buf)
@@ -1587,6 +1615,9 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
tx.deleteKeys(m.fmtKey("A", dino, "P"))
}
}
+ if dtyp == TypeDirectory {
+ tx.delete(m.dirQuotaKey(dino))
+ }
}
}
if parentDst != parentSrc {
@@ -1749,9 +1780,7 @@ func (m *kvMeta) doReaddir(ctx Context, inode Ino, plus uint8, entries *[]*Entry
func (m *kvMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
var attr Attr
- var newSpace int64
err := m.txn(func(tx *kvTxn) error {
- newSpace = 0
a := tx.get(m.inodeKey(inode))
if a == nil {
return nil
@@ -1760,12 +1789,13 @@ func (m *kvMeta) doDeleteSustainedInode(sid uint64, inode Ino) error {
tx.set(m.delfileKey(inode, attr.Length), m.packInt64(time.Now().Unix()))
tx.delete(m.inodeKey(inode))
tx.delete(m.sustainedKey(sid, inode))
- newSpace = -align4K(attr.Length)
return nil
})
if err == nil {
+ newSpace := -align4K(attr.Length)
m.updateStats(newSpace, -1)
m.tryDeleteFileData(inode, attr.Length, false)
+ m.updateDirQuota(Background, attr.Parent, newSpace, -1)
}
return err
}
@@ -1824,11 +1854,10 @@ func (m *kvMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl
newLength = int64(newleng - attr.Length)
newSpace = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
- if m.checkQuota(newSpace, 0) {
- return syscall.ENOSPC
- }
}
-
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, attr.Parent) {
+ return syscall.ENOSPC
+ }
now := time.Now()
attr.Mtime = now.Unix()
attr.Mtimensec = uint32(now.Nanosecond())
@@ -1892,7 +1921,7 @@ func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off
newSpace = align4K(newleng) - align4K(attr.Length)
attr.Length = newleng
}
- if m.checkQuota(newSpace, 0) {
+ if newSpace > 0 && m.checkQuota(ctx, newSpace, 0, attr.Parent) {
return syscall.ENOSPC
}
now := time.Now()
@@ -2048,18 +2077,18 @@ func (m *kvMeta) doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error {
return nil
}
-func (m *kvMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, error) {
+func (m *kvMeta) doGetDirStat(ctx Context, ino Ino, trySync bool) (*dirStat, syscall.Errno) {
rawStat, err := m.get(m.dirStatKey(ino))
if err != nil {
- return nil, err
+ return nil, errno(err)
}
if rawStat != nil {
- return m.parseDirStat(rawStat), nil
+ return m.parseDirStat(rawStat), 0
}
if trySync {
return m.doSyncDirStat(ctx, ino)
}
- return nil, nil
+ return nil, 0
}
func (m *kvMeta) doFindDeletedFiles(ts int64, limit int) (map[Ino]uint64, error) {
@@ -2601,6 +2630,71 @@ func (m *kvMeta) doRemoveXattr(ctx Context, inode Ino, name string) syscall.Errn
}))
}
+func (m *kvMeta) doGetQuota(ctx Context, inode Ino) (*Quota, error) {
+ buf, err := m.get(m.dirQuotaKey(inode))
+ if err != nil {
+ return nil, err
+ } else if buf == nil {
+ return nil, nil
+ } else if len(buf) != 32 {
+ return nil, fmt.Errorf("invalid quota value: %v", buf)
+ }
+ return m.parseQuota(buf), nil
+}
+
+func (m *kvMeta) doSetQuota(ctx Context, inode Ino, quota *Quota, create bool) error {
+ return m.txn(func(tx *kvTxn) error {
+ if create {
+ tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
+ } else {
+ buf := tx.get(m.dirQuotaKey(inode))
+ if len(buf) == 32 {
+ q := m.parseQuota(buf)
+ quota.UsedSpace, quota.UsedInodes = q.UsedSpace, q.UsedInodes
+ }
+ tx.set(m.dirQuotaKey(inode), m.packQuota(quota))
+ }
+ return nil
+ })
+}
+func (m *kvMeta) doDelQuota(ctx Context, inode Ino) error {
+ return m.deleteKeys(m.dirQuotaKey(inode))
+}
+
+func (m *kvMeta) doLoadQuotas(ctx Context) (map[Ino]*Quota, error) {
+ pairs, err := m.scanValues(m.fmtKey("QD"), -1, nil)
+ if err != nil || len(pairs) == 0 {
+ return nil, err
+ }
+
+ quotas := make(map[Ino]*Quota, len(pairs))
+ for k, v := range pairs {
+ inode := m.decodeInode([]byte(k[2:])) // skip "QD"
+ quota := m.parseQuota(v)
+ quotas[inode] = quota
+ }
+ return quotas, nil
+}
+
+func (m *kvMeta) doFlushQuota(ctx Context, inode Ino, space, inodes int64) error {
+ key := m.dirQuotaKey(inode)
+ return m.txn(func(tx *kvTxn) error {
+ rawQ := tx.get(key)
+ if len(rawQ) == 0 {
+ logger.Warnf("No quota for inode %d, skip flushing", inode)
+ return nil
+ }
+ if len(rawQ) != 32 {
+ return fmt.Errorf("invalid quota value: %v", rawQ)
+ }
+ q := m.parseQuota(rawQ)
+ q.UsedSpace += space
+ q.UsedInodes += inodes
+ tx.set(key, m.packQuota(q))
+ return nil
+ }, inode)
+}
+
func (m *kvMeta) dumpEntry(inode Ino, e *DumpedEntry) error {
if m.snap != nil {
return nil
@@ -3264,7 +3358,7 @@ func (m *kvMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
}
// copy chunks
if srcAttr.Length != 0 {
- if m.checkQuota(align4K(srcAttr.Length), 0) {
+ if m.checkQuota(ctx, align4K(srcAttr.Length), 0, dstParentIno) {
return syscall.ENOSPC
}
vals := make(map[string][]byte)
@@ -3324,7 +3418,7 @@ func (m *kvMeta) cloneEntry(ctx Context, srcIno Ino, srcType uint8, dstParentIno
}
func (m *kvMeta) mkNodeWithAttr(ctx Context, tx *kvTxn, srcIno Ino, srcAttr *Attr, dstParentIno Ino, dstName string, dstIno *Ino, cmode uint8, cumask uint16, attach bool) error {
- if m.checkQuota(4<<10, 1) {
+ if m.checkQuota(ctx, align4K(0), 1, dstParentIno) {
return syscall.ENOSPC
}
srcAttr.Parent = dstParentIno
diff --git a/pkg/meta/utils.go b/pkg/meta/utils.go
index 444f50c56b7d..3ead5459c94e 100644
--- a/pkg/meta/utils.go
+++ b/pkg/meta/utils.go
@@ -338,9 +338,9 @@ func (m *baseMeta) Remove(ctx Context, parent Ino, name string, count *uint64) s
return m.emptyEntry(ctx, parent, name, inode, false, count, concurrent)
}
-func GetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno {
+func (m *baseMeta) GetSummary(ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno {
var attr Attr
- if st := r.GetAttr(ctx, inode, &attr); st != 0 {
+ if st := m.GetAttr(ctx, inode, &attr); st != 0 {
return st
}
if attr.Typ != TypeDirectory {
@@ -364,7 +364,7 @@ func GetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive bool
ino := dirs[i]
entries := &entriesList[i]
eg.Go(func() error {
- st := r.Readdir(ctx, ino, 1, entries)
+ st := m.Readdir(ctx, ino, 1, entries)
if st != 0 && st != syscall.ENOENT {
return st
}
@@ -399,9 +399,9 @@ func GetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive bool
return 0
}
-func FastGetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno {
+func (m *baseMeta) FastGetSummary(ctx Context, inode Ino, summary *Summary, recursive bool) syscall.Errno {
var attr Attr
- if st := r.GetAttr(ctx, inode, &attr); st != 0 {
+ if st := m.GetAttr(ctx, inode, &attr); st != 0 {
return st
}
if attr.Typ != TypeDirectory {
@@ -413,6 +413,7 @@ func FastGetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive
return 0
}
summary.Dirs++
+ summary.Size += uint64(align4K(0))
const concurrency = 50
dirs := []Ino{inode}
@@ -426,20 +427,20 @@ func FastGetSummary(r Meta, ctx Context, inode Ino, summary *Summary, recursive
entries := &entriesList[i]
stat := &dirStats[i]
eg.Go(func() error {
- s, err := r.GetDirStat(ctx, ino)
- if err != nil {
- return err
+ s, st := m.GetDirStat(ctx, ino)
+ if st != 0 {
+ return st
}
*stat = *s
var attr Attr
- if st := r.GetAttr(ctx, ino, &attr); st != 0 && st != syscall.ENOENT {
+ if st := m.GetAttr(ctx, ino, &attr); st != 0 && st != syscall.ENOENT {
return st
}
if attr.Nlink == 2 {
// leaf dir, no need to read entries
return nil
}
- if st := r.Readdir(ctx, ino, 0, entries); st != 0 && st != syscall.ENOENT {
+ if st := m.Readdir(ctx, ino, 0, entries); st != 0 && st != syscall.ENOENT {
return st
}
return nil
diff --git a/pkg/object/mem.go b/pkg/object/mem.go
index 758bd3490389..23f0ac9ded0d 100644
--- a/pkg/object/mem.go
+++ b/pkg/object/mem.go
@@ -129,16 +129,39 @@ func (m *memStore) Delete(key string) error {
}
func (m *memStore) List(prefix, marker, delimiter string, limit int64) ([]Object, error) {
- if delimiter != "" {
- return nil, notSupportedDelimiter
- }
m.Lock()
defer m.Unlock()
objs := make([]Object, 0)
+ commonPrefixsMap := make(map[string]bool, 0)
for k := range m.objects {
if strings.HasPrefix(k, prefix) && k > marker {
o := m.objects[k]
+ if delimiter != "" {
+ remainString := strings.TrimPrefix(k, prefix)
+ if pos := strings.Index(remainString, delimiter); pos != -1 {
+ commonPrefix := remainString[0 : pos+1]
+ if _, ok := commonPrefixsMap[commonPrefix]; ok {
+ continue
+ }
+ f := &file{
+ obj{
+ prefix + commonPrefix,
+ 0,
+ time.Unix(0, 0),
+ strings.HasSuffix(commonPrefix, "/"),
+ },
+ o.owner,
+ o.group,
+ o.mode,
+ false,
+ }
+ objs = append(objs, f)
+ commonPrefixsMap[commonPrefix] = true
+ continue
+ }
+ }
+
f := &file{
obj{
k,
diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go
index a397ae825c0b..487c615a207d 100644
--- a/pkg/object/object_storage_test.go
+++ b/pkg/object/object_storage_test.go
@@ -222,6 +222,14 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
}
defer s.Delete("a1")
+ if err := s.Put("a1", bytes.NewReader(br)); err != nil {
+ t.Fatalf("PUT failed: %s", err.Error())
+ }
+ defer s.Delete("a/b/c/d/e/f")
+ if err := s.Put("a/b/c/d/e/f", bytes.NewReader(br)); err != nil {
+ t.Fatalf("PUT failed: %s", err.Error())
+ }
+
if err := s.Put("a1", bytes.NewReader(br)); err != nil {
t.Fatalf("PUT failed: %s", err.Error())
}
@@ -243,6 +251,24 @@ func testStorage(t *testing.T, s ObjectStorage) {
}
}
+ if obs, err := s.List("a/", "", "/", 10); err != nil {
+ if !(errors.Is(err, notSupportedDelimiter) || errors.Is(err, notSupported)) {
+ t.Fatalf("list with delimiter: %s", err)
+ } else {
+ t.Logf("list api error: %s", err)
+ }
+ } else {
+ if len(obs) != 3 {
+ t.Fatalf("list with delimiter should return three results but got %d", len(obs))
+ }
+ keys := []string{"a/a", "a/a1", "a/b/"}
+ for i, o := range obs {
+ if o.Key() != keys[i] {
+ t.Fatalf("should get key %s but got %s", keys[i], o.Key())
+ }
+ }
+ }
+
// test redis cluster list all api
keyTotal := 100
var sortedKeys []string
diff --git a/pkg/vfs/internal.go b/pkg/vfs/internal.go
index 5d4dfa6463bc..d1013683fac3 100644
--- a/pkg/vfs/internal.go
+++ b/pkg/vfs/internal.go
@@ -356,7 +356,7 @@ func (v *VFS) handleInternalMsg(ctx meta.Context, cmd uint32, r *utils.Buffer, o
}
wb := utils.NewBuffer(4)
- r := meta.GetSummary(v.Meta, ctx, inode, &summary, recursive != 0)
+ r := v.Meta.GetSummary(ctx, inode, &summary, recursive != 0)
if r != 0 {
msg := r.Error()
wb.Put32(uint32(len(msg)))
@@ -426,9 +426,9 @@ func (v *VFS) handleInternalMsg(ctx meta.Context, cmd uint32, r *utils.Buffer, o
var r syscall.Errno
go func() {
if strict {
- r = meta.GetSummary(v.Meta, ctx, inode, &info.Summary, recursive != 0)
+ r = v.Meta.GetSummary(ctx, inode, &info.Summary, recursive != 0)
} else {
- r = meta.FastGetSummary(v.Meta, ctx, inode, &info.Summary, recursive != 0)
+ r = v.Meta.FastGetSummary(ctx, inode, &info.Summary, recursive != 0)
}
close(done)
}()
diff --git a/pkg/vfs/reader.go b/pkg/vfs/reader.go
index 9acdb767e46e..72c67240c5b5 100644
--- a/pkg/vfs/reader.go
+++ b/pkg/vfs/reader.go
@@ -703,6 +703,11 @@ func NewDataReader(conf *Config, m meta.Meta, store chunk.ChunkStore) DataReader
return r
}
+func (r *dataReader) readBufferUsed() int64 {
+ used := atomic.LoadInt64(&readBufferUsed)
+ return used
+}
+
func (r *dataReader) checkReadBuffer() {
for {
r.Lock()
diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go
index 0d1cd0ef6fb0..70e15f9cfd60 100644
--- a/pkg/vfs/vfs.go
+++ b/pkg/vfs/vfs.go
@@ -927,6 +927,7 @@ type VFS struct {
handlersGause prometheus.GaugeFunc
usedBufferSize prometheus.GaugeFunc
storeCacheSize prometheus.GaugeFunc
+ readBufferUsed prometheus.GaugeFunc
registry *prometheus.Registry
}
@@ -955,7 +956,7 @@ func NewVFS(conf *Config, m meta.Meta, store chunk.ChunkStore, registerer promet
}
go v.cleanupModified()
- initVFSMetrics(v, writer, registerer)
+ initVFSMetrics(v, writer, reader, registerer)
return v
}
@@ -992,7 +993,7 @@ func (v *VFS) cleanupModified() {
}
}
-func initVFSMetrics(v *VFS, writer DataWriter, registerer prometheus.Registerer) {
+func initVFSMetrics(v *VFS, writer DataWriter, reader DataReader, registerer prometheus.Registerer) {
if registerer == nil {
return
}
@@ -1022,9 +1023,19 @@ func initVFSMetrics(v *VFS, writer DataWriter, registerer prometheus.Registerer)
}
return 0.0
})
+ v.readBufferUsed = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
+ Name: "used_read_buffer_size_bytes",
+ Help: "size of currently used buffer for read",
+ }, func() float64 {
+ if dr, ok := reader.(*dataReader); ok {
+ return float64(dr.readBufferUsed())
+ }
+ return 0.0
+ })
_ = registerer.Register(v.handlersGause)
_ = registerer.Register(v.usedBufferSize)
_ = registerer.Register(v.storeCacheSize)
+ _ = registerer.Register(v.readBufferUsed)
}
func InitMetrics(registerer prometheus.Registerer) {