0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

基于Select/Poll实现并发服务器(二)

嵌入式大杂烩 来源:嵌入式大杂烩 作者:嵌入式大杂烩 2022-06-20 00:26 次阅读

开发环境:

RT-Thread版本:4.0.3

操作系统:Windows10

Keil版本:V5.30

RT-Thread Studio版本:2.0.1

开发板MCUSTM32H750XB

LWIP:2.0.2

3 Select/Poll概述

在LWIP中,如果要实现并发服务器,可以基于Sequentaial API来实现,这种方式需要使用多线程,也就是为每个连接创建一个线程来处理数据。而在资源受限的嵌入式设备来说,如果为每个连接都创建一个线程,这种资源的消耗是巨大的,因此,我们需要换一种实现思路,也就是使用IO多路复用的机制来实现,也就是select机制。

Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。

值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSD Unix系统继承而来,poll则是从System V Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。

3.1 Select函数

在BSD Socket中,select函数原型如下:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval *timeout);

参数说明】

  • nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。
  • readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。
  • writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。
  • exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。
  • timeout参数是一个指向 struct timeval类型的指针,它可以使 select()在等待 timeout时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态:

(1)若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;

(2)若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;

(3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。

timeval结构体定义

struct timeval
{
    int tv_sec;/*秒 */
    int tv_usec;/*微妙 */
};

【返回值】

  • int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1

下列操作用来设置、清除、判断文件描述符集合。

FD_ZERO(fd_set *set);//清除一个文件描述符集。

FD_SET(int fd,fd_set *set);//将一个文件描述符加入文件描述符集中。

FD_CLR(int fd,fd_set *set);//将一个文件描述符从文件描述符集中清除。

FD_ISSET(int fd,fd_set *set);//判断文件描述符是否被置位

fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。

select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。

3.2 Poll函数

poll的函数原型:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

【参数说明】

  • fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll()监视多个文件描述符。

struct pollfd原型如下:

typedef struct pollfd {
        int fd;                //需要被检测或选择的文件描述符
        short events;        //对文件描述符fd上感兴趣的事件
        short revents;         //文件描述符fd上当前实际发生的事件
} pollfd_t;

其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。

  • nfds:记录数组fds中描述符的总数量。
  • timeout:指定等待的毫秒数,无论 I/O是否准备好,poll()都会返回,和select函数是类似的。

【返回值】

  • int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;

poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。

4 LWIP的select/poll实现

好了,接下来看看LWIP是如何实现select/poll的。

4.1 lwip_select实现

目前LWIP已经完全实现select,它是基于信号量的机制来实现的,函数名是lwip_select。

LWIP实现Select的基本流程如下:

1.依次检套接字集合中的每个套接字的事件表示,若有效,则记录该套接字。

2.若存在一个或多事件,则返回,否则创建一个信号量并阻塞等待,记录信号量的结构体是select_cb_list,是一个链表,在[sockets.c]文件中定义的:

static struct lwip_select_cb *select_cb_list;//管理select的链表

lwip_select_cb原型如下:

/** Description for a task waiting in select */
struct lwip_select_cb {
 /** Pointer to the next waiting task */
 struct lwip_select_cb *next;
 /** Pointer to the previous waiting task */
 struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
 /** readset passed to select */
 fd_set *readset;
 /** writeset passed to select */
 fd_set *writeset;
 /** unimplemented: exceptset passed to select */
 fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
 /** fds passed to poll; NULL if select */
 struct pollfd *poll_fds;
 /** nfds passed to poll; 0 if select */
 nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
 /** don't signal the same semaphore twice: set to 1 when signalled */
 int sem_signalled;//是否释放信号领
 /** semaphore to wake up a task waiting for select */
 SELECT_SEM_T sem;//select阻塞的信号量
};

3.当套接字集合初始化,会向netconn结构注册回调函数event_callback,当有是事件发生时,回调函数就被被执行,而且回调函数会遍历select_cb_list,如果套接字在select_cb_list中,则select_cb_list释放一个信号量。

好了,接下来看看LWIP的select具体实现,其原型如下:

int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
            struct timeval *timeout)
{
 u32_t waitres = 0;//记录select等待时间
 int nready;
 fd_set lreadset, lwriteset, lexceptset;//记录发生事件的套接字
 u32_t msectimeout;
 int i;
 int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
 int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
 fd_set used_sockets;
#endif
 SYS_ARCH_DECL_PROTECT(lev);

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",
                              maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
                             timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
                              timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));

 if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
    set_errno(EINVAL);
    return -1;
 }

 lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);

 /* Go through each socket in each list to count number of sockets which
     currently match */
 //检测套接字集合中是否发生事件
 nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);

 if (nready < 0) {
    /* one of the sockets in one of the fd_sets was invalid */
    set_errno(EBADF);
    lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
    return -1;
 } else if (nready > 0) {
    /* one or more sockets are set, no need to wait */
    LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
 } else {
    /* If we don't have any current events, then suspend if we are supposed to */
    if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
      /* This is OK as the local fdsets are empty and nready is zero,
         or we would have returned earlier. */
    } else {
      /* None ready: add our semaphore to list:
         We don't actually need any dynamic memory. Our entry on the
         list is only valid while we are in this function, so it's ok
         to use local variables (unless we're running in MPU compatible
         mode). */
      API_SELECT_CB_VAR_DECLARE(select_cb);
      API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
     memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));

      API_SELECT_CB_VAR_REF(select_cb).readset = readset;
      API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
     API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
      API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
        /* failed to create semaphore */
        set_errno(ENOMEM);
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
      API_SELECT_CB_VAR_FREE(select_cb);
        return -1;
      }
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

     lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

      /* Increase select_waiting for each socket we are interested in */
      maxfdp2 = maxfdp1;
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            sock->select_waiting++;//读写异常通知,并且socket是存在的,则会将select_wainting增加1
            if (sock->select_waiting == 0) {
              /* overflow - too many threads waiting */
              sock->select_waiting--;
              nready = -1;
              maxfdp2 = i;
              SYS_ARCH_UNPROTECT(lev);
              done_socket(sock);
              set_errno(EBUSY);
              break;
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            /* Not a valid socket */
            nready = -1;
            maxfdp2 = i;
            SYS_ARCH_UNPROTECT(lev);
            set_errno(EBADF);
            break;
          }
        }
      }
   
      if (nready >= 0) {
        /* Call lwip_selscan again: there could have been events between
         the last scan (without us on the list) and putting us on the list! */

//执行完上述操作,再次扫描一次是否有socket有事件产生
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        if (!nready) {
          /* Still none ready, just wait to be woken */
          if (timeout == 0) {
            /* Wait forever */
            msectimeout = 0;
          } else {
            long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
            if (msecs_long <= 0) {
              /* Wait 1ms at least (0 means wait forever) */
              msectimeout = 1;
            } else {
              msectimeout = (u32_t)msecs_long;
            }
          }
         //休眠指定时间,让出cpu控制权
          waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
          waited = 1;
#endif
        }
      }
      
      /* Decrease select_waiting for each socket we are interested in */
      for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
        if ((readset && FD_ISSET(i, readset)) ||
            (writeset && FD_ISSET(i, writeset)) ||
            (exceptset && FD_ISSET(i, exceptset))) {
          struct lwip_sock *sock;
          SYS_ARCH_PROTECT(lev);
          sock = tryget_socket_unconn_locked(i);
          if (sock != NULL) {
            /* for now, handle select_waiting==0... */
           LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
            if (sock->select_waiting > 0) {
              sock->select_waiting--;//休眠结束,将对应socket->select_waiting减1
            }
            SYS_ARCH_UNPROTECT(lev);
            done_socket(sock);
          } else {
            SYS_ARCH_UNPROTECT(lev);
            /* Not a valid socket */
            nready = -1;
            set_errno(EBADF);
          }
        }
      }

     lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

#if LWIP_NETCONN_SEM_PER_THREAD
      if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
        /* don't leave the thread-local semaphore signalled */
       sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
      }
#else /* LWIP_NETCONN_SEM_PER_THREAD */
      sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
      API_SELECT_CB_VAR_FREE(select_cb);

      if (nready < 0) {
        /* This happens when a socket got closed while waiting */
        lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
        return -1;
      }

      if (waitres == SYS_ARCH_TIMEOUT) {
        /* Timeout */
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
        /* This is OK as the local fdsets are empty and nready is zero,
           or we would have returned earlier. */
      } else {
        /* See what's set now after waiting */
        nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
        LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
      }
    }
 }

 lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
 set_errno(0);
 if (readset) {
    *readset = lreadset;
 }
 if (writeset) {
    *writeset = lwriteset;
 }
 if (exceptset) {
    *exceptset = lexceptset;
 }
 return nready;
}

以上代码最核心的就是socket->select_waiting加1和减1的地方,当socket存在且的确需要监听事件,且并不是进来事件就已经产生或者已经超时,一定会加1;然后线程会有可能会进行休眠;正常情况下,休眠结束后,socket->select_waiting减1,离开该函数,socket->select_waiting恢复原值。但是,如果在休眠期间进行了close(socket),则通过try_socket(socket)获取不到socket结构体,则socket->select_waiting不会进行减1,后面执行一系列语句后,退出该函数,socket->select_waiting没有恢复原值,且比进来时大1。针对该函数,socket->select_waiting加1的次数是>=减1的次数,所以如果只要在函数退出时没有恢复原值,则socket->select_waiting永远不可能再减为0了,此时socket资源就出现了假占用,该socket再也不能被其他人使用了。

lwip_select函数实现的具体流程如下:

pYYBAGKvNCuAXsdNAAOSn01iYa0750.png

Select的实现有个重要的结构体lwip_sock,其原型如下:

/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
 /** sockets currently are built on netconns, each socket has one netconn */
 struct netconn *conn;
 /** data that was left from the previous read */
 union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
 /** number of times data was received, set by event_callback(),
      tested by the receive and select functions */
 s16_t rcvevent;
 /** number of times data was ACKed (free send buffer), set by event_callback(),
      tested by select */
 u16_t sendevent;
 /** error happened for this socket, set by event_callback(), tested by select */
 u16_t errevent;
 /** counter of how many threads are waiting for this socket using select */
 SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
 /* counter of how many threads are using a struct lwip_sock (not the 'int') */
 u8_t fd_used;
 /* status of pending close/delete actions */
 u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP 1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif

#ifdef SAL_USING_POSIX
 rt_wqueue_t wait_head;
#endif
};

在socket数据接收时,lwip_sock利用netconn相关的接收函数获得一个pbuf(对于TCP)或者一个netbuf(对于UDP)数据,而这二者封装的数据可能大于socket用户指定的数据接收长度,因此在这种情况下,这两个数据包需要暂时保存在socket中,以待用户下一次读取,这里lastdata就用于指向未被用户完全读取的数据包,而lastoffset则指向了未读取的数据在数据包中的偏移。lwip_sock最后的五个字段是为select机制实现时使用的。

lwip_socket是上层Socket API中的实现,它对netconn结构的封装和增强,描述一个具体连接。它基于内核netconn来实现所有逻辑,conn指向了与socket对应的netconn结构。Netconn原型如下:

/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);

/** A netconn descriptor */
struct netconn {
 /** type of the netconn (TCP, UDP or RAW) */
 enum netconn_type type;
 /** current state of the netconn */
 enum netconn_state state;
 /** the lwIP internal protocol control block */
 union {
    struct ip_pcb *ip;
    struct tcp_pcb *tcp;
    struct udp_pcb *udp;
    struct raw_pcb *raw;
 } pcb;
 /** the last asynchronous unreported error this netconn had */
 err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
 /** sem that is used to synchronously execute functions in the core context */
 sys_sem_t op_completed;
#endif
 /** mbox where received packets are stored until they are fetched
      by the netconn application thread (can grow quite big) */
 sys_mbox_t recvmbox;
#if LWIP_TCP
 /** mbox where new connections are stored until processed
      by the application thread */
 sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX
 /** number of threads waiting on an mbox. This is required to unblock
      all threads when closing while threads are waiting. */
 int mbox_threads_waiting;
#endif
 /** only used for socket layer */
#if LWIP_SOCKET
 int socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO
 /** timeout to wait for sending data (which means enqueueing data for sending
      in internal buffers) in milliseconds */
 s32_t send_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVTIMEO
 /** timeout in milliseconds to wait for new data to be received
      (or connections to arrive for listening netconns) */
 u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF
 /** maximum amount of bytes queued in recvmbox
      not used for TCP: adjust TCP_WND instead! */
 int recv_bufsize;
 /** number of bytes currently in recvmbox to be received,
      tested against recv_bufsize to limit bytes on recvmbox
      for UDP and RAW, used for FIONREAD */
 int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER
   /** values <0 mean linger is disabled, values > 0 are seconds to linger */
 s16_t linger;
#endif /* LWIP_SO_LINGER */
 /** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */
 u8_t flags;
#if LWIP_TCP
 /** TCP: when data passed to netconn_write doesn't fit into the send buffer,
      this temporarily stores the message.
      Also used during connect and close. */
 struct api_msg *current_msg;
#endif /* LWIP_TCP */
 /** A callback function that is informed about events for this netconn */
 netconn_callback callback;
};

前文已经提到,套接字集合初始化时,会向netconn结构注册回调函数event_callback,这个回调函数就是结构体netconn中netconn_callback,接下来看看netconn_callback函数原型:

/**
 * Callback registered in the netconn layer for each socket-netconn.
 * Processes recvevent (data available) and wakes up tasks waiting for select.
 *
 * @note for LWIP_TCPIP_CORE_LOCKING any caller of this function
 * must have the core lock held when signaling the following events
 * as they might cause select_list_cb to be checked:
 * NETCONN_EVT_RCVPLUS数据被内核接收则会产生该事件
 * NETCONN_EVT_SENDPLUS数据成功发送则产生该事件
 * NETCONN_EVT_ERROR连接错误则产生该事件
 * This requirement will be asserted in select_check_waiters()
 */
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{
 int s, check_waiters;
 struct lwip_sock *sock;
 SYS_ARCH_DECL_PROTECT(lev);

 LWIP_UNUSED_ARG(len);

 /* Get socket */
 if (conn) {
    s = conn->socket;
    if (s < 0) {
      /* Data comes in right away after an accept, even though
       * the server task might not have created a new socket yet.
      * Just count down (or up) if that's the case and we
       * will use the data later. Note that only receive events
       * can happen before the new socket is set up. */
      SYS_ARCH_PROTECT(lev);
      if (conn->socket < 0) {
        if (evt == NETCONN_EVT_RCVPLUS) {
          /* conn->socket is -1 on initialization
             lwip_accept adjusts sock->recvevent if conn->socket < -1 */
          conn->socket--;
        }
        SYS_ARCH_UNPROTECT(lev);
        return;
      }
      s = conn->socket;
      SYS_ARCH_UNPROTECT(lev);
    }

    sock = get_socket(s);//获取socket对应的结构
    if (!sock) {
      return;
    }
 } else {
    return;
 }

 check_waiters = 1;
 //进入临界区,根据事件来更新socket的event值
 SYS_ARCH_PROTECT(lev);
 /* Set event as required */
 switch (evt) {
    case NETCONN_EVT_RCVPLUS://数据被内核收到
      sock->rcvevent++;
      if (sock->rcvevent > 1) {
        check_waiters = 0;
      }
      break;
    case NETCONN_EVT_RCVMINUS://数据被用户读取
      sock->rcvevent--;
      check_waiters = 0;
      break;
    case NETCONN_EVT_SENDPLUS://输出发送成功
      if (sock->sendevent) {
        check_waiters = 0;
      }
      sock->sendevent = 1;
      break;
    case NETCONN_EVT_SENDMINUS://用户写入数据到缓冲区
      sock->sendevent = 0;
      check_waiters = 0;
      break;
    case NETCONN_EVT_ERROR://连接错误
      sock->errevent = 1;
      break;
    default:
      LWIP_ASSERT("unknown event", 0);
      break;
 }
 //事件设置完毕,唤醒阻塞的select函数
 if (sock->select_waiting && check_waiters) {
    /* Save which events are active */
    int has_recvevent, has_sendevent, has_errevent;
    has_recvevent = sock->rcvevent > 0;//数据可读事件
    has_sendevent = sock->sendevent != 0;//数据可写事件
    has_errevent = sock->errevent != 0;//数据异常事件
    SYS_ARCH_UNPROTECT(lev);
    /* Check any select calls waiting on this socket */
    select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
 } else {
    SYS_ARCH_UNPROTECT(lev);
 }
 done_socket(sock);
}

综上,event_callback的本质就是readset、writeset、exceptset集合的监听,并对rcvevent、sendevent、errevent的填写,并阻塞的lwip_select函数发送信号量。而lwip_select的本质就是对rcvevent、sendevent、errevent的读取,并执行相应的操作,lwip_select主要是通过lwip_selscan来扫描事件的。

4.2 lwip_poll实现

LWIP也完全实现poll,函数名是lwip_poll。lwip_poll和lwip_select的实现机制差不多,只是lwip_poll使用pollfd的结构来存储描述符的,它是基于链表来存储的,这样lwip_poll函数没有最大文件描述符数量的限制。lwip_poll函数原型如下:

int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
 u32_t waitres = 0;
 int nready;
 u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREAD
 int waited = 0;
#endif

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
                  (void*)fds, (int)nfds, timeout));
 LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),
             set_errno(EINVAL); return -1;);

 lwip_poll_inc_sockets_used(fds, nfds);

 /* Go through each struct pollfd to count number of structures
     which currently match */
 nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);

 if (nready < 0) {
    lwip_poll_dec_sockets_used(fds, nfds);
    return -1;
 }

 /* If we don't have any current events, then suspend if we are supposed to */
 if (!nready) {
    API_SELECT_CB_VAR_DECLARE(select_cb);

    if (timeout == 0) {
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
      goto return_success;
    }
    API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);
   memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));

    /* None ready: add our semaphore to list:
       We don't actually need any dynamic memory. Our entry on the
       list is only valid while we are in this function, so it's ok
       to use local variables. */

    API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;
    API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREAD
    API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
    if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
      /* failed to create semaphore */
      set_errno(EAGAIN);
      lwip_poll_dec_sockets_used(fds, nfds);
      API_SELECT_CB_VAR_FREE(select_cb);
      return -1;
    }
#endif /* LWIP_NETCONN_SEM_PER_THREAD */

   lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

    /* Increase select_waiting for each socket we are interested in.
       Also, check for events again: there could have been events between
       the last scan (without us on the list) and putting us on the list! */
    nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);

    if (!nready) {
      /* Still none ready, just wait to be woken */
      if (timeout < 0) {
        /* Wait forever */
        msectimeout = 0;
      } else {
        /* timeout == 0 would have been handled earlier. */
        LWIP_ASSERT("timeout > 0", timeout > 0);
        msectimeout = timeout;
      }
      waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
      waited = 1;
#endif
    }

    /* Decrease select_waiting for each socket we are interested in,
       and check which events occurred while we waited. */
    nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);

   lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));

#if LWIP_NETCONN_SEM_PER_THREAD
    if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
      /* don't leave the thread-local semaphore signalled */
     sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
    }
#else /* LWIP_NETCONN_SEM_PER_THREAD */
   sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
    API_SELECT_CB_VAR_FREE(select_cb);

    if (nready < 0) {
      /* This happens when a socket got closed while waiting */
      lwip_poll_dec_sockets_used(fds, nfds);
      return -1;
    }

    if (waitres == SYS_ARCH_TIMEOUT) {
      /* Timeout */
      LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
      goto return_success;
    }
 }

 LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
 lwip_poll_dec_sockets_used(fds, nfds);
 set_errno(0);
 return nready;
}

和lwip_select一样也是对事件进行扫描,只是扫描函数是lwip_pollscan而已。后面的内容就不在分析,有兴趣请参看LWIP源码。

lwip_poll函数实现的具体流程如下:

poYBAGKvNHaAG9pFAANqPl2x03E105.png

5并发服务器实现

前文讲解了select/poll机制在LWIP的实现,接下来将使用select/poll来实现并发服务器。这里以select为例。

select并发服务器模型:

socket(...); //创建套接字
bind(...);  //绑定
listen(...); //监听
 
while(1)
{
    if(select(...) > 0) //检测监听套接字是否可读
    {
        if(FD_ISSET(...)>0) //套接字可读,证明有新客户端连接服务器 
        {
            accpet(...);//取出已经完成的连接
            process(...);//处理请求,反馈结果
        }
    }
    close(...); //关闭连接套接字:accept()返回的套接字
}

因此,基于select实现的并发服务器模型如下:

poYBAGKvNJKAGFIMAAJJ6XRHL4Y205.png

从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

Server:

/**
 ******************************************************************************
 * @file               server.c
 * @author             BruceOu
 * @rtt version        V4.0.3
 * @version            V1.0
 * @date              2022-06-08
 * @blog              https://blog.bruceou.cn/
 * @Official Accounts  嵌入式实验楼
 * @brief              基于select的服务器
 ******************************************************************************
 */
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVER_PORT  8888
#define BUFF_SIZE 1024

static char recvbuff[BUFF_SIZE];

static void net_server_thread_entry(void *parameter)
{
    int sfd, cfd, maxfd, i, nready, n;

    struct sockaddr_in server_addr, client_addr;

    struct netdev *netdev = RT_NULL;

    char sendbuff[] = "Hello client!";

    socklen_t client_addr_len;
    fd_set all_set, read_set;

    //FD_SETSIZE里面包含了服务器的fd
    int clientfds[FD_SETSIZE - 1];

    /*通过名称获取 netdev网卡对象 */
    netdev = netdev_get_by_name((char*)parameter);
    if (netdev == RT_NULL)
    {
        rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);
    }

    //创建socket
    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        rt_kprintf("Socket create failed.\n");
    }

    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    //server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    /* 获取网卡对象中 IP 地址信息 */
    server_addr.sin_addr.s_addr = netdev->ip_addr.addr;

    //绑定socket
    if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket bind failed.\n");
        closesocket(sfd);
    }
    rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);

    //监听socket
    if(listen(sfd, 5) == -1)
    {
        rt_kprintf("listen error");
    }
    else
    {
        rt_kprintf("listening...\n");
    }

    client_addr_len = sizeof(client_addr);

    //初始化 maxfd等于 sfd
    maxfd = sfd;

    //清空fdset
    FD_ZERO(&all_set);

    //把sfd文件描述符添加到集合中
    FD_SET(sfd, &all_set);

    //初始化客户端fd的集合
    for(i = 0; i < FD_SETSIZE -1 ; i++)
    {
        //初始化为-1
        clientfds[i] = -1;
    }
    while(1)
    {
        //每次select返回之后,fd_set集合就会变化,再select时,就不能使用,
        //所以我们要保存设置fd_set 和 读取的fd_set
        read_set = all_set;
        nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);

        //没有超时机制,不会返回0
        if(nready < 0)
        {
           rt_kprintf("select error \r\n");

        }

        //判断监听的套接字是否有数据
        if(FD_ISSET(sfd, &read_set))
        {
            //有客户端进行连接了
            cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
            if(cfd < 0)
            {
                rt_kprintf("accept socket error\r\n");
                //继续select
                continue;
            }
            rt_kprintf("new client connect fd = %d\r\n", cfd);

            //把新的cfd 添加到fd_set集合中
            FD_SET(cfd, &all_set);

            //更新要select的maxfd
            maxfd = (cfd > maxfd)?cfd:maxfd;

            //把新的cfd保存到cfds集合中
            for(i = 0; i < FD_SETSIZE -1 ; i++)
            {
                if(clientfds[i] == -1)
                {
                    clientfds[i] = cfd;
                    //退出,不需要添加
                    break;
                }
            }

            //没有其他套接字需要处理:这里防止重复工作,就不去执行其他任务
            if(--nready == 0)
            {
                //继续select
                continue;
            }
        }

        //遍历所有的客户端文件描述符
        for(i = 0; i < FD_SETSIZE -1 ; i++)
        {
            if(clientfds[i] == -1)
            {
                //继续遍历
                continue;
            }

            //判断是否在fd_set集合里面
            if(FD_ISSET(clientfds[i], &read_set))
            {
                n = recv(clientfds[i], recvbuff, sizeof(recvbuff), 0);
                rt_kprintf("clientfd %d:  %s \r\n",clientfds[i], recvbuff);

                if(n <= 0)
                {
                    //从集合里面清除
                    FD_CLR(clientfds[i], &all_set);
                    //当前的客户端fd 赋值为-1
                    clientfds[i] = -1;                }
                else
                {
                    //写回客户端
                    n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);
                    if(n < 0)
                    {
                        //从集合里面清除
                        FD_CLR(clientfds[i], &all_set);

                        //当前的客户端fd 赋值为-1
                        clientfds[i] = -1;
                    }
                }
            }
        }
    }
}

static int server(int argc, char **argv)
{
    rt_err_t ret = RT_EOK;

    if (argc != 2)
    {
        rt_kprintf("bind_test [netdev_name]  --bind network interface device by name.\n");
        return -RT_ERROR;
    }

    /* 创建 serial 线程 */
    rt_thread_t thread = rt_thread_create("server",
                                         net_server_thread_entry,
                                         argv[1],
                                          4096,
                                          10,
                                          10);

    /* 创建成功则启动线程 */
    if (thread != RT_NULL)
    {
        rt_thread_startup(thread);
    }
    else
    {
        ret = RT_ERROR;
    }

    return ret;
}


#ifdef FINSH_USING_MSH
#include 
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */

Client:【Linux版】

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVPORT 8888
 
int main(int argc,char *argv[]) 
{
    char sendbuf[] = "Client1 : Hello Rtthread!";
    char recvbuf[2014];

    int sockfd,sendbytes;
    struct sockaddr_in serv_addr;//需要连接的服务器地址信息

    if (argc != 2)
    {
       perror("init error");
    }

    //1.创建socket
    //AF_INET表示IPV4
    //SOCK_STREAM表示TCP
    if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) 
    {
        perror("socket");
        exit(1);
    }

    //填充服务器地址信息
    serv_addr.sin_family       = AF_INET; //网络层的IP协议: IPV4
    serv_addr.sin_port          = htons(SERVPORT); //传输层的端口号
    serv_addr.sin_addr.s_addr   = inet_addr(argv[1]); //网络层的IP地址: 实际的服务器IP地址
    bzero(&(serv_addr.sin_zero),8); //保留的8字节置零

    //2.发起对服务器的连接信息
    //三次握手,需要将sockaddr_in类型的数据结构强制转换为sockaddr
    if((connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(struct sockaddr))) < 0) {
        perror("connect failed!");
        exit(1);
    }

    printf("connect successful! \n");

    //3.发送消息给服务器端
    while (1)
    {
        send(sockfd, sendbuf, strlen(sendbuf), 0);

        recv(sockfd, recvbuf, sizeof(recvbuf), 0);

       printf("Server : %s \n", recvbuf);

       sleep(2);
    }

    //4.关闭
    close(sockfd);

}

Client:【RT-Thread版】

/**
 ******************************************************************************
 * @file               client.c
 * @author             BruceOu
 * @rtt version        V4.0.3
 * @version            V1.0
 * @date               2022-06-01
 * @blog              https://blog.bruceou.cn/
 * @Official Accounts  嵌入式实验楼
 * @brief              客户端
 ******************************************************************************
 */
#include 
#include 
#include 
#include 
#include 
#include 

#define SERVER_HOST  "192.168.101.8"
#define SERVER_PORT  8888

static int client(int argc, char **argv)
{
    struct sockaddr_in client_addr;
    struct sockaddr_in server_addr;
    struct netdev *netdev = RT_NULL;
    int sockfd = -1;

    char sendbuf[] = "Hello RT-Thread! \r\n";
    char recvbuf[2014];

    if (argc != 2)
    {
        rt_kprintf("bind_test [netdev_name] --bind network interface device by name.\n");
        return -RT_ERROR;
    }

    /*通过名称获取 netdev网卡对象 */
    netdev = netdev_get_by_name(argv[1]);
    if (netdev == RT_NULL)
    {
        rt_kprintf("get network interface device(%s) failed.\n", argv[1]);
        return -RT_ERROR;
    }

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        rt_kprintf("Socket create failed.\n");
        return -RT_ERROR;
    }

    /* 初始化需要绑定的客户端地址 */
    client_addr.sin_family = AF_INET;
    client_addr.sin_port = htons(8080);
    /* 获取网卡对象中 IP 地址信息 */
    client_addr.sin_addr.s_addr = netdev->ip_addr.addr;
    rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));

    if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket bind failed.\n");
        closesocket(sockfd);
        return -RT_ERROR;
    }
    rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);

    /*初始化预连接的服务端地址 */
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(SERVER_PORT);
    server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
    rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));

    /*连接到服务端 */
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
    {
        rt_kprintf("socket connect failed!\n");
        closesocket(sockfd);
        return -RT_ERROR;
    }
    else
    {
        rt_kprintf("socket connect success!\n");
    }

    while (1)
    {
        send(sockfd, sendbuf, strlen(sendbuf), 0);

        recv(sockfd, recvbuf, sizeof(recvbuf), 0);

        fputs(recvbuf, stdout);

        memset(recvbuf, 0, sizeof(recvbuf));

        rt_thread_mdelay(500);
    }

    /* 关闭连接 */
    closesocket(sockfd);
    return RT_EOK;
}

#ifdef FINSH_USING_MSH
#include 
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */

接下来就是验证了,关于ART-Pi的联网部分就不再赘述了有不懂的看前面的章节。

现在ART-Pi上开启服务器:

Server:

pYYBAGKvNOyABq-jAABaJ0B6Yds202.png

然后开启客户端,笔者的客户端在Ubuntu上运行的:

Client:

pYYBAGKvNPKAZoqPAAExWaaNDmI789.png

笔者这里使用的客户端只有两个,有兴趣的也可以使用多个客户端。

当然啦,如果懒得写客户端,也可使用网络调试助手测试。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    12

    文章

    9010

    浏览量

    85160
  • API
    API
    +关注

    关注

    2

    文章

    1483

    浏览量

    61797
  • RT-Thread
    +关注

    关注

    31

    文章

    1271

    浏览量

    39903
  • select
    +关注

    关注

    0

    文章

    28

    浏览量

    3903
  • ART-Pi
    +关注

    关注

    0

    文章

    23

    浏览量

    1282
收藏 人收藏

    评论

    相关推荐

    我国首款亿级并发服务器系统实现量产

    我国高性能计算领军企业中科曙光29日在天津宣布,曙光星河云服务器系统正式量产。这是我国首款亿级并发服务器系统,也是“十二五”期间国家863计划信息技术领域“亿级并发
    发表于 11-30 15:47 825次阅读

    基于Select/Poll实现并发服务器(一)

    LWIP:2.0.2   并发服务器支持多个客户端的同时连接,最大可接入的客户端数取决于内核控制块的个数。当使用Socket API时,要使服务器能够同时支持多个客户端的连接,必须引入多任务机制,为每个
    的头像 发表于 06-20 00:20 3758次阅读
    基于<b class='flag-5'>Select</b>/<b class='flag-5'>Poll</b><b class='flag-5'>实现</b><b class='flag-5'>并发</b><b class='flag-5'>服务器</b>(一)

    在DragonBoard 410c上实现并发处理TCP服务器

    服务,让传感和相关的控制设备接入,为此,本期blog将向大家介绍如何使用gevent高性能的并发处理库在draognbaord 410c上来实现一个高性能的TCP
    发表于 09-25 15:53

    高性能高并发服务器架构分享

    由于自己正在做一个高性能大用户量的论坛程序,对高性能高并发服务器架构比较感兴趣,于是在网上收集了不少这方面的资料和大家分享。希望能和大家交流 msn: ———————————————————————————————————————  初创网站与开源软件 6  谈谈大型
    发表于 09-16 06:45

    如何利用多线程去构建一种TCP并发服务器

    一、实验目的和要求1了解TCP/IP协议2掌握Socket编程,熟悉基于TCP和UDP的传输模型3掌握多线程编程4掌握基于TCP的并发服务器设计、实验内容和原理实验内容:编写C程序,利用多线程构建
    发表于 12-22 08:03

    【沁恒微CH32V307评估板试用体验】基于LWIP实现并发服务器

    程,这是最常用的并发服务器设计。但是多线程/多进程消耗资源多,处理起来也比较复杂,本文将基于LWIP协议栈的Select/Poll机制实现
    发表于 06-01 23:27

    【飞凌嵌入式OK3568-C开发板试用体验】第4章 基于 Select Poll的TCP发服务器

    ,本文将基于Select/Poll机制实现并发服务器。 4.1 IO模型概述在具体讲解基于Select
    发表于 06-09 22:45

    【原创精选】RT-Thread征文精选技术文章合集

    。RT-Thread自动初始化详解GD32 RISC-V系列 BSP框架制作与移植GD32407V-START开发板的BSP框架制作与移植基于Select/Poll实现并发
    发表于 07-26 14:56

    【LuckFox Pico Plus开发板免费试用】基于 Select Poll的TCP发服务器

    ,处理起来也比较复杂,本文将基于Select/Poll机制实现并发服务器。 1 IO模型概述 在具体讲解基于
    发表于 10-21 13:31

    Linux环境并发服务器设计技术研究

    讲述并发服务器设计的主要技术,包括多进程服务器、多线程服务器和I/ O 复用服务器,同时对以上服务器
    发表于 04-24 10:02 16次下载

    Linux内核中select, poll和epoll的区别

    先说pollpollselect为大部分Unix/Linux程序员所熟悉,这俩个东西原理类似,性能上也不存在明显差异,但select对所监控的文件描述符数量有限制,所以这里选用
    发表于 05-14 16:24 1694次阅读

    服务器的高并发能力如何提升?

    服务器的高并发能力如何提升? 服务器并发能力体现着服务器在单位时间内的很强数据处理能力,一般来说,如果企业的互联网业务需要面对大量的同时在
    的头像 发表于 03-17 17:07 977次阅读

    网站服务器并发数的计算方法是什么?

    并发数也就是指同时访问服务器站点的连接数,所以站长为了后期避免主机服务器等资源出现过剩浪费及资源不足等问题的出现,都会对服务器并发数进行计
    的头像 发表于 04-12 15:22 3110次阅读

    并发服务器的设计与实现

    并发服务器支持多个客户端的连接,最大可接入的客户端数取决于内核控制块的个数。当使用Socket API时,要使服务器能够同时支持多个客户端的连接,必须引入多任务机制,为每个连接创建一个单独的任务来处理连接上的数据,我们将这个设计
    的头像 发表于 04-25 15:35 807次阅读
    <b class='flag-5'>并发</b><b class='flag-5'>服务器</b>的设计与<b class='flag-5'>实现</b>

    服务器并发的概念

    自己调整系统的相关参数 并发的概念是什么?什么是并发? 对于服务器并发的概念,下面几点是错误的定义 ①服务器处理客户端请求的数量:没有时间、
    的头像 发表于 11-10 10:05 4944次阅读
    <b class='flag-5'>服务器</b><b class='flag-5'>并发</b>的概念