Ceph-读写流程:客户端写流程分析(二)
本文接上文对 Filer::write_trunc
及之后的流程做了梳理和总结.
正文 本文是 Ceph 读写流程:客户端写流程分析 的后续,前文包括 ll_open
和 ll_write
的处理流程。
Client 通过 Filer::write_trunc
写文件时会做两件事: 通过 Striper::file_to_extents
做一次客户端条带化,将要写入的部分转换为对象的 extent 集合;接着通过 objecter->sg_write_trunc
进行写入
1 2 3 4 5 6 7 8 void write_trunc (...) { vector<ObjectExtent> extents; Striper::file_to_extents (cct, ino, layout, offset, len, truncate_size, extents); objecter->sg_write_trunc (extents, snapc, bl, mtime, flags, truncate_size, truncate_seq, oncommit, op_flags); }
而在 sg_write_trunc
中,objecter 会向每个 ObjectExtent
对应的 OSD 发送 op 请求,这里通过 write_trunc
完成,考虑 extents.size() == 1
的情况如下:
1 2 3 4 5 6 if (extents.size () == 1 ) { write_trunc (extents[0 ].oid, extents[0 ].oloc, extents[0 ].offset, extents[0 ].length, snapc, bl, mtime, flags, extents[0 ].truncate_size, trunc_seq, oncommit, 0 , 0 , op_flags);} else {
接着从 Objecter::write_trunc
到请求被发送到分几步进行,首先在 write_trunc
中 new 一个 Op 并调用 op_submit
:
1 2 3 4 5 ceph_tid_t write_trunc (...) { ... Op *o = new Op (oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, oncommit, objver); op_submit (o, &tid);
Objecter::op_submit
中调用 Objecter::op_submit_with_budget
看需不需要 throttle,接着正式调用 Objecter::_op_submit
,在 _op_submit
中获取或创建 OSDSession
,然后通过 _send_op
发送:
1 2 3 4 5 void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_tid_t *ptid){ r = _get_session(op->target.osd, &s, sul); ... _send_op(op);
Objecter::_send_op
中通过 Op
构建 MOSDOp
并通过 session
对应的 connection
发送到 OSD:
1 2 3 4 5 void Objecter::_send_op(Op *op){ MOSDOp *m = _prepare_osd_op(op); ... op->session->con->send_message (m);
MOSDOp
继承自 MOSDFastDispatchOp
(Message
的子类),header.type
类型是 CEPH_MSG_OSD_OP
,消息到达 OSD 后通过 ms_fast_dispath
处理,正常情况下通过 dispatch_session_waiting
加入到处理队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 void OSD::ms_fast_dispatch (Message *m) { ... } else { auto priv = m->get_connection ()->get_priv (); if (auto session = static_cast <Session*>(priv.get ()); session) { std::lock_guard l{session->session_dispatch_lock}; op->get (); session->waiting_on_map.push_back (*op); OSDMapRef nextmap = service.get_nextmap_reserved (); dispatch_session_waiting (session, nextmap); service.release_map (nextmap); } } ... void OSD::dispatch_session_waiting (const ceph::ref_t <Session>& session, OSDMapRef osdmap) { auto i = session->waiting_on_map.begin (); while (i != session->waiting_on_map.end ()) { ... spg_t pgid; if (m->get_type () == CEPH_MSG_OSD_OP) { pg_t actual_pgid = osdmap->raw_pg_to_pg (static_cast <const MOSDOp*>(m)->get_pg ()); if (!osdmap->get_primary_shard (actual_pgid, &pgid)) { continue ; } } enqueue_op (pgid, std::move (op), m->get_map_epoch ()); } ... void OSD::enqueue_op (spg_t pg, OpRequestRef&& op, epoch_t epoch) { ... op_shardedwq.queue ( OpSchedulerItem ( unique_ptr <OpSchedulerItem::OpQueueable>(new PGOpItem (pg, std::move (op))), cost, priority, stamp, owner, epoch));
可以看到实际上就是向 op_shardedwq
中投递了一个 PGOpItem
,而这个 op_shardedwq
实际上关联到的就是 osd_op_tp
这个 SharedThreadPool
,因此 PGOpItem
投递进来之后就会被某一个 Worker 执行,执行的过程也是类似的,从 dequeue_op
开始
在 OSD::dequeue_op
中,worker 拿到了 op 和其对应的 pg (已经拿到了 pglock),于是调用 PrimaryLogPG::do_request
执行:
1 2 3 void OSD::dequeue_op (PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle) { pg->do_request (op, handle);
进入到 PrimaryLogPG::do_request
则会按照 op 的类型做不同的处理,之前我们提到 MOSDOp
的 header.type
类型是 CEPH_MSG_OSD_OP
,因此这里进入 do_op
中,其它类型对应不同的处理,比如 do_backfill
、do_scan
等等:
1 2 3 4 5 6 7 void PrimaryLogPG::do_request (OpRequestRef& op, ThreadPool::TPHandle &handle) { ... switch (msg_type) { case CEPH_MSG_OSD_OP: ... do_op (op);
继续进入到 PrimaryLogPG::do_op
中,在这个函数中做的事情非常多非常复杂,这里看主要流程,排除异常处理流程(需要 defer 或者 discard 的情况)后 do_op
函数中首先通过 PrimaryLogPG::find_object_context
查找对象上下文,由于此时还没有所以会在调用时通过 PrimaryLogPG::create_object_context
创建一个并返回,至于 object_context
的作用和意义这里先不做深究:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 2312 ▏ int r = find_object_context ( 2313 ▏ ▏ oid, &obc, can_create, 2314 ▏ ▏ m->has_flag (CEPH_OSD_FLAG_MAP_SNAP_CLONE), 2315 ▏ ▏ &missing_oid); int PrimaryLogPG::find_object_context (..., ObjectContextRef *pobc, ...) { if (oid.snap == CEPH_NOSNAP) { ObjectContextRef obc = get_object_context (oid, can_create); *pobc = obc; return 0 ; } ... ObjectContextRef PrimaryLogPG::get_object_context (...) { ObjectContextRef obc = object_contexts.lookup (soid); if (obc) { ... } else { if (attrs) { ... } else { int r = pgbackend->objects_get_attr (soid, OI_ATTR, &bv); if (r < 0 ) { object_info_t oi (soid) ; SnapSetContext *ssc = get_snapset_context (soid, true , 0 , false ); obc = create_object_context (oi, ssc); return obc; } } } ...
接着往下走创建一个 OpContext
,并正式通过 execute_ctx
开始执行:
1 2 OpContext *ctx = new OpContext (op, m->get_reqid (), &m->ops, obc, this ); execute_ctx (ctx);