/* Head of the fd list. Also contains
the cluster_socket details */
static struct local_client local_client_head;
+static int _local_client_count = 0;
static unsigned short global_xid = 0; /* Last transaction ID issued */
static unsigned max_cluster_message;
static unsigned max_cluster_member_name_len;
+static void _add_client(struct local_client *new_client, struct local_client *existing_client)
+{
+ _local_client_count++;
+ DEBUGLOG("(%p) Adding listener for fd %d. (Now %d monitored fds.)\n", new_client, new_client->fd, _local_client_count);
+ new_client->next = existing_client->next;
+ existing_client->next = new_client;
+}
+
+int add_client(struct local_client *new_client)
+{
+ _add_client(new_client, &local_client_head);
+
+ return 0;
+}
+
+/* Returns 0 if delfd is found and removed from list */
+static int _del_client(struct local_client *delfd)
+{
+ struct local_client *lastfd, *thisfd;
+
+ for (lastfd = &local_client_head; (thisfd = lastfd->next); lastfd = thisfd)
+ if (thisfd == delfd) {
+ DEBUGLOG("(%p) Removing listener for fd %d\n", thisfd, thisfd->fd);
+ lastfd->next = delfd->next;
+ _local_client_count--;
+ return 0;
+ }
+
+ return 1;
+}
+
/* Structure of items on the LVM thread list */
struct lvm_thread_cmd {
struct dm_list list;
local_client_head.fd = clops->get_main_cluster_fd();
local_client_head.type = CLUSTER_MAIN_SOCK;
local_client_head.callback = clops->cluster_fd_callback;
+ _local_client_count++;
/* Add the local socket to the list */
if (!(newfd = dm_zalloc(sizeof(struct local_client)))) {
newfd->fd = local_sock;
newfd->type = LOCAL_RENDEZVOUS;
newfd->callback = local_rendezvous_callback;
- newfd->next = local_client_head.next;
- local_client_head.next = newfd;
+
+ (void) add_client(newfd);
/* This needs to be started after cluster initialisation
as it may need to take out locks */
while ((delfd = local_client_head.next)) {
local_client_head.next = delfd->next;
+ _local_client_count--;
/* Failing cleanup_zombie leaks... */
if (delfd->type == LOCAL_SOCK && !cleanup_zombie(delfd))
cmd_client_cleanup(delfd); /* calls sync_unlock */
while (!quit) {
fd_set in;
int select_status;
- struct local_client *thisfd;
+ struct local_client *thisfd, *nextfd;
struct timeval tv = { cmd_timeout, 0 };
int quorate = clops->is_quorate();
int client_count = 0;
int max_fd = 0;
- struct local_client *lastfd = &local_client_head;
- struct local_client *nextfd = local_client_head.next;
/* Wait on the cluster FD and all local sockets/pipes */
local_client_head.fd = clops->get_main_cluster_fd();
fprintf(stderr, "WARNING: Your cluster may freeze up if the number of clvmd file descriptors (%d) exceeds %d.\n", max_fd + 1, FD_SETSIZE);
}
- for (thisfd = &local_client_head; thisfd; thisfd = nextfd, nextfd = thisfd ? thisfd->next : NULL) {
+ for (thisfd = &local_client_head; thisfd; thisfd = nextfd) {
+ nextfd = thisfd->next;
if (thisfd->removeme && !cleanup_zombie(thisfd)) {
- struct local_client *free_fd = thisfd;
- lastfd->next = nextfd;
- DEBUGLOG("(%p) removeme set with %d monitored fds remaining\n", thisfd, client_count - 1);
+ /* cleanup_zombie might have removed the next list element */
+ nextfd = thisfd->next;
- /* Queue cleanup, this also frees the client struct */
- add_to_lvmqueue(free_fd, NULL, 0, NULL);
+ (void) _del_client(thisfd);
- continue;
- }
+ DEBUGLOG("(%p) removeme set with %d monitored fds remaining\n", thisfd, _local_client_count);
- lastfd = thisfd;
+ /* Queue cleanup, this also frees the client struct */
+ add_to_lvmqueue(thisfd, NULL, 0, NULL);
+ }
if (thisfd->removeme)
continue;
/* New client...simply add it to the list */
if (newfd) {
- newfd->next = thisfd->next;
- thisfd->next = newfd;
+ _add_client(newfd, thisfd);
thisfd = newfd;
}
}
/* Remove the pipe client */
if (thisfd->bits.localsock.pipe_client) {
- struct local_client *delfd;
- struct local_client *lastfd;
+ struct local_client *delfd = thisfd->bits.localsock.pipe_client;
- (void) close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */
+ (void) close(delfd->fd); /* Close pipe */
(void) close(thisfd->bits.localsock.pipe);
/* Remove pipe client */
- for (lastfd = &local_client_head; (delfd = lastfd->next); lastfd = delfd)
- if (thisfd->bits.localsock.pipe_client == delfd) {
- thisfd->bits.localsock.pipe_client = NULL;
- lastfd->next = delfd->next;
- dm_free(delfd);
- break;
- }
+ if (!_del_client(delfd)) {
+ dm_free(delfd);
+ thisfd->bits.localsock.pipe_client = NULL;
+ }
}
}
newfd->type = THREAD_PIPE;
newfd->callback = local_pipe_callback;
newfd->bits.pipe.client = thisfd;
- newfd->next = thisfd->next;
- thisfd->next = newfd;
+
+ _add_client(newfd, thisfd);
/* Store a cross link to the pipe */
thisfd->bits.localsock.pipe_client = newfd;
/* Add a file descriptor from the cluster or comms interface to
our list of FDs for select
*/
-int add_client(struct local_client *new_client)
-{
- new_client->next = local_client_head.next;
- local_client_head.next = new_client;
-
- return 0;
-}
/* Called when the pre-command has completed successfully - we
now execute the real command on all the requested nodes */