@@ -142,12 +142,13 @@ void AnchorServer::_prepare(bufferlist &bl, uint64_t reqid, int bymds)
}
inc(ino);
pending_create[version] = ino; // so we can undo
+ pending_ops[ino].push_back(pair<version_t, Context*>(version, NULL));
break;
-
case TABLE_OP_DESTROY:
version++;
pending_destroy[version] = ino;
+ pending_ops[ino].push_back(pair<version_t, Context*>(version, NULL));
break;
case TABLE_OP_UPDATE:
@@ -155,6 +156,7 @@ void AnchorServer::_prepare(bufferlist &bl, uint64_t reqid, int bymds)
version++;
pending_update[version].first = ino;
pending_update[version].second = trace;
+ pending_ops[ino].push_back(pair<version_t, Context*>(version, NULL));
break;
default:
@@ -163,8 +165,56 @@ void AnchorServer::_prepare(bufferlist &bl, uint64_t reqid, int bymds)
//dump();
}
-void AnchorServer::_commit(version_t tid)
+/*
+ * Serialize completion of pending operations that target the same inode.
+ *
+ * pending_ops[ino] lists the (version, waiter) pairs of every prepared but
+ * not yet finished operation on ino, in the order they were appended.
+ *
+ *  tid      - version of the prepared operation being committed/rolled back;
+ *             it must be present in pending_create, pending_destroy or
+ *             pending_update.
+ *  req      - the commit message, or NULL when called for a rollback.
+ *  finished - receives the waiters that become runnable once tid is gone;
+ *             the callers hand this list to mds->queue_waiters().
+ *
+ * Returns false when tid is not the oldest entry for its inode and req is
+ * non-NULL: a C_MDS_RetryMessage for req is parked on the entry and nothing
+ * else changes (presumably the message is re-dispatched later -- confirm
+ * against C_MDS_RetryMessage).  Returns true otherwise, after removing the
+ * entry for tid.
+ */
+bool AnchorServer::check_pending(version_t tid, MMDSTableRequest *req, list<Context *>& finished)
+{
+  typedef list< pair<version_t, Context*> > op_list_t;
+
+  // Which inode does this tid refer to?  Same lookup order as the
+  // commit/rollback paths: create, destroy, update.
+  inodeno_t ino;
+  map<version_t, inodeno_t>::iterator created = pending_create.find(tid);
+  if (created != pending_create.end()) {
+    ino = created->second;
+  } else {
+    map<version_t, inodeno_t>::iterator destroyed = pending_destroy.find(tid);
+    if (destroyed != pending_destroy.end()) {
+      ino = destroyed->second;
+    } else {
+      map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator updated = pending_update.find(tid);
+      if (updated != pending_update.end())
+        ino = updated->second.first;
+      else
+        assert(0);
+    }
+  }
+
+  map<inodeno_t, op_list_t>::iterator slot = pending_ops.find(ino);
+  assert(slot != pending_ops.end());
+  op_list_t& queue = slot->second;
+
+  // Find the entry for tid; it must exist and must not already be waiting.
+  op_list_t::iterator entry = queue.begin();
+  while (entry != queue.end() && entry->first != tid)
+    ++entry;
+  assert(entry != queue.end());
+  assert(entry->second == NULL);
+
+  // not the earliest pending operation, wait if it's a commit
+  if (entry != queue.begin() && req) {
+    entry->second = new C_MDS_RetryMessage(mds, req);
+    return false;
+  }
+
+  queue.erase(entry);
+  if (queue.empty()) {
+    // Last pending operation for this inode; drop the bookkeeping entry.
+    pending_ops.erase(slot);
+    return true;
+  }
+
+  // Release the waiters parked on the leading entries, stopping at the
+  // first entry that has nobody waiting on it.
+  op_list_t::iterator head = queue.begin();
+  while (head != queue.end() && head->second) {
+    finished.push_back(head->second);
+    head->second = NULL;
+    ++head;
+  }
+  return true;
+}
+
+bool AnchorServer::_commit(version_t tid, MMDSTableRequest *req)
{
+ list<Context *> finished;
+ if (!check_pending(tid, req, finished))
+ return false;
+
if (pending_create.count(tid)) {
dout(7) << "commit " << tid << " create " << pending_create[tid] << dendl;
pending_create.erase(tid);
@@ -206,10 +256,16 @@ void AnchorServer::_commit(version_t tid)
// bump version.
version++;
//dump();
+
+ mds->queue_waiters(finished);
+ return true;
}
void AnchorServer::_rollback(version_t tid)
{
+ list<Context *> finished;
+ check_pending(tid, NULL, finished);
+
if (pending_create.count(tid)) {
inodeno_t ino = pending_create[tid];
dout(7) << "rollback " << tid << " create " << ino << dendl;
@@ -234,6 +290,7 @@ void AnchorServer::_rollback(version_t tid)
// bump version.
version++;
//dump();
+ mds->queue_waiters(finished);
}
/* This function DOES put the passed message before returning */
@@ -30,6 +30,7 @@ class AnchorServer : public MDSTableServer {
map<version_t, inodeno_t> pending_create;
map<version_t, inodeno_t> pending_destroy;
map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
+ map<inodeno_t, list<pair<version_t, Context*> > > pending_ops;
void reset_state();
void encode_server_state(bufferlist& bl) {
@@ -47,17 +48,27 @@ class AnchorServer : public MDSTableServer {
::decode(pending_create, p);
::decode(pending_destroy, p);
::decode(pending_update, p);
+
+ map<version_t, inodeno_t> sort;
+ sort.insert(pending_create.begin(), pending_create.end());
+ sort.insert(pending_destroy.begin(), pending_destroy.end());
+ for (map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator p = pending_update.begin();
+ p != pending_update.end(); p++)
+ sort[p->first] = p->second.first;
+ for (map<version_t, inodeno_t>::iterator p = sort.begin(); p != sort.end(); p++)
+ pending_ops[p->second].push_back(pair<version_t, Context*>(p->first, NULL));
}
bool add(inodeno_t ino, inodeno_t dirino, __u32 dn_hash, bool replace);
void inc(inodeno_t ino, int ref=1);
void dec(inodeno_t ino, int ref=1);
+ bool check_pending(version_t tid, MMDSTableRequest *req, list<Context *>& finished);
void dump();
// server bits
void _prepare(bufferlist &bl, uint64_t reqid, int bymds);
- void _commit(version_t tid);
+ bool _commit(version_t tid, MMDSTableRequest *req=NULL);
void _rollback(version_t tid);
void handle_query(MMDSTableRequest *m);
};
@@ -82,7 +82,9 @@ void MDSTableServer::handle_commit(MMDSTableRequest *req)
assert(g_conf->mds_kill_mdstable_at != 5);
- _commit(tid);
+ if (!_commit(tid, req))
+ return;
+
_note_commit(tid);
mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_COMMIT, 0, -1,
tid, version));
@@ -78,7 +78,7 @@ private:
public:
virtual void handle_query(MMDSTableRequest *m) = 0;
virtual void _prepare(bufferlist &bl, uint64_t reqid, int bymds) = 0;
- virtual void _commit(version_t tid) = 0;
+ virtual bool _commit(version_t tid, MMDSTableRequest *req=NULL) = 0;
virtual void _rollback(version_t tid) = 0;
virtual void _server_update(bufferlist& bl) { assert(0); }
@@ -101,7 +101,7 @@ bool SnapServer::_is_prepared(version_t tid)
pending_destroy.count(tid);
}
-void SnapServer::_commit(version_t tid)
+bool SnapServer::_commit(version_t tid, MMDSTableRequest *req)
{
if (pending_create.count(tid)) {
dout(7) << "commit " << tid << " create " << pending_create[tid] << dendl;
@@ -134,6 +134,7 @@ void SnapServer::_commit(version_t tid)
// bump version.
version++;
//dump();
+ return true;
}
void SnapServer::_rollback(version_t tid)
@@ -70,7 +70,7 @@ public:
// server bits
void _prepare(bufferlist &bl, uint64_t reqid, int bymds);
bool _is_prepared(version_t tid);
- void _commit(version_t tid);
+ bool _commit(version_t tid, MMDSTableRequest *req=NULL);
void _rollback(version_t tid);
void _server_update(bufferlist& bl);
void handle_query(MMDSTableRequest *m);