Cluster Project branch, RHEL5, updated. cmirror_1_1_15-25-g5cd4597

jbrassow@sourceware.org jbrassow@sourceware.org
Wed Mar 26 19:24:00 GMT 2008


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Cluster Project".

http://sources.redhat.com/git/gitweb.cgi?p=cluster.git;a=commitdiff;h=5cd4597c0fd297414389f5cab77a115425570d8a

The branch, RHEL5 has been updated
       via  5cd4597c0fd297414389f5cab77a115425570d8a (commit)
       via  8a08d3be6b94a2cb9691e908f230ae6add29ccef (commit)
       via  2200d92f9ebc30fca8f4107929fc4707b57bcebd (commit)
       via  c749f5d08a8145308a98c551facfde0e4b80062c (commit)
       via  5c25534a1de557083aba092c76a7f3e380c4b108 (commit)
       via  492c4e3df027d04b3cc431807cc439979816f351 (commit)
      from  fd9a026fe08393ef7f561bb28db1815cf160d15f (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.

- Log -----------------------------------------------------------------
commit 5cd4597c0fd297414389f5cab77a115425570d8a
Merge: 8a08d3be6b94a2cb9691e908f230ae6add29ccef fd9a026fe08393ef7f561bb28db1815cf160d15f
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Wed Mar 26 14:13:17 2008 -0500

    Merge branch 'RHEL5' of ssh://jbrassow@sources.redhat.com/git/cluster into rhel5

commit 8a08d3be6b94a2cb9691e908f230ae6add29ccef
Merge: 2200d92f9ebc30fca8f4107929fc4707b57bcebd 0484ef93e8d09313f8f110d23c2a6a6b4aca2f60
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Wed Mar 26 08:48:18 2008 -0500

    Merge branch 'RHEL5' of ssh://jbrassow@sources.redhat.com/git/cluster into rhel5

commit 2200d92f9ebc30fca8f4107929fc4707b57bcebd
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Mon Mar 24 16:09:52 2008 -0500

    clogd: do not process requests after calling cpg_leave

commit c749f5d08a8145308a98c551facfde0e4b80062c
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Wed Mar 5 16:11:28 2008 -0600

    clogd: split cpg_config_callback into two functions
    
    Shorter functions are easier to read and debug.
    cpg_config_callback now calls either cpg_join_callback or
    cpg_leave_callback

commit 5c25534a1de557083aba092c76a7f3e380c4b108
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Wed Mar 5 10:22:03 2008 -0600

    clogd: remove various debugging statements

commit 492c4e3df027d04b3cc431807cc439979816f351
Author: Jonathan Brassow <jbrassow@redhat.com>
Date:   Thu Feb 28 14:57:43 2008 -0600

    clogd: small clean-ups
    
    - remove some debug statements
    - check for more errors (like short reads) when receiving from the kernel

-----------------------------------------------------------------------

Summary of changes:
 cmirror/src/clogd.c     |    4 +
 cmirror/src/cluster.c   |  345 ++++++++++++++++++++++++++---------------------
 cmirror/src/functions.c |   86 +++++++++---
 cmirror/src/functions.h |    1 +
 cmirror/src/local.c     |   48 +++----
 cmirror/src/queues.c    |   29 ++++-
 6 files changed, 315 insertions(+), 198 deletions(-)

diff --git a/cmirror/src/clogd.c b/cmirror/src/clogd.c
index 416084f..114c711 100644
--- a/cmirror/src/clogd.c
+++ b/cmirror/src/clogd.c
@@ -127,6 +127,9 @@ static void process_signal(int sig){
 		queue_status(1);
 		log_status(1);
 		return;
+	case SIGUSR2:
+		log_validate();
+		return;
 	default:
 		LOG_PRINT("Unknown signal received... ignoring");
 		return;
@@ -234,6 +237,7 @@ static void daemonize(void)
 	signal(SIGHUP, &sig_handler);
 	signal(SIGPIPE, SIG_IGN);
 	signal(SIGUSR1, &sig_handler);
+	signal(SIGUSR2, &sig_handler);
 	sigemptyset(&signal_mask);
 	signal_received = 0;
 }
diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c
index 27077ae..504ee58 100644
--- a/cmirror/src/cluster.c
+++ b/cmirror/src/cluster.c
@@ -46,6 +46,9 @@ struct checkpoint_data {
 	struct checkpoint_data *next;
 };	
 
+#define INVALID 0
+#define VALID   1
+#define LEAVING 2
 struct clog_cpg {
 	struct list_head list;
 
@@ -54,7 +57,7 @@ struct clog_cpg {
 	struct cpg_name name;
 
 	/* Are we the first, or have we received checkpoint? */
-	int valid;
+	int state;
 	int free_me;
 	struct queue *startup_queue;
 
@@ -251,7 +254,7 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
 	new->requester = cp_requester;
 	strncpy(new->uuid, entry->name.value, entry->name.length);
 
-	if (entry->valid) {
+	if (entry->state == VALID) {
 		new->bitmap_size = push_state(entry->name.value, "clean_bits",
 					      &new->clean_bits);
 		if (new->bitmap_size <= 0) {
@@ -282,9 +285,8 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
 		}
 	} else {
 		/*
-		 * We can store bitmaps yet, because the log is not
-		 * valid yet.  The new machine will have to ask
-		 * specifically for a new checkpoint.
+		 * We can't store bitmaps yet, because the log is not
+		 * valid yet.
 		 */
 		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
 			  new->requester);
@@ -743,14 +745,14 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 	if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
 		if (my_cluster_id == tfr->originator) {
 			/* Redundant checkpoints ignored if match->valid */
-			if (import_checkpoint(match, match->valid)) {
+			if (import_checkpoint(match, (match->state != INVALID))) {
 				LOG_ERROR("Failed to import checkpoint");
 				/* Could we retry? */
 				goto out;
-			} else if (!match->valid) {
+			} else if (match->state == INVALID) {
 				LOG_DBG("[%s] Checkpoint data received.  Log is now valid",
 					SHORT_UUID(match->name.value));
-				match->valid = 1;
+				match->state = VALID;
 
 				while ((startup_tfr = queue_remove(match->startup_queue))) {
 					if (startup_tfr->request_type == DM_CLOG_CONFIG_CHANGE) {
@@ -817,7 +819,14 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 	} else {
 		tfr->originator = nodeid;
 
-		if (!match->valid) {
+		if (match->state == LEAVING) {
+			LOG_ERROR("[%s]  Ignoring %s from %u.  Reason: I'm leaving",
+				  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+				  tfr->originator);
+			goto out;
+		}
+
+		if (match->state == INVALID) {
 			LOG_DBG("Log not valid yet, storing request");
 			startup_tfr = queue_remove(free_queue);
 			if (!startup_tfr) {
@@ -850,9 +859,11 @@ out:
 			  strerror(-r));
 		LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(tfr->uuid),
 			  (response) ? "YES" : "NO");
-		LOG_ERROR("[%s]    Originator: %u", SHORT_UUID(tfr->uuid), tfr->originator);
+		LOG_ERROR("[%s]    Originator: %u",
+			  SHORT_UUID(tfr->uuid), tfr->originator);
 		if (response)
-			LOG_ERROR("[%s]    Responder : %u", SHORT_UUID(tfr->uuid), nodeid);
+			LOG_ERROR("[%s]    Responder : %u",
+				  SHORT_UUID(tfr->uuid), nodeid);
 		LOG_ERROR("HISTORY::");
 
 		for (i = 0; i < DEBUGGING_HISTORY; i++) {
@@ -867,8 +878,10 @@ out:
 		int len;
 		idx++;
 		idx = idx % DEBUGGING_HISTORY;
-		len = sprintf(debugging[idx], "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
-			      (unsigned long long)tfr->seq, SHORT_UUID(tfr->uuid),
+		len = sprintf(debugging[idx],
+			      "SEQ#=%llu, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+			      (unsigned long long)tfr->seq,
+			      SHORT_UUID(tfr->uuid),
 			      RQ_TYPE(tfr->request_type),
 			      tfr->originator, (response) ? "YES" : "NO");
 		if (response)
@@ -876,186 +889,215 @@ out:
 	}
 }
 
-static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname,
-				struct cpg_address *member_list, int member_list_entries,
-				struct cpg_address *left_list, int left_list_entries,
-				struct cpg_address *joined_list, int joined_list_entries)
+static void cpg_join_callback(struct clog_cpg *match,
+			      struct cpg_address *joined,
+			      struct cpg_address *member_list,
+			      int member_list_entries)
 {
-	int i, j, fd;
+	int i;
 	int my_pid = getpid();
-	int found = 0;
-	struct clog_cpg *match, *tmp;
-	uint32_t lowest = 0xDEAD;
-
-	memberz = member_list_entries;
-
-	LOG_DBG("****** CPG config callback **[%s]**",
-		SHORT_UUID(gname->value));
+	uint32_t lowest = match->lowest_id;
+	struct clog_tfr *tfr;
 
-	LOG_DBG("* JOINING (%d):", joined_list_entries);
-	for (i = 0; i < joined_list_entries; i++)
-		LOG_DBG("*   nodeid: %d, pid: %d",
-			joined_list[i].nodeid, joined_list[i].pid);
+	/* Assign my_cluster_id */
+	if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
+		my_cluster_id = joined->nodeid;
 
-	LOG_DBG("* MEMBERS (%d):", member_list_entries);
-	for (i = 0; i < member_list_entries; i++)
-		LOG_DBG("*   nodeid: %d, pid: %d",
-			member_list[i].nodeid, member_list[i].pid);
+	/* Am I the very first to join? */
+	if (member_list_entries == 1) {
+		match->lowest_id = joined->nodeid;
+		match->state = VALID;
+	}
 
-	LOG_DBG("* LEAVING (%d):", left_list_entries);
-	for (i = 0; i < left_list_entries; i++)
-		LOG_DBG("*   nodeid: %d, pid: %d",
-			left_list[i].nodeid, left_list[i].pid);	
+	/* If I am part of the joining list, I do not send checkpoints */
+	if (joined->nodeid == my_cluster_id)
+		goto out;
 
-	LOG_DBG("*****************************************");
+	LOG_DBG("Joining node, %u needs checkpoint", joined->nodeid);
 
-	list_for_each_entry_safe(match, tmp, &clog_cpg_list, list) {
-		LOG_DBG("Given handle: %llu", (unsigned long long)handle);
-		LOG_DBG("      hanlde: %llu", (unsigned long long)match->handle);
-		LOG_DBG("Given name  : %s", gname->value);
-		LOG_DBG("      name  : %s", match->name.value);
-		if (match->handle == handle) {
-			LOG_DBG("MATCH");
-			found = 1;
-			break;
-		}
-		LOG_DBG("NOT A MATCH");
+	/*
+	 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
+	 * the startup_queue interface exclusively
+	 */
+	if (queue_empty(match->startup_queue)) {
+		match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
+		goto out;
 	}
 
-	if (!found) {
-		LOG_ERROR("Unable to find match for CPG config callback");
-		goto out;
+	tfr = queue_remove(free_queue);
+	if (!tfr) {
+		LOG_PRINT("cpg_config_callback: "
+			  "Preallocated transfer structs exhausted");
+		tfr = malloc(DM_CLOG_TFR_SIZE);
+		if (!tfr) {
+			LOG_ERROR("cpg_config_callback: "
+				  "Unable to allocate transfer structs");
+			LOG_ERROR("cpg_config_callback: "
+				  "Unable to perform checkpoint");
+			goto out;
+		}
 	}
+	tfr->request_type = DM_CLOG_CONFIG_CHANGE;
+	tfr->originator   = joined->nodeid;
+	queue_add_tail(tfr, match->startup_queue);
 
-	lowest = match->lowest_id;
 
-	/* Am I leaving? */
-	for (i = 0; i < left_list_entries; i++)
-		if (my_cluster_id == left_list[i].nodeid) {
-			struct list_head l, *p, *n;
-			struct clog_tfr *tfr;
+out:
+	/* Find the lowest_id, i.e. the server */
+	match->lowest_id = member_list[0].nodeid;
+	for (i = 0; i < member_list_entries; i++)
+		if (match->lowest_id > member_list[i].nodeid)
+			match->lowest_id = member_list[i].nodeid;
 
-			INIT_LIST_HEAD(&l);
+	if (lowest == 0xDEAD)
+		LOG_DBG("[%s]  Server change <none> -> %u (%u %s)",
+			SHORT_UUID(match->name.value), match->lowest_id,
+			joined->nodeid, (member_list_entries == 1) ?
+			"is first to join" : "joined");
+	else if (lowest != match->lowest_id)
+		LOG_DBG("[%s]  Server change %u -> %u (%u joined)",
+			SHORT_UUID(match->name.value), lowest,
+			match->lowest_id, joined->nodeid);
+	else
+		LOG_DBG("[%s]  Server unchanged at %u (%u joined)",
+			SHORT_UUID(match->name.value),
+			lowest, joined->nodeid);
+}
 
-			LOG_DBG("Finalizing leave...");
-			list_del_init(&match->list);
+static void cpg_leave_callback(struct clog_cpg *match,
+			       struct cpg_address *left,
+			       struct cpg_address *member_list,
+			       int member_list_entries)
+{
+	int i, fd;
+	struct list_head l, *p, *n;
+	uint32_t lowest = match->lowest_id;
+	struct clog_tfr *tfr;
 
-			cpg_fd_get(match->handle, &fd);
-			links_unregister(fd);
+	INIT_LIST_HEAD(&l);
 
-			cluster_postsuspend(match->name.value);
+	/* Am I leaving? */
+	if (my_cluster_id == left->nodeid) {
+		LOG_DBG("Finalizing leave...");
+		list_del_init(&match->list);
 
-			queue_remove_all(&l, cluster_queue);
+		cpg_fd_get(match->handle, &fd);
+		links_unregister(fd);
 
-			list_for_each_safe(p, n, &l) {
-				list_del_init(p);
-				tfr = (struct clog_tfr *)p;
+		cluster_postsuspend(match->name.value);
 
-				if (tfr->request_type == DM_CLOG_POSTSUSPEND)
-					kernel_send(tfr);
-				else if (!strcmp(match->name.value, tfr->uuid))
-					queue_add(tfr, free_queue);
-				else
-					queue_add(tfr, cluster_queue);
-			}
+		queue_remove_all(&l, cluster_queue);
 
-			cpg_finalize(match->handle);
+		list_for_each_safe(p, n, &l) {
+			list_del_init(p);
+			tfr = (struct clog_tfr *)p;
 
-			if (match->startup_queue->count) {
-				LOG_ERROR("%d startup items remain in cluster log",
-					  match->startup_queue->count);
-				while (!queue_empty(match->startup_queue)) {
-					tfr = queue_remove(match->startup_queue);
-					queue_add(tfr, free_queue);
-				}
+			/* Leave in the cluster_queue if not of this log */
+			if (strcmp(match->name.value, tfr->uuid)) {
+				queue_add(tfr, cluster_queue);
+				continue;
 			}
 
-			free(match->startup_queue);
-			match->free_me = 1;
-			match->lowest_id = 0xDEAD;
+			if (tfr->request_type == DM_CLOG_POSTSUSPEND)
+				kernel_send(tfr);
+			else
+				queue_add(tfr, free_queue);
+		}
 
-			goto out;
-		}			
+		cpg_finalize(match->handle);
 
-	/* Am I the very first to join? */
-	if (!left_list_entries &&
-	    (member_list_entries == 1) && (joined_list_entries == 1) &&
-	    (member_list[0].nodeid == joined_list[0].nodeid)) {
-		match->lowest_id = my_cluster_id = joined_list[0].nodeid;
-		match->valid = 1;
-		goto out;
-	}
-
-	/* Assign my_cluster_id */
-	if (my_cluster_id == 0xDEAD) {
-		for (i = 0; i < joined_list_entries; i++) {
-			LOG_DBG("My pid = %d\t\t[%u/%d]", my_pid,
-				  joined_list[i].nodeid, joined_list[i].pid);
-			if (joined_list[i].pid == my_pid) {
-				if (my_cluster_id != 0xDEAD) {
-					LOG_ERROR("Unable to determine my cluster id.  Killing myself.");
-					exit(1);
-				}
-				my_cluster_id = joined_list[i].nodeid;
-				LOG_DBG("Setting my cluster id: %u", my_cluster_id);
+		if (match->startup_queue->count) {
+			LOG_ERROR("%d startup items remain in cluster log",
+				  match->startup_queue->count);
+			while (!queue_empty(match->startup_queue)) {
+				tfr = queue_remove(match->startup_queue);
+				queue_add(tfr, free_queue);
 			}
 		}
-	}
 
-	if (member_list_entries)
-		match->lowest_id = member_list[0].nodeid;
-	else
+		free(match->startup_queue);
+		match->free_me = 1;
 		match->lowest_id = 0xDEAD;
+		match->state = INVALID;
+	}			
+
 	/* Find the lowest_id, i.e. the server */
+	if (!member_list_entries) {
+		match->lowest_id = 0xDEAD;
+		LOG_DBG("[%s]  Server change %u -> <none> "
+			"(%u is last to leave)",
+			SHORT_UUID(match->name.value), left->nodeid,
+			left->nodeid);
+		return;
+	}
+		
+	match->lowest_id = member_list[0].nodeid;
 	for (i = 0; i < member_list_entries; i++)
 		if (match->lowest_id > member_list[i].nodeid)
 			match->lowest_id = member_list[i].nodeid;
 
-	/*
-	 * If I am part of the joining list, I do not send checkpoints
-	 * FIXME: What are the cases where multiple nodes can join?
-	 */
-	for (i = 0; i < joined_list_entries; i++)
-		if (joined_list[i].nodeid == my_cluster_id)
-			goto out;
+	if (lowest != match->lowest_id) {
+		LOG_DBG("[%s]  Server change %u -> %u (%u left)",
+			SHORT_UUID(match->name.value), lowest,
+			match->lowest_id, left->nodeid);
 
-	for (i = 0, j = match->checkpoints_needed; i < joined_list_entries; i++) {
-		struct clog_tfr *tfr;
-		LOG_DBG("Joining node, %u needs checkpoint", joined_list[i].nodeid);
+		list_for_each_safe(p, n, &cluster_queue->list) {
+			tfr = (struct clog_tfr *)p;
 
-		if (queue_empty(match->startup_queue)) {
-			match->checkpoint_requesters[match->checkpoints_needed++] = joined_list[i].nodeid;
-			continue;
+			/*
+			 * Don't resend DM_CLOG_POSTSUSPEND request, it will
+			 * be handled when we get our own config leave
+			 */
+			if (!strcmp(match->name.value, tfr->uuid) &&
+			    (tfr->request_type != DM_CLOG_POSTSUSPEND)){
+				LOG_ERROR("[%s] Resending %s from %u due to new server",
+					  SHORT_UUID(match->name.value),
+					  RQ_TYPE(tfr->request_type),
+					  tfr->originator);
+				if (cluster_send(tfr))
+					LOG_ERROR("Failed resend");
+			}
 		}
+	} else
+		LOG_DBG("[%s]  Server unchanged at %u (%u left)",
+			SHORT_UUID(match->name.value), lowest, left->nodeid);
 
-		tfr = queue_remove(free_queue);
-		if (!tfr) {
-			LOG_PRINT("cpg_config_callback: Preallocated transfer structs exhausted");
-			tfr = malloc(DM_CLOG_TFR_SIZE);
-			if (!tfr) {
-				LOG_ERROR("cpg_config_callback: Unable to allocate transfer structs");
-				LOG_ERROR("cpg_config_callback: Unable to perform checkpoint");
-				return;
-			}
+}
+
+static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname,
+				struct cpg_address *member_list,
+				int member_list_entries,
+				struct cpg_address *left_list,
+				int left_list_entries,
+				struct cpg_address *joined_list,
+				int joined_list_entries)
+{
+	struct clog_cpg *match, *tmp;
+	int found = 0;
+
+	memberz = member_list_entries;
+
+	list_for_each_entry_safe(match, tmp, &clog_cpg_list, list)
+		if (match->handle == handle) {
+			found = 1;
+			break;
 		}
-		tfr->request_type = DM_CLOG_CONFIG_CHANGE;
-		tfr->originator   = joined_list[i].nodeid;
-		queue_add_tail(tfr, match->startup_queue);
+
+	if (!found) {
+		LOG_ERROR("Unable to find match for CPG config callback");
+		return;
 	}
 
-out:
-	if (lowest != match->lowest_id)
-		LOG_DBG("[%s]  Server change %u -> %u (%u %s)",
-			SHORT_UUID(match->name.value),
-			lowest, match->lowest_id,
-			(joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
-			(joined_list_entries && (member_list_entries == 1)) ? 
-			"is first to join" : (joined_list_entries) ? "joined" : "left");
+	if ((joined_list_entries + left_list_entries) > 1)
+		LOG_ERROR("[%s]  More than one node joining/leaving",
+			  SHORT_UUID(match->name.value));
+
+	if (joined_list_entries)
+		cpg_join_callback(match, joined_list,
+				  member_list, member_list_entries);
 	else
-		LOG_DBG("[%s]  Server unchanged at %u (%u %s)",
-			SHORT_UUID(match->name.value), lowest,
-			(joined_list_entries) ? joined_list[0].nodeid : left_list[0].nodeid,
-			(joined_list_entries) ? "joined" : "left");
+		cpg_leave_callback(match, left_list,
+				  member_list, member_list_entries);
 
 	if (joined_list_entries && (joined_list[0].nodeid == my_cluster_id))
 		doit = 25;
@@ -1137,6 +1179,7 @@ int destroy_cluster_cpg(char *str)
 			if (r != CPG_OK)
 				LOG_ERROR("Error leaving CPG!");
 			break;
+			del->state = LEAVING;
 		}
 
 	return 0;
diff --git a/cmirror/src/functions.c b/cmirror/src/functions.c
index a4c06cf..9756136 100644
--- a/cmirror/src/functions.c
+++ b/cmirror/src/functions.c
@@ -1212,6 +1212,16 @@ static int clog_set_region_sync(struct clog_tfr *tfr)
 			SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count,
 			(unsigned long long)pkg->region, tfr->originator);
 	}
+
+	if (lc->sync_count != count_bits32(lc->sync_bits, lc->bitset_uint32_count)) {
+		unsigned long long reset = count_bits32(lc->sync_bits, lc->bitset_uint32_count);
+
+		LOG_ERROR("[%s]  sync_count(%llu) does not match bitmap count(%llu)",
+			  SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count, reset);
+		LOG_ERROR("[%s]  Resetting sync_count = %llu", SHORT_UUID(lc->uuid), reset);
+		lc->sync_count = reset;
+	}
+
 	
 	tfr->data_size = 0;
 	return 0;
@@ -1511,25 +1521,32 @@ int do_request(struct clog_tfr *tfr, int server)
 	return 0;
 }
 
-static void print_bits(char *buf, int size)
+static void print_bits(char *buf, int size, int print)
 {
-#ifdef DEBUG
 	int i;
 	char outbuf[128];
 
 	memset(outbuf, 0, sizeof(outbuf));
+
 	for (i = 0; i < size; i++) {
 		if (!(i % 16)) {
-			if (outbuf[0] != '\0')
-				LOG_DBG("%s", outbuf);
+			if (outbuf[0] != '\0') {
+				if (print)
+					LOG_PRINT("%s", outbuf);
+				else
+					LOG_DBG("%s", outbuf);
+			}
 			memset(outbuf, 0, sizeof(outbuf));
 			sprintf(outbuf, "[%3d - %3d]", i, i+15);
 		}
 		sprintf(outbuf + strlen(outbuf), " %.2X", (unsigned char)buf[i]);
 	}
-	if (outbuf[0] != '\0')
-		LOG_DBG("%s", outbuf);
-#endif
+	if (outbuf[0] != '\0') {
+		if (print)
+			LOG_PRINT("%s", outbuf);
+		else
+			LOG_DBG("%s", outbuf);
+	}
 }
 
 /* int store_bits(const char *uuid, const char *which, char **buf)*/
@@ -1566,12 +1583,14 @@ int push_state(const char *uuid, const char *which, char **buf)
 
 	if (!strncmp(which, "sync_bits", 9)) {
 		memcpy(*buf, lc->sync_bits, bitset_size);
-		LOG_DBG("[%s] storing sync_bits:", SHORT_UUID(lc->uuid));
-		print_bits(*buf, bitset_size);
+		LOG_DBG("[%s] storing sync_bits (sync_count = %llu):",
+			SHORT_UUID(uuid), (unsigned long long)
+			count_bits32(lc->sync_bits, lc->bitset_uint32_count));
+		print_bits(*buf, bitset_size, 0);
 	} else if (!strncmp(which, "clean_bits", 9)) {
 		memcpy(*buf, lc->clean_bits, bitset_size);
 		LOG_DBG("[%s] storing clean_bits:", SHORT_UUID(lc->uuid));
-		print_bits(*buf, bitset_size);
+		print_bits(*buf, bitset_size, 0);
 	}
 
 	return bitset_size;
@@ -1610,13 +1629,15 @@ int pull_state(const char *uuid, const char *which, char *buf, int size)
 	if (!strncmp(which, "sync_bits", 9)) {
 		lc->resume_override += 1;
 		memcpy(lc->sync_bits, buf, bitset_size);
-		LOG_DBG("[%s] loading sync_bits:", SHORT_UUID(lc->uuid));
-		print_bits((char *)lc->sync_bits, bitset_size);
+		LOG_DBG("[%s] loading sync_bits (sync_count = %llu):",
+			SHORT_UUID(lc->uuid),(unsigned long long)
+			count_bits32(lc->sync_bits, lc->bitset_uint32_count));
+		print_bits((char *)lc->sync_bits, bitset_size, 0);
 	} else if (!strncmp(which, "clean_bits", 9)) {
 		lc->resume_override += 2;
 		memcpy(lc->clean_bits, buf, bitset_size);
 		LOG_DBG("[%s] loading clean_bits:", SHORT_UUID(lc->uuid));
-		print_bits((char *)lc->clean_bits, bitset_size);
+		print_bits((char *)lc->clean_bits, bitset_size, 0);
 	}
 
 	return 0;
@@ -1647,12 +1668,12 @@ int log_status(int output_wanted)
 		lc = list_entry(l, struct log_c, list);
 		if (output_wanted) {
 			LOG_PRINT("%s", lc->uuid);
-			LOG_DBG("sync_bits:");
+			LOG_PRINT("sync_bits:");
 			print_bits((char *)lc->sync_bits,
-				   lc->bitset_uint32_count * sizeof(*lc->sync_bits));
-			LOG_DBG("clean_bits:");
+				   lc->bitset_uint32_count * sizeof(*lc->sync_bits), 1);
+			LOG_PRINT("clean_bits:");
 			print_bits((char *)lc->clean_bits,
-				   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
+				   lc->bitset_uint32_count * sizeof(*lc->clean_bits), 1);
 		}
 	}
 	if (output_wanted)
@@ -1662,14 +1683,37 @@ int log_status(int output_wanted)
 		lc = list_entry(l, struct log_c, list);
 		if (output_wanted) {
 			LOG_PRINT("%s", lc->uuid);
-			LOG_DBG("sync_bits:");
+			LOG_PRINT("sync_bits:");
 			print_bits((char *)lc->sync_bits,
-				   lc->bitset_uint32_count * sizeof(*lc->sync_bits));
-			LOG_DBG("clean_bits:");
+				   lc->bitset_uint32_count * sizeof(*lc->sync_bits), 1);
+			LOG_PRINT("clean_bits:");
 			print_bits((char *)lc->clean_bits,
-				   lc->bitset_uint32_count * sizeof(*lc->clean_bits));
+				   lc->bitset_uint32_count * sizeof(*lc->clean_bits), 1);
 		}
 	}
 	return found;
 }
 
+int log_validate(void)
+{
+	struct list_head *l;
+	struct log_c *lc;
+	uint64_t r;
+
+	__list_for_each(l, &log_list) {
+		lc = list_entry(l, struct log_c, list);
+		LOG_PRINT("Validating %s::", SHORT_UUID(lc->uuid));
+		r = find_next_zero_bit(lc->sync_bits, lc->region_count, 0);
+		LOG_PRINT("  lc->region_count = %llu",
+			  (unsigned long long)lc->region_count);
+		LOG_PRINT("  lc->sync_count = %llu",
+			  (unsigned long long)lc->sync_count);
+		LOG_PRINT("  next zero bit  = %llu",
+			  (unsigned long long)r);
+		if (r >= lc->region_count) {
+			LOG_PRINT("ADJUSTING SYNC_COUNT");
+			lc->sync_count = lc->region_count;
+		}
+	}
+	return 0;
+}
diff --git a/cmirror/src/functions.h b/cmirror/src/functions.h
index b793511..f24d411 100644
--- a/cmirror/src/functions.h
+++ b/cmirror/src/functions.h
@@ -20,4 +20,5 @@ int pull_state(const char *uuid, const char *which, char *buf, int size);
 
 int log_get_state(struct clog_tfr *tfr);
 int log_status(int);
+int log_validate(void);
 #endif /* __CLOG_FUNCTIONS_DOT_H__ */
diff --git a/cmirror/src/local.c b/cmirror/src/local.c
index 93a99a8..b6aa0af 100644
--- a/cmirror/src/local.c
+++ b/cmirror/src/local.c
@@ -19,48 +19,54 @@ static int cn_fd;  /* Connector (netlink) socket fd */
 
 static int kernel_recv_helper(void *data, int in_size)
 {
-	int r;
+	int len;
 	struct cn_msg *msg;
 	unsigned char buf[2048];
 
-	ENTER();
-
 	/* FIXME: get rid of buf and use passed in 'data' */
 	memset(buf, 0, sizeof(buf));
 
-	r = recv(cn_fd, buf, sizeof(buf), 0);
-	if (r < 0) {
+	len = recv(cn_fd, buf, sizeof(buf), 0);
+	if (len < 0) {
 		LOG_ERROR("Failed to recv message from kernel");
-		r = -errno;
-		goto out;
+		return -errno;
 	}
 
 	switch (((struct nlmsghdr *)buf)->nlmsg_type) {
 	case NLMSG_ERROR:
 		LOG_ERROR("Unable to recv message from kernel: NLMSG_ERROR");
-		r = -EBADE;
-		break;
+		return -EBADE;
 	case NLMSG_DONE:
 		msg = (struct cn_msg *) NLMSG_DATA((struct nlmsghdr *)buf);
+		len -= sizeof(struct nlmsghdr);
+
+		if (len < sizeof(struct cn_msg)) {
+			LOG_ERROR("Incomplete request from kernel received");
+			return -EBADE;
+		}
+
 		if (msg->len > in_size) {
 			LOG_ERROR("Not enough space to receive kernel request (%d/%d)",
 				  msg->len, in_size);
 			return -EBADE;
 		}
+
 		if (!msg->len)
 			LOG_ERROR("Zero length message received");
 
+		len -= sizeof(struct cn_msg);
+
+		if (len < msg->len)
+			LOG_ERROR("len = %d, msg->len = %d", len, msg->len);
+
 		memcpy(data, msg->data, msg->len);
-		r = 0;
 		break;
 	default:
 		LOG_ERROR("Unknown nlmsg_type");
-		r = -EBADE;
-		break;
+		return -EBADE;
 	}
-out:
-	EXIT();
-	return r;
+
+	return 0;
 }
 
 /*
@@ -77,7 +83,6 @@ static int kernel_recv(struct clog_tfr **tfr)
 {
 	int r = 0;
 
-	ENTER();
 	/*
 	 * A failure to allocate space means the request is lost
 	 * The kernel must retry
@@ -99,7 +104,6 @@ static int kernel_recv(struct clog_tfr **tfr)
 		*tfr = NULL;
 	}
 
-	EXIT();
 	return (r == -EAGAIN) ? 0 : r;
 }
 
@@ -110,7 +114,6 @@ static int kernel_send_helper(void *data, int out_size)
 	struct cn_msg *msg;
 	unsigned char buf[2048];
 
-	ENTER();
 	memset(buf, 0, sizeof(buf));
 
 	nlh = (struct nlmsghdr *)buf;
@@ -130,12 +133,9 @@ static int kernel_send_helper(void *data, int out_size)
 	r = send(cn_fd, nlh, NLMSG_LENGTH(out_size + sizeof(struct cn_msg)), 0);
 	/* FIXME: do better error processing */
 	if (r <= 0)
-		r = -EBADE;
-	else
-		r = 0;
+		return -EBADE;
 
-	EXIT();
-	return r;
+	return 0;
 }
 
 /*
@@ -153,7 +153,6 @@ static int do_local_work(void *data)
 	int r;
 	struct clog_tfr *tfr = NULL;
 
-	ENTER();
 	r = kernel_recv(&tfr);
 	if (r)
 		return r;
@@ -243,7 +242,6 @@ static int do_local_work(void *data)
 		tfr->error = r;
 	}
 
-	EXIT();
 	return r;
 }
 
diff --git a/cmirror/src/queues.c b/cmirror/src/queues.c
index 0bf1aee..eeab79f 100644
--- a/cmirror/src/queues.c
+++ b/cmirror/src/queues.c
@@ -1,6 +1,8 @@
 #include <stdio.h>
 #include <errno.h>
 
+#include <string.h>
+
 #include "queues.h"
 #include "common.h"
 #include "logging.h"
@@ -184,7 +186,12 @@ struct clog_tfr *queue_remove(struct queue *q)
 	tfr = (struct clog_tfr *)q->list.next;
 	list_del_init((struct list_head *)&tfr->private);
 	q->count--;
-
+/*
+	if (q == cluster_queue)
+		LOG_ERROR("[%s] remove %s %llu",
+			  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+			  (unsigned long long)tfr->seq);
+*/
 	return tfr;
 }
 
@@ -219,6 +226,12 @@ struct clog_tfr *queue_remove_match(struct queue *q,
 			list_del_init(p);
 			q->count--;
 
+/*
+			if (q == cluster_queue)
+				LOG_ERROR("[%s] remove match %s %llu",
+					  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+					  (unsigned long long)tfr->seq);
+*/
 			return tfr;
 		}
 	}
@@ -236,6 +249,20 @@ struct clog_tfr *queue_remove_match(struct queue *q,
  */
 void queue_remove_all(struct list_head *l, struct queue *q)
 {
+/*
+	struct clog_tfr *tfr;
+	struct list_head *p, *n;
+
+	if (q == cluster_queue) {
+		LOG_ERROR("[--] remove all");
+		list_for_each_safe(p, n, &q->list) {
+			tfr = (struct clog_tfr *)p;
+			LOG_ERROR("    [%s] %s %llu",
+				  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+				  (unsigned long long)tfr->seq);
+		}
+	}		
+*/
 	list_splice_init(&q->list, l);
 	q->count = 0;
 }


hooks/post-receive
--
Cluster Project



More information about the Cluster-cvs mailing list