// Logo Search packages:
// Sourcecode: u++ version File versions
//
// uNBIO.cc

//                              -*- Mode: C++ -*- 
// 
// uC++ Version 5.0.1, Copyright (C) Peter A. Buhr 1994
// 
// uNBIO.cc -- non-blocking IO
// 
// Author           : Peter A. Buhr
// Created On       : Mon Mar  7 13:56:53 1994
// Last Modified By : Peter A. Buhr
// Last Modified On : Sat Aug  7 11:21:51 2004
// Update Count     : 491
//
// This  library is free  software; you  can redistribute  it and/or  modify it
// under the terms of the GNU Lesser General Public License as published by the
// Free Software  Foundation; either  version 2.1 of  the License, or  (at your
// option) any later version.
// 
// This library is distributed in the  hope that it will be useful, but WITHOUT
// ANY  WARRANTY;  without even  the  implied  warranty  of MERCHANTABILITY  or
// FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
// for more details.
// 
// You should  have received a  copy of the  GNU Lesser General  Public License
// along  with this library.
// 


#define __U_KERNEL__


#include <uC++.h>
#include <uAssert.h>
//#include <uDebug.h>


#include <cstring>                              // strerror
#include <cerrno>
#include <sys/socket.h>
#if defined( __linux__ )
#include <sys/param.h>                          // howmany
#endif


#ifndef NBBY
#define NBBY 8
#endif


static const int uSelectTimeOut = 1;                  // how many seconds may elapse before call to select times out
static const short uBitsPerValue[16] = { 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4 };

// Count the number of 1-bits in "x" by table lookup, consuming one nibble
// (4 bits) per iteration.  The previous special case for x == 0 was
// redundant: the loop body never executes for 0 and cnt is returned as 0.
static inline int uCountBits( unsigned long int x ) {
    int cnt = 0;                                // declare and initialize together
    for ( ; x > 0; x >>= 4 ) {
      cnt += uBitsPerValue[x & 0xf];            // add population count of low nibble
    } // for
    return cnt;
} // uCountBits


//######################### uNBIO #########################


// Perform one "select" over the master fd masks (blocking for at most
// "delay"), then wake every pending task whose I/O is now ready or whose
// timeout has fired.  Called by the nominated IO-poller task.
void uNBIO::uCheckIO( uDuration delay ) {
    int i;
    struct timeval timeout = delay;             // convert to timeval for select
    int tmasks = howmany( maxFD + 1, NFDBITS );       // total number of masks in fd set

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uCheckIO: timeout:%ld,%ld\n", this, timeout.tv_sec, timeout.tv_usec );
#endif // __U_DEBUG_H__

    // Make a local copy of the fd sets because the select operation destroys
    // the sets and the master information is lost.

    fd_set lrfds = uRFDs;
    fd_set lwfds = uWFDs;
    fd_set lefds = uEFDs;

    // This processor is about to become idle. First, interrupts are disabled
    // because the following operations affect some kernel data structures.
    // Second, preemption is turned off because it is now controlled by the
    // "select" timeout.  Third, the idle state is set so that if a task is
    // migrated to the cluster this processor is on, the processor is woken
    // up. Unfortunately, there is race condition between seeing that the
    // processor is idle and the SIGALRM sent to wake it; the signal can be
    // sent and received before the UNIX process actually blocks with the
    // "select". In such a case, the SIGALRM is treated as spurious.
    // Therefore, it is necessary to poll.

    THREAD_GETMEM( uSelf )->uDisableInterrupts();

    int uPrevTime = uThisProcessor().uGetPreemption();      // remember previous preemption time
    if ( uPrevTime != 0 ) {                     // optimize out UNIX call if possible
      uThisProcessor().uSetContextSwitchEvent( 0 );   // turn off preemption or it keeps waking the UNIX processor
    } // if

    uThisCluster().uMakeProcessorIdle( uThisProcessor() );

#ifdef __U_DEBUG_H__
#if 1
    uDebugAcquire();
    uDebugPrt2( "(uNBIO &)0x%p.uCheckIO, lrfds:", this );
    for ( i = 0; i < tmasks; i += 1 ) {
      uDebugPrt2( "%lx ", lrfds.fds_bits[i] );
    } // for
    uDebugPrt2( "\n" );
    uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, lwfds:", this );
    for ( i = 0; i < tmasks; i += 1 ) {
      uDebugPrt2( "%lx ", lwfds.fds_bits[i] );
    } // for
    uDebugPrt2( "\n" );
    uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, lefds:", this );
    for ( i = 0; i < tmasks; i += 1 ) {
      uDebugPrt2( "%lx ", lefds.fds_bits[i] );
    } // for
    uDebugPrt2( "\n" );
    uDebugRelease();
#endif
#endif // __U_DEBUG_H__

    // Check for IO pending on any descriptors.

    uFound = ::select( maxFD + 1, &lrfds, &lwfds, &lefds, &timeout );

    uThisCluster().uMakeProcessorActive( uThisProcessor() );

    if ( uPrevTime != 0 ) {                     // optimize out UNIX call if possible
      uThisProcessor().uSetContextSwitchEvent( uPrevTime ); // reset processor preemption time
    } // if

    THREAD_GETMEM( uSelf )->uEnableInterrupts();      // does necessary roll forward

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uCheckIO: select returns: found %d, timeout:%ld,%ld\n", this, uFound, timeout.tv_sec, timeout.tv_usec );
#endif // __U_DEBUG_H__

    if ( uFound > 0 ) {                         // I/O has occurred ?
#ifdef __U_DEBUG_H__
      uDebugPrt( "(uNBIO &)0x%p.uCheckIO: found %d pending IO operations\n", this, uFound );
#if 1
      uDebugAcquire();
      uDebugPrt2( "(uNBIO &)0x%p.uCheckIO, lrfds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", lrfds.fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
      uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, lwfds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", lwfds.fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
      uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, lefds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", lefds.fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
      uDebugRelease();
#endif
#endif // __U_DEBUG_H__

      // Remove the ready I/O fds from the master lists as these I/O
      // operations have completed.  After select, the local copies contain
      // only the ready fds (a subset of the masters), so XOR clears exactly
      // those bits from the masters.

      for ( i = 0; i < tmasks; i += 1 ) {
          uRFDs.fds_bits[i] ^= lrfds.fds_bits[i];
          uWFDs.fds_bits[i] ^= lwfds.fds_bits[i];
          uEFDs.fds_bits[i] ^= lefds.fds_bits[i];
      } // for

      // Check if maxFD needs to be reset.

      if ( ! ( FD_ISSET( maxFD, &uRFDs ) || FD_ISSET( maxFD, &uWFDs ) || FD_ISSET( maxFD, &uEFDs ) ) ) {  
          setMaxFD();
      } // if

      // Check to see which tasks are waiting for ready I/O operations and
      // wake them.

      uNBIOnode *p;
      int cnt;                                  // number of ready operations for the current node
      for ( uSeqGen<uNBIOnode> gen(uPendingIO); gen >> p; ) {
          cnt = 0;
          if ( p->fdType == uNBIOnode::singleFd ) {   // single fd
#ifdef __U_DEBUG_H__
            uDebugPrt( "(uNBIO &)0x%p.uCheckIO: found task %.256s (0x%p) waiting on single fd %d with mask 0x%x\n",
                     this, p->uPendingTask->uGetName(), p->uPendingTask, p->smfd.sfd.fd, *p->smfd.sfd.uRWE );
#endif // __U_DEBUG_H__
            // Build the result mask: intersect the operations the task asked
            // for with the operations that are now ready.
            int temp = 0;
            if ( (*p->smfd.sfd.uRWE & uCluster::uReadSelect) && FD_ISSET( p->smfd.sfd.fd, &lrfds ) ) {
                temp |= uCluster::uReadSelect;
                cnt += 1;
            } // if
            if ( (*p->smfd.sfd.uRWE & uCluster::uWriteSelect ) && FD_ISSET( p->smfd.sfd.fd, &lwfds ) ) {
                temp |= uCluster::uWriteSelect;
                cnt += 1;
            } // if
            if ( (*p->smfd.sfd.uRWE & uCluster::uExceptSelect) && FD_ISSET( p->smfd.sfd.fd, &lefds ) ) {
                temp |= uCluster::uExceptSelect;
                cnt += 1;
            } // if
            if ( cnt != 0 ) {             // reset user mask only if I/O available
                *p->smfd.sfd.uRWE = temp;
            } // if
          } else {                              // multiple fds
#ifdef __U_DEBUG_H__
            uDebugPrt( "(uNBIO &)0x%p.uCheckIO: found task %.256s (0x%p) waiting for multiple fd, nfd:%ld\n",
                     this, p->uPendingTask->uGetName(), p->uPendingTask, p->smfd.mfd.uNFDs );
#endif // __U_DEBUG_H__
            fd_set tfds;                        // temporary masks to construct result mask
            int nmasks = howmany( p->smfd.mfd.uNFDs, NFDBITS );

            for ( i = 0; i < nmasks - 1; i += 1 ) {   // fd bits of interest
                tfds.fds_bits[i] = 0;
            } // for

            // In the last (partial) chunk, set the bit positions at or above
            // uNFDs so that user-mask bits outside the requested range are
            // preserved by the intersection below.
            int shift = p->smfd.mfd.uNFDs % NFDBITS;
            if ( shift == 0 ) {
                tfds.fds_bits[nmasks - 1] = 0;
            } else {
                tfds.fds_bits[nmasks - 1] = -1L << shift;
            } // if
#ifdef __U_DEBUG_H__
#if 1
            uDebugAcquire();
            uDebugPrt2( "(uNBIO &)0x%p.uCheckIO, tfds:", this );
            for ( i = 0; i < nmasks; i += 1 ) {
                uDebugPrt2( "%lx ", tfds.fds_bits[i] );
            } // for
            uDebugPrt2( "\n" );
            uDebugRelease();
#endif
#endif // __U_DEBUG_H__
#ifdef __U_DEBUG_H__
#if 1
            uDebugAcquire();
            if ( p->smfd.mfd.uRFDs != NULL ) {
                uDebugPrt2( "(uNBIO &)0x%p.uCheckIO, uRFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uRFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            if ( p->smfd.mfd.uWFDs != NULL ) {
                uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, uWFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uWFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            if ( p->smfd.mfd.uEFDs != NULL ) {
                uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, uEFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uEFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            uDebugRelease();
#endif
#endif // __U_DEBUG_H__
            // Rewrite each user mask in place: keep only bits that are ready
            // (local masks) or outside the requested range (tfds), and count
            // the ready bits.
            if ( p->smfd.mfd.uRFDs ) {          // non-null user mask ?
                for ( i = 0; i < nmasks; i += 1 ) {
                  fd_mask temp = p->smfd.mfd.uRFDs->fds_bits[i] & ( tfds.fds_bits[i] | lrfds.fds_bits[i] );
                  p->smfd.mfd.uRFDs->fds_bits[i] = temp;
                  cnt += uCountBits( temp );
                } // for
            } // if
            if ( p->smfd.mfd.uWFDs ) {          // non-null user mask ?
                for ( i = 0; i < nmasks; i += 1 ) {
                  fd_mask temp = p->smfd.mfd.uWFDs->fds_bits[i] & ( tfds.fds_bits[i] | lwfds.fds_bits[i] );
                  p->smfd.mfd.uWFDs->fds_bits[i] = temp;
                  cnt += uCountBits( temp );
                } // for
            } // if
            if ( p->smfd.mfd.uEFDs ) {          // non-null user mask ?
                for ( i = 0; i < nmasks; i += 1 ) {
                  fd_mask temp = p->smfd.mfd.uEFDs->fds_bits[i] & ( tfds.fds_bits[i] | lefds.fds_bits[i] );
                  p->smfd.mfd.uEFDs->fds_bits[i] = temp;
                  cnt += uCountBits( temp );
                } // for
            } // if
#ifdef __U_DEBUG_H__
#if 1
            uDebugPrt( "(uNBIO &)0x%p.uCheckIO, cnt:%d\n", this, cnt );
            uDebugAcquire();
            if ( p->smfd.mfd.uRFDs != NULL ) {
                uDebugPrt2( "(uNBIO &)0x%p.uCheckIO, uRFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uRFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            if ( p->smfd.mfd.uWFDs != NULL ) {
                uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, uWFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uWFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            if ( p->smfd.mfd.uEFDs != NULL ) {
                uDebugPrt2( "        (uNBIO &)0x%p.uCheckIO, uEFDs:", this );
                for ( i = 0; i < tmasks; i += 1 ) {
                  uDebugPrt2( "%lx ", p->smfd.mfd.uEFDs->fds_bits[i] );
                } // for
                uDebugPrt2( "\n" );
            } // if
            uDebugRelease();
#endif
#endif // __U_DEBUG_H__
          } // if
          if ( cnt != 0 || p->uTimedout ) {           // I/O completed for this task or timed out ?
#ifdef __U_DEBUG_H__
            uDebugPrt( "(uNBIO &)0x%p.uCheckIO: removing node 0x%p, cnt:%d, uTimedout:%d\n", this, p, cnt, p->uTimedout );
#endif // __U_DEBUG_H__
            uPendingIO.uRemove( p );            // remove node from list of waiting tasks
            p->nfds = cnt;                      // set return value
            uSignal p->uPending;                // wake up waiting task (empty for IOPoller)
          } // if
      } // for
    } else if ( uFound == 0 ) {                       // time limit expired, no IO is ready
#ifdef __U_DEBUG_H__
      uDebugPrt( "(uNBIO &)0x%p.uCheckIO: time limit expired, errno:%d\n", this, errno );
#endif // __U_DEBUG_H__
      // Check for timed out IO

      uNBIOnode *p;
      for ( uSeqGen<uNBIOnode> gen(uPendingIO); gen >> p; ) {
          if ( p->uTimedout ) {                 // timed out waiting for I/O for this task ?
#ifdef __U_DEBUG_H__
            uDebugPrt( "(uNBIO &)0x%p.uCheckIO: removing node 0x%p\n", this, p );
#endif // __U_DEBUG_H__
            uPendingIO.uRemove( p );            // remove node from list of waiting tasks
            p->nfds = 0;                        // set return value
            uSignal p->uPending;                // wake up waiting task (empty for IOPoller)
          } // if
      } // for
    } else {
      // Either an EINTR occurred or one of the clients specified a bad file
      // number, and a EBADF was received.  This is handled by waking up all
      // the clients, telling them that IO has occured so that they will
      // retry their IO operation and get the error message at that point.
      // They can recover from their errors at that point more gracefully
      // than can be done here.

      if ( errno == EINTR ) {
          // probably sigalrm from migrate, do nothing
      } else if ( errno == EBADF ) {
          // Received an unexpected error, chances are that one of the tasks
          // has fouled up a call to some IO routine. Wake up all the tasks
          // that were waiting for IO, allow them to retry their IO call and
          // hope they catch the error this time.

          uNBIOnode *p;
          for ( uSeqGen<uNBIOnode> gen(uPendingIO); gen >> p; ) {
            uPendingIO.uRemove( p );            // remove node from list of waiting tasks
            p->nfds = -1;                       // mark the fact that something is wrong
            uSignal p->uPending;                // wake up waiting task (empty for IOPoller)
          } // for
      } else {
          uAbort( "(uNBIO &)0x%p.uCheckIO() : internal error, error(%d) %s.", this, errno, strerror( errno ) );
      } // if
    } // if
} // uNBIO::uCheckIO


// Enqueue uNode on the pending-I/O list and decide whether the calling task
// becomes the IO poller.  Returns true if the caller is (or becomes) the
// poller, false if the caller's I/O completed while it was blocked.
bool uNBIO::uCheckPoller( uNBIOnode &uNode ) {
    uPendingIO.uAddTail( &uNode );              // node is removed by IOPoller

    // If this is the first task to register interest, this task is nominated
    // as the IO poller task.

    if ( uIOPoller == NULL ) {
      uIOPoller = &uThisTask();                 // make this task the poller
#ifdef __U_DEBUG_H__
      uDebugPrt( "(uNBIO &)0x%p.uCheckPoller, set poller task %.256s (0x%p)\n", this, uIOPoller->uGetName(), uIOPoller );
#endif // __U_DEBUG_H__
      return true;
    } else {
#ifdef __U_DEBUG_H__
      uDebugPrt( "(uNBIO &)0x%p.uCheckPoller, blocking non-poller I/O task %.256s (0x%p))\n", this, uThisTask().uGetName(), &uThisTask() );
#endif // __U_DEBUG_H__
      // This is not the poller task so add it to the list of tasks waiting
      // for IO events.  It is woken when an IO event is ready or nominated
      // as the poller.

      uWait uNode.uPending;                     // block until I/O ready or nominated as poller
      if ( uNode.uListed() ) {                  // poller ?
          // Node still on the list => woken to take over polling, not
          // because its I/O completed.
          uAssert( uIOPoller == &uThisTask() );
          return true;
      } else {
          return false;
      } // if
    } // if
} // uNBIO::uCheckPoller

 
// Recompute maxFD, the highest fd present in any master mask, after bits
// have been cleared.  Searches backward from the chunk containing the
// previous maximum; maxFD becomes 0 if no bits remain set.
void uNBIO::setMaxFD(){
    int prevMax = maxFD;
    maxFD = 0;
#if 1
    unsigned long combined;

    // This code makes assumptions about the implementation of fd_set.  Namely
    // that each chunk is an unsigned long and that the rightmost bit contains
    // the lowest numbered fd.
    _STATIC_ASSERT_( (sizeof(combined) * 8) == NFDBITS );
    
    // Search backwards from the chunk containing the previous max.  The
    // index of that chunk is howmany( prevMax + 1, NFDBITS ) - 1; the
    // previous expression howmany( prevMax, NFDBITS ) could index one chunk
    // PAST the end of fds_bits when prevMax lies in the last chunk (an
    // out-of-bounds read), and otherwise started one chunk too high.
    for ( int i = howmany( prevMax + 1, NFDBITS ) - 1; i >= 0 ; i -= 1 ) {
      if ( uRFDs.fds_bits[i] || uWFDs.fds_bits[i] || uEFDs.fds_bits[i] ) {
          // combined is guaranteed to have at least one bit set
          combined = (uRFDs.fds_bits[i] | uWFDs.fds_bits[i] | uEFDs.fds_bits[i]);
          int j;
          for ( j = NFDBITS - 1; ! (combined & ( 1ul << j ) ); j -= 1 ); // find highest set bit
          maxFD = j + i * NFDBITS ;
          break;
      } // if
    } // for
#else
    // This version can run faster.  Consider if maxFD was 35 and is now 33.
    // Only two iterations are required (two because it starts at the previous
    // max, it could start at previous max - 1).  In the algorithm above, one
    // check confirms that the chunk is the same and then 30 iterations are
    // required to get from bit 31 to 1.  The algorithm above is significantly
    // better if the difference between the previous and new max is large,
    // i.e., more that 2 chunks.  An optimization might be to fold this
    // algorithm into the above algorithm and use it under certain conditions.

    for (int i = prevMax; i >= 0; i -= 1 ) {
      if ( FD_ISSET( i, &uRFDs ) || FD_ISSET( i, &uWFDs ) || FD_ISSET( i, &uEFDs ) ) {
          maxFD = i;
          break;
      } // if
    } // for
#endif
} // uNBIO::setMaxFD


// Withdraw interest in fd: clear it from every master mask and, if it was
// the largest registered fd, recompute maxFD.
void uNBIO::uCloseFD( int fd ) {
    FD_CLR( fd, &uRFDs );
    FD_CLR( fd, &uWFDs );
    FD_CLR( fd, &uEFDs );
    
    if ( fd == maxFD ) {                        // recompute (lower) maxFD if the largest fd was closed
      setMaxFD();
    } // if
} // uNBIO::uCloseFD


// One polling step performed by the IO-poller task: run select (blocking
// only when no other work exists), then either hand off the poller role or
// report that this task must keep polling.  Returns true while the caller's
// own I/O is still outstanding (caller should yield and retry).
bool uNBIO::uPollIO( uNBIOnode &uNode ) {
    // If there are other tasks on the ready queue of this cluster that can
    // still execute, do a nonblocking select call, otherwise do a blocking
    // select call so that the application does not waste cpu cycles.  In
    // either case, return to the caller the number of IO operations that are
    // now possible.  If this number is zero, the caller yields the processor,
    // allowing other tasks to execute and then tries again.  Notice that this
    // delay must occur outside of the mutex members of this monitor, so that
    // other tasks can enter the monitor and register their interest in other
    // IO events.

#ifdef __U_MULTI__
    if ( uThisCluster().uReadyTasksEmpty() ) {
#else
    // uOkToSelect is set in the uniprocessor kernel when *all* clusters have
    // empty ready queues.
    if ( uOkToSelect ) {
#endif // __U_MULTI__
      uTime nexttime;
      uDuration timeout;

      nexttime = uThisProcessor().uEvents->uNextAlarm(); // find first non-context-switch event
      timeout = nexttime - uThisProcessor().uGetClock().uGetTime(); // time to sleep

      // The minimum wake up time is always uSelectTimeOut to deal with the
      // problem of a missing SIGALRM due to the race condition with
      // "select".

      if ( 0 < timeout && timeout < uSelectTimeOut ) {
          uCheckIO( timeout );
      } else {
          uCheckIO( uSelectTimeOut );
      } // if

      uOkToSelect = false;                      // reset select flag
    } else {
      uCheckIO( 0 );                            // other work exists: poll without blocking
    } // if

    // If the IOPoller's I/O completed, attempt to nominate another waiting
    // task to be the IOPoller.

    if ( ! uNode.uListed() ) {                        // IOPoller's node removed ?
      if ( ! uPendingIO.uEmpty() ) {                  // any other tasks waiting for IO?
          // For the uniprocessor kernel, the new poller will appear to have
          // found some IO because uFound is not cleared until the new poller
          // makes a call to check for IO.

          uNBIOnode *p = uPendingIO.uHead();          // node from list of waiting tasks
          uIOPoller = p->uPendingTask;          // next poller task
#ifdef __U_DEBUG_H__
          uDebugPrt( "(uNBIO &)0x%p.uPollIO, poller %.256s (0x%p) nominating task %.256s (0x%p) to be next poller\n",
                   this, uThisTask().uGetName(), &uThisTask(), uIOPoller->uGetName(), uIOPoller );
#endif // __U_DEBUG_H__
          uSignal p->uPending;                  // wake up waiting task
      } else {
          uIOPoller = NULL;                     // nobody left to poll for
      } // if
      return false;
    } else {
      return true;                              // caller's I/O still pending: keep polling
    } // if
} // uNBIO::uPollIO


// Register interest in I/O on a single file descriptor: fold the requested
// operations into the master select masks, track the largest fd, and enter
// the node into the poller protocol.  Returns true if the calling task is
// nominated as the IO poller.
bool uNBIO::uInitSfd( uNBIOnode &uNode ) {
    int fd = uNode.smfd.sfd.fd;
    int rwe = *uNode.smfd.sfd.uRWE;

    if ( maxFD < fd ) maxFD = fd;               // track highest registered fd for select

    // mirror each requested operation into the corresponding master mask
    if ( rwe & uCluster::uReadSelect ) FD_SET( fd, &uRFDs );
    if ( rwe & uCluster::uWriteSelect ) FD_SET( fd, &uWFDs );
    if ( rwe & uCluster::uExceptSelect ) FD_SET( fd, &uEFDs );

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uInitSfd, adding node 0x%p\n", this, &uNode );
#endif // __U_DEBUG_H__

    return uCheckPoller( uNode );
} // uNBIO::uInitSfd


// Register interest in I/O on a single file descriptor with a timeout:
// identical to the untimed form, but also arms uTimeoutEvent on the given
// processor before entering the poller protocol.  Returns true if the
// calling task is nominated as the IO poller.
bool uNBIO::uInitSfd( uNBIOnode &uNode, uEventNode &uTimeoutEvent, uProcessor &proc ) {
    int fd = uNode.smfd.sfd.fd;
    int rwe = *uNode.smfd.sfd.uRWE;

    if ( maxFD < fd ) maxFD = fd;               // track highest registered fd for select

    // mirror each requested operation into the corresponding master mask
    if ( rwe & uCluster::uReadSelect ) FD_SET( fd, &uRFDs );
    if ( rwe & uCluster::uWriteSelect ) FD_SET( fd, &uWFDs );
    if ( rwe & uCluster::uExceptSelect ) FD_SET( fd, &uEFDs );

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uInitSfd, adding node 0x%p\n", this, &uNode );
#endif // __U_DEBUG_H__

    proc.uEvents->uAddEvent( uTimeoutEvent, proc ); // arm the timeout before possibly blocking

    return uCheckPoller( uNode );
} // uNBIO::uInitSfd


// Initialize the non-blocking I/O monitor: empty select masks, no fds
// registered, no poller task nominated, blocking select not yet permitted.
uNBIO::uNBIO() {
#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uNBIO\n", this );
#endif // __U_DEBUG_H__
    maxFD = 0;                                  // no fds registered yet
    uIOPoller = NULL;                           // no poller task nominated yet
    uOkToSelect = false;                        // start with non-blocking polling
    FD_ZERO( &uRFDs );                          // empty read set
    FD_ZERO( &uWFDs );                          // empty write set
    FD_ZERO( &uEFDs );                          // empty exceptional set
} // uNBIO::uNBIO


uNBIO::~uNBIO() {
    // Nothing to release: the fd sets and scalar members require no cleanup.
} // uNBIO::~uNBIO


// Select on a single file descriptor for the operations in rwe (read/write/
// except flags), with an optional timeout.  On return, rwe holds the subset
// of operations that became ready; the return value is the number of ready
// operations (0 on timeout, -1 if select reported EBADF).
int uNBIO::uSelect( int fd, int &rwe, timeval *timeout ) {
#ifdef __U_DEBUG_H__
#if 1
    uDebugAcquire();
    uDebugPrt2( "(uNBIO &)0x%p.uSelect1, fd:%d, rwe:%x", this, fd, rwe );
    if ( timeout != NULL ) {
      uDebugPrt2( ", timeout:%ld.%ld\n", timeout->tv_sec, timeout->tv_usec );
    } else {
      uDebugPrt2( "\n" );
    } // if
    uDebugRelease();
#endif
#endif // __U_DEBUG_H__

#ifdef __U_DEBUG__
    if ( fd < 0 || FD_SETSIZE <= fd ) {
      uAbort( ": attempt to select on file descriptor %d, which exceeds range 0-%d.",
            fd, FD_SETSIZE - 1 );
    } // if
#endif // __U_DEBUG__

    // Stack-allocated registration node; it is linked onto uPendingIO and
    // unlinked before this routine returns.
    uNBIOnode uNode;
    uNode.nfds = 0;
    uNode.uPendingTask = &uThisTask();
    uNode.fdType = uNBIOnode::singleFd;
    uNode.uTimedout = false;
    uNode.smfd.sfd.fd = fd;
    uNode.smfd.sfd.uRWE = &rwe;

    if ( timeout != NULL ) {                    // timeout ?
      if ( timeout->tv_sec == 0 && timeout->tv_usec == 0 ) { // optimization
          // It is unnecessary to create a timeout event for this trivial
          // case. Just mark the timeout as having already occurred.

          uNode.uTimedout = true;
          if ( uInitSfd( uNode ) ) {                  // poller task ?
            while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
          } // if
      } else {
          uDuration delay( timeout->tv_sec, timeout->tv_usec * 1000 );
          uTime time = uActiveProcessorKernel->uKernelClock.uGetTime() + delay;
          uSelectTimeoutHndlr handler( uThisTask(), uNode );

          uEventNode uTimeoutEvent( uThisTask(), handler, time ); // event node for event list
          uTimeoutEvent.uExecuteLocked = true;
          uProcessor &proc = uThisProcessor();  // copy the current processor as it could change during execution

          if ( uInitSfd( uNode, uTimeoutEvent, proc ) ) { // poller task ?
            while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
          } // if

          // always doing remove guarantees the node is not deallocated prematurely
          proc.uEvents->uRemoveEvent( uTimeoutEvent, proc );
      } // if
    } else {
      if ( uInitSfd( uNode ) ) {                // poller task ?
          while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
      } // if
    } // if

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uSelect1, exits, cnt:%d\n", this, uNode.nfds );
#endif // __U_DEBUG_H__
    return uNode.nfds;                          // count of ready operations filled in by uCheckIO
} // uNBIO::uSelect


// Register interest in I/O on multiple file descriptors: OR the caller's
// masks (NULL => no interest in that kind) into the master select masks,
// track the largest possible fd, and enter the node into the poller
// protocol.  Returns true if the calling task is nominated as the IO poller.
bool uNBIO::uInitMfds( fd_mask nfds, uNBIOnode &uNode ) {
    if ( maxFD < nfds ) maxFD = nfds - 1;       // highest possible fd is nfds - 1

    unsigned int nmasks = howmany( nfds, NFDBITS ); // chunks covering fds 0..nfds-1
    for ( unsigned int i = 0; i < nmasks; i += 1 ) {
      if ( uNode.smfd.mfd.uRFDs != NULL ) uRFDs.fds_bits[i] |= uNode.smfd.mfd.uRFDs->fds_bits[i];
      if ( uNode.smfd.mfd.uWFDs != NULL ) uWFDs.fds_bits[i] |= uNode.smfd.mfd.uWFDs->fds_bits[i];
      if ( uNode.smfd.mfd.uEFDs != NULL ) uEFDs.fds_bits[i] |= uNode.smfd.mfd.uEFDs->fds_bits[i];
    } // for

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uInitMfds, adding node 0x%p\n", this, &uNode );
#endif // __U_DEBUG_H__

    return uCheckPoller( uNode );
} // uNBIO::uInitMfds

     
// Register interest in I/O on multiple file descriptors with a timeout:
// identical to the untimed form, but also arms uTimeoutEvent on the given
// processor before entering the poller protocol.  Returns true if the
// calling task is nominated as the IO poller.
bool uNBIO::uInitMfds( fd_mask nfds, uNBIOnode &uNode, uEventNode &uTimeoutEvent, uProcessor &proc ) {
    if ( maxFD < nfds ) maxFD = nfds - 1;       // highest possible fd is nfds - 1

    unsigned int nmasks = howmany( nfds, NFDBITS ); // chunks covering fds 0..nfds-1
    for ( unsigned int i = 0; i < nmasks; i += 1 ) {
      if ( uNode.smfd.mfd.uRFDs != NULL ) uRFDs.fds_bits[i] |= uNode.smfd.mfd.uRFDs->fds_bits[i];
      if ( uNode.smfd.mfd.uWFDs != NULL ) uWFDs.fds_bits[i] |= uNode.smfd.mfd.uWFDs->fds_bits[i];
      if ( uNode.smfd.mfd.uEFDs != NULL ) uEFDs.fds_bits[i] |= uNode.smfd.mfd.uEFDs->fds_bits[i];
    } // for

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uInitMfds, adding node 0x%p\n", this, &uNode );
#endif // __U_DEBUG_H__

    proc.uEvents->uAddEvent( uTimeoutEvent, proc ); // arm the timeout before possibly blocking

    return uCheckPoller( uNode );
} // uNBIO::uInitMfds


// Select on multiple file descriptors (UNIX-select-style interface).  The
// caller's masks are rewritten in place to show which fds became ready; the
// return value is the number of ready operations (0 on timeout, -1 if
// select reported EBADF).
int uNBIO::uSelect( fd_mask nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, timeval *timeout ) {
#ifdef __U_DEBUG_H__
#if 1
    int i;
    int tmasks = howmany( FD_SETSIZE, NFDBITS );      // total number of masks in fd set
    uDebugPrt( "(uNBIO &)0x%p.uSelect2, nfds:%ld\n", this, nfds );
    uDebugAcquire();
    if ( rfds != NULL ) {
      uDebugPrt2( "(uNBIO &)0x%p.uSelect2, rfds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", rfds->fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
    } // if
    if ( wfds != NULL ) {
      uDebugPrt2( "        (uNBIO &)0x%p.uSelect2, wfds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", wfds->fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
    } // if
    if ( efds != NULL ) {
      uDebugPrt2( "        (uNBIO &)0x%p.uSelect2, efds:", this );
      for ( i = 0; i < tmasks; i += 1 ) {
          uDebugPrt2( "%lx ", efds->fds_bits[i] );
      } // for
      uDebugPrt2( "\n" );
    } // if
    if ( timeout != NULL ) {
      uDebugPrt2( "        (uNBIO &)0x%p.uSelect2, timeout:%ld.%ld\n", this, timeout->tv_sec, timeout->tv_usec );
    } // if
    uDebugRelease();
#endif
#endif // __U_DEBUG_H__

#ifdef __U_DEBUG__
    if ( nfds < 1 || FD_SETSIZE < nfds ) {
      uAbort( ": attempt to select with a file descriptor set size of %ld, which exceeds range 0-%d.",
            (long int)nfds, FD_SETSIZE );
    } // if
#endif // __U_DEBUG__

    // Stack-allocated registration node; it is linked onto uPendingIO and
    // unlinked before this routine returns.
    uNBIOnode uNode;
    uNode.nfds = 0;
    uNode.uPendingTask = &uThisTask();
    uNode.fdType = uNBIOnode::multipleFds;
    uNode.uTimedout = false;
    uNode.smfd.mfd.uNFDs = nfds;
    uNode.smfd.mfd.uRFDs = rfds;
    uNode.smfd.mfd.uWFDs = wfds;
    uNode.smfd.mfd.uEFDs = efds;

    if ( timeout != NULL ) {                    // timeout ?
      if ( timeout->tv_sec == 0 && timeout->tv_usec == 0 ) { // optimization
          // It is unnecessary to create a timeout event for this trivial
          // case. Just mark the timeout as having already occurred.

          uNode.uTimedout = true;
          if ( uInitMfds( nfds, uNode ) ) {           // poller task ?
            while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
          } // if
      } else {
          uDuration delay( timeout->tv_sec, timeout->tv_usec * 1000 );
          uTime time = uActiveProcessorKernel->uKernelClock.uGetTime() + delay;
          uSelectTimeoutHndlr handler( uThisTask(), uNode );

          uEventNode uTimeoutEvent( uThisTask(), handler, time ); // event node for event list
          uTimeoutEvent.uExecuteLocked = true;
          uProcessor &proc = uThisProcessor();  // copy the current processor as it could change during execution

          if ( uInitMfds( nfds, uNode, uTimeoutEvent, proc ) ) { // poller task ?
            while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
          } // if

          // always doing remove guarantees the node is not deallocated prematurely
          proc.uEvents->uRemoveEvent( uTimeoutEvent, proc );
      } // if
    } else {
      if ( uInitMfds( nfds, uNode ) ) {         // poller task ?
          while ( uPollIO( uNode ) ) uThisTask().uYield(); // busy wait
      } // if
    } // if

#ifdef __U_DEBUG_H__
    uDebugPrt( "(uNBIO &)0x%p.uSelect2, exits, cnt:%d\n", this, uNode.nfds );
#endif // __U_DEBUG_H__
    return uNode.nfds;                          // count of ready operations filled in by uCheckIO
} // uNBIO::uSelect

 
// Local Variables: //
// compile-command: "gmake install" //
// End: //

// Generated by  Doxygen 1.6.0   Back to index