This patch implements inline data support for Ceph. Signed-off-by: Yunchuan Wen <yunchuanwen@xxxxxxxxxxxxxxx> Signed-off-by: Li Wang <liwang@xxxxxxxxxxxxxxx> --- Changes against v1: simplified the handling of the multiple-writer case; see http://pad.ceph.com/p/mds-inline-data and http://www.spinics.net/lists/ceph-devel/msg16018.html --- src/ceph_mds.cc | 1 + src/client/Client.cc | 202 +++++++++++++++++++++++++++++++++++++------ src/client/Client.h | 4 + src/client/Inode.h | 5 ++ src/include/ceph_features.h | 2 + src/include/ceph_fs.h | 3 + src/include/rados.h | 1 + src/mds/CInode.cc | 22 +++++ src/mds/Capability.h | 2 + src/mds/Locker.cc | 7 ++ src/mds/mdstypes.cc | 12 ++- src/mds/mdstypes.h | 3 + src/messages/MClientCaps.h | 18 +++- src/messages/MClientReply.h | 9 ++ src/osd/ReplicatedPG.cc | 5 +- src/osdc/Objecter.h | 21 ++++- 16 files changed, 283 insertions(+), 34 deletions(-) diff --git a/src/ceph_mds.cc b/src/ceph_mds.cc index 88b807b..dac676f 100644 --- a/src/ceph_mds.cc +++ b/src/ceph_mds.cc @@ -243,6 +243,7 @@ int main(int argc, const char **argv) CEPH_FEATURE_UID | CEPH_FEATURE_NOSRCADDR | CEPH_FEATURE_DIRLAYOUTHASH | + CEPH_FEATURE_MDS_INLINE_DATA | CEPH_FEATURE_PGID64 | CEPH_FEATURE_MSG_AUTH; uint64_t required = diff --git a/src/client/Client.cc b/src/client/Client.cc index 77fd208..f47579f 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -485,6 +485,8 @@ void Client::update_inode_file_bits(Inode *in, uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime, + uint64_t inline_version, + bufferlist& inline_data, int issued) { bool warn = false; @@ -495,6 +497,11 @@ void Client::update_inode_file_bits(Inode *in, << " local " << in->time_warp_seq << dendl; uint64_t prior_size = in->size; + if (inline_version > in->inline_version) { + in->inline_data = inline_data; + in->inline_version = inline_version; + } + if (truncate_seq > in->truncate_seq || (truncate_seq == in->truncate_seq && size > in->size)) { ldout(cct, 10) << "size " << 
in->size << " -> " << size << dendl; @@ -511,6 +518,13 @@ void Client::update_inode_file_bits(Inode *in, _invalidate_inode_cache(in, truncate_size, prior_size - truncate_size, true); } } + + // truncate inline data + if (in->inline_version < CEPH_INLINE_DISABLED) { + uint32_t len = in->inline_data.length(); + if (size < len) + in->inline_data.splice(size, len - size); + } } if (truncate_seq >= in->truncate_seq && in->truncate_size != truncate_size) { @@ -645,6 +659,7 @@ Inode * Client::add_update_inode(InodeStat *st, utime_t from, MetaSession *sessi update_inode_file_bits(in, st->truncate_seq, st->truncate_size, st->size, st->time_warp_seq, st->ctime, st->mtime, st->atime, + st->inline_version, st->inline_data, issued); } @@ -2353,6 +2368,11 @@ void Client::send_cap(Inode *in, MetaSession *session, Cap *cap, in->ctime.encode_timeval(&m->head.ctime); m->head.time_warp_seq = in->time_warp_seq; + if (flush & CEPH_CAP_FILE_WR) { + m->inline_version = in->inline_version; + m->inline_data = in->inline_data; + } + in->reported_size = in->size; m->set_snap_follows(follows); cap->wanted = want; @@ -3482,7 +3502,9 @@ void Client::handle_cap_trunc(MetaSession *session, Inode *in, MClientCaps *m) issued |= implemented; update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(), m->get_time_warp_seq(), m->get_ctime(), - m->get_mtime(), m->get_atime(), issued); + m->get_mtime(), m->get_atime(), + m->inline_version, m->inline_data, + issued); m->put(); } @@ -3589,7 +3611,8 @@ void Client::handle_cap_grant(MetaSession *session, Inode *in, Cap *cap, MClient in->xattr_version = m->head.xattr_version; } update_inode_file_bits(in, m->get_truncate_seq(), m->get_truncate_size(), m->get_size(), - m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), issued); + m->get_time_warp_seq(), m->get_ctime(), m->get_mtime(), m->get_atime(), + m->inline_version, m->inline_data, issued); // max_size if (cap == in->auth_cap && @@ -5643,6 +5666,57 @@ void 
Client::unlock_fh_pos(Fh *f) f->pos_locked = false; } +int Client::migration_inline_data(Inode *in) +{ + ObjectOperation ops; + bufferlist inline_version_bl; + ::encode(in->inline_version, inline_version_bl); + ops.cmpxattr("inline_version", + CEPH_OSD_CMPXATTR_OP_GT, + CEPH_OSD_CMPXATTR_MODE_U64, + CEPH_OSD_OP_FLAG_NOENTOK, + inline_version_bl); + bufferlist inline_data = in->inline_data; + ops.write(0, inline_data, in->truncate_size, in->truncate_seq); + ops.setxattr("inline_version", inline_version_bl); + + char oid_buf[32]; + snprintf(oid_buf, sizeof(oid_buf), "%llx.00000000", (long long unsigned)in->ino); + object_t oid = oid_buf; + + Mutex flock("Client::migration_inline_data flock"); + Cond cond; + bool done = false; + int ret; + Context *oncommit = new C_SafeCond(&flock, &cond, &done, &ret); + + objecter->mutate(oid, + OSDMap::file_to_object_locator(in->layout), + ops, + in->snaprealm->get_snap_context(), + ceph_clock_now(cct), + 0, + NULL, + oncommit); + + client_lock.Unlock(); + flock.Lock(); + while (!done) + cond.Wait(flock); + flock.Unlock(); + client_lock.Lock(); + + if (ret >= 0 || ret == -ECANCELED) { + in->inline_data.clear(); + in->inline_version = CEPH_INLINE_DISABLED; + mark_caps_dirty(in, CEPH_CAP_FILE_WR); + check_caps(in, false); + + ret = 0; + } + + return ret; +} // @@ -5688,6 +5762,30 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl) movepos = true; } + if (in->inline_version < CEPH_INLINE_DISABLED) { + if (!(have & CEPH_CAP_FILE_CACHE)) { + r = migration_inline_data(in); + if (r < 0) + goto done; + } else { + uint32_t len = in->inline_data.length(); + + uint64_t endoff = offset + size; + if (endoff > in->size) + endoff = in->size; + + if (endoff > len) { + if (offset < len) + bl->substr_of(in->inline_data, offset, len - offset); + bl->append_zero(endoff - len); + } else if (endoff > (uint64_t)offset) { + bl->substr_of(in->inline_data, offset, endoff - offset); + } + + goto success; + } + } + if 
(!conf->client_debug_force_sync_read && (cct->_conf->client_oc && (have & CEPH_CAP_FILE_CACHE))) { @@ -5704,6 +5802,8 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl) goto done; } +success: + if (movepos) { // adjust fd pos f->pos = offset+bl->length(); @@ -5995,6 +6095,29 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl; + if (in->inline_version < CEPH_INLINE_DISABLED) { + if (endoff > CEPH_INLINE_SIZE || !(have & CEPH_CAP_FILE_BUFFER)) { + r = migration_inline_data(in); + if (r < 0) + goto done; + } else { + uint32_t len = in->inline_data.length(); + + if (endoff < len) + in->inline_data.copy(endoff, len - endoff, bl); + + if (offset < len) + in->inline_data.splice(offset, len - offset); + else if (offset > len) + in->inline_data.append_zero(offset - len); + + in->inline_data.append(bl); + in->inline_version++; + + goto success; + } + } + if (cct->_conf->client_oc && (have & CEPH_CAP_FILE_BUFFER)) { // do buffered write if (!in->oset.dirty_or_tx) @@ -6045,7 +6168,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) } // if we get here, write was successful, update client metadata - +success: // time lat = ceph_clock_now(cct); lat -= start; @@ -7719,33 +7842,60 @@ int Client::_fallocate(Fh *fh, int mode, int64_t offset, int64_t length) return r; if (mode & FALLOC_FL_PUNCH_HOLE) { - Mutex flock("Client::_punch_hole flock"); - Cond cond; - bool done = false; - Context *onfinish = new C_SafeCond(&flock, &cond, &done); - Context *onsafe = new C_Client_SyncCommit(this, in); + if (in->inline_version < CEPH_INLINE_DISABLED && + (have & CEPH_CAP_FILE_BUFFER)) { + bufferlist bl; + int len = in->inline_data.length(); + if (offset < len) { + if (offset > 0) + in->inline_data.copy(0, offset, bl); + int size = length; + if (offset + size > len) + size = len - offset; + if (size > 0) + bl.append_zero(size); + if (offset + size < len) + 
in->inline_data.copy(offset + size, len - offset - size, bl); + in->inline_data = bl; + in->inline_version++; + } + in->mtime = ceph_clock_now(cct); + mark_caps_dirty(in, CEPH_CAP_FILE_WR); + } else { + if (in->inline_version < CEPH_INLINE_DISABLED) { + r = migration_inline_data(in); + if (r < 0) + goto done; + } - unsafe_sync_write++; - get_cap_ref(in, CEPH_CAP_FILE_BUFFER); + Mutex flock("Client::_punch_hole flock"); + Cond cond; + bool done = false; + Context *onfinish = new C_SafeCond(&flock, &cond, &done); + Context *onsafe = new C_Client_SyncCommit(this, in); - _invalidate_inode_cache(in, offset, length, true); - r = filer->zero(in->ino, &in->layout, - in->snaprealm->get_snap_context(), - offset, length, - ceph_clock_now(cct), - 0, true, onfinish, onsafe); - if (r < 0) - goto done; + unsafe_sync_write++; + get_cap_ref(in, CEPH_CAP_FILE_BUFFER); - in->mtime = ceph_clock_now(cct); - mark_caps_dirty(in, CEPH_CAP_FILE_WR); + _invalidate_inode_cache(in, offset, length, true); + r = filer->zero(in->ino, &in->layout, + in->snaprealm->get_snap_context(), + offset, length, + ceph_clock_now(cct), + 0, true, onfinish, onsafe); + if (r < 0) + goto done; - client_lock.Unlock(); - flock.Lock(); - while (!done) - cond.Wait(flock); - flock.Unlock(); - client_lock.Lock(); + in->mtime = ceph_clock_now(cct); + mark_caps_dirty(in, CEPH_CAP_FILE_WR); + + client_lock.Unlock(); + flock.Lock(); + while (!done) + cond.Wait(flock); + flock.Unlock(); + client_lock.Lock(); + } } else if (!(mode & FALLOC_FL_KEEP_SIZE)) { uint64_t size = offset + length; if (size > in->size) { diff --git a/src/client/Client.h b/src/client/Client.h index c7c9cef..5fc05f4 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -420,6 +420,9 @@ protected: void handle_lease(MClientLease *m); + // inline data + int migration_inline_data(Inode *in); + // file caps void check_cap_issue(Inode *in, Cap *cap, unsigned issued); void add_update_cap(Inode *in, MetaSession *session, uint64_t cap_id, @@ -495,6 
+498,7 @@ protected: void update_inode_file_bits(Inode *in, uint64_t truncate_seq, uint64_t truncate_size, uint64_t size, uint64_t time_warp_seq, utime_t ctime, utime_t mtime, utime_t atime, + uint64_t inline_version, bufferlist& inline_data, int issued); Inode *add_update_inode(InodeStat *st, utime_t ttl, MetaSession *session); Dentry *insert_dentry_inode(Dir *dir, const string& dname, LeaseStat *dlease, diff --git a/src/client/Inode.h b/src/client/Inode.h index cc054a6..bb17706 100644 --- a/src/client/Inode.h +++ b/src/client/Inode.h @@ -111,6 +111,10 @@ class Inode { version_t version; // auth only version_t xattr_version; + // inline data + uint64_t inline_version; + bufferlist inline_data; + bool is_symlink() const { return (mode & S_IFMT) == S_IFLNK; } bool is_dir() const { return (mode & S_IFMT) == S_IFDIR; } bool is_file() const { return (mode & S_IFMT) == S_IFREG; } @@ -207,6 +211,7 @@ class Inode { rdev(0), mode(0), uid(0), gid(0), nlink(0), size(0), truncate_seq(1), truncate_size(-1), time_warp_seq(0), max_size(0), version(0), xattr_version(0), + inline_version(0), flags(0), dir_hashed(false), dir_replicated(false), auth_cap(NULL), dirty_caps(0), flushing_caps(0), flushing_cap_seq(0), shared_gen(0), cache_gen(0), diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index c0f01cc..70ee921 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -40,6 +40,7 @@ #define CEPH_FEATURE_MON_SCRUB (1ULL<<33) #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34) #define CEPH_FEATURE_OSD_CACHEPOOL (1ULL<<35) +#define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<36) /* * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature @@ -103,6 +104,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) { CEPH_FEATURE_MON_SCRUB | \ CEPH_FEATURE_OSD_PACKED_RECOVERY | \ CEPH_FEATURE_OSD_CACHEPOOL | \ + CEPH_FEATURE_MDS_INLINE_DATA | \ 0ULL) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git 
a/src/include/ceph_fs.h b/src/include/ceph_fs.h index 6c41d14..406b51e 100644 --- a/src/include/ceph_fs.h +++ b/src/include/ceph_fs.h @@ -522,6 +522,9 @@ struct ceph_filelock { int ceph_flags_to_mode(int flags); +/* inline data state */ +#define CEPH_INLINE_DISABLED ((__u64)-1) +#define CEPH_INLINE_SIZE (1 << 12) /* capability bits */ #define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */ diff --git a/src/include/rados.h b/src/include/rados.h index 178c171..c387a2e 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -342,6 +342,7 @@ enum { enum { CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ + CEPH_OSD_OP_FLAG_NOENTOK = 4, /* ignore NOENT error */ }; #define EOLDSNAPC 85 /* ORDERSNAP flag set; writer has old snapc*/ diff --git a/src/mds/CInode.cc b/src/mds/CInode.cc index 46f8d33..729f126 100644 --- a/src/mds/CInode.cc +++ b/src/mds/CInode.cc @@ -2825,6 +2825,16 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, e.files = i->dirstat.nfiles; e.subdirs = i->dirstat.nsubdirs; + // inline data + uint64_t inline_version = 0; + bufferlist inline_data; + if (!cap || (cap->client_inline_version < i->inline_version)) { + inline_version = i->inline_version; + inline_data = i->inline_data; + if (cap) + cap->client_inline_version = i->inline_version; + } + // nest (do same as file... 
:/) i->rstat.rctime.encode_timeval(&e.rctime); e.rbytes = i->rstat.rbytes; @@ -2863,6 +2873,7 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, bytes += (sizeof(__u32) + sizeof(__u32)) * dirfragtree._splits.size(); bytes += sizeof(__u32) + symlink.length(); bytes += sizeof(__u32) + xbl.length(); + bytes += sizeof(__u64) + sizeof(__u32) + inline_data.length(); if (bytes > max_bytes) return -ENOSPC; } @@ -2958,6 +2969,10 @@ int CInode::encode_inodestat(bufferlist& bl, Session *session, ::encode(i->dir_layout, bl); } ::encode(xbl, bl); + if (session->connection->has_feature(CEPH_FEATURE_MDS_INLINE_DATA)) { + ::encode(inline_version, bl); + ::encode(inline_data, bl); + } return valid; } @@ -2990,6 +3005,13 @@ void CInode::encode_cap_message(MClientCaps *m, Capability *cap) i->atime.encode_timeval(&m->head.atime); m->head.time_warp_seq = i->time_warp_seq; + if (cap->client_inline_version < i->inline_version) { + m->inline_version = cap->client_inline_version = i->inline_version; + m->inline_data = i->inline_data; + } else { + m->inline_version = 0; + } + // max_size is min of projected, actual. uint64_t oldms = oi->client_ranges.count(client) ? oi->client_ranges[client].range.last : 0; uint64_t newms = pi->client_ranges.count(client) ? 
pi->client_ranges[client].range.last : 0; diff --git a/src/mds/Capability.h b/src/mds/Capability.h index fb6b3dc..995ea3a 100644 --- a/src/mds/Capability.h +++ b/src/mds/Capability.h @@ -209,6 +209,7 @@ private: public: snapid_t client_follows; version_t client_xattr_version; + uint64_t client_inline_version; xlist<Capability*>::item item_session_caps; xlist<Capability*>::item item_snaprealm_caps; @@ -223,6 +224,7 @@ public: mseq(0), suppress(0), stale(false), client_follows(0), client_xattr_version(0), + client_inline_version(0), item_session_caps(this), item_snaprealm_caps(this) { g_num_cap++; g_num_capa++; diff --git a/src/mds/Locker.cc b/src/mds/Locker.cc index 99bd761..4f1d322 100644 --- a/src/mds/Locker.cc +++ b/src/mds/Locker.cc @@ -2686,6 +2686,7 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t * utime_t mtime = m->get_mtime(); utime_t ctime = m->get_ctime(); uint64_t size = m->get_size(); + uint64_t inline_version = m->inline_version; if (((dirty & CEPH_CAP_FILE_WR) && mtime > pi->mtime) || ((dirty & CEPH_CAP_FILE_EXCL) && mtime != pi->mtime)) { @@ -2705,6 +2706,12 @@ void Locker::_update_cap_fields(CInode *in, int dirty, MClientCaps *m, inode_t * pi->size = size; pi->rstat.rbytes = size; } + if (in->inode.is_file() && + (dirty & CEPH_CAP_FILE_WR) && + inline_version > pi->inline_version) { + pi->inline_version = inline_version; + pi->inline_data = m->inline_data; + } if ((dirty & CEPH_CAP_FILE_EXCL) && atime != pi->atime) { dout(7) << " atime " << pi->atime << " -> " << atime << " for " << *in << dendl; diff --git a/src/mds/mdstypes.cc b/src/mds/mdstypes.cc index 6886786..8634adf 100644 --- a/src/mds/mdstypes.cc +++ b/src/mds/mdstypes.cc @@ -204,7 +204,7 @@ ostream& operator<<(ostream& out, const client_writeable_range_t& r) */ void inode_t::encode(bufferlist &bl) const { - ENCODE_START(7, 6, bl); + ENCODE_START(8, 8, bl); ::encode(ino, bl); ::encode(rdev, bl); @@ -227,6 +227,8 @@ void inode_t::encode(bufferlist &bl) const 
::encode(mtime, bl); ::encode(atime, bl); ::encode(time_warp_seq, bl); + ::encode(inline_version, bl); + ::encode(inline_data, bl); ::encode(client_ranges, bl); ::encode(dirstat, bl); @@ -244,7 +246,7 @@ void inode_t::encode(bufferlist &bl) const void inode_t::decode(bufferlist::iterator &p) { - DECODE_START_LEGACY_COMPAT_LEN(7, 6, 6, p); + DECODE_START_LEGACY_COMPAT_LEN(8, 6, 6, p); ::decode(ino, p); ::decode(rdev, p); @@ -273,6 +275,12 @@ void inode_t::decode(bufferlist::iterator &p) ::decode(mtime, p); ::decode(atime, p); ::decode(time_warp_seq, p); + if (struct_v >= 8) { + ::decode(inline_version, p); + ::decode(inline_data, p); + } else { + inline_version = CEPH_INLINE_DISABLED; + } if (struct_v >= 3) { ::decode(client_ranges, p); } else { diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 902e310..928167c 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -335,6 +335,8 @@ struct inode_t { utime_t mtime; // file data modify time. utime_t atime; // file data access time. 
uint32_t time_warp_seq; // count of (potential) mtime/atime timewarps (i.e., utimes()) + bufferlist inline_data; + uint64_t inline_version; map<client_t,client_writeable_range_t> client_ranges; // client(s) can write to these ranges @@ -356,6 +358,7 @@ struct inode_t { size(0), truncate_seq(0), truncate_size(0), truncate_from(0), truncate_pending(0), time_warp_seq(0), + inline_version(1), version(0), file_data_version(0), xattr_version(0), backtrace_version(0) { clear_layout(); memset(&dir_layout, 0, sizeof(dir_layout)); diff --git a/src/messages/MClientCaps.h b/src/messages/MClientCaps.h index 117f241..260d714 100644 --- a/src/messages/MClientCaps.h +++ b/src/messages/MClientCaps.h @@ -21,7 +21,7 @@ class MClientCaps : public Message { - static const int HEAD_VERSION = 2; // added flock metadata + static const int HEAD_VERSION = 3; // added flock metadata, inline data static const int COMPAT_VERSION = 1; public: @@ -29,6 +29,8 @@ class MClientCaps : public Message { bufferlist snapbl; bufferlist xattrbl; bufferlist flockbl; + uint64_t inline_version; + bufferlist inline_data; int get_caps() { return head.caps; } int get_wanted() { return head.wanted; } @@ -148,6 +150,13 @@ public: if (head.xattr_len) xattrbl = middle; + if (header.version >= 3) { + ::decode(inline_version, p); + ::decode(inline_data, p); + } else { + inline_version = CEPH_INLINE_DISABLED; + } + // conditionally decode flock metadata if (header.version >= 2) ::decode(flockbl, p); @@ -160,6 +169,13 @@ public: middle = xattrbl; + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ::encode(inline_version, payload); + ::encode(inline_data, payload); + } else { + header.version = 2; + } + // conditionally include flock metadata if (features & CEPH_FEATURE_FLOCK) { ::encode(flockbl, payload); diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h index 896245f..a8e83c2 100644 --- a/src/messages/MClientReply.h +++ b/src/messages/MClientReply.h @@ -108,6 +108,8 @@ struct InodeStat { uint64_t 
truncate_size; utime_t ctime, mtime, atime; version_t time_warp_seq; + bufferlist inline_data; + uint64_t inline_version; frag_info_t dirstat; nest_info_t rstat; @@ -174,6 +176,13 @@ struct InodeStat { xattr_version = e.xattr_version; ::decode(xattrbl, p); + + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ::decode(inline_version, p); + ::decode(inline_data, p); + } else { + inline_version = CEPH_INLINE_DISABLED; + } } // see CInode::encode_inodestat for encoder. diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b391e17..30f7d01 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -2398,8 +2398,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops) result = osd->store->getattr(coll, soid, name.c_str(), xattr); else result = osd->store->getattr(coll, src_obc->obs.oi.soid, name.c_str(), xattr); - if (result < 0 && result != -EEXIST && result != -ENODATA) + int flags = le32_to_cpu(op.flags); + if (result < 0 && result != -EEXIST && result != -ENODATA && + (!(flags & CEPH_OSD_OP_FLAG_NOENTOK) || result != -ENOENT)) { break; + } ctx->delta_stats.num_rd++; ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(xattr.length(), 10); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 154ee41..230745b 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -112,9 +112,10 @@ struct ObjectOperation { osd_op.indata.append(name); osd_op.indata.append(data); } - void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& data) { + void add_xattr_cmp(int op, const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& data) { OSDOp& osd_op = add_op(op); osd_op.op.op = op; + osd_op.op.flags = flags; osd_op.op.xattr.name_len = (name ? 
strlen(name) : 0); osd_op.op.xattr.value_len = data.length(); osd_op.op.xattr.cmp_op = cmp_op; @@ -279,8 +280,16 @@ struct ObjectOperation { out_handler[p] = h; out_rval[p] = prval; } - void write(uint64_t off, bufferlist& bl) { + void write(uint64_t off, bufferlist& bl, + uint64_t truncate_size, + uint32_t truncate_seq) { add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl); + OSDOp& o = *ops.rbegin(); + o.op.extent.truncate_size = truncate_size; + o.op.extent.truncate_seq = truncate_seq; + } + void write(uint64_t off, bufferlist& bl) { + write(off, bl, 0, 0); } void write_full(bufferlist& bl) { add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); @@ -453,7 +462,10 @@ struct ObjectOperation { add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, const bufferlist& bl) { - add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl); + add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, 0, bl); + } + void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode, uint32_t flags, const bufferlist& bl) { + add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, flags, bl); } void rmxattr(const char *name) { bufferlist bl; @@ -733,11 +745,12 @@ struct ObjectOperation { } void cmpxattr(const char *name, const bufferlist& val, - int op, int mode) { + int op, int mode, int flags = 0) { add_xattr(CEPH_OSD_OP_CMPXATTR, name, val); OSDOp& o = *ops.rbegin(); o.op.xattr.cmp_op = op; o.op.xattr.cmp_mode = mode; + o.op.flags = flags; } void src_cmpxattr(const object_t& srcoid, snapid_t srcsnapid, const char *name, const bufferlist& val, -- 1.7.9.5 -- To unsubscribe from this list: send the line "unsubscribe ceph-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html