@@ -69,11 +69,11 @@ void Elector::start()
bump_epoch(epoch+1); // odd == election cycle
start_stamp = g_clock.now();
electing_me = true;
- acked_me.insert(whoami);
+ acked_me.insert(mon->whoami);
// bcast to everyone else
for (unsigned i=0; i<mon->monmap->size(); ++i) {
- if ((int)i == whoami) continue;
+ if ((int)i == mon->whoami) continue;
mon->messenger->send_message(new
MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap),
mon->monmap->get_inst(i));
}
@@ -151,7 +151,7 @@ void Elector::victory()
for (set<int>::iterator p = quorum.begin();
p != quorum.end();
++p) {
- if (*p == whoami) continue;
+ if (*p == mon->whoami) continue;
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY,
epoch, mon->monmap);
m->quorum = quorum;
mon->messenger->send_message(m, mon->monmap->get_inst(*p));
@@ -186,7 +186,7 @@ void Elector::handle_propose(MMonElection *m)
}
}
- if (whoami < from) {
+ if (mon->whoami < from) {
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
assert(leader_acked < from); // and they still win, of course
@@ -250,7 +250,7 @@ void Elector::handle_victory(MMonElection *m)
dout(5) << "handle_victory from " << m->get_source() << dendl;
int from = m->get_source().num();
- assert(from < whoami);
+ assert(from < mon->whoami);
assert(m->epoch % 2 == 0);
// i should have seen this election if i'm getting the victory.
@@ -32,7 +32,6 @@ class Monitor;
class Elector {
private:
Monitor *mon;
- int whoami;
Context *expire_event;
@@ -71,7 +70,7 @@ class Elector {
void handle_victory(class MMonElection *m);
public:
- Elector(Monitor *m, int w) : mon(m), whoami(w),
+ Elector(Monitor *m) : mon(m),
expire_event(0),
epoch(0),
electing_me(false),
@@ -93,7 +93,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger
*m, MonMap *map) :
state(STATE_STARTING), stopping(false),
- elector(this, w),
+ elector(this),
mon_epoch(0),
leader(0),
paxos(PAXOS_NUM), paxos_service(PAXOS_NUM),
@@ -114,7 +114,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger
*m, MonMap *map) :
Paxos *Monitor::add_paxos(int type)
{
- Paxos *p = new Paxos(this, whoami, type);
+ Paxos *p = new Paxos(this, type);
paxos[type] = p;
return p;
}
@@ -23,7 +23,7 @@
#define DOUT_SUBSYS paxos
#undef dout_prefix
-#define dout_prefix _prefix(mon, whoami, machine_name, state, last_committed)
+#define dout_prefix _prefix(mon, mon->whoami, machine_name, state,
last_committed)
static ostream& _prefix(Monitor *mon, int whoami, const char
*machine_name, int state, version_t last_committed) {
return *_dout << dbeginl
<< "mon" << whoami
@@ -87,7 +87,7 @@ void Paxos::collect(version_t oldpn)
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
- if (*p == whoami) continue;
+ if (*p == mon->whoami) continue;
MMonPaxos *collect = new MMonPaxos(mon->get_epoch(),
MMonPaxos::OP_COLLECT, machine_id);
collect->last_committed = last_committed;
@@ -336,7 +336,7 @@ void Paxos::begin(bufferlist& v)
// accept it ourselves
accepted.clear();
- accepted.insert(whoami);
+ accepted.insert(mon->whoami);
new_value = v;
mon->store->put_bl_sn(new_value, machine_name, last_committed+1);
@@ -356,7 +356,7 @@ void Paxos::begin(bufferlist& v)
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
- if (*p == whoami) continue;
+ if (*p == mon->whoami) continue;
dout(10) << " sending begin to mon" << *p << dendl;
MMonPaxos *begin = new MMonPaxos(mon->get_epoch(),
MMonPaxos::OP_BEGIN, machine_id);
@@ -483,7 +483,7 @@ void Paxos::commit()
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
- if (*p == whoami) continue;
+ if (*p == mon->whoami) continue;
dout(10) << " sending commit to mon" << *p << dendl;
MMonPaxos *commit = new MMonPaxos(mon->get_epoch(),
MMonPaxos::OP_COMMIT, machine_id);
@@ -527,7 +527,7 @@ void Paxos::extend_lease()
lease_expire = g_clock.now();
lease_expire += g_conf.mon_lease;
acked_lease.clear();
- acked_lease.insert(whoami);
+ acked_lease.insert(mon->whoami);
dout(7) << "extend_lease now+" << g_conf.mon_lease << " (" <<
lease_expire << ")" << dendl;
@@ -535,7 +535,7 @@ void Paxos::extend_lease()
for (set<int>::const_iterator p = mon->get_quorum().begin();
p != mon->get_quorum().end();
++p) {
- if (*p == whoami) continue;
+ if (*p == mon->whoami) continue;
MMonPaxos *lease = new MMonPaxos(mon->get_epoch(),
MMonPaxos::OP_LEASE, machine_id);
lease->last_committed = last_committed;
lease->lease_timestamp = lease_expire;
@@ -724,7 +724,7 @@ version_t Paxos::get_new_proposal_number(version_t gt)
last_pn /= 100;
last_pn++;
last_pn *= 100;
- last_pn += (version_t)whoami;
+ last_pn += (version_t)mon->whoami;
// write
mon->store->put_int(last_pn, machine_name, "last_pn");
@@ -68,7 +68,6 @@ class Paxos;
// i am one state machine.
class Paxos {
Monitor *mon;
- int whoami;
// my state machine info
int machine_id;
@@ -225,8 +224,8 @@ private:
version_t get_new_proposal_number(version_t gt=0);
public:
- Paxos(Monitor *m, int w,
- int mid) : mon(m), whoami(w),
+ Paxos(Monitor *m,
+ int mid) : mon(m),
machine_id(mid),
machine_name(get_paxos_name(mid)),
state(STATE_RECOVERING),
--
1.6.4.2
From eb7308104c33aa50195e3736483c6bf9a145a518 Mon Sep 17 00:00:00 2001
From: Paul Chiang <paul_chiang@tcloudcomputing.com>
Date: Mon, 7 Jun 2010 10:16:39 +0800
Subject: [PATCH 2/2] Introduced ceph mon remove command
Signed-off-by: Paul Chiang <paul_chiang@tcloudcomputing.com>
---
src/common/LogClient.h | 1 +
src/mon/Monitor.cc | 1 +
src/mon/Monitor.h | 1 +
src/mon/MonmapMonitor.cc | 61 ++++++++++++++++++++++++++++++++++++++++++++++
src/mon/MonmapMonitor.h | 1 +
5 files changed, 65 insertions(+), 0 deletions(-)
@@ -56,6 +56,7 @@ class LogClient : public Dispatcher {
void send_log();
void handle_log_ack(MLogAck *m);
void set_synchronous(bool sync) { is_synchronous = sync; }
+ void set_mon(int mon_id) { mon = mon_id; }
LogClient(Messenger *m, MonMap *mm) :
messenger(m), monmap(mm), mon(-1), is_synchronous(false),
@@ -110,6 +110,7 @@ Monitor::Monitor(int w, MonitorStore *s, Messenger
*m, MonMap *map) :
mon_caps = new MonCaps();
mon_caps->set_allow_all(true);
mon_caps->text = "allow *";
+ myaddr = map->get_inst(w).addr;
}
Paxos *Monitor::add_paxos(int type)
@@ -58,6 +58,7 @@ class Monitor : public Dispatcher {
public:
// me
int whoami;
+ entity_addr_t myaddr;
Messenger *messenger;
Mutex lock;
@@ -48,6 +48,7 @@ bool MonmapMonitor::update_from_paxos()
dout(10) << "update_from_paxos paxosv " << paxosv
<< ", my v " << mon->monmap->epoch << dendl;
+ int original_map_size = mon->monmap->size();
//read and decode
monmap_bl.clear();
bool success = paxos->read(paxosv, monmap_bl);
@@ -58,6 +59,20 @@ bool MonmapMonitor::update_from_paxos()
//save the bufferlist version in the paxos instance as well
paxos->stash_latest(paxosv, monmap_bl);
+ if (original_map_size != mon->monmap->size())
+ {
+ _update_whoami();
+
+ // call election?
+ if (mon->monmap->size() > 1) {
+ mon->call_election();
+ } else {
+ // we're standalone.
+ set<int> q;
+ q.insert(mon->whoami);
+ mon->win_election(1, q);
+ }
+ }
return true;
}
@@ -127,6 +142,8 @@ bool MonmapMonitor::preprocess_command(MMonCommand *m)
}
else if (m->cmd[1] == "add")
return false;
+ else if (m->cmd[1] == "remove")
+ return false;
}
if (r != -1) {
@@ -177,6 +194,23 @@ bool MonmapMonitor::prepare_command(MMonCommand *m)
paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs,
paxos->get_version()));
return true;
}
+ else if (m->cmd.size() == 3 && m->cmd[1] == "remove") {
+ entity_addr_t addr;
+ parse_ip_port(m->cmd[2].c_str(), addr);
+ bufferlist rdata;
+ if (!pending_map.contains(addr)) {
+ err = -ENOENT;
+ ss << "mon " << addr << " does not exist";
+ goto out;
+ }
+
+ pending_map.remove(addr);
+ pending_map.last_changed = g_clock.now();
+ ss << "removed mon" << " at " << addr << ", there are now " <<
pending_map.size() << " monitors" ;
+ getline(ss, rs);
+ paxos->wait_for_commit(new Monitor::C_Command(mon, m, 0, rs,
paxos->get_version()));
+ return true;
+ }
else
ss << "unknown command " << m->cmd[1];
} else
@@ -203,3 +237,30 @@ void MonmapMonitor::tick()
{
update_from_paxos();
}
+
+void MonmapMonitor::_update_whoami()
+{
+  // Re-derive this monitor's rank after the monmap changed; nothing to do if our rank still maps to our address.
+  if (mon->whoami >= 0 && static_cast<unsigned>(mon->whoami) < mon->monmap->size() &&
+      mon->monmap->get_inst(mon->whoami).addr == mon->myaddr)
+    return;
+
+  // Removals only ever shift ranks down, so scan from min(whoami-1, size-1) toward rank 0.
+  // The index must be signed: with an unsigned index "i >= 0" never fails and the scan runs off the map.
+  int i = static_cast<int>(mon->monmap->size()) - 1;
+  if (mon->whoami - 1 < i)
+    i = mon->whoami - 1;
+  for (; i >= 0; i--) {
+    if (mon->monmap->get_inst(i).addr == mon->myaddr) {
+      dout(10) << "Changing whoami from " << mon->whoami << " to " << i << dendl;
+      mon->whoami = i;
+      mon->messenger->set_myname(entity_name_t::MON(i));
+      mon->logclient.set_mon(i);
+      return;
+    }
+  }
+  // No rank at or below the old one carries our address.
+  dout(0) << "Cannot find myself (mon" << mon->whoami << ", " << mon->myaddr << ") in new monmap! I must have been removed, shutting down." << dendl;
+  mon->shutdown();
+}
+
@@ -67,6 +67,7 @@ class MonmapMonitor : public PaxosService {
private:
bufferlist monmap_bl;
+ void _update_whoami();
};