跳转到主要内容

热门内容

今日:


总体:


最近浏览:


Chinese, Simplified

category

“分析ceph网络模块”系列文章解释了ceph守护进程如何相互通信。这篇文章介绍了网络体系结构概述。

Ceph存储集群大约由四种类型的守护程序组成:OSD守护程序(OSD)、Ceph监视器(mon)、Cef管理器(mgr)和Ceph元数据服务器(mds)。

Ceph官方文档提供了一个非常高级的图表,描述了Ceph架构:


High level Ceph architecture. [src]

但是,这些库(librbd、librgw、libcephfs、librados)是如何具体实现的,以与RADOS中的底层Ceph守护进程通信?守护进程如何相互通信以管理Ceph集群?本帖文将有关这一问题的帖子归纳为1 2 3 4 5 6 7。

1.Ceph客户端网络架构概述



高级Ceph架构。[src]

Ceph客户端使用Messenger类(目前唯一使用的Messenger是AsyncMessenger)与Ceph集群进行通信。Ceph用户可以通过使用librgw、librbd、cephfs(Ceph内核或Ceph-fuse,我将仅解释后者)或librados来使用Ceph集群。除了cephfs之外的所有库都在其底层系统中使用库,该系统使用AsyncMessenger进行通信。

如您所见,用于连接ceph-mon或ceph-mgr的MonClient和MgrClient也使用AsyncMessenger。换句话说,所有守护进程也使用Messenger类来相互通信。

详细的通信流程将在后面解释。

2.Ceph网络模块内部架构



Ceph网络架构。

为了了解客户端和守护进程是如何通信的,我们首先了解Ceph中使用的内部网络体系结构。上图是一个粗略绘制的Ceph网络架构类图。注意,它不是一个纯粹的UML类图;箭头的类型意味着:

  1. 带白色方块箭头的实线:表示类继承。
  2. 带线箭头的实线:表示所有权。
  3. 带线箭头的点线:表示参考。



Ceph网络功能执行流程。流也包括异步流,所以不应该将y轴视为时间轴。

我将重点关注的课程是1。AsyncMessenger,2。NetworkStack和Worker,3。处理器,4。EventCenter和5。异步连接。这些类纠缠在一起并相互作用以实现异步网络机制。

1.异步检测器


AsyncMessenger用于维护一组异步连接,它可能拥有一个绑定地址,并且接受的连接将由AsyncMessenger管理。

ceph/src/msg/async/AsyncMessenger.h

在通过Processor类建立连接后,AsyncMessenger管理与ceph::undered_map<entity_addrvec_t,AsyncConnectionRef>连接的连接。

在其构造函数中,它通过初始化NetworkStack和Processor类来准备一个较低的网络控制平面。AsyncMessenger是类的所有者,并负责管理这些类。

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                              const std::string &type, std::string mname, uint64_t _nonce) {
 ...
 auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
   "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
 // NetworkStack::create() is called in StackSingleton::ready().
 single->ready(transport_type);
 stack = single->stack.get();
 stack->start();
 ...
 unsigned processor_num = 1;
 if (stack->support_local_listen_table())
   processor_num = stack->get_num_worker();
 for (unsigned i = 0; i < processor_num; ++i)
   processors.push_back(new Processor(this, stack->get_worker(i), cct));
}


2.NetworkStack和Worker


NetworkStack是当前使用的网络堆栈的抽象类。Ceph提供posix、RDMA和DPDK网络堆栈。本文仅解释posix网络堆栈:使用posix套接字API(bind()、listen()、send()和recv()等)。

NetworkStack构造函数在内部创建在独立线程中运行的Worker类实例。辅助线程使主线程摆脱阻塞的等待机制(如epoll_wait())。NetworkStack是所有相应Worker实例的所有者,并负责管理这些实例。

src/msg/async/Stack.h

class NetworkStack {
 std::string type;
 // Overridden by PosixStack, RDMAStack, and DPDKStack.
 virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
 
protected:
 std::vector<Wroker*> workers;
 explicit NetworkStack(CephContext *c, const std::string &t);
public:
 static std::shared_ptr<NetworkStack> create(CephContext *c, const std::string &type);
};
// Static function. Stack can only be instantiated through this function.
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const std::string &t) {
 std::shared_ptr<NetworkStack> stack = nullptr;
 // Create a proper stack regarding the type t
 if (t == "posix")
   stack.reset(new PosixNetworkStack(c, t));
 ...
 const int InitEventNumber = 5000;
 for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
   // Calls PosixNetworkStack->create_worker(), which creates a new PosixWorker.
   Worker *w = stack->create_worker(c, worker_id);
   w->center.init(InitEventNumber, worker_id, t);
   stack->workers.push_back(w);
 }
 return stack;
}


线程不是在NetworkStack构造函数中创建的,而是在NetworkStack::start()中创建的。AsyncMessenger在创建NetworkStack类实例后调用该函数。

src/msg/async/Stack.cc、src/msg/async/PoxStack.h

void NetworkStack::start() {
 ...
 for (unsigned i = 0; i < num_workers; ++i) {
   if (workers[i]->is_init())
     continue;
   std::function<void ()> thread = add_thread(i);
   spawn_worker(i, std::move(thread));
 }
}
std::function<void ()> NetworkStack::add_thread(unsigned worker_id) {
 Worker *w = workers[worker_id];
 return [this, w]() {
   ...
   while (!w->done) {
     // Calls epoll_wait with EpollDriver.
     w->center.process_events(EventMaxWaitUs, &dur);
   }
   w->reset();
   w->destroy();
 };
}
void PosixNetworkStack::spawn_worker(unsigned i, std::function<void ()> &&func) override {
 threads.resize(i+1);
 threads[i] = std::thread(func);
}


我稍后会解释,但这篇文章假设我们使用Epoll作为事件处理机制。w->center.process_events()在内部调用被阻止的epoll_wait(),并在收到事件时唤醒。

当它接收到回调时,它调用给定回调引用变量的do_request()函数,在本例中,它是Processor类实例的listen_handler变量。

src/msg/async/AsyncMessenger.cc

class Processor::C_processor_accept : public EventCallback {
 Processor *pro;
public:
 explicit C_processor_accept(Processor *p): pro(p) {}
 void do_request(uint64_t id) override {
   pro->accept();
 }
};
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c)
 : msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {}


3.处理器


处理器由AsyncMessenger管理,并负责

绑定并侦听套接字,
当异步事件到达时创建回调实例,
并接受建立连接的请求。
src/msg/async/AsyncMessenger.cc

int Processor::bind(const entity_addrvec_t &bind_addrs,
                   const std::set<int>& avoid_ports,
                   entity_addrvec_t* bound_addrs) {
 ...
 for (unsigned k = 0; k < bind_addrs.v.size(); ++k) {
   auto& listen_addr = bound_addrs->v[k];
   worker->center.submit_to(worker->center.get_id(),
       [this, k, &listen_addr, &opts, &r]() {
         // In this case, it calls PosixWorker::listen().
         worker->listen(listen_addr, k, opts, &listen_sockets[k]);
       },
       false
   );
 }
 ...
}
int Processor::start() {
 ...
 worker->center.submit_to(worker->center.get_id(),
     [this]() {
       for (auto& listen_socket : listen_sockets) {
         worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
       }
     },
     false
 );
}


Processor:accept()在连接请求到达时由Processor类实例的listen_handler成员变量调用。

void Processor::accept() {
 ...
 ConnectedSocket cli_socket;
 Worker *w = worker;
 listen_socket.accept(&cli_socket, opts, &addr, w);
 msgr->add_accept(w, std::move(cli_socket), msgr->get_myaddrs().v[listen_socket.get_addr_slot()], addr);
 ...
}


4.活动中心


如前所述,Processor使用EventCenter为异步事件注册回调处理程序。EventCenter通过使用EventDriver来管理异步事件机制,EventDriver由EpollDriver、DPDKDriver、KqueueDriver和SelectDriver继承。在Linux中,默认情况下使用EpollDriver。

当Processor类由AsyncMessenger启动时,它会调用EventCenter类的create_file_event()。它在内部调用driver的add_event(),后者在EpollDriver中调用epoll_add()。

src/msg/async/AsyncMessenger.cc、src/msg/async/Event.cc、src/sg/async/EventEpoll.cc

void Processor::start() {
 worker->center.submit_to(worker->center.get_id(), [this]() {
   ...
   worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler);
 }, false);
}
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) {
 ...
 driver->add_event(fd, event->mask, mask);
 ...
}

int EpollDriver::add_event(int fd, int cur_mask, int add_mask) {
 struct epoll_event ee;
 int op = cur-mask == EVENT_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
 ee.events = EPOLLET;
 if (add_mask & EVENT_READABLE) ee.events |= EPOLLIN;
 if (add_mask & EVENT_WRITABLE) ee.events |= EPOLLOUT;
 ee.data.u64 = 0;
 ee.data.fd = fd;
 epoll_ctl(epfd, op, fd, &ee);
 ...
}


请注意,当另一个线程已经调用epoll_wait时执行epoll_ctl并没有问题。请参阅此。

当一个线程在调用epoll_pwait()时被阻塞时,另一个线程可以向等待的epoll实例添加文件描述符。

因此,当工作线程已经在等待epoll_wait()返回时,我们可以在epoll中添加一个文件描述符。

5.异步连接


这篇文章没有解释这门课。建立连接后,Processor::accept()将创建ConnectedSocket和AsyncConnection类实例。

将在下一篇文章中讨论。

  1. Ceph ASyncMessenger 简析 I ↩︎

  2. Ceph Async Messenger ↩︎

  3. Ceph ASync Messenger ↩︎

  4. Ceph async network communication source code analysis 1 ↩︎

  5. Ceph AsyncMessenger source analysis (below) ↩︎

  6. Ceph network module - AsyncMessenger data structure analysis ↩︎

  7. Ceph network module - AsyncMessenger code flow analysis ↩︎

本文地址
最后修改
星期三, June 5, 2024 - 22:28
Article