@@ -63,8 +63,10 @@ public:
int create(pool_t pool, const std::string& oid, bool exclusive);
int write(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+ int write(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
int write_full(pool_t pool, const std::string& oid, bufferlist& bl);
int read(pool_t pool, const std::string& oid, off_t off, bufferlist& bl, size_t len);
+ int read(pool_t pool, const std::string& oid, off_t off, void *buf, size_t len);
int remove(pool_t pool, const std::string& oid);
int trunc(pool_t pool, const std::string& oid, size_t size);
@@ -135,4 +137,3 @@ public:
}
#endif
-
@@ -72,11 +72,12 @@ class RadosClient : public Dispatcher
Mutex lock;
Cond cond;
+ static hash_map<tid_t, bufferptr*> buffer_map;
+ static bufferptr* fetch_buffer_func(tid_t tid);
-
public:
RadosClient() : messenger(NULL), lock("radosclient") {
- messenger = new SimpleMessenger();
+ messenger = new SimpleMessenger(&RadosClient::fetch_buffer_func);
}
~RadosClient();
@@ -132,8 +133,10 @@ public:
// io
int create(PoolCtx& pool, const object_t& oid, bool exclusive);
int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
int write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl);
int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len);
int remove(PoolCtx& pool, const object_t& oid);
int stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime);
int trunc(PoolCtx& pool, const object_t& oid, size_t size);
@@ -870,6 +873,15 @@ int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist
return len;
}
+int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+ bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+ bufferlist bl;
+
+ bl.push_back(bp);
+ return write(pool, oid, off, bl, len);
+}
+
int RadosClient::write_full(PoolCtx& pool, const object_t& oid, bufferlist& bl)
{
utime_t ut = g_clock.now();
@@ -1116,6 +1128,46 @@ int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist&
return bl.length();
}
+int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, void *buf, size_t len)
+{
+ SnapContext snapc;
+
+ Mutex mylock("RadosClient::read::mylock");
+ Cond cond;
+ bool done;
+ int r;
+ Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ bufferptr bp = buffer::create_static(len, static_cast<char *>(buf));
+ bufferlist bl;
+
+ bl.push_back(bp);
+ lock.Lock();
+ ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
+ tid_t tid = objecter->get_tid();
+ buffer_map[tid] = &bp;
+ objecter->read_with_tid(oid, layout,
+ off, len, pool.snap_seq, &bl, 0,
+ onack, tid);
+ lock.Unlock();
+
+ mylock.Lock();
+ while (!done)
+ cond.Wait(mylock);
+ mylock.Unlock();
+ buffer_map.erase(tid);
+ dout(10) << "Objecter returned from read r=" << r << dendl;
+
+ if (r < 0)
+ return r;
+
+ if (bl.length() < len) {
+ dout(10) << "Returned length " << bl.length()
+ << " less than original length "<< len << dendl;
+ }
+
+ return bl.length();
+}
+
int RadosClient::stat(PoolCtx& pool, const object_t& oid, uint64_t *psize, time_t *pmtime)
{
SnapContext snapc;
@@ -1251,6 +1303,13 @@ int RadosClient::getxattrs(PoolCtx& pool, const object_t& oid, map<std::string,
return r;
}
+hash_map<tid_t, bufferptr*> RadosClient::buffer_map;
+
+bufferptr* RadosClient::fetch_buffer_func(tid_t tid)
+{
+ return buffer_map[tid];
+}
+
// ---------------------------------------------
namespace librados {
@@ -1401,6 +1460,14 @@ int Rados::write(rados_pool_t pool, const string& o, off_t off, bufferlist& bl,
return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
}
+int Rados::write(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+ if (!client)
+ return -EINVAL;
+ object_t oid(o);
+ return ((RadosClient *)client)->write(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
int Rados::write_full(rados_pool_t pool, const string& o, bufferlist& bl)
{
if (!client)
@@ -1433,6 +1500,14 @@ int Rados::read(rados_pool_t pool, const string& o, off_t off, bufferlist& bl, s
return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
}
+int Rados::read(rados_pool_t pool, const string& o, off_t off, void *buf, size_t len)
+{
+ if (!client)
+ return -EINVAL;
+ object_t oid(o);
+ return ((RadosClient *)client)->read(*(RadosClient::PoolCtx *)pool, oid, off, buf, len);
+}
+
int Rados::getxattr(rados_pool_t pool, const string& o, const char *name, bufferlist& bl)
{
if (!client)
@@ -51,7 +51,7 @@ static ostream& _prefix(SimpleMessenger *messenger) {
#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
-
+SimpleMessenger::fetch_buffer_callback_t SimpleMessenger::fetch_buffer_callback = 0;
/********************************************
* Accepter
@@ -1786,36 +1786,47 @@ int SimpleMessenger::Pipe::read_message(Message **pm)
data_len = le32_to_cpu(header.data_len);
data_off = le32_to_cpu(header.data_off);
if (data_len) {
- int left = data_len;
- if (data_off & ~PAGE_MASK) {
- // head
- int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
- (unsigned)left);
- bufferptr bp = buffer::create(head);
- if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+ if (fetch_buffer_callback) {
+ bufferptr *bpp = fetch_buffer_callback(header.tid);
+ if (!bpp)
+ goto allocate_buffer;
+ if (tcp_read( sd, bpp->c_str(), data_len, messenger->timeout ) < 0)
goto out_dethrottle;
- data.push_back(bp);
- left -= head;
- dout(20) << "reader got data head " << head << dendl;
- }
+ data.push_back(*bpp);
+ dout(20) << "reader got data " << data_len << dendl;
+ }else{
+ allocate_buffer:
+ int left = data_len;
+ if (data_off & ~PAGE_MASK) {
+ // head
+ int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
+ (unsigned)left);
+ bufferptr bp = buffer::create(head);
+ if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
+ goto out_dethrottle;
+ data.push_back(bp);
+ left -= head;
+ dout(20) << "reader got data head " << head << dendl;
+ }
- // middle
- int middle = left & PAGE_MASK;
- if (middle > 0) {
- bufferptr bp = buffer::create_page_aligned(middle);
- if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
- goto out_dethrottle;
- data.push_back(bp);
- left -= middle;
- dout(20) << "reader got data page-aligned middle " << middle << dendl;
- }
+ // middle
+ int middle = left & PAGE_MASK;
+ if (middle > 0) {
+ bufferptr bp = buffer::create_page_aligned(middle);
+ if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
+ goto out_dethrottle;
+ data.push_back(bp);
+ left -= middle;
+ dout(20) << "reader got data page-aligned middle " << middle << dendl;
+ }
- if (left) {
- bufferptr bp = buffer::create(left);
- if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
- goto out_dethrottle;
- data.push_back(bp);
- dout(20) << "reader got data tail " << left << dendl;
+ if (left) {
+ bufferptr bp = buffer::create(left);
+ if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
+ goto out_dethrottle;
+ data.push_back(bp);
+ dout(20) << "reader got data tail " << left << dendl;
+ }
}
}
@@ -567,6 +567,9 @@ private:
SimpleMessenger *messenger; //hack to make dout macro work, will fix
int timeout;
+ typedef bufferptr* (*fetch_buffer_callback_t) (tid_t);
+ static fetch_buffer_callback_t fetch_buffer_callback;
+
public:
SimpleMessenger() :
Messenger(entity_name_t()),
@@ -580,6 +583,20 @@ public:
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
}
+
+ SimpleMessenger(fetch_buffer_callback_t callback) :
+ Messenger(entity_name_t()),
+ accepter(this),
+ lock("SimpleMessenger::lock"), started(false), did_bind(false),
+ dispatch_throttler(g_conf.ms_dispatch_throttle_bytes), need_addr(true),
+ destination_stopped(true), my_type(-1),
+ global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
+ reaper_thread(this), reaper_started(false), reaper_stop(false),
+ dispatch_thread(this), messenger(this) {
+ fetch_buffer_callback = callback;
+ // for local dmsg delivery
+ dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
+ }
~SimpleMessenger() {
delete dispatch_queue.local_pipe;
}
@@ -530,6 +530,21 @@ private:
o->outbl = pbl;
return op_submit(o);
}
+ tid_t read_with_tid(const object_t& oid, ceph_object_layout ol,
+ uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
+ Context *onfinish, tid_t tid) {
+ vector<OSDOp> ops(1);
+ ops[0].op.op = CEPH_OSD_OP_READ;
+ ops[0].op.extent.offset = off;
+ ops[0].op.extent.length = len;
+ ops[0].op.extent.truncate_size = 0;
+ ops[0].op.extent.truncate_seq = 0;
+ Op *o = new Op(oid, ol, ops, flags, onfinish, 0);
+ o->snapid = snap;
+ o->outbl = pbl;
+ o->tid = tid;
+ return op_submit(o);
+ }
tid_t read_trunc(const object_t& oid, ceph_object_layout ol,
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
uint64_t trunc_size, __u32 trunc_seq,
@@ -725,6 +740,8 @@ private:
void list_objects(ListContext *p, Context *onfinish);
+ tid_t get_tid(void) { return ++last_tid; }
+
// -------------------------
// pool ops
private: