]> sourceware.org Git - dm.git/commitdiff
Improve dmeventd messaging protocol: drain pipe and tag messages.
authorAlasdair Kergon <agk@redhat.com>
Fri, 2 Feb 2007 17:08:51 +0000 (17:08 +0000)
committerAlasdair Kergon <agk@redhat.com>
Fri, 2 Feb 2007 17:08:51 +0000 (17:08 +0000)
WHATS_NEW
dmeventd/dmeventd.c
dmeventd/dmeventd.h
dmeventd/libdevmapper-event.c

index 7359398cddaa186c4cd62202d31400174e30c85f..7b64fa00bac391ab261ea510eb810b0b4ea753fe 100644 (file)
--- a/WHATS_NEW
+++ b/WHATS_NEW
@@ -1,5 +1,6 @@
 Version 1.02.18 -
 ===================================
+  Improve dmeventd messaging protocol: drain pipe and tag messages.
 
 Version 1.02.17 - 29th January 2007
 ===================================
index 536d6471c3600a9414873703f29af2328610f5b2..f1462b381d36b024a588d7a14bfc3998c3f2c12f 100644 (file)
@@ -146,6 +146,7 @@ static LIST_INIT(_dso_registry);
 
 /* Structure to keep parsed register variables from client message. */
 struct message_data {
+       char *id;
        char *dso_name;         /* Name of DSO. */
        char *device_uuid;      /* Mapped device path. */
        union {
@@ -320,6 +321,8 @@ static int _fetch_string(char **ptr, char **src, const int delimiter)
 /* Free message memory. */
 static void _free_message(struct message_data *message_data)
 {
+       if (message_data->id)
+               dm_free(message_data->id);
        if (message_data->dso_name)
                dm_free(message_data->dso_name);
 
@@ -342,7 +345,8 @@ static int _parse_message(struct message_data *message_data)
         * Retrieve application identifier, mapped device
         * path and events # string from message.
         */
-       if (_fetch_string(&message_data->dso_name, &p, ' ') &&
+       if (_fetch_string(&message_data->id, &p, ' ') &&
+           _fetch_string(&message_data->dso_name, &p, ' ') &&
            _fetch_string(&message_data->device_uuid, &p, ' ') &&
            _fetch_string(&message_data->events.str, &p, ' ') &&
            _fetch_string(&message_data->timeout.str, &p, ' ')) {
@@ -875,8 +879,8 @@ static struct dso_data *_load_dso(struct message_data *data)
                syslog(LOG_ERR, "dmeventd %s dlopen failed: %s", data->dso_name,
                       dlerr);
                data->msg->size =
-                   dm_asprintf(&(data->msg->data), "%s dlopen failed: %s",
-                               data->dso_name, dlerr);
+                   dm_asprintf(&(data->msg->data), "%s %s dlopen failed: %s",
+                               data->id, data->dso_name, dlerr);
                return NULL;
        }
 
@@ -1056,7 +1060,8 @@ static int _registered_device(struct message_data *message_data,
 {
        struct dm_event_daemon_message *msg = message_data->msg;
 
-       const char *fmt = "%s %s %u";
+       const char *fmt = "%s %s %s %u";
+       const char *id = message_data->id;
        const char *dso = thread->dso_data->dso_name;
        const char *dev = thread->device.uuid;
        unsigned events = ((thread->status == DM_THREAD_RUNNING)
@@ -1066,7 +1071,7 @@ static int _registered_device(struct message_data *message_data,
        if (msg->data)
                dm_free(msg->data);
 
-       msg->size = dm_asprintf(&(msg->data), fmt, dso, dev, events);
+       msg->size = dm_asprintf(&(msg->data), fmt, id, dso, dev, events);
 
        _unlock_mutex();
 
@@ -1180,7 +1185,8 @@ static int _get_timeout(struct message_data *message_data)
        _lock_mutex();
        if ((thread = _lookup_thread_status(message_data))) {
                msg->size =
-                   dm_asprintf(&(msg->data), "%" PRIu32, thread->timeout);
+                   dm_asprintf(&(msg->data), "%s %" PRIu32, message_data->id,
+                               thread->timeout);
        } else {
                msg->data = NULL;
                msg->size = 0;
@@ -1375,17 +1381,32 @@ static int _handle_request(struct dm_event_daemon_message *msg,
 static int _do_process_request(struct dm_event_daemon_message *msg)
 {
        int ret;
+       char *answer;
        static struct message_data message_data;
 
        /* Parse the message. */
        memset(&message_data, 0, sizeof(message_data));
        message_data.msg = msg;
-       if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) {
+       if (msg->cmd == DM_EVENT_CMD_HELLO)  {
+               ret = 0;
+               answer = dm_strdup(msg->data);
+               if (answer) {
+                       msg->size = dm_asprintf(&(msg->data), "%s HELLO", answer);
+                       dm_free(answer);
+               } else {
+                       msg->size = 0;
+                       msg->data = NULL;
+               }
+       } else if (msg->cmd != DM_EVENT_CMD_ACTIVE && !_parse_message(&message_data)) {
                stack;
                ret = -EINVAL;
        } else
                ret = _handle_request(msg, &message_data);
 
+       msg->cmd = ret;
+       if (!msg->data)
+               msg->size = dm_asprintf(&(msg->data), "%s %s", message_data.id, strerror(-ret));
+
        _free_message(&message_data);
 
        return ret;
@@ -1405,16 +1426,9 @@ static void _process_request(struct dm_event_fifos *fifos)
        if (!_client_read(fifos, &msg))
                return;
 
-       msg.cmd = _do_process_request(&msg);
-       if (!msg.data) {
-               msg.data = dm_strdup(strerror(-msg.cmd));
-               if (msg.data)
-                       msg.size = strlen(msg.data) + 1;
-               else {
-                       msg.size = 0;
-                       stack;
-               }
-       }
+       /* _do_process_request fills in msg (if memory allows for
+          data, otherwise just cmd and size = 0) */
+       _do_process_request(&msg);
 
        if (!_client_write(fifos, &msg))
                stack;
index 084d13626ecac6250b606df5c1f6ad46d6103d64..d7474b83822d782215763acb3446a9cfc5824577 100644 (file)
@@ -20,6 +20,7 @@ enum dm_event_command {
        DM_EVENT_CMD_GET_NEXT_REGISTERED_DEVICE,
        DM_EVENT_CMD_SET_TIMEOUT,
        DM_EVENT_CMD_GET_TIMEOUT,
+       DM_EVENT_CMD_HELLO,
 };
 
 /* Message passed between client and daemon. */
index 6a1d31c1d7f55c8211d50ca8e3c09ffcaa6695ce..d2f8ac503f71e86e4469a816982c0c005a74209f 100644 (file)
@@ -30,6 +30,8 @@
 #include <sys/wait.h>
 #include <arpa/inet.h>         /* for htonl, ntohl */
 
+static int _sequence_nr = 0;
+
 struct dm_event_handler {
        char *dso;
 
@@ -182,6 +184,21 @@ enum dm_event_mask dm_event_handler_get_event_mask(const struct dm_event_handler
        return dmevh->mask;
 }
 
+static int _check_message_id(struct dm_event_daemon_message *msg)
+{
+       int pid, seq_nr;
+
+       if ((sscanf(msg->data, "%d:%d", &pid, &seq_nr) != 2) ||
+           (pid != getpid()) || (seq_nr != _sequence_nr)) {
+               log_error("Ignoring out-of-sequence reply from dmeventd. "
+                         "Expected %d:%d but received %s", getpid(),
+                         _sequence_nr, msg->data);
+               return 0;
+       }
+
+       return 1;
+}
+
 /*
  * daemon_read
  * @fifos
@@ -260,11 +277,28 @@ static int _daemon_write(struct dm_event_fifos *fifos,
 
        size_t size = 2 * sizeof(uint32_t) + msg->size;
        char *buf = alloca(size);
+       char drainbuf[128];
+       struct timeval tval = { 0, 0 };
 
        *((uint32_t *)buf) = htonl(msg->cmd);
        *((uint32_t *)buf + 1) = htonl(msg->size);
        memcpy(buf + 2 * sizeof(uint32_t), msg->data, msg->size);
 
+       /* drain the answer fifo */
+       while (1) {
+               FD_ZERO(&fds);
+               FD_SET(fifos->server, &fds);
+               tval.tv_usec = 100;
+               ret = select(fifos->server + 1, &fds, NULL, NULL, &tval);
+               if ((ret < 0) && (errno != EINTR)) {
+                       log_error("Unable to talk to event daemon");
+                       return 0;
+               }
+               if (ret == 0)
+                       break;
+               read(fifos->server, drainbuf, 127);
+       }
+
        while (bytes < size) {
                do {
                        /* Watch daemon write FIFO to be ready for output. */
@@ -301,7 +335,7 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
 {
        const char *dso = dso_name ? dso_name : "";
        const char *dev = dev_name ? dev_name : "";
-       const char *fmt = "%s %s %u %" PRIu32;
+       const char *fmt = "%d:%d %s %s %u %" PRIu32;
        int msg_size;
        memset(msg, 0, sizeof(*msg));
 
@@ -310,8 +344,10 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
         * into ASCII message string.
         */
        msg->cmd = cmd;
-       if ((msg_size = dm_asprintf(&(msg->data), fmt, dso, dev, evmask,
-                                    timeout)) < 0) {
+       if (cmd == DM_EVENT_CMD_HELLO)
+               fmt = "%d:%d HELLO";
+       if ((msg_size = dm_asprintf(&(msg->data), fmt, getpid(), _sequence_nr,
+                                   dso, dev, evmask, timeout)) < 0) {
                log_error("_daemon_talk: message allocation failed");
                return -ENOMEM;
        }
@@ -326,10 +362,14 @@ static int _daemon_talk(struct dm_event_fifos *fifos,
                return -EIO;
        }
 
-       if (!_daemon_read(fifos, msg)) {
-               stack;
-               return -EIO;
-       }
+       do {
+               if (!_daemon_read(fifos, msg)) {
+                       stack;
+                       return -EIO;
+               }
+       } while (!_check_message_id(msg));
+
+       _sequence_nr++;
 
        return (int32_t) msg->cmd;
 }
@@ -507,7 +547,9 @@ static int _do_event(int cmd, struct dm_event_daemon_message *msg,
                return -ESRCH;
        }
 
-       ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout);
+       ret = _daemon_talk(&fifos, msg, DM_EVENT_CMD_HELLO, 0, 0, 0, 0);
+       if (!ret)
+               ret = _daemon_talk(&fifos, msg, cmd, dso_name, dev_name, evmask, timeout);
 
        /* what is the opposite of init? */
        _dtr_client(&fifos);
@@ -521,7 +563,7 @@ int dm_event_register_handler(const struct dm_event_handler *dmevh)
        int ret = 1, err;
        const char *uuid;
        struct dm_task *dmt;
-       struct dm_event_daemon_message msg;
+       struct dm_event_daemon_message msg = { 0, 0, NULL };
 
        if (!(dmt = _get_device_info(dmevh))) {
                stack;
@@ -551,7 +593,7 @@ int dm_event_unregister_handler(const struct dm_event_handler *dmevh)
        int ret = 1, err;
        const char *uuid;
        struct dm_task *dmt;
-       struct dm_event_daemon_message msg;
+       struct dm_event_daemon_message msg = { 0, 0, NULL };
 
        if (!(dmt = _get_device_info(dmevh))) {
                stack;
@@ -598,15 +640,20 @@ static char *_fetch_string(char **src, const int delimiter)
 static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name,
                         char **uuid, enum dm_event_mask *evmask)
 {
+       char *id = NULL;
        char *p = msg->data;
 
-       if ((*dso_name = _fetch_string(&p, ' ')) &&
+       if ((id = _fetch_string(&p, ' ')) &&
+           (*dso_name = _fetch_string(&p, ' ')) &&
            (*uuid = _fetch_string(&p, ' '))) {
                *evmask = atoi(p);
 
+               dm_free(id);
                return 0;
        }
 
+       if (id)
+               dm_free(id);
        return -ENOMEM;
 }
 
@@ -621,12 +668,12 @@ static int _parse_message(struct dm_event_daemon_message *msg, char **dso_name,
  */
 int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next)
 {
-       int ret;
+       int ret = 0;
        const char *uuid = NULL;
        char *reply_dso = NULL, *reply_uuid = NULL;
-       enum dm_event_mask reply_mask;
-       struct dm_task *dmt;
-       struct dm_event_daemon_message msg;
+       enum dm_event_mask reply_mask = 0;
+       struct dm_task *dmt = NULL;
+       struct dm_event_daemon_message msg = { 0, 0, NULL };
 
        if (!(dmt = _get_device_info(dmevh))) {
                stack;
@@ -696,9 +743,17 @@ int dm_event_get_registered_device(struct dm_event_handler *dmevh, int next)
 
 #if 0                          /* left out for now */
 
+static char *_skip_string(char *src, const int delimiter)
+{
+       src = srtchr(src, delimiter);
+       if (src && *(src + 1))
+               return src + 1;
+       return NULL;
+}
+
 int dm_event_set_timeout(const char *device_path, uint32_t timeout)
 {
-       struct dm_event_daemon_message msg;
+       struct dm_event_daemon_message msg = { 0, 0, NULL };
 
        if (!device_exists(device_path))
                return -ENODEV;
@@ -710,13 +765,20 @@ int dm_event_set_timeout(const char *device_path, uint32_t timeout)
 int dm_event_get_timeout(const char *device_path, uint32_t *timeout)
 {
        int ret;
-       struct dm_event_daemon_message msg;
+       struct dm_event_daemon_message msg = { 0, 0, NULL };
 
        if (!device_exists(device_path))
                return -ENODEV;
        if (!(ret = _do_event(DM_EVENT_CMD_GET_TIMEOUT, &msg, NULL, device_path,
-                            0, 0)))
-               *timeout = atoi(msg.data);
+                            0, 0))) {
+               char *p = _skip_string(msg.data, ' ');
+               if (!p) {
+                       log_error("malformed reply from dmeventd '%s'\n",
+                                 msg.data);
+                       return -EIO;
+               }
+               *timeout = atoi(p);
+       }
        if (msg.data)
                dm_free(msg.data);
        return ret;
This page took 0.040714 seconds and 5 git commands to generate.