@@ -1,3 +1,4 @@
+#include "common/ITX.h"
#include "JournalingObjectStore.h"
@@ -286,6 +287,7 @@
assert(journal_lock.is_locked());
dout(10) << "op_journal_transactions " << op << " " << tls << dendl;
+ itx_profile.write_checkpoint("JournalingObjectStore::_op_journal_transactions: ", ITX_JOURNAL_PROCESS_TRANSACTION_START, op, tls);
if (journal && journal->is_writeable()) {
bufferlist tbl;
unsigned data_len = 0, data_align = 0;
@@ -197,6 +197,9 @@
uint32_t get_data_length() {
return largest_data_len;
}
+ uint32_t get_largest_data_off() {
+ return largest_data_off;
+ }
uint32_t get_data_offset() {
if (largest_data_off_in_tbl) {
return largest_data_off_in_tbl +
@@ -29,6 +29,7 @@
#include "global/global_init.h"
#include "messages/MGenericMessage.h"
+#include "common/ITX.h"
#include <netdb.h>
@@ -307,6 +308,7 @@
pipe->pipe_lock.Lock();
list<Message *>& m_queue = pipe->in_q[priority];
Message *m = m_queue.front();
+ int qlen = m_queue.size();
m_queue.pop_front();
if (m_queue.empty()) {
@@ -351,7 +353,7 @@
uint64_t msize = m->get_dispatch_throttle_size();
m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message.
- ldout(cct,1) << "<== " << m->get_source_inst()
+ ldout(cct,1) << "<== " << "[prio: " << priority << ", qsize: " << qlen << "] " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
@@ -360,6 +362,7 @@
<< " " << m->get_footer().data_crc << ")"
<< " " << m << " con " << m->get_connection()
<< dendl;
+ itx_profile.write_checkpoint("SimpleMessenger::dispatch_entry", ITX_DISPATCH_OP, m);
ms_deliver_dispatch(m);
dispatch_throttle_release(msize);
@@ -555,6 +558,7 @@
msgr->dispatch_queue.lock.Unlock();
} else {
// just queue message under pipe lock.
+ ldout(msgr->cct,1) << "queue_received: queue len: " << queue.size() << ", in_qlen: " << in_qlen+1 << dendl;
queue.push_back(m);
}
@@ -1639,6 +1643,7 @@
<< m->get_seq() << " " << m << " " << *m
<< dendl;
queue_received(m);
+ itx_profile.write_checkpoint("SimpleMessenger::read_message", ITX_MSG_SM_QUEUED, m);
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
@@ -1869,8 +1874,11 @@
int aborted;
Message *message;
utime_t recv_stamp = ceph_clock_now(msgr->cct);
+ utime_t complete_stamp;
+ utime_t decoded_stamp;
bool waited_on_throttle = false;
+
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
if (message_size) {
if (policy.throttler) {
@@ -1985,12 +1993,18 @@
ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length()
<< " byte message" << dendl;
+ complete_stamp = ceph_clock_now(msgr->cct);
message = decode_message(msgr->cct, header, footer, front, middle, data);
+ decoded_stamp = ceph_clock_now(msgr->cct);
if (!message) {
ret = -EINVAL;
goto out_dethrottle;
}
+ itx_profile.write_checkpoint("SimpleMessenger::read_message", ITX_RECEIVED, message, recv_stamp);
+ itx_profile.write_checkpoint("SimpleMessenger::read_message", ITX_MSG_COMPLETE, message, complete_stamp, data_len);
+ itx_profile.write_checkpoint("SimpleMessenger::read_message", ITX_MSG_DECODED, message, decoded_stamp);
+
message->set_throttler(policy.throttler);
// store reservation size in message, so we don't get confused
@@ -48,6 +48,8 @@
#include <fstream>
#include <sstream>
+#include "common/ITX.h"
+
#include "FileStore.h"
#include "common/BackTrace.h"
#include "include/types.h"
@@ -2143,6 +2145,7 @@
op_tp.unlock();
+ itx_profile.write_checkpoint("FileStore::queue_op:", ITX_FS_QUEUE_OP, o->op, o->tls);
dout(5) << "queue_op " << o << " seq " << o->op
<< " " << *osr
<< " " << o->bytes << " bytes"
@@ -2205,7 +2208,11 @@
Op *o = osr->peek_queue();
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
+
+ itx_profile.write_checkpoint("_do_op: do_transactions begin :", ITX_FS_PROCESS_TRANSACTION_START, o->op, o->tls);
int r = do_transactions(o->tls, o->op);
+ itx_profile.write_checkpoint("_do_op: do_transactions end:", ITX_FS_PROCESS_TRANSACTION_DONE, o->op, o->tls);
+
op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
@@ -2292,6 +2299,7 @@
dump_transactions(o->tls, o->op, osr);
if (m_filestore_journal_parallel) {
+ itx_profile.write_checkpoint("FileStore::parallel: queue_transactions start", ITX_QUEUE_TRANSACTION_START, o->op, o->tls);
dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
_op_journal_transactions(o->tls, o->op, ondisk, osd_op);
@@ -2299,6 +2307,7 @@
// queue inside journal lock, to preserve ordering
queue_op(osr, o);
} else if (m_filestore_journal_writeahead) {
+ itx_profile.write_checkpoint("FileStore::writeahead: queue_transactions start", ITX_QUEUE_TRANSACTION_START, o->op, o->tls);
dout(5) << "queue_transactions (writeahead) " << o->op << " " << o->tls << dendl;
osr->queue_journal(o->op);
@@ -2310,11 +2319,13 @@
assert(0);
}
op_submit_finish(o->op);
+ itx_profile.write_checkpoint("FileStore::queue_transactions done", ITX_QUEUE_TRANSACTION_END, o->op);
return 0;
}
uint64_t op = op_submit_start();
dout(5) << "queue_transactions (trailing journal) " << op << " " << tls << dendl;
+ itx_profile.write_checkpoint("FileStore::trailing: queue_transactions start", ITX_QUEUE_TRANSACTION_START, op, tls);
if (m_filestore_do_dump)
dump_transactions(tls, op, osr);
@@ -3185,6 +3196,7 @@
}
out:
+ itx_profile.write_checkpoint("FileStore::_write done writing to filesystem", ITX_FS_WRITE_DONE, cid, oid, offset, len);
dout(10) << "write " << cid << "/" << oid << " " << offset << "~" << len << " = " << r << dendl;
return r;
}
@@ -39,6 +39,7 @@
#include "mds/inode_backtrace.h" // Ugh
#include "common/config.h"
+#include "common/ITX.h"
#include "include/compat.h"
#include "json_spirit/json_spirit_value.h"
@@ -549,6 +550,8 @@
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
reply->set_data(outdata);
reply->set_result(result);
+ itx_profile.write_checkpoint("ReplicatedPG::do_pg_op: commit", ITX_PG_OP_COMMIT, m);
+ itx_profile.write_checkpoint("ReplicatedPG::do_pg_op: commit", ITX_OP_FINISHED, m);
osd->client_messenger->send_message(reply, m->get_connection());
delete filter;
}
@@ -950,6 +953,8 @@
reply->set_version(info.last_update);
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ itx_profile.write_checkpoint("ReplicatedPG::do_op: commit read", ITX_OP_READ_COMMIT, m);
+ itx_profile.write_checkpoint("ReplicatedPG::do_op: commit read", ITX_OP_FINISHED, m);
osd->client_messenger->send_message(reply, m->get_connection());
delete ctx;
put_object_context(obc);
@@ -3379,6 +3384,10 @@
repop->applying = false;
repop->applied = true;
+ if (repop->ctx->op) {
+ itx_profile.write_checkpoint("ReplicatedPG::op_applied: ", ITX_PG_OP_LOCAL_APPLIED, repop->ctx->op->request);
+ }
+
// (logical) local ack.
int whoami = osd->get_nodeid();
@@ -3430,6 +3439,9 @@
if (repop->ctx->op)
repop->ctx->op->mark_event("op_commit");
+ if (repop->ctx->op) {
+ itx_profile.write_checkpoint("ReplicatedPG::op_commit: ", ITX_PG_OP_LOCAL_COMMIT, repop->ctx->op->request);
+ }
if (repop->aborted) {
dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
} else if (repop->waitfor_disk.count(osd->get_nodeid()) == 0) {
@@ -3508,6 +3520,7 @@
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+ itx_profile.write_checkpoint("ReplicatedPG::eval_repop: commit", ITX_OP_COMMIT, m);
osd->client_messenger->send_message(reply, m->get_connection());
repop->sent_disk = true;
}
@@ -3525,6 +3538,7 @@
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
+ itx_profile.write_checkpoint("ReplicatedPG::eval_repop: ack", ITX_OP_ACK, m);
osd->client_messenger->send_message(reply, m->get_connection());
repop->sent_ack = true;
}
@@ -3541,7 +3555,9 @@
// done.
if (repop->waitfor_ack.empty() && repop->waitfor_disk.empty()) {
repop->done = true;
-
+ if (m) {
+ itx_profile.write_checkpoint("ReplicatedPG::eval_repop: ack", ITX_OP_FINISHED, m);
+ }
calc_min_last_complete_ondisk();
// kick snap_trimmer if necessary
@@ -3626,6 +3642,9 @@
}
wr->pg_trim_to = pg_trim_to;
+ if ((i == 1) && (ctx->op) && (ctx->op->request)) {
+ itx_profile.write_checkpoint("ReplicatedPG::issue_repop", ITX_PG_SEND_REPOP, ctx->op->request);
+ }
osd->cluster_messenger->send_message(wr, get_osdmap()->get_cluster_inst(peer));
// keep peer_info up to date
@@ -4162,7 +4181,7 @@
else
opname = "trans";
- dout(10) << "sub_op_modify " << opname
+ dout(10) << "sub_op_modify " << opname
<< " " << soid
<< " v " << m->version
<< (m->noop ? " NOOP" : "")
@@ -4249,6 +4268,7 @@
rm->bytes_written = rm->opt.get_encoded_bytes();
+
} else {
// just trim the log
if (m->pg_trim_to != eversion_t()) {
@@ -4279,6 +4299,7 @@
// send ack to acker only if we haven't sent a commit already
MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
+ itx_profile.write_checkpoint("ReplicatedPG::sub_op_modify_applied: ACK", ITX_SUBOP_ACK, rm->opt);
osd->cluster_messenger->send_message(ack, get_osdmap()->get_cluster_inst(rm->ackerosd));
}
@@ -4298,6 +4319,7 @@
unlock();
if (done) {
+ itx_profile.write_checkpoint("ReplicatedPG::sub_op_modify_commit", ITX_OP_FINISHED, rm->opt);
delete rm->ctx;
delete rm;
put();
@@ -4321,6 +4343,7 @@
MOSDSubOpReply *commit = new MOSDSubOpReply((MOSDSubOp*)rm->op->request, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
commit->set_last_complete_ondisk(rm->last_complete);
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
+ itx_profile.write_checkpoint("ReplicatedPG::sub_op_modify_commit: ONDISK", ITX_SUBOP_COMMIT, rm->opt);
osd->cluster_messenger->send_message(commit, get_osdmap()->get_cluster_inst(rm->ackerosd));
}
@@ -4329,6 +4352,7 @@
unlock();
if (done) {
+ itx_profile.write_checkpoint("ReplicatedPG::sub_op_modify_commit", ITX_OP_FINISHED, rm->opt);
delete rm->ctx;
delete rm;
put();
@@ -92,6 +92,7 @@
#include "common/safe_io.h"
#include "common/HeartbeatMap.h"
#include "common/admin_socket.h"
+#include "common/ITX.h"
#include "global/signal_handler.h"
#include "global/pidfile.h"
@@ -2529,6 +2530,11 @@
ss << "reset pg recovery stats";
pg_recovery_stats.reset();
}
+ else if (cmd[0] == "code_profile") {
+ stringstream s;
+ itx_profile.cmd_handler(cmd, s);
+ ss << " profiling data" << s.str();
+ }
else {
ss << "unrecognized command! " << cmd;
@@ -2700,16 +2706,22 @@
bool OSD::ms_dispatch(Message *m)
{
// lock!
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH1, m);
osd_lock.Lock();
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH2, m);
while (dispatch_running) {
dout(10) << "ms_dispatch waiting for other dispatch thread to complete" << dendl;
dispatch_cond.Wait(osd_lock);
}
dispatch_running = true;
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH3, m);
do_waiters();
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH4, m);
_dispatch(m);
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH5, m);
do_waiters();
+ itx_profile.write_checkpoint("OSD::ms_dispatch", ITX_OSD_DISPATCH6, m);
dispatch_running = false;
dispatch_cond.Signal();
@@ -5265,7 +5277,9 @@
}
// we don't need encoded payload anymore
+ itx_profile.write_checkpoint("OSD::handle_op: clear start", ITX_OSD_HANDLE_OP_CLEAR_START, m);
m->clear_payload();
+ itx_profile.write_checkpoint("OSD::handle_op: clear done", ITX_OSD_HANDLE_OP_CLEAR_DONE, m);
// require same or newer map
if (!require_same_or_newer_map(op, m->get_map_epoch()))
@@ -5573,6 +5587,7 @@
dout(15) << *pg << " enqueue_op " << op->request << " "
<< *(op->request) << dendl;
assert(pg->is_locked());
+ itx_profile.write_checkpoint("OSD::enqueue_op", ITX_OSD_ENQUEUE, pg, op->request);
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
@@ -5682,6 +5697,7 @@
pg->op_queue.pop_front();
dout(10) << "dequeue_op " << *op->request << " pg " << *pg << dendl;
+ itx_profile.write_checkpoint("OSD::dequeue_op", ITX_OSD_DEQUEUE, pg, op->request);
// share map?
// do this preemptively while we hold osd_lock and pg->lock
@@ -970,7 +970,7 @@
libcommon_la_SOURCES = $(libcommon_files)
libcommon_la_CFLAGS= ${CRYPTO_CFLAGS} ${AM_CFLAGS}
libcommon_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
-libcommon_la_LDFLAGS = -lrt
+libcommon_la_LDFLAGS = -lrt -lboost_regex
noinst_LTLIBRARIES += libcommon.la
libglobal_la_SOURCES = \
@@ -1041,6 +1041,7 @@
msg/Message.cc \
msg/Messenger.cc \
msg/SimpleMessenger.cc \
+ common/ITX.cc \
msg/msg_types.cc \
msg/tcp.cc \
os/hobject.cc \
@@ -1106,6 +1107,12 @@
libmon_a_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
noinst_LIBRARIES += libmon.a
+libITX_la_SOURCES = \
+ common/ITX.cc
+libITX_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
+libITX_la_LIBADD =
+noinst_LTLIBRARIES += libITX.la
+
libmds_a_SOURCES = \
mds/Dumper.cc \
mds/Resetter.cc \
@@ -1553,6 +1560,7 @@
msg/Message.h\
msg/Messenger.h\
msg/SimpleMessenger.h\
+ common/ITX.h\
msg/msg_types.h\
msg/tcp.h\
objclass/objclass.h\
@@ -0,0 +1,1074 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include <string>
+#include <sstream>
+
+#include "include/assert.h"
+#include "include/xlist.h"
+
+#include "common/ITX.h"
+#include "common/config.h"
+
+#define dout_subsys ceph_subsys_ITX
+#undef dout_prefix
+#define dout_prefix *_dout << "ITX "
+
+
+ITX itx_profile;
+
+template<class T>
+std::string t_to_string(T i)
+{
+ std::stringstream ss;
+ std::string s;
+ ss << i;
+ s = ss.str();
+
+ return s;
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, Message *m, utime_t stamp)
+{
+ dout(20) << "write_checkpoint(string, uint32_t, Message, utime_t): " << label << dendl;
+ write_checkpoint(label, where, m, stamp, 0);
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, Message *m, utime_t stamp, unsigned data_len)
+{
+ string obj_name;
+ string id;
+ utime_t tmp;
+ uint64_t off, len;
+ uint32_t opclass = ITX_OPCLASS_UNKNOWN;
+
+ if (m == NULL) return;
+
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP: {
+ MOSDOp *mm = (MOSDOp *)m;
+ obj_name = mm->get_oid().name;
+ get_offset_and_len(mm->ops, &off, &len);
+ dout(20) << "write_checkpoint(string ,uint32_t , Message, utime_t, unsigned): " << label << ", CEPH_MSG_OSD_OP " << off << "+" << len << dendl;
+ opclass = get_opclass(mm);
+ break;
+ }
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *mm = (MOSDSubOp *)m;
+ ObjectStore::Transaction ta;
+
+ if (mm->ops.size() == 0) {
+ bufferlist::iterator p = mm->get_data().begin();
+ obj_name = mm->poid.oid.name;
+ ::decode(ta, p);
+ off = ta.get_largest_data_off();
+ len = ta.get_data_length();
+ // get_offset_and_len(mm->ops, &off, &len);
+ dout(1) << "write_checkpoint(string ,uint32_t , Message, utime_t, unsigned): " << label << ", MSG_OSD_SUBOP " << obj_name << "+" << off << "+" << len << dendl;
+ opclass = get_opclass(mm);
+ }
+ break;
+ }
+ default: {
+ obj_name = "unknown";
+ dout(1) << "write_checkpoint(string ,uint32_t , Message, utime_t, unsigned): " << label << ", unknown type: " << m->get_type() << dendl;
+ break;
+ }
+ }
+
+ if (obj_name.compare("unknown") != 0) {
+ checkpoint_key key(obj_name, off, len);
+ // id = create_id(obj_name, off, len);
+ // if (get_timestamp(obj_name, where, tmp) == false) {
+ dout(2) << "set timestamp for: " << key << " from " << itx_checkpoint_names[where] << dendl;
+ set_timestamp(key, where, stamp);
+ // }
+ if (data_len != 0) {
+ set_datalen(key, data_len);
+ }
+ }
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, PG *pg, Message *m)
+{
+ string obj_name;
+ string id;
+ utime_t received;
+ utime_t tmp;
+ uint32_t cpid;
+ uint32_t opclass = ITX_OPCLASS_UNKNOWN;
+ uint64_t off, len;
+
+ if (m == NULL) return;
+
+ dout(20) << "write_checkpoint(string, uint32_t, PG, Message): " << label << dendl;
+ received.tv.tv_sec = 0;
+ received.tv.tv_nsec = 0;
+
+ obj_name = "unknown";
+ cpid = where;
+ switch (m->get_type()) {
+ case CEPH_MSG_OSD_OP: {
+ MOSDOp *mm = (MOSDOp *)m;
+ obj_name = mm->get_oid().name;
+ if (where == ITX_DISPATCH_OP) {
+ cpid = ITX_DISPATCH_MSG_OSD_OP;
+ received = mm->get_recv_stamp();
+ }
+ get_offset_and_len(mm->ops, &off, &len);
+ opclass = get_opclass(mm);
+ break;
+ }
+ case CEPH_MSG_OSD_OPREPLY: {
+ MOSDOpReply *mm = (MOSDOpReply *)m;
+ obj_name = mm->get_oid().name;
+ if (where == ITX_DISPATCH_OP) cpid = ITX_DISPATCH_MSG_OSD_OPREPLY;
+ break;
+ }
+ case MSG_OSD_SUBOP: {
+ MOSDSubOp *mm = (MOSDSubOp *)m;
+ ObjectStore::Transaction ta;
+
+ if (where == ITX_DISPATCH_OP) cpid = ITX_DISPATCH_MSG_OSD_SUBOP;
+ if (mm->ops.size() == 0) {
+ bufferlist::iterator p = mm->get_data().begin();
+ obj_name = mm->poid.oid.name;
+ ::decode(ta, p);
+ off = ta.get_largest_data_off();
+ len = ta.get_data_length();
+ opclass = get_opclass(mm);
+ }
+ break;
+ }
+ case MSG_OSD_SUBOPREPLY: {
+ MOSDSubOpReply *mm = (MOSDSubOpReply *)m;
+ obj_name = mm->poid.oid.name;
+ if (where == ITX_DISPATCH_OP) cpid = ITX_DISPATCH_MSG_OSD_SUBOPREPLY;
+ get_offset_and_len(mm->ops, &off, &len);
+ break;
+ }
+ default: {
+ break;
+ }
+ }
+
+ if (obj_name != "unknown") {
+ checkpoint_key key(obj_name, off, len);
+ // id = create_id(obj_name, off, len);
+ if (received.tv.tv_sec != 0) {
+ if (get_timestamp(key, ITX_RECEIVED, tmp) == false) {
+ dout(2) << "set timestamp for: " << key << " from " << itx_checkpoint_names[where] << dendl;
+ set_timestamp(key, ITX_RECEIVED, received, opclass);
+ }
+ }
+ dout(2) << "save timestamp for: " << key << " from " << itx_checkpoint_names[where] << dendl;
+ save_timestamp(key, cpid, opclass);
+ }
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, list<ObjectStore::Transaction*>& tls)
+{
+ dout(20) << "write_checkpoint(string, uint32_t, list<Transaction>): " << label << dendl;
+ for (list<ObjectStore::Transaction*>::iterator p = tls.begin(); p != tls.end(); p++) {
+ write_checkpoint(label, where, **p);
+ }
+}
+
+list<checkpoint_key>
+ITX::write_checkpoint(string label, uint32_t where, ObjectStore::Transaction& t)
+{
+ string op_name = "unknown";
+ checkpoint_key *key = NULL;
+ hobject_t oid;
+ uint64_t off, len;
+ list<checkpoint_key> keys;
+ ObjectStore::Transaction::iterator i = t.begin();
+
+ dout(20) << "write_checkpoint(string, uint32_t, Transaction): " << label << dendl;
+ while (i.have_op()) {
+ off = 0;
+ len = 0;
+ int op = i.get_op();
+ switch (op) {
+ case ObjectStore::Transaction::OP_NOP:
+ op_name = "OP_NOP";
+ break;
+ case ObjectStore::Transaction::OP_TOUCH:
+ {
+ op_name = "OP_TOUCH";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_WRITE:
+ {
+ op_name = "OP_WRITE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ off = (i.get_length());
+ len = (i.get_length());
+ bufferlist bl;
+ i.get_bl(bl);
+ key = new checkpoint_key(oid.oid.name, off, len);
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_ZERO:
+ {
+ op_name = "OP_ZERO";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ off = (i.get_length());
+ len = (i.get_length());
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_TRIMCACHE:
+ {
+ op_name = "OP_TRIMCACHE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ off = (i.get_length());
+ len = (i.get_length());
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_TRUNCATE:
+ {
+ op_name = "OP_TRUNCATE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_REMOVE:
+ {
+ op_name = "OP_REMOVE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_SETATTR:
+ {
+ op_name = "OP_SETATTR";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ string name = i.get_attrname();
+ bufferlist bl;
+ i.get_bl(bl);
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_SETATTRS:
+ {
+ op_name = "OP_SETATTRS";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ map<string, bufferptr> aset;
+ i.get_attrset(aset);
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_RMATTR:
+ {
+ op_name = "OP_RMATTR";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ string name = i.get_attrname();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_RMATTRS:
+ {
+ op_name = "OP_RMATTRS";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_CLONE:
+ {
+ op_name = "OP_CLONE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ hobject_t noid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_CLONERANGE:
+ {
+ op_name = "OP_CLONERANGE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ hobject_t noid = i.get_oid();
+ off = (i.get_length());
+ len = (i.get_length());
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_CLONERANGE2:
+ {
+ op_name = "OP_CLONERANGE2";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ hobject_t noid = i.get_oid();
+ off = (i.get_length());
+ len = (i.get_length());
+ uint64_t dstoff = i.get_length();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_MKCOLL:
+ {
+ op_name = "OP_MKCOLL";
+ coll_t cid = i.get_cid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_RMCOLL:
+ {
+ op_name = "OP_RKCOLL";
+ coll_t cid = i.get_cid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_COLL_ADD:
+ {
+ op_name = "OP_COLL_ADD";
+ coll_t ocid = i.get_cid();
+ coll_t ncid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_COLL_REMOVE:
+ {
+ op_name = "OP_COLL_REMOVE";
+ coll_t cid = i.get_cid();
+ oid = i.get_oid();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_COLL_SETATTR:
+ {
+ op_name = "OP_COLL_SETATTR";
+ coll_t cid = i.get_cid();
+ string name = i.get_attrname();
+ bufferlist bl;
+ i.get_bl(bl);
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_COLL_RMATTR:
+ {
+ op_name = "OP_COLL_RMATTR";
+ coll_t cid = i.get_cid();
+ string name = i.get_attrname();
+ }
+ break;
+
+ case ObjectStore::Transaction::OP_STARTSYNC:
+ op_name = "OP_STARTSYNC";
+ break;
+
+ case ObjectStore::Transaction::OP_COLL_RENAME:
+ {
+ op_name = "OP_COLL_RENAME";
+ coll_t cid(i.get_cid());
+ coll_t ncid(i.get_cid());
+ }
+ break;
+
+ default:
+ op_name = "op unkown";
+ break;
+ }
+
+ if ((key != NULL) && (key->oid.compare("unknown") != 0)) {
+ keys.push_back(*key);
+ dout(2) << "save timestamp for: " << op_name << " " << key << " from " << itx_checkpoint_names[where] << dendl;
+ save_timestamp(*key, where);
+ }
+ }
+
+ if (keys.empty() == false) {
+ keys.sort();
+ keys.unique();
+ }
+ return(keys);
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, uint64_t tls_id, list<ObjectStore::Transaction*>& tls)
+{
+ list<checkpoint_key> keys;
+
+ dout(20) << "write_checkpoint(string, uint32_t, uint64_t, list<Transaction>): " << label << dendl;
+ label.append("-");
+ label.append(t_to_string(tls_id));
+ for (list<ObjectStore::Transaction*>::iterator p = tls.begin(); p != tls.end(); p++) {
+ if (*p != NULL) {
+ keys = write_checkpoint(label, where, **p);
+ save_tls_id(keys, tls_id);
+ }
+ }
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, coll_t cid, const hobject_t &oid, uint64_t off, size_t len)
+{
+ dout(20) << "write_checkpoint(string, uint32_t, coll_t, hobject_t, uint64_t, size_t): " << label << dendl;
+ checkpoint_key *key;
+ uint64_t u64_len;
+ u64_len = len;
+ key = new checkpoint_key(oid.oid.name, off, len);
+ dout(2) << "save timestamp for: " << key << " from " << itx_checkpoint_names[where] << dendl;
+ save_timestamp(*key, where);
+}
+
+void
+ITX::write_checkpoint(string label, uint32_t where, uint64_t tls_id)
+{
+ list<checkpoint_key> keys;
+ list<checkpoint_key>::iterator i;
+ checkpoint_key *key;
+
+ dout(20) << "write_checkpoint(string label, uint32_t where, uint64_t tls_id): " << label << dendl;
+ tls_lk->Lock();
+ if (tls_ids.find(tls_id) != tls_ids.end()) {
+ keys = tls_ids[tls_id];
+ if (keys.empty() == false) {
+ for (i = keys.begin(); i != keys.end(); i++) {
+ key = &(*i);
+ dout(2) << "save timestamp for: " << *key << " from " << itx_checkpoint_names[where] << dendl;
+ save_timestamp(*key, where);
+ }
+ }
+ drop_tls_id(tls_id);
+ }
+ tls_lk->Unlock();
+}
+
+string
+ITX::create_id(string oid, uint64_t offset, uint64_t len)
+{
+ ostringstream os;
+ os << oid << "+" << offset << "+" << len;
+ return(os.str());
+}
+
+void ITX::get_offset_and_len(vector<OSDOp> ops, uint64_t *off, uint64_t *len)
+{
+ if ((ops.empty() == false) && (ceph_osd_op_type_data(ops[0].op.op))) {
+ *off = (ops[0].op.extent.offset);
+ *len = (ops[0].op.extent.length);
+ } else {
+ *off = 0;
+ *len = 0;
+ }
+}
+
+uint32_t
+ITX::get_opclass(const MOSDOp *mm)
+{
+ uint32_t opclass = ITX_OPCLASS_UNKNOWN;
+ __le16 op;
+
+ if (mm->ops.empty() == false) {
+ op = mm->ops[0].op.op;
+ switch (op) {
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
+ opclass = ITX_OPCLASS_WRITE;
+ break;
+ case CEPH_OSD_OP_READ:
+ opclass = ITX_OPCLASS_READ;
+ break;
+ default:
+ opclass = ITX_OPCLASS_UNKNOWN;
+ break;
+ }
+ }
+ return opclass;
+}
+
+uint32_t
+ITX::get_opclass(const MOSDSubOp *mm)
+{
+ uint32_t opclass = ITX_OPCLASS_UNKNOWN;
+ __le16 op;
+
+ if (mm->ops.empty() == false) {
+ op = mm->ops[0].op.op;
+ switch (op) {
+ case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
+ opclass = ITX_OPCLASS_SUBOP_WRITE;
+ break;
+ case CEPH_OSD_OP_READ:
+ opclass = ITX_OPCLASS_SUBOP_READ;
+ break;
+ case CEPH_OSD_OP_PUSH:
+ opclass = ITX_OPCLASS_PUSH;
+ break;
+ case CEPH_OSD_OP_PULL:
+ opclass = ITX_OPCLASS_PULL;
+ break;
+ default:
+ opclass = ITX_OPCLASS_UNKNOWN;
+ break;
+ }
+ } else {
+ opclass = ITX_OPCLASS_NOOPS;
+ }
+ return opclass;
+}
+
+void
+ITX::print_timestamp(int where, utime_t timestamps[], ostream &out)
+{
+ __u32 sec, nsec;
+ sec = timestamps[where].tv.tv_sec;
+ if (sec > 0) sec -= start_sec;
+ nsec = timestamps[where].tv.tv_nsec / 10000;
+ out << " " << itx_checkpoint_names[where] << ": ";
+ out << sec << '.' << std::setw(5) << nsec;
+ out << std::endl;
+}
+
+void
+ITX::print_checkpoint(hash_map<const checkpoint_key, checkpoint *>::iterator p, ostream &out)
+{
+ checkpoint * cp;
+
+ out << " pending timestamps: key " << p->first << std::endl;
+ cp = p->second;
+ out << " opclass: " << cp->opclass << ", length: " << cp->data_len << std::endl;
+ for (int i = 0; i < ITX_MAX_CHECKPOINT; i++) {
+ print_timestamp(i, cp->timestamps, out);
+ }
+ out << std::endl;
+}
+
+void
+ITX::print_checkpoint_list(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out)
+{
+ list<checkpoint *>::iterator p;
+ checkpoint * cp;
+
+ out << " timestamps: key " << pl->first << ", " << pl->second.size() << " entries" << std::endl;
+ for (p = pl->second.begin(); p != pl->second.end(); p++) {
+ cp = *p;
+ out << " opclass: " << cp->opclass << ", length: " << cp->data_len << std::endl;
+ for (int i = 0; i < ITX_MAX_CHECKPOINT; i++) {
+ print_timestamp(i, cp->timestamps, out);
+ }
+ out << std::endl;
+ }
+}
+
+void
+ITX::print_primary_write_checkpoints(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out)
+{
+ list<checkpoint *>::iterator p;
+ checkpoint * cp;
+ const checkpoint_key *key;
+ utime_t *tsp;
+
+ key = &(pl->first);
+ for (p = pl->second.begin(); p != pl->second.end(); p++) {
+ cp = *p;
+ if (cp->opclass == ITX_OPCLASS_WRITE) {
+/*
+ ITX_RECEIVED = 0, SM: header received : 109.13477
+ ITX_MSG_COMPLETE, SM: message complete : 109.13485
+ ITX_MSG_DECODED, SM: message decoded : 109.13486
+ ITX_DISPATCH_MSG_OSD_OP, SM: dispatch message MOSDOp : 109.22271
+ ITX_OSD_ENQUEUE, OSD: enqueue : 109.22377
+ ITX_OSD_DEQUEUE, OSD: dequeue : 109.23192
+ ITX_PG_SEND_REPOP, PG: send replication request : 109.23206
+ ITX_QUEUE_TRANSACTION_ START, FS: start transactions : 109.23219
+ ITX_JOURNAL_PROCESS_TRANSACTION_START, Journal: start transactions (journal only): 109.23237
+ ITX_FS_QUEUE_OP, FS: queue operation for execution : 109.23256
+ ITX_QUEUE_TRANSACTION_END, FS: end transaction : 109.23263
+ ITX_FS_PROCESS_TRANSACTION_START, FS: start transactions (FS only) : 109.23275
+ ITX_FS_WRITE_DONE, FS: write to FS finished : 109.23306
+ ITX_FS_PROCESS_TRANSACTION_DONE, FS: end transactions (FS only) : 109.23329
+ ITX_PG_OP_LOCAL_APPLIED, PG: op locally applied : 109.23381
+ ITX_PG_OP_LOCAL_COMMIT, PG: op locally committed : 109.23288
+ ITX_OP_COMMIT, PG: send write commit to client : 109.23881
+ ITX_OP_FINISHED, operation finished : 109.23883
+*/
+
+ tsp = cp->timestamps;
+ if ((tsp[ITX_RECEIVED].tv.tv_nsec != 0) &&
+ (tsp[ITX_MSG_COMPLETE].tv.tv_nsec != 0) &&
+ (tsp[ITX_MSG_DECODED].tv.tv_nsec != 0) &&
+ (tsp[ITX_DISPATCH_MSG_OSD_OP].tv.tv_nsec != 0) &&
+ (tsp[ITX_OSD_ENQUEUE].tv.tv_nsec != 0) &&
+ (tsp[ITX_OSD_DEQUEUE].tv.tv_nsec != 0) &&
+ (tsp[ITX_PG_SEND_REPOP].tv.tv_nsec != 0) &&
+ (tsp[ITX_QUEUE_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_JOURNAL_PROCESS_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_QUEUE_OP].tv.tv_nsec != 0) &&
+ (tsp[ITX_QUEUE_TRANSACTION_END].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_PROCESS_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_WRITE_DONE].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_PROCESS_TRANSACTION_DONE].tv.tv_nsec != 0) &&
+ (tsp[ITX_PG_OP_LOCAL_APPLIED].tv.tv_nsec != 0) &&
+ (tsp[ITX_PG_OP_LOCAL_COMMIT].tv.tv_nsec != 0) &&
+ (tsp[ITX_OP_COMMIT].tv.tv_nsec != 0) &&
+ (tsp[ITX_OP_FINISHED].tv.tv_nsec != 0))
+ {
+ out << key->oid << " " << key->offset << " " << key->len << " ";
+ utime_t start = tsp[ITX_RECEIVED];
+ utime_t delta;
+ __u32 s;
+ if (start.tv.tv_sec > 0) s = start.tv.tv_sec - start_sec;
+ out << s << '.' << std::setw(5) << start.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_MSG_DECODED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_MSG_SM_QUEUED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_DISPATCH_MSG_OSD_OP] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH1] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_HANDLE_OP_CLEAR_START] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_HANDLE_OP_CLEAR_DONE] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH4] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH5] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH6] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_ENQUEUE] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DEQUEUE] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_PG_SEND_REPOP] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_JOURNAL_PROCESS_TRANSACTION_START] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_PG_OP_LOCAL_COMMIT] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OP_COMMIT] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OP_FINISHED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << std::endl;
+ }
+ }
+ }
+}
+
+void
+ITX::print_secondary_write_checkpoints(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out)
+{
+ list<checkpoint *>::iterator p;
+ checkpoint * cp;
+ const checkpoint_key *key;
+ utime_t *tsp;
+
+ key = &(pl->first);
+ for (p = pl->second.begin(); p != pl->second.end(); p++) {
+ cp = *p;
+ if (cp->opclass == ITX_OPCLASS_NOOPS) {
+/*
+ * opclass: 7, length: 127535
+ ITX_RECEIVED: SM: header received : 10.10338
+ ITX_MSG_COMPLETE: SM: message complete : 10.10348
+ ITX_MSG_DECODED; SM: message decoded : 10.10378
+ ITX_DISPATCH_MSG_OSD_SUBOP: SM: dispatch message MOSDSubOp : 10.10970
+ ITX_OSD_ENQUEUE: OSD: enqueue : 10.11162
+ ITX_OSD_DEQUEUE: OSD: dequeue : 10.11585
+ ITX_SUBOP_COMMIT: PG: send subop commit to primary : 10.11795
+ ITX_QUEUE_TRANSACTION_START: FS: start transactions : 10.11608
+ ITX_JOURNAL_PROCESS_TRANSACTION_START: Journal: start transactions (journal only): 10.11631
+ ITX_FS_QUEUE_OP: FS: queue operation for execution : 10.11656
+ ITX_QUEUE_TRANSACTION_END: FS: end transaction : 10.11665
+ ITX_FS_PROCESS_TRANSACTION_START: FS: start transactions (FS only): 10.11689
+ ITX_FS_WRITE_DONE: FS: write to FS finished : 10.11740
+ ITX_FS_PROCESS_TRANSACTION_DONE: FS: end transactions (FS only) : 10.11804
+ ITX_OP_FINISHED: operation finished : 10.11908
+*/
+
+ tsp = cp->timestamps;
+ if ((tsp[ITX_RECEIVED].tv.tv_nsec != 0) &&
+ (tsp[ITX_MSG_COMPLETE].tv.tv_nsec != 0) &&
+ (tsp[ITX_MSG_DECODED].tv.tv_nsec != 0) &&
+ (tsp[ITX_DISPATCH_MSG_OSD_SUBOP].tv.tv_nsec != 0) &&
+ (tsp[ITX_OSD_ENQUEUE].tv.tv_nsec != 0) &&
+ (tsp[ITX_OSD_DEQUEUE].tv.tv_nsec != 0) &&
+ (tsp[ITX_SUBOP_COMMIT].tv.tv_nsec != 0) &&
+ (tsp[ITX_QUEUE_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_JOURNAL_PROCESS_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_QUEUE_OP].tv.tv_nsec != 0) &&
+ (tsp[ITX_QUEUE_TRANSACTION_END].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_PROCESS_TRANSACTION_START].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_WRITE_DONE].tv.tv_nsec != 0) &&
+ (tsp[ITX_FS_PROCESS_TRANSACTION_DONE].tv.tv_nsec != 0) &&
+ (tsp[ITX_OP_FINISHED].tv.tv_nsec != 0))
+ {
+ out << key->oid << " " << key->offset << " " << key->len << " ";
+ utime_t start = tsp[ITX_RECEIVED];
+ utime_t delta;
+ __u32 s;
+ if (start.tv.tv_sec > 0) s = start.tv.tv_sec - start_sec;
+ out << s << '.' << std::setw(5) << start.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_MSG_DECODED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_MSG_SM_QUEUED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_DISPATCH_MSG_OSD_SUBOP] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH1] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH4] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH5] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DISPATCH6] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_ENQUEUE] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OSD_DEQUEUE] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_FS_PROCESS_TRANSACTION_START] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_JOURNAL_PROCESS_TRANSACTION_START] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_SUBOP_COMMIT] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << " ";
+ delta = tsp[ITX_OP_FINISHED] - start;
+ out << delta.tv.tv_sec << '.' << std::setw(5) << delta.tv.tv_nsec / 10000 << std::endl;
+ }
+ }
+ }
+}
+void
+ITX::handle_get_pending_timestamps(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, checkpoint *>::iterator p;
+
+
+ lk->Lock();
+ out << std::endl;
+ if (key == "") {
+ out << setfill('0');
+ for (p = cps.begin(); p != cps.end(); p++) {
+ print_checkpoint(p, out);
+ }
+ out << setfill(' ');
+ } else {
+ checkpoint_key cp_key(key);
+ p = cps.find(cp_key);
+ out << setfill('0');
+ if (p != cps.end()) {
+ print_checkpoint(p, out);
+
+ } else {
+ int keylen = key.length();
+ out << " timestamps: pattern " << key << std::endl;
+ for (p = cps.begin(); p != cps.end(); p++) {
+ if (p->first.oid.compare(0, keylen, key) != 0) continue;
+ print_checkpoint(p, out);
+ }
+ }
+ out << setfill(' ');
+ out << std::endl;
+ }
+ lk->Unlock();
+}
+
+void
+ITX::handle_get_timestamps(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl;
+
+
+ lk->Lock();
+ out << std::endl;
+ if (key == "") {
+ out << setfill('0');
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ print_checkpoint_list(pl, out);
+ }
+ out << setfill(' ');
+ } else {
+ checkpoint_key cp_key(key);
+ pl = cphistory.find(cp_key);
+ out << setfill('0');
+ if (pl != cphistory.end()) {
+ print_checkpoint_list(pl, out);
+
+ } else {
+ int keylen = key.length();
+ out << " timestamps: pattern " << key << std::endl;
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ if (pl->first.oid.compare(0, keylen, key) != 0) continue;
+ print_checkpoint_list(pl, out);
+ }
+ }
+ out << setfill(' ');
+ out << std::endl;
+ }
+ lk->Unlock();
+}
+
+void
+ITX::handle_get_primary_writes(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl;
+
+
+ lk->Lock();
+ out << std::endl;
+ out << "# 3 4 5 6 7 8 9 10 11 12 13 " << std::endl;
+ out << "# oid offset len received decoded SMqueued SMdispatch OSDenq OSDdeq sendRepOp JournalStart local_commit commit finish" << std::endl;
+ if (key == "") {
+ out << setfill('0');
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ print_primary_write_checkpoints(pl, out);
+ }
+ out << setfill(' ');
+ } else {
+ checkpoint_key cp_key(key);
+ pl = cphistory.find(cp_key);
+ out << setfill('0');
+ if (pl != cphistory.end()) {
+ print_primary_write_checkpoints(pl, out);
+
+ } else {
+ int keylen = key.length();
+ out << " timestamps: pattern " << key << std::endl;
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ if (pl->first.oid.compare(0, keylen, key) != 0) continue;
+ print_primary_write_checkpoints(pl, out);
+ }
+ }
+ out << setfill(' ');
+ out << std::endl;
+ }
+ lk->Unlock();
+}
+
+void
+ITX::handle_get_secondary_writes(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl;
+
+
+ lk->Lock();
+ out << std::endl;
+ out << "# 3 4 5 6 7 8 9 10 11 12 " << std::endl;
+ out << "# oid offset len received decoded SMqueued SMdispatch OSDenq OSDdeq FSstart JournalStart subop_commit finish" << std::endl;
+ if (key == "") {
+ out << setfill('0');
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ print_secondary_write_checkpoints(pl, out);
+ }
+ out << setfill(' ');
+ } else {
+ checkpoint_key cp_key(key);
+ pl = cphistory.find(cp_key);
+ out << setfill('0');
+ if (pl != cphistory.end()) {
+ print_secondary_write_checkpoints(pl, out);
+
+ } else {
+ int keylen = key.length();
+ out << " timestamps: pattern " << key << std::endl;
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ if (pl->first.oid.compare(0, keylen, key) != 0) continue;
+ print_secondary_write_checkpoints(pl, out);
+ }
+ }
+ out << setfill(' ');
+ out << std::endl;
+ }
+ lk->Unlock();
+}
+
+void
+ITX::handle_get_info(ostream &out)
+{
+ out << setfill(' ');
+
+ lk->Lock();
+ out << std::endl;
+ out << " number of pending operations: " << cps.size() << std::endl;
+ out << " number of completed objects: " << cphistory.size() << std::endl;
+ lk->Unlock();
+ // tls_lk->Lock();
+ // out << " number of tls_ids: " << tls_ids.size() << std::endl;
+ // tls_lk->Unlock();
+ out << " object filter: " << ITX::objectid_filter.str();
+
+ out << setfill(' ');
+ out << std::endl;
+}
+
+void
+ITX::handle_get_pending_oids(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, checkpoint * >::iterator p;
+ lk->Lock();
+ out << std::endl;
+ if (key == "") {
+ out << " pending oids: " << std::endl;
+ for (p = cps.begin(); p != cps.end(); p++) {
+ out << " " << p->first << std::endl;
+ }
+ } else {
+ checkpoint_key cp_key(key);
+ p = cps.find(cp_key);
+ if (p != cps.end()) {
+ out << " pending oid: " << p->first << " exists"<< std::endl;
+ } else {
+ int keylen = key.length();
+ out << " pending oids: pattern " << key << std::endl;
+ for (p = cps.begin(); p != cps.end(); p++) {
+ if (p->first.oid.compare(0, keylen, key) != 0) continue;
+ out << " " << p->first << std::endl;
+ }
+ }
+ }
+ lk->Unlock();
+}
+
+void
+ITX::handle_get_oids(string key, ostream &out)
+{
+ hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl;
+ lk->Lock();
+ out << std::endl;
+ if (key == "") {
+ out << " completed oids: " << std::endl;
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ out << " " << pl->first << std::endl;
+ }
+ } else {
+ checkpoint_key cp_key(key);
+ pl = cphistory.find(cp_key);
+ if (pl != cphistory.end()) {
+ out << " completed oid: " << pl->first << " exists"<< std::endl;
+ } else {
+ int keylen = key.length();
+ out << " completed oids: pattern " << key << std::endl;
+ for (pl = cphistory.begin(); pl != cphistory.end(); pl++) {
+ if (pl->first.oid.compare(0, keylen, key) != 0) continue;
+ out << " " << pl->first << std::endl;
+ }
+ }
+ }
+ lk->Unlock();
+}
+void
+ITX::handle_clear(ostream &out)
+{
+ lk->Lock();
+ tls_lk->Lock();
+ cphistory.erase(cphistory.begin(), cphistory.end());
+ cps.erase(cps.begin(), cps.end());
+ tls_ids.erase(tls_ids.begin(), tls_ids.end());
+ ITX::objectid_filter.assign("__notdefined__", boost::regex_constants::basic);
+ tls_lk->Unlock();
+ lk->Unlock();
+ handle_get_info(out);
+}
+
+void
+ITX::cmd_handler(const std::vector<std::string>& cmd, ostream &out)
+{
+ string op = cmd[1];
+
+ if (op == "help") {
+ out << std::endl;
+ out << " code_profile operations:" << std::endl;
+ out << " info: get summary info" << std::endl;
+ out << " setfilter <regex>: set filter for object IDs" << std::endl;
+ out << " completed-oids: object IDS for which complete profiling data is available" << std::endl;
+ out << " pending-oids: object IDS for which incomplete profiling data is available" << std::endl;
+ out << " get-completed [prefix]: get profiling data for completed objects" << std::endl;
+ out << " get-primary-writes [prefix]: get condensed profiling data for primary write operations" << std::endl;
+ out << " get-secondary-writes [prefix]: get condensed profiling data for replica write operations" << std::endl;
+ out << " get-pending [prefix]: get profiling data for pending operations" << std::endl;
+ out << " clear: free storage of profiled data and clear the object ID filter" << std::endl;
+ }
+ else if (op == "get-completed") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_timestamps(key, out);
+ }
+ else if (op == "get-primary-writes") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_primary_writes(key, out);
+ }
+ else if (op == "get-secondary-writes") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_secondary_writes(key, out);
+ }
+ else if (op == "get-pending") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_pending_timestamps(key, out);
+ }
+ else if (op == "info") {
+ handle_get_info(out);
+ }
+ else if (op == "setfilter") {
+ string filter;
+ if (cmd.size() < 3) {
+ out << " ERROR: no filter specified";
+ out << setfill(' ');
+ out << std::endl;
+ } else {
+ handle_setfilter(cmd[2]);
+ }
+ }
+ else if (op == "completed-oids") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_oids(key, out);
+ }
+ else if (op == "pending-oids") {
+ string key;
+ if (cmd.size() < 3) {
+ key = "";
+ } else {
+ key = cmd[2];
+ }
+ handle_get_pending_oids(key, out);
+ }
+ else if (op == "clear") {
+ handle_clear(out);
+ }
+}
@@ -0,0 +1,423 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#ifndef ITX_H
+#define ITX_H
+
+#include <string>
+
+#include <ext/hash_map>
+#include <ext/hash_set>
+#include <time.h>
+#include <boost/regex.hpp>
+
+#include "include/assert.h"
+#include "include/xlist.h"
+
+#include "common/ceph_context.h"
+#include "common/Mutex.h"
+#include "msg/Message.h"
+
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
+#include "osd/OSD.h"
+#include "osd/ReplicatedPG.h"
+#include "osd/OpRequest.h"
+#include "os/ObjectStore.h"
+
+
+
+
+// from where is the checkpoint called
+enum itx_checkpoint {
+ ITX_RECEIVED = 0,
+ ITX_MSG_COMPLETE,
+ ITX_MSG_DECODED,
+ ITX_MSG_SM_QUEUED,
+ ITX_DISPATCH_OP,
+ ITX_DISPATCH_MSG_OSD_OP,
+ ITX_DISPATCH_MSG_OSD_OPREPLY,
+ ITX_DISPATCH_MSG_OSD_SUBOP,
+ ITX_DISPATCH_MSG_OSD_SUBOPREPLY,
+ ITX_OSD_DISPATCH1,
+ ITX_OSD_DISPATCH2,
+ ITX_OSD_DISPATCH3,
+ ITX_OSD_DISPATCH4,
+ ITX_OSD_DISPATCH5,
+ ITX_OSD_DISPATCH6,
+ ITX_OSD_HANDLE_OP_CLEAR_START,
+ ITX_OSD_HANDLE_OP_CLEAR_DONE,
+ ITX_OSD_ENQUEUE,
+ ITX_OSD_DEQUEUE,
+ ITX_PG_SEND_REPOP,
+ ITX_SUBOP_ACK,
+ ITX_SUBOP_COMMIT,
+ ITX_QUEUE_TRANSACTION_START,
+ ITX_JOURNAL_PROCESS_TRANSACTION_START,
+ ITX_FS_QUEUE_OP,
+ ITX_QUEUE_TRANSACTION_END,
+ ITX_FS_PROCESS_TRANSACTION_START,
+ ITX_FS_WRITE_DONE,
+ ITX_FS_PROCESS_TRANSACTION_DONE,
+ ITX_PG_OP_LOCAL_APPLIED,
+ ITX_PG_OP_LOCAL_COMMIT,
+ ITX_PG_OP_COMMIT,
+ ITX_OP_COMMIT,
+ ITX_OP_READ_COMMIT,
+ ITX_OP_ACK,
+ ITX_OP_FINISHED,
+ ITX_MAX_CHECKPOINT
+};
+
+enum itx_opclass {
+ ITX_OPCLASS_UNKNOWN = 0,
+ ITX_OPCLASS_READ,
+ ITX_OPCLASS_WRITE,
+ ITX_OPCLASS_SUBOP_READ,
+ ITX_OPCLASS_SUBOP_WRITE,
+ ITX_OPCLASS_PULL,
+ ITX_OPCLASS_PUSH,
+ ITX_OPCLASS_NOOPS
+};
+
+static string itx_checkpoint_names[] = {
+ "SM: header received ",
+ "SM: message complete ",
+ "SM: message decoded ",
+ "SM: message queued for dispatching ",
+ "SM: dispatch message ",
+ "SM: dispatch message MOSDOp ",
+ "SM: dispatch message MOSDOpReply ",
+ "SM: dispatch message MOSDSubOp ",
+ "SM: dispatch message MOSDSubOpReply ",
+ "OSD: ms dispatch 1 ",
+ "OSD: ms dispatch 2 ",
+ "OSD: ms dispatch 3 ",
+ "OSD: ms dispatch 4 ",
+ "OSD: ms dispatch 5 ",
+ "OSD: ms dispatch 6 ",
+ "OSD: ms handle_op clear start ",
+ "OSD: ms handle_op clear done ",
+ "OSD: enqueue ",
+ "OSD: dequeue ",
+ "PG: send replication request ",
+ "PG: send subop ack to primary ",
+ "PG: send subop commit to primary ",
+ "FS: start transactions ",
+ "Journal: start transactions (journal only)",
+ "FS: queue operation for execution ",
+ "FS: end transaction ",
+ "FS: start transactions (FS only) ",
+ "FS: write to FS finished ",
+ "FS: end transactions (FS only) ",
+ "PG: op locally applied ",
+ "PG: op locally committed ",
+ "PG: do_pg_op commit ",
+ "PG: send write commit to client ",
+ "PG: send read commit to client ",
+ "PG: send ack to client ",
+ "operation finished ",
+ "ITX_MAX_CHECKPOINT"
+ };
+
+class checkpoint_key {
+public:
+ string oid;
+ unsigned offset;
+ unsigned len;
+public:
+ checkpoint_key(string s, unsigned off, unsigned len):
+ oid(s),
+ offset(off),
+ len(len)
+ { }
+ checkpoint_key(string s):
+ oid(s)
+ {
+ offset = 0;
+ len = 0;
+ }
+ ~checkpoint_key() {};
+
+
+};
+
+namespace __gnu_cxx // hash function is part of the same namespace as hash_map
+{
+ template<> struct hash< const checkpoint_key >
+ {
+ size_t operator()( const checkpoint_key& x ) const
+ {
+ ostringstream os;
+ os << x.oid << "+" << x.offset << "+" << x.len;
+ return hash< const char* >()( os.str().c_str());
+ }
+ };
+}
+
+inline bool operator==(const checkpoint_key &s1, const checkpoint_key &s2)
+{
+ return ((strcmp(s1.oid.c_str(), s2.oid.c_str()) == 0) &&
+ (s1.offset == s2.offset) &&
+ (s1.len == s2.len));
+}
+
+inline bool operator<(const checkpoint_key &s1, const checkpoint_key &s2)
+{
+ ostringstream os1, os2;
+ os1 << s1.oid << "+" << s1.offset << "+" << s1.len;
+ os2 << s2.oid << "+" << s2.offset << "+" << s2.len;
+ return ( os1.str().c_str() < os2.str().c_str());
+}
+
+namespace std // hash_map uses std::equal_to; to override: std must be modified
+{
+ template<> struct equal_to< const checkpoint_key >
+ {
+ bool operator()(const checkpoint_key &s1, const checkpoint_key &s2) const
+ {
+ return s1 == s2;
+ }
+ };
+}
+
+class ITX {
+ __u32 start_sec;
+ Mutex *lk;
+ Mutex *tls_lk;
+ boost::regex objectid_filter;
+
+
+ class checkpoint {
+ public:
+ ITX *profile;
+ uint32_t opclass;
+ unsigned data_len;
+ utime_t timestamps[ITX_MAX_CHECKPOINT];
+
+ public:
+ checkpoint(ITX *p)
+ {
+ profile = p;
+ opclass = ITX_OPCLASS_UNKNOWN;
+ data_len = 0;
+ for (int i = 0; i < ITX_MAX_CHECKPOINT; i++) {
+ timestamps[i].tv.tv_sec = 0;
+ timestamps[i].tv.tv_nsec = 0;
+ // = utime_t();
+ }
+ }
+ ~checkpoint() {};
+
+ void set_timestamp(itx_checkpoint where)
+ {
+ profile->lk->Lock();
+ timestamps[where] = ceph_clock_now(g_ceph_context);
+ profile->lk->Unlock();
+
+ }
+ };
+
+ hash_map<string, checkpoint *>cpsold;
+
+ hash_map<const checkpoint_key, checkpoint *>cps;
+
+ hash_map<uint64_t, list<checkpoint_key> >tls_ids;
+
+ hash_map<string, list<checkpoint *> >cphistoryold;
+
+ hash_map<const checkpoint_key, list<checkpoint *> >cphistory;
+
+public:
+ ITX()
+ {
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ utime_t n(&tv);
+ start_sec = n.tv.tv_sec;
+ lk = new Mutex("ITX_profile");
+ tls_lk = new Mutex("ITX_profile_tls");
+
+ objectid_filter.assign("__notdefined__", boost::regex_constants::basic);
+ };
+ virtual ~ITX() {};
+
+ void write_checkpoint(string label, uint32_t where, Message *m, utime_t stamp, unsigned data_len);
+ void write_checkpoint(string label, uint32_t where, Message *m, utime_t stamp);
+ void write_checkpoint(string label, uint32_t where, PG *pg, Message *m);
+ void write_checkpoint(string label, uint32_t where, PG *pg, OpRequest *op) {
+ write_checkpoint(label, where, pg, op->request);
+ }
+ void write_checkpoint(string label, uint32_t where, Message *m) {
+ write_checkpoint(label, where, NULL, m);
+ }
+ void write_checkpoint(string label, uint32_t where, OpRequest *op) {
+ write_checkpoint(label, where, NULL, op->request);
+ }
+ void write_checkpoint(string label, uint32_t where, list<ObjectStore::Transaction *>& tls);
+ void write_checkpoint(string label, uint32_t where, uint64_t op, list<ObjectStore::Transaction*>& tls);
+
+ list<checkpoint_key> write_checkpoint(string label, uint32_t where, ObjectStore::Transaction& t);
+ void write_checkpoint(string label, uint32_t where, coll_t cid, const hobject_t &oid, uint64_t off, size_t len);
+ void write_checkpoint(string label, uint32_t where, uint64_t op);
+
+ void cmd_handler(const std::vector<std::string>& cmd, ostream &out);
+ void handle_get_pending_timestamps(const string key, ostream &out);
+ void handle_get_timestamps(const string key, ostream &out);
+ void handle_get_primary_writes(const string key, ostream &out);
+ void handle_get_secondary_writes(const string key, ostream &out);
+ void handle_get_info(ostream &out);
+ void handle_setfilter(const string &filter) {
+ boost::regex_constants::syntax_option_type flags = boost::regex_constants::basic;
+ objectid_filter.assign(filter, flags);
+ }
+ void handle_get_pending_oids(const string key, ostream &out);
+ void handle_get_oids(const string key, ostream &out);
+ void handle_clear(ostream &out);
+
+private:
+
+
+ void save_timestamp(const checkpoint_key &key, uint32_t where, const uint32_t opclass)
+ {
+ checkpoint *p;
+
+ if (regex_match(key.oid, objectid_filter)) {
+ lk->Lock();
+ if (cps.find(key) == cps.end()) {
+ cps[key] = new checkpoint(this);
+ }
+ p = cps[key];
+ p->timestamps[where] = ceph_clock_now(g_ceph_context);
+ if (p->opclass == ITX_OPCLASS_UNKNOWN) {
+ p->opclass = opclass;
+ }
+ if (where == ITX_OP_FINISHED) {
+ if (cphistory.find(key) == cphistory.end()) {
+ cphistory[key] = list<checkpoint *>();
+ }
+ // DEBUG g_ceph_context->_dout << " ITX checkpoint: FINISH: " << where << ", " << oid << std::endl;
+ cphistory[key].push_back(p);
+ cps.erase(key);
+ }
+ lk->Unlock();
+ }
+ }
+
+ void save_timestamp(const checkpoint_key &key, uint32_t where)
+ {
+ save_timestamp(key, where, ITX_OPCLASS_UNKNOWN);
+ }
+
+ void set_timestamp(const checkpoint_key &key, uint32_t where, utime_t tstamp)
+ {
+ set_timestamp(key, where, tstamp, ITX_OPCLASS_UNKNOWN);
+ }
+
+
+ void set_timestamp(const checkpoint_key &key, uint32_t where, utime_t tstamp, const uint32_t opclass)
+ {
+ checkpoint *p;
+ if (regex_match(key.oid, objectid_filter)) {
+ lk->Lock();
+ if (cps.find(key) == cps.end()) {
+ cps[key] = new checkpoint(this);
+ }
+ p = cps[key];
+ p->timestamps[where] = tstamp;
+ if (p->opclass == ITX_OPCLASS_UNKNOWN) {
+ p->opclass = opclass;
+ }
+ lk->Unlock();
+ }
+ }
+
+ void set_datalen(const checkpoint_key &key, unsigned data_len)
+ {
+ lk->Lock();
+ if ((data_len != 0) && (cps.find(key) != cps.end())) {
+ cps[key]->data_len = data_len;
+ }
+ lk->Unlock();
+ }
+
+ bool get_timestamp(const checkpoint_key &key, uint32_t where, utime_t &tstamp)
+ {
+ bool retval = true;
+
+ lk->Lock();
+
+ if (cps.find(key) == cps.end()) {
+ retval = false;
+ }
+
+ if (retval == true) {
+ if (where >= ITX_MAX_CHECKPOINT) {
+ retval = false;
+ }
+ }
+
+ if (retval == true) {
+ tstamp.tv.tv_sec = cps[key]->timestamps[where].tv.tv_sec;
+ tstamp.tv.tv_nsec = cps[key]->timestamps[where].tv.tv_nsec;
+ if ((tstamp.tv.tv_sec == 0) && (tstamp.tv.tv_nsec == 0)) {
+ retval = false;
+ }
+ }
+
+ lk->Unlock();
+ return retval;
+ }
+
+ void save_tls_id(list<checkpoint_key> keys, uint64_t tls_id)
+ {
+ list<checkpoint_key> tmp;
+ hash_map<uint64_t, list<checkpoint_key> >::iterator i;
+
+ tls_lk->Lock();
+ i = tls_ids.find(tls_id);
+ if (i != tls_ids.end()) {
+ tmp = tls_ids[tls_id];
+ tmp.merge(keys);
+ tmp.sort();
+ tmp.unique();
+ tls_ids.erase(i);
+ tls_ids[tls_id] = tmp;
+ } else {
+ tls_ids[tls_id] = keys;
+ }
+ tls_lk->Unlock();
+ }
+
+ void drop_tls_id(uint64_t tls_id)
+ {
+ hash_map<uint64_t, list<checkpoint_key> >::iterator i;
+
+ i = tls_ids.find(tls_id);
+ if (i != tls_ids.end()) {
+ tls_ids.erase(i);
+ }
+ }
+
+ string create_id(string oid, uint64_t offset, uint64_t len);
+ void print_timestamp(int where, utime_t timestamps[], ostream &out);
+ void print_condensed_timestamp(int where, utime_t timestamps[], ostream &out);
+ void print_checkpoint(hash_map<const checkpoint_key, checkpoint *>::iterator p, ostream &out);
+ void print_checkpoint_list(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out);
+ void print_primary_write_checkpoints(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out);
+ void print_secondary_write_checkpoints(hash_map<const checkpoint_key, list<checkpoint *> >::iterator pl, ostream &out);
+
+ void get_offset_and_len(vector<OSDOp> ops, uint64_t *off, uint64_t *len);
+ uint32_t get_opclass(const MOSDOp *mm);
+ uint32_t get_opclass(const MOSDSubOp *mm);
+
+};
+
+inline ostream& operator<<(ostream& out, const checkpoint_key& cp_key) {
+ return out << '[' << cp_key.oid << ']' << '+' << cp_key.offset << '+' << cp_key.len;
+}
+
+extern ITX itx_profile;
+
+#endif /* ITX_H */
@@ -68,6 +68,7 @@
SUBSYS(optracker, 0, 5)
SUBSYS(objclass, 0, 5)
SUBSYS(filestore, 1, 5)
+SUBSYS(ITX, 1, 5)
SUBSYS(journal, 1, 5)
SUBSYS(ms, 0, 5)
SUBSYS(mon, 1, 5)