发布于 

Ceph-读写流程:客户端写流程分析(二)

本文接上文对 Filer::write_trunc 及之后的流程做了梳理和总结.

正文

本文是 Ceph 读写流程:客户端写流程分析 的后续,前文包括 ll_openll_write 的处理流程。

Client 通过 Filer::write_trunc 写文件时会做两件事: 通过 Striper::file_to_extents 做一次客户端条带化,将要写入的部分转换为对象的 extent 集合;接着通过 objecter->sg_write_trunc 进行写入

1
2
3
4
5
6
7
8
void write_trunc (...)
{
vector<ObjectExtent> extents;
Striper::file_to_extents(cct, ino, layout, offset, len, truncate_size,
extents);
objecter->sg_write_trunc(extents, snapc, bl, mtime, flags,
truncate_size, truncate_seq, oncommit, op_flags);
}

而在 sg_write_trunc 中,objecter 会向每个 ObjectExtent 对应的 OSD 发送 op 请求,这里通过 write_trunc 完成,考虑 extents.size() == 1 的情况如下:

1
2
3
4
5
6
if (extents.size() == 1) {
write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
extents[0].length, snapc, bl, mtime, flags,
extents[0].truncate_size, trunc_seq, oncommit,
0, 0, op_flags);
} else {

接着从 Objecter::write_trunc 到请求被发送到分几步进行,首先在 write_trunc 中 new 一个 Op 并调用 op_submit:

1
2
3
4
5
ceph_tid_t write_trunc(...)
{
...
Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver);
op_submit(o, &tid);

Objecter::op_submit 中调用 Objecter::op_submit_with_budget 看需不需要 throttle,接着正式调用 Objecter::_op_submit,在 _op_submit 中获取或创建 OSDSession,然后通过 _send_op 发送:

1
2
3
4
5
void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid)
{
r = _get_session(op->target.osd, &s, sul);
...
_send_op(op);

Objecter::_send_op 中通过 Op 构建 MOSDOp 并通过 session 对应的 connection 发送到 OSD:

1
2
3
4
5
void Objecter::_send_op(Op *op)
{
MOSDOp *m = _prepare_osd_op(op);
...
op->session->con->send_message(m);

MOSDOp 继承自 MOSDFastDispatchOp(Message 的子类),header.type 类型是 CEPH_MSG_OSD_OP,消息到达 OSD 后通过 ms_fast_dispath 处理,正常情况下通过 dispatch_session_waiting 加入到处理队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void OSD::ms_fast_dispatch(Message *m)
{
...
} else {
// legacy client, and this is an MOSDOp (the *only* fast dispatch
// message that didn't have an explicit spg_t); we need to map
// them to an spg_t while preserving delivery order.
auto priv = m->get_connection()->get_priv();
if (auto session = static_cast<Session*>(priv.get()); session) {
std::lock_guard l{session->session_dispatch_lock};
op->get();
session->waiting_on_map.push_back(*op);
OSDMapRef nextmap = service.get_nextmap_reserved();
dispatch_session_waiting(session, nextmap);
service.release_map(nextmap);
}
}
...

void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap)
{
auto i = session->waiting_on_map.begin();
while (i != session->waiting_on_map.end()) {
...
spg_t pgid;
if (m->get_type() == CEPH_MSG_OSD_OP) {
pg_t actual_pgid = osdmap->raw_pg_to_pg(static_cast<const MOSDOp*>(m)->get_pg());
if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
continue;
}
}
enqueue_op(pgid, std::move(op), m->get_map_epoch());
}
...


void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch)
{
...
op_shardedwq.queue(
OpSchedulerItem(
unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
cost, priority, stamp, owner, epoch));

可以看到实际上就是向 op_shardedwq 中投递了一个 PGOpItem,而这个 op_shardedwq 实际上关联到的就是 osd_op_tp 这个 SharedThreadPool,因此 PGOpItem 投递进来之后就会被某一个 Worker 执行,执行的过程也是类似的,从 dequeue_op 开始

OSD::dequeue_op 中,worker 拿到了 op 和其对应的 pg (已经拿到了 pglock),于是调用 PrimaryLogPG::do_request 执行:

1
2
3
void OSD::dequeue_op(PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle)
{
pg->do_request(op, handle);

进入到 PrimaryLogPG::do_request 则会按照 op 的类型做不同的处理,之前我们提到 MOSDOpheader.type 类型是 CEPH_MSG_OSD_OP,因此这里进入 do_op 中,其它类型对应不同的处理,比如 do_backfilldo_scan 等等:

1
2
3
4
5
6
7
void PrimaryLogPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
{
...
switch (msg_type) {
case CEPH_MSG_OSD_OP:
...
do_op(op);

继续进入到 PrimaryLogPG::do_op 中,在这个函数中做的事情非常多非常复杂,这里看主要流程,排除异常处理流程(需要 defer 或者 discard 的情况)后 do_op 函数中首先通过 PrimaryLogPG::find_object_context 查找对象上下文,由于此时还没有所以会在调用时通过 PrimaryLogPG::create_object_context 创建一个并返回,至于 object_context 的作用和意义这里先不做深究:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
   2312int r = find_object_context(
2313 ▏ ▏ oid, &obc, can_create,
2314 ▏ ▏ m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
2315 ▏ ▏ &missing_oid);

int PrimaryLogPG::find_object_context(..., ObjectContextRef *pobc, ...)
{
if (oid.snap == CEPH_NOSNAP) {
ObjectContextRef obc = get_object_context(oid, can_create);
*pobc = obc;
return 0;
}
...

ObjectContextRef PrimaryLogPG::get_object_context(...)
{
ObjectContextRef obc = object_contexts.lookup(soid);
if (obc) {
...
} else {
if (attrs) {
...
} else {
int r = pgbackend->objects_get_attr(soid, OI_ATTR, &bv);
if (r < 0) {
object_info_t oi(soid);
SnapSetContext *ssc = get_snapset_context(soid, true, 0, false);
obc = create_object_context(oi, ssc);
return obc;
}
}
}
...

接着往下走创建一个 OpContext,并正式通过 execute_ctx 开始执行:

1
2
OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
execute_ctx(ctx);