master - gfs_controld: add protocol negotiation
David Teigland
teigland@fedoraproject.org
Thu Sep 25 04:27:00 GMT 2008
Gitweb: http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=51610f70d7bc79c3e35a7f9b6eded687db47d89e
Commit: 51610f70d7bc79c3e35a7f9b6eded687db47d89e
Parent: f873ac30082420d148c39a44f3c73106f579c278
Author: David Teigland <teigland@redhat.com>
AuthorDate: Wed Sep 24 15:57:55 2008 -0500
Committer: David Teigland <teigland@redhat.com>
CommitterDate: Wed Sep 24 16:04:10 2008 -0500
gfs_controld: add protocol negotiation
Add negotiation for both the daemon and kernel protocols, although the
kernel protocol is not yet connected in any way to the actual kernel version.
Signed-off-by: David Teigland <teigland@redhat.com>
---
group/gfs_controld/cpg-new.c | 499 ++++++++++++++++++++++++++++++++++++---
group/gfs_controld/gfs_daemon.h | 1 +
group/gfs_controld/main.c | 4 +
3 files changed, 472 insertions(+), 32 deletions(-)
diff --git a/group/gfs_controld/cpg-new.c b/group/gfs_controld/cpg-new.c
index 2352682..1f0a6ec 100644
--- a/group/gfs_controld/cpg-new.c
+++ b/group/gfs_controld/cpg-new.c
@@ -18,25 +18,23 @@
uint32_t cpgname_to_crc(const char *data, int len);
-static unsigned int protocol_active[3] = {1, 0, 0};
-static int dlmcontrol_fd;
-
/* gfs_header types */
enum {
- GFS_MSG_START = 1,
- GFS_MSG_MOUNT_DONE = 2,
- GFS_MSG_FIRST_RECOVERY_DONE = 3,
- GFS_MSG_RECOVERY_RESULT = 4,
- GFS_MSG_REMOUNT = 5,
- GFS_MSG_WITHDRAW = 6,
- GFS_MSG_WITHDRAW_ACK = 7,
+ GFS_MSG_PROTOCOL = 1,
+ GFS_MSG_START = 2,
+ GFS_MSG_MOUNT_DONE = 3,
+ GFS_MSG_FIRST_RECOVERY_DONE = 4,
+ GFS_MSG_RECOVERY_RESULT = 5,
+ GFS_MSG_REMOUNT = 6,
+ GFS_MSG_WITHDRAW = 7,
+ GFS_MSG_WITHDRAW_ACK = 8,
};
/* gfs_header flags */
#define GFS_MFLG_JOINING 1 /* accompanies start, we are joining */
struct gfs_header {
- uint16_t version[3];
+ uint16_t version[3]; /* daemon_run protocol */
uint16_t type; /* GFS_MSG_ */
uint32_t nodeid; /* sender */
uint32_t to_nodeid; /* recipient, 0 for all */
@@ -47,6 +45,32 @@ struct gfs_header {
uint64_t pad2;
};
+/* One protocol version as carried in GFS_MSG_PROTOCOL messages; pv_in() and
+   pv_out() convert all four fields from/to little-endian.  receive_protocol()
+   treats a zero major/minor/patch as invalid for max values and as
+   "not yet set" for run values. */
+struct protocol_version {
+ uint16_t major;
+ uint16_t minor;
+ uint16_t patch;
+ uint16_t flags; /* not compared by this change; pick_min_protocol() leaves run flags zero */
+};
+
+/* Protocol state exchanged between daemons.  Each member overlays a
+   protocol_version with a uint16_t[4] array so code can index it as
+   [0]=major, [1]=minor, [2]=patch, [3]=flags.  For our_protocol the max
+   values are set in setup_cpg(); the run values stay all zero until the
+   cluster has agreed on a version. */
+struct protocol {
+ /* highest daemon protocol supported by the node */
+ union {
+ struct protocol_version dm_ver;
+ uint16_t daemon_max[4];
+ };
+ /* highest kernel protocol supported by the node */
+ union {
+ struct protocol_version km_ver;
+ uint16_t kernel_max[4];
+ };
+ /* daemon protocol agreed on by the cluster (zero until negotiated) */
+ union {
+ struct protocol_version dr_ver;
+ uint16_t daemon_run[4];
+ };
+ /* kernel protocol agreed on by the cluster (zero until negotiated) */
+ union {
+ struct protocol_version kr_ver;
+ uint16_t kernel_run[4];
+ };
+};
+
/* mg_info and id_info: for syncing state in start message */
struct mg_info {
@@ -113,6 +137,8 @@ struct node {
int withdraw;
int send_withdraw_ack;
+
+ struct protocol proto;
};
struct member {
@@ -152,6 +178,13 @@ struct save_msg {
char buf[0];
};
+/* NOTE(review): dlmcontrol_fd is only declared here; it is set and used
+   outside this excerpt (presumably by setup_dlmcontrol() -- confirm). */
+static int dlmcontrol_fd;
+/* daemon cpg fd: filled by cpg_fd_get() in setup_cpg(), polled by
+   set_protocol() while the protocol is negotiated */
+static int daemon_cpg_fd;
+/* our max versions are set in setup_cpg(); the run versions stay zero
+   until adopted from a proto message in receive_protocol() */
+static struct protocol our_protocol;
+/* one struct node per nodeid seen in a daemon cpg confchg, holding the
+   proto message saved from that node.  NOTE(review): no visible code
+   removes entries when a node leaves -- confirm stale protos are harmless */
+static struct list_head daemon_nodes;
+/* member list copied from the most recent daemon cpg confchg.
+   NOTE(review): confchg_cb_daemon copies member_list_entries items here
+   without checking them against MAX_NODES */
+static struct cpg_address daemon_member[MAX_NODES];
+static int daemon_member_count;
+
/*
cpg confchg's arrive telling us that mountgroup members have
joined/left/failed. A "change" struct is created for each confchg,
@@ -301,9 +334,9 @@ static void gfs_send_message(struct mountgroup *mg, char *buf, int len)
struct gfs_header *hd = (struct gfs_header *) buf;
int type = hd->type;
- hd->version[0] = cpu_to_le16(protocol_active[0]);
- hd->version[1] = cpu_to_le16(protocol_active[1]);
- hd->version[2] = cpu_to_le16(protocol_active[2]);
+ hd->version[0] = cpu_to_le16(our_protocol.daemon_run[0]);
+ hd->version[1] = cpu_to_le16(our_protocol.daemon_run[1]);
+ hd->version[2] = cpu_to_le16(our_protocol.daemon_run[2]);
hd->type = cpu_to_le16(hd->type);
hd->nodeid = cpu_to_le32(our_nodeid);
hd->to_nodeid = cpu_to_le32(hd->to_nodeid);
@@ -437,8 +470,10 @@ static void node_history_init(struct mountgroup *mg, int nodeid,
}
node = malloc(sizeof(struct node));
- if (!node)
+ if (!node) {
+ log_error("node_history_init no mem");
return;
+ }
out:
memset(node, 0, sizeof(struct node));
@@ -1256,6 +1291,7 @@ static void send_recovery_result(struct mountgroup *mg, int jid, int result)
buf = malloc(len);
if (!buf) {
+ log_error("send_recovery_result no mem %d", len);
return;
}
memset(buf, 0, len);
@@ -2456,11 +2492,13 @@ static void gfs_header_in(struct gfs_header *hd)
static int gfs_header_check(struct gfs_header *hd, int nodeid)
{
- if (hd->version[0] != protocol_active[0]) {
+ if (hd->version[0] != our_protocol.daemon_run[0] ||
+ hd->version[1] != our_protocol.daemon_run[1]) {
log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
nodeid, hd->version[0], hd->version[1],
- hd->version[2], protocol_active[0],
- protocol_active[1], protocol_active[2]);
+ hd->version[2], our_protocol.daemon_run[0],
+ our_protocol.daemon_run[1],
+ our_protocol.daemon_run[2]);
return -1;
}
@@ -2484,6 +2522,11 @@ static void deliver_cb(cpg_handle_t handle, struct cpg_name *group_name,
return;
}
+ if (len < sizeof(*hd)) {
+ log_error("deliver_cb short message %d", len);
+ return;
+ }
+
hd = (struct gfs_header *)data;
gfs_header_in(hd);
@@ -2579,7 +2622,7 @@ int gfs_join_mountgroup(struct mountgroup *mg)
mg->joining = 1;
memset(&name, 0, sizeof(name));
- sprintf(name.value, "gfs:%s", mg->name);
+ sprintf(name.value, "gfs:mount:%s", mg->name);
name.length = strlen(name.value) + 1;
/* TODO: allow global_id to be set in cluster.conf? */
@@ -2627,7 +2670,7 @@ static void leave_mountgroup(struct mountgroup *mg, int mnterr)
mg->leaving = 1;
memset(&name, 0, sizeof(name));
- sprintf(name.value, "gfs:%s", mg->name);
+ sprintf(name.value, "gfs:mount:%s", mg->name);
name.length = strlen(name.value) + 1;
retry:
@@ -2689,9 +2732,9 @@ static void send_withdraw_ack(struct mountgroup *mg, int nodeid)
memset(&h, 0, sizeof(h));
- h.version[0] = cpu_to_le16(protocol_active[0]);
- h.version[1] = cpu_to_le16(protocol_active[1]);
- h.version[2] = cpu_to_le16(protocol_active[2]);
+ h.version[0] = cpu_to_le16(our_protocol.daemon_run[0]);
+ h.version[1] = cpu_to_le16(our_protocol.daemon_run[1]);
+ h.version[2] = cpu_to_le16(our_protocol.daemon_run[2]);
h.type = cpu_to_le16(GFS_MSG_WITHDRAW_ACK);
h.nodeid = cpu_to_le32(our_nodeid);
h.to_nodeid = cpu_to_le32(nodeid);
@@ -2716,19 +2759,389 @@ static void send_withdraw_acks(struct mountgroup *mg)
}
}
+/* Look up the daemon_nodes entry for nodeid; NULL if none has been added. */
+static struct node *get_node_daemon(int nodeid)
+{
+	struct node *n, *found = NULL;
+
+	list_for_each_entry(n, &daemon_nodes, list) {
+		if (n->nodeid != nodeid)
+			continue;
+		found = n;
+		break;
+	}
+	return found;
+}
+
+/* Append a zeroed daemon_nodes entry for nodeid unless one already exists.
+   A zeroed entry has daemon_max[0] == 0, meaning "no proto message yet".
+   On allocation failure an error is logged and nothing is added. */
+static void add_node_daemon(int nodeid)
+{
+	struct node *n;
+
+	if (get_node_daemon(nodeid) != NULL)
+		return;
+
+	n = calloc(1, sizeof(*n));
+	if (n == NULL) {
+		log_error("add_node_daemon no mem");
+		return;
+	}
+	n->nodeid = nodeid;
+	list_add_tail(&n->list, &daemon_nodes);
+}
+
+/* Convert a received protocol_version from little-endian to host byte
+   order, in place. */
+static void pv_in(struct protocol_version *pv)
+{
+	*pv = (struct protocol_version) {
+		.major = le16_to_cpu(pv->major),
+		.minor = le16_to_cpu(pv->minor),
+		.patch = le16_to_cpu(pv->patch),
+		.flags = le16_to_cpu(pv->flags),
+	};
+}
+
+/* Convert a protocol_version from host byte order to little-endian for
+   sending, in place. */
+static void pv_out(struct protocol_version *pv)
+{
+	*pv = (struct protocol_version) {
+		.major = cpu_to_le16(pv->major),
+		.minor = cpu_to_le16(pv->minor),
+		.patch = cpu_to_le16(pv->patch),
+		.flags = cpu_to_le16(pv->flags),
+	};
+}
+
+/* Convert all four versions of a received struct protocol to host byte
+   order, in place. */
+static void protocol_in(struct protocol *proto)
+{
+	struct protocol_version *ver[] = {
+		&proto->dm_ver, &proto->km_ver, &proto->dr_ver, &proto->kr_ver,
+	};
+	size_t i;
+
+	for (i = 0; i < sizeof(ver) / sizeof(ver[0]); i++)
+		pv_in(ver[i]);
+}
+
+/* Convert all four versions of a struct protocol to little-endian before
+   it is sent, in place. */
+static void protocol_out(struct protocol *proto)
+{
+	struct protocol_version *ver[] = {
+		&proto->dm_ver, &proto->km_ver, &proto->dr_ver, &proto->kr_ver,
+	};
+	size_t i;
+
+	for (i = 0; i < sizeof(ver) / sizeof(ver[0]); i++)
+		pv_out(ver[i]);
+}
+
+/*
+ * Return 1 once every member recorded by the last daemon cpg confchg
+ * (daemon_member[]) has a proto message saved in its node struct.  A
+ * nonzero daemon_max major marks a saved message, since node structs start
+ * zeroed and receive_protocol() only saves messages with nonzero max values.
+ * Return 0 if there are no members yet, if a member has no node struct
+ * (logged), or if some member's proto message is still outstanding.
+ */
+static int all_protocol_messages(void)
+{
+	struct node *n;
+	int i;
+
+	if (daemon_member_count == 0)
+		return 0;
+
+	for (i = 0; i < daemon_member_count; i++) {
+		n = get_node_daemon(daemon_member[i].nodeid);
+		if (n == NULL) {
+			log_error("all_protocol_messages no node %d",
+				  daemon_member[i].nodeid);
+			return 0;
+		}
+		if (n->proto.daemon_max[0] == 0)
+			return 0;
+	}
+	return 1;
+}
+
+/* pick_min_protocol helper: lower *min to v, treating zero as "unset". */
+static void pick_min_lower(uint16_t *min, uint16_t v)
+{
+	if (!*min || v < *min)
+		*min = v;
+}
+
+/* pick_min_protocol helper: nonzero when the first 'depth' components of
+   ver equal those of min (always true for depth 0). */
+static int pick_min_prefix_match(const uint16_t *ver, const uint16_t *min,
+				 int depth)
+{
+	int i;
+
+	for (i = 0; i < depth; i++) {
+		if (ver[i] != min[i])
+			return 0;
+	}
+	return 1;
+}
+
+/*
+ * Fill proto->daemon_run and proto->kernel_run with the minimum of the max
+ * versions reported by all current daemon cpg members.  Each of the daemon
+ * and kernel versions is chosen in three levels:
+ *  - the lowest major;
+ *  - then the lowest minor among members with that major;
+ *  - then the lowest patch among members with that major.minor.
+ * The run flags are left zero.  Returns 0, or -1 if a member has no node
+ * struct or a chosen component is zero.
+ */
+static int pick_min_protocol(struct protocol *proto)
+{
+	uint16_t mind[4] = { 0, 0, 0, 0 };
+	uint16_t mink[4] = { 0, 0, 0, 0 };
+	struct node *n;
+	int level, i;
+
+	for (level = 0; level < 3; level++) {
+		for (i = 0; i < daemon_member_count; i++) {
+			n = get_node_daemon(daemon_member[i].nodeid);
+			if (n == NULL) {
+				/* only the first pass treats this as fatal */
+				if (level > 0)
+					continue;
+				log_error("pick_min_protocol no node %d",
+					  daemon_member[i].nodeid);
+				return -1;
+			}
+
+			if (pick_min_prefix_match(n->proto.daemon_max, mind, level))
+				pick_min_lower(&mind[level],
+					       n->proto.daemon_max[level]);
+
+			if (pick_min_prefix_match(n->proto.kernel_max, mink, level))
+				pick_min_lower(&mink[level],
+					       n->proto.kernel_max[level]);
+		}
+
+		if (mind[level] && mink[level])
+			continue;
+
+		if (level == 0)
+			log_error("pick_min_protocol zero major number");
+		else if (level == 1)
+			log_error("pick_min_protocol zero minor number");
+		else
+			log_error("pick_min_protocol zero patch number");
+		return -1;
+	}
+
+	memcpy(proto->daemon_run, mind, sizeof(mind));
+	memcpy(proto->kernel_run, mink, sizeof(mink));
+	return 0;
+}
+
+/*
+ * Handle a GFS_MSG_PROTOCOL message delivered on the daemon cpg.  The
+ * payload is a struct protocol that immediately follows the gfs_header; it
+ * arrives little-endian and is converted in place by protocol_in().
+ *
+ * The message is handled as follows:
+ *  - A message too short to hold the payload is rejected before any of the
+ *    payload is read.
+ *  - A message with a zero max component, or with run values that are only
+ *    partly set, is rejected.
+ *  - If we already have run values, the message is ignored.
+ *  - If the message carries run values, they are the agreed protocol and
+ *    are adopted as ours.
+ *  - Otherwise the sender's proto is saved in its daemon node struct, so
+ *    all_protocol_messages()/pick_min_protocol() can use it.
+ */
+static void receive_protocol(struct gfs_header *hd, int len)
+{
+	struct protocol *p;
+	struct node *node;
+
+	/* validate before protocol_in(): a short message must not be read,
+	   or byte-swapped in place, past its end */
+	if (len < 0 ||
+	    (size_t)len < sizeof(struct gfs_header) + sizeof(struct protocol)) {
+		log_error("receive_protocol invalid len %d from %d",
+			  len, hd->nodeid);
+		return;
+	}
+
+	p = (struct protocol *)((char *)hd + sizeof(struct gfs_header));
+	protocol_in(p);
+
+	/* zero is an invalid version value */
+
+	if (!p->daemon_max[0] || !p->daemon_max[1] || !p->daemon_max[2] ||
+	    !p->kernel_max[0] || !p->kernel_max[1] || !p->kernel_max[2]) {
+		log_error("receive_protocol invalid max value from %d "
+			  "daemon %u.%u.%u kernel %u.%u.%u", hd->nodeid,
+			  p->daemon_max[0], p->daemon_max[1], p->daemon_max[2],
+			  p->kernel_max[0], p->kernel_max[1], p->kernel_max[2]);
+		return;
+	}
+
+	/* the run values will be zero until a version is set, after
+	   which none of the run values can be zero */
+
+	if (p->daemon_run[0] && (!p->daemon_run[1] || !p->daemon_run[2] ||
+	    !p->kernel_run[0] || !p->kernel_run[1] || !p->kernel_run[2])) {
+		log_error("receive_protocol invalid run value from %d "
+			  "daemon %u.%u.%u kernel %u.%u.%u", hd->nodeid,
+			  p->daemon_run[0], p->daemon_run[1], p->daemon_run[2],
+			  p->kernel_run[0], p->kernel_run[1], p->kernel_run[2]);
+		return;
+	}
+
+	/* once we have run values, later proto messages change nothing */
+
+	if (our_protocol.daemon_run[0])
+		return;
+
+	/* a message carrying run values is the agreed protocol; adopt it */
+
+	if (p->daemon_run[0]) {
+		memcpy(&our_protocol.daemon_run, &p->daemon_run,
+		       sizeof(struct protocol_version));
+		memcpy(&our_protocol.kernel_run, &p->kernel_run,
+		       sizeof(struct protocol_version));
+		log_debug("run protocol from nodeid %d", hd->nodeid);
+		return;
+	}
+
+	/* save this node's proto so we can tell when we've got all, and
+	   use it to select a minimum protocol from all */
+
+	node = get_node_daemon(hd->nodeid);
+	if (!node) {
+		log_error("receive_protocol no node %d", hd->nodeid);
+		return;
+	}
+	memcpy(&node->proto, p, sizeof(struct protocol));
+}
+
+/*
+ * Multicast *proto (max and run values) to the daemon cpg as a
+ * GFS_MSG_PROTOCOL message.
+ *
+ * The gfs_header version fields are left zero, because deliver_cb_daemon
+ * dispatches GFS_MSG_PROTOCOL without calling gfs_header_check().  *proto
+ * itself is not modified; a little-endian copy is sent.  On allocation
+ * failure an error is logged and nothing is sent.
+ */
+static void send_protocol(struct protocol *proto)
+{
+	struct gfs_header *hd;
+	struct protocol *pr;
+	char *buf;
+	int len;
+
+	len = sizeof(struct gfs_header) + sizeof(struct protocol);
+	buf = malloc(len);
+	if (!buf) {
+		log_error("send_protocol no mem %d", len);
+		return;
+	}
+	memset(buf, 0, len);
+
+	hd = (struct gfs_header *)buf;
+	pr = (struct protocol *)(buf + sizeof(*hd));
+
+	hd->type = cpu_to_le16(GFS_MSG_PROTOCOL);
+	hd->nodeid = cpu_to_le32(our_nodeid);
+
+	memcpy(pr, proto, sizeof(struct protocol));
+	protocol_out(pr);
+
+	_send_message(cpg_handle_daemon, buf, len, GFS_MSG_PROTOCOL);
+
+	/* buf is local to this call and was previously leaked on every send.
+	   NOTE(review): assumes _send_message() neither keeps nor frees buf,
+	   as for the other malloc'd messages in this file -- confirm */
+	free(buf);
+}
+
+/*
+ * Negotiate the daemon and kernel run protocols with the other members of
+ * the daemon cpg.
+ *
+ * Blocks, servicing only the daemon cpg fd, until run values have been
+ * adopted by receive_protocol().  Once proto messages from every current
+ * member have been saved, this node proposes, at most once, the minimum of
+ * everyone's max values.
+ *
+ * Returns 0 when the agreed run major.minor equals our max major.minor for
+ * both daemon and kernel.  Returns -1 on an incompatible protocol, failure
+ * to pick a proposal, a poll failure or error revents, or daemon_quit being
+ * set while poll is interrupted.
+ */
+int set_protocol(void)
+{
+	struct pollfd pfd = { .fd = daemon_cpg_fd, .events = POLLIN };
+	const struct protocol *op = &our_protocol;
+	struct protocol proposal;
+	int proposed = 0;
+	int rv;
+
+	while (!op->daemon_run[0]) {
+		if (!proposed && all_protocol_messages()) {
+			proposed = 1;
+
+			/* start from our own max values; pick_min_protocol()
+			   fills in the run values as the minimum over all
+			   members */
+			proposal = our_protocol;
+
+			rv = pick_min_protocol(&proposal);
+			if (rv < 0)
+				return rv;
+
+			log_debug("set_protocol member_count %d propose "
+				  "daemon %u.%u.%u kernel %u.%u.%u",
+				  daemon_member_count,
+				  proposal.daemon_run[0], proposal.daemon_run[1],
+				  proposal.daemon_run[2], proposal.kernel_run[0],
+				  proposal.kernel_run[1], proposal.kernel_run[2]);
+
+			send_protocol(&proposal);
+		}
+
+		/* until the protocol is established, only the daemon cpg
+		   is serviced */
+		rv = poll(&pfd, 1, -1);
+		if (rv < 0) {
+			if (errno != EINTR) {
+				log_error("set_protocol poll errno %d", errno);
+				return -1;
+			}
+			if (daemon_quit)
+				return -1;
+			continue;
+		}
+
+		if (pfd.revents & POLLIN)
+			process_cpg(0);
+		if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
+			log_error("set_protocol poll revents %u",
+				  pfd.revents);
+			return -1;
+		}
+	}
+
+	/* this version requires run major.minor to equal our own max */
+
+	if (op->daemon_run[0] != op->daemon_max[0] ||
+	    op->daemon_run[1] != op->daemon_max[1]) {
+		log_error("incompatible daemon protocol run %u.%u.%u max %u.%u.%u",
+			  op->daemon_run[0], op->daemon_run[1],
+			  op->daemon_run[2], op->daemon_max[0],
+			  op->daemon_max[1], op->daemon_max[2]);
+		return -1;
+	}
+
+	if (op->kernel_run[0] != op->kernel_max[0] ||
+	    op->kernel_run[1] != op->kernel_max[1]) {
+		log_error("incompatible kernel protocol run %u.%u.%u max %u.%u.%u",
+			  op->kernel_run[0], op->kernel_run[1],
+			  op->kernel_run[2], op->kernel_max[0],
+			  op->kernel_max[1], op->kernel_max[2]);
+		return -1;
+	}
+
+	log_debug("daemon run %u.%u.%u max %u.%u.%u "
+		  "kernel run %u.%u.%u max %u.%u.%u",
+		  op->daemon_run[0], op->daemon_run[1], op->daemon_run[2],
+		  op->daemon_max[0], op->daemon_max[1], op->daemon_max[2],
+		  op->kernel_run[0], op->kernel_run[1], op->kernel_run[2],
+		  op->kernel_max[0], op->kernel_max[1], op->kernel_max[2]);
+	return 0;
+}
+
static void deliver_cb_daemon(cpg_handle_t handle, struct cpg_name *group_name,
uint32_t nodeid, uint32_t pid, void *data, int len)
{
struct gfs_header *hd;
+ if (len < sizeof(*hd)) {
+ log_error("deliver_cb short message %d", len);
+ return;
+ }
+
hd = (struct gfs_header *)data;
gfs_header_in(hd);
- if (gfs_header_check(hd, nodeid) < 0)
- return;
-
switch (hd->type) {
+ case GFS_MSG_PROTOCOL:
+ receive_protocol(hd, len);
+ break;
case GFS_MSG_WITHDRAW_ACK:
+ if (gfs_header_check(hd, nodeid) < 0)
+ return;
receive_withdraw_ack(hd, len);
break;
default:
@@ -2741,6 +3154,18 @@ static void confchg_cb_daemon(cpg_handle_t handle, struct cpg_name *group_name,
struct cpg_address *left_list, int left_list_entries,
struct cpg_address *joined_list, int joined_list_entries)
{
+ int i;
+
+ if (joined_list_entries)
+ send_protocol(&our_protocol);
+
+ memset(&daemon_member, 0, sizeof(daemon_member));
+ daemon_member_count = member_list_entries;
+
+ for (i = 0; i < member_list_entries; i++) {
+ daemon_member[i] = member_list[i];
+ add_node_daemon(member_list[i].nodeid);
+ }
}
static cpg_callbacks_t cpg_callbacks_daemon = {
@@ -2762,7 +3187,17 @@ int setup_cpg(void)
cpg_error_t error;
cpg_handle_t h;
struct cpg_name name;
- int i = 0, f;
+ int i = 0;
+
+ INIT_LIST_HEAD(&daemon_nodes);
+
+ memset(&our_protocol, 0, sizeof(our_protocol));
+ our_protocol.daemon_max[0] = 1;
+ our_protocol.daemon_max[1] = 1;
+ our_protocol.daemon_max[2] = 1;
+ our_protocol.kernel_max[0] = 1;
+ our_protocol.kernel_max[1] = 1;
+ our_protocol.kernel_max[2] = 1;
error = cpg_initialize(&h, &cpg_callbacks_daemon);
if (error != CPG_OK) {
@@ -2770,12 +3205,12 @@ int setup_cpg(void)
return -1;
}
- cpg_fd_get(h, &f);
+ cpg_fd_get(h, &daemon_cpg_fd);
cpg_handle_daemon = h;
memset(&name, 0, sizeof(name));
- sprintf(name.value, "gfs::daemon");
+ sprintf(name.value, "gfs:controld");
name.length = strlen(name.value) + 1;
retry:
@@ -2791,8 +3226,8 @@ int setup_cpg(void)
goto fail;
}
- log_debug("setup_cpg %d", f);
- return f;
+ log_debug("setup_cpg %d", daemon_cpg_fd);
+ return daemon_cpg_fd;
fail:
cpg_finalize(h);
@@ -2809,7 +3244,7 @@ void close_cpg(void)
return;
memset(&name, 0, sizeof(name));
- sprintf(name.value, "gfs::daemon");
+ sprintf(name.value, "gfs:controld");
name.length = strlen(name.value) + 1;
retry:
diff --git a/group/gfs_controld/gfs_daemon.h b/group/gfs_controld/gfs_daemon.h
index e022ccc..c3d4688 100644
--- a/group/gfs_controld/gfs_daemon.h
+++ b/group/gfs_controld/gfs_daemon.h
@@ -226,6 +226,7 @@ void close_cpg(void);
void process_cpg(int ci);
int setup_dlmcontrol(void);
void process_dlmcontrol(int ci);
+int set_protocol(void);
void process_recovery_uevent(char *table);
void process_mountgroups(void);
int gfs_join_mountgroup(struct mountgroup *mg);
diff --git a/group/gfs_controld/main.c b/group/gfs_controld/main.c
index f328a07..4d34657 100644
--- a/group/gfs_controld/main.c
+++ b/group/gfs_controld/main.c
@@ -1137,6 +1137,10 @@ static void loop(void)
goto out;
client_add(rv, process_cpg, cluster_dead);
+ rv = set_protocol();
+ if (rv < 0)
+ goto out;
+
rv = setup_dlmcontrol();
if (rv < 0)
goto out;
More information about the Cluster-cvs mailing list