忧郁的大能猫
好奇的探索者,理性的思考者,踏实的行动者。
Table of Contents:
TCP网络编程最本质的是处理三个半事件:
1.连接的建立,包括服务端接受(accept)新连接和客户端成功发起(connect)连接。TCP连接一旦建立,客户端和服务端是平等的,可以各自收发数据。
2.连接的断开,包括主动断开(close、shutdown)和被动断开(read(2)返回0)。
3.消息到达,文件描述符可读。这是最为重要的一个事件,对它的处理方式决定了网络编程的风格(阻塞还是非阻塞,如何处理分包,应用层的缓冲如何设计,等等)。
3.5 消息发送完毕,这算半个。对于低流量的服务,可以不必关心这个事件;另外,这里的“发送完毕”是指将数据写入操作系统的缓冲区,将由TCP协议栈负责数据的发送与重传,不代表对方已经收到数据。
conn->setConnectionCallback(connectionCallback_); //用户传
conn->setMessageCallback(messageCallback_); //用户传
conn->setWriteCompleteCallback(writeCompleteCallback_); //用户传
conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafeint buffer
- TCP是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息"
output buffer
- 发送数据时,一次发很多,发不完,不能阻塞,得根据滑动TCP窗口得等对端收了后再发
- 如果buffer里还有待发送的20kB数据,程序又写入了50kB,那么网络库不应该直接调用write(),而应该把这50kB数据append在那20kB数据之后
- 如果output buffer里还有待发送的数据,而程序又想关闭连接,得等数据发送完毕再关
如何设计并使用缓冲区?
Buffer的设计要点:
·对外表现为一块连续的内存(char* p, int len),以方便客户代码的编写。
·其size()可以自动增长,以适应不同大小的消息。
Buffer大小的设计?
一方面我们希望减少系统调用,一次读的数据越多越划算。另一方面希望减少内存占用。如果有10000个并发连接,每个连接一建立就分配各50kB的读写缓冲区的话,将占用1GB内存。
在栈上准备一个65536字节的extrabuf,然后利用readv()来读取数据,iovec有两块,第一块指向muduo Buffer中的writable字节,另一块指向栈上的extrabuf。这样如果读入的数据不多,那么全部都读到Buffer中去了;如果长度超过Buffer的writable字节数,就会读到栈上的extrabuf里,然后程序再把extrabuf里的数据append()到Buffer中
这么做利用了临时栈上空间 14 ,避免每个连接的初始Buffer过大造成的内存浪费,也避免反复调用read()的系统开销
一个连接的生命周期过程,如何被创建,如何被销毁?
如果要主动关闭连接,如何保证先发送完缓冲区中的数据,保证对方已经收到全部数据?
本端主动断开
shutdown(sockfd, SHUT_WR), 关闭写端,但是还能读对方的数据\0后对端调用 close() 或 shutdown()TcpConnection::handleRead收到 \0后, handleClose() ->TcpServer::removeConnection() -> 从 TcpServer::connections_移除此TcpConnection -> 移出 channel_监听的事件TcpConnection被移除后,生命周期结束,起成员Socket 在析构时调用close()对端主动断开
对于使用muduo库而言,只需要掌握5个关键类:EventLoop、TcpServer、TcpClient 、TcpConnection、Buffer。
TcpConnection 的生命期依靠shared_ptr管理(即用户和库共同控制)。
Buffer 的生命期由 TcpConnection 控制。
其余类的生命期由用户控制。
Buffer和InetAddress具有值语义,可以拷贝;其他class都是对象语义,不可以拷贝
常见分包方法;
1.消息长度固定,比如muduo的roundtrip示例就采用了固定的16字节消息。
2.使用特殊的字符或字符串作为消息的边界,例如HTTP协议的headers以“\r\n”为字段的分隔符。
3.在每条消息的头部加一个长度字段。
4.利用消息本身的格式来分包,例如XML格式的消息中<root>...</root>的配对,或者JSON格式中的{ ... }的配对。解析这种消息格式通常会用到状态机(state machine)。
以第三种方式为例:
增加一个 LengthHeaderCodec 的中间层来进行编码解码,真正的逻辑通过设置回调函数设置到 LengthHeaderCodec 中。
- 当消息来时:
1. TcpServer 的 setMessageCallback 设置成 LengthHeaderCodec 的 onMessage, 业务逻辑的 onMessage回调设置到 LengthHeaderCodec 中
2. 先回调 LengthHeaderCodec 的 onMessage, 再调用业务逻辑的 onMessage
- 当发消息时:先调用 LengthHeaderCodec 的 send , 封好包之后再调用网络的 send



例子代码:example/asio/chat/client.cc
比如:客户端的例子。它要读取键盘输入,而EventLoop是独占线程的,所以需要两个线程:main()函数所在的线程负责读键盘,另外用一个EventLoopThread来处理网络IO。
onConnection回调时,把 TcpConnectionPtr 保存起来TcpConnectionPtr 进行消息的发送 conn->send(&buf);loop_->runInLoop( ), 把可调用对象插入到 eventLoop 的 pendingFunctors_ 中
member1member2method1()const char*和const std::string&提供两份重载了const char* ptr_;int length_;tid() 返回线程idname() 返回线程名字isMainThread() 实现return tid() == ::getpid();pthread_mutex_t mutex_;pid_t holder_; 持有当前锁的线程的id,通过CurrentThread::tid();来获得lock() 调用pthread_mutex_lock()unlock() 调用pthread_mutex_unlock()pthread_mutex_init()pthread_mutex_destroy()MutexLock& mutex_;MutexLock 的 lock()MutexLock 的 unlock()MutexLock& mutex_; 锁pthread_cond_t pcond_; 条件变量wait() 调用pthread_cond_wait()notify() 调用pthread_cond_signal()notifyAll() 调用pthread_cond_broadcast()pthread_cond_initpthread_cond_destroyMutexLock mutex_;Condition condition_int count_wait() 当count_ > 0时,调用wait()等待countDown() count_--, 若count_ == 0, 则调用notifyAll()std::function封装到里面std::function<void ()> func_; 执行的函数体string name_;CountDownLatch* latch_;runInThread() 执行func_并处理相关异常pthread_create, 从而传到子线程中pthread_t pthreadId_; 线程idCountDownLatch latch_; 作用是真正创建并执行了线程函数后通知主线程创建成功std::functionpthread_detach()start() 调用pthread_create()创建线程,传的函数指针是startThread(), 数据指针是 ThreadData, 最终是启线程执行func_join() 调用pthread_join()typedef std::function<void ()> Task;std::vector<std::unique_ptr<muduo::Thread>> threads_; 线程的容器std::deque<Task> queue_ 任务队列 Condition notEmpty_ notFull_ 用来控制队列的空或满的等待start(numThreads_) 启动线程池,创建numThreads_个Thread对象,开启numThreads_个线程,设置的可执行对象为runInThread()runInThread() 在while循环,从队列中取task并执行run(Task task) 把Task放到队列中stop() 把threads_中的线程挨个join掉typedef std::function<void ()> Task;std::vector<std::unique_ptr<muduo::Thread>> threads_; 线程的容器std::deque<Task> queue_ 任务队列 Condition notEmpty_ notFull_ 用来控制队列的空或满的等待pthread_key_create函数来创建一个线程局部键(pkey_),它有神奇的作用,通过此key,不同的线程可以设置和获取本线程独一份的数据pthread_key_delete函数来删除之前创建的线程局部键value(): 神奇的功能都在这里,当局部键为空的时候,new一个对象并把其指针赋给局部键,非空的时候把局部键转成对象返回destructor(void *x): 此deletor会传给pthread_key_create,用于在线程结束时自动调用,清理线程局部存储中的资源instance(): 返回线程单例对象instance() 时,ThreadLocalSingleton 会检查该线程是否已经拥有 T 类型的实例。Deleter 将其与当前线程的 TLS 键关联。pthread_key_t 关联的 destructor 函数会被调用,清理 T 类型的实例。T 实例,并且不会与其他线程的实例冲突。server_threaded_highperformance.ccChannel,EventLoop 不关心 Pooler 是如何维护 Channel 列表的。只关心获得了 activeChannels_ 就回调用户的方法std::unique_ptr<EPoller> poller_;std::vector<Channel*> activeChannels_std::unique_ptr<TimerQueue> timerQueue_; 处理定时任务,和其他事件的逻辑一样std::vector<Functor> pendingFunctors_; 在主循环的末尾执行,其他线程内调用EventLoop可以把执行的方法std::unique_ptr<Channel> wakeupChannel_; 用于唤醒被阻塞住的主循环loop();
Poller::poll() 获得当前活动事件的Channel列表,然后依次调用每个Channel的handleEvent()函数。updateChannel()、removeChannel() 调用 poller 来更新 Channelwakeup(); 当阻塞到poll()调用中,它用eventfd(2)来异步唤醒queueInLoop() 将cb放入队列,并在必要时唤醒IO线程runAt(), runAfter(), runEvery()std::map<int, Channel*> channels_ 维护着所有 fd 的Channel, 此Channel是所有的,EventLoop中的是活跃的virtual poll() = 0 、 virtual updateChannel() = 0 、 virtual removeChannel() = 0newDefaultPoller() 根据设置创建具体的pollerassertInLoopThreadpollfds_ 和 上层的事件结构 channels_std::vector<struct pollfd> pollfds_poll(); 调用系统的 poll(), 把实际发生的事件填充到 pollfds_ 中, 并调用 fillActiveChannels()fillActiveChannels() 通过pollfds_ 从 channels_ 获取发生的事件的 fd 的 Channel, 并将其设置到 EventLoop 的 activeChannels_ 中updateChannel() 向 pollfds_ 和 channels_ 中添加或更新其中的数据removeChannel() 从 pollfds_ 和 channels_ 中删除数据events_ 和 上层的事件数据结构 channels_std::vector<struct epoll_event> events_poll(); 调用系统的 poll(), 把实际发生的事件填充到 events_ 中, 并调用 fillActiveChannels()fillActiveChannels() 通过events_ 从 channels_ 获取发生的事件的 fd 的 Channel, 并将其设置到 EventLoop 的 activeChannels_ 中updateChannel() 向 events_ 和 channels_ 中添加或更新其中的数据removeChannel() 从 events_ 和 channels_ 中删除数据const int fd_; 底层关注的fdint events_ 关注那些事件,读、写事件。按位进行设置的, 如 enableReading() { events_ |= kReadEvent; update(); }int revents_ 调用 poll() 后返回的那些事件std::function<void()> readCallback_, writeCallback_, closeCallback_,errorCallback_ 各个回调函数handleEvent() 根据 revents_ 返回的事件调相应的回调函数enableReading() events_设置监听读,再调用update()update()、remove() 调用 EventLoop 的 updateChannel()、removeChannel()Timer* timer_; 、 int64_t sequence_;std::function<void()> callback_Timestamp expiration_; 过期时间const double interval_; 间隔const bool repeat_ 是否重复执行Channel timerfdChannel_, 使用了一个Channel来观察timerfd_上的readable事件std::set<std::pair<Timestamp, Timer*>> timers_ 按照时间排序,使用二叉搜索树维护timer,方便按时间查找std::set<std::pair<Timer*, int64_t>> activeTimers_ 按照timerId排序,方便通过id进行查找addTimer() 有更早的timer会重置timerfd, 供EventLoop使用,EventLoop会把它封装为更好用的runAt()、runAfter()、runEvery()等函数cancel() 通过传入TimerId参数,从而在维护的数据timer结构中找到timer并删除掉handleRead() 1. timer到期事件发生时调用 2. 调用 getExpired() 获取超时的timer 3. 回调timer的用户函数 4. 调用 reset()getExpired() 从timers_中得到过期的,并把过期的删除reset() 1.若到期的timer是定时执行的,则重新添加timer,否则删除 2. 从timers_获取最早到期的时间,并重置timerfdtimers_ 和 activeTimers_ 两个结构来维护timer?
timers_ 按过期时间排序的集合,可以高效地找到最早需要触发的定时器。而 activeTimers_ 则是根据对象地址和timerid排序的,通过TimerId可以在 activeTimers_ 中快速找到,但是无法在 timers_ 中快速找到timers_ 和 activeTimers_ 的大小始终保持一致,通过这个约束来确保数据结构的一致性和操作的正确性Thread thread_;EventLoop* loop_EventLoopThread() 创建Thread对象,并设置 threadFunc()为线程函数threadFunc() 线程体函数, 创建 EventLoop 对象,并执行其 loop() 成员函数startLoop() 启动线程并返回EventLoop 对象int numThreads_;std::vector<std::unique_ptr<EventLoopThread>> threads_;std::vector<EventLoop*> loops_;EventLoop* baseLoop_; 当numThreads_ == 0 是才使用baseLoop_, 好像是用处不大start() 1. 创建 numThreads_个EventLoopThread对象,并启动 EventLoop 对象 2. 填充 threads_、loops_getNextLoop() 轮训返回loop对象const int sockfd_;bindAddress()listen()accept()Acceptor 获得的 TcpConnectionEventLoop* loop_; // the acceptor loopstd::shared_ptr<EventLoopThreadPool> threadPool_; 线程池,里面时 ioLoopstd::unique_ptr<Acceptor> acceptor_;std::map<string, TcpConnectionPtr> connections_start()
EventLoop中增加Acceptor的读事件,开启网络监听threadPool_的start(), 开启IO线程池newConnection()
newConnection()设置到Acceptor的newConnectionCallback_上了threadPool_中获取ioLoopTcpConnection 对象,设置4个回调函数,并放到 connections_ 容器中TcpConnection 对象的事件的监听removeConnection()
connections_ 容器中移出TcpConnection::connectDestroyedaccept事件后的回调Channel acceptChannel_;Socket acceptSocket_;listen() 1.调网络API开始listen 2. acceptChannel_ 启用监听可读事件handleRead() 1.调网络API accept() 2. accpet到后回调 TcpServer 传过来的 newConnectionCallback_std::unique_ptr<Socket> socket_; 拥有TCP socket,它的析构函数会close(fd)connectionCallback_, messageCallback_, writeCompleteCallback_, closeCallback_ 各种回调Buffer inputBuffer_;, Buffer outputBuffer_;Constructor 把handleXXX()系列函数设置到 channel_ 的回调中handleRead();handleWrite();handleClose();handleError();std::vector<char> buffer_;size_t readerIndex_;size_t writerIndex_;std::function<void (int sockfd)> newConnectionCallback_ 连接成功后的回调connect() 建立连接,失败会进行重试connecting() 通过上一步的调用操作系统的connect()接口的返回值获得返回状态,若是连接中,则设置socket可写的回调handleWrite() 当连接可写时再通过sockets::getSocketError(sockfd);获取socket的状态判断是否连接成功EventLoop* loop_;ConnectorPtr connector_;connectionCallback_;、messageCallback_;、writeCompleteCallback_; 提供给用户的回调typedef std::function<void (const TcpConnectionPtr&,const MessagePtr&,Timestamp)> ProtobufMessageCallbackProtobufMessageCallback messageCallback_; 用户业务逻辑的回调,回调经过 ProtobufDispatcher 后拿到的直接就是具体的protobuf消息格式onMessage(const TcpConnectionPtr& conn,Buffer* buf,Timestamp receiveTime); 解码解析的主要逻辑,解析完回调上层传过来的用户回调send(TcpConnectionPtr& conn,Message& message) 编码并进行发生的逻辑std::map<string key, Callback> callbacks_ 保存所有的处理函数onProtobufMessage() 收到数据解析成protobuf::Message的数据后,根据名字查找相应的处理函数registerMessageCallback() 注册消息处理的函数在注册出处理函数的时候,把类型模板参数传进去
注册回调函数的时候根据传入的模板类型进行向下转型
.proto 中定义并由protoc自动生成MessageType type REQUEST or RESPONSEfixed64 idstring servicestring methodbytes requestbytes responseErrorCode errorTcpServer server_;std::map<std::string, ::google::protobuf::Service*> services_;registerService(::google::protobuf::Service*); 注册rpc服务, 并调用channel->setServices(&services_); 设置到channel中onConnection(const TcpConnectionPtr& conn) 连接来了后设置RpcChannel::onMessage为MessageCallback::google::protobuf::RpcChannel 并实现其中的 CallMethod() 方法std::map<int64_t, OutstandingCall> outstandings_ 发送rpc后把rpc返回的回调放在此结构中,用于结果返回时的调用std::map<std::string, ::google::protobuf::Service*>* services_;CallMethod() 构造 RpcMessage 对象, 设置 service、method、request,并调用RpcCodec的send方法进行发送onMessage() 调用 RpcCodec 的 onMessage 方法,在构造时把 onRpcMessage() 回调传入到 RpcCodec中onRpcMessage() 使用 RpcCodec 解码后的 RpcMessage 对象
CallMethod()方法typedef ProtobufCodecLiteT<RpcMessage, rpctag> RpcCodec; 用的就是 ProtobufCodecLiteT 的编码解码HttpCallback httpCallback_; 用户传的回调函数,所有http逻辑都写在这个里面。传入的函数类型为void onRequest(const HttpRequest& req, HttpResponse* resp)setHttpCallback(const HttpCallback& cb) 设置用户的回调onMessage() 1. 调用context->parseRequest() 2. 调用onRequest()onRequest() 1.调用 httpCallback_ 回调 2. 根据 HttpResponse 对象,生成http报文,并发送HttpRequest request_;HttpRequestParseState state_; 标识当前解析的状态,用于解析http请求parseRequest(Buffer* buf, Timestamp receiveTime) 解析首行的请求体processRequestLine(const char* begin, const char* end) 解析请求行onConnection() 建立连接的时候创建Method method_;string path_;string query_;std::map<string, string> headers_;o;HttpServer 的 httpCallback_ 之前创建,并将其引用传递给 httpCallback_ 的参数std::map<string, string> headers_;HttpStatusCode statusCode_;string statusMessage_;string body_;appendToBuffer(Buffer* output) 根据HttpResponse对象构造http相应报文,并设置到 buffer 中,