一、准备

项目地址:echo

  • 阻塞式 I/O 模型:等待到数据准备好后,再继续处理。
  • 非阻塞式 I/O 模型:直接返回,若数据未准备好返回错误码。
  • I/O 复用模型:阻塞在 select 这样的系统调用上,而非等待数据准备上。
  • 异步 I/O 模型:进程处理完数据后,通知用户。

以上前三种模型为同步 I/O 模型,具体实例与实现思路见下。

阻塞 I/O - 单线程

服务端与客户端链接,当没有数据时会挂起当前进程,直到有数据到来才唤醒进程去处理数据。

阻塞 I/O - 多线程

单线程中若进程被阻塞便无法处理其他请求,可以将每个客户端的链接交由不同线程处理,即使进程(线程)被阻塞也不会影响其他链接。

阻塞 I/O - 线程池

在多线程基础上优化线程的创建开销。

非阻塞 I/O - 忙等待轮询

若数据没有准备好,会直接返回错误码,可以通过不断询问所有链接的客户端,判断是否有客户端的数据准备好。

I/O 复用 - select

若客户端没有数据时会被阻塞,直到有数据准备好,轮询所有客户端是否有数据准备好。

二、客户端 / 服务端

客户端从标准输入接收数据,发送给服务端,并接收由服务端发回的数据。

// client.cpp
// fd - 建立套接字的文件句柄
int run(int fd) {
  char buffer[MAX_BUFFER];
  char sIn[MAX_BUFFER];
  while(true) {
    std::cout << "> ";
    std::cin.getline(sIn, sizeof(sIn));
    // 阻塞式 I/O 模型
    // 直到发送成功消息后继续
    ssize_t sSend = send(fd, sIn, strlen(sIn), 0);
    // 阻塞式 I/O 模型
    // 直到接收成功消息后继续
    ssize_t sRecv = recv(fd, (void*)buffer, MAX_BUFFER, 0);
    buffer[sRecv] = '\0';
    std::cout << "- " << buffer << std::endl;
  }
  return RUN_OK;
}

服务端从与客户端建立的套接字中读取数据,回相同数据到客户端。

// server_main.cpp
// 阻塞式 I/O 模型 
// 收发 echo 数据
// fd - 建立套接字的文件句柄
int doEcho(int fd) {
  char buffer[4096];
  while(true) {
    // 阻塞式 I/O 模型
    // 直到成功接收客户端数据后继续
    ssize_t sRecv = recv(fd, (void*)buffer, MAX_BUFFER, 0);
    if(sRecv == 0) { break; }
    if(sRecv == -1) { exit(ERR_RECV); }
    // 阻塞式 I/O 模型
    // 直到成功发送客户端数据后继续
    send(fd, buffer, sRecv, 0);
  }
  close(fd);
  return RUN_OK;
}

三、单线程服务端

// single thread server - 单线程模式
int run(int iSockfd) {
  // 循环创建链接
  while(true) {
    int fd = accept(iSockfd, nullptr, nullptr);
    if(fd < 0) { exit(ERR_ACCEPT); }   
    // 阻塞式 I/O 模型
    doEcho(fd);
  }
  return RUN_OK;
}

运行客户端一,正确回复;同时运行客户端二,消息阻塞

$ ./server_signle_thread 7090
$ ./client 127.0.0.1 7090
> hello world!
- hello world!
$ ./client 127.0.0.1 7090
> the other thread

分析:当客户端一与服务端建立连接成功时,客户端二与服务端建立连接,需要等待服务端再次运行到 accept函数处才能建立连接,只有与客户端一断开链接后,服务端doEcho函数才会返回。

所以单线程下服务端只能与单个客户端交互收发消息。

结束客户端一,客户端二有正确回复

$ ./client 127.0.0.1 7090
> hello world!
- hello world!
> ^C
$ ./client 127.0.0.1 7090
> the other thread
> the other thread

四、多线程服务端

// multi thread server - 多线程模式
int run(int iSockfd) {
  // 循环创建链接
  while(true) {
    int fd = accept(iSockfd, nullptr, nullptr);
    if(fd < 0) {
      exit(ERR_ACCEPT);
    }
    // 启动线程处理 doEcho 函数
    std::thread tEcho(doEcho, fd);
    tEcho.detach();
  }
  return RUN_OK;
}

利用多线程,将doEcho函数分离,客户端链接成功后,服务端会在启动doEcho线程后继续运行代码到accept函数处,接受其他客户端的链接。

现在可以同时处理多个客户端的连接,但是线程的创建和销毁复杂度高、浪费资源。

如果不断链接和断开,程序将会不断的创建和结束线程,产生大量系统调用,在用户态和内核态之间不断切换,效率低下。

解决多线程创建和销毁的效率问题,可以使用线程池,预先申请足够的线程数量,获取链接时再将处理函数绑在线程池分配好的线程上,避免线程销毁和启动开销。

五、线程池服务端

// thread pool server - 线程池模式
int run(int iSockfd) {
  /* 创建线程数量为 iThread */
  ThreadPool tp(iThread);
  while(true) {
    int fd = accept(iSockfd, nullptr, nullptr);
    if(fd < 0) {
      exit(ERR_ACCEPT);
    }
    // 加入线程池
    tp.addTask(doEcho, fd);
  }
  return RUN_OK;
}

线程池模型能够很好处理线程创建和销毁问题,但是线程池无法动态扩展。

请求数量是线程池中线程数量的倍数,导致线程池的请求等待队列拥塞。

六、忙等待轮询服务端

// nonblock - 非阻塞模式
int run(int iSockfd) {
  std::vector<int> fdList;
  char buffer[MAX_BUFFER];
    // 设置为非阻塞式 I/O 模型
  fcntl(iSockfd, F_SETFL, O_NONBLOCK);
  while(true) {
    // 非阻塞式 I/O 模型
    // 立刻返回是否有链接
    int fd = accept(iSockfd, nullptr, nullptr);
    if(fd < 0 && errno != EAGAIN) { exit(ERR_ACCEPT); } 
    else if(fd > 0) {
      fcntl(fd, F_SETFL, O_NONBLOCK);
      fdList.push_back(fd);
    }
    // 非阻塞式 I/O 模型
    // 轮询所有 I/O 并判断是否有数据准备好
    for(auto it = fdList.begin(); it != fdList.end(); ) {
      ssize_t sRecv = recv(*it, buffer, sizeof(buffer), 0);
      if(sRecv == 0) {
        close(*it);
        it = fdList.erase(it);
      }else if(sRecv < 0 && errno != EAGAIN) {
        std::cerr << "error to recv" << std::endl;
        it = fdList.erase(it);
      }else if(sRecv < 0) { ++it; }
      else { send(*it, buffer, sRecv, 0); ++it; }
    }
  }
  return RUN_OK;
}

忙等待轮询操作中所有 IO 都是非阻塞的,所以需要反复检测所有 IO 是否可以收发数据,直到可以进行读写操作。

如果全部 IO 都没有数据,遍历所有 IO 将会占用大量 CPU 资源。

七、 select 服务端

int run(int iSockfd) {
  char buffer[MAX_BUFFER];
  int maxSockfd = iSockfd;
  std::vector<int> fds;
    // 设置为非阻塞, 同步 I/O 模型
  fcntl(iSockfd, F_SETFL, O_NONBLOCK);
  // 设置 select 函数需要的集合
  fd_set rset;
  FD_ZERO(&rset);
  FD_SET(iSockfd, &rset);
  while(true) {
    fd_set set = rset; // 拷贝一份,防止出错
    // I/O 复用模型, select 阻塞到此处
    // 等待 set 中的文件句柄有数据
    select(maxSockfd + 1, &set, nullptr, nullptr, nullptr);
    // 遍历文件句柄,查找哪一个有数据
    for(auto it = fds.begin(); it != fds.end(); ) {
      if(FD_ISSET(*it, &set)) { 
        ssize_t sRecv = recv(*it, buffer, sizeof(buffer), 0);
        if(sRecv < 0) { exit(ERR_RECV); }
        else if(sRecv == 0) {
          close(*it);
          FD_CLR(*it, &rset);
          it = fds.erase(it);
        }else { send(*it, buffer, sRecv, 0); ++it; }
      }else { ++it; }
    }
    if(FD_ISSET(iSockfd, &set)) {
      int fd = accept(iSockfd, nullptr, nullptr);
      if(fd < 0 && errno != EAGAIN) { exit(ERR_ACCEPT); } 
      else if(fd > 0) {
        fcntl(fd, F_SETFL, O_NONBLOCK);
        fds.push_back(fd);
        FD_SET(fd, &rset);
        maxSockfd = maxSockfd > fd ? maxSockfd : fd;
      }
    }
  }
  return RUN_OK;
}

处理方式与忙等待中的轮询相同,使用了select系统调用避免忙等待,在没有数据到来时会阻塞进程,直到有数据时才会唤醒,主动轮询查找能够处理的文件句柄。

八、测试

本地开启 10 进程,每进程开启 100 链接并发送字符串,获取来回交互平均时间。

单线程模式由于服务端只能处理一个链接,使用 1000 进程,每进程 1 链接测试。

模式平均时延(s)
单线程-
多线程32.678
线程池(1024线程数)0.029
忙等待轮询68.172
select41.699
Last modification:May 31st, 2020 at 07:26 pm