From 49398925a223b519ae92a191d528c41f4fc34bda Mon Sep 17 00:00:00 2001 From: Chris Meek Date: Tue, 7 Feb 2012 16:23:08 -0500 Subject: [PATCH] PR13609: Parallel Server Connections Used pthreads to parallelize the server. A new thread is created to handle each incomming connection. The maximum number of threads created is limited by the new --max-threads=N option. If a value of 0 is passed, no new threads are created and each connection is handled in the main thread, in serial. The default value is equal to the number of processors on the host. --- cscommon.h | 8 +++ stap-server | 90 +++++++++++++++++++--------- stap-serverd.cxx | 149 +++++++++++++++++++++++++++++++++++++---------- 3 files changed, 188 insertions(+), 59 deletions(-) diff --git a/cscommon.h b/cscommon.h index 70f286074..70ca9867c 100644 --- a/cscommon.h +++ b/cscommon.h @@ -48,6 +48,14 @@ struct cs_protocol_version const char *v; }; +struct thread_arg +{ + PRFileDesc *tcpSocket; + CERTCertificate *cert; + SECKEYPrivateKey *privKey; + PRNetAddr addr; +}; + #if HAVE_NSS extern int read_from_file (const std::string &fname, cs_protocol_version &data); extern std::string get_cert_serial_number (const CERTCertificate *cert); diff --git a/stap-server b/stap-server index 1562fcbd6..8e8644006 100644 --- a/stap-server +++ b/stap-server @@ -49,26 +49,28 @@ OPT_OTHER= OPT_PORT_IX=0 OPT_LOG_IX=0 OPT_SSL_IX=0 +OPT_MAXTHREADS_IX=0 echo_usage () { echo $"Usage: $prog {start|stop|restart|condrestart|try-restart|force-reload|status} [options]" echo $"Options:" - echo $" -c configfile : specify additional global configuration file." - echo $" -a arch : specify the target architecture." - echo $" -r release : specify the kernel release." - echo $" -I path : augment the search path for tapsets." - echo $" -R path : specify the location of the systemtap runtime." - echo $" -B options : specify 'make' options for building systemtap modules." - echo $" -D name[=value] : add a macro definition for building systemtap modules." - echo $" -u username : specify the user who will run the server(s)." - echo $" -i : specify a server for each installed kernel release." - echo $" -n nickname : specify a server configuration by nickname." - echo $" -p pid : specify a server or server configuration by process id." - echo $" -P : use a password for the server's NSS certificate database." - echo $" -k : keep server temporary files." - echo $" --port port : specify the network port to be used by the server." - echo $" --log path : specify the location of the server's log file." - echo $" --ssl path : specify the location of the server's certificate database." + echo $" -c configfile : specify additional global configuration file." + echo $" -a arch : specify the target architecture." + echo $" -r release : specify the kernel release." + echo $" -I path : augment the search path for tapsets." + echo $" -R path : specify the location of the systemtap runtime." + echo $" -B options : specify 'make' options for building systemtap modules." + echo $" -D name[=value] : add a macro definition for building systemtap modules." + echo $" -u username : specify the user who will run the server(s)." + echo $" -i : specify a server for each installed kernel release." + echo $" -n nickname : specify a server configuration by nickname." + echo $" -p pid : specify a server or server configuration by process id." + echo $" -P : use a password for the server's NSS certificate database." + echo $" -k : keep server temporary files." + echo $" --port port : specify the network port to be used by the server." + echo $" --log path : specify the location of the server's log file." + echo $" --ssl path : specify the location of the server's certificate database." + echo $" --max-threads threads : specify the maximum number of worker threads to handle concurrent requests." echo $"" echo $"All options may be specified more than once." echo $"" @@ -86,12 +88,16 @@ echo_usage () { echo $"" echo $"If --ssl is not specified, the default is '$stap_sysconfdir/ssl/server'." echo $"" + echo $"If --max-threads is not specified, the default is the number of processors." + echo $"" + echo $"If --max-threads is specified with a value of 0, all requests are handled in the main thread." + echo $"" echo $"Each -D, -I and -B option specifies an additional macro, path or option respectively" echo $"to be applied to subsequent servers specified." echo $"" - echo $"Each --port, --log and --ssl option is added to an option-specific list which will" - echo $"be applied, in turn, to each server specified. If more servers are specified than" - echo $"options in a given list, the default for that option will be used for subsequent" + echo $"Each --port, --log, --ssl and --max-threads option is added to an option-specific list" + echo $"which will be applied, in turn, to each server specified. If more servers are specified" + echo $"than options in a given list, the default for that option will be used for subsequent" echo $"servers." echo $"" echo $"For other options, each new instance overrides the previous setting." @@ -261,6 +267,10 @@ parse_args () { # arguments OPT_SSL+=("$2") shift 1 ;; + --max-threads) + OPT_MAXTHREADS+=("$2") + shift 1 + ;; --) ;; *) @@ -308,6 +318,13 @@ add_distributed_options () { else SERVER_CMDS+=("SSL=\"\"") fi + # The --max-threads option + if test -n "${OPT_MAXTHREADS[$OPT_MAXTHREADS_IX]}"; then + SERVER_CMDS+=("MAXTHREADS=\"`quote_for_cmd "${OPT_MAXTHREADS[$OPT_MAXTHREADS_IX]}"`\"") + OPT_MAXTHREADS_IX=$(($OPT_MAXTHREADS_IX + 1)) + else + SERVER_CMDS+=("MAXTHREADS=\"\"") + fi } # Process the -i flag. @@ -435,6 +452,7 @@ add_server_commands () { SERVER_CMDS+=("LOG=\"`quote_for_cmd "$LOG"`\"") test -n "$PORT" && SERVER_CMDS+=("PORT=\"`quote_for_cmd "$PORT"`\"") test -n "$SSL" && SERVER_CMDS+=("SSL=\"`quote_for_cmd "$SSL"`\"") + test -n "$MAXTHREADS" && SERVER_CMDS+=("MAXTHREADS=\"`quote_for_cmd "$MAXTHREADS"`\"") } echo_server_options () { @@ -456,6 +474,7 @@ echo_server_options () { echo -n " --log \"`quote_for_cmd "$LOG"`\"" test -n "$PORT" && echo -n " --port \"`quote_for_cmd "$PORT"`\"" test -n "$SSL" && echo -n " --ssl \"`quote_for_cmd "$SSL"`\"" + test -n "$MAXTHREADS" && echo -n " --max-threads \"`quote_for_cmd "$MAXTHREADS"`\"" echo } @@ -490,6 +509,7 @@ init_server_opts () { LOG=$LOG_FILE PORT= SSL= + MAXTHREADS= } # Double quotes, backslashes within generated command @@ -560,6 +580,7 @@ interpret_server_config () { local local_LOG= local local_PORT= local local_SSL= + local local_MAXTHREADS= local input while read -r -u3 input @@ -622,6 +643,9 @@ interpret_server_config () { SSL=*) local_SSL="${input:4}" ;; + MAXTHREADS=*) + local_MAXTHREADS="${input:11}" + ;; \#*) ;; # Comment, do nothing "") @@ -646,6 +670,7 @@ interpret_server_config () { LOG="$local_LOG" PORT="$local_PORT" SSL="$local_SSL" + MAXTHREADS="$local_MAXTHREADS" } # Interpret the contents of a server status file. @@ -671,6 +696,7 @@ load_server_config () { local LOG= local PORT= local SSL= + local MAXTHREADS= interpret_server_config "$f" || continue # Other options default to empty. These ones don't. [ -z "$ARCH" ] && ARCH=`get_arch` @@ -705,6 +731,7 @@ get_server_pid_by_config () { local target_LOG="$LOG" local target_PORT="$PORT" local target_SSL="$SSL" + local target_MAXTHREADS="$MAXTHREADS" # Check the status file for each running server to see if it matches # the one currently configured. We're checking for a given configuration, @@ -712,16 +739,17 @@ get_server_pid_by_config () { for f in "$STAT_PATH"/*.stat; do test ! -e "$f" && continue interpret_server_status "$f" || continue - test "X$ARCH" = "X$target_ARCH" || continue - test "X$RELEASE" = "X$target_RELEASE" || continue - test "X$INCLUDE" = "X$target_INCLUDE" || continue - test "X$RUNTIME" = "X$target_RUNTIME" || continue - test "X$BUILD" = "X$target_BUILD" || continue - test "X$DEFINE" = "X$target_DEFINE" || continue - test "X$USER" = "X$target_USER" || continue - test "X$LOG" = "X$target_LOG" || continue - test "X$PORT" = "X$target_PORT" || continue - test "X$SSL" = "X$target_SSL" || continue + test "X$ARCH" = "X$target_ARCH" || continue + test "X$RELEASE" = "X$target_RELEASE" || continue + test "X$INCLUDE" = "X$target_INCLUDE" || continue + test "X$RUNTIME" = "X$target_RUNTIME" || continue + test "X$BUILD" = "X$target_BUILD" || continue + test "X$DEFINE" = "X$target_DEFINE" || continue + test "X$USER" = "X$target_USER" || continue + test "X$LOG" = "X$target_LOG" || continue + test "X$PORT" = "X$target_PORT" || continue + test "X$SSL" = "X$target_SSL" || continue + test "X$MAXTHREADS" = "X$target_MAXTHREADS" || continue echo `basename "$f" | sed 's/.stat//'` # Server has a pid return done @@ -737,6 +765,7 @@ get_server_pid_by_config () { LOG="$target_LOG" PORT="$target_PORT" SSL="$target_SSL" + MAXTHREADS="$target_MAXTHREADS" } get_server_pid_by_nickname () { @@ -810,6 +839,7 @@ start_server () { server_cmd="$server_cmd --log=\"`quote_for_cmd "$LOG"`\"" test -n "$PORT" && server_cmd="$server_cmd --port \"`quote_for_cmd "$PORT"`\"" test -n "$SSL" && server_cmd="$server_cmd --ssl \"`quote_for_cmd "$SSL"`\"" + test -n "$MAXTHREADS" && server_cmd="$server_cmd --max-threads \"`quote_for_cmd "$MAXTHREADS"`\"" # Start the server here. local pid @@ -848,6 +878,7 @@ start_server () { echo "LOG=$LOG" >> "$server_status_file" echo "PORT=$PORT" >> "$server_status_file" echo "SSL=$SSL" >> "$server_status_file" + echo "MAXTHREADS=$MAXTHREADS" >> "$server_status_file" do_success $"$prog start `echo_server_options`" } @@ -1186,6 +1217,7 @@ OPTS=`getopt -s bash -u --options 'a:B:c:D:iI:n:p:kPr:R:u:' \ --longoptions 'log:' \ --longoptions 'port:' \ --longoptions 'ssl:' \ + --longoptions 'max-threads:' \ -- "$@"` if [ $? -ne 0 ]; then echo "Error: Argument parse error: $@" >&2 diff --git a/stap-serverd.cxx b/stap-serverd.cxx index 96a141eaf..280eba201 100644 --- a/stap-serverd.cxx +++ b/stap-serverd.cxx @@ -39,6 +39,7 @@ extern "C" { #include #include #include +#include #include #include @@ -74,7 +75,8 @@ extern int optind; static cs_protocol_version client_version; static bool set_rlimits; static bool use_db_password; -static int port; +static unsigned short port; +static long max_threads; static string cert_db_path; static string stap_options; static string uname_r; @@ -100,7 +102,7 @@ static struct rlimit translator_RLIMIT_CPU; static struct rlimit translator_RLIMIT_NPROC; static struct rlimit translator_RLIMIT_AS; -static string stapstderr; +sem_t sem_client; // Message handling. // Server_error messages are printed to stderr and logged, if requested. @@ -115,7 +117,7 @@ server_error (const string &msg, int logit = true) // client_error messages are treated as server errors and also printed to the client's stderr. static void -client_error (const string &msg) +client_error (const string &msg, string stapstderr) { server_error (msg); if (! stapstderr.empty ()) @@ -183,13 +185,16 @@ parse_options (int argc, char **argv) { int long_opt = 0; char *num_endptr; + long port_tmp; #define LONG_OPT_PORT 1 #define LONG_OPT_SSL 2 #define LONG_OPT_LOG 3 +#define LONG_OPT_MAXTHREADS 4 static struct option long_options[] = { { "port", 1, & long_opt, LONG_OPT_PORT }, { "ssl", 1, & long_opt, LONG_OPT_SSL }, { "log", 1, & long_opt, LONG_OPT_LOG }, + { "max-threads", 1, & long_opt, LONG_OPT_MAXTHREADS }, { NULL, 0, NULL, 0 } }; int grc = getopt_long (argc, argv, "a:B:D:I:kPr:R:", long_options, NULL); @@ -239,7 +244,15 @@ parse_options (int argc, char **argv) switch (long_opt) { case LONG_OPT_PORT: - port = (int) strtoul (optarg, &num_endptr, 10); + port_tmp = strtol (optarg, &num_endptr, 10); + if (*num_endptr != '\0') + fatal (_F("%s: cannot parse number '--%s=%s'", argv[0], + long_options[long_opt - 1].name, optarg)); + else if (port_tmp < 0 || port_tmp > 65535) + fatal (_F("%s: invalid entry: port must be between 0 and 65535 '--%s=%s'", argv[0], + long_options[long_opt - 1].name, optarg)); + else + port = (unsigned short) port_tmp; break; case LONG_OPT_SSL: cert_db_path = optarg; @@ -247,6 +260,15 @@ parse_options (int argc, char **argv) case LONG_OPT_LOG: process_log (optarg); break; + case LONG_OPT_MAXTHREADS: + max_threads = strtol (optarg, &num_endptr, 0); + if (*num_endptr != '\0') + fatal (_F("%s: cannot parse number '--%s=%s'", argv[0], + long_options[long_opt - 1].name, optarg)); + else if (max_threads < 0) + fatal (_F("%s: invalid entry: max threads must not be negative '--%s=%s'", argv[0], + long_options[long_opt - 1].name, optarg)); + break; default: if (optarg) server_error (_F("%s: unhandled option '--%s=%s'", argv[0], @@ -603,6 +625,7 @@ initialize (int argc, char **argv) { client_version = "1.0"; // Assumed until discovered otherwise use_db_password = false; port = 0; + max_threads = sysconf( _SC_NPROCESSORS_ONLN ); // Default to number of processors keep_temp = false; struct utsname utsname; uname (& utsname); @@ -955,7 +978,7 @@ writeDataToSocket(PRFileDesc *sslSocket, const char *responseFileName) } static void -get_stap_locale (const string &staplang, vector &envVec) +get_stap_locale (const string &staplang, vector &envVec, string stapstderr) { // If the client version is < 1.6, then no file containing environment // variables defining the locale has been passed. @@ -1018,7 +1041,7 @@ get_stap_locale (const string &staplang, vector &envVec) pos = line.find("="); if (pos == string::npos) { - client_error(_F("Localization key=value line '%s' cannot be parsed", line.c_str())); + client_error(_F("Localization key=value line '%s' cannot be parsed", line.c_str()), stapstderr); continue; } key = line.substr(0, pos); @@ -1029,7 +1052,7 @@ get_stap_locale (const string &staplang, vector &envVec) if (locVars.find(key) == locVars.end()) { // Not fatal. Just ignore it. - client_error(_F("Localization key '%s' not found in global list", key.c_str())); + client_error(_F("Localization key '%s' not found in global list", key.c_str()), stapstderr); continue; } @@ -1037,7 +1060,7 @@ get_stap_locale (const string &staplang, vector &envVec) if ((regexec(&checkre, value.c_str(), (size_t) 0, NULL, 0) != 0)) { // Not fatal. Just ignore it. - client_error(_F("Localization value '%s' contains illegal characters", value.c_str())); + client_error(_F("Localization value '%s' contains illegal characters", value.c_str()), stapstderr); continue; } @@ -1151,7 +1174,7 @@ getRequestedPrivilege (const vector &stapargv) /* Run the translator on the data in the request directory, and produce output in the given output directory. */ static void -handleRequest (const string &requestDirName, const string &responseDirName) +handleRequest (const string &requestDirName, const string &responseDirName, string stapstderr) { vector stapargv; int rc; @@ -1260,7 +1283,7 @@ handleRequest (const string &requestDirName, const string &responseDirName) // Environment variables (possibly empty) to be passed to spawn_and_wait(). string staplang = requestDirName + "/locale"; vector envVec; - get_stap_locale (staplang, envVec); + get_stap_locale (staplang, envVec, stapstderr); /* All ready, let's run the translator! */ rc = spawn_and_wait(stapargv, "/dev/null", stapstdout.c_str (), stapstderr.c_str (), @@ -1453,14 +1476,15 @@ spawn_and_wait (const vector &argv, #undef CHECKRC } -/* Function: int handle_connection() +/* Function: void *handle_connection() * * Purpose: Handle a connection to a socket. Copy in request zip * file, process it, copy out response. Temporary directories are * created & destroyed here. */ -static SECStatus -handle_connection (PRFileDesc *tcpSocket, CERTCertificate *cert, SECKEYPrivateKey *privKey) + +void * +handle_connection (void *arg) { PRFileDesc * sslSocket = NULL; SECStatus secStatus = SECFailure; @@ -1471,9 +1495,22 @@ handle_connection (PRFileDesc *tcpSocket, CERTCertificate *cert, SECKEYPrivateKe char requestDirName[PATH_MAX]; char responseDirName[PATH_MAX]; char responseFileName[PATH_MAX]; + string stapstderr; /* Cannot be global since we need a unique + copy for each connection.*/ vector argv; PRInt32 bytesRead; + /* Detatch to avoid a memory leak */ + if(max_threads > 0) + pthread_detach(pthread_self()); + + /* Unpack the arg */ + thread_arg *t_arg = (thread_arg *) arg; + PRFileDesc *tcpSocket = t_arg->tcpSocket; + CERTCertificate *cert = t_arg->cert; + SECKEYPrivateKey *privKey = t_arg->privKey; + PRNetAddr addr = t_arg->addr; + tmpdir[0]='\0'; /* prevent cleanup-time /bin/rm of uninitialized directory */ #if 0 // already done on the listenSocket @@ -1581,7 +1618,7 @@ handle_connection (PRFileDesc *tcpSocket, CERTCertificate *cert, SECKEYPrivateKe /* Handle the request zip file. An error therein should still result in a response zip file (containing stderr etc.) so we don't have to have a result code here. */ - handleRequest(requestDirName, responseDirName); + handleRequest(requestDirName, responseDirName, stapstderr); /* Zip the response. */ argv.clear (); @@ -1596,7 +1633,7 @@ handle_connection (PRFileDesc *tcpSocket, CERTCertificate *cert, SECKEYPrivateKe server_error (_("Unable to compress server response")); goto cleanup; } - + secStatus = writeDataToSocket (sslSocket, responseFileName); cleanup: @@ -1624,7 +1661,25 @@ cleanup: } } - return secStatus; + if (secStatus != SECSuccess) + server_error (_("Error processing client request")); + + // Log the end of the request. + log (_F("Request from %d.%d.%d.%d:%d complete", + (addr.inet.ip ) & 0xff, + (addr.inet.ip >> 8) & 0xff, + (addr.inet.ip >> 16) & 0xff, + (addr.inet.ip >> 24) & 0xff, + addr.inet.port)); + + /* Increment semephore to indicate this thread is finished. */ + if (max_threads > 0) + { + sem_post(&sem_client); + pthread_exit(0); + } + else + return 0; } /* Function: int accept_connection() @@ -1639,6 +1694,13 @@ accept_connections (PRFileDesc *listenSocket, CERTCertificate *cert) PRFileDesc *tcpSocket; SECStatus secStatus; CERTCertDBHandle *dbHandle; + pthread_t tid; + + /* Initialize semephore with the maximum number of threads + * defined by --max-threads. If it is not defined, the + * default is the number of processors */ + if (max_threads > 0) + sem_init(&sem_client, 0, max_threads); dbHandle = CERT_GetDefaultCertDB (); @@ -1670,20 +1732,39 @@ accept_connections (PRFileDesc *listenSocket, CERTCertificate *cert) addr.inet.port)); /* XXX: alarm() or somesuch to set a timeout. */ - /* XXX: fork() or somesuch to handle concurrent requests. */ /* Accepted the connection, now handle it. */ - secStatus = handle_connection (tcpSocket, cert, privKey); - if (secStatus != SECSuccess) - server_error (_("Error processing client request")); - // Log the end of the request. - log (_F("Request from %d.%d.%d.%d:%d complete", - (addr.inet.ip ) & 0xff, - (addr.inet.ip >> 8) & 0xff, - (addr.inet.ip >> 16) & 0xff, - (addr.inet.ip >> 24) & 0xff, - addr.inet.port)); + /* Wait for a thread to finish if there are none available */ + if(max_threads >0) + { + int value; + sem_getvalue(&sem_client, &value); + if(value <= 0) + log(_("Server is overloaded. Processing times may be longer than normal.")); + else if (value == max_threads) + log(_("Processing 1 request...")); + else + log(_F("Processing %d concurrent requests...", ((int)max_threads - value) + 1)); + + sem_wait(&sem_client); + } + + /* Create the argument structure to pass to pthread_create + * (or directly to handle_connection if max_threads == 0 */ + thread_arg t_arg; + t_arg.tcpSocket = tcpSocket; + t_arg.cert = cert; + t_arg.privKey = privKey; + t_arg.addr = addr; + + /* Handle the conncection */ + if (max_threads > 0) + /* Create the worker thread and handle the connection. */ + pthread_create(&tid, NULL, handle_connection, &t_arg); + else + /* Since max_threads == 0, don't spawn a new thread, just handle in the current thread. */ + handle_connection(&t_arg); // If our certificate is no longer valid (e.g. has expired), then exit. secStatus = CERT_VerifyCertNow (dbHandle, cert, PR_TRUE/*checkSig*/, @@ -1695,6 +1776,9 @@ accept_connections (PRFileDesc *listenSocket, CERTCertificate *cert) } } + if (max_threads > 0) + sem_destroy(&sem_client); + SECKEY_DestroyPrivateKey (privKey); return SECSuccess; } @@ -1836,11 +1920,11 @@ listen () switch (errorNumber) { case PR_ADDRESS_NOT_AVAILABLE_ERROR: - server_error (_F("Network port %d is unavailable. Trying another port", port)); + server_error (_F("Network port %hu is unavailable. Trying another port", port)); port = 0; // Will automatically select an available port continue; case PR_ADDRESS_IN_USE_ERROR: - server_error (_F("Network port %d is busy. Trying another port", port)); + server_error (_F("Network port %hu is busy. Trying another port", port)); port = 0; // Will automatically select an available port continue; default: @@ -1859,7 +1943,12 @@ listen () goto done; } port = PR_ntohs (addr.inet.port); - log (_F("Using network port %d", port)); + log (_F("Using network port %hu", port)); + + if (max_threads > 0) + log (_F("Using a maximum of %ld threads", max_threads)); + else + log (_("Concurrency disabled")); // Listen for connection on the socket. The second argument is the maximum size of the queue // for pending connections. -- 2.43.5