category
“分析ceph网络模块”系列文章解释了ceph守护进程如何相互通信。这篇文章介绍了网络体系结构概述。
Ceph存储集群大约由四种类型的守护程序组成:OSD守护程序(OSD)、Ceph监视器(mon)、Cef管理器(mgr)和Ceph元数据服务器(mds)。
Ceph官方文档提供了一个非常高级的图表,描述了Ceph架构:
High level Ceph architecture. [src]
但是,这些库(librbd、librgw、libcephfs、librados)是如何具体实现的,以与RADOS中的底层Ceph守护进程通信?守护进程如何相互通信以管理Ceph集群?本帖文将有关这一问题的帖子归纳为1 2 3 4 5 6 7。
1.Ceph客户端网络架构概述
高级Ceph架构。[src]
Ceph客户端使用Messenger类(目前唯一使用的Messenger是AsyncMessenger)与Ceph集群进行通信。Ceph用户可以通过使用librgw、librbd、cephfs(Ceph内核或Ceph-fuse,我将仅解释后者)或librados来使用Ceph集群。除了cephfs之外的所有库都在其底层系统中使用库,该系统使用AsyncMessenger进行通信。
如您所见,用于连接ceph-mon或ceph-mgr的MonClient和MgrClient也使用AsyncMessenger。换句话说,所有守护进程也使用Messenger类来相互通信。
详细的通信流程将在后面解释。
2.Ceph网络模块内部架构
Ceph网络架构。
为了了解客户端和守护进程是如何通信的,我们首先了解Ceph中使用的内部网络体系结构。上图是一个粗略绘制的Ceph网络架构类图。注意,它不是一个纯粹的UML类图;箭头的类型意味着:
- 带白色方块箭头的实线:表示类继承。
- 带线箭头的实线:表示所有权。
- 带线箭头的点线:表示参考。
Ceph网络功能执行流程。流也包括异步流,所以不应该将y轴视为时间轴。
我将重点关注的课程是1。AsyncMessenger,2。NetworkStack和Worker,3。处理器,4。EventCenter和5。异步连接。这些类纠缠在一起并相互作用以实现异步网络机制。
1.异步检测器
AsyncMessenger用于维护一组异步连接,它可能拥有一个绑定地址,并且接受的连接将由AsyncMessenger管理。
ceph/src/msg/async/AsyncMessenger.h
在通过Processor类建立连接后,AsyncMessenger管理与ceph::undered_map<entity_addrvec_t,AsyncConnectionRef>连接的连接。
在其构造函数中,它通过初始化NetworkStack和Processor类来准备一个较低的网络控制平面。AsyncMessenger是类的所有者,并负责管理这些类。
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
const std::string &type, std::string mname, uint64_t _nonce) {
...
auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
"AsyncMessenger::NetworkStack::" + transport_type, true, cct);
// NetworkStack::create() is called in StackSingleton::ready().
single->ready(transport_type);
stack = single->stack.get();
stack->start();
...
unsigned processor_num = 1;
if (stack->support_local_listen_table())
processor_num = stack->get_num_worker();
for (unsigned i = 0; i < processor_num; ++i)
processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
2.NetworkStack和Worker
NetworkStack是当前使用的网络堆栈的抽象类。Ceph提供posix、RDMA和DPDK网络堆栈。本文仅解释posix网络堆栈:使用posix套接字API(bind()、listen()、send()和recv()等)。
NetworkStack构造函数在内部创建在独立线程中运行的Worker类实例。辅助线程使主线程摆脱阻塞的等待机制(如epoll_wait())。NetworkStack是所有相应Worker实例的所有者,并负责管理这些实例。
src/msg/async/Stack.h
class NetworkStack {
std::string type;
// Overridden by PosixStack, RDMAStack, and DPDKStack.
virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
protected:
std::vector<Wroker*> workers;
explicit NetworkStack(CephContext *c, const std::string &t);
public:
static std::shared_ptr<NetworkStack> create(CephContext *c, const std::string &type);
};
// Static function. Stack can only be instantiated through this function.
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const std::string &t) {
std::shared_ptr<NetworkStack> stack = nullptr;
// Create a proper stack regarding the type t
if (t == "posix")
stack.reset(new PosixNetworkStack(c, t));
...
const int InitEventNumber = 5000;
for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
// Calls PosixNetworkStack->create_worker(), which creates a new PosixWorker.
Worker *w = stack->create_worker(c, worker_id);
w->center.init(InitEventNumber, worker_id, t);
stack->workers.push_back(w);
}
return stack;
}
线程不是在NetworkStack构造函数中创建的,而是在NetworkStack::start()中创建的。AsyncMessenger在创建NetworkStack类实例后调用该函数。
src/msg/async/Stack.cc、src/msg/async/PoxStack.h
void NetworkStack::start() {
...
for (unsigned i = 0; i < num_workers; ++i) {
if (workers[i]->is_init())
continue;
std::function<void ()> thread = add_thread(i);
spawn_worker(i, std::move(thread));
}
}
std::function<void ()> NetworkStack::add_thread(unsigned worker_id) {
Worker *w = workers[worker_id];
return [this, w]() {
...
while (!w->done) {
// Calls epoll_wait with EpollDriver.
w->center.process_events(EventMaxWaitUs, &dur);
}
w->reset();
w->destroy();
};
}
void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) override {
threads.resize(i+1);
threads[i] = std::thread(func);
}
我稍后会解释,但这篇文章假设我们使用Epoll作为事件处理机制。w->center.process_events()在内部调用被阻止的epoll_wait(),并在收到事件时唤醒。
当它接收到回调时,它调用给定回调引用变量的do_request()函数,在本例中,它是Processor类实例的listen_handler变量。
src/msg/async/AsyncMessenger.cc
class Processor::C_processor_accept : public EventCallback {
Processor *pro;
public:
explicit C_processor_accept(Processor *p): pro(p) {}
void do_request(uint64_t id) override {
pro->accept();
}
};
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
: msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {}
3.处理器
处理器由AsyncMessenger管理,并负责
绑定并侦听套接字,
当异步事件到达时创建回调实例,
并接受建立连接的请求。
src/msg/async/AsyncMessenger.cc
int Processor::bind(const entity_addrvec_t &bind_addrs,
const std::set<int>& avoid_ports,
entity_addrvec_t* bound_addrs) {
...
for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
auto& listen_addr = bound_addrs->v[k];
worker->center.submit_to(worker->center.get_id(),
[this, k, &listen_addr, &opts, &r]() {
// In this case, it calls PosixWorker::listen().
worker->listen(listen_addr, k, opts, &listen_sockets[k]);
},
false
);
}
...
}
int Processor::start() {
...
worker->center.submit_to(worker->center.get_id(),
[this]() {
for (auto& listen_socket : listen_sockets) {
worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
}
},
false
);
}
Processor:accept()在连接请求到达时由Processor类实例的listen_handler成员变量调用。
void Processor::accept() {
...
ConnectedSocket cli_socket;
Worker *w = worker;
listen_socket.accept(&cli_socket, opts, &addr, w);
msgr->add_accept(w, std::move(cli_socket), msgr->get_myaddrs().v[listen_socket.get_addr_slot()], addr);
...
}
4.活动中心
如前所述,Processor使用EventCenter为异步事件注册回调处理程序。EventCenter通过使用EventDriver来管理异步事件机制,EventDriver由EpollDriver、DPDKDriver、KqueueDriver和SelectDriver继承。在Linux中,默认情况下使用EpollDriver。
当Processor类由AsyncMessenger启动时,它会调用EventCenter类的create_file_event()。它在内部调用driver的add_event(),后者在EpollDriver中调用epoll_add()。
src/msg/async/AsyncMessenger.cc、src/msg/async/Event.cc、src/sg/async/EventEpoll.cc
void Processor::start() {
worker->center.submit_to(worker->center.get_id(), [this]() {
...
worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
}, false);
}
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
...
driver->add_event(fd, event->mask, mask);
...
}
int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
struct epoll_event ee;
int op = cur-mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = EPOLLET;
if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN;
if (add_mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0;
ee.data.fd = fd;
epoll_ctl(epfd, op, fd, &ee);
...
}
请注意,当另一个线程已经调用epoll_wait时执行epoll_ctl并没有问题。请参阅此。
当一个线程在调用epoll_pwait()时被阻塞时,另一个线程可以向等待的epoll实例添加文件描述符。
因此,当工作线程已经在等待epoll_wait()返回时,我们可以在epoll中添加一个文件描述符。
5.异步连接
这篇文章没有解释这门课。建立连接后,Processor::accept()将创建ConnectedSocket和AsyncConnection类实例。
将在下一篇文章中讨论。
- 登录 发表评论
- 38 次浏览
最新内容
- 3 days 17 hours ago
- 3 days 19 hours ago
- 3 days 19 hours ago
- 6 days 11 hours ago
- 6 days 19 hours ago
- 6 days 19 hours ago
- 6 days 19 hours ago
- 6 days 19 hours ago
- 1 week 4 days ago
- 1 week 4 days ago