epoll构建高性能Reactor网络模型:百万级并发实战

文摘   2024-10-22 09:01   广东  

点击上方【蓝字】关注博主

“ 在现代互联网应用中,高并发性能是至关重要的。Reactor模型作为处理并发I/O的常见模式,凭借其高效的事件处理机制,能够有效提升服务器的并发能力。本文将深入探讨如何使用epoll构建Reactor网络模型,并结合详细代码实现,帮助读者从零开始构建支持百万级并发的TCP服务器。

01

前言 

手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。

为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。


本节,在上一章节介绍了如何使用epoll开发高效的服务器,本节将介绍使用epoll构建reactor网络模型,实现异步事件处理。


网络并发,通俗的讲就是服务器可以承载的客户端数量,即服务器可以稳定保证客户端同时接入的数量。

02

reactor简介 

Reactor模型开发效率比直接使用IO多路复用要高,它一般是单线程的,设计目标是希望一个线程使用CPU的全部资源;带来的优点是,在每个事件处理中很多时候不需要考虑共享资源的互斥访问。
Reactor模式是处理并发IO比较常见的模式,用于同步IO,核心思想是将所有要处理的IO事件注册到一个中心IO多路复用器上,同时主线程或进程阻塞在IO多路复用器上;一旦有事件到来或准备就绪,多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。

Reactor的优点

  1. 响应快;不必为单个同步事件阻塞,虽然Reactor本身依然是同步的。

  2. 编程相对简单;可以最大程度的避免复杂的多线程及同步问题,尽可能的避免多线程、多进程的切换开销。

  3. 可扩展性;可通过增加Reactor实例个数,充分利用CPU资源。

  4. 高复用性;Reactor模型本身与事件处理逻辑无关,具有很高的复用性。

03

实现步骤 

3.1、定义Reactor模型相关结构体

reactor数据结构设计图如下:

结构说明:以fd作为索引,存放在block中;当一个fd到来时,通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

数据结构的代码实现如下:

struct socket_item
{
/* data */
int fd; // socket的文件描述符
char *write_buffer; // 写缓冲区
char *read_buffer; // 读缓冲区
int write_length; // 已读字节数
int read_length; // 已写字节数

int status; // 状态标识,设置epfd的操作模式

int event; // 事件类型
void *arg; // 回调函数的参数
int(*callback)(int fd,int events,void* arg); // 回调函数
};


struct event_block
{
/* data */
struct socket_item *items; // 事件集合
struct event_block *next; // 指向像一个内存块
};

struct net_reactor
{
/* data */
int epollfd; // 事件块的数量
int block_count; // 事件块的数量
int finish_reactor; // 判断是否退出服务
struct event_block *start_block; // 事件块的起始地址
};

3.2、实现Reactor容器初始化功能

我们这里使用epoll作为IO多路复用器。
思路:初始化reactor内存块,避免脏数据;创建events和block并初始化,将events添加到block中,将block添加到reactor的链表中管理。

// 2.
int init_reactor(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
memset(reactor,0,sizeof(struct net_reactor));

// 创建epoll,作为IO多路复用器
reactor->epollfd=epoll_create(1);
if(reactor->epollfd==-1){
printf("create epfd in %s error %s\n", __func__, strerror(errno));
return REACTOR_CREATE_EPOLL_FAILED;
}

// 创建事件集
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

// 创建事件内存块
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
close(reactor->epollfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));

block->items=items;
block->next=NULL;

reactor->block_count=1;
reactor->start_block=block;
reactor->finish_reactor=0;


return REACTOR_SUCCESS;
}

3.3、实现socket初始化功能

定义成一个函数,方便初始化多个监听端口。

// 3.
int init_socket(short port)
{
int fd=socket(AF_INET,SOCK_STREAM,0);
if(fd==-1)
{
printf("create socket in %s error %s\n", __func__, strerror(errno));
return -1;
}

int ret;
// nonblock
int flag = fcntl(fd, F_GETFL, 0);
flag |= O_NONBLOCK;
ret=fcntl(fd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}

// 绑定
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_addr.s_addr=htonl(INADDR_ANY);
server.sin_family=AF_INET;
server.sin_port=htons(port);
ret=bind(fd,(struct sockaddr*)&server,sizeof(server));
if(ret==-1)
{
printf("bind() in %s error %s\n", __func__, strerror(errno));
close(fd);
return -1;
}

// 监听
ret=listen(fd,LISTEN_BLK_SIZE);
if(ret==-1)
{
printf("listen failed : %s\n", strerror(errno));
close(fd);
return -1;
}
printf("listen server port : %d\n", port);

return fd;
}

3.4、实现Reactor动态扩容功能

为了实现高并发,服务器需要监听多个端口。当高并发时需要reactor容器进行扩容管理。

核心思路:找到链表的末端,分别为events和block分配内存并初始化,将events添加到block中,将block添加到reactor的链表中管理。

// 4. 实现Reactor动态扩容功能
static int reactor_resize(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;

// 找到链表末端
struct event_block *cur_block=reactor->start_block;
while(cur_block->next!=NULL)
{
cur_block=cur_block->next;
}

// 创建事件集
struct socket_item *items=(struct socket_item *)malloc(MAX_SOCKET_ITEMS*sizeof(struct socket_item));
if(items==NULL)
{
printf("create socket_item in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(items,0,MAX_SOCKET_ITEMS*sizeof(struct socket_item));

// 创建事件内存块
struct event_block *block=(struct event_block *)malloc(sizeof(struct event_block));
if(block==NULL)
{
printf("create block in %s error %s\n", __func__, strerror(errno));
free(items);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(block,0,sizeof(struct event_block));

block->next=NULL;
block->items=items;

cur_block->next=block;

reactor->block_count++;

return REACTOR_SUCCESS;
}

3.5、实现Reactor索引功能

思路:通过fd/MAX先找到fd对应的block号,再通过fd%MAX找到对应的偏移地址。

例如来了个fd=10000,每个块存放的最大item数量MAX=1024,那么fd对应的block序号等于10000/1024=9;偏移量等于10000%1024=784。这样就可以找到fd对应的数据存放地址item。

// 5. 实现Reactor索引功能
static struct socket_item *reactor_index(struct net_reactor *reactor,int socketfd)
{
if(reactor==NULL)
return NULL;
if(reactor->start_block==NULL)
return NULL;

// fd所在block序号
int block_id=socketfd/MAX_SOCKET_ITEMS;

// block序号不存在时自动扩容
while(block_id>=reactor->block_count)
{
if(reactor_resize(reactor)<0)
{
printf("reactor_resize in %s error %s\n", __func__, strerror(errno));
return NULL;
}
}

// 找到fd对应block的位置
struct event_block *cur_block=reactor->start_block;
int i=0;
while(i++ !=block_id && cur_block!=NULL)
{
cur_block=cur_block->next;
}

return &cur_block->items[socketfd%MAX_SOCKET_ITEMS];
}

3.6、实现设置事件信息功能

将事件的相关信息保存到数据结构中。主要实现填充关键信息到event结构体中。

// 6. 实现设置事件信息功能
static void reactor_event_set(int fd,struct socket_item *sockevent,NCALLBACK callback,void *arg)
{
sockevent->arg=arg;
sockevent->callback=callback;
sockevent->event=0;
sockevent->fd=fd;
}

3.7、实现IO事件监听功能

这里使用epoll作为IO多路复用器,将事件添加到epoll中监听。
思路:主要是epoll_ctl操作,将事件添加到reactor的event结构体中。

// 7. 实现设置IO事件监听功能
static int reactor_event_add(int epollfd,int events,struct socket_item *sockevent)
{
struct epoll_event ep_events={0,{0}};
ep_events.data.ptr=sockevent;
ep_events.events=events;
sockevent->event=events;

// 判断,设置epfd的操作模式
int options;
if(sockevent->status==1)
{
options=EPOLL_CTL_MOD;
}
else{
options=EPOLL_CTL_ADD;
sockevent->status=1;
}

if(epoll_ctl(epollfd,options,sockevent->fd,&ep_events)<0)
{
printf("event add failed [fd=%d], events[%d]\n", sockevent->fd, events);
printf("event add failed in %s error %s\n", __func__, strerror(errno));
return -1;
}
return 0;
}

3.8、实现IO事件移除功能

由于设置了非阻塞模式,当事件到来时,需要暂时移除监听,避免干扰。

// 8. 实现IO事件移除功能
static int reactor_event_del(int epollfd,struct socket_item *sockevent)
{
if(sockevent->status!=1)
return -1;
struct epoll_event ep_events={0,{0}};
ep_events
.data.ptr=sockevent;

sockevent
->status=0;
// 移除fd的监听
if(epoll_ctl(epollfd, EPOLL_CTL_DEL,sockevent->fd, &ep_events)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
return -1;
}

return 0;
}

3.9、实现Reactor事件监听功能

思路:设置fd的事件信息,添加事件到epoll监听。

// 9. 实现Reactor事件监听功能
int reactor_add_listener(struct net_reactor *reactor,int sockfd,NCALLBACK *acceptor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;

// 找到fd对应的event地址
struct socket_item *item=reactor_index(reactor,sockfd);
if(item==NULL)
{
printf("reactor_index failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ADD_LISTEN_FAILED;
}

reactor_event_set(sockfd,item,acceptor,reactor);

if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
return REACTOR_ADD_LISTEN_FAILED;
}
printf("add listen fd = %d\n",sockfd);
return REACTOR_SUCCESS;
}

3.10、实现recv回调函数

思路:找到fd对应的信息内存块;使用recv接收数据;暂时移除该事件的监听;如果接收成功,设置监听事件为是否可写,添加到IO多路复用器(epoll)中;返回收到的数据长度。

// 10:实现recv回调函数
static int callback_recv(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
return REACTOR_NULL;

// 找到fd对应的event地址
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("callback_recv in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}

// 接收数据
int ret= recv(fd,item->read_buffer,BUFFER_LENGTH,0);

// 暂时移除监听
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if(ret>0)
{
item->read_length+=ret;
printf("recv [%d]:%s\n", fd, item->read_buffer);

// demo
memcpy(item->write_buffer,item->read_buffer,ret);
item->write_buffer[ret]='\0';
item->write_length=ret;

reactor_event_set(fd,item,callback_send,reactor);
if(reactor_event_add(reactor->epollfd,EPOLLOUT,item)<0)
{
printf("reactor_event_add failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_ADD_LISTEN_FAILED;
}

}
else if(ret==0)
{
printf("recv_cb --> disconnect\n");
free(item->read_buffer);
free(item->write_buffer);
close(item->fd);
}
else
{
if(errno==EAGAIN || errno==EWOULDBLOCK)
{
// 表示没有数据可读。这时可以继续等待数据到来,或者关闭套接字。
}
else if (errno == ECONNRESET) {
// reactor_event_del(reactor->epollfd, item);
free(item->read_buffer);
free(item->write_buffer);
close(item->fd);
}
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return ret;
}

3.11、实现send回调函数

思路:找到fd对应的信息内存块;使用send发送数据;暂时移除该事件的监听;如果发送成功,设置监听事件为是否可读,添加到IO多路复用器(epoll)中;返回发送的数据长度。

// 11:实现send回调函数
static int callback_send(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}

// 找到fd对应的event地址
struct socket_item *item=reactor_index(reactor,fd);
if(item==NULL)
{
printf("callback_recv in %s error %s\n", __func__, strerror(errno));
return REACTOR_MALLOC_MEMORY_FAILED;
}

int ret=send(fd,item->write_buffer,item->write_length,0);
// 暂时移除监听
if(reactor_event_del(reactor->epollfd, item)<0)
{
printf("reactor_event_del failed in %s error %s\n", __func__, strerror(errno));
//return REACTOR_EVENT_DEL_FAILED;
}
if (ret > 0)
{

printf("send[fd=%d], [%d]%s\n", fd, ret, item->write_buffer);
reactor_event_set(fd, item, callback_recv, reactor);
reactor_event_add(reactor->epollfd, EPOLLIN, item);
}
else
{
free(item->read_buffer);
free(item->write_buffer);
close(fd);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}

return ret;
}

3.12、实现accept回调函数

思路:使用accept获得连接的客户端fd;设置客户端fd为非阻塞模式;找到fd对应的信息内存块;设置fd的事件信息;设置监听事件为是否可读,添加到IO多路复用器(epoll)中。

// 12. 实现accept回调函数
int callback_accept(int fd, int events, void *arg)
{
struct net_reactor *reactor=(struct net_reactor *)arg;
if(reactor==NULL)
{
return REACTOR_NULL;
}

struct sockaddr_in client;
socklen_t len=sizeof(client);
int connectfd=accept(fd,(struct sockaddr*)&client,&len);
if(connectfd<0)
{
printf("accept failed in %s error %s\n", __func__, strerror(errno));
return REACTOR_ACCEPT_FAILED;
}

// 设置非阻塞
int flag = fcntl(connectfd, F_GETFL, 0);
flag |= O_NONBLOCK;
int ret=fcntl(connectfd, F_SETFL, flag);
if (ret == -1)
{
printf("fcntl O_NONBLOCK in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_FCNTL_FAILED;
}

// 找到fd对应的event地址
struct socket_item *item=reactor_index(reactor,connectfd);
if(item==NULL)
{
printf("reactor_index in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}

// 设置fd的事件信息
reactor_event_set(connectfd,item,callback_recv,reactor);

// 添加事件到epoll监听
if(reactor_event_add(reactor->epollfd,EPOLLIN,item)<0)
{
close(connectfd);
return REACTOR_ADD_LISTEN_FAILED;
}

// 为fd分配好读写缓冲区
item->read_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->read_buffer==NULL)
{
printf("mallc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->read_buffer,0,BUFFER_LENGTH * sizeof(char));
item->read_length=0;
item->write_buffer=(char *)malloc(BUFFER_LENGTH * sizeof(char));
if(item->write_buffer==NULL)
{
printf("mallc in %s error %s\n", __func__, strerror(errno));
close(connectfd);
free(item->read_buffer);
return REACTOR_MALLOC_MEMORY_FAILED;
}
memset(item->write_buffer,0,BUFFER_LENGTH * sizeof(char));
item->write_length=0;

printf("new connect [%s:%d], pos[%d]\n",inet_ntoa(client.sin_addr), ntohs(client.sin_port), connectfd);

return REACTOR_SUCCESS;
}

3.13、实现reactor运行函数

主要是epoll的等待功能,将监听到的事件进行回调处理。

// 13:实现reactor运行函数
int reactor_run(struct net_reactor *reactor)
{
if(reactor==NULL)
return REACTOR_NULL;
if(reactor->start_block==NULL)
return REACTOR_NULL;
if(reactor->epollfd<0)
return REACTOR_CREATE_EPOLL_FAILED;

struct epoll_event events[MAX_EPOLL_EVENTS + 1];

while(!reactor->finish_reactor)
{
int nready=epoll_wait(reactor->epollfd,events,MAX_EPOLL_EVENTS,1000);
if(nready<0)
{
printf("epoll wait error\n");
continue;
}
int i=0;
for(i=0;i<nready;i++)
{
struct socket_item *item=(struct socket_item *)events[i].data.ptr;
if((events[i].events & EPOLLIN) && (item->event&EPOLLIN))
{
// 处理可读事件
item->callback(item->fd, events[i].events, item->arg);
}
if((events[i].events & EPOLLOUT) && (item->event&EPOLLOUT))
{
// 处理可读事件
item->callback(item->fd, events[i].events, item->arg);
}
}

}
printf("Clearing memory......\n");
reactor_destory(reactor);

printf("finish reactor\n");
return REACTOR_SUCCESS;
}

3.14、实现reactor销毁功能

// 14:实现reactor销毁功能
int reactor_destory(struct net_reactor *reactor)
{
// 关闭epoll
close(reactor->epollfd);


struct event_block *blk= reactor->start_block;
struct event_block *next;

while (blk != NULL)
{
next = blk->next;
// 释放内存块
for(int i=0;i<MAX_SOCKET_ITEMS;i++)
{
if(blk->items[i].read_buffer!=NULL)
{
free(blk->items[i].read_buffer);
blk->items[i].read_buffer=NULL;
}
if(blk->items[i].write_buffer!=NULL)
{
free(blk->items[i].write_buffer);
blk->items[i].write_buffer=NULL;
}
}
free(blk->items);
free(blk);
blk = next;
}
return REACTOR_SUCCESS;
}

3.15、使用示例

  1. 创建structnet_reactor对象。

  2. 调用init_reactor初始化。

  3. 调用init_socket监听端口。

  4. 调用reactor_add_listener将端口添加到reactor中管理。

  5. 调用reactor_run运行reactor服务。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "reactor.h"

#define SERVER_PORT 9703
#define PORT_COUNT 2

int main(int argc,char **argv)
{
struct net_reactor *reactor=(struct net_reactor *)malloc(sizeof(struct net_reactor));
if(reactor==NULL)
{
perror("malloc struct net_reactor failed!\n");
return REACTOR_MALLOC_MEMORY_FAILED;
}

if(init_reactor(reactor)<0)
{
free(reactor);
return REACTOR_NULL;
}

unsigned short port = SERVER_PORT;
int sockfds[PORT_COUNT]={0};
int i=0;
for(i=0;i<PORT_COUNT;i++)
{
sockfds[i]=init_socket(port+i);
if(sockfds[i]>0)
{
if(reactor_add_listener(reactor,sockfds[i],callback_accept)<0)
printf("reactor_add_listener failed in %s : %d\n",__func__,__LINE__);
}
else
{
printf("init_socket failed in %s : %d\n",__func__,__LINE__);
}

}

reactor_run(reactor);

// 销毁 reactor
//reactor_destory(reactor);
reactor->finish_reactor=1;

// 关闭socket集
for(i=0;i<PORT_COUNT;i++)
{
close(sockfds[i]);
}

// 释放reactor
free(reactor);

return 0;

}

编译:

gcc -o server main.c reactor.c


04

完整代码

为了方便读者更完整地理解代码的实现,这里展示了全部核心代码片段,完整的代码文件已经整理完毕,并已上传至我的GitHub/Gitee仓库。可以关注博主获取完整的代码。


如果您在获取代码或者运行代码的过程中遇到问题,欢迎在评论区留言或私信我,我会尽力解答您的疑问。 


此外,为了帮助您更好地理解代码的结构和功能,我将在接下来的文章中对代码进行详细的解读,并分享一些实际应用的案例。 

期待与您共同学习和进步!


05

百万级并发连接测试 

自己实现的并发连接客户端程序,使用三台测试平台分别执行测试程序,平均一台连接30w+。需要注意的是,为了帮助有足够的fd可以分配,需要使用如下命令修改文件描述符限制:

ulimit -n 1024000

这个限制只在当前会话中生效,重新登录后将恢复为系统默认值。

如果需要永久修改系统fd限制,可以在/etc/security/limits.conf文件中设置。打开该文件,在文件末尾添加如下内容:

<用户名> hard nofile 1024 
<用户名> soft nofile 1024

另外,为了体现reactor的性能,需要将一些没必要的打印关掉,因为打印会影响性能。

客户端测试代码:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#include <sys/time.h>


#define MAX_BUFFER 128
#define MAX_EPOLLSIZE (384*1024)
#define MAX_PORT 1

#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;

static int ntySetNonblock(int fd) {
int flags;

flags = fcntl(fd, F_GETFL, 0);
if (flags < 0) return flags;
flags |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, flags) < 0) return -1;
return 0;
}

static int ntySetReUseAddr(int fd) {
int reuse = 1;
return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}



int main(int argc, char **argv) {
if (argc <= 2) {
printf("Usage: %s ip port\n", argv[0]);
exit(0);
}

const char *ip = argv[1];
int port = atoi(argv[2]);
int connections = 0;
char buffer[128] = {0};
int i = 0, index = 0;

struct epoll_event events[MAX_EPOLLSIZE];

int epoll_fd = epoll_create(MAX_EPOLLSIZE);

strcpy(buffer, " Data From MulClient\n");

struct sockaddr_in addr;
memset(&addr, 0, sizeof(struct sockaddr_in));

addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);

struct timeval tv_begin;
gettimeofday(&tv_begin, NULL);

while (1) {
if (++index >= MAX_PORT) index = 0;

struct epoll_event ev;
int sockfd = 0;

if (connections < 340000 && !isContinue) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket");
goto err;
}

//ntySetReUseAddr(sockfd);
addr.sin_port = htons(port+index);

if (connect(sockfd, (struct sockaddr*)&addr, sizeof(struct sockaddr_in)) < 0) {
perror("connect");
goto err;
}
ntySetNonblock(sockfd);
ntySetReUseAddr(sockfd);

sprintf(buffer, "Hello Server: client --> %d\n", connections);
send(sockfd, buffer, strlen(buffer), 0);

ev.data.fd = sockfd;
ev.events = EPOLLIN | EPOLLOUT;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);

connections ++;
}
//connections ++;
if (connections % 1000 == 999 || connections >= 340000) {
struct timeval tv_cur;
memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));

gettimeofday(&tv_begin, NULL);

int time_used = TIME_SUB_MS(tv_begin, tv_cur);
printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

int nfds = epoll_wait(epoll_fd, events, connections, 100);
for (i = 0;i < nfds;i ++) {
int clientfd = events[i].data.fd;

if (events[i].events & EPOLLOUT) {
sprintf(buffer, "data from %d\n", clientfd);
send(sockfd, buffer, strlen(buffer), 0);
} else if (events[i].events & EPOLLIN) {
char rBuffer[MAX_BUFFER] = {0};
ssize_t length = recv(sockfd, rBuffer, MAX_BUFFER, 0);
if (length > 0) {
printf(" RecvBuffer:%s\n", rBuffer);

if (!strcmp(rBuffer, "quit")) {
isContinue = 0;
}

} else if (length == 0) {
printf(" Disconnect clientfd:%d\n", clientfd);
connections --;
close(clientfd);
} else {
if (errno == EINTR) continue;

printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
} else {
printf(" clientfd:%d, errno:%d\n", clientfd, errno);
close(clientfd);
}
}
}

usleep(500);
}

return 0;

err:
printf("error : %s\n", strerror(errno));
return 0;

}


06

总结

至此,我们最终实现了支持高并发的服务器程序,但是,这个服务器程序有些局限性,我们还要继续改善、优化。在改进之前,需要开发一个后台日志模块,这是服务器程序必须的,所以,下一个章节将开发一个高效的后台日志模块。

公众号: Lion 莱恩呀

微信号: 关注获取

扫码关注 了解更多内容

点个 在看 你最好看



Lion 莱恩呀
专注分享高性能服务器后台开发技术知识,涵盖多个领域,包括C/C++、Linux、网络协议、设计模式、中间件、云原生、数据库、分布式架构等。目标是通过理论与代码实践的结合,让世界上看似难以掌握的技术变得易于理解与掌握。
 最新文章