mq_timedsend & mq_timedreceive
Dmitriy Korovkin
dkorovkin@rambler.ru
Sun Jan 12 14:24:00 GMT 2003
Hi,
While porting the ISaGRAF PRO target to eCos I found that message
queue send and receive with a timeout are required, so I've added the
functions mq_timedsend and mq_timedreceive defined by the POSIX 1003.1d
draft. Patches to the isoinfra, posix and kernel packages are attached.
Your comments are highly appreciated.
Regards,
--
Dmitriy
-------------- next part --------------
? eCos_posix.patch
Index: src/mqueue.cxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/compat/posix/current/src/mqueue.cxx,v
retrieving revision 1.5
diff -r1.5 mqueue.cxx
84a85,86
> #endif
> #if defined CYGFUN_POSIX_MQUEUE_NOTIFY || defined CYGFUN_KERNEL_THREADS_TIMER
88d89
<
673a675,836
>
> //------------------------------------------------------------------------
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> externC int
> mq_timedsend( mqd_t mqdes, const char *msg_ptr, size_t msg_len,
> unsigned int msg_prio, const struct timespec * abs_timeout)
> {
> CYG_REPORT_FUNCTYPE( "returning %d" );
> CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u,
> abs_timeout = %lu, %ld",
> mqdes, msg_ptr, msg_len, msg_prio,
> abs_timeout->tv_sec, abs_timeout->tv_nsec);
> CYG_CHECK_DATA_PTRC( msg_ptr );
>
> struct mquser *user = (struct mquser *)mqdes;
> struct mqtabent *tabent = user->tabent;
>
> #ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
> if ( user->magic != MQ_VALID_MAGIC ) {
> errno = EBADF;
> CYG_REPORT_RETVAL( -1 );
> return -1;
> }
> #endif
>
> if ( msg_len > (size_t)tabent->msgsize ) {
> errno = EMSGSIZE;
> CYG_REPORT_RETVAL( -1 );
> return -1;
> }
>
> if ( msg_prio > MQ_PRIO_MAX ) {
> errno = EINVAL;
> CYG_REPORT_RETVAL( -1 );
> return -1;
> }
>
> if ( (O_WRONLY != (user->flags & O_WRONLY)) &&
> (O_RDWR != (user->flags & O_RDWR)) ) {
> errno = EBADF;
> CYG_REPORT_RETVAL( -1 );
> return -1;
> }
>
> // go for it
> Cyg_Mqueue::qerr_t err;
> err = tabent->mq->put( msg_ptr, msg_len, msg_prio,
> ((user->flags & O_NONBLOCK) != O_NONBLOCK),
> cyg_timespec_to_ticks(abs_timeout));
> switch (err) {
>
> case Cyg_Mqueue::INTR:
> errno = EINTR;
> CYG_REPORT_RETVAL( -1 );
> return -1;
>
> case Cyg_Mqueue::WOULDBLOCK:
> CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
> "Message queue assumed non-blocking when blocking requested"
> );
> errno = EAGAIN;
> CYG_REPORT_RETVAL( -1 );
> return -1;
>
> case Cyg_Mqueue::TIMEOUT:
> errno = ETIMEDOUT;
> CYG_REPORT_RETVAL( -1 );
> return -1;
>
> case Cyg_Mqueue::OK:
> CYG_REPORT_RETVAL( 0 );
> return 0;
>
> default:
> CYG_FAIL( "unhandled message queue return code" );
> return -1; // keep compiler happy
> } // switch
> } // mq_timedsend()
>
> //------------------------------------------------------------------------
>
>
> externC ssize_t
> mq_timedreceive( mqd_t mqdes, char *msg_ptr, size_t msg_len,
> unsigned int *msg_prio, const struct timespec * abs_timeout)
> {
> CYG_REPORT_FUNCTYPE( "returning %ld" );
> CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x,
> abs_timeout = %lu, %ld",
> mqdes, msg_ptr, msg_len, msg_prio,
> abs_timeout->tv_sec, abs_timeout->tv_nsec );
> CYG_CHECK_DATA_PTRC( msg_ptr );
> CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 );
> if ( NULL != msg_prio )
> CYG_CHECK_DATA_PTRC( msg_prio );
>
>
> struct mquser *user = (struct mquser *)mqdes;
> struct mqtabent *tabent = user->tabent;
>
> #ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
> if ( user->magic != MQ_VALID_MAGIC ) {
> errno = EBADF;
> CYG_REPORT_RETVAL( -1 );
> return (ssize_t)-1;
> }
> #endif
>
> if ( (O_RDONLY != (user->flags & O_RDONLY)) &&
> (O_RDWR != (user->flags & O_RDWR)) ) {
> errno = EBADF;
> CYG_REPORT_RETVAL( -1 );
> return (ssize_t)-1;
> }
>
> if ( msg_len < (size_t)tabent->msgsize ) {
> errno = EMSGSIZE;
> CYG_REPORT_RETVAL( -1 );
> return (ssize_t)-1;
> }
>
> // go for it
> Cyg_Mqueue::qerr_t err;
> err = tabent->mq->get( msg_ptr, &msg_len, msg_prio,
> ((user->flags & O_NONBLOCK) != O_NONBLOCK),
> cyg_timespec_to_ticks(abs_timeout) );
> switch (err) {
>
> case Cyg_Mqueue::INTR:
> errno = EINTR;
> CYG_REPORT_RETVAL( -1 );
> return (ssize_t)-1;
>
> case Cyg_Mqueue::WOULDBLOCK:
> CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
> "Message queue assumed non-blocking when blocking requested"
> );
> errno = EAGAIN;
> CYG_REPORT_RETVAL( -1 );
> return (ssize_t)-1;
>
> case Cyg_Mqueue::TIMEOUT:
> errno = ETIMEDOUT;
> CYG_REPORT_RETVAL( -1 );
> return -1;
>
> case Cyg_Mqueue::OK:
> CYG_ASSERT( msg_len <= (size_t)tabent->msgsize,
> "returned message too long" );
> if ( NULL != msg_prio )
> CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX,
> "returned message has invalid priority" );
> CYG_REPORT_RETVAL( msg_len );
> return (ssize_t)msg_len;
>
> default:
> CYG_FAIL( "unhandled message queue return code" );
> return (ssize_t)-1; // keep compiler happy
> } // switch
>
> } // mq_timedreceive()
>
674a838
> #endif
-------------- next part --------------
? mkthread
? eCos_kernel.patch
Index: include/mqueue.hxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.hxx,v
retrieving revision 1.4
diff -r1.4 mqueue.hxx
85a86,89
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> INTR,
> TIMEOUT
> #else
86a91
> #endif
124a130,133
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> // put() copies len bytes of *buf into the queue at priority prio
> qerr_t put( const char *buf, size_t len, unsigned int prio,
> bool block=true, cyg_tick_count timeout = 0 );
125a135,140
> // get() returns the oldest highest priority message in the queue in *buf
> // and sets *prio to the priority (if prio is non-NULL) and *len to the
> // actual message size
> qerr_t get( char *buf, size_t *len, unsigned int *prio, bool block=true,
> cyg_tick_count timeout = 0 );
> #else
133a149
> #endif
Index: include/mqueue.inl
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.inl,v
retrieving revision 1.5
diff -r1.5 mqueue.inl
273a274,278
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
> Cyg_Mqueue::put( const char *buf, size_t len, unsigned int prio, bool block,
> cyg_tick_count timeout )
> #else
275a281
> #endif
288a295,303
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> if ( timeout != 0) {
> if ( false == putsem.wait(timeout) ) {
> err = TIMEOUT;
> goto exit;
> }
> }
> else
> #endif
380a396,400
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
> Cyg_Mqueue::get( char *buf, size_t *len, unsigned int *prio, bool block,
> cyg_tick_count timeout )
> #else
382a403
> #endif
397a419,427
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> if ( timeout != 0) {
> if ( false == getsem.wait(timeout) ) {
> err = TIMEOUT;
> goto exit;
> }
> }
> else
> #endif
-------------- next part --------------
? eCos_posix.patch
Index: current/include/mqueue.h
===================================================================
RCS file: /cvs/ecos/ecos/packages/isoinfra/current/include/mqueue.h,v
retrieving revision 1.3
diff -r1.3 mqueue.h
70a71,73
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> #include <time.h> /* timespec */
> #endif
104a108,119
>
> #ifdef CYGFUN_KERNEL_THREADS_TIMER
> extern int
> mq_timedsend( mqd_t /* mqdes */, const char * /* msg_ptr */,
> size_t /* msg_len */, unsigned int /* msg_prio */,
> const struct timespec * /* abs_timeout */);
>
> extern ssize_t
> mq_timedreceive( mqd_t /* mqdes */, char * /* msg_ptr */,
> size_t /* msg_len */, unsigned int * /* msg_prio */,
> const struct timespec * /* abs_timeout */);
> #endif
More information about the Ecos-patches
mailing list