发布于 

Paxos 算法以及 Ceph 实现

本文包含 Paxos、Multi-Paxos 算法的原理以及 Ceph 中对 Paxos 算法的实现。

Basic Paxos 和 Multi-Paxos

Paxos 和 raft 算法看过不少次,但是始终属于一知半解,也没有个记录,这次把 Paxos 和 Multi-Paxos 的原理和 ceph 中 Paxos 的实现都记录一下:

首先是 Paxos 算法,朴素的 Paxos 算法是不包含 leader 角色的:

  • proposer (提议者): 提出提案,包含提案编号(proposal ID)和一个要协商的值(Val)
  • acceptor (决策者):参与决策,回应收到的来自 proposer 的提案,当多数 acceptor 赞成时提案通过,一般来说一个 proposer 同时也是一个 acceptor

Basic Paxos 算法通过两阶段通过一个决议:

  1. proposer 向 acceptors 发起 prepare 提案,这时 acceptors 会根据自身情况对提案进行回复,可以选择接受(promise)或者拒绝(可以沉默,也可以告知)
  2. proposer 得知多数 acceptor (quorum) 表示接受并承诺了这个提案,那么就会正式向 acceptors 发送 propose 请求,acceptor 会对收到的提案进行 accpet 处理

对于 prepare 消息和 promise 消息的描述如下:

  1. prepare 消息不需要携带提案内容,只需要携带全局唯一的 proposal ID 即可(比如时间戳 + rank 号)
  2. promise 请求由 acceptor 回复给 proposer,携带自身已经接受的提案中 proposal ID 最大的提案的 ID 和 Val,如果此前没有接受过提案则返回空值

另外如果 acceptor 在 prepare 阶段接受了一个 proposal 提案,那么 acceptor 需要做出两个承诺:

  1. 不再接受 proposal ID 小于等于当前请求的 prepare 请求
  2. 不再接受 proposal ID 小于当前请求的 propose 请求

proposer 在接收到 quorum 中 acceptor 的回应后,从中选出 proposal ID 最大的提案的 Val 作为本次要发起的提案,如果所有 acceptor 的回复均是空值,那么就可以自己随意决定提案 Val
acceptor 收到正式提案处理时接受并持久化当前的 proposal ID 和提案 Val

上述整体过程如下图所示:
Basic Paxos

注意一旦一个提案被多数 acceptor 接受(即形成决议)之后不会再被推翻,也就是说此后包含其他值的提案都不会再被接受了(即使新的提案拥有更大的 proposal ID)

理解 Basic Paxos 算法,最重要的一点是牢记 Basic Paxos 属于共识算法,它唯一的作用就是保证多个节点对某个值达成共识,更新这个值并不是 Basic Paxos 的任务。

Basic Paxos 具有的问题:

  1. 开销大(一次决议需要至少两次网络通信)
  2. 可能形成活锁
  3. 无法确定多个值

Multi-Paxos 相比于 Basic Paxos 的改进:

  1. 针对每个要确定的值执行一次 Paxos,多个 Paxos 实例之间使用 Instance ID 作为区分
  2. 引入 leader 的概念,只有 leader 可以提交 proposal,这样以来由于没有 proposer 竞争,一方面解决了活锁问题,另一方面也不需要 prepare,从而将两阶段提交改进为一阶段提交,降低了开销

选举 leader 的过程同样是一次决议的形成(当然也可以通过其他方式)。

如果多个(自认为是) leader 节点提交 proposal 那么将退化成 Basic Paxos,但不会导致不安全(相当于多个 proposer 同时提交 proposal).

一个可能的 Paxos 运行过程

了解了以上内容之后,这里接着给出一个可能的,当前一个 leader 发生故障而新的 leader 被选举出来之后发生的事情(取自 Paxos Made Simple):

这个新的 leader 也是这个一致性算法的所有实例中的学习者,它应该知道大多数已经被选定的命令。假设它知道 1-134、138 以及 139 号命令,也就是一致性算法的 1-134、138 以及 139 号实例的值。(我们稍后将会看到命令序列中的这样一个空缺是如何产生的。)然后它执行实例135-137以及所有大于139的实例的阶段1,假设这些执行的结果只确定了实例 135 和 140 中提议的值,但是其他实例中没有提议值的约束14。leader 执行实例 135 和 140 的阶段2,并因此可以选定 135 和 140 号命令。

leader 自身就像其他向 leader 学习 leader 所知道的所有命令的别的服务器一样,现在可以运行命令 1-135。因为136号和137号命令还没有选定,所以它还不能运行 138-140 号命令,尽管它知道 138-140 号命令。于是,我们让它通过提议将一个特殊的不会导致状态机状态切换的“no-op”命令作为第136号和137号命令(它可以通过执行一致性算法的 136 号和 137 号实例的阶段 2 来完成),以此快速填补空缺。一旦这些 no-op 命令被选定,那 138-140 号命令就可以被执行了。

现在从 1 到 140 的命令都被选定了。 leader 完成了一致性算法中大于 140 的所有实例的阶段 1,它可以在这些(完成阶段1的)实例的阶段2中自由地提议任意的值。它给某个客户端请求的下一个命令分配了 141 号命令,把它作为这个一致性算法的 141 号实例的阶段2提议的值。它接着将它收到的下一个客户端命令提议为第 142 号命令,以此类推。

leader 可以在它获知它提议的 141 号命令已被选定之前提议 142 号命令。它在提议第 141 号命令中发送的所有消息有可能丢失,而第 142 号命令会在其他服务器获知到 leader 提议的第 141 号命令之前被选定。当 leader 在实例 141 中没有收到对它的阶段2的预期响应时,它将会重发这些消息。如果一切顺利,它提议的命令会被选定。无论如何,它还是有可能在前面有失败,在选定的命令的序列上留下一段空缺。一般来说,假设一个 leader 可以提前获得 α 个命令——也就意味着,它可以在 1 到 i 号命令被选定之后提议第 i + 1 到 i + α 号命令。一个多达 α - 1 个命令的空缺可能随之形成。

一个新的被选定的 leader 执行一致性算法中的无限多的实例的阶段1——如果是在上面的场景中,就是实例 135-137,以及所有大于139的实例。让所有实例使用一样的提案编号,它可以通过向其他的服务器发送一个合理的短消息来实现这一点。在阶段1中,接受者当且仅当它已经收到了某个提议者的阶段2的消息的时候,它才会响应不止1个简单的OK。(在这个场景里,这是仅针对实例 135 和 140 的例子。)所以,一个(扮演接受者的)服务器可以用一个单一且合理短的消息回应所有的实例。因此,执行阶段1的无穷多个实例不会带来任何问题。

由于 leader 的故障以及新 leader 的选举理应很少发生,因此执行状态机命令——对命令/值达成一致的过程的有效成本,仅为运行这个一致性算法的阶段2的成本。可以看出,Paxos 一致性算法的第2阶段在存在故障的情况下,其达成协议的可能代价是所有算法中最小的。于是,Paxos 算法本质上是最优的。

Ceph 中的 Paxos 算法实现

通过上述章节我们基本了解了 Basic Paxos 和 Multi-Paxos 的基本运行方式,这一部分就来看一下 Ceph 中对于 Paxos 算法的实现和使用方式(Multi-Paxos 并没有具体的实现细节,所以 Ceph 中的只是其中一种实现方式)。

首先 Ceph 中的 Paxos (指 Multi-Paxos,下同) 在 Monitor 中实现,用于维护 monmap、osdmap、mdsmap、pgmap 等各种 map

在 Mon 启动时会初始化 Paxos(在 Monitor::init_paxos 中):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Paxos::init()
{
// load paxos variables from stable storage
last_pn = get_store()->get(get_name(), "last_pn");
accepted_pn = get_store()->get(get_name(), "accepted_pn");
last_committed = get_store()->get(get_name(), "last_committed");
first_committed = get_store()->get(get_name(), "first_committed");

dout(10) << __func__ << " last_pn: " << last_pn << " accepted_pn: "
<< accepted_pn << " last_committed: " << last_committed
<< " first_committed: " << first_committed << dendl;

dout(10) << "init" << dendl;
ceph_assert(is_consistent());
}

这里从存储中获取已经被持久化的 last-pnaccpeted_pnlast_committedfirst_committed, 其中 pn 就是 proposal num 也就是提案编号, committed 则是记录已提交的版本也就是已经达成决议的 Paxos 实例号

Probe 阶段

接着 Mon 会将自己的状态置为 STATE_PROBING,开始 bootstrap 以发现集群并同步数据:

1
2
state = STATE_PROBING;
bootstrap();

首先将自己加入一个外部的多数派集合(也就是首先默认自己不在当前集群 quorum,因为还没有同步数据):

1
2
3
// i'm outside the quorum
if (monmap->contains(name))
outside_quorum.insert(name);

接着向其他所有已知的 Mon 发送 MMonProbe 消息,其他 Mon 接到消息后进入 Monitor::handle_probe_probe 处理,这里有两种情况,正常情况下收到请求的 Mon 会向请求方返回应答,包括 leadr、 quorum 成员列表以及当前节点的 first_committedlast_committed,另一种情况则是发现请求方的 paxos 状态反而远新于自身,那么就会主动使当前节点 re-boostrap 重新同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (!is_probing() && !is_synchronizing()) {
// If the probing mon is way ahead of us, we need to re-bootstrap.
// Normally we capture this case when we initially bootstrap, but
// it is possible we pass those checks (we overlap with
// quorum-to-be) but fail to join a quorum before it moves past
// us. We need to be kicked back to bootstrap so we can
// synchonize, not keep calling elections.
if (paxos->get_version() + 1 < m->paxos_first_version) {
dout(1) << " peer " << m->get_source_addr() << " has first_committed "
<< "ahead of us, re-bootstrapping" << dendl;
bootstrap();
goto out;

}
}

接着回到刚启动的 Mon 通过 Monitor::handle_probe_reply 处理其他 Mon 的回复,同样会对其他 Mon 的 committed version 进行检查,如果中间有空隙或者需要版本差别太远那么就需要从其他 Mon 全量复制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
   if (paxos->get_version() < m->paxos_first_version &&
m->paxos_first_version > 1) { // no need to sync if we're 0 and they start at 1.
dout(10) << " peer paxos first versions [" << m->paxos_first_version
<< "," << m->paxos_last_version << "]"
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, true);
return;
}
if (paxos->get_version() + g_conf()->paxos_max_join_drift < m->paxos_last_version) {
dout(10) << " peer paxos last version " << m->paxos_last_version
<< " vs my version " << paxos->get_version()
<< " (too far ahead)"
<< dendl;
cancel_probe_timeout();
sync_start(other, false);
return;
}

除此以外暂时先不用立刻同步到最新版本,此时如果对方处于 quorum 中并且自身在 monmap 中那么就可以通过 start_election 加入 quorum:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if (m->quorum.size()) {
bool in_map = false;
const auto my_info = monmap->mon_info.find(name);
const map<string,string> *map_crush_loc{nullptr};
if (my_info != monmap->mon_info.end()) {
in_map = true;
map_crush_loc = &my_info->second.crush_loc;
}
if (in_map &&
!monmap->get_addrs(name).front().is_blank_ip() &&
(!need_set_crush_loc || (*map_crush_loc == crush_loc))) {
// i'm part of the cluster; just initiate a new election
start_election();
}

或者发现不在 quorum 中的 mon 数量已经超过了半数,那么就可以通过选举来形成一个新的多数派:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (monmap->contains(m->name)) {
dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
outside_quorum.insert(m->name);
}

unsigned need = monmap->min_quorum_size();
dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
if (outside_quorum.size() >= need) {
if (outside_quorum.count(name)) {
dout(10) << " that's enough to form a new quorum, calling election" << dendl;
start_election();
}
}

Election 选举

选举阶段通过 Monitor::start_election 开始:

1
2
3
4
5
6
7
void Monitor::start_election()
{
dout(10) << "start_election" << dendl;
state = STATE_ELECTING;
...
elector.call_election();
}

进入到 ElectionLogic::start 中,Mon 通过 epoch 的奇偶性判断当前 Mon 是否处于选举状态,如果 epoch 是偶数即稳定态,则需要使 epoch 加一进入选举态:

1
2
if (epoch % 2 == 0) {
bump_epoch(epoch+1); // odd == election cycle

接着在 Elector::propose_to_peers 中向 monmap 中的其他 Mon 发送 MMonElection::OP_PROPOSE 消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Elector::propose_to_peers(epoch_t e, bufferlist& logic_bl)
{
// bcast to everyone else
for (unsigned i=0; i<mon->monmap->size(); ++i) {
if ((int)i == mon->rank) continue;
MMonElection *m =
new MMonElection(MMonElection::OP_PROPOSE, e,
peer_tracker.get_encoded_bl(),
logic.strategy, mon->monmap);
m->sharing_bl = logic_bl;
m->mon_features = ceph::features::mon::get_supported();
m->mon_release = ceph_release();
mon->send_mon_message(m, i);
}
}

那么其他 Mon 在收到 MMonElection::OP_PROPOSE 后进入 Elector::handle_propose 处理,这里如果不满足一些条件的话会向请求客户端发一个 MMonElection::OP_NAK 消息,接着进入 ElectionLogic::receive_propose 中,这里对于不同的 strategy 设置进入不同的处理流程,在 CLASSIC 也就是默认策略下 Ceph 会人为制造 Mon 直接地位的不平等,也就是 rank 号越小的 Mon 越能赢得选举,那么如果对方的 rank 号比自己小的话就会通过 defer 赞成对方,同样的如果对方 rank 比自己还大,但自己又没有在选举状态的话那么就会发起一个新的选举尝试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if (elector->get_my_rank() < from) {
// i would win over them.
if (leader_acked >= 0) { // we already acked someone
ceph_assert(leader_acked < from); // and they still win, of course
ldout(cct, 5) << "no, we already acked " << leader_acked << dendl;
} else {
// wait, i should win!
if (!electing_me) {
elector->trigger_new_election();
}
}
} else {
// they would win over me
if (leader_acked < 0 || // haven't acked anyone yet, or
leader_acked > from || // they would win over who you did ack, or
leader_acked == from) { // this is the guy we're already deferring to
defer(from);
} else {
// ignore them!
ldout(cct, 5) << "no, we already acked " << leader_acked << dendl;
}
}

ElectionLogic::defer 中会记录自己已经响应过的 Mon,如果新的 Propose 请求的 rank 小于自己曾经回应过的 Mon 才会回应,否则的话直接忽略掉。

当 Mon 收到 MMonElection::OP_ACK 回复后进入 Elector::handle_ack 处理,首先将对方加入 acked_me 中,如果所有人都承认自己的话宣布胜利,也就是 Paxos 的第二阶段,向其他 Mon 发送 MMonElection::OP_VICTORY 消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ElectionLogic::receive_ack(int from, epoch_t from_epoch)
{
...
acked_me.insert(from);
if (acked_me.size() == elector->paxos_size()) {
// if yes, shortcut to election finish
declare_victory();
}
...

void Elector::message_victory(const std::set<int>& quorum)
{
...
// tell everyone!
for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) {
if (*p == mon->rank) continue;
MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, get_epoch(),
peer_tracker.get_encoded_bl(),
logic.strategy, mon->monmap);
...

那如果在一定时间内没有收到所有人的回复的话,那么按照 Paxos 的规则只要大多数也就是过半的 Mon 表示同意的话也能形成 quorum,此时同样可以宣布选举胜利:

1
2
3
4
5
6
7
8
9
10
11
void ElectionLogic::end_election_period()
{
ldout(cct, 5) << "election period ended" << dendl;

// did i win?
if (electing_me &&
acked_me.size() > (elector->paxos_size() / 2)) {
// i win
declare_victory();
} else {
...

那么宣布胜利之后 Mon 就会把自己的状态设为 STATE_LEADER (这个过程在 Monitor::win_election 中完成),其他成员则把自己设为 STATE_PEON

Recovery 阶段

之前提到选举之前如果自己记录的 Propose 有部分缺失的话是不需要立刻恢复的,那么这里成为 leader 之后就要开始 Paxos Recovery 了,也就是之前示例中填补空洞的过程,在 Monitor::win_election 中调用 Paxos::leader_init 开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void Monitor::win_election(epoch_t epoch, const set<int>& active, uint64_t features,
const mon_feature_t& mon_features,
ceph_release_t min_mon_release,
const map<int,Metadata>& metadata)
{
...
paxos->leader_init();
...

void Paxos::leader_init()
{
cancel_events();
new_value.clear();

// discard pending transaction
pending_proposal.reset();

reset_pending_committing_finishers();

logger->inc(l_paxos_start_leader);

if (mon.get_quorum().size() == 1) {
state = STATE_ACTIVE;
return;
}

state = STATE_RECOVERING;
lease_expire = {};
dout(10) << "leader_init -- starting paxos recovery" << dendl;
collect(0);
}

未完待续

Propose 提案

未完待续

References