From: Alasdair Kergon Date: Fri, 2 Feb 2007 17:08:51 +0000 (+0000) Subject: Improve dmeventd messaging protocol: drain pipe and tag messages. X-Git-Tag: v1_02_18~1 X-Git-Url: https://sourceware.org/git/?a=commitdiff_plain;h=c404a13466c53b823f4556a9a02bbda34c6b2354;p=dm.git Improve dmeventd messaging protocol: drain pipe and tag messages. --- diff --git a/WHATS_NEW b/WHATS_NEW index 7359398..7b64fa0 100644 --- a/WHATS_NEW +++ b/WHATS_NEW @@ -1,5 +1,6 @@ Version 1.02.18 - =================================== + Improve dmeventd messaging protocol: drain pipe and tag messages. Version 1.02.17 - 29th January 2007 =================================== diff --git a/dmeventd/dmeventd.c b/dmeventd/dmeventd.c index 536d647..f1462b3 100644 --- a/dmeventd/dmeventd.c +++ b/dmeventd/dmeventd.c @@ -146,6 +146,7 @@ static LIST_INIT(_dso_registry); /* Structure to keep parsed register variables from client message. */ struct message_data { + char *id; char *dso_name; /* Name of DSO. */ char *device_uuid; /* Mapped device path. */ union { @@ -320,6 +321,8 @@ static int _fetch_string(char **ptr, char **src, const int delimiter) /* Free message memory. */ static void _free_message(struct message_data *message_data) { + if (message_data->id) + dm_free(message_data->id); if (message_data->dso_name) dm_free(message_data->dso_name); @@ -342,7 +345,8 @@ static int _parse_message(struct message_data *message_data) * Retrieve application identifier, mapped device * path and events # string from message. */ - if (_fetch_string(&message_data->dso_name, &p, ' ') && + if (_fetch_string(&message_data->id, &p, ' ') && + _fetch_string(&message_data->dso_name, &p, ' ') && _fetch_string(&message_data->device_uuid, &p, ' ') && _fetch_string(&message_data->events.str, &p, ' ') && _fetch_string(&message_data->timeout.str, &p, ' ')) { @@ -875,8 +879,8 @@ static struct dso_data *_load_dso(struct message_data *data) syslog(LOG_ERR, "dmeventd %s dlopen failed: %s", data->dso_name, dlerr); data->msg->size = - dm_asprintf(&(data->msg->data), "%s dlopen failed: %s", - data->dso_name, dlerr); + dm_asprintf(&(data->msg->data), "%s %s dlopen failed: %s", + data->id, data->dso_name, dlerr); return NULL; } @@ -1056,7 +1060,8 @@ static int _registered_device(struct message_data *message_data, { struct dm_event_daemon_message *msg = message_data->msg; - const char *fmt = "%s %s %u"; + const char *fmt = "%s %s %s %u"; + const char *id = message_data->id; const char *dso = thread->dso_data->dso_name; const char *dev = thread->device.uuid; unsigned events = ((thread->status == DM_THREAD_RUNNING) @@ -1066,7 +1071,7 @@ static int _registered_device(struct message_data *message_data, if (msg->data) dm_free(msg->data); - msg->size = dm_asprintf(&(msg->data), fmt, dso, dev, events); + msg->size = dm_asprintf(&(msg->data), fmt, id, dso, dev, events); _unlock_mutex(); @@ -1180,7 +1185,8 @@ static int _get_timeout(struct message_data *message_data) _lock_mutex(); if ((thread = _lookup_thread_status(message_data))) { msg->size = - dm_asprintf(&(msg->data), "%" PRIu32, thread->timeout); + dm_asprintf(&(msg->data), "%s %" PRIu32, message_data->id, + thread->timeout); } else { msg->data = NULL; msg->size = 0; @@ -1375,17 +1381,32 @@ static int _handle_request(struct dm_event_daemon_message *msg, static int _do_process_request(struct dm_event_daemon_message *msg) { int ret; + char *answer; static struct message_data message_data; /* Parse the message. */ memset(&message_data, 0, sizeof(message_data)); message_data.msg = msg; - if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) { + if (msg->cmd == DM_EVENT_CMD_HELLO) { + ret = 0; + answer = dm_strdup(msg->data); + if (answer) { + msg->size = dm_asprintf(&(msg->data), "%s HELLO", answer); + dm_free(answer); + } else { + msg->size = 0; + msg->data = NULL; + } + } else if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) { stack; ret = -EINVAL; } else ret = _handle_request(msg, &message_data); + msg->cmd = ret; + if (!msg->data) + msg->size = dm_asprintf(&(msg->data), "%s %s", message_data.id, strerror(-ret)); + _free_message(&message_data); return ret; @@ -1405,16 +1426,9 @@ static void _process_request(struct dm_event_fifos *fifos) if (!_client_read(fifos, &msg)) return; - msg.cmd = _do_process_request(&msg); - if (!msg.data) { - msg.data = dm_strdup(strerror(-msg.cmd)); - if (msg.data) - msg.size = strlen(msg.data) + 1; - else { - msg.size = 0; - stack; - } - } + /* _do_process_request fills in msg (if memory allows for + data, otherwise just cmd and size = 0) */ + _do_process_request(&msg); if (!_client_write(fifos, &msg)) stack; diff --git a/dmeventd/dmeventd.h b/dmeventd/dmeventd.h index 084d136..d7474b8 100644 --- a/dmeventd/dmeventd.h +++ b/dmeventd/dmeventd.h @@ -20,6 +20,7 @@ enum dm_event_command { DM_EVENT_CMD_GET_NEXT_REGISTERED_DEVICE, DM_EVENT_CMD_SET_TIMEOUT, DM_EVENT_CMD_GET_TIMEOUT, + DM_EVENT_CMD_HELLO, }; /* Message passed between client and daemon. */ diff --git a/dmeventd/libdevmapper-event.c b/dmeventd/libdevmapper-event.c index 6a1d31c..d2f8ac5 100644 --- a/dmeventd/libdevmapper-event.c +++ b/dmeventd/libdevmapper-event.c @@ -30,6 +30,8 @@ #include #include /* for htonl, ntohl */ +static int _sequence_nr = 0; + struct dm_event_handler { char *dso; @@ -182,6 +184,21 @@ enum dm_event_mask dm_event_handler_get_event_mask(const struct dm_event_handler return dmevh->mask; } +static int _check_message_id(struct dm_event_daemon_message *msg) +{ + int pid, seq_nr; + + if ((sscanf(msg->data, "%d:%d", &pid, &seq_nr) != 2) || + (pid != getpid()) || (seq_nr != _sequence_nr)) { + log_error("Ignoring out-of-sequence reply from dmeventd. " + "Expected %d:%d but received %s", getpid(), + _sequence_nr, msg->data); + return 0; + } + + return 1; +} + /* * daemon_read * @fifos @@ -260,11 +277,28 @@ static int _daemon_write(struct dm_event_fifos *fifos, size_t size = 2 * sizeof(uint32_t) + msg->size; char *buf = alloca(size); + char drainbuf[128]; + struct timeval tval = { 0, 0 }; *((uint32_t *)buf) = htonl(msg->cmd); *((uint32_t *)buf + 1) = htonl(msg->size); memcpy(buf + 2 * sizeof(uint32_t), msg->data, msg->size); + /* drain the answer fifo */ + while (1) { + FD_ZERO(&fds); + FD_SET(fifos->server, &fds); + tval.tv_usec = 100; + ret = select(fifos->server + 1, &fds, NULL, NULL, &tval); + if ((ret < 0) && (errno != EINTR)) { + log_error("Unable to talk to event daemon"); + return 0; + } + if (ret == 0) + break; + read(fifos->server, drainbuf, 127); + } + while (bytes < size) { do { /* Watch daemon write FIFO to be ready for output. */ @@ -301,7 +335,7 @@ static int _daemon_talk(struct dm_event_fifos *fifos, { const char *dso = dso_name ? dso_name : ""; const char *dev = dev_name ? dev_name : ""; - const char *fmt = "%s %s %u %" PRIu32; + const char *fmt = "%d:%d %s %s %u %" PRIu32; int msg_size; memset(msg, 0, sizeof(*msg)); @@ -310,8 +344,10 @@ static int _daemon_talk(struct dm_event_fifos *fifos, * into ASCII message string. */ msg->cmd = cmd; - if ((msg_size = dm_asprintf(&(msg->data), fmt, dso, dev, evmask, - timeout)) < 0) { + if (cmd == DM_EVENT_CMD_HELLO) + fmt = "%d:%d HELLO"; + if ((msg_size = dm_asprintf(&(msg->data), fmt, getpid(), _sequence_nr, + dso, dev, evmask, timeout)) < 0) { log_error("_daemon_talk: message allocation failed"); return -ENOMEM; } @@ -326,10 +362,14 @@ static int _daemon_talk(struct dm_event_fifos *fifos, return -EIO; } - if (!_daemon_read(fifos, msg)) { - stack; - return -EIO; - } + do { + if (!_daemon_read(fifos, msg)) { + stack; + return -EIO; + } + } while (!_check_message_id(msg)); + + _sequence_nr++; return (int32_t) msg->cmd; } @@ -507,7 +547,9 @@ static int _do_event(int cmd, struct dm_event_daemon_message *msg, return -ESRCH; } - ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout); + ret = _daemon_talk(&fifos, msg, DM_EVENT_CMD_HELLO, 0, 0, 0, 0); + if (!ret) + ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout); /* what is the opposite of init? */ _dtr_client(&fifos); @@ -521,7 +563,7 @@ int dm_event_register_handler(const struct dm_event_handler *dmevh) int ret = 1, err; const char *uuid; struct dm_task *dmt; - struct dm_event_daemon_message msg; + struct dm_event_daemon_message msg = { 0, 0, NULL }; if (!(dmt = _get_device_info(dmevh))) { stack; @@ -551,7 +593,7 @@ int dm_event_unregister_handler(const struct dm_event_handler *dmevh) int ret = 1, err; const char *uuid; struct dm_task *dmt; - struct dm_event_daemon_message msg; + struct dm_event_daemon_message msg = { 0, 0, NULL }; if (!(dmt = _get_device_info(dmevh))) { stack; @@ -598,15 +640,20 @@ static char *_fetch_string(char **src, const int delimiter) static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name, char **uuid, enum dm_event_mask *evmask) { + char *id = NULL; char *p = msg->data; - if ((*dso_name = _fetch_string(&p, ' ')) && + if ((id = _fetch_string(&p, ' ')) && + (*dso_name = _fetch_string(&p, ' ')) && (*uuid = _fetch_string(&p, ' '))) { *evmask = atoi(p); + dm_free(id); return 0; } + if (id) + dm_free(id); return -ENOMEM; } @@ -621,12 +668,12 @@ static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name, */ int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next) { - int ret; + int ret = 0; const char *uuid = NULL; char *reply_dso = NULL, *reply_uuid = NULL; - enum dm_event_mask reply_mask; - struct dm_task *dmt; - struct dm_event_daemon_message msg; + enum dm_event_mask reply_mask = 0; + struct dm_task *dmt = NULL; + struct dm_event_daemon_message msg = { 0, 0, NULL }; if (!(dmt = _get_device_info(dmevh))) { stack; @@ -696,9 +743,17 @@ int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next) #if 0 /* left out for now */ +static char *_skip_string(char *src, const int delimiter) +{ + src = srtchr(src, delimiter); + if (src && *(src + 1)) + return src + 1; + return NULL; +} + int dm_event_set_timeout(const char *device_path, uint32_t timeout) { - struct dm_event_daemon_message msg; + struct dm_event_daemon_message msg = { 0, 0, NULL }; if (!device_exists(device_path)) return -ENODEV; @@ -710,13 +765,20 @@ int dm_event_set_timeout(const char *device_path, uint32_t timeout) int dm_event_get_timeout(const char *device_path, uint32_t *timeout) { int ret; - struct dm_event_daemon_message msg; + struct dm_event_daemon_message msg = { 0, 0, NULL }; if (!device_exists(device_path)) return -ENODEV; if (!(ret = _do_event(DM_EVENT_CMD_GET_TIMEOUT, &msg, NULL, device_path, - 0, 0))) - *timeout = atoi(msg.data); + 0, 0))) { + char *p = _skip_string(msg.data, ' '); + if (!p) { + log_error("malformed reply from dmeventd '%s'\n", + msg.data); + return -EIO; + } + *timeout = atoi(p); + } if (msg.data) dm_free(msg.data); return ret;