Skip to content

Commit

Permalink
ceph: rados pool namespace support
Browse files Browse the repository at this point in the history
This patch adds codes that decode pool namespace information in
cap message and request reply. Pool namespace is saved in i_layout,
it will be passed to libceph when doing read/write.

Signed-off-by: Yan, Zheng <[email protected]>
  • Loading branch information
ukernel authored and idryomov committed Jul 28, 2016
1 parent cd08e0a commit 779fe0f
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 77 deletions.
67 changes: 52 additions & 15 deletions fs/ceph/addr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1730,14 +1730,16 @@ enum {
POOL_WRITE = 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
s64 pool, struct ceph_string *pool_ns)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
struct ceph_mds_client *mdsc = fsc->mdsc;
struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
struct rb_node **p, *parent;
struct ceph_pool_perm *perm;
struct page **pages;
size_t pool_ns_len;
int err = 0, err2 = 0, have = 0;

down_read(&mdsc->pool_perm_rwsem);
Expand All @@ -1749,17 +1751,31 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
}
}
}
up_read(&mdsc->pool_perm_rwsem);
if (*p)
goto out;

dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
pool, (int)pool_ns->len, pool_ns->str);
else
dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

down_write(&mdsc->pool_perm_rwsem);
p = &mdsc->pool_perm_tree.rb_node;
parent = NULL;
while (*p) {
parent = *p;
Expand All @@ -1769,8 +1785,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
else if (pool > perm->pool)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
int ret = ceph_compare_string(pool_ns,
perm->pool_ns,
perm->pool_ns_len);
if (ret < 0)
p = &(*p)->rb_left;
else if (ret > 0)
p = &(*p)->rb_right;
else {
have = perm->perm;
break;
}
}
}
if (*p) {
Expand All @@ -1788,6 +1813,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
rd_req->r_flags = CEPH_OSD_FLAG_READ;
osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
rd_req->r_base_oloc.pool = pool;
if (pool_ns)
rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
Expand Down Expand Up @@ -1841,14 +1868,20 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
goto out_unlock;
}

perm = kmalloc(sizeof(*perm), GFP_NOFS);
pool_ns_len = pool_ns ? pool_ns->len : 0;
perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
if (!perm) {
err = -ENOMEM;
goto out_unlock;
}

perm->pool = pool;
perm->perm = have;
perm->pool_ns_len = pool_ns_len;
if (pool_ns_len > 0)
memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
perm->pool_ns[pool_ns_len] = 0;

rb_link_node(&perm->node, parent, p);
rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
err = 0;
Expand All @@ -1860,19 +1893,20 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
out:
if (!err)
err = have;
dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
if (pool_ns)
dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
pool, (int)pool_ns->len, pool_ns->str, err);
else
dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
return err;
}

int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
s64 pool;
struct ceph_string *pool_ns;
int ret, flags;

/* does not support pool namespace yet */
if (ci->i_pool_ns_len)
return -EIO;

if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
NOPOOLPERM))
return 0;
Expand All @@ -1896,7 +1930,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
return 0;
}

ret = __ceph_pool_perm_get(ci, pool);
pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ret = __ceph_pool_perm_get(ci, pool, pool_ns);
ceph_put_string(pool_ns);
if (ret < 0)
return ret;

Expand All @@ -1907,8 +1943,9 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
flags |= CEPH_I_POOL_WR;

spin_lock(&ci->i_ceph_lock);
if (pool == ci->i_layout.pool_id) {
ci->i_ceph_flags = flags;
if (pool == ci->i_layout.pool_id &&
pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
ci->i_ceph_flags |= flags;
} else {
pool = ci->i_layout.pool_id;
flags = ci->i_ceph_flags;
Expand Down
46 changes: 26 additions & 20 deletions fs/ceph/caps.c
Original file line number Diff line number Diff line change
Expand Up @@ -2779,12 +2779,11 @@ static void invalidate_aliases(struct inode *inode)
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
u64 inline_version,
void *inline_data, int inline_len,
struct ceph_string **pns, u64 inline_version,
void *inline_data, u32 inline_len,
struct ceph_buffer *xattr_buf,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued,
u32 pool_ns_len)
struct ceph_cap *cap, int issued)
__releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem)
{
Expand Down Expand Up @@ -2896,11 +2895,18 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
/* file layout may have changed */
s64 old_pool = ci->i_layout.pool_id;
struct ceph_string *old_ns;

ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
ci->i_pool_ns_len = pool_ns_len;
if (ci->i_layout.pool_id != old_pool)
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, *pns);

if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

*pns = old_ns;

/* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(grant->truncate_seq),
Expand Down Expand Up @@ -3423,20 +3429,18 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
struct ceph_snap_realm *realm;
struct ceph_snap_realm *realm = NULL;
struct ceph_string *pool_ns = NULL;
int mds = session->s_mds;
int op, issued;
u32 seq, mseq;
struct ceph_vino vino;
u64 cap_id;
u64 size, max_size;
u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
u32 pool_ns_len = 0;
void *p, *end;

dout("handle_caps from mds%d\n", mds);
Expand All @@ -3450,11 +3454,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
op = le32_to_cpu(h->op);
vino.ino = le64_to_cpu(h->ino);
vino.snap = CEPH_NOSNAP;
cap_id = le64_to_cpu(h->cap_id);
seq = le32_to_cpu(h->seq);
mseq = le32_to_cpu(h->migrate_seq);
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);

snaptrace = h + 1;
snaptrace_len = le32_to_cpu(h->snap_trace_len);
Expand Down Expand Up @@ -3493,6 +3494,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
u64 flush_tid;
u32 caller_uid, caller_gid;
u32 osd_epoch_barrier;
u32 pool_ns_len;
/* version >= 5 */
ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
/* version >= 6 */
Expand All @@ -3502,6 +3504,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_decode_32_safe(&p, end, caller_gid, bad);
/* version >= 8 */
ceph_decode_32_safe(&p, end, pool_ns_len, bad);
if (pool_ns_len > 0) {
ceph_decode_need(&p, end, pool_ns_len, bad);
pool_ns = ceph_find_or_create_string(p, pool_ns_len);
p += pool_ns_len;
}
}

/* lookup ino */
Expand All @@ -3522,7 +3529,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap = ceph_get_cap(mdsc, NULL);
cap->cap_ino = vino.ino;
cap->queue_release = 1;
cap->cap_id = cap_id;
cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
spin_lock(&session->s_cap_lock);
Expand Down Expand Up @@ -3557,10 +3564,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
handle_cap_grant(mdsc, inode, h,
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued,
pool_ns_len);
msg->middle, session, cap, issued);
if (realm)
ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
Expand All @@ -3582,10 +3588,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h,
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued,
pool_ns_len);
msg->middle, session, cap, issued);
goto done_unlocked;

case CEPH_CAP_OP_FLUSH_ACK:
Expand Down Expand Up @@ -3616,6 +3621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(pool_ns);
return;

bad:
Expand Down
20 changes: 16 additions & 4 deletions fs/ceph/inode.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)

memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
ci->i_pool_ns_len = 0;

ci->i_fragtree = RB_ROOT;
mutex_init(&ci->i_fragtree_mutex);
Expand Down Expand Up @@ -571,7 +570,7 @@ void ceph_destroy_inode(struct inode *inode)
if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob);

ceph_put_string(ci->i_layout.pool_ns);
ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));

call_rcu(&inode->i_rcu, ceph_i_callback);
}
Expand Down Expand Up @@ -736,6 +735,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued = 0, implemented, new_issued;
struct timespec mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL;
struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL;
int err = 0;
bool wake = false;
Expand Down Expand Up @@ -763,6 +763,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
iinfo->xattr_len);
}

if (iinfo->pool_ns_len > 0)
pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
iinfo->pool_ns_len);

spin_lock(&ci->i_ceph_lock);

/*
Expand Down Expand Up @@ -818,11 +822,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if (new_version ||
(new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
s64 old_pool = ci->i_layout.pool_id;
struct ceph_string *old_ns;

ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
ci->i_pool_ns_len = iinfo->pool_ns_len;
if (ci->i_layout.pool_id != old_pool)
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);

if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

pool_ns = old_ns;

queue_trunc = ceph_fill_file_size(inode, issued,
le32_to_cpu(info->truncate_seq),
le64_to_cpu(info->truncate_size),
Expand Down Expand Up @@ -989,6 +1000,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
ceph_put_cap(mdsc, new_cap);
if (xattr_blob)
ceph_buffer_put(xattr_blob);
ceph_put_string(pool_ns);
return err;
}

Expand Down
3 changes: 3 additions & 0 deletions fs/ceph/ioctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)
ceph_ino(inode), dl.object_no);

oloc.pool = ci->i_layout.pool_id;
oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
ceph_oid_printf(&oid, "%s", dl.object_name);

r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);

ceph_oloc_destroy(&oloc);
if (r < 0) {
up_read(&osdc->lock);
return r;
Expand Down
Loading

0 comments on commit 779fe0f

Please sign in to comment.