Cluster Project branch, master, updated. cluster-2.99.04-53-g2a8c5d4

teigland@sourceware.org
Fri Jun 20 17:05:00 GMT 2008


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "Cluster Project".

http://sources.redhat.com/git/gitweb.cgi?p=cluster.git;a=commitdiff;h=2a8c5d4d91e5ff0467cbe20fce4b230c19a75376

The branch, master has been updated
       via  2a8c5d4d91e5ff0467cbe20fce4b230c19a75376 (commit)
      from  837e9238be257221fffe7abe1efccbe1332ce810 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
commit 2a8c5d4d91e5ff0467cbe20fce4b230c19a75376
Author: David Teigland <teigland@redhat.com>
Date:   Mon Jun 2 10:50:51 2008 -0500

    gfs_controld: new version
    
    Uses libcpg directly instead of libgroup/groupd, like we've already
    done for fenced and dlm_controld.
    
    Signed-off-by: David Teigland <teigland@redhat.com>

-----------------------------------------------------------------------
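
Using libcpg directly means gfs_controld now owns the whole CPG lifecycle itself
instead of going through groupd: it registers deliver/confchg callbacks, joins one
cpg per mountgroup, and multicasts its own messages.  A minimal sketch of that
pattern against the openais cpg API (names such as the "gfs:<name>" group string
are illustrative, not taken from this patch, and exact callback prototypes vary
slightly between openais releases):

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <openais/cpg.h>

/* sketch only: real callbacks would endian-convert the gfs_header and
   queue a change struct, as cpg-new.c below does */

static void deliver_cb(cpg_handle_t h, struct cpg_name *group,
		       uint32_t nodeid, uint32_t pid, void *msg, int len)
{
}

static void confchg_cb(cpg_handle_t h, struct cpg_name *group,
		       struct cpg_address *members, int n_members,
		       struct cpg_address *left, int n_left,
		       struct cpg_address *joined, int n_joined)
{
}

static cpg_callbacks_t callbacks = {
	.cpg_deliver_fn = deliver_cb,
	.cpg_confchg_fn = confchg_cb,
};

/* join a cpg named after the mountgroup and return the fd to poll;
   cpg_dispatch() on that fd runs the callbacks above, and messages are
   sent with cpg_mcast_joined() as in _send_message() below */

static int setup_cpg_sketch(const char *mg_name, cpg_handle_t *handle_out)
{
	cpg_handle_t handle;
	struct cpg_name name;
	int fd;

	if (cpg_initialize(&handle, &callbacks) != CPG_OK)
		return -1;

	memset(&name, 0, sizeof(name));
	snprintf(name.value, sizeof(name.value), "gfs:%s", mg_name);
	name.length = strlen(name.value) + 1;

	if (cpg_join(handle, &name) != CPG_OK) {
		cpg_finalize(handle);
		return -1;
	}

	if (cpg_fd_get(handle, &fd) != CPG_OK) {
		cpg_finalize(handle);
		return -1;
	}

	*handle_out = handle;
	return fd;
}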

Summary of changes:
 dlm/libdlmcontrol/libdlmcontrol.h          |    3 +
 dlm/libdlmcontrol/main.c                   |   12 +-
 group/gfs_controld/Makefile                |    7 +-
 group/gfs_controld/cpg-new.c               | 2565 ++++++++++++++++++++++++++++
 group/gfs_controld/cpg-old.c               |  261 +---
 group/gfs_controld/cpg-old.h               |    2 +-
 group/{dlm_controld => gfs_controld}/crc.c |   14 +-
 group/gfs_controld/gfs_daemon.h            |   74 +-
 group/gfs_controld/main.c                  |  262 +++-
 group/gfs_controld/plock.c                 |    6 +-
 group/gfs_controld/util.c                  |   29 +-
 11 files changed, 2936 insertions(+), 299 deletions(-)
 create mode 100644 group/gfs_controld/cpg-new.c
 copy group/{dlm_controld => gfs_controld}/crc.c (86%)

diff --git a/dlm/libdlmcontrol/libdlmcontrol.h b/dlm/libdlmcontrol/libdlmcontrol.h
index 5f03377..83ddf6f 100644
--- a/dlm/libdlmcontrol/libdlmcontrol.h
+++ b/dlm/libdlmcontrol/libdlmcontrol.h
@@ -73,6 +73,9 @@ int dlmc_lockspaces(int max, int *count, struct dlmc_lockspace *lss);
 int dlmc_lockspace_nodes(char *lsname, int type, int max, int *count,
 			 struct dlmc_node *nodes);
 
+#define DLMC_RESULT_REGISTER	1
+#define DLMC_RESULT_NOTIFIED	2
+
 int dlmc_fs_connect(void);
 void dlmc_fs_disconnect(int fd);
 int dlmc_fs_register(int fd, char *name);
diff --git a/dlm/libdlmcontrol/main.c b/dlm/libdlmcontrol/main.c
index 4354fba..c31afb8 100644
--- a/dlm/libdlmcontrol/main.c
+++ b/dlm/libdlmcontrol/main.c
@@ -376,9 +376,19 @@ int dlmc_fs_result(int fd, char *name, int *type, int *nodeid, int *result)
 		goto out;
 
 	strncpy(name, h.name, DLM_LOCKSPACE_LEN);
-	*type = h.command;
 	*nodeid = h.option;
 	*result = h.data;
+
+	switch (h.command) {
+	case DLMC_CMD_FS_REGISTER:
+		*type = DLMC_RESULT_REGISTER;
+		break;
+	case DLMC_CMD_FS_NOTIFIED:
+		*type = DLMC_RESULT_NOTIFIED;
+		break;
+	default:
+		*type = 0;
+	}
  out:
 	return rv;
 }
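
The new constants let a caller of dlmc_fs_result() switch on the kind of reply
without knowing the library's internal DLMC_CMD_* numbering.  Roughly, the calling
side looks like this (a sketch; the real consumer is process_dlmcontrol() in
cpg-new.c below, and the buffer size here is illustrative):

#include <string.h>
#include "libdlmcontrol.h"

static void handle_dlmcontrol_result(int fd)
{
	char name[256];		/* illustrative; large enough for a lockspace name */
	int type, nodeid, result;

	memset(name, 0, sizeof(name));

	if (dlmc_fs_result(fd, name, &type, &nodeid, &result))
		return;

	switch (type) {
	case DLMC_RESULT_REGISTER:
		/* completion of an earlier dlmc_fs_register(fd, name) */
		break;
	case DLMC_RESULT_NOTIFIED:
		/* answer to dlmc_fs_notified(); a zero result means the dlm
		   has also seen the given nodeid fail */
		break;
	default:
		/* unknown/future result type */
		break;
	}
}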
diff --git a/group/gfs_controld/Makefile b/group/gfs_controld/Makefile
index e117d5b..368435c 100644
--- a/group/gfs_controld/Makefile
+++ b/group/gfs_controld/Makefile
@@ -13,24 +13,25 @@ include $(OBJDIR)/make/uninstall.mk
 OBJS= 	main.o \
 	member_cman.o \
 	config.o \
+	crc.o \
+	cpg-new.o \
 	cpg-old.o \
 	group.o \
 	util.o \
 	plock.o
 
-CFLAGS += -I${ccsincdir} -I${cmanincdir} -I${openaisincdir}
+CFLAGS += -I${ccsincdir} -I${cmanincdir} -I${dlmcontrolincdir}
 CFLAGS += -I${openaisincdir}
 CFLAGS += -I${KERNEL_SRC}/include/
 CFLAGS += -I$(S)/../libgfscontrol -I$(S)/../../fence/libfenced/
 CFLAGS += -I$(S)/../lib/ -I$(S)/../include/
 CFLAGS += -I${incdir}
 
-LDFLAGS += -L${ccslibdir} -L${cmanlibdir} -lcman -lccs
+LDFLAGS += -L${ccslibdir} -L${cmanlibdir} -lcman -lccs -ldlmcontrol
 LDFLAGS += -L${openaislibdir} -lcpg -lSaCkpt
 LDFLAGS += -L../../fence/libfenced/ -lfenced
 LDFLAGS += -L../lib -lgroup
 
-
 ${TARGET}: ${OBJS}
 	$(CC) -o $@ $^ $(LDFLAGS)
 
diff --git a/group/gfs_controld/cpg-new.c b/group/gfs_controld/cpg-new.c
new file mode 100644
index 0000000..f44abaf
--- /dev/null
+++ b/group/gfs_controld/cpg-new.c
@@ -0,0 +1,2565 @@
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "gfs_daemon.h"
+#include "config.h"
+#include "libdlmcontrol.h"
+
+#define MAX_JOURNALS 256
+
+uint32_t cpgname_to_crc(const char *data, int len);
+
+static unsigned int protocol_active[3] = {1, 0, 0};
+static int dlmcontrol_fd;
+
+/* gfs_header types */
+enum {
+	GFS_MSG_START = 1,
+	GFS_MSG_MOUNT_DONE = 2,
+	GFS_MSG_FIRST_RECOVERY_DONE = 3,
+	GFS_MSG_RECOVERY_RESULT = 4,
+};
+
+/* gfs_header flags */
+#define GFS_MFLG_JOINING   1  /* accompanies start, we are joining */
+
+struct gfs_header {
+	uint16_t version[3];
+	uint16_t type;          /* GFS_MSG_ */
+	uint32_t nodeid;        /* sender */
+	uint32_t to_nodeid;     /* recipient, 0 for all */
+	uint32_t global_id;     /* global unique id for this lockspace */
+	uint32_t flags;         /* GFS_MFLG_ */
+	uint32_t msgdata;       /* in-header payload depends on MSG type */
+	uint32_t pad1;
+	uint64_t pad2;
+};
+
+/* mg_info and id_info: for syncing state in start message */
+
+struct mg_info {
+	uint32_t mg_info_size;
+	uint32_t id_info_size;
+	uint32_t id_info_count;
+
+	uint32_t started_count;
+
+	int member_count;
+	int joined_count;
+	int remove_count;
+	int failed_count;
+
+	int first_recovery_needed;
+	int first_recovery_master;
+};
+
+#define IDI_NODEID_IS_MEMBER	0x00000001
+#define IDI_JID_NEEDS_RECOVERY	0x00000002
+#define IDI_MOUNT_DONE		0x00000008
+#define IDI_MOUNT_ERROR		0x00000010
+#define IDI_MOUNT_RO		0x00000020
+#define IDI_MOUNT_SPECTATOR	0x00000040
+
+struct id_info {
+	int nodeid;
+	int jid;
+	uint32_t flags;
+};
+
+#define JID_NONE -1
+
+struct journal {
+	struct list_head list;
+	int jid;
+	int nodeid;
+	int failed_nodeid;
+	int needs_recovery;
+
+	int local_recovery_busy;
+	int local_recovery_done;
+	int local_recovery_result;
+	int failed_recovery_count;
+};
+
+struct node {
+	struct list_head list;
+	int nodeid;
+	int jid;
+	int ro;
+	int spectator;
+	int kernel_mount_done;
+	int kernel_mount_error;
+
+	int check_dlm;
+	int dlm_notify_callback;
+	int dlm_notify_result;
+
+	int failed_reason;
+	uint32_t added_seq;
+	uint32_t removed_seq;
+	uint64_t add_time;
+};
+
+struct member {
+	struct list_head list;
+	int nodeid;
+	int start;   /* 1 if we received a start message for this change */
+	int added;   /* 1 if added by this change */
+	int failed;  /* 1 if failed in this change */
+	int disallowed;
+	char *start_msg; /* full copy of the start message from this node */
+	struct mg_info *mg_info; /* shortcut into started_msg */
+};
+
+/* One of these change structs is created for every confchg a cpg gets. */
+
+#define CGST_WAIT_CONDITIONS 1
+#define CGST_WAIT_MESSAGES   2
+
+struct change {
+	struct list_head list;
+	struct list_head members;
+	struct list_head removed; /* nodes removed by this change */
+	struct list_head saved_messages; /* saved messages */
+	int member_count;
+	int joined_count;
+	int remove_count;
+	int failed_count;
+	int state;
+	int we_joined;
+	uint32_t seq; /* used as a reference for debugging, and for queries */
+	uint32_t combined_seq; /* for queries */
+};
+
+struct save_msg {
+	struct list_head list;
+	int len;
+	char buf[0];
+};
+
+/* 
+   cpg confchg's arrive telling us that mountgroup members have
+   joined/left/failed.  A "change" struct is created for each confchg,
+   and added to the mg->changes list.
+
+   apply_changes()
+   ---------------
+
+   <a new node won't know whether first_recovery_needed is set, but it also
+    won't have any conditions to wait for, so it will go directly to
+    sending out a start message regardless>
+
+   if first_recovery_needed,
+   (or new, where new is not having completed a start barrier yet)
+   all nodes: skip wait conditions
+   all nodes: send start message
+
+   else !first_recovery_needed,
+   all nodes: if failures in changes, wait for conditions:
+              local mount to complete if in progress, stop_kernel, dlm_notified
+   all nodes: send start message
+
+   <new changes that arrive result in going back to beginning; start messages
+    from this aborted start cycle will be ignored>
+
+   all nodes: wait for all start messages
+
+   <once all start messages are received, new changes will be handled in a
+    new batch after all current changes are cleared at end of sync_state>
+
+   if start cycle / start barrier completes (start messages received from
+   all nodes without being interrupted by a change), go on to sync_state
+   which puts all members (as defined by the most recent change) in sync.
+
+   "old nodes" are nodes that have completed a start cycle before (have
+   a non-zero started_count), and "new nodes" are nodes that have not
+   completed a start cycle before (they are being added by one of the
+   changes in this start cycle)
+
+   sync_state()
+   ------------
+
+   if old nodes have first_recovery_needed, or all nodes are new
+   all nodes: mg->first_recovery_needed = 1
+   all nodes: mg->first_recovery_master = prev or new low nodeid
+   new nodes: instantiate existing state to match old nodes
+   old nodes: update state per the changes in the completed start cycle
+   all nodes: assign jids to new members
+   all nodes: clear all change structs
+
+   else !first_recovery_needed,
+   new nodes: instantiate existing state to match old nodes
+   old nodes: update state per the changes in the completed start cycle
+   all nodes: assign jids to new members
+   all nodes: clear all change structs
+
+   <new changes that arrive from here on result in going back to the top>
+
+   recover_and_start()
+   ------------------- 
+
+   if first_recovery_needed,
+   master:    tells mount to run with first=1 (if not already)
+   all nodes: wait for first_recovery_done message
+   master:    sends first_recovery_done message when mount is done
+   all nodes: mg->first_recovery_needed = 0
+   all nodes: start kernel / tell mount.gfs to mount(2) (master already did)
+   all nodes: send message with result of kernel mount
+
+   else !first_recovery_needed,
+   all nodes: if there are no journals to recover, goto start kernel
+   old nodes: tell kernel to recover jids, send message with each result
+   all nodes: wait for all recoveries to be done
+   all nodes: start kernel
+   new nodes: tell mount.gfs to mount(2)
+   new nodes: send message with result of kernel mount
+
+   [If no one can recover some journal(s), all will be left waiting, unstarted.
+    A new change from a new mount will result in things going back to the top,
+    and hopefully the new node will be successful at doing the journal
+    recoveries when it comes through the recover_and_start() section, which
+    would let everyone start again.]
+*/
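
Everything above is event-driven: confchgs and messages arrive through the cpg
callbacks, dlm_controld replies arrive on a separate fd, and after each event the
daemon re-walks its mountgroups, running apply_changes() while changes are queued
and recover_and_start() once they are all cleared (see process_mountgroup() below).
A rough sketch of how such a loop can be wired; the loop and the extern names are
illustrative, not the actual main.c from this patch, which also has to handle
mount.gfs clients, uevents and poll_dlm retries:

#include <poll.h>

/* illustrative externs: the cpg fd would come from cpg_fd_get(), the
   dlm_controld fd from dlmc_fs_connect(), and dlmcontrol_ci is whatever
   client index the daemon assigned to that fd */
extern int cpg_fd;
extern int dlmcontrol_fd;
extern int dlmcontrol_ci;

void dispatch_cpg(void);	/* e.g. cpg_dispatch(handle, CPG_DISPATCH_ALL) */
void process_dlmcontrol(int ci);
void process_mountgroups(void);

static void event_loop_sketch(void)
{
	struct pollfd pfd[2];

	pfd[0].fd = cpg_fd;
	pfd[0].events = POLLIN;
	pfd[1].fd = dlmcontrol_fd;
	pfd[1].events = POLLIN;

	for (;;) {
		if (poll(pfd, 2, -1) < 0)
			continue;

		if (pfd[0].revents & POLLIN)
			dispatch_cpg();		/* confchgs -> add_change(),
						   messages -> receive_*() */
		if (pfd[1].revents & POLLIN)
			process_dlmcontrol(dlmcontrol_ci);

		/* apply_changes() while changes are queued,
		   recover_and_start() once the list is empty */
		process_mountgroups();
	}
}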
+
+static void process_mountgroup(struct mountgroup *mg);
+
+static char *msg_name(int type)
+{
+	switch (type) {
+	case GFS_MSG_START:
+		return "start";
+	case GFS_MSG_MOUNT_DONE:
+		return "mount_done";
+	case GFS_MSG_FIRST_RECOVERY_DONE:
+		return "first_recovery_done";
+	case GFS_MSG_RECOVERY_RESULT:
+		return "recovery_result";
+	default:
+		return "unknown";
+	}
+}
+
+static int _send_message(cpg_handle_t h, void *buf, int len, int type)
+{
+	struct iovec iov;
+	cpg_error_t error;
+	int retries = 0;
+
+	iov.iov_base = buf;
+	iov.iov_len = len;
+
+ retry:
+	error = cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		retries++;
+		usleep(1000);
+		if (!(retries % 100))
+			log_error("cpg_mcast_joined retry %d %s",
+				   retries, msg_name(type));
+		goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_mcast_joined error %d handle %llx %s",
+			  error, (unsigned long long)h, msg_name(type));
+		return -1;
+	}
+
+	if (retries)
+		log_debug("cpg_mcast_joined retried %d %s",
+			  retries, msg_name(type));
+
+	return 0;
+}
+
+/* header fields caller needs to set: type, to_nodeid, flags, msgdata */
+
+static void gfs_send_message(struct mountgroup *mg, char *buf, int len)
+{
+	struct gfs_header *hd = (struct gfs_header *) buf;
+	int type = hd->type;
+
+	hd->version[0]  = cpu_to_le16(protocol_active[0]);
+	hd->version[1]  = cpu_to_le16(protocol_active[1]);
+	hd->version[2]  = cpu_to_le16(protocol_active[2]);
+	hd->type	= cpu_to_le16(hd->type);
+	hd->nodeid      = cpu_to_le32(our_nodeid);
+	hd->to_nodeid   = cpu_to_le32(hd->to_nodeid);
+	hd->global_id   = cpu_to_le32(mg->id);
+	hd->flags       = cpu_to_le32(hd->flags);
+	hd->msgdata     = cpu_to_le32(hd->msgdata);
+
+	_send_message(mg->cpg_handle, buf, len, type);
+}
+
+static struct member *find_memb(struct change *cg, int nodeid)
+{
+	struct member *memb;
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (memb->nodeid == nodeid)
+			return memb;
+	}
+	return NULL;
+}
+
+static struct mountgroup *find_mg_handle(cpg_handle_t h)
+{
+	struct mountgroup *mg;
+
+	list_for_each_entry(mg, &mountgroups, list) {
+		if (mg->cpg_handle == h)
+			return mg;
+	}
+	return NULL;
+}
+
+static struct mountgroup *find_mg_ci(int ci)
+{
+	struct mountgroup *mg;
+
+	list_for_each_entry(mg, &mountgroups, list) {
+		if (mg->cpg_client == ci)
+			return mg;
+	}
+	return NULL;
+}
+
+static struct journal *find_journal(struct mountgroup *mg, int jid)
+{
+	struct journal *j;
+
+	list_for_each_entry(j, &mg->journals, list) {
+		if (j->jid == jid)
+			return j;
+	}
+	return NULL;
+}
+
+static struct journal *find_journal_by_nodeid(struct mountgroup *mg, int nodeid)
+{
+	struct journal *j;
+
+	list_for_each_entry(j, &mg->journals, list) {
+		if (j->nodeid == nodeid)
+			return j;
+	}
+	return NULL;
+}
+
+static void free_cg(struct change *cg)
+{
+	struct member *memb, *safe;
+	struct save_msg *sm, *sm2;
+
+	list_for_each_entry_safe(memb, safe, &cg->members, list) {
+		list_del(&memb->list);
+		if (memb->start_msg)
+			free(memb->start_msg);
+		free(memb);
+	}
+	list_for_each_entry_safe(memb, safe, &cg->removed, list) {
+		list_del(&memb->list);
+		if (memb->start_msg)
+			free(memb->start_msg);
+		free(memb);
+	}
+	list_for_each_entry_safe(sm, sm2, &cg->saved_messages, list) {
+		list_del(&sm->list);
+		free(sm);
+	}
+	free(cg);
+}
+
+static void free_mg(struct mountgroup *mg)
+{
+	struct change *cg, *cg_safe;
+	struct node *node, *node_safe;
+
+	list_for_each_entry_safe(cg, cg_safe, &mg->changes, list) {
+		list_del(&cg->list);
+		free_cg(cg);
+	}
+
+	if (mg->started_change)
+		free_cg(mg->started_change);
+
+	list_for_each_entry_safe(node, node_safe, &mg->node_history, list) {
+		list_del(&node->list);
+		free(node);
+	}
+
+	free(mg);
+}
+
+static struct node *get_node_history(struct mountgroup *mg, int nodeid)
+{
+	struct node *node;
+
+	list_for_each_entry(node, &mg->node_history, list) {
+		if (node->nodeid == nodeid)
+			return node;
+	}
+	return NULL;
+}
+
+static void node_history_init(struct mountgroup *mg, int nodeid,
+			      struct change *cg)
+{
+	struct node *node;
+
+	node = get_node_history(mg, nodeid);
+	if (node)
+		goto out;
+
+	node = malloc(sizeof(struct node));
+	if (!node)
+		return;
+	memset(node, 0, sizeof(struct node));
+
+	node->nodeid = nodeid;
+	node->add_time = 0;
+	list_add_tail(&node->list, &mg->node_history);
+ out:
+	node->added_seq = cg->seq;	/* for queries */
+}
+
+static void node_history_start(struct mountgroup *mg, int nodeid)
+{
+	struct node *node;
+	
+	node = get_node_history(mg, nodeid);
+	if (!node) {
+		log_error("node_history_start no nodeid %d", nodeid);
+		return;
+	}
+
+	node->add_time = time(NULL);
+}
+
+static void node_history_left(struct mountgroup *mg, int nodeid,
+			      struct change *cg)
+{
+	struct node *node;
+
+	node = get_node_history(mg, nodeid);
+	if (!node) {
+		log_error("node_history_left no nodeid %d", nodeid);
+		return;
+	}
+
+	node->add_time = 0;
+	node->removed_seq = cg->seq;	/* for queries */
+}
+
+static void node_history_fail(struct mountgroup *mg, int nodeid,
+			      struct change *cg, int reason)
+{
+	struct node *node;
+
+	node = get_node_history(mg, nodeid);
+	if (!node) {
+		log_error("node_history_fail no nodeid %d", nodeid);
+		return;
+	}
+
+	node->check_dlm = 1;
+
+	node->removed_seq = cg->seq;	/* for queries */
+	node->failed_reason = reason;	/* for queries */
+}
+
+static int is_added(struct mountgroup *mg, int nodeid)
+{
+	struct change *cg;
+	struct member *memb;
+
+	list_for_each_entry(cg, &mg->changes, list) {
+		memb = find_memb(cg, nodeid);
+		if (memb && memb->added)
+			return 1;
+	}
+	return 0;
+}
+
+static int nodes_failed(struct mountgroup *mg)
+{
+	struct change *cg;
+
+	list_for_each_entry(cg, &mg->changes, list) {
+		if (cg->failed_count)
+			return 1;
+	}
+	return 0;
+}
+
+static int get_id_list(struct mountgroup *mg, struct id_info **ids,
+		       int *count, int *size)
+{
+	struct change *cg;
+	struct member *memb;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	/* find a start message from an old node to use; it doesn't
+	   matter which old node we take the start message from, they
+	   should all be the same */
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (!memb->mg_info->started_count)
+			continue;
+
+		*count = memb->mg_info->id_info_count;
+		*size = memb->mg_info->id_info_size;
+		*ids = (struct id_info *)(memb->start_msg +
+					  sizeof(struct gfs_header) +
+					  memb->mg_info->mg_info_size);
+		return 0;
+	}
+	return -1;
+}
+
+static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
+				     int nodeid)
+{
+	struct id_info *id = ids;
+	int i;
+
+	for (i = 0; i < count; i++) {
+		if (id->nodeid == nodeid)
+			return id;
+		id = (struct id_info *)((char *)id + size);
+	}
+	return NULL;
+}
+
+static void start_kernel(struct mountgroup *mg)
+{
+	struct change *cg = mg->started_change;
+
+	if (!mg->kernel_stopped) {
+		log_error("start_kernel %u not stopped", cg->seq);
+		return;
+	}
+
+	log_group(mg, "start_kernel %u member_count %d",
+		  cg->seq, cg->member_count);
+
+	set_sysfs(mg, "block", 0);
+	mg->kernel_stopped = 0;
+
+	if (mg->joining) {
+		client_reply_join_full(mg, 0);
+		mg->joining = 0;
+		mg->mount_client_notified = 1;
+	}
+}
+
+static void stop_kernel(struct mountgroup *mg)
+{
+	if (!mg->kernel_stopped) {
+		log_group(mg, "stop_kernel");
+		set_sysfs(mg, "block", 1);
+		mg->kernel_stopped = 1;
+	}
+}
+
+void process_dlmcontrol(int ci)
+{
+	struct mountgroup *mg;
+	struct node *node;
+	char name[GFS_MOUNTGROUP_LEN+1];
+	int rv, type, nodeid, result;
+
+	memset(name, 0, sizeof(name));
+
+	rv = dlmc_fs_result(dlmcontrol_fd, name, &type, &nodeid, &result);
+	if (rv) {
+		log_error("process_dlmcontrol dlmc_fs_result %d", rv);
+		return;
+	}
+
+	mg = find_mg(name);
+	if (!mg) {
+		log_error("process_dlmcontrol no mg %s", name);
+		return;
+	}
+
+	if (type == DLMC_RESULT_NOTIFIED) {
+		log_group(mg, "process_dlmcontrol notified nodeid %d result %d",
+			  nodeid, result);
+
+		node = get_node_history(mg, nodeid);
+		if (!node) {
+			/* shouldn't happen */
+			log_error("process_dlmcontrol no nodeid %d", nodeid);
+			return;
+		}
+
+		if (mg->dlm_notify_nodeid != nodeid) {
+			/* shouldn't happen */
+			log_error("process_dlmcontrol node %d expected %d",
+				  nodeid, mg->dlm_notify_nodeid);
+			return;
+		}
+
+		mg->dlm_notify_nodeid = 0;
+		node->dlm_notify_callback = 1;
+		node->dlm_notify_result = result;
+
+	} else if (type == DLMC_RESULT_REGISTER) {
+		log_group(mg, "process_dlmcontrol register nodeid %d result %d",
+			  nodeid, result);
+	} else {
+		log_group(mg, "process_dlmcontrol unknown type %d", type);
+	}
+
+	poll_dlm = 0;
+}
+
+static int check_dlm_notify_done(struct mountgroup *mg)
+{
+	struct node *node;
+	int rv;
+
+	/* we're waiting for a notify result from the dlm (could we fire off
+	   all dlmc_fs_notified() calls at once instead of serially?) */
+
+	if (mg->dlm_notify_nodeid)
+		return 0;
+
+	list_for_each_entry(node, &mg->node_history, list) {
+
+		/* check_dlm is set when we see a node fail, and is cleared
+		   below when we find that the dlm has also seen it fail */
+
+		if (!node->check_dlm)
+			continue;
+
+		/* we're in sync with the dlm for this nodeid, i.e. we've
+		   both seen this node fail */
+
+		if (node->dlm_notify_callback && !node->dlm_notify_result) {
+			node->dlm_notify_callback = 0;
+			node->check_dlm = 0;
+			continue;
+		}
+
+		/* we're not in sync with the dlm for this nodeid, i.e.
+		   the dlm hasn't seen this node fail yet; try calling
+		   dlmc_fs_notified() again in a bit */
+
+		if (node->dlm_notify_callback && node->dlm_notify_result) {
+			log_group(mg, "check_dlm_notify will retry nodeid %d",
+				  node->nodeid);
+			node->dlm_notify_callback = 0;
+			poll_dlm = 1;
+			return 0;
+		}
+
+		/* check if the dlm has seen this nodeid fail, we get the
+		   answer asynchronously in process_dlmcontrol */
+
+		log_group(mg, "check_dlm_notify %d begin", node->nodeid);
+
+		rv = dlmc_fs_notified(dlmcontrol_fd, mg->name, node->nodeid);
+		if (rv) {
+			log_error("dlmc_fs_notified error %d", rv);
+			return 0;
+		}
+
+		mg->dlm_notify_nodeid = node->nodeid;
+		return 0;
+	}
+
+	log_group(mg, "check_dlm_notify done");
+	return 1;
+}
+
+static int wait_conditions_done(struct mountgroup *mg)
+{
+	if (mg->first_recovery_needed) {
+		log_group(mg, "wait_conditions skip for first_recovery_needed");
+		return 1;
+	}
+
+	if (!mg->started_count) {
+		log_group(mg, "wait_conditions skip for zero started_count");
+		return 1;
+	}
+
+	if (!nodes_failed(mg)) {
+		log_group(mg, "wait_conditions skip for zero nodes_failed");
+		return 1;
+	}
+
+	if (!mg->mount_client_notified) {
+		log_group(mg, "wait_conditions skip mount client not notified");
+		return 1;
+	}
+
+	if (mg->kernel_mount_done && mg->kernel_mount_error) {
+		log_group(mg, "wait_conditions skip for kernel_mount_error");
+		return 1;
+	}
+
+	if (!mg->kernel_mount_done) {
+		log_group(mg, "wait_conditions need mount_done");
+		return 0;
+	}
+
+	stop_kernel(mg);
+
+	if (!check_dlm_notify_done(mg))
+		return 0;
+
+	return 1;
+}
+
+static int wait_messages_done(struct mountgroup *mg)
+{
+	struct change *cg = list_first_entry(&mg->changes, struct change, list);
+	struct member *memb;
+	int need = 0, total = 0;
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (!memb->start)
+			need++;
+		total++;
+	}
+
+	if (need) {
+		log_group(mg, "wait_messages_done need %d of %d", need, total);
+		return 0;
+	}
+
+	log_group(mg, "wait_messages_done got all %d", total);
+	return 1;
+}
+
+static void cleanup_changes(struct mountgroup *mg)
+{
+	struct change *cg = list_first_entry(&mg->changes, struct change, list);
+	struct change *safe;
+
+	list_del(&cg->list);
+	if (mg->started_change)
+		free_cg(mg->started_change);
+	mg->started_change = cg;
+
+	/* zero started_count means "never started" */
+
+	mg->started_count++;
+	if (!mg->started_count)
+		mg->started_count++;
+
+	cg->combined_seq = cg->seq; /* for queries */
+
+	list_for_each_entry_safe(cg, safe, &mg->changes, list) {
+		mg->started_change->combined_seq = cg->seq; /* for queries */
+		list_del(&cg->list);
+		free_cg(cg);
+	}
+}
+
+/* do the change details in the message match the details of the given change */
+
+static int match_change(struct mountgroup *mg, struct change *cg,
+			struct gfs_header *hd, int len, struct mg_info *mi)
+{
+	struct id_info *ids, *id;
+	struct member *memb;
+	uint32_t seq = hd->msgdata;
+	int i, members_mismatch;
+
+	/* We can ignore messages if we're not in the list of members.  The one
+	   known time this will happen is after we've joined the cpg, we can
+	   get messages for changes prior to the change in which we're added. */
+
+	ids = (struct id_info *)((char *)hd +
+				 sizeof(struct gfs_header) +
+				 mi->mg_info_size);
+
+	id = get_id_struct(ids, mi->id_info_count, mi->id_info_size,
+			   our_nodeid);
+
+	if (!id || !(id->flags & IDI_NODEID_IS_MEMBER)) {
+		log_group(mg, "match_change fail %d:%u we are not in members",
+			  hd->nodeid, seq);
+		return 0;
+	}
+
+	memb = find_memb(cg, hd->nodeid);
+	if (!memb) {
+		log_group(mg, "match_change fail %d:%u sender not member",
+			  hd->nodeid, seq);
+		return 0;
+	}
+
+	/* verify this is the right change by matching the counts
+	   and the nodeids of the current members */
+
+	if (mi->member_count != cg->member_count ||
+	    mi->joined_count != cg->joined_count ||
+	    mi->remove_count != cg->remove_count ||
+	    mi->failed_count != cg->failed_count) {
+		log_group(mg, "match_change fail %d:%u expect counts "
+			  "%d %d %d %d", hd->nodeid, seq,
+			  cg->member_count, cg->joined_count,
+			  cg->remove_count, cg->failed_count);
+		return 0;
+	}
+
+	members_mismatch = 0;
+	id = ids;
+
+	for (i = 0; i < mi->id_info_count; i++) {
+		if (!(id->flags & IDI_NODEID_IS_MEMBER))
+			goto next;
+
+		memb = find_memb(cg, id->nodeid);
+		if (!memb) {
+			log_group(mg, "match_change fail %d:%u no memb %d",
+			  	hd->nodeid, seq, id->nodeid);
+			members_mismatch = 1;
+			break;
+		}
+ next:
+		id = (struct id_info *)((char *)id + mi->id_info_size);
+	}
+
+	if (members_mismatch)
+		return 0;
+
+	log_group(mg, "match_change done %d:%u", hd->nodeid, seq);
+	return 1;
+}
+
+/* Unfortunately, there's no really simple way to match a message with the
+   specific change that it was sent for.  We hope that by passing all the
+   details of the change in the message, we will be able to uniquely match
+   it to the correct change. */
+
+/* A start message will usually be for the first (current) change on our list.
+   In some cases it will be for a non-current change, and we can ignore it:
+
+   1. A,B,C get confchg1 adding C
+   2. C sends start for confchg1
+   3. A,B,C get confchg2 adding D
+   4. A,B,C,D recv start from C for confchg1 - ignored
+   5. C,D send start for confchg2
+   6. A,B send start for confchg2
+   7. A,B,C,D recv all start messages for confchg2; start barrier/cycle done
+ 
+   In step 4, how do the nodes know whether the start message from C is
+   for confchg1 or confchg2?  Hopefully by comparing the counts and members. */
+
+static struct change *find_change(struct mountgroup *mg, struct gfs_header *hd,
+				  int len, struct mg_info *mi)
+{
+	struct change *cg;
+
+	list_for_each_entry_reverse(cg, &mg->changes, list) {
+		if (!match_change(mg, cg, hd, len, mi))
+			continue;
+		return cg;
+	}
+
+	log_group(mg, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
+	return NULL;
+}
+
+static void receive_start(struct mountgroup *mg, struct gfs_header *hd, int len)
+{
+	struct change *cg;
+	struct member *memb;
+	struct mg_info *mi;
+	uint32_t seq = hd->msgdata;
+	int added;
+
+	log_group(mg, "receive_start %d:%u len %d", hd->nodeid, seq, len);
+
+	/* header endian conv in deliver_cb, mg_info endian conv here,
+	   id_info endian conv later when it's used */
+
+	mi = (struct mg_info *)((char *)hd + sizeof(struct gfs_header));
+	mi->mg_info_size  = le32_to_cpu(mi->mg_info_size);
+	mi->id_info_size  = le32_to_cpu(mi->id_info_size);
+	mi->id_info_count = le32_to_cpu(mi->id_info_count);
+	mi->started_count = le32_to_cpu(mi->started_count);
+	mi->member_count  = le32_to_cpu(mi->member_count);
+	mi->joined_count  = le32_to_cpu(mi->joined_count);
+	mi->remove_count  = le32_to_cpu(mi->remove_count);
+	mi->failed_count  = le32_to_cpu(mi->failed_count);
+	mi->first_recovery_needed = le32_to_cpu(mi->first_recovery_needed);
+	mi->first_recovery_master = le32_to_cpu(mi->first_recovery_master);
+
+	cg = find_change(mg, hd, len, mi);
+	if (!cg)
+		return;
+
+	memb = find_memb(cg, hd->nodeid);
+	if (!memb) {
+		/* this should never happen since match_change checks it */
+		log_error("receive_start no member %d", hd->nodeid);
+		return;
+	}
+
+	added = is_added(mg, hd->nodeid);
+
+	if (added && mi->started_count) {
+		log_error("receive_start %d:%u add node with started_count %u",
+			  hd->nodeid, seq, mi->started_count);
+
+		/* observe this scheme working before using it; I'm not sure
+		   that a joining node won't ever see an existing node as added
+		   under normal circumstances */
+		/*
+		memb->disallowed = 1;
+		return;
+		*/
+	}
+
+	node_history_start(mg, hd->nodeid);
+	memb->start = 1;
+
+	if (memb->start_msg) {
+		/* shouldn't happen */
+		log_error("receive_start %d:%u dup start msg", hd->nodeid, seq);
+		return;
+	}
+
+	memb->start_msg = malloc(len);
+	if (!memb->start_msg) {
+		log_error("receive_start len %d no mem", len);
+		return;
+	}
+	memcpy(memb->start_msg, hd, len);
+	memb->mg_info = (struct mg_info *)(memb->start_msg +
+					   sizeof(struct gfs_header));
+}
+
+/* start messages are associated with a specific change and use the
+   find_change/match_change routines to make sure all start messages
+   are matched with the same change on all nodes.  The current set of
+   changes are cleared after a completed start cycle.  Other messages
+   happen outside the context of changes.  An "incomplete" start cycle
+   is when a confchg arrives (adding a new change struct) before all
+   start messages have been received for the current change.  In this
+   case, all members send a new start message for the latest change,
+   and any start messages received for the previous change(s) are ignored.
+
+   To sync state with start messages, we need to include:
+   - the state before applying any of the current set of queued changes
+     (new nodes will initialize with this)
+   - the essential info from changes in the set that's being started,
+     so nodes added by one of the queued changes can apply the same changes
+     to the init state that the existing nodes do. */ 
+
+/* recovery_result and mount_done messages may arrive between the time
+   that an old node sends start and the time a new node receives it.
+   two old nodes may also send start before/after a recovery_result or
+   mount_done message, creating inconsistent data in their start messages.
+
+   Soln: a new node saves recovery_result/mount_done messages between
+   last confchg and final start.  the new node knows that a start message
+   from an old node may or may not include the effects from rr/md messages
+   since the last confchg, but *will* include all effects from prior to
+   the last confchg.  The saved rr/md messages can be applied on top of
+   the state from an old node's start message; applying them a second time
+   should not change anything, producing the same result. */
+
+static int count_ids(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+	struct journal *j;
+	int count = 0;
+
+	if (!mg->started_count)
+		return 1;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list)
+		count++;
+
+	list_for_each_entry(j, &mg->journals, list)
+		if (j->needs_recovery)
+			count++;
+
+	list_for_each_entry(cg, &mg->changes, list) {
+		list_for_each_entry(memb, &cg->removed, list) {
+			j = find_journal_by_nodeid(mg, memb->nodeid);
+			if (j)
+				count++;
+		}
+	}
+
+	return count;
+}
+
+/* old member: current member that has completed a start cycle
+   new member: current member that has not yet completed a start cycle */
+
+static void send_start(struct mountgroup *mg)
+{
+	struct change *cg, *c;
+	struct gfs_header *hd;
+	struct mg_info *mi;
+	struct id_info *id;
+	struct member *memb;
+	struct node *node;
+	struct journal *j;
+	char *buf;
+	uint32_t flags;
+	int len, id_count, jid;
+	int old_memb = 0, new_memb = 0, old_journal = 0, new_journal = 0;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	id_count = count_ids(mg);
+
+	len = sizeof(struct gfs_header) + sizeof(struct mg_info) +
+	      id_count * sizeof(struct id_info);
+
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_start len %d no mem", len);
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct gfs_header *)buf;
+	mi = (struct mg_info *)(buf + sizeof(*hd));
+	id = (struct id_info *)(buf + sizeof(*hd) + sizeof(*mi));
+
+	/* fill in header (gfs_send_message handles part of header) */
+
+	hd->type = GFS_MSG_START;
+	hd->msgdata = cg->seq;
+	hd->flags |= mg->joining ? GFS_MFLG_JOINING : 0;
+
+	/* fill in mg_info */
+
+	mi->mg_info_size  = cpu_to_le32(sizeof(struct mg_info));
+	mi->id_info_size  = cpu_to_le32(sizeof(struct id_info));
+	mi->id_info_count = cpu_to_le32(id_count);
+	mi->started_count = cpu_to_le32(mg->started_count);
+	mi->member_count  = cpu_to_le32(cg->member_count);
+	mi->joined_count  = cpu_to_le32(cg->joined_count);
+	mi->remove_count  = cpu_to_le32(cg->remove_count);
+	mi->failed_count  = cpu_to_le32(cg->failed_count);
+	mi->first_recovery_needed = cpu_to_le32(mg->first_recovery_needed);
+	mi->first_recovery_master = cpu_to_le32(mg->first_recovery_master);
+
+	/* fill in id_info entries */
+
+	/* new members only send info about themselves, it's all they have */
+
+	if (!mg->started_count) {
+		flags = IDI_NODEID_IS_MEMBER;
+		flags |= mg->ro ? IDI_MOUNT_RO : 0;
+		flags |= mg->spectator ? IDI_MOUNT_SPECTATOR : 0;
+
+		id->nodeid = cpu_to_le32(our_nodeid);
+		id->jid    = cpu_to_le32(JID_NONE);
+		id->flags  = cpu_to_le32(flags);
+		goto do_send;
+	}
+
+	/* all old members send full info about all old members, and empty
+	   id_info slots about new members.  The union of start messages
+	   from a single old node and all new nodes give a complete picture
+	   of state for all members.  In sync_state, all nodes (old and new)
+	   make this union, and then assign jid's to new nodes. */
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (is_added(mg, memb->nodeid)) {
+			/* send empty slot for new member */
+			jid = JID_NONE;
+			flags = IDI_NODEID_IS_MEMBER;
+			new_memb++;
+		} else {
+			/* send full info for old member */
+			node = get_node_history(mg, memb->nodeid);
+			if (!node) {
+				log_error("send_start no node %d",memb->nodeid);
+				continue;
+			}
+
+			jid = node->jid;
+			flags = IDI_NODEID_IS_MEMBER;
+			flags |= node->ro ? IDI_MOUNT_RO : 0;
+			flags |= node->spectator ? IDI_MOUNT_SPECTATOR : 0;
+			flags |= node->kernel_mount_done ? IDI_MOUNT_DONE : 0;
+			flags |= node->kernel_mount_error ? IDI_MOUNT_ERROR : 0;
+			old_memb++;
+		}
+
+		id->nodeid = cpu_to_le32(memb->nodeid);
+		id->jid    = cpu_to_le32(jid);
+		id->flags  = cpu_to_le32(flags);
+		id++;
+	}
+
+	/* journals needing recovery from previous start cycles */
+
+	list_for_each_entry(j, &mg->journals, list) {
+		if (j->needs_recovery) {
+			id->jid = cpu_to_le32(j->jid);
+			id->flags = cpu_to_le32(IDI_JID_NEEDS_RECOVERY);
+			id++;
+			old_journal++;
+		}
+	}
+
+	/* journals needing recovery from the current start cycle */
+
+	list_for_each_entry(c, &mg->changes, list) {
+		list_for_each_entry(memb, &c->removed, list) {
+			j = find_journal_by_nodeid(mg, memb->nodeid);
+			if (j) {
+				id->jid = cpu_to_le32(j->jid);
+				id->flags = cpu_to_le32(IDI_JID_NEEDS_RECOVERY);
+				id++;
+				new_journal++;
+			}
+		}
+	}
+
+ do_send:
+	log_group(mg, "send_start %u id_count %d om %d nm %d oj %d nj %d",
+		  cg->seq, id_count, old_memb, new_memb, old_journal,
+		  new_journal);
+
+	gfs_send_message(mg, buf, len);
+
+	free(buf);
+}
+
+static void send_mount_done(struct mountgroup *mg, int result)
+{
+	struct gfs_header h;
+
+	memset(&h, 0, sizeof(h));
+
+	h.type = GFS_MSG_MOUNT_DONE;
+	h.msgdata = result;
+
+	gfs_send_message(mg, (char *)&h, sizeof(h));
+}
+
+static void send_first_recovery_done(struct mountgroup *mg)
+{
+	struct gfs_header h;
+
+	memset(&h, 0, sizeof(h));
+
+	h.type = GFS_MSG_FIRST_RECOVERY_DONE;
+
+	gfs_send_message(mg, (char *)&h, sizeof(h));
+}
+
+static void send_recovery_result(struct mountgroup *mg, int jid, int result)
+{
+	struct gfs_header *hd;
+	char *buf;
+	int len, *p;
+
+	len = sizeof(struct gfs_header) + 2 * sizeof(int);
+
+	buf = malloc(len);
+	if (!buf) {
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct gfs_header *)buf;
+	hd->type = GFS_MSG_RECOVERY_RESULT;
+
+	p = (int *)(buf + sizeof(struct gfs_header));
+
+	p[0] = cpu_to_le32(jid);
+	p[1] = cpu_to_le32(result);
+
+	gfs_send_message(mg, buf, len);
+
+	free(buf);
+}
+
+static void save_message(struct mountgroup *mg, struct gfs_header *hd, int len)
+{
+	struct change *cg;
+	struct save_msg *sm;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	sm = malloc(sizeof(struct save_msg) + len);
+	if (!sm) {
+		log_error("save_message len %d no mem", len);
+		return;
+	}
+
+	sm->len = len;
+	memcpy(sm->buf, hd, len);
+
+	list_add_tail(&sm->list, &cg->saved_messages);
+}
+
+/* We can't register with dlm_controld until dlm_controld knows about this
+   lockspace.  We know that it will when the kernel mount completes.  */
+
+void gfs_mount_done(struct mountgroup *mg)
+{
+	int rv;
+
+	if (!mg->kernel_mount_error) {
+		rv = dlmc_fs_register(dlmcontrol_fd, mg->name);
+		if (rv)
+			log_error("dlmc_fs_register %s error %d", mg->name, rv);
+	}
+
+	send_mount_done(mg, mg->kernel_mount_error);
+}
+
+static void receive_mount_done(struct mountgroup *mg, struct gfs_header *hd,
+			       int len)
+{
+	struct node *node;
+
+	log_group(mg, "receive_mount_done %d result %d",
+		  hd->nodeid, hd->msgdata);
+
+	node = get_node_history(mg, hd->nodeid);
+	if (!node) {
+		log_error("receive_mount_done no nodeid %d", hd->nodeid);
+		return;
+	}
+
+	node->kernel_mount_done = 1;
+	node->kernel_mount_error = hd->msgdata;
+}
+
+static void receive_recovery_result(struct mountgroup *mg,
+				    struct gfs_header *hd, int len)
+{
+	struct journal *j;
+	int jid, result, *p;
+
+	p = (int *)((char *)hd + sizeof(struct gfs_header));
+	jid = le32_to_cpu(p[0]);
+	result = le32_to_cpu(p[1]);
+
+	log_group(mg, "receive_recovery_result %d jid %d result %d",
+		  hd->nodeid, jid, result);
+
+	j = find_journal(mg, jid);
+	if (!j) {
+		log_error("receive_recovery_result %d no jid %d",
+			  hd->nodeid, jid);
+		return;
+	}
+
+	if (!j->needs_recovery)
+		return;
+
+	if (result == LM_RD_SUCCESS)
+		j->needs_recovery = 0;
+	else {
+		j->failed_recovery_count++;
+		log_group(mg, "jid %d failed_recovery_count %d", jid,
+			  j->failed_recovery_count);
+	}
+}
+
+static void receive_first_recovery_done(struct mountgroup *mg,
+					struct gfs_header *hd, int len)
+{
+	int master = mg->first_recovery_master;
+
+	log_group(mg, "receive_first_recovery_done %d master %d "
+		  "mount_client_notified %d",
+		  hd->nodeid, master, mg->mount_client_notified);
+
+	if (master != hd->nodeid)
+		log_error("receive_first_recovery_done %d master %d",
+			  hd->nodeid, master);
+
+	if (list_empty(&mg->changes)) {
+		/* everything is idle, no changes in progress */
+
+		mg->first_recovery_needed = 0;
+		mg->first_recovery_master = 0;
+		mg->first_recovery_msg = 1;
+
+		if (master != our_nodeid)
+			start_kernel(mg);
+	} else {
+		/* Everyone will receive this message in the same sequence
+		   wrt other start messages and confchgs:
+
+		   - If a new confchg arrives after this message (and before
+		     the final start message in the current start cycle),
+		     a new start cycle will begin.  All nodes before the
+		     confchg will have frn=0 due to receiving this message,
+		     and nodes added by the confchg will see frn=0 in all
+		     start messages (in any_nodes_first_recovery() which
+		     returns 0).
+
+		   - If the final start message arrives after this message,
+		     the start cycle will complete, running sync_state(), on
+		     all current nodes with all having seen this message.
+		     Old and new nodes in the current start cycle will see
+		     this msg and use it (first_recovery_msg) instead of the
+		     first_recovery_needed/master data in the start messages
+		     (which may be inconsistent due to members sending their
+		     start messages either before or after receiving this
+		     message). */
+
+		mg->first_recovery_needed = 0;
+		mg->first_recovery_master = 0;
+		mg->first_recovery_msg = 1;
+	}
+}
+
+/* start message from all nodes shows zero started_count */
+
+static int all_nodes_new(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (memb->mg_info->started_count)
+			return 0;
+	}
+	return 1;
+}
+
+/* does start message from any node with non-zero started_count have
+   first_recovery_needed set?  (verify that all started nodes agree on
+   first_recovery_needed) */
+
+static int any_nodes_first_recovery(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+	int yes = 0, no = 0, master = 0;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (!memb->mg_info->started_count)
+			continue;
+		if (memb->mg_info->first_recovery_needed)
+			yes++;
+		else
+			no++;
+	}
+
+	if (no && yes) {
+		/* disagreement on first_recovery_needed, shouldn't happen */
+		log_error("any_nodes_first_recovery no %d yes %d", no, yes);
+		return 1;
+	}
+
+	if (no)
+		return 0;
+
+	/* sanity check: verify agreement on the master */
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (!memb->mg_info->started_count)
+			continue;
+		if (!master) {
+			master = memb->mg_info->first_recovery_master;
+			continue;
+		}
+		if (master == memb->mg_info->first_recovery_master)
+			continue;
+
+		/* disagreement on master, shouldn't happen */
+		log_error("any_nodes_first_recovery master %d vs %d",
+			  master, memb->mg_info->first_recovery_master);
+	}
+
+	return 1;
+}
+
+/* If all nodes new, there's no previous master, pick low nodeid;
+   if not all nodes new, there will be a previous master, use that one unless
+   it's no longer a member; if master is no longer a member pick low nodeid.
+   The current master will already be set in mg->first_recovery_master for old
+   nodes, but new nodes will need to look in the start messages to find it. */
+
+static int pick_first_recovery_master(struct mountgroup *mg, int all_new)
+{
+	struct change *cg;
+	struct member *memb;
+	int old = 0, low = 0;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (memb->mg_info->started_count)
+			old = memb->mg_info->first_recovery_master;
+
+		if (!low)
+			low = memb->nodeid;
+		else if (memb->nodeid < low)
+			low = memb->nodeid;
+	}
+
+	memb = find_memb(cg, old);
+
+	if (!memb || all_new) {
+		log_group(mg, "pick_first_recovery_master low %d old %d",
+			  low, old);
+		return low;
+	}
+
+	log_group(mg, "pick_first_recovery_master old %d", old);
+	return old;
+}
+
+/* use a start message from an old node to create node info for each old node */
+
+static void create_old_nodes(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+	struct node *node;
+	struct journal *j;
+	struct id_info *ids, *id;
+	int id_count, id_size, rv;
+
+	/* get ids from a start message of an old node */
+
+	rv = get_id_list(mg, &ids, &id_count, &id_size);
+	if (rv) {
+		/* all new nodes, no old nodes */
+		log_group(mg, "create_old_nodes all new");
+		return;
+	}
+
+	/* use id list to set info for all old nodes */
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (!memb->mg_info->started_count)
+			continue;
+
+		node = get_node_history(mg, memb->nodeid);
+		id = get_id_struct(ids, id_count, id_size, memb->nodeid);
+
+		if (!node || !id) {
+			/* shouldn't happen */
+			log_error("create_old_nodes %d node %d id %d",
+				  memb->nodeid, !!node, !!id);
+			return;
+		}
+
+		if (!(id->flags & IDI_NODEID_IS_MEMBER) ||
+		     (id->flags & IDI_JID_NEEDS_RECOVERY)) {
+			/* shouldn't happen */
+			log_error("create_old_nodes %d bad flags %x",
+				  memb->nodeid, id->flags);
+			return;
+		}
+
+		node->jid                = id->jid;
+		node->kernel_mount_done  = !!(id->flags & IDI_MOUNT_DONE);
+		node->kernel_mount_error = !!(id->flags & IDI_MOUNT_ERROR);
+		node->ro                 = !!(id->flags & IDI_MOUNT_RO);
+		node->spectator          = !!(id->flags & IDI_MOUNT_SPECTATOR);
+
+		j = malloc(sizeof(struct journal));
+		if (!j) {
+			log_error("create_old_nodes no mem");
+			return;
+		}
+		memset(j, 0, sizeof(struct journal));
+
+		j->nodeid = node->nodeid;
+		j->jid = node->jid;
+		list_add(&j->list, &mg->journals);
+
+		log_group(mg, "create_old_nodes %d jid %d ro %d spect %d "
+			  "kernel_mount_done %d error %d",
+			  node->nodeid, node->jid, node->ro, node->spectator,
+			  node->kernel_mount_done, node->kernel_mount_error);
+	}
+}
+
+/* use start messages from new nodes to create node info for each new node */
+
+static void create_new_nodes(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+	struct id_info *id;
+	struct node *node;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (memb->mg_info->started_count)
+			continue;
+
+		node = get_node_history(mg, memb->nodeid);
+		if (!node) {
+			/* shouldn't happen */
+			log_error("create_new_nodes %d no node", memb->nodeid);
+			return;
+		}
+
+		/* a new node sends one id struct describing itself */
+
+		id = (struct id_info *)(memb->start_msg +
+					sizeof(struct gfs_header) +
+					memb->mg_info->mg_info_size);
+
+		if (!(id->flags & IDI_NODEID_IS_MEMBER) ||
+		     (id->flags & IDI_JID_NEEDS_RECOVERY)) {
+			/* shouldn't happen */
+			log_error("create_new_nodes %d bad flags %x",
+				  memb->nodeid, id->flags);
+			return;
+		}
+
+		node->jid       = JID_NONE;
+		node->ro        = !!(id->flags & IDI_MOUNT_RO);
+		node->spectator = !!(id->flags & IDI_MOUNT_SPECTATOR);
+
+		log_group(mg, "create_new_nodes %d ro %d spect %d",
+			  node->nodeid, node->ro, node->spectator);
+	}
+}
+
+static void create_failed_journals(struct mountgroup *mg)
+{
+	struct journal *j;
+	struct id_info *ids, *id;
+	int id_count, id_size;
+	int rv, i;
+
+	rv = get_id_list(mg, &ids, &id_count, &id_size);
+	if (rv) {
+		/* all new nodes, no old nodes */
+		return;
+	}
+
+	id = ids;
+
+	for (i = 0; i < id_count; i++) {
+		if (!(id->flags & IDI_JID_NEEDS_RECOVERY))
+			continue;
+
+		j = malloc(sizeof(struct journal));
+		if (!j) {
+			log_error("create_failed_journals no mem");
+			return;
+		}
+		memset(j, 0, sizeof(struct journal));
+
+		j->jid = id->jid;
+		j->needs_recovery = 1;
+		list_add(&j->list, &mg->journals);
+
+		id = (struct id_info *)((char *)id + id_size);
+
+		log_group(mg, "create_failed_journals jid %d", j->jid);
+	}
+}
+
+static void set_failed_journals(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct member *memb;
+	struct journal *j;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(cg, &mg->changes, list) {
+		list_for_each_entry(memb, &cg->removed, list) {
+			if (!memb->failed)
+				continue;
+
+			j = find_journal_by_nodeid(mg, memb->nodeid);
+			if (j) {
+				j->needs_recovery = 1;
+				j->failed_nodeid = j->nodeid;
+				j->nodeid = 0;
+				log_group(mg, "set_failed_journals jid %d "
+					  "nodeid %d", j->jid, memb->nodeid);
+			} else {
+				log_group(mg, "set_failed_journals no journal "
+					  "for nodeid %d ", memb->nodeid);
+			}
+		}
+	}
+}
+
+/* returns nodeid of new member with the next highest nodeid */
+
+static int next_new_nodeid(struct mountgroup *mg, int prev)
+{
+	struct change *cg;
+	struct member *memb;
+	int low = 0;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry(memb, &cg->members, list) {
+		if (memb->mg_info->started_count)
+			continue;
+		if (memb->nodeid <= prev)
+			continue;
+		if (!low)
+			low = memb->nodeid;
+		else if (memb->nodeid < low)
+			low = memb->nodeid;
+	}
+
+	return low;
+}
+
+/* returns lowest unused jid */
+
+static int next_free_jid(struct mountgroup *mg)
+{
+	int i;
+
+	for (i = 0; i < MAX_JOURNALS; i++) {
+		if (!find_journal(mg, i))
+			return i;
+	}
+	return -1;
+}
+
+static void create_new_journals(struct mountgroup *mg)
+{
+	struct journal *j, *safe;
+	struct change *cg;
+	struct node *node;
+	int nodeid = 0;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	/* first get rid of journal structs that are no longer used
+	   or dirty, i.e. from nodes that have unmounted/left, or
+	   journals that have been recovered */
+
+	list_for_each_entry_safe(j, safe, &mg->journals, list) {
+		if (j->needs_recovery)
+			continue;
+
+		if (find_memb(cg, j->nodeid))
+			continue;
+
+		list_del(&j->list);
+		free(j);
+	}
+
+	while (1) {
+		nodeid = next_new_nodeid(mg, nodeid);
+		if (!nodeid)
+			break;
+
+		node = get_node_history(mg, nodeid);
+		if (!node) {
+			/* shouldn't happen */
+			log_error("create_new_journals no nodeid %d", nodeid);
+			continue;
+		}
+
+		if (node->spectator)
+			node->jid = JID_NONE;
+		else
+			node->jid = next_free_jid(mg);
+
+		if (node->nodeid == our_nodeid)
+			mg->our_jid = node->jid;
+
+		log_group(mg, "create_new_journals %d gets jid %d",
+			  node->nodeid, node->jid);
+	}
+}
+
+/* recovery_result and mount_done messages are saved by new members until
+   they've completed the start cycle and have member state to apply them to.
+   The start messages from old nodes may not reflect the rr/md updates. */
+
+static void apply_saved_messages(struct mountgroup *mg)
+{
+	struct change *cg;
+	struct save_msg *sm, *safe;
+	struct gfs_header *hd;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	list_for_each_entry_safe(sm, safe, &cg->saved_messages, list) {
+		hd = (struct gfs_header *)sm->buf;
+
+		switch (hd->type) {
+		case GFS_MSG_MOUNT_DONE:
+			receive_mount_done(mg, hd, sm->len);
+			break;
+		case GFS_MSG_RECOVERY_RESULT:
+			receive_recovery_result(mg, hd, sm->len);
+			break;
+		}
+
+		list_del(&sm->list);
+		free(sm);
+	}
+}
+
+/* this is run immediately after receiving the final start message in a start
+   cycle, so all nodes will run this in the same sequence wrt other messages
+   and confchgs */
+
+static void sync_state(struct mountgroup *mg)
+{
+	/* This is needed for the case where the first_recovery_done message
+	   arrives while a change/start cycle is in progress.  The
+	   first_recovery data in the start messages (used by new nodes in this
+	   cycle to determine the first_recovery state) may be inconsistent in
+	   different start messages (because nodes sent their start messages at
+	   different times wrt the first_recovery_done message.)  But, in the
+	   case where the new nodes received the first_recovery_done message,
+	   they can just use that and don't need the (possibly inconsistent)
+	   first recovery data in the start messages. */
+
+	if (mg->first_recovery_msg) {
+		if (mg->first_recovery_needed || mg->first_recovery_master) {
+			/* shouldn't happen */
+			log_error("sync_state first_recovery_msg needed %d "
+				  "master %d", mg->first_recovery_needed,
+				  mg->first_recovery_master);
+		}
+
+		log_group(mg, "sync_state first_recovery_msg");
+		goto out;
+	}
+
+	/* This is the path the initial start cycle for the group always
+	   follows.  It's the case where one or more nodes are all starting up
+	   for the first time.  No one has completed a start cycle yet because
+	   everyone is joining, and one node needs to do first recovery. */
+
+	if (all_nodes_new(mg)) {
+		if (mg->first_recovery_needed || mg->first_recovery_master) {
+			/* shouldn't happen */
+			log_error("sync_state all_nodes_new first_recovery "
+				  "needed %d master %d",
+				  mg->first_recovery_needed,
+				  mg->first_recovery_master);
+		}
+		mg->first_recovery_needed = 1;
+		mg->first_recovery_master = pick_first_recovery_master(mg, 1);
+
+		log_group(mg, "sync_state all_nodes_new first_recovery_needed "
+			  "master %d", mg->first_recovery_master);
+		goto out;
+	}
+
+	/* This is for the case where new nodes are added to existing members
+	   that have first_recovery_needed set. */
+
+	if (any_nodes_first_recovery(mg)) {
+		mg->first_recovery_needed = 1;
+		mg->first_recovery_master = pick_first_recovery_master(mg, 0);
+
+		log_group(mg, "sync_state first_recovery_needed master %d",
+			  mg->first_recovery_master);
+		goto out;
+	}
+
+	/* Normal case where nodes join an established group that completed
+	   first recovery sometime in the past.  Existing nodes that weren't
+	   around during first recovery come through here, and new nodes
+           being added in this cycle come through here. */
+
+	if (mg->first_recovery_needed) {
+		/* shouldn't happen */
+		log_error("sync_state frn should not be set");
+	}
+
+ out:
+	if (!mg->started_count) {
+		create_old_nodes(mg);
+		create_new_nodes(mg);
+		create_failed_journals(mg);
+		apply_saved_messages(mg);
+		create_new_journals(mg);
+	} else {
+		create_new_nodes(mg);
+		set_failed_journals(mg);
+		create_new_journals(mg);
+	}
+}
+
+static void apply_changes(struct mountgroup *mg)
+{
+	struct change *cg;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	switch (cg->state) {
+
+	case CGST_WAIT_CONDITIONS:
+		if (wait_conditions_done(mg)) {
+			send_start(mg);
+			cg->state = CGST_WAIT_MESSAGES;
+		}
+		break;
+
+	case CGST_WAIT_MESSAGES:
+		if (wait_messages_done(mg)) {
+			sync_state(mg);
+			cleanup_changes(mg);
+		}
+		break;
+
+	default:
+		log_error("apply_changes invalid state %d", cg->state);
+	}
+}
+
+/* We send messages with the info from kernel uevents or mount.gfs ipc,
+   and then process the uevent/ipc upon receiving the message for it, so
+   that it can be processed in the same order by all nodes. */
+
+void process_recovery_uevent(char *table)
+{
+	struct mountgroup *mg;
+	struct journal *j;
+	char *name = strstr(table, ":") + 1;
+	int jid, recover_status, first_done;
+	int rv;
+
+	mg = find_mg(name);
+	if (!mg) {
+		log_error("process_recovery_uevent mg not found %s", table);
+		return;
+	}
+
+	rv = read_sysfs_int(mg, "recover_done", &jid);
+	if (rv < 0) {
+		log_error("process_recovery_uevent recover_done read %d", rv);
+		return;
+	}
+
+	rv = read_sysfs_int(mg, "recover_status", &recover_status);
+	if (rv < 0) {
+		log_error("process_recovery_uevent recover_status read %d", rv);
+		return;
+	}
+
+	if (!mg->first_recovery_needed) {
+		mg->local_recovery_busy = 0;
+
+		if (mg->local_recovery_jid != jid) {
+			log_error("process_recovery_uevent jid %d exp %d",
+				  jid, mg->local_recovery_jid);
+			return;
+		}
+
+		j = find_journal(mg, jid);
+		if (!j) {
+			log_error("process_recovery_uevent no journal %d", jid);
+			return;
+		}
+
+		log_group(mg, "process_recovery_uevent jid %d status %d",
+			  jid, recover_status);
+
+		j->local_recovery_done = 1;
+		j->local_recovery_result = recover_status;
+
+		/* j->needs_recovery will be cleared when we receive this
+		   recovery_result message */
+
+		send_recovery_result(mg, jid, recover_status);
+	} else {
+
+		/*
+		 * Assumption here is that only the first mounter will get
+		 * uevents when first_recovery_needed is set.
+	 	 */
+
+		/* make a local record of jid and recover_status; we may want
+		   to check below that we've seen uevents for all jids
+		   during first recovery before sending first_recovery_done. */
+
+		log_group(mg, "process_recovery_uevent jid %d status %d "
+			  "ignore during first_recovery", jid, recover_status);
+
+		rv = read_sysfs_int(mg, "first_done", &first_done);
+		if (rv < 0) {
+			log_error("process_recovery_uevent first_done read %d",
+				  rv);
+			return;
+		}
+
+		if (first_done) {
+			log_group(mg, "process_recovery_uevent first_done");
+			send_first_recovery_done(mg);
+		}
+	}
+
+	process_mountgroup(mg);
+}
+
+static void start_journal_recovery(struct mountgroup *mg, int jid)
+{
+	int rv;
+
+	log_group(mg, "start_journal_recovery jid %d", jid);
+
+	rv = set_sysfs(mg, "recover", jid);
+	if (rv < 0) {
+		log_error("start_journal_recovery %d error %d", jid, rv);
+		return;
+	}
+
+	mg->local_recovery_busy = 1;
+	mg->local_recovery_jid = jid;
+}
+
+static int wait_recoveries_done(struct mountgroup *mg)
+{
+	struct journal *j;
+	int wait_count = 0;
+
+	list_for_each_entry(j, &mg->journals, list) {
+		if (j->needs_recovery) {
+			log_group(mg, "wait_recoveries jid %d unrecovered",
+				  j->jid);
+			wait_count++;
+		}
+	}
+
+	if (wait_count)
+		return 0;
+
+	log_group(mg, "wait_recoveries done");
+	return 1;
+}
+
+/* pick a jid that has not been successfully recovered by someone else
+   (received recovery_result success message) and hasn't been recovered
+   by us (local record); if nothing to recover, return 0 */
+
+static int pick_journal_to_recover(struct mountgroup *mg, int *jid)
+{
+	struct journal *j;
+
+	list_for_each_entry(j, &mg->journals, list) {
+		if (j->needs_recovery && !j->local_recovery_done) {
+			*jid = j->jid;
+			return 1;
+		}
+	}
+	return 0;
+}
+
+/* processing that happens after all changes have been dealt with */
+
+static void recover_and_start(struct mountgroup *mg)
+{
+	int jid;
+
+	if (mg->first_recovery_needed) {
+		if (mg->first_recovery_master == our_nodeid &&
+		    !mg->mount_client_notified) {
+			log_group(mg, "recover_and_start first start_kernel");
+			mg->first_mounter = 1; /* adds first=1 to hostdata */
+			start_kernel(mg);      /* includes reply to mount.gfs */
+		}
+		return;
+	}
+
+	/* The normal non-first-recovery mode.  When a recovery_done message
+	   is received, check whether any more journals need recovery.  If
+	   so, start recovery on the next one, if not, start the kernel. */
+
+	if (wait_recoveries_done(mg)) {
+		/* will first mounter get an error in start_kernel
+		   the first time it's called after first mount? */
+		log_group(mg, "recover_and_start start_kernel");
+		start_kernel(mg);
+	} else {
+		if (!mg->kernel_mount_done || mg->kernel_mount_error)
+			return;
+		if (mg->spectator)
+			return;
+		if (mg->local_recovery_busy)
+			return;
+		if (pick_journal_to_recover(mg, &jid))
+			start_journal_recovery(mg, jid);
+	}
+}
+
+static void process_mountgroup(struct mountgroup *mg)
+{
+	if (!list_empty(&mg->changes))
+		apply_changes(mg);
+
+	if (list_empty(&mg->changes))
+		recover_and_start(mg);
+}
+
+void process_mountgroups(void)
+{
+	struct mountgroup *mg, *safe;
+
+	list_for_each_entry_safe(mg, safe, &mountgroups, list)
+		process_mountgroup(mg);
+}
+
+static int add_change(struct mountgroup *mg,
+		      struct cpg_address *member_list, int member_list_entries,
+		      struct cpg_address *left_list, int left_list_entries,
+		      struct cpg_address *joined_list, int joined_list_entries,
+		      struct change **cg_out)
+{
+	struct change *cg;
+	struct member *memb;
+	int i, error;
+
+	cg = malloc(sizeof(struct change));
+	if (!cg)
+		goto fail_nomem;
+	memset(cg, 0, sizeof(struct change));
+	INIT_LIST_HEAD(&cg->members);
+	INIT_LIST_HEAD(&cg->removed);
+	INIT_LIST_HEAD(&cg->saved_messages);
+	cg->state = CGST_WAIT_CONDITIONS;
+	cg->seq = ++mg->change_seq;
+	if (!cg->seq)
+		cg->seq = ++mg->change_seq;
+
+	cg->member_count = member_list_entries;
+	cg->joined_count = joined_list_entries;
+	cg->remove_count = left_list_entries;
+
+	for (i = 0; i < member_list_entries; i++) {
+		memb = malloc(sizeof(struct member));
+		if (!memb)
+			goto fail_nomem;
+		memset(memb, 0, sizeof(struct member));
+		memb->nodeid = member_list[i].nodeid;
+		list_add_tail(&memb->list, &cg->members);
+	}
+
+	for (i = 0; i < left_list_entries; i++) {
+		memb = malloc(sizeof(struct member));
+		if (!memb)
+			goto fail_nomem;
+		memset(memb, 0, sizeof(struct member));
+		memb->nodeid = left_list[i].nodeid;
+		if (left_list[i].reason == CPG_REASON_NODEDOWN ||
+		    left_list[i].reason == CPG_REASON_PROCDOWN) {
+			memb->failed = 1;
+			cg->failed_count++;
+		}
+		list_add_tail(&memb->list, &cg->removed);
+
+		if (memb->failed)
+			node_history_fail(mg, memb->nodeid, cg,
+					  left_list[i].reason);
+		else
+			node_history_left(mg, memb->nodeid, cg);
+
+		log_group(mg, "add_change %u nodeid %d remove reason %d",
+			  cg->seq, memb->nodeid, left_list[i].reason);
+	}
+
+	for (i = 0; i < joined_list_entries; i++) {
+		memb = find_memb(cg, joined_list[i].nodeid);
+		if (!memb) {
+			log_error("no member %d", joined_list[i].nodeid);
+			error = -ENOENT;
+			goto fail;
+		}
+		memb->added = 1;
+
+		if (memb->nodeid == our_nodeid)
+			cg->we_joined = 1;
+		else
+			node_history_init(mg, memb->nodeid, cg);
+
+		log_group(mg, "add_change %u nodeid %d joined", cg->seq,
+			  memb->nodeid);
+	}
+
+	if (cg->we_joined) {
+		log_group(mg, "add_change %u we joined", cg->seq);
+		list_for_each_entry(memb, &cg->members, list)
+			node_history_init(mg, memb->nodeid, cg);
+	}
+
+	log_group(mg, "add_change %u member %d joined %d remove %d failed %d",
+		  cg->seq, cg->member_count, cg->joined_count, cg->remove_count,
+		  cg->failed_count);
+
+	list_add(&cg->list, &mg->changes);
+	*cg_out = cg;
+	return 0;
+
+ fail_nomem:
+	log_error("no memory");
+	error = -ENOMEM;
+ fail:
+	free_cg(cg);
+	return error;
+}
+
+static int we_left(struct cpg_address *left_list, int left_list_entries)
+{
+	int i;
+
+	for (i = 0; i < left_list_entries; i++) {
+		if (left_list[i].nodeid == our_nodeid)
+			return 1;
+	}
+	return 0;
+}
+
+static void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       struct cpg_address *member_list, int member_list_entries,
+		       struct cpg_address *left_list, int left_list_entries,
+		       struct cpg_address *joined_list, int joined_list_entries)
+{
+	struct mountgroup *mg;
+	struct change *cg;
+	int rv;
+
+	mg = find_mg_handle(handle);
+	if (!mg) {
+		log_error("confchg_cb no mountgroup for cpg %s",
+			  group_name->value);
+		return;
+	}
+
+	if (mg->leaving && we_left(left_list, left_list_entries)) {
+		/* we called cpg_leave(), and this should be the final
+		   cpg callback we receive */
+		log_group(mg, "confchg for our leave");
+		cpg_finalize(mg->cpg_handle);
+		client_dead(mg->cpg_client);
+		list_del(&mg->list);
+		free_mg(mg);
+		return;
+	}
+
+	rv = add_change(mg, member_list, member_list_entries,
+			left_list, left_list_entries,
+			joined_list, joined_list_entries, &cg);
+	if (rv)
+		return;
+
+	process_mountgroup(mg);
+}
+
+static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
+		       uint32_t nodeid, uint32_t pid, void *data, int len)
+{
+	struct mountgroup *mg;
+	struct gfs_header *hd;
+
+	mg = find_mg_handle(handle);
+	if (!mg) {
+		log_error("deliver_cb no mg for cpg %s", group_name->value);
+		return;
+	}
+
+	hd = (struct gfs_header *)data;
+
+	hd->version[0]  = le16_to_cpu(hd->version[0]);
+	hd->version[1]  = le16_to_cpu(hd->version[1]);
+	hd->version[2]  = le16_to_cpu(hd->version[2]);
+	hd->type        = le16_to_cpu(hd->type);
+	hd->nodeid      = le32_to_cpu(hd->nodeid);
+	hd->to_nodeid   = le32_to_cpu(hd->to_nodeid);
+	hd->global_id   = le32_to_cpu(hd->global_id);
+	hd->flags       = le32_to_cpu(hd->flags);
+	hd->msgdata     = le32_to_cpu(hd->msgdata);
+
+	if (hd->version[0] != protocol_active[0]) {
+		log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
+			  nodeid, hd->version[0], hd->version[1],
+			  hd->version[2], protocol_active[0],
+			  protocol_active[1], protocol_active[2]);
+		return;
+	}
+
+	if (hd->nodeid != nodeid) {
+		log_error("bad msg nodeid %d %d", hd->nodeid, nodeid);
+		return;
+	}
+
+	switch (hd->type) {
+	case GFS_MSG_START:
+		receive_start(mg, hd, len);
+		break;
+	case GFS_MSG_MOUNT_DONE:
+		if (!mg->started_count)
+			save_message(mg, hd, len);
+		else
+			receive_mount_done(mg, hd, len);
+		break;
+	case GFS_MSG_FIRST_RECOVERY_DONE:
+		receive_first_recovery_done(mg, hd, len);
+		break;
+	case GFS_MSG_RECOVERY_RESULT:
+		if (!mg->started_count)
+			save_message(mg, hd, len);
+		else
+			receive_recovery_result(mg, hd, len);
+		break;
+	default:
+		log_error("unknown msg type %d", hd->type);
+	}
+
+	process_mountgroup(mg);
+}
+
+static cpg_callbacks_t cpg_callbacks = {
+	.cpg_deliver_fn = deliver_cb,
+	.cpg_confchg_fn = confchg_cb,
+};
+
+static void process_mountgroup_cpg(int ci)
+{
+	struct mountgroup *mg;
+	cpg_error_t error;
+
+	mg = find_mg_ci(ci);
+	if (!mg) {
+		log_error("process_mountgroup_cpg no mountgroup for ci %d", ci);
+		return;
+	}
+
+	error = cpg_dispatch(mg->cpg_handle, CPG_DISPATCH_ALL);
+	if (error != CPG_OK) {
+		log_error("cpg_dispatch error %d", error);
+		return;
+	}
+
+	update_flow_control_status();
+}
+
+int gfs_join_mountgroup(struct mountgroup *mg)
+{
+	cpg_error_t error;
+	cpg_handle_t h;
+	struct cpg_name name;
+	int i = 0, fd, ci;
+
+	error = cpg_initialize(&h, &cpg_callbacks);
+	if (error != CPG_OK) {
+		log_error("cpg_initialize error %d", error);
+		goto fail;
+	}
+
+	cpg_fd_get(h, &fd);
+
+	ci = client_add(fd, process_mountgroup_cpg, NULL);
+
+	mg->cpg_handle = h;
+	mg->cpg_client = ci;
+	mg->cpg_fd = fd;
+	mg->kernel_stopped = 1;
+	mg->joining = 1;
+
+	memset(&name, 0, sizeof(name));
+	sprintf(name.value, "gfs:%s", mg->name);
+	name.length = strlen(name.value) + 1;
+
+	/* TODO: allow global_id to be set in cluster.conf? */
+	mg->id = cpgname_to_crc(name.value, name.length);
+
+ retry:
+	error = cpg_join(h, &name);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (!(++i % 10))
+			log_error("cpg_join error retrying");
+		goto retry;
+	}
+	if (error != CPG_OK) {
+		log_error("cpg_join error %d", error);
+		cpg_finalize(h);
+		goto fail_client;
+	}
+
+	return 0;
+
+ fail_client:
+	client_dead(ci);
+	cpg_finalize(h);
+ fail:
+	return -ENOTCONN;
+}
+
+void gfs_leave_mountgroup(char *mgname, int mnterr)
+{
+	struct mountgroup *mg;
+	cpg_error_t error;
+	struct cpg_name name;
+	int i = 0;
+
+	mg = find_mg(mgname);
+	if (!mg) {
+		log_error("leave: %s not found", mgname);
+		return;
+	}
+
+	mg->leaving = 1;
+
+	memset(&name, 0, sizeof(name));
+	sprintf(name.value, "gfs:%s", mg->name);
+	name.length = strlen(name.value) + 1;
+
+ retry:
+	error = cpg_leave(mg->cpg_handle, &name);
+	if (error == CPG_ERR_TRY_AGAIN) {
+		sleep(1);
+		if (!(++i % 10))
+			log_error("cpg_leave error retrying");
+		goto retry;
+	}
+	if (error != CPG_OK)
+		log_error("cpg_leave error %d", error);
+}
+
+int setup_cpg(void)
+{
+	cpg_error_t error;
+
+	error = cpg_initialize(&libcpg_handle, &cpg_callbacks);
+	if (error != CPG_OK) {
+		log_error("setup_cpg cpg_initialize error %d", error);
+		return -1;
+	}
+
+	/* join "gfs_controld" cpg to interact with other daemons in
+	   the cluster before we start processing uevents?  Could this
+	   also help in handling transient partitions? */
+
+	return 0;
+}
+
+int setup_dlmcontrol(void)
+{
+	dlmcontrol_fd = dlmc_fs_connect();
+	return dlmcontrol_fd;
+}
+
+#if 0
+int set_mountgroup_info(struct mountgroup *mg, struct gfsc_mountgroup *mountgroup)
+{
+	struct change *cg, *last = NULL;
+
+	strncpy(mountgroup->name, mg->name, GFS_LOCKSPACE_LEN);
+	mountgroup->global_id = mg->id;
+
+	if (mg->joining)
+		mountgroup->flags |= GFSC_LF_JOINING;
+	if (mg->leaving)
+		mountgroup->flags |= GFSC_LF_LEAVING;
+	if (mg->kernel_stopped)
+		mountgroup->flags |= GFSC_LF_KERNEL_STOPPED;
+	if (mg->fs_registered)
+		mountgroup->flags |= GFSC_LF_FS_REGISTERED;
+	if (mg->need_plocks)
+		mountgroup->flags |= GFSC_LF_NEED_PLOCKS;
+	if (mg->save_plocks)
+		mountgroup->flags |= GFSC_LF_SAVE_PLOCKS;
+
+	if (!mg->started_change)
+		goto next;
+
+	cg = mg->started_change;
+
+	mountgroup->cg_prev.member_count = cg->member_count;
+	mountgroup->cg_prev.joined_count = cg->joined_count;
+	mountgroup->cg_prev.remove_count = cg->remove_count;
+	mountgroup->cg_prev.failed_count = cg->failed_count;
+	mountgroup->cg_prev.combined_seq = cg->combined_seq;
+	mountgroup->cg_prev.seq = cg->seq;
+
+ next:
+	if (list_empty(&mg->changes))
+		goto out;
+
+	list_for_each_entry(cg, &mg->changes, list)
+		last = cg;
+
+	cg = list_first_entry(&mg->changes, struct change, list);
+
+	mountgroup->cg_next.member_count = cg->member_count;
+	mountgroup->cg_next.joined_count = cg->joined_count;
+	mountgroup->cg_next.remove_count = cg->remove_count;
+	mountgroup->cg_next.failed_count = cg->failed_count;
+	mountgroup->cg_next.combined_seq = last->seq;
+	mountgroup->cg_next.seq = cg->seq;
+
+	if (cg->state == CGST_WAIT_CONDITIONS)
+		mountgroup->cg_next.wait_condition = 4;
+	if (poll_fencing)
+		mountgroup->cg_next.wait_condition = 1;
+	else if (poll_quorum)
+		mountgroup->cg_next.wait_condition = 2;
+	else if (poll_fs)
+		mountgroup->cg_next.wait_condition = 3;
+
+	if (cg->state == CGST_WAIT_MESSAGES)
+		mountgroup->cg_next.wait_messages = 1;
+ out:
+	return 0;
+}
+
+static int _set_node_info(struct mountgroup *mg, struct change *cg, int nodeid,
+			  struct gfsc_node *node)
+{
+	struct member *m = NULL;
+	struct node *n;
+
+	node->nodeid = nodeid;
+
+	if (cg)
+		m = find_memb(cg, nodeid);
+	if (!m)
+		goto history;
+
+	node->flags |= GFSC_NF_MEMBER;
+
+	if (m->start)
+		node->flags |= GFSC_NF_START;
+	if (m->disallowed)
+		node->flags |= GFSC_NF_DISALLOWED;
+
+ history:
+	n = get_node_history(mg, nodeid);
+	if (!n)
+		goto out;
+
+	if (n->check_fencing)
+		node->flags |= GFSC_NF_CHECK_FENCING;
+	if (n->check_quorum)
+		node->flags |= GFSC_NF_CHECK_QUORUM;
+	if (n->check_fs)
+		node->flags |= GFSC_NF_CHECK_FS;
+	if (n->fs_notified)
+		node->flags |= GFSC_NF_FS_NOTIFIED;
+
+	node->added_seq = n->added_seq;
+	node->removed_seq = n->removed_seq;
+	node->failed_reason = n->failed_reason;
+ out:
+	return 0;
+}
+
+int set_node_info(struct mountgroup *mg, int nodeid, struct gfsc_node *node)
+{
+	struct change *cg;
+
+	if (!list_empty(&mg->changes)) {
+		cg = list_first_entry(&mg->changes, struct change, list);
+		return _set_node_info(mg, cg, nodeid, node);
+	}
+
+	return _set_node_info(mg, mg->started_change, nodeid, node);
+}
+
+int set_mountgroups(int *count, struct gfsc_mountgroup **mgs_out)
+{
+	struct mountgroup *mg;
+	struct gfsc_mountgroup *mgs, *mgp;
+	int mg_count = 0;
+
+	list_for_each_entry(mg, &mountgroups, list)
+		mg_count++;
+
+	mgs = malloc(mg_count * sizeof(struct gfsc_mountgroup));
+	if (!mgs)
+		return -ENOMEM;
+	memset(mgs, 0, mg_count * sizeof(struct gfsc_mountgroup));
+
+	mgp = mgs;
+	list_for_each_entry(mg, &mountgroups, list) {
+		set_mountgroup_info(mg, mgp++);
+	}
+
+	*count = mg_count;
+	*mgs_out = mgs;
+	return 0;
+}
+
+int set_mountgroup_nodes(struct mountgroup *mg, int option, int *node_count,
+                        struct gfsc_node **nodes_out)
+{
+	struct change *cg;
+	struct node *n;
+	struct gfsc_node *nodes = NULL, *nodep;
+	struct member *memb;
+	int count = 0;
+
+	if (option == GFSC_NODES_ALL) {
+		if (!list_empty(&mg->changes))
+			cg = list_first_entry(&mg->changes, struct change,list);
+		else
+			cg = mg->started_change;
+
+		list_for_each_entry(n, &mg->node_history, list)
+			count++;
+
+	} else if (option == GFSC_NODES_MEMBERS) {
+		if (!mg->started_change)
+			goto out;
+		cg = mg->started_change;
+		count = cg->member_count;
+
+	} else if (option == GFSC_NODES_NEXT) {
+		if (list_empty(&mg->changes))
+			goto out;
+		cg = list_first_entry(&mg->changes, struct change, list);
+		count = cg->member_count;
+	} else
+		goto out;
+
+	nodes = malloc(count * sizeof(struct gfsc_node));
+	if (!nodes)
+		return -ENOMEM;
+	memset(nodes, 0, count * sizeof(struct gfsc_node));
+	nodep = nodes;
+
+	if (option == GFSC_NODES_ALL) {
+		list_for_each_entry(n, &mg->node_history, list)
+			_set_node_info(mg, cg, n->nodeid, nodep++);
+	} else {
+		list_for_each_entry(memb, &cg->members, list)
+			_set_node_info(mg, cg, memb->nodeid, nodep++);
+	}
+ out:
+	*node_count = count;
+	*nodes_out = nodes;
+	return 0;
+}
+#endif
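
The two-phase handling that apply_changes() walks for the newest change set can be sketched in isolation.  The following minimal standalone C program is an illustration only, not part of the patch; fake_conditions_done() and fake_messages_done() are stand-in names for the real wait_conditions_done()/wait_messages_done() checks in cpg-new.c.

#include <stdio.h>

enum { CGST_WAIT_CONDITIONS = 1, CGST_WAIT_MESSAGES = 2 };

struct change {
	int state;
};

/* stand-ins for wait_conditions_done()/wait_messages_done() above */
static int fake_conditions_done(void) { return 1; }
static int fake_messages_done(void) { return 1; }

static void apply_changes_sketch(struct change *cg)
{
	switch (cg->state) {
	case CGST_WAIT_CONDITIONS:
		/* every local condition met: send our start and wait for
		   the matching start messages from the other members */
		if (fake_conditions_done()) {
			printf("send_start\n");
			cg->state = CGST_WAIT_MESSAGES;
		}
		break;
	case CGST_WAIT_MESSAGES:
		/* a start received from every member: sync state, clean up */
		if (fake_messages_done()) {
			printf("sync_state + cleanup_changes\n");
			cg->state = 0;
		}
		break;
	}
}

int main(void)
{
	struct change cg = { .state = CGST_WAIT_CONDITIONS };

	apply_changes_sketch(&cg);	/* prints send_start */
	apply_changes_sketch(&cg);	/* prints sync_state + cleanup_changes */
	return 0;
}

Compiled alone, it prints send_start on the first call and sync_state + cleanup_changes on the second, mirroring the CGST_WAIT_CONDITIONS to CGST_WAIT_MESSAGES progression driven by deliver_cb/confchg_cb in the new code.
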
diff --git a/group/gfs_controld/cpg-old.c b/group/gfs_controld/cpg-old.c
index f4db7c0..8808957 100644
--- a/group/gfs_controld/cpg-old.c
+++ b/group/gfs_controld/cpg-old.c
@@ -3,6 +3,14 @@
 #include "cpg-old.h"
 #include "libgroup.h"
 
+#define ASSERT(x) \
+do { \
+	if (!(x)) { \
+		log_error("Assertion failed on line %d of file %s\n" \
+			  "Assertion:  \"%s\"\n", __LINE__, __FILE__, #x); \
+	} \
+} while (0)
+
 #define JID_INIT	-9
 
 /* mg_member opts bit field */
@@ -51,8 +59,6 @@ struct mg_member {
 
 extern group_handle_t gh;
 
-int message_flow_control_on;
-
 /* cpg message protocol
    1.0.0 is initial version
    2.0.0 is incompatible with 1.0.0 and allows plock ownership */
@@ -61,8 +67,7 @@ static unsigned int protocol_v200[3] = {2, 0, 0};
 static unsigned int protocol_active[3];
 
 static struct list_head withdrawn_mounts;
-static cpg_handle_t	daemon_handle;
-static struct cpg_name	daemon_name;
+static struct cpg_name daemon_name;
 
 
 static void send_journals(struct mountgroup *mg, int nodeid);
@@ -136,7 +141,7 @@ int send_group_message_old(struct mountgroup *mg, int len, char *buf)
 	hd->to_nodeid	= cpu_to_le32(hd->to_nodeid);
 	memcpy(hd->name, mg->name, strlen(mg->name));
 
-	return _send_message(daemon_handle, buf, len, type);
+	return _send_message(libcpg_handle, buf, len, type);
 }
 
 static struct mg_member *find_memb_nodeid(struct mountgroup *mg, int nodeid)
@@ -425,7 +430,7 @@ static void assign_next_first_mounter(struct mountgroup *mg)
 
 #define SEND_MS_INTS 4
 
-static void send_mount_status(struct mountgroup *mg)
+void send_mount_status_old(struct mountgroup *mg)
 {
 	struct gdlm_header *hd;
 	int len, *p;
@@ -702,7 +707,7 @@ static void receive_remount(struct mountgroup *mg, char *buf, int len, int from)
 	if (from == our_nodeid) {
 		if (!result) {
 			mg->rw = memb->rw;
-			mg->readonly = memb->readonly;
+			mg->ro = memb->readonly;
 		}
 		client_reply_remount(mg, result);
 	}
@@ -717,7 +722,7 @@ static void set_our_memb_options(struct mountgroup *mg)
 	memb = find_memb_nodeid(mg, our_nodeid);
 	ASSERT(memb);
 
-	if (mg->readonly) {
+	if (mg->ro) {
 		memb->readonly = 1;
 		memb->opts |= MEMB_OPT_RO;
 	} else if (mg->spectator) {
@@ -1447,126 +1452,19 @@ static void recover_members(struct mountgroup *mg, int num_nodes,
 	}
 }
 
-int join_mountgroup_old(int ci, struct gfsc_mount_args *ma)
+int gfs_join_mountgroup_old(struct mountgroup *mg, struct gfsc_mount_args *ma)
 {
-	struct mountgroup *mg = NULL;
-	char table2[PATH_MAX];
-	char *cluster = NULL, *name = NULL;
 	int rv;
 
-	log_debug("join: %s %s %s %s %s %s",
-		  ma->dir, ma->type, ma->proto, ma->table,
-		  ma->options, ma->dev);
-
-	if (strcmp(ma->proto, "lock_dlm")) {
-		log_error("join: lockproto %s not supported", ma->proto);
-		rv = -EPROTONOSUPPORT;
-		goto fail;
-	}
-
-	if (strstr(ma->options, "jid=") ||
-	    strstr(ma->options, "first=") ||
-	    strstr(ma->options, "id=")) {
-		log_error("join: jid, first and id are reserved options");
-		rv = -EOPNOTSUPP;
-		goto fail;
-	}
-
-	/* table is <cluster>:<name> */
-
-	memset(table2, 0, sizeof(table2));
-	strncpy(table2, ma->table, sizeof(table2));
-
-	name = strstr(table2, ":");
-	if (!name) {
-		rv = -EBADFD;
-		goto fail;
-	}
-
-	*name = '\0';
-	name++;
-	cluster = table2;
-
-	if (strlen(name) > GFS_MOUNTGROUP_LEN) {
-		rv = -ENAMETOOLONG;
-		goto fail;
-	}
-
-	mg = find_mg(name);
-	if (mg) {
-		if (strcmp(mg->mount_args.dev, ma->dev)) {
-			log_error("different fs dev %s with same name",
-				  mg->mount_args.dev);
-			rv = -EADDRINUSE;
-		} else if (mg->reject_mounts) {
-			/* fs is being unmounted */
-			log_error("join: reject mount due to unmount");
-			rv = -ESTALE;
-		} else if (mg->mount_client || !mg->kernel_mount_done) {
-			log_error("join: other mount in progress %d %d",
-				  mg->mount_client, mg->kernel_mount_done);
-			rv = -EBUSY;
-		} else {
-			log_group(mg, "join: already mounted");
-			rv = -EALREADY;
-		}
-		goto fail;
-	}
-
-	mg = create_mg(name);
-	if (!mg) {
-		rv = -ENOMEM;
-		goto fail;
-	}
-	mg->mount_client = ci;
-	memcpy(&mg->mount_args, ma, sizeof(struct gfsc_mount_args));
-
-	if (strlen(cluster) != strlen(clustername) ||
-	    strlen(cluster) == 0 || strcmp(cluster, clustername)) {
-		log_error("join: fs requires cluster=\"%s\" current=\"%s\"",
-			  cluster, clustername);
-		rv = -EBADR;
-		goto fail_free;
-	}
-	log_group(mg, "join: cluster name matches: %s", clustername);
-
-	if (strstr(ma->options, "spectator")) {
-		log_group(mg, "join: spectator mount");
-		mg->spectator = 1;
-	} else {
-		if (!we_are_in_fence_domain()) {
-			log_error("join: not in default fence domain");
-			rv = -ENOANO;
-			goto fail_free;
-		}
-	}
-
-	if (!mg->spectator && strstr(ma->options, "rw"))
-		mg->rw = 1;
-	else if (strstr(ma->options, "ro")) {
-		if (mg->spectator) {
-			log_error("join: readonly invalid with spectator");
-			rv = -EROFS;
-			goto fail_free;
-		}
-		mg->readonly = 1;
-	}
-
 	if (strlen(ma->options) > MAX_OPTIONS_LEN-1) {
-		rv = -EMLINK;
-		log_error("mount: options too long %zu", strlen(ma->options));
-		goto fail_free;
+		log_error("join: options too long %zu", strlen(ma->options));
+		return -EMLINK;
 	}
 
-	list_add(&mg->list, &mountgroups);
-	group_join(gh, name);
+	rv = group_join(gh, mg->name);
+	if (rv)
+		return -ENOTCONN;
 	return 0;
-
- fail_free:
-	free(mg);
- fail:
-	client_reply_join(ci, ma, rv);
-	return rv;
 }
 
 /* recover_members() discovers which nodes need journal recovery
@@ -1594,7 +1492,7 @@ static void recover_journals(struct mountgroup *mg)
 	int rv;
 
 	if (mg->spectator ||
-	    mg->readonly ||
+	    mg->ro ||
 	    mg->withdraw ||
 	    mg->our_jid == JID_INIT ||
 	    mg->kernel_mount_error ||
@@ -1603,7 +1501,7 @@ static void recover_journals(struct mountgroup *mg)
 	    !mg->kernel_mount_done) {
 		log_group(mg, "recover_journals: unable %d,%d,%d,%d,%d,%d,%d,%d",
 			  mg->spectator,
-			  mg->readonly,
+			  mg->ro,
 			  mg->withdraw,
 			  mg->our_jid,
 			  mg->kernel_mount_error,
@@ -1732,7 +1630,7 @@ static int need_kernel_recovery_done(struct mountgroup *mg)
    remain blocked until an rw node mounts, and the next mounter must
    be rw. */
 
-int kernel_recovery_done_old(char *table)
+int process_recovery_uevent_old(char *table)
 {
 	struct mountgroup *mg;
 	struct mg_member *memb;
@@ -1848,7 +1746,7 @@ int remount_mountgroup_old(int ci, struct gfsc_mount_args *ma)
 	}
 
 	/* no change */
-	if ((mg->readonly && ro) || (mg->rw && rw))
+	if ((mg->ro && ro) || (mg->rw && rw))
 		return 1;
 
 	mg->remount_client = ci;
@@ -1856,12 +1754,9 @@ int remount_mountgroup_old(int ci, struct gfsc_mount_args *ma)
 	return 0;
 }
 
-int leave_mountgroup_old(char *table, int mnterr)
+void gfs_leave_mountgroup_old(char *name, int mnterr)
 {
 	struct mountgroup *mg;
-	char *name = strstr(table, ":") + 1;
-
-	log_debug("leave: %s mnterr %d", name, mnterr);
 
 	list_for_each_entry(mg, &withdrawn_mounts, list) {
 		if (strcmp(mg->name, name))
@@ -1870,36 +1765,24 @@ int leave_mountgroup_old(char *table, int mnterr)
 		log_group(mg, "leave: for withdrawn fs");
 		list_del(&mg->list);
 		free(mg);
-		return 0;
+		return;
 	}
 
 	mg = find_mg(name);
 	if (!mg) {
 		log_error("leave: %s not found", name);
-		return -1;
+		return;
 	}
 
-	if (mnterr) {
-		/* sanity check: we should already have gotten the error from
-		   the mount_result message sent by mount.gfs */
-		if (!mg->kernel_mount_error) {
-			log_group(mg, "leave: mount_error is new %d %d",
-				  mg->kernel_mount_error, mnterr);
-			mg->kernel_mount_error = mnterr;
-			mg->kernel_mount_done = 1;
-		}
-		goto out;
-	}
+	/* sanity check: we should already have gotten the error from
+	   the mount.gfs mount_done; so this shouldn't happen */
 
-	if (mg->withdraw) {
-		log_error("leave: %s is withdrawing", name);
-		return -1;
+	if (mnterr && !mg->kernel_mount_error) {
+		log_error("leave: mount_error is new %d %d",
+			  mg->kernel_mount_error, mnterr);
 	}
 
-	if (!mg->kernel_mount_done) {
-		log_error("leave: %s is still mounting", name);
-		return -1;
-	}
+	mg->leaving = 1;
 
 	/* Check to see if we're waiting for a kernel recovery_done to do a
 	   start_done().  If so, call the start_done() here because we won't be
@@ -1909,48 +1792,8 @@ int leave_mountgroup_old(char *table, int mnterr)
 		log_group(mg, "leave: fill in start_done");
 		start_done(mg);
 	}
- out:
-	mg->reject_mounts = 1;
-	group_leave(gh, mg->name);
-	return 0;
-}
-
-void ping_kernel_mount_old(char *table)
-{
-	struct mountgroup *mg;
-	char *name = strstr(table, ":") + 1;
-	int rv, val;
 
-	mg = find_mg(name);
-	if (!mg)
-		return;
-
-	rv = read_sysfs_int(mg, "id", &val);
-
-	log_group(mg, "ping_kernel_mount %d", rv);
-}
-
-void mount_done_old(struct gfsc_mount_args *ma, int result)
-{
-	struct mountgroup *mg;
-	char *name = strstr(ma->table, ":") + 1;
-
-	mg = find_mg(name);
-	if (!mg) {
-		log_error("mount_done: %s not found", ma->table);
-		return;
-	}
-
-	log_group(mg, "mount_done: result %d first_mounter %d",
-		  result, mg->first_mounter);
-
-	mg->mount_client = 0;
-	mg->mount_client_fd = 0;
-
-	mg->kernel_mount_done = 1;
-	mg->kernel_mount_error = result;
-
-	send_mount_status(mg);
+	group_leave(gh, mg->name);
 }
 
 /* When mounting a fs, we first join the mountgroup, then tell mount.gfs
@@ -2201,11 +2044,11 @@ static void start_first_mounter(struct mountgroup *mg)
 	memb = find_memb_nodeid(mg, our_nodeid);
 	ASSERT(memb);
 
-	if (mg->readonly || mg->spectator) {
+	if (mg->ro || mg->spectator) {
 		memb->jid = -2;
 		mg->our_jid = -2;
 		log_group(mg, "start_first_mounter not rw ro=%d spect=%d",
-			  mg->readonly, mg->spectator);
+			  mg->ro, mg->spectator);
 		mg->mount_client_result = -EUCLEAN;
 	} else {
 		memb->opts |= MEMB_OPT_RECOVER;
@@ -2590,35 +2433,11 @@ static cpg_callbacks_t callbacks = {
 	.cpg_confchg_fn = confchg_cb,
 };
 
-void update_flow_control_status(void)
-{
-	cpg_flow_control_state_t flow_control_state;
-	cpg_error_t error;
-
-	error = cpg_flow_control_state_get(daemon_handle, &flow_control_state);
-	if (error != CPG_OK) {
-		log_error("cpg_flow_control_state_get %d", error);
-		return;
-	}
-
-	if (flow_control_state == CPG_FLOW_CONTROL_ENABLED) {
-		if (message_flow_control_on == 0) {
-			log_debug("flow control on");
-		}
-		message_flow_control_on = 1;
-	} else {
-		if (message_flow_control_on) {
-			log_debug("flow control off");
-		}
-		message_flow_control_on = 0;
-	}
-}
-
 void process_cpg_old(int ci)
 {
 	cpg_error_t error;
 
-	error = cpg_dispatch(daemon_handle, CPG_DISPATCH_ALL);
+	error = cpg_dispatch(libcpg_handle, CPG_DISPATCH_ALL);
 	if (error != CPG_OK) {
 		log_error("cpg_dispatch error %d", error);
 		return;
@@ -2639,13 +2458,13 @@ int setup_cpg_old(void)
 	else
 		memcpy(protocol_active, protocol_v100, sizeof(protocol_v100));
 
-	error = cpg_initialize(&daemon_handle, &callbacks);
+	error = cpg_initialize(&libcpg_handle, &callbacks);
 	if (error != CPG_OK) {
 		log_error("cpg_initialize error %d", error);
 		return -1;
 	}
 
-	cpg_fd_get(daemon_handle, &fd);
+	cpg_fd_get(libcpg_handle, &fd);
 	if (fd < 0) {
 		log_error("cpg_fd_get error %d", error);
 		return -1;
@@ -2656,7 +2475,7 @@ int setup_cpg_old(void)
 	daemon_name.length = 12;
 
  retry:
-	error = cpg_join(daemon_handle, &daemon_name);
+	error = cpg_join(libcpg_handle, &daemon_name);
 	if (error == CPG_ERR_TRY_AGAIN) {
 		log_debug("setup_cpg cpg_join retry");
 		sleep(1);
@@ -2664,7 +2483,7 @@ int setup_cpg_old(void)
 	}
 	if (error != CPG_OK) {
 		log_error("cpg_join error %d", error);
-		cpg_finalize(daemon_handle);
+		cpg_finalize(libcpg_handle);
 		return -1;
 	}
 
diff --git a/group/gfs_controld/cpg-old.h b/group/gfs_controld/cpg-old.h
index 2fe3720..e343faf 100644
--- a/group/gfs_controld/cpg-old.h
+++ b/group/gfs_controld/cpg-old.h
@@ -23,7 +23,7 @@ enum {
 	MSG_PLOCK_SYNC_WAITER,
 };
 
-/* These lengths are part of the wire protocol. */
+/* These lengths are part of the "old" wire protocol. */
 
 #define MAX_OPTIONS_LEN		1024
 #define MSG_NAMELEN		255
diff --git a/group/dlm_controld/crc.c b/group/gfs_controld/crc.c
similarity index 86%
copy from group/dlm_controld/crc.c
copy to group/gfs_controld/crc.c
index ff8c1d3..5dd09c7 100644
--- a/group/dlm_controld/crc.c
+++ b/group/gfs_controld/crc.c
@@ -1,4 +1,16 @@
-#include "dlm_daemon.h"
+/******************************************************************************
+*******************************************************************************
+**
+**  Copyright (C) 2008 Red Hat, Inc.  All rights reserved.
+**
+**  This copyrighted material is made available to anyone wishing to use,
+**  modify, copy, or redistribute it subject to the terms and conditions
+**  of the GNU General Public License v.2.
+**
+*******************************************************************************
+******************************************************************************/
+
+#include "gfs_daemon.h"
 
 static const uint32_t crc_32_tab[] = {
   0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
diff --git a/group/gfs_controld/gfs_daemon.h b/group/gfs_controld/gfs_daemon.h
index d538640..58fa69e 100644
--- a/group/gfs_controld/gfs_daemon.h
+++ b/group/gfs_controld/gfs_daemon.h
@@ -34,8 +34,8 @@
 #include <openais/saAis.h>
 #include <openais/saCkpt.h>
 #include <openais/cpg.h>
-
 #include <linux/dlmconstants.h>
+
 #include "libgfscontrol.h"
 #include "gfs_controld.h"
 #include "list.h"
@@ -58,6 +58,7 @@
 
 extern int daemon_debug_opt;
 extern int daemon_quit;
+extern int poll_dlm;
 extern int poll_ignore_plock;
 extern int plock_fd;
 extern int plock_ci;
@@ -72,6 +73,8 @@ extern int dump_wrap;
 extern char plock_dump_buf[GFSC_DUMP_SIZE];
 extern int plock_dump_len;
 extern int dmsetup_wait;
+extern cpg_handle_t libcpg_handle;
+extern int libcpg_flow_control_on;
 
 void daemon_dump_save(void);
 
@@ -103,14 +106,6 @@ do { \
 	syslog(LOG_ERR, fmt, ##args); \
 } while (0)
 
-#define ASSERT(x) \
-do { \
-	if (!(x)) { \
-		log_error("Assertion failed on line %d of file %s\n" \
-			  "Assertion:  \"%s\"\n", __LINE__, __FILE__, #x); \
-	} \
-} while (0)
-
 struct mountgroup {
 	struct list_head	list;
 	uint32_t		id;
@@ -119,7 +114,6 @@ struct mountgroup {
 	int			old_group_mode;
 
 	int			mount_client;
-	int			mount_client_fd;
 	int			mount_client_result;
 	int			mount_client_notified;
 	int			mount_client_delay;
@@ -128,6 +122,34 @@ struct mountgroup {
 	int			withdraw;
 	int			dmsetup_wait;
 	pid_t			dmsetup_pid;
+	int			our_jid;
+	int			spectator;
+	int			ro;
+	int			rw;
+	int                     joining;
+	int                     leaving;
+	int			kernel_mount_error;
+	int			kernel_mount_done;
+	int			first_mounter;
+
+	/* cpg-new stuff */
+
+	cpg_handle_t            cpg_handle;
+	int                     cpg_client;
+	int                     cpg_fd;
+	int                     kernel_stopped;
+	uint32_t                change_seq;
+	uint32_t                started_count;
+	struct change           *started_change;
+	struct list_head        changes;
+	struct list_head        node_history;
+	struct list_head	journals;
+	int			dlm_notify_nodeid;
+	int			first_recovery_needed;
+	int			first_recovery_master;
+	int			first_recovery_msg;
+	int			local_recovery_jid;
+	int			local_recovery_busy;
 
 	/* cpg-old stuff for rhel5/stable2 compat */
 
@@ -145,23 +167,15 @@ struct mountgroup {
 	int			got_our_options;
 	int			got_our_journals;
 	int			delay_send_journals;
-	int			kernel_mount_error;
-	int			kernel_mount_done;
-	int			got_kernel_mount;
 	int			first_mount_pending_stop;
-	int			first_mounter;
 	int			first_mounter_done;
 	int			global_first_recover_done;
 	int			emulate_first_mounter;
 	int			wait_first_done;
+	int			needs_recovery;
 	int			low_nodeid;
 	int			master_nodeid;
-	int			reject_mounts;
-	int			needs_recovery;
-	int			our_jid;
-	int			spectator;
-	int			readonly;
-	int			rw;
+	int			got_kernel_mount;
 	struct list_head	saved_messages;
 	void			*start2_fn;
 
@@ -185,25 +199,34 @@ struct mountgroup {
 void read_ccs(void);
 void read_ccs_nodir(struct mountgroup *mg, char *buf);
 
+/* cpg-new.c */
+int setup_cpg(void);
+int setup_dlmcontrol(void);
+void process_dlmcontrol(int ci);
+void process_recovery_uevent(char *table);
+void process_mountgroups(void);
+int gfs_join_mountgroup(struct mountgroup *mg);
+void gfs_leave_mountgroup(char *name, int mnterr);
+void gfs_mount_done(struct mountgroup *mg);
+
 /* cpg-old.c */
 int setup_cpg_old(void);
 void process_cpg_old(int ci);
+int gfs_join_mountgroup_old(struct mountgroup *mg, struct gfsc_mount_args *ma);
+void gfs_leave_mountgroup_old(char *name, int mnterr);
 int send_group_message_old(struct mountgroup *mg, int len, char *buf);
 void save_message_old(struct mountgroup *mg, char *buf, int len, int from,
 		      int type);
 void send_withdraw_old(struct mountgroup *mg);
+int process_recovery_uevent_old(char *table);
 void ping_kernel_mount_old(char *table);
-int join_mountgroup_old(int ci, struct gfsc_mount_args *ma);
-int kernel_recovery_done_old(char *table);
 int remount_mountgroup_old(int ci, struct gfsc_mount_args *ma);
-int leave_mountgroup_old(char *table, int mnterr);
-void mount_done_old(struct gfsc_mount_args *ma, int result);
+void send_mount_status_old(struct mountgroup *mg);
 int do_stop(struct mountgroup *mg);
 int do_finish(struct mountgroup *mg);
 void do_start(struct mountgroup *mg, int type, int member_count, int *nodeids);
 int do_terminate(struct mountgroup *mg);
 int do_withdraw_old(char *table);
-void update_flow_control_status(void);
 
 /* group.c */
 int setup_groupd(void);
@@ -252,5 +275,6 @@ int set_sysfs(struct mountgroup *mg, char *field, int val);
 int read_sysfs_int(struct mountgroup *mg, char *field, int *val_out);
 int run_dmsetup_suspend(struct mountgroup *mg, char *dev);
 void update_dmsetup_wait(void);
+void update_flow_control_status(void);
 
 #endif
diff --git a/group/gfs_controld/main.c b/group/gfs_controld/main.c
index 342e17f..8fed0d2 100644
--- a/group/gfs_controld/main.c
+++ b/group/gfs_controld/main.c
@@ -23,6 +23,8 @@ struct client {
 	struct mountgroup *mg;
 };
 
+static void do_leave(char *table, int mnterr);
+
 int do_read(int fd, void *buf, size_t count)
 {
 	int rv, off = 0;
@@ -162,6 +164,9 @@ struct mountgroup *create_mg(char *name)
 	INIT_LIST_HEAD(&mg->members_gone);
 	INIT_LIST_HEAD(&mg->plock_resources);
 	INIT_LIST_HEAD(&mg->saved_messages);
+	INIT_LIST_HEAD(&mg->changes);
+	INIT_LIST_HEAD(&mg->journals);
+	INIT_LIST_HEAD(&mg->node_history);
 	mg->init = 1;
 	mg->master_nodeid = -1;
 	mg->low_nodeid = -1;
@@ -226,6 +231,21 @@ static char *get_args(char *buf, int *argc, char **argv, char sep, int want)
 	return rp;
 }
 
+static void ping_kernel_mount(char *table)
+{
+	struct mountgroup *mg;
+	char *name = strstr(table, ":") + 1;
+	int rv, val;
+
+	mg = find_mg(name);
+	if (!mg)
+		return;
+
+	rv = read_sysfs_int(mg, "id", &val);
+
+	log_group(mg, "ping_kernel_mount %d", rv);
+}
+
 static void process_uevent(int ci)
 {
 	char buf[MAXLINE];
@@ -278,14 +298,16 @@ static void process_uevent(int ci)
 			return;
 
 		if (group_mode == GROUP_LIBGROUP)
-			leave_mountgroup_old(argv[3], 0);
+			do_leave(argv[3], 0);
 
 	} else if (!strcmp(act, "change@")) {
 		if (!lock_module)
 			return;
 
 		if (group_mode == GROUP_LIBGROUP)
-			kernel_recovery_done_old(argv[3]);
+			process_recovery_uevent_old(argv[3]);
+		else
+			process_recovery_uevent(argv[3]);
 
 	} else if (!strcmp(act, "offline@")) {
 		if (!lock_module)
@@ -298,8 +320,7 @@ static void process_uevent(int ci)
 		if (!lock_module)
 			return;
 
-		if (group_mode == GROUP_LIBGROUP)
-			ping_kernel_mount_old(argv[3]);
+		ping_kernel_mount(argv[3]);
 	}
 }
 
@@ -425,7 +446,7 @@ void client_reply_remount(struct mountgroup *mg, int result)
 {
 	struct gfsc_mount_args *ma = &mg->mount_args;
 
-	log_group(mg, "remount_reply ci %d result %d",
+	log_group(mg, "client_reply_remount ci %d result %d",
 		  mg->remount_client, result);
 
 	do_reply(client[mg->remount_client].fd, GFSC_CMD_FS_REMOUNT,
@@ -438,7 +459,7 @@ void client_reply_join(int ci, struct gfsc_mount_args *ma, int result)
 {
 	char *name = strstr(ma->table, ":") + 1;
 
-	log_debug("join_reply %s ci %d result %d", name, ci, result);
+	log_debug("client_reply_join %s ci %d result %d", name, ci, result);
 
 	do_reply(client[ci].fd, GFSC_CMD_FS_JOIN,
 		 name, result, ma, sizeof(struct gfsc_mount_args));
@@ -467,12 +488,171 @@ void client_reply_join_full(struct mountgroup *mg, int result)
 	if (nodir_str[0])
 		strcat(mg->mount_args.hostdata, nodir_str);
  out:
-	log_group(mg, "join_full_reply ci %d result %d hostdata %s",
+	log_group(mg, "client_reply_join_full ci %d result %d hostdata %s",
 		  mg->mount_client, result, mg->mount_args.hostdata);
 
 	client_reply_join(mg->mount_client, &mg->mount_args, result);
 }
 
+static void do_join(int ci, struct gfsc_mount_args *ma)
+{
+	struct mountgroup *mg = NULL;
+	char table2[PATH_MAX];
+	char *cluster = NULL, *name = NULL;
+	int rv;
+
+	log_debug("join: %s %s %s %s %s %s", ma->dir, ma->type, ma->proto,
+		  ma->table, ma->options, ma->dev);
+
+	if (strcmp(ma->proto, "lock_dlm")) {
+		log_error("join: lockproto %s not supported", ma->proto);
+		rv = -EPROTONOSUPPORT;
+		goto fail;
+	}
+
+	if (strstr(ma->options, "jid=") ||
+	    strstr(ma->options, "first=") ||
+	    strstr(ma->options, "id=")) {
+		log_error("join: jid, first and id are reserved options");
+		rv = -EOPNOTSUPP;
+		goto fail;
+	}
+
+	/* table is <cluster>:<name> */
+
+	memset(table2, 0, sizeof(table2));
+	strncpy(table2, ma->table, sizeof(table2));
+
+	name = strstr(table2, ":");
+	if (!name) {
+		rv = -EBADFD;
+		goto fail;
+	}
+
+	*name = '\0';
+	name++;
+	cluster = table2;
+
+	if (strlen(name) > GFS_MOUNTGROUP_LEN) {
+		rv = -ENAMETOOLONG;
+		goto fail;
+	}
+
+	mg = find_mg(name);
+	if (mg) {
+		if (strcmp(mg->mount_args.dev, ma->dev)) {
+			log_error("different fs dev %s with same name",
+				  mg->mount_args.dev);
+			rv = -EADDRINUSE;
+		} else if (mg->leaving) {
+			/* we're leaving the group */
+			log_error("join: reject mount due to unmount");
+			rv = -ESTALE;
+		} else if (mg->mount_client || !mg->kernel_mount_done) {
+			log_error("join: other mount in progress %d %d",
+				  mg->mount_client, mg->kernel_mount_done);
+			rv = -EBUSY;
+		} else {
+			log_group(mg, "join: already mounted");
+			rv = -EALREADY;
+		}
+		goto fail;
+	}
+
+	mg = create_mg(name);
+	if (!mg) {
+		rv = -ENOMEM;
+		goto fail;
+	}
+	mg->mount_client = ci;
+	memcpy(&mg->mount_args, ma, sizeof(struct gfsc_mount_args));
+
+	if (strlen(cluster) != strlen(clustername) ||
+	    strlen(cluster) == 0 || strcmp(cluster, clustername)) {
+		log_error("join: fs requires cluster=\"%s\" current=\"%s\"",
+			  cluster, clustername);
+		rv = -EBADR;
+		goto fail_free;
+	}
+	log_group(mg, "join: cluster name matches: %s", clustername);
+
+	if (strstr(ma->options, "spectator")) {
+		log_group(mg, "join: spectator mount");
+		mg->spectator = 1;
+	} else {
+		if (!we_are_in_fence_domain()) {
+			log_error("join: not in default fence domain");
+			rv = -ENOANO;
+			goto fail_free;
+		}
+	}
+
+	if (!mg->spectator && strstr(ma->options, "rw"))
+		mg->rw = 1;
+	else if (strstr(ma->options, "ro")) {
+		if (mg->spectator) {
+			log_error("join: readonly invalid with spectator");
+			rv = -EROFS;
+			goto fail_free;
+		}
+		mg->ro = 1;
+	}
+
+	list_add(&mg->list, &mountgroups);
+
+	if (group_mode == GROUP_LIBGROUP)
+		rv = gfs_join_mountgroup_old(mg, ma);
+	else
+		rv = gfs_join_mountgroup(mg);
+
+	if (rv) {
+		log_error("join: group join error %d", rv);
+		list_del(&mg->list);
+		goto fail_free;
+	}
+	return;
+
+ fail_free:
+	free(mg);
+ fail:
+	client_reply_join(ci, ma, rv);
+}
+
+static void do_leave(char *table, int mnterr)
+{
+	char *name = strstr(table, ":") + 1;
+
+	log_debug("leave: %s mnterr %d", name, mnterr);
+
+	if (group_mode == GROUP_LIBGROUP)
+		gfs_leave_mountgroup_old(name, mnterr);
+	else
+		gfs_leave_mountgroup(name, mnterr);
+}
+
+static void do_mount_done(char *table, int result)
+{
+	struct mountgroup *mg;
+	char *name = strstr(table, ":") + 1;
+
+	log_debug("mount_done: %s result %d", name, result);
+
+	mg = find_mg(name);
+	if (!mg) {
+		log_error("mount_done: %s not found", name);
+		return;
+	}
+
+	mg->mount_client = 0;
+	mg->kernel_mount_done = 1;
+	mg->kernel_mount_error = result;
+
+	if (group_mode == GROUP_LIBGROUP)
+		send_mount_status_old(mg);
+	else
+		gfs_mount_done(mg);
+}
+
 void process_connection(int ci)
 {
 	struct gfsc_header h;
@@ -530,39 +710,24 @@ void process_connection(int ci)
 	switch (h.command) {
 
 	case GFSC_CMD_FS_JOIN:
-		if (group_mode == GROUP_LIBGROUP)
-			join_mountgroup_old(ci, ma);
-		/*
-		else
-			join_mountgroup(ci, ma);
-		*/
-		break;
-
-	case GFSC_CMD_FS_REMOUNT:
-		if (group_mode == GROUP_LIBGROUP)
-			remount_mountgroup_old(ci, ma);
-		/*
-		else
-			remount_mountgroup(ci, ma);
-		*/
+		do_join(ci, ma);
 		break;
 
 	case GFSC_CMD_FS_LEAVE:
-		if (group_mode == GROUP_LIBGROUP)
-			leave_mountgroup_old(ma->table, h.data);
-		/*
-		else
-			leave_mountgroup(ma->table, h.data);
-		*/
+		do_leave(ma->table, h.data);
 		break;
 
 	case GFSC_CMD_FS_MOUNT_DONE:
+		do_mount_done(ma->table, h.data);
+		break;
+
+	case GFSC_CMD_FS_REMOUNT:
 		if (group_mode == GROUP_LIBGROUP)
-			mount_done_old(ma, h.data);
-		/*
+			remount_mountgroup_old(ci, ma);
+#if 0
 		else
-			mount_done(ma, h.data);
-		*/
+			remount_mountgroup(ci, ma);
+#endif
 		break;
 
 	default:
@@ -708,6 +873,11 @@ static void cluster_dead(int ci)
 	exit(1);
 }
 
+static void dead_dlmcontrol(int ci)
+{
+	log_error("dlm_controld poll error %x", pollfd[ci].revents);
+}
+
 static int loop(void)
 {
 	int poll_timeout = -1;
@@ -715,6 +885,9 @@ static int loop(void)
 	void (*workfn) (int ci);
 	void (*deadfn) (int ci);
 
+	/* FIXME: add code that looks for uncontrolled instances of
+	   gfs filesystems in the kernel */
+
 	rv = setup_queries();
 	if (rv < 0)
 		goto out;
@@ -754,20 +927,24 @@ static int loop(void)
 	if (group_mode == GROUP_LIBCPG) {
 
 		/*
-		 * code in: cpg_new.c
+		 * The new, good, way of doing things using libcpg directly.
+		 * code in: cpg-new.c
 		 */
 
-		/*
-		rv = setup_cpg_new();
+		rv = setup_cpg();
 		if (rv < 0)
 			goto out;
-		client_add(rv, process_cpg_new, cluster_dead);
-		*/
+
+		rv = setup_dlmcontrol();
+		if (rv < 0)
+			goto out;
+		client_add(rv, process_dlmcontrol, dead_dlmcontrol);
 
 	} else if (group_mode == GROUP_LIBGROUP) {
 
 		/*
-		 * code in: cpg_old.c group.c recover.c plock.c
+		 * The old, bad, way of doing things using libgroup.
+		 * code in: cpg-old.c group.c plock.c
 		 */
 
 		rv = setup_cpg_old();
@@ -814,13 +991,11 @@ static int loop(void)
 
 		poll_timeout = -1;
 
-#if 0
 		if (poll_dlm) {
 			/* only happens for GROUP_LIBCPG */
-			process_mountgroup_changes();
-			poll_timeout = 1000;
+			process_mountgroups();
+			poll_timeout = 500;
 		}
-#endif
 
 		if (poll_ignore_plock) {
 			/* only happens for GROUP_LIBGROUP */
@@ -1089,6 +1264,7 @@ void daemon_dump_save(void)
 int daemon_debug_opt;
 int daemon_quit;
 int poll_ignore_plock;
+int poll_dlm;
 int plock_fd;
 int plock_ci;
 struct list_head mountgroups;
@@ -1102,4 +1278,6 @@ int dump_wrap;
 char plock_dump_buf[GFSC_DUMP_SIZE];
 int plock_dump_len;
 int dmsetup_wait;
+cpg_handle_t libcpg_handle;
+int libcpg_flow_control_on;
 
diff --git a/group/gfs_controld/plock.c b/group/gfs_controld/plock.c
index bb01237..0b485a2 100644
--- a/group/gfs_controld/plock.c
+++ b/group/gfs_controld/plock.c
@@ -27,8 +27,6 @@ static char section_buf[1024 * 1024];
 static uint32_t section_len;
 static int need_fsid_translation = 0;
 
-extern int message_flow_control_on;
-
 struct pack_plock {
 	uint64_t start;
 	uint64_t end;
@@ -1592,9 +1590,9 @@ int limit_plocks(void)
 
 	/* Don't send more messages while the cpg message queue is backed up */
 
-	if (message_flow_control_on) {
+	if (libcpg_flow_control_on) {
 		update_flow_control_status();
-		if (message_flow_control_on)
+		if (libcpg_flow_control_on)
 			return 1;
 	}
 
diff --git a/group/gfs_controld/util.c b/group/gfs_controld/util.c
index 13eee5f..7c93934 100644
--- a/group/gfs_controld/util.c
+++ b/group/gfs_controld/util.c
@@ -1,6 +1,30 @@
 #include "gfs_daemon.h"
 #include "libfenced.h"
 
+void update_flow_control_status(void)
+{
+	cpg_flow_control_state_t flow_control_state;
+	cpg_error_t error;
+
+	error = cpg_flow_control_state_get(libcpg_handle, &flow_control_state);
+	if (error != CPG_OK) {
+		log_error("cpg_flow_control_state_get %d", error);
+		return;
+	}
+
+	if (flow_control_state == CPG_FLOW_CONTROL_ENABLED) {
+		if (libcpg_flow_control_on == 0) {
+			log_debug("flow control on");
+		}
+		libcpg_flow_control_on = 1;
+	} else {
+		if (libcpg_flow_control_on) {
+			log_debug("flow control off");
+		}
+		libcpg_flow_control_on = 0;
+	}
+}
+
 int we_are_in_fence_domain(void)
 {
 	struct fenced_node nodeinfo;
@@ -47,7 +71,10 @@ int set_sysfs(struct mountgroup *mg, char *field, int val)
 	rv = write(fd, out, strlen(out));
 
 	close(fd);
-	return 0;
+
+	if (rv > 0)
+		rv = 0;
+	return rv;
 }
 
 static int get_sysfs(struct mountgroup *mg, char *field, char *buf, int len)
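
The flow-control gating moved into util.c above (and consulted by limit_plocks() in plock.c) follows a simple pattern: refresh a cached flag from the cpg handle and hold plock traffic while it is set.  The sketch below is standalone and not part of the patch; the congestion state comes from a stubbed fake_flow_state variable rather than the real cpg_flow_control_state_get() call.

#include <stdio.h>

/* stand-in for cpg_flow_control_state_get(); in the daemon the state comes
   from libcpg, where CPG_FLOW_CONTROL_ENABLED means the queue is backed up */
static int fake_flow_state = 1;

static int flow_control_on;	/* plays the role of libcpg_flow_control_on */

static void update_flow_control_sketch(void)
{
	if (fake_flow_state) {
		if (!flow_control_on)
			printf("flow control on\n");
		flow_control_on = 1;
	} else {
		if (flow_control_on)
			printf("flow control off\n");
		flow_control_on = 0;
	}
}

/* mirrors limit_plocks(): return 1 while plock messages should be held */
static int limit_plocks_sketch(void)
{
	if (flow_control_on) {
		update_flow_control_sketch();
		if (flow_control_on)
			return 1;
	}
	return 0;
}

int main(void)
{
	update_flow_control_sketch();			/* congestion reported */
	printf("limit: %d\n", limit_plocks_sketch());	/* 1: hold plocks */

	fake_flow_state = 0;				/* congestion clears */
	printf("limit: %d\n", limit_plocks_sketch());	/* 0: resume */
	return 0;
}

While the stub reports congestion the limiter returns 1 and no further plock messages are queued; once the state clears it drops back to 0, matching the on/off debug messages logged by update_flow_control_status().
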


hooks/post-receive
--
Cluster Project


