mq_timedsend & mq_timedreceive

Dmitriy Korovkin dkorovkin@rambler.ru
Sun Jan 12 14:24:00 GMT 2003


Hi,
In my work on porting the ISaGRAF PRO target to eCos I found that message 
queue send and receive with a timeout are required. So I've added the functions 
mq_timedsend and mq_timedreceive defined by the POSIX 1003.1d draft. The 
patches to the isoinfra, posix and kernel packages are attached. Your comments 
are highly appreciated.
Regards,
-- 
Dmitriy
-------------- next part --------------
? eCos_posix.patch
Index: src/mqueue.cxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/compat/posix/current/src/mqueue.cxx,v
retrieving revision 1.5
diff -r1.5 mqueue.cxx
84a85,86
> #endif
> #if defined CYGFUN_POSIX_MQUEUE_NOTIFY || defined CYGFUN_KERNEL_THREADS_TIMER
88d89
< 
673a675,836
> 
> //------------------------------------------------------------------------
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> externC int
> mq_timedsend( mqd_t mqdes, const char *msg_ptr, size_t msg_len,
>          unsigned int msg_prio, const struct timespec * abs_timeout)
> {
>     CYG_REPORT_FUNCTYPE( "returning %d" );
>     CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u,
> 	                      abs_timeout = %lu, %ld",
>                          mqdes, msg_ptr, msg_len, msg_prio, 
>                          abs_timeout->tv_sec, abs_timeout->tv_nsec);
>     CYG_CHECK_DATA_PTRC( msg_ptr );
>     
>     struct mquser *user = (struct mquser *)mqdes;
>     struct mqtabent *tabent = user->tabent;
> 
> #ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
>     if ( user->magic != MQ_VALID_MAGIC ) {
>         errno  = EBADF;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>     }
> #endif
>     
>     if ( msg_len > (size_t)tabent->msgsize ) {
>         errno = EMSGSIZE;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>     }
> 
>     if ( msg_prio > MQ_PRIO_MAX ) {
>         errno = EINVAL;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>     }
> 
>     if ( (O_WRONLY != (user->flags & O_WRONLY)) && 
>          (O_RDWR != (user->flags & O_RDWR)) ) {
>         errno = EBADF;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>     }
> 
>     // go for it
>     Cyg_Mqueue::qerr_t err;
>     err = tabent->mq->put( msg_ptr, msg_len, msg_prio,
>                            ((user->flags & O_NONBLOCK) != O_NONBLOCK), 
>                            cyg_timespec_to_ticks(abs_timeout));
>     switch (err) {
> 
>     case Cyg_Mqueue::INTR:
>         errno = EINTR;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
> 
>     case Cyg_Mqueue::WOULDBLOCK:
>         CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
>                     "Message queue assumed non-blocking when blocking requested"
>             );
>         errno = EAGAIN;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
> 
>     case Cyg_Mqueue::TIMEOUT:
>         errno = ETIMEDOUT;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>         
>     case Cyg_Mqueue::OK:
>         CYG_REPORT_RETVAL( 0 );
>         return 0;
> 
>     default:
>         CYG_FAIL( "unhandled message queue return code" );
>         return -1; // keep compiler happy
>     } // switch
> } // mq_timedsend()
> 
> //------------------------------------------------------------------------
> 
> 
> externC ssize_t
> mq_timedreceive( mqd_t mqdes, char *msg_ptr, size_t msg_len,
>             unsigned int *msg_prio, const struct timespec * abs_timeout)
> {
>     CYG_REPORT_FUNCTYPE( "returning %ld" );
>     CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x,
> 	                      abs_timeout = %lu, %ld",
>                          mqdes, msg_ptr, msg_len, msg_prio,
>                          abs_timeout->tv_sec, abs_timeout->tv_nsec );
>     CYG_CHECK_DATA_PTRC( msg_ptr );
>     CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 );
>     if ( NULL != msg_prio )
>         CYG_CHECK_DATA_PTRC( msg_prio );
>     
>     
>     struct mquser *user = (struct mquser *)mqdes;
>     struct mqtabent *tabent = user->tabent;
> 
> #ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
>     if ( user->magic != MQ_VALID_MAGIC ) {
>         errno  = EBADF;
>         CYG_REPORT_RETVAL( -1 );
>         return (ssize_t)-1;
>     }
> #endif
>     
>     if ( (O_RDONLY != (user->flags & O_RDONLY)) && 
>          (O_RDWR != (user->flags & O_RDWR)) ) {
>         errno = EBADF;
>         CYG_REPORT_RETVAL( -1 );
>         return (ssize_t)-1;
>     }
> 
>     if ( msg_len < (size_t)tabent->msgsize ) {
>         errno = EMSGSIZE;
>         CYG_REPORT_RETVAL( -1 );
>         return (ssize_t)-1;
>     }
> 
>     // go for it
>     Cyg_Mqueue::qerr_t err;
>     err = tabent->mq->get( msg_ptr, &msg_len, msg_prio,
>                            ((user->flags & O_NONBLOCK) != O_NONBLOCK),
>                            cyg_timespec_to_ticks(abs_timeout) );
>     switch (err) {
> 
>     case Cyg_Mqueue::INTR:
>         errno = EINTR;
>         CYG_REPORT_RETVAL( -1 );
>         return (ssize_t)-1;
> 
>     case Cyg_Mqueue::WOULDBLOCK:
>         CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
>                     "Message queue assumed non-blocking when blocking requested"
>             );
>         errno = EAGAIN;
>         CYG_REPORT_RETVAL( -1 );
>         return (ssize_t)-1;
> 
>     case Cyg_Mqueue::TIMEOUT:
>         errno = ETIMEDOUT;
>         CYG_REPORT_RETVAL( -1 );
>         return -1;
>         
>     case Cyg_Mqueue::OK:
>         CYG_ASSERT( msg_len <= (size_t)tabent->msgsize,
>                     "returned message too long" );
>         if ( NULL != msg_prio )
>             CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX,
>                         "returned message has invalid priority" );
>         CYG_REPORT_RETVAL( msg_len );
>         return (ssize_t)msg_len;
> 
>     default:
>         CYG_FAIL( "unhandled message queue return code" );
>         return (ssize_t)-1; // keep compiler happy
>     } // switch
>     
> } // mq_timedreceive()
> 
674a838
> #endif
-------------- next part --------------
? mkthread
? eCos_kernel.patch
Index: include/mqueue.hxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.hxx,v
retrieving revision 1.4
diff -r1.4 mqueue.hxx
85a86,89
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
>         INTR,
> 		TIMEOUT
> #else
86a91
> #endif
124a130,133
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
>     // put() copies len bytes of *buf into the queue at priority prio
>     qerr_t put( const char *buf, size_t len, unsigned int prio,
>                 bool block=true, cyg_tick_count timeout = 0 );
125a135,140
>     // get() returns the oldest highest priority message in the queue in *buf
>     // and sets *prio to the priority (if prio is non-NULL) and *len to the
>     // actual message size
>     qerr_t get( char *buf, size_t *len, unsigned int *prio, bool block=true,
>                 cyg_tick_count timeout = 0 ); 
> #else
133a149
> #endif
Index: include/mqueue.inl
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.inl,v
retrieving revision 1.5
diff -r1.5 mqueue.inl
273a274,278
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
> Cyg_Mqueue::put( const char *buf, size_t len, unsigned int prio, bool block,
> 	cyg_tick_count timeout )
> #else
275a281
> #endif
288a295,303
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> 		if ( timeout != 0) {
> 			if ( false == putsem.wait(timeout) ) {
>				err = TIMEOUT;
>				goto exit;
>			}	
> 		}
> 		else
> #endif
380a396,400
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
> Cyg_Mqueue::get( char *buf, size_t *len, unsigned int *prio, bool block,
> 	cyg_tick_count timeout )
> #else
382a403
> #endif
397a419,427
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> 		if ( timeout != 0) {
> 			if ( false == getsem.wait(timeout) ) {
>				err = TIMEOUT;
>				goto exit;
>			}	
> 		}
> 		else
> #endif
-------------- next part --------------
? eCos_posix.patch
Index: current/include/mqueue.h
===================================================================
RCS file: /cvs/ecos/ecos/packages/isoinfra/current/include/mqueue.h,v
retrieving revision 1.3
diff -r1.3 mqueue.h
70a71,73
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> #include <time.h> /* timespec */
> #endif
104a108,119
> 
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> extern int 
> mq_timedsend( mqd_t /* mqdes */, const char * /* msg_ptr */, 
>               size_t /* msg_len */, unsigned int /* msg_prio */,
>               const struct timespec * /* abs_timeout */);
> 
> extern ssize_t 
> mq_timedreceive( mqd_t /* mqdes */, char * /* msg_ptr */, 
>                  size_t /* msg_len */, unsigned int * /* msg_prio */,
>                  const struct timespec * /* abs_timeout */);
> #endif


More information about the Ecos-patches mailing list