This is the mail archive of the pthreads-win32@sources.redhat.com mailing list for the pthreads-win32 project.
Index Nav: | [Date Index] [Subject Index] [Author Index] [Thread Index] | |
---|---|---|
Message Nav: | [Date Prev] [Date Next] | [Thread Prev] [Thread Next] |
Other format: | [Raw text] |
/* Command line options are (in this order): Number of Sender threads Number of Receiver threads CV wait time (microsecs) Trace level Sender sleeps (bool) Receiver sleeps (bool) Monitor rate (per second) E.g. condvar10.exe 8 8 1000000 -10 1 0 10 Log info is to 'condvar10.log'. */ #include <stdio.h> #include <sys/timeb.h> #include <stdlib.h> #include <signal.h> #include "condvar10.h" char * logFile = "condvar10.log"; const DWORD MILLISEC_PER_SEC = 1000L; const DWORD MICROSEC_PER_NANOSEC = 1000L; const DWORD NANOSEC_PER_MILLISEC = 1000000L; const DWORD MICROSEC_PER_SEC = 1000000L; const DWORD NANOSEC_PER_SEC = 1000000000L; enum { Receiver = 0, Sender, MaxThreads = 100 }; pthread_t tid[2][MaxThreads]; typedef struct thrState_t_ { int op; int watchdog; int signalled; pthread_mutex_t opLock; } thrState_t; thrState_t thrState[2][MaxThreads]; void* recvReq(void *arg); void* sendReq(void *arg); void SendData(); void RecvData(); int msg=0; int trace=1; int sendSleep=0; int recvSleep=0; int monitorRate=10; DWORD monitorInterval; DWORD counter=0; DWORD lastCount=0; DWORD received=0; DWORD TOs=0; pthread_mutex_t lock; pthread_cond_t sig; int noSthr = 1; int noRthr = 1; DWORD timeint = 5*MICROSEC_PER_SEC; // 5 sec int ThreadRecvCount[MaxThreads]; int ThreadTOCount[MaxThreads]; int ThreadSentCount[MaxThreads]; enum Operations { SLock = 0x00000001, ELock = 0x00000002, SUnlock = 0x00000010, EUnlock = 0x00000020, SWait = 0x00000100, EWait = 0x00000200, WaitTimeout = 0x00000400, SSignal = 0x00001000, ESignal = 0x00002000, MsgFalse = 0x40000000, MsgTrue = 0x80000000, }; void RecvData(int threadNum); void SendData(int threadNum); void * recvReq(void *arg); void * sendReq(void *arg); pthread_mutex_t LOGX; #define OPENLOG(_openMode) \ { \ FILE * LOGFP; \ (void)pthread_mutex_lock(&LOGX); \ if ((LOGFP=fopen(logFile, _openMode)) == NULL) \ { \ fprintf(stdout, "Line %d: Log open error\n", __LINE__); \ fflush(stdout); \ } \ else \ { #define CLOSELOG(_exitAfterClose) \ fclose(LOGFP); \ 
if(_exitAfterClose) exit(1); \ } \ (void)pthread_mutex_unlock(&LOGX); \ } #define LOGERR \ { \ if(status!=0) \ { \ OPENLOG("a"); \ fprintf(LOGFP,"Error at line %d, status %d\n",__LINE__, status); \ CLOSELOG(1); \ } \ } void PR (char *s) { long id; if(trace>0) { id=GetCurrentThreadId (); OPENLOG("a"); fprintf(LOGFP,"TH-%lx:%s\n",id,s); CLOSELOG(0); } } void SetOp(int SR, int threadNum, int op) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].op = op; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } void OrOp(int SR, int threadNum, int op) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].op |= op; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } void SetWatchdog(int SR, int threadNum, int woof) { int status = pthread_mutex_lock(&thrState[SR][threadNum].opLock); LOGERR; thrState[SR][threadNum].watchdog = woof; status = pthread_mutex_unlock(&thrState[SR][threadNum].opLock); LOGERR; } BOOL logPoint(long increment) { if (counter >= lastCount + increment) { lastCount = counter; return TRUE; } return FALSE; } void PrintOptions (FILE * fp) { fprintf(fp, "Options are (in this order):\n"); fprintf(fp, " %-30s: %8ld\n", "Number of Sender threads", noSthr); fprintf(fp, " %-30s: %8ld\n", "Number of Receiver threads", noRthr); fprintf(fp, " %-30s: %8ld\n", "CV wait time (microsecs)", timeint); fprintf(fp, " %-30s: %8ld\n", "Trace level", trace); fprintf(fp, " %-30s: %8ld\n", "Sender sleeps (bool)", sendSleep); fprintf(fp, " %-30s: %8ld\n", "Receiver sleeps (bool)", recvSleep); fprintf(fp, " %-30s: %8ld\n", "Monitor rate (per second)", monitorRate); putc('\n',fp); } int main(int argc , char * argv[]) { pthread_mutexattr_t la; int status; int ii; DWORD milliseconds = 0; DWORD lastSendWatch = 0; DWORD lastRecvWatch = 0; int r = 0; DWORD seconds = 0; DWORD lastLogSeconds = 0; char * rotor = "/-\\|"; if(argc>1) { noSthr=atoi(argv[1]); 
if (noSthr >= MaxThreads) { printf("Requested too many Secnder threads = %d. Max is %d\n", noSthr, MaxThreads); exit(1); } } if(argc>2) { noRthr=atoi(argv[2]); if (noRthr >= MaxThreads) { printf("Requested too many Receiver threads = %d. Max is %d\n", noRthr, MaxThreads); exit(1); } } if(argc>3) { timeint=atoi(argv[3]); } if(argc>4) { trace=atoi(argv[4]); } if(argc>5) { sendSleep=atoi(argv[5]); } if(argc>6) { recvSleep=atoi(argv[6]); } if(argc>7) { monitorRate=atoi(argv[7]); // Round to nearest 10 so that 1000ms (1 sec) is a multiple of the interval monitorRate=((monitorRate+5)/10)*10; } monitorInterval = MILLISEC_PER_SEC/monitorRate; if (pthread_mutexattr_init(&la) != 0 || pthread_mutexattr_settype(&la, PTHREAD_MUTEX_ERRORCHECK) != 0 || pthread_mutex_init(&LOGX, &la) != 0) { printf("Line %d: Error initialising log mutex.\n", __LINE__); exit(1); } status = pthread_mutex_init(&lock, &la); LOGERR; status = pthread_cond_init(&sig, NULL); LOGERR; PrintOptions(stdout); fflush(stdout); OPENLOG("w"); PrintOptions(LOGFP); CLOSELOG(0); for(ii = 0; ii < noRthr; ii++) { status = pthread_mutex_init(&thrState[Receiver][ii].opLock, &la); LOGERR; status = pthread_create(&tid[Receiver][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&recvReq, (void *)ii); LOGERR; } for(ii = 0; ii < noSthr; ii++) { status = pthread_mutex_init(&thrState[Sender][ii].opLock, &la); LOGERR; status = pthread_create(&tid[Sender][ii], NULL, (PTHREAD_START_ROUTINE_DECL)&sendReq, (void *)ii); LOGERR; } status = pthread_mutexattr_destroy(&la); LOGERR; while(1) //Monitor threads until they hang { int stillRunning; BOOL newSecond; Sleep(monitorInterval); milliseconds+=monitorInterval; newSecond = (milliseconds >= MILLISEC_PER_SEC); putchar(rotor[r=((r++)&0x3)]); putchar('\b'); // Log Sends and Receives/Timeouts if (trace > 0 || (trace < 0 && trace >= -5000 && (logPoint(-trace) || seconds > lastLogSeconds + 1 /* At least 1 second */))) { int ii; lastLogSeconds = seconds; OPENLOG("a"); fprintf(LOGFP,"count=%010ld, 
Thr/Recvd/TOs", counter); for (ii=0;ii<noRthr;ii++) { fprintf(LOGFP, " %d/%04d/%04d", ii, ThreadRecvCount[ii], ThreadTOCount[ii]); ThreadRecvCount[ii]=0; ThreadTOCount[ii]=0; } fprintf(LOGFP," : Thr/Sent"); for (ii=0;ii<noSthr;ii++) { fprintf(LOGFP, " %d/%04d", ii, ThreadSentCount[ii]); ThreadSentCount[ii]=0; } putc('\n',LOGFP); CLOSELOG(0); } // Check for hung threads. stillRunning=noSthr; if (!sendSleep || seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC))) { for (ii=0;ii<noSthr;ii++) { status = pthread_mutex_lock(&thrState[Sender][ii].opLock); LOGERR; if (thrState[Sender][ii].signalled == 0) { OPENLOG("a"); fprintf(LOGFP, "Thread %2d: didn't emit signal.\n", ii); CLOSELOG(0); } thrState[Sender][ii].signalled = 0; if (thrState[Sender][ii].watchdog == 0) { stillRunning--; OPENLOG("a"); fprintf(LOGFP, "Thread %d: Sender operation trace: 0x%x\n", ii, thrState[Sender][ii].op); CLOSELOG(0); } status = pthread_mutex_unlock(&thrState[Sender][ii].opLock); LOGERR; } if (stillRunning==0) { OPENLOG("a"); fprintf(LOGFP,"Line %d: exit.\n", __LINE__); CLOSELOG(0); exit(1); } } if (!(recvSleep || sendSleep) || (recvSleep && seconds > (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC))) || (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { stillRunning=noRthr; for (ii=0;ii<noRthr;ii++) { status = pthread_mutex_lock(&thrState[Receiver][ii].opLock); LOGERR; if (thrState[Receiver][ii].watchdog == 0) { stillRunning--; OPENLOG("a"); fprintf(LOGFP, "Thread %d: Receiver operation trace: 0x%x\n", ii, thrState[Receiver][ii].op); CLOSELOG(0); } status = pthread_mutex_unlock(&thrState[Receiver][ii].opLock); LOGERR; } if (stillRunning==0) { OPENLOG("a"); fprintf(LOGFP,"Line %d: exit.\n", __LINE__); CLOSELOG(0); exit(1); } } if (newSecond) { milliseconds = 0; seconds++; OPENLOG("a"); fprintf(LOGFP, "==Seconds=[%ld]==Msg=[%d]==Count=[%ld]==Received=[%ld]==TOs=[%ld]====\n", \ seconds, msg, counter, received, TOs); for (ii=0;ii<noRthr;ii++) { fprintf(LOGFP, " 
%d/%04d/%04d", ii, ThreadRecvCount[ii], ThreadTOCount[ii]); ThreadRecvCount[ii]=0; ThreadTOCount[ii]=0; } fprintf(LOGFP," : Thr/Sent"); for (ii=0;ii<noSthr;ii++) { fprintf(LOGFP, " %d/%04d", ii, ThreadSentCount[ii]); ThreadSentCount[ii]=0; } putc('\n',LOGFP); for (ii=0;ii<noSthr;ii++) { status = pthread_mutex_lock(&thrState[Sender][ii].opLock); LOGERR; fprintf(LOGFP, "S%d/0x%x ", ii, thrState[Sender][ii].op); status = pthread_mutex_unlock(&thrState[Sender][ii].opLock); LOGERR; } putc('\n',LOGFP); for (ii=0;ii<noRthr;ii++) { status = pthread_mutex_lock(&thrState[Receiver][ii].opLock); LOGERR; fprintf(LOGFP, "R%d/0x%x ", ii, thrState[Receiver][ii].op); status = pthread_mutex_unlock(&thrState[Receiver][ii].opLock); LOGERR; } putc('\n',LOGFP); CLOSELOG(0); // Reset watchdogs #if 1 if (!sendSleep || (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { lastSendWatch = seconds; for (ii=0;ii<noSthr;ii++) { SetWatchdog(Sender,ii,0); } } if (!(recvSleep || sendSleep) || (recvSleep && seconds > (lastRecvWatch + (2*timeint/MICROSEC_PER_SEC))) || (sendSleep && seconds > (lastSendWatch + (2*timeint/MICROSEC_PER_SEC)))) { lastRecvWatch = seconds; for (ii=0;ii<noRthr;ii++) { SetWatchdog(Receiver,ii,0); } } #endif } } return 0; } ///////////// void * sendReq(void *arg) { int thr = (int)arg; DWORD sleepTime_ms = (2*timeint*MILLISEC_PER_SEC)/MICROSEC_PER_SEC; OPENLOG("a"); fprintf(LOGFP, "Sender Thread %2d id %lx started:", thr, GetCurrentThreadId()); if (sendSleep) { fprintf(LOGFP, " sleep time %ld ms", sleepTime_ms); } putc('\n', LOGFP); CLOSELOG(0); thrState[Sender][thr].watchdog = 1; thrState[Sender][thr].signalled = 0; while(1) { SetWatchdog(Sender,thr,1); SendData(thr); // Note: If timeint < 500 microseconds we just yield the CPU // with Sleep(0). 
if (sendSleep) { Sleep(sleepTime_ms); } } return 0; } ///////////// void SendData(int threadNum) { int status; SetOp(Sender,threadNum,SLock); status = pthread_mutex_lock(&lock); OrOp(Sender,threadNum,ELock); LOGERR; PR("lock -01"); if(msg==0) { OrOp(Sender,threadNum,MsgFalse); msg=1; counter++; ThreadSentCount[threadNum]++; PR("unlock -01"); OrOp(Sender,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Sender,threadNum,EUnlock); LOGERR; PR("signal -01"); OrOp(Sender,threadNum,SSignal); status = pthread_cond_signal(&sig); OrOp(Sender,threadNum,ESignal); LOGERR; thrState[Sender][threadNum].signalled++; } else { OrOp(Sender,threadNum,MsgTrue); PR("unlock -01"); OrOp(Sender,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Sender,threadNum,EUnlock); LOGERR; } } /////////////// void * recvReq(void *arg) { int thr = (int)arg; OPENLOG("a"); fprintf(LOGFP, "Receiver Thread %2d id %lx started\n", thr, GetCurrentThreadId()); CLOSELOG(0); thrState[Receiver][thr].watchdog = 1; ThreadRecvCount[thr]=0; ThreadTOCount[thr]=0; while(1) { SetWatchdog(Receiver,thr,1); RecvData(thr); if (recvSleep) { Sleep(0); } } return 0; } /////////////// void RecvData(int threadNum) { int status; SetOp(Receiver,threadNum,SLock); status = pthread_mutex_lock(&lock); OrOp(Receiver,threadNum,ELock); LOGERR; PR("lock -11"); while (msg == 0) { struct timespec abstime; struct _timeb currSysTime; OrOp(Receiver,threadNum,MsgFalse); _ftime(&currSysTime); abstime.tv_sec = currSysTime.time; abstime.tv_nsec = NANOSEC_PER_MILLISEC * currSysTime.millitm; // printf("Now: %ld.%ld\n", abstime.tv_sec, abstime.tv_nsec); // fflush(stdout); abstime.tv_nsec += (timeint%MICROSEC_PER_SEC)*MICROSEC_PER_NANOSEC; if (abstime.tv_nsec >= NANOSEC_PER_SEC) { abstime.tv_nsec -= NANOSEC_PER_SEC; abstime.tv_sec++; } abstime.tv_sec += timeint/MICROSEC_PER_SEC; // printf("TO : %ld.%ld\n", abstime.tv_sec, abstime.tv_nsec); // fflush(stdout); PR("wait/unlock -11"); OrOp(Receiver,threadNum,SWait); status = 
pthread_cond_timedwait(&sig, &lock, &abstime); OrOp(Receiver,threadNum,EWait); PR("lock/awake -11"); if (status == ETIMEDOUT) { ThreadTOCount[threadNum]++; TOs++; PR("timeout -11"); PR("unlock -11"); OrOp(Receiver,threadNum,WaitTimeout); OrOp(Receiver,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Receiver,threadNum,EUnlock); LOGERR; return ; } LOGERR; } if (msg==1) { OrOp(Receiver,threadNum,MsgTrue); } ThreadRecvCount[threadNum]++; msg=0; received++; PR("unlock -11"); OrOp(Receiver,threadNum,SUnlock); status=pthread_mutex_unlock(&lock); OrOp(Receiver,threadNum,EUnlock); LOGERR; return ; }
Attachment:
condvar10.h
Description: type
# nmake makefile that builds condvar10.exe with the MSVC command-line
# compiler.  Each of the targets VC, VCE, VSE and VCX re-invokes nmake
# with the import library, DLL and cleanup-model flags for that variant.

# Shell commands used by the rules.
CP = copy
RM = erase
MKDIR = mkdir
TOUCH = echo Passed >
ECHO = @echo

# Headers listed in COPYFILES.
CPHDR = pthread.h semaphore.h sched.h

#OPTIM = /O2 /Ob2
OPTIM =

# C++ Exceptions
VCEFLAGS = /GX /TP /DPtW32NoCatchWarn /D__CLEANUP_CXX
VCELIB = ../lib/pthreadVCE.lib
VCEDLL = pthreadVCE.dll

# Structured Exceptions
VSEFLAGS = /D__CLEANUP_SEH
VSELIB = ../lib/pthreadVSE.lib
VSEDLL = pthreadVSE.dll

# C cleanup code
VCFLAGS = /D__CLEANUP_C
VCLIB = ../lib/pthreadVC.lib
VCDLL = pthreadVC.dll

# C++ Exceptions in application - using VC version of pthreads dll
VCXFLAGS = /GX /TP /D__CLEANUP_C

CFLAGS= $(OPTIM) /W3 /WX /MD /nologo /Yd /Zi -D_WIN32_WINNT=0x400
LFLAGS= /INCREMENTAL:NO
INCLUDES=-I. -I../include

COPYFILES = $(CPHDR) $(CPLIB) $(CPDLL)

TEST = condvar10.exe

# Set on the command line of the recursive nmake calls below.
EHFLAGS =

# Building with no target builds the VC variant.
default: VC

VCE:
	@ nmake CPLIB="$(VCELIB)" CPDLL="$(VCEDLL)" EHFLAGS="$(VCEFLAGS)" $(TEST)

VSE:
	@ nmake CPLIB="$(VSELIB)" CPDLL="$(VSEDLL)" EHFLAGS="$(VSEFLAGS)" $(TEST)

VC:
	@ nmake CPLIB="$(VCLIB)" CPDLL="$(VCDLL)" EHFLAGS="$(VCFLAGS)" $(TEST)

VCX:
	@ nmake CPLIB="$(VCLIB)" CPDLL="$(VCDLL)" EHFLAGS="$(VCXFLAGS)" $(TEST)

# Compile and link one .c file into an .exe; the command is echoed first.
.c.exe:
	@ $(ECHO) $(CC) $(EHFLAGS) $(CFLAGS) $(INCLUDES) $< /Fe$@ /link $(LFLAGS) $(CPLIB)
	@ $(CC) $(EHFLAGS) $(CFLAGS) $(INCLUDES) $< /Fe$@ /link $(LFLAGS) $(CPLIB)

# Preprocess only (/P).
# NOTE(review): this rule always uses VCEFLAGS rather than EHFLAGS - confirm that is intended.
.c.i:
	@ $(CC) /P $(VCEFLAGS) $(CFLAGS) $(INCLUDES) $<

# NOTE(review): BUILD_DIR is not defined in this makefile - presumably supplied by the caller.
$(COPYFILES):
	@ $(ECHO) Copying $@
	@ $(CP) $(BUILD_DIR)\$@ .

pthread.dll:
	@ $(CP) $(CPDLL) $*.dll
	@ $(CP) $(CPLIB) $*.lib

# Remove build products and logs; the leading '-' ignores errors from erase.
clean:
	- $(RM) *.e
	- $(RM) *.i
	- $(RM) *.obj
	- $(RM) *.pdb
	- $(RM) *.o
	- $(RM) *.asm
	- $(RM) *.exe
	- $(RM) *.log
# GNU make makefile that builds condvar10.exe with gcc.  Each of the
# targets GC, GCE and GCX re-invokes make with the library suffix (GCX)
# and extra compiler flags (XXCFLAGS) for that variant.

# Shell commands used by the rules.
CP = copy
MV = rename
RM = erase
MKDIR = mkdir
TOUCH = echo Passed >
ECHO = @echo
MAKE = make

#
# Mingw32
#
GLANG = c++
CC = gcc
XXCFLAGS =
CFLAGS = -O3 -g -UNDEBUG -Wall $(XXCFLAGS)
#CFLAGS = -g -O0 -UNDEBUG -Wall $(XXCFLAGS)
INCLUDES = -I. -I../include
LIBDIRS = -L. -L../lib

# Placeholder; the variant targets below pass the real suffix (GC or GCE).
GCX = DUMMY

HDR = pthread.h semaphore.h sched.h
LIB = ../lib/libpthread$(GCX).a
DLL = pthread$(GCX).dll

# If a test case returns a non-zero exit code to the shell, make will
# stop.

TEST = condvar10.exe

# Compiled as C (-x c), linked with libpthreadGC.
GC:
	$(MAKE) GCX=GC XXCFLAGS="-x c -D__CLEANUP_C" $(TEST)

# Compiled as C++ (-x c++), linked with libpthreadGCE.
GCE:
	$(MAKE) GCX=GCE XXCFLAGS="-mthreads -x c++ -D__CLEANUP_CXX" $(TEST)

# Compiled as C++ (-x c++), linked with libpthreadGC.
GCX:
	$(MAKE) GCX=GC XXCFLAGS="-mthreads -x c++ -D__CLEANUP_C" $(TEST)

# Compile and link one .c file into an .exe; the command is echoed first.
%.exe: %.c
	@ $(ECHO) Compiling $@
	@ $(ECHO) $(CC) $(CFLAGS) -o $@ $^ $(INCLUDES) $(LIBDIRS) -lpthread$(GCX)
	@ $(CC) $(CFLAGS) -o $@ $^ $(INCLUDES) $(LIBDIRS) -lpthread$(GCX)

# Preprocess only (-E).
%.pre: %.c
	@ $(CC) -E $(CFLAGS) -o $@ $^ $(INCLUDES)

# Compile to assembly only (-S).
%.s: %.c
	@ $(CC) -S $(CFLAGS) -o $@ $^ $(INCLUDES)

# Remove build products and logs; the leading '-' ignores errors from erase.
clean:
	- $(RM) *.a
	- $(RM) *.e
	- $(RM) *.i
	- $(RM) *.obj
	- $(RM) *.pdb
	- $(RM) *.exe
	- $(RM) *.log
Index Nav: | [Date Index] [Subject Index] [Author Index] [Thread Index] | |
---|---|---|
Message Nav: | [Date Prev] [Date Next] | [Thread Prev] [Thread Next] |