Discussion:
[dpdk-dev] [PATCH] eal: add asynchronous request API to DPDK IPC
(too old to reply)
Anatoly Burakov
2018-02-27 14:59:29 UTC
Permalink
This API is similar to the blocking API that is already present,
but the reply will be received in a separate callback by the caller.

Under the hood, we create a separate thread to deal with replies to
asynchronous requests. That thread just waits to be notified by the
main thread, or to be woken up by a timer (it wakes itself up every
minute regardless of whether it was notified, but if there are no
requests in the queue, nothing is done and it goes back to sleep
for another minute).

Signed-off-by: Anatoly Burakov <***@intel.com>
---

Notes:
This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps merge them into one function?

[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

lib/librte_eal/common/eal_common_proc.c | 528 +++++++++++++++++++++++++++++---
lib/librte_eal/common/include/rte_eal.h | 71 +++++
2 files changed, 564 insertions(+), 35 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index bdea6d6..c5ae569 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -41,7 +41,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next;
char action_name[RTE_MP_MAX_NAME_LEN];
- rte_mp_t action;
+ RTE_STD_C11
+ union {
+ rte_mp_t action;
+ rte_mp_async_reply_t reply;
+ };
};

/** Double linked list of actions. */
@@ -73,13 +77,37 @@ TAILQ_HEAD(message_queue, message_queue_entry);
static struct message_queue message_queue =
TAILQ_HEAD_INITIALIZER(message_queue);

+enum mp_request_type {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+ struct rte_mp_reply *user_reply;
+ struct timespec *end;
+ int n_requests_processed;
+};
+
+struct async_request_param {
+ struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+ pthread_cond_t cond;
+};
+
struct sync_request {
TAILQ_ENTRY(sync_request) next;
- int reply_received;
+ enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
- struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ struct rte_mp_msg *reply_msg;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct sync_request_param sync;
+ struct async_request_param async;
+ };
};

TAILQ_HEAD(sync_request_list, sync_request);
@@ -87,9 +115,12 @@ TAILQ_HEAD(sync_request_list, sync_request);
static struct {
struct sync_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} sync_requests = {
.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

static struct sync_request *
@@ -201,53 +232,97 @@ validate_action_name(const char *name)
return 0;
}

-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+static struct action_entry *
+action_register(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
- return -1;
+ return NULL;

entry = malloc(sizeof(struct action_entry));
if (entry == NULL) {
rte_errno = ENOMEM;
- return -1;
+ return NULL;
}
strcpy(entry->action_name, name);
- entry->action = action;

- pthread_mutex_lock(&mp_mutex_action);
if (find_action_entry_by_name(name) != NULL) {
pthread_mutex_unlock(&mp_mutex_action);
rte_errno = EEXIST;
free(entry);
- return -1;
+ return NULL;
}
TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
- return 0;
+
+ /* async and sync replies are handled by different threads, so even
+ * though they share a pointer in a union, one will never trigger in
+ * place of the other.
+ */
+
+ return entry;
}

-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
return;

- pthread_mutex_lock(&mp_mutex_action);
entry = find_action_entry_by_name(name);
if (entry == NULL) {
- pthread_mutex_unlock(&mp_mutex_action);
return;
}
TAILQ_REMOVE(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
free(entry);
}

+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->action = action;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+ pthread_mutex_lock(&mp_mutex_action);
+ action_unregister(name);
+ pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->reply = reply;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+ rte_mp_action_unregister(name);
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -307,9 +382,13 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
- memcpy(sync_req->reply, msg, sizeof(*msg));
+ memcpy(sync_req->reply_msg, msg, sizeof(*msg));
sync_req->reply_received = 1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(&sync_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&sync_requests.lock);
@@ -370,6 +449,166 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+static int
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+ int ret;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return 0;
+
+ ret = 1;
+
+ /* if we received a response, adjust relevant data and copy message. */
+ if (received && reply->nb_sent != 0) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply_msg;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ ret = -1;
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+ }
+ free(sr->reply_msg);
+
+ /* mark this request as processed */
+ param->n_requests_processed++;
+
+ /* if number of sent messages is zero, we're short-circuiting */
+ last_msg = param->n_requests_processed == reply->nb_sent ||
+ reply->nb_sent == 0;
+
+ /* if this was the last request, we can call the callback */
+ if (last_msg) {
+ pthread_mutex_lock(&mp_mutex_action);
+ struct action_entry *entry =
+ find_action_entry_by_name(sr->request->name);
+ if (!entry) {
+ RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+ sr->request->name);
+ ret = -1;
+ } else {
+ entry->reply(reply);
+ }
+ pthread_mutex_unlock(&mp_mutex_action);
+ /* clean up */
+ free(sr->async.param->user_reply->msgs);
+ free(sr->async.param->user_reply);
+ free(sr->async.param->end);
+ free(sr->async.param);
+ }
+ return ret;
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct sync_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ do {
+ int ret;
+ pthread_mutex_lock(&sync_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ pthread_mutex_unlock(&sync_requests.lock);
+ break;
+ }
+
+ /* set a 60 second timeout by default */
+ timeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;
+ timeout.tv_sec = now.tv_sec + 60 +
+ (now.tv_usec * 1000 + 60) / 1000000000;
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+ memcpy(&timeout, sr->async.param->end,
+ sizeof(timeout));
+ }
+
+ /* now, wait until we either time out or get woken up */
+ ret = pthread_cond_timedwait(&sync_requests.async_cond,
+ &sync_requests.lock, &timeout);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct sync_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+ next) {
+ if (process_async_request(sr, &ts_now)) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ free(sr);
+ }
+ }
+ }
+ pthread_mutex_unlock(&sync_requests.lock);
+ } while (1);
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -506,7 +745,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -543,7 +782,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -553,7 +801,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -750,19 +1002,76 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_shared_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct sync_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply_msg = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist)
+ TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+ if (exist) {
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply->nb_sent++;
+
+ return 0;
+fail:
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct timeval now;
+ struct timespec ts_now;
struct rte_mp_msg msg, *tmp;
struct sync_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
- sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ sync_req.reply_msg = &msg;
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&sync_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -787,17 +1096,17 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,

pthread_mutex_lock(&sync_requests.lock);
do {
- pthread_cond_timedwait(&sync_req.cond, &sync_requests.lock, ts);
+ pthread_cond_timedwait(&sync_req.sync.cond,
+ &sync_requests.lock, ts);
/* Check spurious wakeups */
if (sync_req.reply_received == 1)
break;
/* Check if time is out */
if (gettimeofday(&now, NULL) < 0)
break;
- if (ts->tv_sec < now.tv_sec)
- break;
- else if (now.tv_sec == ts->tv_sec &&
- now.tv_usec * 1000 < ts->tv_nsec)
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+ if (timespec_cmp(ts, &ts_now) < 0)
break;
} while (1);
/* We got the lock now */
@@ -854,7 +1163,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -896,7 +1205,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
continue;
}

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -907,9 +1216,158 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
{
+ struct sync_request *dummy;
+ struct async_request_shared_param *param = NULL;
+ struct rte_mp_reply *reply = NULL;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end = NULL;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ reply = malloc(sizeof(*reply));
+ end = malloc(sizeof(*end));
+ if (reply == NULL || end == NULL || param == NULL || dummy == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ param->n_requests_processed = 0;
+ param->end = end;
+ param->user_reply = reply;
+
+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;

+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&sync_requests.lock);
+
+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. put it
+ * on the queue and fill it. we will remove it once we know we sent
+ * something.
+ */
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = req;
+ dummy->reply_msg = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), req, param);
+
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ closedir(mp_dir);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ const char *peer_name;
+ char path[PATH_MAX];
+ int active;
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+ peer_name = get_peer_name(path);
+
+ active = socket_is_active(peer_name);
+
+ if (active < 0) {
+ ret = -1;
+ break;
+ } else if (active == 0) {
+ unlinkat(dir_fd, ent->d_name, 0);
+ continue;
+ }
+
+ if (mp_request_async(path, req, param))
+ ret = -1;
+ }
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&sync_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ closedir(mp_dir);
+ return ret;
+unlock_fail:
+ pthread_mutex_unlock(&sync_requests.lock);
+fail:
+ free(dummy);
+ free(param);
+ free(reply);
+ free(end);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 5269dab..78a40de 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -231,6 +231,15 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_reply *msg);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -274,6 +283,46 @@ rte_mp_action_unregister(const char *name);
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ * @param reply
+ * The reply argument is the function pointer to the reply callback.
+ *
+ * @return
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component does
+ * not want responses to the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a message to the peer process.
*
* This function will send a message which will be responsed by the action
@@ -322,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
--
2.7.4
Burakov, Anatoly
2018-02-28 10:22:31 UTC
Permalink
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).
---
I missed updating the .map file; will respin.
--
Thanks,
Anatoly
Anatoly Burakov
2018-03-02 18:06:51 UTC
Permalink
This API is similar to the blocking API that is already present,
but the reply will be received in a separate callback by the caller.

Under the hood, we create a separate thread to deal with replies to
asynchronous requests. That thread just waits to be notified by the
main thread, or to be woken up by a timer (it wakes itself up every
minute regardless of whether it was notified, but if there are no
requests in the queue, nothing is done and it goes back to sleep
for another minute).

Signed-off-by: Anatoly Burakov <***@intel.com>
---

Notes:
v2:
- fixed race conditions and deadlocks
- fixed dereferencing of invalid memory
- added the source request to callbacks, so that the user
callback can check which specific request it is
a callback for
- added missing .map file entries

lib/librte_eal/common/eal_common_proc.c | 573 ++++++++++++++++++++++++++++++--
lib/librte_eal/common/include/rte_eal.h | 72 ++++
lib/librte_eal/rte_eal_version.map | 3 +
3 files changed, 617 insertions(+), 31 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 666c566..1fd3f2e 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
@@ -40,7 +41,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next;
char action_name[RTE_MP_MAX_NAME_LEN];
- rte_mp_t action;
+ RTE_STD_C11
+ union {
+ rte_mp_t action;
+ rte_mp_async_reply_t reply;
+ };
};

/** Double linked list of actions. */
@@ -60,13 +65,38 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

+
+enum mp_request_type {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+ struct rte_mp_reply *user_reply;
+ struct timespec *end;
+ int n_requests_processed;
+};
+
+struct async_request_param {
+ struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+ pthread_cond_t cond;
+};
+
struct sync_request {
TAILQ_ENTRY(sync_request) next;
- int reply_received;
+ enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
- struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ struct rte_mp_msg *reply_msg;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct sync_request_param sync;
+ struct async_request_param async;
+ };
};

TAILQ_HEAD(sync_request_list, sync_request);
@@ -74,9 +104,12 @@ TAILQ_HEAD(sync_request_list, sync_request);
static struct {
struct sync_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} sync_requests = {
.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

static struct sync_request *
@@ -190,53 +223,97 @@ validate_action_name(const char *name)
return 0;
}

-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+static struct action_entry *
+action_register(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
- return -1;
+ return NULL;

entry = malloc(sizeof(struct action_entry));
if (entry == NULL) {
rte_errno = ENOMEM;
- return -1;
+ return NULL;
}
strcpy(entry->action_name, name);
- entry->action = action;

- pthread_mutex_lock(&mp_mutex_action);
if (find_action_entry_by_name(name) != NULL) {
pthread_mutex_unlock(&mp_mutex_action);
rte_errno = EEXIST;
free(entry);
- return -1;
+ return NULL;
}
TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
- return 0;
+
+ /* async and sync replies are handled by different threads, so even
+ * though they share a pointer in a union, one will never trigger in
+ * place of the other.
+ */
+
+ return entry;
}

-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
return;

- pthread_mutex_lock(&mp_mutex_action);
entry = find_action_entry_by_name(name);
if (entry == NULL) {
- pthread_mutex_unlock(&mp_mutex_action);
return;
}
TAILQ_REMOVE(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
free(entry);
}

+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->action = action;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+ pthread_mutex_lock(&mp_mutex_action);
+ action_unregister(name);
+ pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->reply = reply;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+ rte_mp_action_unregister(name);
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -296,9 +373,13 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
- memcpy(sync_req->reply, msg, sizeof(*msg));
+ memcpy(sync_req->reply_msg, msg, sizeof(*msg));
sync_req->reply_received = 1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(&sync_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&sync_requests.lock);
@@ -332,6 +413,202 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy message. */
+ if (received && sr->reply_msg) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply_msg;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+ }
+ free(sr->reply_msg);
+
+ /* mark this request as processed */
+ param->n_requests_processed++;
+
+ /* if number of sent messages is zero, we're short-circuiting */
+ last_msg = param->n_requests_processed == reply->nb_sent ||
+ reply->nb_sent == 0;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct sync_request *sr)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ pthread_mutex_lock(&mp_mutex_action);
+ struct action_entry *entry =
+ find_action_entry_by_name(sr->request->name);
+ pthread_mutex_unlock(&mp_mutex_action);
+ if (!entry)
+ RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+ sr->request->name);
+ else
+ entry->reply(sr->request, reply);
+ /* clean up */
+ free(sr->async.param->user_reply->msgs);
+ free(sr->async.param->user_reply);
+ free(sr->async.param->end);
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct sync_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ do {
+ struct sync_request *trigger = NULL;
+ int ret;
+ bool dontwait = false;
+
+ pthread_mutex_lock(&sync_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ pthread_mutex_unlock(&sync_requests.lock);
+ break;
+ }
+
+ /* set a 60 second timeout by default */
+ timeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;
+ timeout.tv_sec = now.tv_sec + 60 +
+ (now.tv_usec * 1000 + 60) / 1000000000;
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+ memcpy(&timeout, sr->async.param->end,
+ sizeof(timeout));
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ dontwait = true;
+ break;
+ }
+ }
+
+ /* now, wait until we either time out or get woken up */
+ if (!dontwait)
+ ret = pthread_cond_timedwait(&sync_requests.async_cond,
+ &sync_requests.lock, &timeout);
+ else
+ ret = 0;
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct sync_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ /* trigger one callback at a time */
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&sync_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ } while (1);
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char initfile[PATH_MAX] = {0};
@@ -469,7 +746,7 @@ rte_mp_channel_init(void)
char path[PATH_MAX];
char init_filter[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -511,7 +788,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -522,7 +808,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -702,18 +992,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+		struct async_request_shared_param *param)
+{
+	/*
+	 * Queue one asynchronous request for peer 'dst' and send it.
+	 *
+	 * The caller must hold sync_requests.lock. On success (return 0 with
+	 * the message sent) the queued entry stays on the request list and
+	 * param->user_reply->nb_sent is incremented. If send_msg() returns 0,
+	 * nothing is counted as sent and 0 is returned. On error, -1 is
+	 * returned. In both of the latter cases no entry is left on the list.
+	 */
+	struct rte_mp_msg *reply_msg;
+	struct sync_request *sync_req, *exist;
+	int ret;
+
+	sync_req = malloc(sizeof(*sync_req));
+	reply_msg = malloc(sizeof(*reply_msg));
+	if (sync_req == NULL || reply_msg == NULL) {
+		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+		rte_errno = ENOMEM;
+		ret = -1;
+		goto fail;
+	}
+
+	memset(sync_req, 0, sizeof(*sync_req));
+	memset(reply_msg, 0, sizeof(*reply_msg));
+
+	sync_req->type = REQUEST_TYPE_ASYNC;
+	strcpy(sync_req->dst, dst);
+	sync_req->request = req;
+	sync_req->reply_msg = reply_msg;
+	sync_req->async.param = param;
+
+	/* queue already locked by caller */
+
+	exist = find_sync_request(dst, req->name);
+	if (exist) {
+		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+		rte_errno = EEXIST;
+		ret = -1;
+		goto fail;
+	}
+	TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+
+	ret = send_msg(dst, req, MP_REQ);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+			dst, req->name);
+		ret = -1;
+		goto dequeue_fail;
+	} else if (ret == 0) {
+		/* nothing was sent, but this is not reported as an error */
+		goto dequeue_fail;
+	}
+
+	param->user_reply->nb_sent++;
+
+	return 0;
+dequeue_fail:
+	/* the entry is about to be freed, so it must not stay on the list */
+	TAILQ_REMOVE(&sync_requests.requests, sync_req, next);
+fail:
+	free(sync_req);
+	free(reply_msg);
+	return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct sync_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
- sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ sync_req.reply_msg = &msg;
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&sync_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -737,7 +1086,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;

do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&sync_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

@@ -795,7 +1144,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -836,7 +1185,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
continue;
}

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -848,9 +1197,171 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
 {
+	/*
+	 * Send 'req' without blocking: to the primary only when called from a
+	 * secondary process, otherwise to every ready peer found in the
+	 * socket directory. A dummy request is queued first so that the async
+	 * thread still triggers the callback when nothing could be sent; it
+	 * is dropped again as soon as at least one real request was sent.
+	 * Returns 0 on success, -1 on failure.
+	 */
+	struct rte_mp_msg *copy;
+	struct sync_request *dummy;
+	struct async_request_shared_param *param = NULL;
+	struct rte_mp_reply *reply = NULL;
+	int dir_fd, ret = 0;
+	DIR *mp_dir;
+	struct dirent *ent;
+	struct timeval now;
+	struct timespec *end = NULL;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+	if (check_input(req) == false)
+		return -1;
+	if (gettimeofday(&now, NULL) < 0) {
+		RTE_LOG(ERR, EAL, "Faile to get current time\n");
+		rte_errno = errno;
+		return -1;
+	}
+	copy = malloc(sizeof(*copy));
+	dummy = malloc(sizeof(*dummy));
+	param = malloc(sizeof(*param));
+	reply = malloc(sizeof(*reply));
+	end = malloc(sizeof(*end));
+	if (copy == NULL || dummy == NULL || param == NULL || reply == NULL ||
+			end == NULL) {
+		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+		rte_errno = ENOMEM;
+		goto fail;
+	}
+
+	memset(copy, 0, sizeof(*copy));
+	memset(dummy, 0, sizeof(*dummy));
+	memset(param, 0, sizeof(*param));
+	memset(reply, 0, sizeof(*reply));
+	memset(end, 0, sizeof(*end));
+
+	/* copy message */
+	memcpy(copy, req, sizeof(*copy));
+
+	param->n_requests_processed = 0;
+	param->end = end;
+	param->user_reply = reply;
+
+	/* convert the relative timeout into an absolute deadline */
+	end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+	end->tv_sec = now.tv_sec + ts->tv_sec +
+			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+	reply->nb_sent = 0;
+	reply->nb_received = 0;
+	reply->msgs = NULL;
+
+	/* we have to lock the request queue here, as we will be adding a bunch
+	 * of requests to the queue at once, and some of the replies may arrive
+	 * before we add all of the requests to the queue.
+	 */
+	pthread_mutex_lock(&sync_requests.lock);
+
+	/* we have to ensure that callback gets triggered even if we don't send
+	 * anything, therefore earlier we have allocated a dummy request. put it
+	 * on the queue and fill it. we will remove it once we know we sent
+	 * something.
+	 */
+	dummy->type = REQUEST_TYPE_ASYNC;
+	dummy->request = copy;
+	dummy->reply_msg = NULL;
+	dummy->async.param = param;
+	dummy->reply_received = 1; /* short-circuit the timeout */

+	TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+	/* for secondary process, send request to the primary process only */
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+		/* on failure nothing was sent: unlink the dummy and clean up */
+		if (ret != 0)
+			goto unlock_fail;
+
+		/* if we sent something, remove dummy request from the queue */
+		if (reply->nb_sent != 0) {
+			TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+			free(dummy);
+			dummy = NULL;
+		}
+
+		/* wake the async thread so a remaining dummy is handled now */
+		pthread_cond_signal(&sync_requests.async_cond);
+
+		pthread_mutex_unlock(&sync_requests.lock);
+		return 0;
+	}
+
+	/* for primary process, broadcast request */
+	mp_dir = opendir(mp_dir_path);
+	if (!mp_dir) {
+		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+		rte_errno = errno;
+		goto unlock_fail;
+	}
+	dir_fd = dirfd(mp_dir);
+
+	/* lock the directory to prevent processes spinning up while we send */
+	if (flock(dir_fd, LOCK_EX)) {
+		/* save errno before closedir() has a chance to overwrite it */
+		rte_errno = errno;
+		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+			mp_dir_path);
+		closedir(mp_dir);
+		goto unlock_fail;
+	}
+
+	while ((ent = readdir(mp_dir))) {
+		const char *peer_name;
+		char path[PATH_MAX];
+		int ready;
+
+		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+			continue;
+
+		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+			 ent->d_name);
+		peer_name = get_peer_name(path);
+
+		ready = socket_is_ready(peer_name);
+
+		if (ready < 0) {
+			ret = -1;
+			continue;
+		} else if (ready == 0) {
+			continue;
+		}
+
+		if (mp_request_async(path, copy, param))
+			ret = -1;
+	}
+	/* if we sent something, remove dummy request from the queue */
+	if (reply->nb_sent != 0) {
+		TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+		free(dummy);
+		dummy = NULL;
+	}
+	/* trigger async request thread wake up */
+	pthread_cond_signal(&sync_requests.async_cond);
+
+	/* finally, unlock the queue */
+	pthread_mutex_unlock(&sync_requests.lock);
+
+	/* unlock the directory */
+	flock(dir_fd, LOCK_UN);
+
+	/* dir_fd is closed automatically on closedir */
+	closedir(mp_dir);
+	return ret;
+unlock_fail:
+	/* the dummy is still queued here; unlink it before it gets freed */
+	TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+	pthread_mutex_unlock(&sync_requests.lock);
+fail:
+	free(dummy);
+	free(param);
+	free(reply);
+	free(end);
+	free(copy);
+	return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..93ca4cc 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create a socket channel for primary/secondary communication, use
+ * this function typedef to register a callback for incoming responses to
+ * asynchronous requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -273,6 +283,46 @@ rte_mp_action_unregister(const char *name);
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ * @param reply
+ * The reply argument is the function pointer to the reply callback.
+ *
+ * @return
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component no
+ * longer wants to receive responses to its asynchronous requests from the
+ * corresponding component in its primary process or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a message to the peer process.
*
* This function will send a message which will be responsed by the action
@@ -321,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..1d88437 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -223,8 +223,11 @@ EXPERIMENTAL {
rte_eal_mbuf_user_pool_ops;
rte_mp_action_register;
rte_mp_action_unregister;
+ rte_mp_async_reply_register;
+ rte_mp_async_reply_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
--
2.7.4
Anatoly Burakov
2018-03-07 16:57:01 UTC
Permalink
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.

Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).

Signed-off-by: Anatoly Burakov <***@intel.com>
---

Notes:
v3:
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
v2:
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries

This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?

[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

lib/librte_eal/common/eal_common_proc.c | 563 ++++++++++++++++++++++++++++++--
lib/librte_eal/common/include/rte_eal.h | 72 ++++
lib/librte_eal/rte_eal_version.map | 3 +
3 files changed, 607 insertions(+), 31 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 1ea6045..d99ba56 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
@@ -39,7 +40,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next;
char action_name[RTE_MP_MAX_NAME_LEN];
- rte_mp_t action;
+ RTE_STD_C11
+ union {
+ rte_mp_t action;
+ rte_mp_async_reply_t reply;
+ };
};

/** Double linked list of actions. */
@@ -60,13 +65,37 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

+/** Kind of a queued request; stored in the 'type' field of struct sync_request. */
+enum mp_request_type {
+	REQUEST_TYPE_SYNC,
+	REQUEST_TYPE_ASYNC
+};
+
+/** State shared by all queued requests created by one rte_mp_request_async() call. */
+struct async_request_shared_param {
+	struct rte_mp_reply *user_reply; /**< collected replies, passed to the callback */
+	struct timespec *end; /**< absolute deadline for the replies */
+	int n_requests_processed; /**< compared against user_reply->nb_sent */
+};
+
+/** Per-request data for asynchronous requests: a pointer to the shared state. */
+struct async_request_param {
+	struct async_request_shared_param *param;
+};
+
+/** Per-request data for synchronous requests. */
+struct sync_request_param {
+	pthread_cond_t cond; /**< signalled when the reply for this request arrives */
+};
+
struct sync_request {
TAILQ_ENTRY(sync_request) next;
- int reply_received;
+ enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
- struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ struct rte_mp_msg *reply_msg;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct sync_request_param sync;
+ struct async_request_param async;
+ };
};

TAILQ_HEAD(sync_request_list, sync_request);
@@ -74,9 +103,12 @@ TAILQ_HEAD(sync_request_list, sync_request);
static struct {
struct sync_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} sync_requests = {
.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

static struct sync_request *
@@ -159,50 +191,50 @@ validate_action_name(const char *name)
return 0;
}

-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+/*
+ * Allocate an entry named 'name' and append it to action_entry_list.
+ * The caller must hold mp_mutex_action and is the one who releases it;
+ * this function never unlocks it. Returns the new entry, or NULL with
+ * rte_errno set to ENOMEM or EEXIST.
+ */
+static struct action_entry *
+action_register(const char *name)
 {
 	struct action_entry *entry;
 
 	if (validate_action_name(name))
-		return -1;
+		return NULL;
 
 	entry = malloc(sizeof(struct action_entry));
 	if (entry == NULL) {
 		rte_errno = ENOMEM;
-		return -1;
+		return NULL;
 	}
 	strcpy(entry->action_name, name);
-	entry->action = action;
 
-	pthread_mutex_lock(&mp_mutex_action);
 	if (find_action_entry_by_name(name) != NULL) {
-		pthread_mutex_unlock(&mp_mutex_action);
 		rte_errno = EEXIST;
 		free(entry);
-		return -1;
+		return NULL;
 	}
 	TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
-	pthread_mutex_unlock(&mp_mutex_action);
-	return 0;
+
+	/* async and sync replies are handled by different threads, so even
+	 * though they share a pointer in a union, one will never trigger in
+	 * place of the other.
+	 */
+
+	return entry;
 }

-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
return;

- pthread_mutex_lock(&mp_mutex_action);
entry = find_action_entry_by_name(name);
if (entry == NULL) {
- pthread_mutex_unlock(&mp_mutex_action);
return;
}
TAILQ_REMOVE(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
free(entry);
}

@@ -330,6 +362,50 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
return ret;
}

+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+	struct action_entry *new_entry;
+	int status = -1;
+
+	/* the list is modified and the callback stored under the same lock */
+	pthread_mutex_lock(&mp_mutex_action);
+	new_entry = action_register(name);
+	if (new_entry != NULL) {
+		new_entry->action = action;
+		status = 0;
+	}
+	pthread_mutex_unlock(&mp_mutex_action);
+
+	return status;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+	/* action_unregister() does no locking of its own */
+	pthread_mutex_lock(&mp_mutex_action);
+	action_unregister(name);
+	pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+	struct action_entry *new_entry;
+	int status = -1;
+
+	/* the list is modified and the callback stored under the same lock */
+	pthread_mutex_lock(&mp_mutex_action);
+	new_entry = action_register(name);
+	if (new_entry != NULL) {
+		new_entry->reply = reply;
+		status = 0;
+	}
+	pthread_mutex_unlock(&mp_mutex_action);
+
+	return status;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+	/* reply callbacks live in the same list as actions */
+	pthread_mutex_lock(&mp_mutex_action);
+	action_unregister(name);
+	pthread_mutex_unlock(&mp_mutex_action);
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -389,10 +465,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
- memcpy(sync_req->reply, msg, sizeof(*msg));
+ memcpy(sync_req->reply_msg, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(&sync_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&sync_requests.lock);
@@ -438,6 +518,204 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+	/* order by seconds first, fall back to nanoseconds on a tie;
+	 * returns -1, 0 or 1 for a < b, a == b and a > b respectively.
+	 */
+	if (a->tv_sec != b->tv_sec)
+		return a->tv_sec < b->tv_sec ? -1 : 1;
+	if (a->tv_nsec != b->tv_nsec)
+		return a->tv_nsec < b->tv_nsec ? -1 : 1;
+	return 0;
+}
+
+/** What the async reply thread has to do with a queued request. */
+enum async_action {
+	ACTION_NONE, /**< don't do anything */
+	ACTION_FREE, /**< free the queued request, but don't trigger callback */
+	ACTION_TRIGGER /**< trigger callback, then free the queued request */
+};
+
+/*
+ * Evaluate one queued asynchronous request against the current time.
+ *
+ * Returns ACTION_NONE when the request neither timed out nor got an answer.
+ * Otherwise the reply (if any) is appended to the shared user reply, the
+ * request's reply buffer is released, and the result says whether this was
+ * the last outstanding request (ACTION_TRIGGER) or not (ACTION_FREE). This
+ * function does not unlink or free 'sr' itself.
+ */
+static enum async_action
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+	struct async_request_shared_param *param;
+	struct rte_mp_reply *reply;
+	bool timeout, received, last_msg;
+
+	param = sr->async.param;
+	reply = param->user_reply;
+
+	/* did we timeout? */
+	timeout = timespec_cmp(param->end, now) <= 0;
+
+	/* did we receive a response? */
+	received = sr->reply_received != 0;
+
+	/* if we didn't time out, and we didn't receive a response, ignore */
+	if (!timeout && !received)
+		return ACTION_NONE;
+
+	/* if we received a response, adjust relevant data and copy message. */
+	if (sr->reply_received == 1 && sr->reply_msg) {
+		struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+		msg = sr->reply_msg;
+		user_msgs = reply->msgs;
+
+		tmp = realloc(user_msgs, sizeof(*msg) *
+				(reply->nb_received + 1));
+		if (!tmp) {
+			RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+				sr->dst, sr->request->name);
+			/* this entry is going to be removed and its message
+			 * dropped, but we don't want to leak memory, so
+			 * continue.
+			 */
+		} else {
+			user_msgs = tmp;
+			reply->msgs = user_msgs;
+			memcpy(&user_msgs[reply->nb_received],
+					msg, sizeof(*msg));
+			reply->nb_received++;
+		}
+	}
+
+	if (sr->reply_received == -1) {
+		/* we were asked to ignore this process: it no longer counts
+		 * as sent, so it must not count as processed either, or the
+		 * callback would fire while other requests are still pending.
+		 */
+		reply->nb_sent--;
+	} else {
+		/* mark this request as processed */
+		param->n_requests_processed++;
+	}
+
+	/* release the buffer and clear the pointer to prevent a double free */
+	free(sr->reply_msg);
+	sr->reply_msg = NULL;
+
+	/* if number of sent messages is zero, we're short-circuiting */
+	last_msg = param->n_requests_processed == reply->nb_sent ||
+			reply->nb_sent == 0;
+
+	return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+/*
+ * Invoke the reply callback registered under the request's name, then free
+ * the shared state of the request (collected replies, deadline, shared
+ * parameters and the request copy). 'sr' itself is not freed here.
+ */
+static void
+trigger_async_action(struct sync_request *sr)
+{
+	struct async_request_shared_param *param;
+	struct rte_mp_reply *reply;
+	struct action_entry *entry;
+	rte_mp_async_reply_t callback = NULL;
+
+	param = sr->async.param;
+	reply = param->user_reply;
+
+	/* fetch the callback while holding the lock: the entry may be
+	 * unregistered and freed as soon as the lock is dropped.
+	 */
+	pthread_mutex_lock(&mp_mutex_action);
+	entry = find_action_entry_by_name(sr->request->name);
+	if (entry != NULL)
+		callback = entry->reply;
+	pthread_mutex_unlock(&mp_mutex_action);
+
+	if (callback == NULL)
+		RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+			sr->request->name);
+	else
+		callback(sr->request, reply);
+	/* clean up */
+	free(reply->msgs);
+	free(reply);
+	free(param->end);
+	free(param);
+	free(sr->request);
+}
+
+/*
+ * Thread body handling replies to asynchronous requests.
+ *
+ * Each iteration sleeps on sync_requests.async_cond until the earliest
+ * request deadline (at most 60 seconds), or not at all if a reply is already
+ * waiting. It then walks the request queue, frees finished requests, and
+ * triggers at most one user callback per iteration, with the queue lock
+ * released. The loop ends only if the current time cannot be read.
+ */
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+	struct sync_request *sr;
+	struct timeval now;
+	struct timespec timeout, ts_now;
+
+	do {
+		struct sync_request *trigger = NULL;
+		int ret;
+		bool dontwait = false;
+
+		pthread_mutex_lock(&sync_requests.lock);
+
+		if (gettimeofday(&now, NULL) < 0) {
+			RTE_LOG(ERR, EAL, "Cannot get current time\n");
+			pthread_mutex_unlock(&sync_requests.lock);
+			break;
+		}
+
+		/* set a 60 second timeout by default */
+		timeout.tv_nsec = now.tv_usec * 1000;
+		timeout.tv_sec = now.tv_sec + 60;
+
+		/* scan through the list and see if there are any timeouts that
+		 * are earlier than our current timeout.
+		 */
+		TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+			if (sr->type != REQUEST_TYPE_ASYNC)
+				continue;
+			if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+				memcpy(&timeout, sr->async.param->end,
+					sizeof(timeout));
+
+			/* sometimes, we don't even wait */
+			if (sr->reply_received) {
+				dontwait = true;
+				break;
+			}
+		}
+
+		/* now, wait until we either time out or get woken up */
+		if (!dontwait)
+			ret = pthread_cond_timedwait(&sync_requests.async_cond,
+					&sync_requests.lock, &timeout);
+		else
+			ret = 0;
+
+		if (gettimeofday(&now, NULL) < 0) {
+			RTE_LOG(ERR, EAL, "Cannot get current time\n");
+			/* the lock is held at this point, don't leave with it */
+			pthread_mutex_unlock(&sync_requests.lock);
+			break;
+		}
+		ts_now.tv_nsec = now.tv_usec * 1000;
+		ts_now.tv_sec = now.tv_sec;
+
+		if (ret == 0 || ret == ETIMEDOUT) {
+			struct sync_request *next;
+			/* we've either been woken up, or we timed out */
+
+			/* we have still the lock, check if anything needs
+			 * processing.
+			 */
+			TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+					next) {
+				enum async_action action;
+				if (sr->type != REQUEST_TYPE_ASYNC)
+					continue;
+
+				action = process_async_request(sr, &ts_now);
+				if (action == ACTION_FREE) {
+					TAILQ_REMOVE(&sync_requests.requests,
+							sr, next);
+					free(sr);
+				} else if (action == ACTION_TRIGGER) {
+					/* one callback per iteration: stop
+					 * here so that no further request is
+					 * processed and then left queued,
+					 * which would process it twice.
+					 */
+					TAILQ_REMOVE(&sync_requests.requests,
+							sr, next);
+					trigger = sr;
+					break;
+				}
+			}
+		}
+		pthread_mutex_unlock(&sync_requests.lock);
+		if (trigger) {
+			trigger_async_action(trigger);
+			free(trigger);
+		}
+	} while (1);
+
+	RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+	return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -500,7 +778,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -537,7 +815,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -548,7 +835,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -596,18 +887,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+		struct async_request_shared_param *param)
+{
+	/*
+	 * Queue one asynchronous request for peer 'dst' and send it.
+	 *
+	 * The caller must hold sync_requests.lock. On success (return 0 with
+	 * the message sent) the queued entry stays on the request list and
+	 * param->user_reply->nb_sent is incremented. If send_msg() returns 0,
+	 * nothing is counted as sent and 0 is returned. On error, -1 is
+	 * returned. In both of the latter cases no entry is left on the list.
+	 */
+	struct rte_mp_msg *reply_msg;
+	struct sync_request *sync_req, *exist;
+	int ret;
+
+	sync_req = malloc(sizeof(*sync_req));
+	reply_msg = malloc(sizeof(*reply_msg));
+	if (sync_req == NULL || reply_msg == NULL) {
+		RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+		rte_errno = ENOMEM;
+		ret = -1;
+		goto fail;
+	}
+
+	memset(sync_req, 0, sizeof(*sync_req));
+	memset(reply_msg, 0, sizeof(*reply_msg));
+
+	sync_req->type = REQUEST_TYPE_ASYNC;
+	strcpy(sync_req->dst, dst);
+	sync_req->request = req;
+	sync_req->reply_msg = reply_msg;
+	sync_req->async.param = param;
+
+	/* queue already locked by caller */
+
+	exist = find_sync_request(dst, req->name);
+	if (exist) {
+		RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+		rte_errno = EEXIST;
+		ret = -1;
+		goto fail;
+	}
+	TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+
+	ret = send_msg(dst, req, MP_REQ);
+	if (ret < 0) {
+		RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+			dst, req->name);
+		ret = -1;
+		goto dequeue_fail;
+	} else if (ret == 0) {
+		/* nothing was sent, but this is not reported as an error */
+		goto dequeue_fail;
+	}
+
+	param->user_reply->nb_sent++;
+
+	return 0;
+dequeue_fail:
+	/* the entry is about to be freed, so it must not stay on the list */
+	TAILQ_REMOVE(&sync_requests.requests, sync_req, next);
+fail:
+	free(sync_req);
+	free(reply_msg);
+	return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct sync_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
- sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ sync_req.reply_msg = &msg;
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&sync_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -631,7 +981,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;

do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&sync_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

@@ -697,7 +1047,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -726,7 +1076,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -738,9 +1088,160 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
 {
+	/*
+	 * Send 'req' without blocking: to the primary only when called from a
+	 * secondary process, otherwise to every peer socket found in the
+	 * socket directory. A dummy request is queued first so that the async
+	 * thread still triggers the callback when nothing could be sent; it
+	 * is dropped again as soon as at least one real request was sent.
+	 * Returns 0 on success, -1 on failure.
+	 */
+	struct rte_mp_msg *copy;
+	struct sync_request *dummy;
+	struct async_request_shared_param *param = NULL;
+	struct rte_mp_reply *reply = NULL;
+	int dir_fd, ret = 0;
+	DIR *mp_dir;
+	struct dirent *ent;
+	struct timeval now;
+	struct timespec *end = NULL;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+	if (check_input(req) == false)
+		return -1;
+	if (gettimeofday(&now, NULL) < 0) {
+		RTE_LOG(ERR, EAL, "Faile to get current time\n");
+		rte_errno = errno;
+		return -1;
+	}
+	copy = malloc(sizeof(*copy));
+	dummy = malloc(sizeof(*dummy));
+	param = malloc(sizeof(*param));
+	reply = malloc(sizeof(*reply));
+	end = malloc(sizeof(*end));
+	if (copy == NULL || dummy == NULL || param == NULL || reply == NULL ||
+			end == NULL) {
+		RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+		rte_errno = ENOMEM;
+		goto fail;
+	}
+
+	memset(copy, 0, sizeof(*copy));
+	memset(dummy, 0, sizeof(*dummy));
+	memset(param, 0, sizeof(*param));
+	memset(reply, 0, sizeof(*reply));
+	memset(end, 0, sizeof(*end));
+
+	/* copy message */
+	memcpy(copy, req, sizeof(*copy));
+
+	param->n_requests_processed = 0;
+	param->end = end;
+	param->user_reply = reply;

+	/* convert the relative timeout into an absolute deadline */
+	end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+	end->tv_sec = now.tv_sec + ts->tv_sec +
+			(now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+	reply->nb_sent = 0;
+	reply->nb_received = 0;
+	reply->msgs = NULL;
+
+	/* we have to lock the request queue here, as we will be adding a bunch
+	 * of requests to the queue at once, and some of the replies may arrive
+	 * before we add all of the requests to the queue.
+	 */
+	pthread_mutex_lock(&sync_requests.lock);
+
+	/* we have to ensure that callback gets triggered even if we don't send
+	 * anything, therefore earlier we have allocated a dummy request. put it
+	 * on the queue and fill it. we will remove it once we know we sent
+	 * something.
+	 */
+	dummy->type = REQUEST_TYPE_ASYNC;
+	dummy->request = copy;
+	dummy->reply_msg = NULL;
+	dummy->async.param = param;
+	dummy->reply_received = 1; /* short-circuit the timeout */
+
+	TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+	/* for secondary process, send request to the primary process only */
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+		ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+		/* on failure nothing was sent: unlink the dummy and clean up */
+		if (ret != 0)
+			goto unlock_fail;
+
+		/* if we sent something, remove dummy request from the queue */
+		if (reply->nb_sent != 0) {
+			TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+			free(dummy);
+			dummy = NULL;
+		}
+
+		/* wake the async thread so a remaining dummy is handled now */
+		pthread_cond_signal(&sync_requests.async_cond);
+
+		pthread_mutex_unlock(&sync_requests.lock);
+		return 0;
+	}
+
+	/* for primary process, broadcast request */
+	mp_dir = opendir(mp_dir_path);
+	if (!mp_dir) {
+		RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+		rte_errno = errno;
+		goto unlock_fail;
+	}
+	dir_fd = dirfd(mp_dir);
+
+	/* lock the directory to prevent processes spinning up while we send */
+	if (flock(dir_fd, LOCK_EX)) {
+		RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+			mp_dir_path);
+		rte_errno = errno;
+		goto closedir_fail;
+	}
+
+	while ((ent = readdir(mp_dir))) {
+		char path[PATH_MAX];
+
+		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+			continue;
+
+		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+			 ent->d_name);
+
+		if (mp_request_async(path, copy, param))
+			ret = -1;
+	}
+	/* if we sent something, remove dummy request from the queue */
+	if (reply->nb_sent != 0) {
+		TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+		free(dummy);
+		dummy = NULL;
+	}
+	/* trigger async request thread wake up */
+	pthread_cond_signal(&sync_requests.async_cond);
+
+	/* finally, unlock the queue */
+	pthread_mutex_unlock(&sync_requests.lock);
+
+	/* unlock the directory */
+	flock(dir_fd, LOCK_UN);
+
+	/* dir_fd automatically closed on closedir */
+	closedir(mp_dir);
+	return ret;
+closedir_fail:
+	closedir(mp_dir);
+unlock_fail:
+	/* the dummy is still queued here; unlink it before it gets freed */
+	TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+	pthread_mutex_unlock(&sync_requests.lock);
+fail:
+	free(dummy);
+	free(param);
+	free(reply);
+	free(end);
+	free(copy);
+	return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..93ca4cc 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -273,6 +283,46 @@ rte_mp_action_unregister(const char *name);
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ * @param reply
+ * The reply argument is the function pointer to the reply callback.
+ *
+ * @return
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component does
+ * not want responses the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a message to the peer process.
*
* This function will send a message which will be responsed by the action
@@ -321,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..1d88437 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -223,8 +223,11 @@ EXPERIMENTAL {
rte_eal_mbuf_user_pool_ops;
rte_mp_action_register;
rte_mp_action_unregister;
+ rte_mp_async_reply_register;
+ rte_mp_async_reply_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
--
2.7.4
Anatoly Burakov
2018-03-13 17:42:51 UTC
Permalink
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.

Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).

Signed-off-by: Anatoly Burakov <***@intel.com>
---

Notes:
v4:
- rebase on top of latest IPC Improvements patchset [2]
v3:
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
v2:
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries

This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?

[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

lib/librte_eal/common/eal_common_proc.c | 563 ++++++++++++++++++++++++++++++--
lib/librte_eal/common/include/rte_eal.h | 72 ++++
lib/librte_eal/rte_eal_version.map | 3 +
3 files changed, 607 insertions(+), 31 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 4131b67..50d6506 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
@@ -39,7 +40,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next;
char action_name[RTE_MP_MAX_NAME_LEN];
- rte_mp_t action;
+ RTE_STD_C11
+ union {
+ rte_mp_t action;
+ rte_mp_async_reply_t reply;
+ };
};

/** Double linked list of actions. */
@@ -60,13 +65,37 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

+enum mp_request_type {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+ struct rte_mp_reply *user_reply;
+ struct timespec *end;
+ int n_requests_processed;
+};
+
+struct async_request_param {
+ struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+ pthread_cond_t cond;
+};
+
struct sync_request {
TAILQ_ENTRY(sync_request) next;
- int reply_received;
+ enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
- struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ struct rte_mp_msg *reply_msg;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct sync_request_param sync;
+ struct async_request_param async;
+ };
};

TAILQ_HEAD(sync_request_list, sync_request);
@@ -74,9 +103,12 @@ TAILQ_HEAD(sync_request_list, sync_request);
static struct {
struct sync_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} sync_requests = {
.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

/* forward declarations */
@@ -164,53 +196,97 @@ validate_action_name(const char *name)
return 0;
}

-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+static struct action_entry *
+action_register(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
- return -1;
+ return NULL;

entry = malloc(sizeof(struct action_entry));
if (entry == NULL) {
rte_errno = ENOMEM;
- return -1;
+ return NULL;
}
strcpy(entry->action_name, name);
- entry->action = action;

- pthread_mutex_lock(&mp_mutex_action);
if (find_action_entry_by_name(name) != NULL) {
pthread_mutex_unlock(&mp_mutex_action);
rte_errno = EEXIST;
free(entry);
- return -1;
+ return NULL;
}
TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
- return 0;
+
+ /* async and sync replies are handled by different threads, so even
+ * though they a share pointer in a union, one will never trigger in
+ * place of the other.
+ */
+
+ return entry;
}

-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
{
struct action_entry *entry;

if (validate_action_name(name))
return;

- pthread_mutex_lock(&mp_mutex_action);
entry = find_action_entry_by_name(name);
if (entry == NULL) {
- pthread_mutex_unlock(&mp_mutex_action);
return;
}
TAILQ_REMOVE(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
free(entry);
}

+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->action = action;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+ pthread_mutex_lock(&mp_mutex_action);
+ action_unregister(name);
+ pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->reply = reply;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+ rte_mp_action_unregister(name);
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -270,10 +346,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
- memcpy(sync_req->reply, msg, sizeof(*msg));
+ memcpy(sync_req->reply_msg, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(&sync_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&sync_requests.lock);
@@ -320,6 +400,204 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply_msg) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply_msg;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ }
+ free(sr->reply_msg);
+
+ /* mark this request as processed */
+ param->n_requests_processed++;
+
+ /* if number of sent messages is zero, we're short-circuiting */
+ last_msg = param->n_requests_processed == reply->nb_sent ||
+ reply->nb_sent == 0;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct sync_request *sr)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ pthread_mutex_lock(&mp_mutex_action);
+ struct action_entry *entry =
+ find_action_entry_by_name(sr->request->name);
+ pthread_mutex_unlock(&mp_mutex_action);
+ if (!entry)
+ RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+ sr->request->name);
+ else
+ entry->reply(sr->request, reply);
+ /* clean up */
+ free(sr->async.param->user_reply->msgs);
+ free(sr->async.param->user_reply);
+ free(sr->async.param->end);
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct sync_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ do {
+ struct sync_request *trigger = NULL;
+ int ret;
+ bool dontwait = false;
+
+ pthread_mutex_lock(&sync_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ pthread_mutex_unlock(&sync_requests.lock);
+ break;
+ }
+
+ /* set a 60 second timeout by default */
+ timeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;
+ timeout.tv_sec = now.tv_sec + 60 +
+ (now.tv_usec * 1000 + 60) / 1000000000;
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+ memcpy(&timeout, sr->async.param->end,
+ sizeof(timeout));
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ dontwait = true;
+ break;
+ }
+ }
+
+ /* now, wait until we either time out or get woken up */
+ if (!dontwait)
+ ret = pthread_cond_timedwait(&sync_requests.async_cond,
+ &sync_requests.lock, &timeout);
+ else
+ ret = 0;
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct sync_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&sync_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ } while (1);
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -382,7 +660,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -419,7 +697,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -430,7 +717,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -602,18 +893,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_shared_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct sync_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ memset(sync_req, 0, sizeof(*sync_req));
+ memset(reply_msg, 0, sizeof(*reply_msg));
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply_msg = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist)
+ TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+ if (exist) {
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply->nb_sent++;
+
+ return 0;
+fail:
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct sync_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
- sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ sync_req.reply_msg = &msg;
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&sync_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -637,7 +987,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;

do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&sync_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

@@ -703,7 +1053,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -732,7 +1082,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -744,9 +1094,160 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
{
+ struct rte_mp_msg *copy;
+ struct sync_request *dummy;
+ struct async_request_shared_param *param = NULL;
+ struct rte_mp_reply *reply = NULL;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end = NULL;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ copy = malloc(sizeof(*copy));
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ reply = malloc(sizeof(*reply));
+ end = malloc(sizeof(*end));
+ if (copy == NULL || dummy == NULL || param == NULL || reply == NULL ||
+ end == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ memset(copy, 0, sizeof(*copy));
+ memset(dummy, 0, sizeof(*dummy));
+ memset(param, 0, sizeof(*param));
+ memset(reply, 0, sizeof(*reply));
+ memset(end, 0, sizeof(*end));
+
+ /* copy message */
+ memcpy(copy, req, sizeof(*copy));
+
+ param->n_requests_processed = 0;
+ param->end = end;
+ param->user_reply = reply;

+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;
+
+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&sync_requests.lock);
+
+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. put it
+ * on the queue and fill it. we will remove it once we know we sent
+ * something.
+ */
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = copy;
+ dummy->reply_msg = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ goto closedir_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+
+ if (mp_request_async(path, copy, param))
+ ret = -1;
+ }
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&sync_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd automatically closed on closedir */
+ closedir(mp_dir);
+ return ret;
+closedir_fail:
+ closedir(mp_dir);
+unlock_fail:
+ pthread_mutex_unlock(&sync_requests.lock);
+fail:
+ free(dummy);
+ free(param);
+ free(reply);
+ free(end);
+ free(copy);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..93ca4cc 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -273,6 +283,46 @@ rte_mp_action_unregister(const char *name);
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ * @param reply
+ * The reply argument is the function pointer to the reply callback.
+ *
+ * @return
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component does
+ * not want responses the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * @param name
+ * The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a message to the peer process.
*
* This function will send a message which will be responsed by the action
@@ -321,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..1d88437 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -223,8 +223,11 @@ EXPERIMENTAL {
rte_eal_mbuf_user_pool_ops;
rte_mp_action_register;
rte_mp_action_unregister;
+ rte_mp_async_reply_register;
+ rte_mp_async_reply_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
--
2.7.4
Tan, Jianfeng
2018-03-23 15:38:43 UTC
Permalink
Hi Anatoly,

Two general comments first.

Q1. As I understand it, taking malloc usage as an example: when a primary
process receives a request in the rte_mp_handle thread, it will allocate
memory, broadcast an async request to all secondary processes, and
return immediately. Responses are then sent back by each secondary
process and are first recorded in the rte_mp_async_handle thread; once
either the timeout expires or all responses have been collected,
rte_mp_async_handle will trigger the async action.

I agree on the necessity of the async request API; without it, each caller
that has a similar requirement needs lots of code to implement it.
But can we achieve that without creating another pthread by leveraging
the alarm set? For example, to send an async request,
step 1. set an alarm;
step 2. send the request;
step 3. receive and record responses in mp thread;
step 4. if alarm timeout, trigger the async action with timeout result;
step 5. if all responses are collected, cancel the alarm, and trigger
the async action with success result.

I don't have a strong objection to adding another thread, and actually
this is an internal detail of the IPC system, so we can optimize it later.

Q2. Do we really need to register actions for the result handler of an
async request? Instead, we could pass the handler as a parameter to
rte_mp_request_async(); whenever the request fails or succeeds, we just
invoke that function.

Please see some other comments inline.
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).
Waiting for 1 min seems a little strange to me; shouldn't it wake up at the
latest timeout of the pending async requests?
Post by Anatoly Burakov
---
- rebase on top of latest IPC Improvements patchset [2]
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries
This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].
rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?
[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/
lib/librte_eal/common/eal_common_proc.c | 563 ++++++++++++++++++++++++++++++--
lib/librte_eal/common/include/rte_eal.h | 72 ++++
lib/librte_eal/rte_eal_version.map | 3 +
3 files changed, 607 insertions(+), 31 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 4131b67..50d6506 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>
#include "eal_private.h"
#include "eal_filesystem.h"
@@ -39,7 +40,11 @@ static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
struct action_entry {
TAILQ_ENTRY(action_entry) next;
char action_name[RTE_MP_MAX_NAME_LEN];
- rte_mp_t action;
+ RTE_STD_C11
+ union {
+ rte_mp_t action;
+ rte_mp_async_reply_t reply;
+ };
};
/** Double linked list of actions. */
@@ -60,13 +65,37 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};
+enum mp_request_type {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+ struct rte_mp_reply *user_reply;
+ struct timespec *end;
Why do we have to make these two pointers? Operating on pointers
introduces unnecessary complexity.
Post by Anatoly Burakov
+ int n_requests_processed;
It sounds like recording how many requests are sent, but it means how
many responses are received, right? n_responses sounds better?
Post by Anatoly Burakov
+};
+
+struct async_request_param {
+ struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+ pthread_cond_t cond;
+};
+
struct sync_request {
I know "sync_" in the original version was for synchronization between
the blocked thread (who sends the request) and the mp thread.

But it indeed makes the code difficult to understand. We may change the
name to "pending_request"?
Post by Anatoly Burakov
TAILQ_ENTRY(sync_request) next;
- int reply_received;
+ enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
- struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ struct rte_mp_msg *reply_msg;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct sync_request_param sync;
+ struct async_request_param async;
+ };
};
Too many structs are defined? How about just putting it like this:

struct pending_request {
TAILQ_ENTRY(sync_request) next;
enum mp_request_type type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
struct rte_mp_msg *reply_msg;
int reply_received;
RTE_STD_C11
union {
/* for sync request */
struct {
pthread_cond_t cond; /* used for mp thread to
wake up requesting thread */
};

/* for async request */
struct {
struct rte_mp_reply user_reply;
struct timespec end;
int n_requests_processed; /* store how requests */
};
};
};
Post by Anatoly Burakov
TAILQ_HEAD(sync_request_list, sync_request);
@@ -74,9 +103,12 @@ TAILQ_HEAD(sync_request_list, sync_request);
static struct {
struct sync_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} sync_requests = {
.requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};
/* forward declarations */
@@ -164,53 +196,97 @@ validate_action_name(const char *name)
return 0;
}
-int __rte_experimental
-rte_mp_action_register(const char *name, rte_mp_t action)
+static struct action_entry *
+action_register(const char *name)
{
struct action_entry *entry;
if (validate_action_name(name))
- return -1;
+ return NULL;
entry = malloc(sizeof(struct action_entry));
if (entry == NULL) {
rte_errno = ENOMEM;
- return -1;
+ return NULL;
}
strcpy(entry->action_name, name);
- entry->action = action;
- pthread_mutex_lock(&mp_mutex_action);
if (find_action_entry_by_name(name) != NULL) {
pthread_mutex_unlock(&mp_mutex_action);
rte_errno = EEXIST;
free(entry);
- return -1;
+ return NULL;
}
TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
- return 0;
+
+ /* async and sync replies are handled by different threads, so even
+ * though they a share pointer in a union, one will never trigger in
+ * place of the other.
+ */
+
+ return entry;
}
-void __rte_experimental
-rte_mp_action_unregister(const char *name)
+static void
+action_unregister(const char *name)
{
struct action_entry *entry;
if (validate_action_name(name))
return;
- pthread_mutex_lock(&mp_mutex_action);
entry = find_action_entry_by_name(name);
if (entry == NULL) {
- pthread_mutex_unlock(&mp_mutex_action);
return;
}
TAILQ_REMOVE(&action_entry_list, entry, next);
- pthread_mutex_unlock(&mp_mutex_action);
free(entry);
}
+int __rte_experimental
+rte_mp_action_register(const char *name, rte_mp_t action)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->action = action;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_action_unregister(const char *name)
+{
+ pthread_mutex_lock(&mp_mutex_action);
+ action_unregister(name);
+ pthread_mutex_unlock(&mp_mutex_action);
+}
+
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply)
+{
+ struct action_entry *entry;
+
+ pthread_mutex_lock(&mp_mutex_action);
+
+ entry = action_register(name);
+ if (entry != NULL)
+ entry->reply = reply;
+ pthread_mutex_unlock(&mp_mutex_action);
+
+ return entry == NULL ? -1 : 0;
+}
+
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name)
+{
+ rte_mp_action_unregister(name);
+}
+
static int
read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
@@ -270,10 +346,14 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_mutex_lock(&sync_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
- memcpy(sync_req->reply, msg, sizeof(*msg));
+ memcpy(sync_req->reply_msg, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(&sync_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&sync_requests.lock);
@@ -320,6 +400,204 @@ mp_handle(void *arg __rte_unused)
}
static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct sync_request *sr, const struct timespec *now)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply_msg) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply_msg;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ }
+ free(sr->reply_msg);
+
+ /* mark this request as processed */
+ param->n_requests_processed++;
+
+ /* if number of sent messages is zero, we're short-circuiting */
+ last_msg = param->n_requests_processed == reply->nb_sent ||
+ reply->nb_sent == 0;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct sync_request *sr)
+{
+ struct async_request_shared_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = param->user_reply;
+
+ pthread_mutex_lock(&mp_mutex_action);
+ struct action_entry *entry =
+ find_action_entry_by_name(sr->request->name);
+ pthread_mutex_unlock(&mp_mutex_action);
+ if (!entry)
+ RTE_LOG(ERR, EAL, "Cannot find async request callback for %s\n",
+ sr->request->name);
+ else
+ entry->reply(sr->request, reply);
+ /* clean up */
+ free(sr->async.param->user_reply->msgs);
+ free(sr->async.param->user_reply);
+ free(sr->async.param->end);
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct sync_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ do {
Put while(1) here so that it is clearly an infinite loop.
Post by Anatoly Burakov
+ struct sync_request *trigger = NULL;
+ int ret;
+ bool dontwait = false;
+
+ pthread_mutex_lock(&sync_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ pthread_mutex_unlock(&sync_requests.lock);
+ break;
+ }
+
+ /* set a 60 second timeout by default */
+ timeout.tv_nsec = (now.tv_usec * 1000 + 60) % 1000000000;
+ timeout.tv_sec = now.tv_sec + 60 +
+ (now.tv_usec * 1000 + 60) / 1000000000;
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &sync_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (timespec_cmp(sr->async.param->end, &timeout) < 0)
+ memcpy(&timeout, sr->async.param->end,
+ sizeof(timeout));
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ dontwait = true;
+ break;
+ }
+ }
+
+ /* now, wait until we either time out or get woken up */
+ if (!dontwait)
+ ret = pthread_cond_timedwait(&sync_requests.async_cond,
+ &sync_requests.lock, &timeout);
+ else
+ ret = 0;
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct sync_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &sync_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ TAILQ_REMOVE(&sync_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&sync_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ } while (1);
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -382,7 +660,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;
/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -419,7 +697,16 @@ rte_mp_channel_init(void)
return -1;
}
- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -430,7 +717,11 @@ rte_mp_channel_init(void)
/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);
/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -602,18 +893,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}
static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_shared_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct sync_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ memset(sync_req, 0, sizeof(*sync_req));
+ memset(reply_msg, 0, sizeof(*reply_msg));
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply_msg = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist)
+ TAILQ_INSERT_TAIL(&sync_requests.requests, sync_req, next);
+ if (exist) {
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply->nb_sent++;
+
+ return 0;
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct sync_request sync_req, *exist;
+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
- sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ sync_req.reply_msg = &msg;
+ pthread_cond_init(&sync_req.sync.cond, NULL);
pthread_mutex_lock(&sync_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -637,7 +987,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;
do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&sync_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);
@@ -703,7 +1053,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);
/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -732,7 +1082,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);
- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -744,9 +1094,160 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}
int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
{
+ struct rte_mp_msg *copy;
+ struct sync_request *dummy;
+ struct async_request_shared_param *param = NULL;
+ struct rte_mp_reply *reply = NULL;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end = NULL;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ copy = malloc(sizeof(*copy));
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ reply = malloc(sizeof(*reply));
+ end = malloc(sizeof(*end));
+ if (copy == NULL || dummy == NULL || param == NULL || reply == NULL ||
+ end == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ memset(copy, 0, sizeof(*copy));
+ memset(dummy, 0, sizeof(*dummy));
+ memset(param, 0, sizeof(*param));
+ memset(reply, 0, sizeof(*reply));
+ memset(end, 0, sizeof(*end));
+
+ /* copy message */
+ memcpy(copy, req, sizeof(*copy));
+
+ param->n_requests_processed = 0;
+ param->end = end;
+ param->user_reply = reply;
+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;
+
+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&sync_requests.lock);
Why do we share this lock for both sync and async requests?
Post by Anatoly Burakov
+
+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. put it
+ * on the queue and fill it. we will remove it once we know we sent
+ * something.
+ */
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = copy;
+ dummy->reply_msg = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ TAILQ_INSERT_TAIL(&sync_requests.requests, dummy, next);
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ goto closedir_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+
+ if (mp_request_async(path, copy, param))
+ ret = -1;
+ }
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&sync_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&sync_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&sync_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd automatically closed on closedir */
+ closedir(mp_dir);
+ return ret;
+ closedir(mp_dir);
+ pthread_mutex_unlock(&sync_requests.lock);
+ free(dummy);
+ free(param);
+ free(reply);
+ free(end);
+ free(copy);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..93ca4cc 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);
/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
Here are two spaces.
Post by Anatoly Burakov
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
*
@@ -273,6 +283,46 @@ rte_mp_action_unregister(const char *name);
*
+ * Register an asynchronous reply callback for primary/secondary communication.
+ *
+ * Call this function to register a callback for asynchronous requests, if the
+ * calling component wants to receive responses to its own asynchronous requests
+ * from the corresponding component in its primary or secondary processes.
+ *
+ * The name argument plays as a unique key to find the action.
+ *
+ * The reply argument is the function pointer to the reply callback.
+ *
+ * - 0 on success.
+ * - (<0) on failure.
+ */
+int __rte_experimental
+rte_mp_async_reply_register(const char *name, rte_mp_async_reply_t reply);
+
+/**
+ *
+ * Unregister an asynchronous reply callback.
+ *
+ * Call this function to unregister a callback if the calling component does
+ * not want responses the messages from the corresponding component in its
+ * primary process or secondary processes.
+ *
+ * The name argument plays as a unique key to find the action.
+ *
+ */
+void __rte_experimental
+rte_mp_async_reply_unregister(const char *name);
+
+/**
+ *
* Send a message to the peer process.
*
* This function will send a message which will be responsed by the action
@@ -321,6 +371,28 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * The req argument contains the customized request message.
+ *
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts);
+
+/**
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..1d88437 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -223,8 +223,11 @@ EXPERIMENTAL {
rte_eal_mbuf_user_pool_ops;
rte_mp_action_register;
rte_mp_action_unregister;
+ rte_mp_async_reply_register;
+ rte_mp_async_reply_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
Burakov, Anatoly
2018-03-23 18:21:01 UTC
Permalink
Post by Tan, Jianfeng
Hi Anatoly,
Two general comments firstly.
Q1. As I understand it, taking the malloc usage as an example: when a primary
process receives a request in the rte_mp_handle thread, it will allocate
memory, broadcast an async request to all secondary processes, and return
immediately; responses are then sent back by each secondary process and are
first recorded in the rte_mp_async_handle thread; once either the timeout
expires or all responses have been collected, rte_mp_async_handle will
trigger an async action.
I agree with the necessity of the async request API; without it, each caller
that has a similar requirement needs lots of code to implement it.
But can we achieve that without creating another pthread by leveraging
the alarm set? For example, to send an async request,
step 1. set an alarm;
step 2. send the request;
step 3. receive and record responses in mp thread;
step 4. if alarm timeout, trigger the async action with timeout result;
step 5. if all responses are collected, cancel the alarm, and trigger
the async action with success result.
That would work, but this "alarm" sounds like another thread. The main
thread is always blocked in recvmsg(), and interrupting that would
involve a signal, which is a recipe for disaster (you know, global
overwritable signal handlers, undefined behavior as to who actually gets
the signal, stuff like that).

That is, unless you were referring to DPDK's own "alarm" API, in which
case it's a chicken and egg problem - we want to use alarm for IPC, but
can't because rte_malloc relies on IPC, which relies on alarm, which
relies on rte_malloc. So unless we rewrite (or add an option for) alarm
API to not use rte_malloc'd memory, this isn't going to work.

We of course can do it, but really, i would be inclined to leave this as
is in the interests of merging this earlier, unless there are strong
objections to it.
Post by Tan, Jianfeng
I don't have a strong objection to adding another thread; this is actually
an internal detail of the IPC system, so we can optimize it later.
Q2. Do we really need to register actions for result handler of async
request? Instead, we can put it as a parameter into
rte_mp_request_async(), whenever the request fails or succeeds, we just
invoke the function.
That can be done, sure. This would look cleaner on the backend too, so
i'll probably do that for v5.
Post by Tan, Jianfeng
Please see some other comments inline.
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).
Waiting for 1 min seems a little strange to me; shouldn't it wake up for the
latest timeout of the pending async requests?
We do. However, we have to wait for *something* if there aren't any
asynchronous requests pending. There isn't a way to put "wait infinite
amount" as a time value, so i opted for next best thing - large enough
to not cause any performance issues. The timeout is arbitrary.
Post by Tan, Jianfeng
Post by Anatoly Burakov
  /** Double linked list of actions. */
@@ -60,13 +65,37 @@ struct mp_msg_internal {
      struct rte_mp_msg msg;
  };
+enum mp_request_type {
+    REQUEST_TYPE_SYNC,
+    REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+    struct rte_mp_reply *user_reply;
+    struct timespec *end;
Why do we have to make these two pointers? Operating on pointers
introduces unnecessary complexity.
Because those are shared between different pending requests. Each
pending request gets its own entry in the queue (because it expects
answer from a particular process), but the request data (callback,
number of requests processed, etc.) is shared between all requests for
this sync operation. We don't have the luxury of storing all of that in
a local variable like we do with synchronous requests :)
Post by Tan, Jianfeng
Post by Anatoly Burakov
+    int n_requests_processed;
It sounds like recording how many requests are sent, but it means how
many responses are received, right? n_responses sounds better?
No, it doesn't. Well, perhaps "n_requests_processed" should be
"n_responses_processed", but not "n_responses", because this is a value
that is indicating how many responses we've already seen (to know when
to trigger the callback).
Post by Tan, Jianfeng
Post by Anatoly Burakov
+};
+
+struct async_request_param {
+    struct async_request_shared_param *param;
+};
+
+struct sync_request_param {
+    pthread_cond_t cond;
+};
+
  struct sync_request {
I know "sync_" in the original version was for synchronization between
the blocked thread (who sends the request) and the mp thread.
But it indeed makes the code difficult to understand. We may change the
name to "pending_request"?
This can be done, sure.
Post by Tan, Jianfeng
Post by Anatoly Burakov
      TAILQ_ENTRY(sync_request) next;
-    int reply_received;
+    enum mp_request_type type;
      char dst[PATH_MAX];
      struct rte_mp_msg *request;
-    struct rte_mp_msg *reply;
-    pthread_cond_t cond;
+    struct rte_mp_msg *reply_msg;
+    int reply_received;
+    RTE_STD_C11
+    union {
+        struct sync_request_param sync;
+        struct async_request_param async;
+    };
  };
struct pending_request {
        TAILQ_ENTRY(sync_request) next;
        enum mp_request_type type;
        char dst[PATH_MAX];
        struct rte_mp_msg *request;
        struct rte_mp_msg *reply_msg;
        int reply_received;
        RTE_STD_C11
        union {
                /* for sync request */
                struct {
                        pthread_cond_t cond; /* used for mp thread to
wake up requesting thread */
                };
                /* for async request */
                struct {
                        struct rte_mp_reply user_reply;
                        struct timespec end;
                        int n_requests_processed; /* store how requests */
                };
        };
};
That can work, sure. However, i actually think that my approach is
clearer, because when you're working with autocomplete and a proper IDE,
it's clear which values are for which case (and i would argue it makes
the code more readable as well).
Post by Tan, Jianfeng
Post by Anatoly Burakov
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+    struct sync_request *sr;
+    struct timeval now;
+    struct timespec timeout, ts_now;
+    do {
Put while(1) here so that it is clearly an infinite loop.
Will do.
Post by Tan, Jianfeng
Post by Anatoly Burakov
+    /* we have to lock the request queue here, as we will be adding a
bunch
+     * of requests to the queue at once, and some of the replies may
arrive
+     * before we add all of the requests to the queue.
+     */
+    pthread_mutex_lock(&sync_requests.lock);
Why do we share this lock for both sync and async requests?
Because sync and async requests share the queue.
Post by Tan, Jianfeng
Post by Anatoly Burakov
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create  socket channel for primary/secondary communication, use
Here are two spaces.
Will fix.
--
Thanks,
Anatoly
Burakov, Anatoly
2018-03-24 13:22:08 UTC
Permalink
A few of my replies yesterday made no sense... Lesson learned: don't
reply to code review comments late on a Friday evening :)
Post by Burakov, Anatoly
We do. However, we have to wait for *something* if there aren't any
asynchronous requests pending. There isn't a way to put "wait infinite
amount" as a time value, so i opted for next best thing - large enough
to not cause any performance issues. The timeout is arbitrary.
Didn't realize we were holding the lock, so we could choose between wait
and timed wait. Fixed in v5.
Post by Burakov, Anatoly
Post by Tan, Jianfeng
Post by Anatoly Burakov
  /** Double linked list of actions. */
@@ -60,13 +65,37 @@ struct mp_msg_internal {
      struct rte_mp_msg msg;
  };
+enum mp_request_type {
+    REQUEST_TYPE_SYNC,
+    REQUEST_TYPE_ASYNC
+};
+
+struct async_request_shared_param {
+    struct rte_mp_reply *user_reply;
+    struct timespec *end;
Why do we have to make these two pointers? Operating on pointers
introduces unnecessary complexity.
Because those are shared between different pending requests. Each
pending request gets its own entry in the queue (because it expects
answer from a particular process), but the request data (callback,
number of requests processed, etc.) is shared between all requests for
this sync operation. We don't have the luxury of storing all of that in
a local variable like we do with synchronous requests :)
Missed the fact that you weren't referring to the need of storing these
in shared_param but rather to the fact that i was storing
malloc-allocated values that are shared, as pointers in shared param
structure, when i could just as easily store actual structs there. Fixed
in v5.
Post by Burakov, Anatoly
Post by Tan, Jianfeng
struct pending_request {
         TAILQ_ENTRY(sync_request) next;
         enum mp_request_type type;
         char dst[PATH_MAX];
         struct rte_mp_msg *request;
         struct rte_mp_msg *reply_msg;
         int reply_received;
         RTE_STD_C11
         union {
                 /* for sync request */
                 struct {
                         pthread_cond_t cond; /* used for mp thread to
wake up requesting thread */
                 };
                 /* for async request */
                 struct {
                         struct rte_mp_reply user_reply;
                         struct timespec end;
                         int n_requests_processed; /* store how
requests */
                 };
         };
};
That can work, sure. However, I actually think that my approach is
clearer, because when you're working with autocomplete and a proper IDE,
it's clear which values are for which case (and I would argue it makes
the code more readable as well).
Again, didn't realize that you were referring to defining all of the
structs and values outside of pending request, rather than storing them
as anonymous structs (which would indeed cause problems with
autocomplete and readability). Fixed in v5.

Thanks again for your review!
--
Thanks,
Anatoly
Anatoly Burakov
2018-03-24 12:46:19 UTC
Permalink
Signed-off-by: Anatoly Burakov <***@intel.com>
Suggested-by: Jianfeng Tan <***@intel.com>
---
lib/librte_eal/common/eal_common_proc.c | 38 ++++++++++++++++-----------------
1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 4131b67..52b6ab2 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -60,8 +60,8 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

-struct sync_request {
- TAILQ_ENTRY(sync_request) next;
+struct pending_request {
+ TAILQ_ENTRY(pending_request) next;
int reply_received;
char dst[PATH_MAX];
struct rte_mp_msg *request;
@@ -69,13 +69,13 @@ struct sync_request {
pthread_cond_t cond;
};

-TAILQ_HEAD(sync_request_list, sync_request);
+TAILQ_HEAD(pending_request_list, pending_request);

static struct {
- struct sync_request_list requests;
+ struct pending_request_list requests;
pthread_mutex_t lock;
-} sync_requests = {
- .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
+} pending_requests = {
+ .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
.lock = PTHREAD_MUTEX_INITIALIZER
};

@@ -84,12 +84,12 @@ static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);


-static struct sync_request *
+static struct pending_request *
find_sync_request(const char *dst, const char *act_name)
{
- struct sync_request *r;
+ struct pending_request *r;

- TAILQ_FOREACH(r, &sync_requests.requests, next) {
+ TAILQ_FOREACH(r, &pending_requests.requests, next) {
if (!strcmp(r->dst, dst) &&
!strcmp(r->request->name, act_name))
break;
@@ -259,7 +259,7 @@ read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
- struct sync_request *sync_req;
+ struct pending_request *sync_req;
struct action_entry *entry;
struct rte_mp_msg *msg = &m->msg;
rte_mp_t action = NULL;
@@ -267,7 +267,7 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

if (m->type == MP_REP || m->type == MP_IGN) {
- pthread_mutex_lock(&sync_requests.lock);
+ pthread_mutex_lock(&pending_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
memcpy(sync_req->reply, msg, sizeof(*msg));
@@ -276,7 +276,7 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_cond_signal(&sync_req->cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
- pthread_mutex_unlock(&sync_requests.lock);
+ pthread_mutex_unlock(&pending_requests.lock);
return;
}

@@ -607,7 +607,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
{
int ret;
struct rte_mp_msg msg, *tmp;
- struct sync_request sync_req, *exist;
+ struct pending_request sync_req, *exist;

sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
@@ -615,14 +615,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
sync_req.reply = &msg;
pthread_cond_init(&sync_req.cond, NULL);

- pthread_mutex_lock(&sync_requests.lock);
+ pthread_mutex_lock(&pending_requests.lock);
exist = find_sync_request(dst, req->name);
if (!exist)
- TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
+ TAILQ_INSERT_TAIL(&pending_requests.requests, &sync_req, next);
if (exist) {
RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
rte_errno = EEXIST;
- pthread_mutex_unlock(&sync_requests.lock);
+ pthread_mutex_unlock(&pending_requests.lock);
return -1;
}

@@ -638,12 +638,12 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,

do {
ret = pthread_cond_timedwait(&sync_req.cond,
- &sync_requests.lock, ts);
+ &pending_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

/* We got the lock now */
- TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
- pthread_mutex_unlock(&sync_requests.lock);
+ TAILQ_REMOVE(&pending_requests.requests, &sync_req, next);
+ pthread_mutex_unlock(&pending_requests.lock);

if (sync_req.reply_received == 0) {
RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
--
2.7.4
Anatoly Burakov
2018-03-24 12:46:20 UTC
Permalink
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller
(callback specified at the time of request, rather than registering
for it in advance).

Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.

Signed-off-by: Anatoly Burakov <***@intel.com>
---

Notes:
v5:
- addressed review comments from Jianfeng
- split into two patches to avoid rename noise
- do not mark ignored message as processed
v4:
- rebase on top of latest IPC Improvements patchset [2]

v3:
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
v2:
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries

This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?

[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

lib/librte_eal/common/eal_common_proc.c | 455 +++++++++++++++++++++++++++++++-
lib/librte_eal/common/include/rte_eal.h | 36 +++
lib/librte_eal/rte_eal_version.map | 1 +
3 files changed, 479 insertions(+), 13 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 52b6ab2..c86252c 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
@@ -60,13 +61,32 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

+struct async_request_param {
+ rte_mp_async_reply_t clb;
+ struct rte_mp_reply user_reply;
+ struct timespec end;
+ int n_responses_processed;
+};
+
struct pending_request {
TAILQ_ENTRY(pending_request) next;
- int reply_received;
+ enum {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+ } type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct {
+ struct async_request_param *param;
+ } async;
+ struct {
+ pthread_cond_t cond;
+ } sync;
+ };
};

TAILQ_HEAD(pending_request_list, pending_request);
@@ -74,9 +94,12 @@ TAILQ_HEAD(pending_request_list, pending_request);
static struct {
struct pending_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} pending_requests = {
.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

/* forward declarations */
@@ -273,7 +296,12 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
memcpy(sync_req->reply, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(
+ &pending_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&pending_requests.lock);
@@ -320,6 +348,189 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct pending_request *sr, const struct timespec *now)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(&param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+
+ /* mark this request as processed */
+ param->n_responses_processed++;
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ }
+ free(sr->reply);
+
+ last_msg = param->n_responses_processed == reply->nb_sent;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct pending_request *sr)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ param->clb(sr->request, reply);
+
+ /* clean up */
+ free(sr->async.param->user_reply.msgs);
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct pending_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ while (1) {
+ struct pending_request *trigger = NULL;
+ int ret;
+ bool nowait = false;
+ bool timedwait = false;
+
+ pthread_mutex_lock(&pending_requests.lock);
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &pending_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (!timedwait || timespec_cmp(&sr->async.param->end,
+ &timeout) < 0) {
+ memcpy(&timeout, &sr->async.param->end,
+ sizeof(timeout));
+ timedwait = true;
+ }
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ nowait = true;
+ break;
+ }
+ }
+
+ if (nowait)
+ ret = 0;
+ else if (timedwait)
+ ret = pthread_cond_timedwait(
+ &pending_requests.async_cond,
+ &pending_requests.lock, &timeout);
+ else
+ ret = pthread_cond_wait(&pending_requests.async_cond,
+ &pending_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct pending_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&pending_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ };
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -382,7 +593,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -419,7 +630,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -430,7 +650,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -602,18 +826,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct pending_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ memset(sync_req, 0, sizeof(*sync_req));
+ memset(reply_msg, 0, sizeof(*reply_msg));
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist)
+ TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next);
+ if (exist) {
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply.nb_sent++;
+
+ return 0;
+fail:
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct pending_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&pending_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -637,7 +920,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;

do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&pending_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

@@ -703,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -732,7 +1015,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -744,9 +1027,155 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb)
{
+ struct rte_mp_msg *copy;
+ struct pending_request *dummy;
+ struct async_request_param *param = NULL;
+ struct rte_mp_reply *reply;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ copy = malloc(sizeof(*copy));
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ if (copy == NULL || dummy == NULL || param == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ memset(copy, 0, sizeof(*copy));
+ memset(dummy, 0, sizeof(*dummy));
+ memset(param, 0, sizeof(*param));
+
+ /* copy message */
+ memcpy(copy, req, sizeof(*copy));
+
+ param->n_responses_processed = 0;
+ param->clb = clb;
+ end = &param->end;
+ reply = &param->user_reply;
+
+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;

+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&pending_requests.lock);
+
+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. put it
+ * on the queue and fill it. we will remove it once we know we sent
+ * something.
+ */
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = copy;
+ dummy->reply = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, next);
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&pending_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ goto closedir_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+
+ if (mp_request_async(path, copy, param))
+ ret = -1;
+ }
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&pending_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&pending_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd automatically closed on closedir */
+ closedir(mp_dir);
+ return ret;
+closedir_fail:
+ closedir(mp_dir);
+unlock_fail:
+ pthread_mutex_unlock(&pending_requests.lock);
+fail:
+ free(dummy);
+ free(param);
+ free(copy);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..87ebfd0 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -321,6 +331,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @param clb
+ * The callback to trigger when all responses for this request have arrived.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..328a0be 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -225,6 +225,7 @@ EXPERIMENTAL {
rte_mp_action_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
--
2.7.4
Tan, Jianfeng
2018-03-26 14:15:24 UTC
Permalink
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller
(callback specified at the time of request, rather than registering
for it in advance).
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
Generally, it looks great to me except some trivial nits, so
Post by Anatoly Burakov
---
- addressed review comments from Jianfeng
- split into two patches to avoid rename noise
- do not mark ignored message as processed
- rebase on top of latest IPC Improvements patchset [2]
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries
This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].
rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?
[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/
lib/librte_eal/common/eal_common_proc.c | 455 +++++++++++++++++++++++++++++++-
lib/librte_eal/common/include/rte_eal.h | 36 +++
lib/librte_eal/rte_eal_version.map | 1 +
3 files changed, 479 insertions(+), 13 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 52b6ab2..c86252c 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>
#include "eal_private.h"
#include "eal_filesystem.h"
@@ -60,13 +61,32 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};
+struct async_request_param {
+ rte_mp_async_reply_t clb;
+ struct rte_mp_reply user_reply;
+ struct timespec end;
+ int n_responses_processed;
+};
+
struct pending_request {
TAILQ_ENTRY(pending_request) next;
- int reply_received;
+ enum {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+ } type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct {
+ struct async_request_param *param;
+ } async;
+ struct {
+ pthread_cond_t cond;
+ } sync;
+ };
};
TAILQ_HEAD(pending_request_list, pending_request);
@@ -74,9 +94,12 @@ TAILQ_HEAD(pending_request_list, pending_request);
static struct {
struct pending_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} pending_requests = {
.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};
/* forward declarations */
@@ -273,7 +296,12 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
memcpy(sync_req->reply, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(
+ &pending_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&pending_requests.lock);
@@ -320,6 +348,189 @@ mp_handle(void *arg __rte_unused)
}
static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct pending_request *sr, const struct timespec *now)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(&param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+
+ /* mark this request as processed */
+ param->n_responses_processed++;
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ }
+ free(sr->reply);
+
+ last_msg = param->n_responses_processed == reply->nb_sent;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct pending_request *sr)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ param->clb(sr->request, reply);
+
+ /* clean up */
+ free(sr->async.param->user_reply.msgs);
How about simply "free(reply->msgs);"?
Post by Anatoly Burakov
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct pending_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ while (1) {
+ struct pending_request *trigger = NULL;
+ int ret;
+ bool nowait = false;
+ bool timedwait = false;
+
+ pthread_mutex_lock(&pending_requests.lock);
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &pending_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (!timedwait || timespec_cmp(&sr->async.param->end,
+ &timeout) < 0) {
+ memcpy(&timeout, &sr->async.param->end,
+ sizeof(timeout));
+ timedwait = true;
+ }
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ nowait = true;
+ break;
+ }
+ }
+
+ if (nowait)
+ ret = 0;
+ else if (timedwait)
+ ret = pthread_cond_timedwait(
+ &pending_requests.async_cond,
+ &pending_requests.lock, &timeout);
+ else
+ ret = pthread_cond_wait(&pending_requests.async_cond,
+ &pending_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct pending_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&pending_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ };
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -382,7 +593,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;
/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -419,7 +630,16 @@ rte_mp_channel_init(void)
return -1;
}
- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -430,7 +650,11 @@ rte_mp_channel_init(void)
/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);
/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -602,18 +826,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}
static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct pending_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ memset(sync_req, 0, sizeof(*sync_req));
+ memset(reply_msg, 0, sizeof(*reply_msg));
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist)
+ TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next);
+ if (exist) {
Should this be an "else"?
Post by Anatoly Burakov
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply.nb_sent++;
+
+ return 0;
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct pending_request sync_req, *exist;
+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ pthread_cond_init(&sync_req.sync.cond, NULL);
pthread_mutex_lock(&pending_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -637,7 +920,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;
do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&pending_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);
@@ -703,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);
/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -732,7 +1015,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);
- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -744,9 +1027,155 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}
int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb)
{
+ struct rte_mp_msg *copy;
+ struct pending_request *dummy;
+ struct async_request_param *param = NULL;
No need to assign it to NULL.
Post by Anatoly Burakov
+ struct rte_mp_reply *reply;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ copy = malloc(sizeof(*copy));
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ if (copy == NULL || dummy == NULL || param == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ memset(copy, 0, sizeof(*copy));
+ memset(dummy, 0, sizeof(*dummy));
+ memset(param, 0, sizeof(*param));
+
+ /* copy message */
+ memcpy(copy, req, sizeof(*copy));
+
+ param->n_responses_processed = 0;
+ param->clb = clb;
+ end = &param->end;
+ reply = &param->user_reply;
+
+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;
+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&pending_requests.lock);
+
+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. put it
+ * on the queue and fill it. we will remove it once we know we sent
+ * something.
+ */
Or we could add this dummy at the end, only if it's necessary, instead of
adding it first and removing it if it's not needed? No strong opinion here.
Post by Anatoly Burakov
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = copy;
+ dummy->reply = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ TAILQ_INSERT_TAIL(&pending_requests.requests, dummy, next);
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&pending_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ goto closedir_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+
+ if (mp_request_async(path, copy, param))
+ ret = -1;
+ }
+ /* if we sent something, remove dummy request from the queue */
+ if (reply->nb_sent != 0) {
+ TAILQ_REMOVE(&pending_requests.requests, dummy, next);
+ free(dummy);
+ dummy = NULL;
+ }
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&pending_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd automatically closed on closedir */
+ closedir(mp_dir);
+ return ret;
+ closedir(mp_dir);
+ pthread_mutex_unlock(&pending_requests.lock);
+ free(dummy);
+ free(param);
+ free(copy);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);
if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..87ebfd0 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);
/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
*
@@ -321,6 +331,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * The req argument contains the customized request message.
+ *
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * The clb argument specifies the callback to trigger when all responses for this request have arrived.
+ *
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb);
+
+/**
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..328a0be 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -225,6 +225,7 @@ EXPERIMENTAL {
rte_mp_action_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
Burakov, Anatoly
2018-03-26 14:28:23 UTC
Permalink
Post by Tan, Jianfeng
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but the reply will be received in a separate callback by the caller
(callback specified at the time of request, rather than registering
for it in advance).
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
Generally, it looks great to me except some trivial nits, so
Thanks!
Post by Tan, Jianfeng
Post by Anatoly Burakov
+static void
+trigger_async_action(struct pending_request *sr)
+{
+    struct async_request_param *param;
+    struct rte_mp_reply *reply;
+
+    param = sr->async.param;
+    reply = &param->user_reply;
+
+    param->clb(sr->request, reply);
+
+    /* clean up */
+    free(sr->async.param->user_reply.msgs);
How about simple "free(reply->msgs);"?
I would prefer leaving it as is, as it makes it clear that I'm freeing
everything to do with the sync request.
Post by Tan, Jianfeng
Post by Anatoly Burakov
+
+    sync_req->type = REQUEST_TYPE_ASYNC;
+    strcpy(sync_req->dst, dst);
+    sync_req->request = req;
+    sync_req->reply = reply_msg;
+    sync_req->async.param = param;
+
+    /* queue already locked by caller */
+
+    exist = find_sync_request(dst, req->name);
+    if (!exist)
+        TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next);
+    if (exist) {
Should this be an "else"?
Will fix in v6
Post by Tan, Jianfeng
Post by Anatoly Burakov
@@ -744,9 +1027,155 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
  }
  int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+        rte_mp_async_reply_t clb)
  {
+    struct rte_mp_msg *copy;
+    struct pending_request *dummy;
+    struct async_request_param *param = NULL;
No need to assign it to NULL.
Will fix in v6.
Post by Tan, Jianfeng
Post by Anatoly Burakov
+    /* we have to lock the request queue here, as we will be adding a
bunch
+     * of requests to the queue at once, and some of the replies may
arrive
+     * before we add all of the requests to the queue.
+     */
+    pthread_mutex_lock(&pending_requests.lock);
+
+    /* we have to ensure that callback gets triggered even if we
don't send
+     * anything, therefore earlier we have allocated a dummy request.
put it
+     * on the queue and fill it. we will remove it once we know we sent
+     * something.
+     */
Or we could add this dummy at the end, only if it's necessary, instead of
adding it first and removing it if it's not needed? No strong opinion here.
Yep, sure, will fix in v6.
--
Thanks,
Anatoly
Tan, Jianfeng
2018-03-26 07:31:38 UTC
Permalink
Acked-by: Jianfeng Tan <***@intel.com>
Anatoly Burakov
2018-03-27 13:59:43 UTC
Permalink
Signed-off-by: Anatoly Burakov <***@intel.com>
Suggested-by: Jianfeng Tan <***@intel.com>
Acked-by: Jianfeng Tan <***@intel.com>
---
lib/librte_eal/common/eal_common_proc.c | 38 ++++++++++++++++-----------------
1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 4131b67..52b6ab2 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -60,8 +60,8 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

-struct sync_request {
- TAILQ_ENTRY(sync_request) next;
+struct pending_request {
+ TAILQ_ENTRY(pending_request) next;
int reply_received;
char dst[PATH_MAX];
struct rte_mp_msg *request;
@@ -69,13 +69,13 @@ struct sync_request {
pthread_cond_t cond;
};

-TAILQ_HEAD(sync_request_list, sync_request);
+TAILQ_HEAD(pending_request_list, pending_request);

static struct {
- struct sync_request_list requests;
+ struct pending_request_list requests;
pthread_mutex_t lock;
-} sync_requests = {
- .requests = TAILQ_HEAD_INITIALIZER(sync_requests.requests),
+} pending_requests = {
+ .requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
.lock = PTHREAD_MUTEX_INITIALIZER
};

@@ -84,12 +84,12 @@ static int
mp_send(struct rte_mp_msg *msg, const char *peer, int type);


-static struct sync_request *
+static struct pending_request *
find_sync_request(const char *dst, const char *act_name)
{
- struct sync_request *r;
+ struct pending_request *r;

- TAILQ_FOREACH(r, &sync_requests.requests, next) {
+ TAILQ_FOREACH(r, &pending_requests.requests, next) {
if (!strcmp(r->dst, dst) &&
!strcmp(r->request->name, act_name))
break;
@@ -259,7 +259,7 @@ read_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
static void
process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
{
- struct sync_request *sync_req;
+ struct pending_request *sync_req;
struct action_entry *entry;
struct rte_mp_msg *msg = &m->msg;
rte_mp_t action = NULL;
@@ -267,7 +267,7 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
RTE_LOG(DEBUG, EAL, "msg: %s\n", msg->name);

if (m->type == MP_REP || m->type == MP_IGN) {
- pthread_mutex_lock(&sync_requests.lock);
+ pthread_mutex_lock(&pending_requests.lock);
sync_req = find_sync_request(s->sun_path, msg->name);
if (sync_req) {
memcpy(sync_req->reply, msg, sizeof(*msg));
@@ -276,7 +276,7 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
pthread_cond_signal(&sync_req->cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
- pthread_mutex_unlock(&sync_requests.lock);
+ pthread_mutex_unlock(&pending_requests.lock);
return;
}

@@ -607,7 +607,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
{
int ret;
struct rte_mp_msg msg, *tmp;
- struct sync_request sync_req, *exist;
+ struct pending_request sync_req, *exist;

sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
@@ -615,14 +615,14 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
sync_req.reply = &msg;
pthread_cond_init(&sync_req.cond, NULL);

- pthread_mutex_lock(&sync_requests.lock);
+ pthread_mutex_lock(&pending_requests.lock);
exist = find_sync_request(dst, req->name);
if (!exist)
- TAILQ_INSERT_TAIL(&sync_requests.requests, &sync_req, next);
+ TAILQ_INSERT_TAIL(&pending_requests.requests, &sync_req, next);
if (exist) {
RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
rte_errno = EEXIST;
- pthread_mutex_unlock(&sync_requests.lock);
+ pthread_mutex_unlock(&pending_requests.lock);
return -1;
}

@@ -638,12 +638,12 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,

do {
ret = pthread_cond_timedwait(&sync_req.cond,
- &sync_requests.lock, ts);
+ &pending_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

/* We got the lock now */
- TAILQ_REMOVE(&sync_requests.requests, &sync_req, next);
- pthread_mutex_unlock(&sync_requests.lock);
+ TAILQ_REMOVE(&pending_requests.requests, &sync_req, next);
+ pthread_mutex_unlock(&pending_requests.lock);

if (sync_req.reply_received == 0) {
RTE_LOG(ERR, EAL, "Fail to recv reply for request %s:%s\n",
--
2.7.4
Anatoly Burakov
2018-03-27 13:59:44 UTC
Permalink
This API is similar to the blocking API that is already present,
but the reply will be received in a separate callback by the caller
(callback specified at the time of request, rather than registering
for it in advance).

Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.

Signed-off-by: Anatoly Burakov <***@intel.com>
Acked-by: Jianfeng Tan <***@intel.com>
---

Notes:
v6:
- address review comments from Jianfeng
- do not add dummy item to queue by default

v5:
- addressed review comments from Jianfeng
- split into two patches to avoid rename noise
- do not mark ignored message as processed
v4:
- rebase on top of latest IPC Improvements patchset [2]

v3:
- added support for MP_IGN messages introduced in
IPC improvements v5 patchset
v2:
- fixed deadlocks and race conditions by not calling
callbacks while iterating over sync request list
- fixed use-after-free by making a copy of request
- changed API to also give user a copy of original
request, so that they know to which message the
callback is a reply to
- fixed missing .map file entries

This patch is dependent upon previously published patchsets
for IPC fixes [1] and improvements [2].

rte_mp_action_unregister and rte_mp_async_reply_unregister
do the same thing - should we perhaps make it one function?

[1] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Fixes/
[2] http://dpdk.org/dev/patchwork/bundle/aburakov/IPC_Improvements/

lib/librte_eal/common/eal_common_proc.c | 458 +++++++++++++++++++++++++++++++-
lib/librte_eal/common/include/rte_eal.h | 36 +++
lib/librte_eal/rte_eal_version.map | 1 +
3 files changed, 482 insertions(+), 13 deletions(-)

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index 52b6ab2..139b653 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -26,6 +26,7 @@
#include <rte_errno.h>
#include <rte_lcore.h>
#include <rte_log.h>
+#include <rte_tailq.h>

#include "eal_private.h"
#include "eal_filesystem.h"
@@ -60,13 +61,32 @@ struct mp_msg_internal {
struct rte_mp_msg msg;
};

+struct async_request_param {
+ rte_mp_async_reply_t clb;
+ struct rte_mp_reply user_reply;
+ struct timespec end;
+ int n_responses_processed;
+};
+
struct pending_request {
TAILQ_ENTRY(pending_request) next;
- int reply_received;
+ enum {
+ REQUEST_TYPE_SYNC,
+ REQUEST_TYPE_ASYNC
+ } type;
char dst[PATH_MAX];
struct rte_mp_msg *request;
struct rte_mp_msg *reply;
- pthread_cond_t cond;
+ int reply_received;
+ RTE_STD_C11
+ union {
+ struct {
+ struct async_request_param *param;
+ } async;
+ struct {
+ pthread_cond_t cond;
+ } sync;
+ };
};

TAILQ_HEAD(pending_request_list, pending_request);
@@ -74,9 +94,12 @@ TAILQ_HEAD(pending_request_list, pending_request);
static struct {
struct pending_request_list requests;
pthread_mutex_t lock;
+ pthread_cond_t async_cond;
} pending_requests = {
.requests = TAILQ_HEAD_INITIALIZER(pending_requests.requests),
- .lock = PTHREAD_MUTEX_INITIALIZER
+ .lock = PTHREAD_MUTEX_INITIALIZER,
+ .async_cond = PTHREAD_COND_INITIALIZER
+ /**< used in async requests only */
};

/* forward declarations */
@@ -273,7 +296,12 @@ process_msg(struct mp_msg_internal *m, struct sockaddr_un *s)
memcpy(sync_req->reply, msg, sizeof(*msg));
/* -1 indicates that we've been asked to ignore */
sync_req->reply_received = m->type == MP_REP ? 1 : -1;
- pthread_cond_signal(&sync_req->cond);
+
+ if (sync_req->type == REQUEST_TYPE_SYNC)
+ pthread_cond_signal(&sync_req->sync.cond);
+ else if (sync_req->type == REQUEST_TYPE_ASYNC)
+ pthread_cond_signal(
+ &pending_requests.async_cond);
} else
RTE_LOG(ERR, EAL, "Drop mp reply: %s\n", msg->name);
pthread_mutex_unlock(&pending_requests.lock);
@@ -320,6 +348,189 @@ mp_handle(void *arg __rte_unused)
}

static int
+timespec_cmp(const struct timespec *a, const struct timespec *b)
+{
+ if (a->tv_sec < b->tv_sec)
+ return -1;
+ if (a->tv_sec > b->tv_sec)
+ return 1;
+ if (a->tv_nsec < b->tv_nsec)
+ return -1;
+ if (a->tv_nsec > b->tv_nsec)
+ return 1;
+ return 0;
+}
+
+enum async_action {
+ ACTION_NONE, /**< don't do anything */
+ ACTION_FREE, /**< free the action entry, but don't trigger callback */
+ ACTION_TRIGGER /**< trigger callback, then free action entry */
+};
+
+static enum async_action
+process_async_request(struct pending_request *sr, const struct timespec *now)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+ bool timeout, received, last_msg;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ /* did we timeout? */
+ timeout = timespec_cmp(&param->end, now) <= 0;
+
+ /* did we receive a response? */
+ received = sr->reply_received != 0;
+
+ /* if we didn't time out, and we didn't receive a response, ignore */
+ if (!timeout && !received)
+ return ACTION_NONE;
+
+ /* if we received a response, adjust relevant data and copy mesasge. */
+ if (sr->reply_received == 1 && sr->reply) {
+ struct rte_mp_msg *msg, *user_msgs, *tmp;
+
+ msg = sr->reply;
+ user_msgs = reply->msgs;
+
+ tmp = realloc(user_msgs, sizeof(*msg) *
+ (reply->nb_received + 1));
+ if (!tmp) {
+ RTE_LOG(ERR, EAL, "Fail to alloc reply for request %s:%s\n",
+ sr->dst, sr->request->name);
+ /* this entry is going to be removed and its message
+ * dropped, but we don't want to leak memory, so
+ * continue.
+ */
+ } else {
+ user_msgs = tmp;
+ reply->msgs = user_msgs;
+ memcpy(&user_msgs[reply->nb_received],
+ msg, sizeof(*msg));
+ reply->nb_received++;
+ }
+
+ /* mark this request as processed */
+ param->n_responses_processed++;
+ } else if (sr->reply_received == -1) {
+ /* we were asked to ignore this process */
+ reply->nb_sent--;
+ }
+ free(sr->reply);
+
+ last_msg = param->n_responses_processed == reply->nb_sent;
+
+ return last_msg ? ACTION_TRIGGER : ACTION_FREE;
+}
+
+static void
+trigger_async_action(struct pending_request *sr)
+{
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+
+ param = sr->async.param;
+ reply = &param->user_reply;
+
+ param->clb(sr->request, reply);
+
+ /* clean up */
+ free(sr->async.param->user_reply.msgs);
+ free(sr->async.param);
+ free(sr->request);
+}
+
+static void *
+async_reply_handle(void *arg __rte_unused)
+{
+ struct pending_request *sr;
+ struct timeval now;
+ struct timespec timeout, ts_now;
+ while (1) {
+ struct pending_request *trigger = NULL;
+ int ret;
+ bool nowait = false;
+ bool timedwait = false;
+
+ pthread_mutex_lock(&pending_requests.lock);
+
+ /* scan through the list and see if there are any timeouts that
+ * are earlier than our current timeout.
+ */
+ TAILQ_FOREACH(sr, &pending_requests.requests, next) {
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+ if (!timedwait || timespec_cmp(&sr->async.param->end,
+ &timeout) < 0) {
+ memcpy(&timeout, &sr->async.param->end,
+ sizeof(timeout));
+ timedwait = true;
+ }
+
+ /* sometimes, we don't even wait */
+ if (sr->reply_received) {
+ nowait = true;
+ break;
+ }
+ }
+
+ if (nowait)
+ ret = 0;
+ else if (timedwait)
+ ret = pthread_cond_timedwait(
+ &pending_requests.async_cond,
+ &pending_requests.lock, &timeout);
+ else
+ ret = pthread_cond_wait(&pending_requests.async_cond,
+ &pending_requests.lock);
+
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Cannot get current time\n");
+ break;
+ }
+ ts_now.tv_nsec = now.tv_usec * 1000;
+ ts_now.tv_sec = now.tv_sec;
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ struct pending_request *next;
+ /* we've either been woken up, or we timed out */
+
+ /* we have still the lock, check if anything needs
+ * processing.
+ */
+ TAILQ_FOREACH_SAFE(sr, &pending_requests.requests, next,
+ next) {
+ enum async_action action;
+ if (sr->type != REQUEST_TYPE_ASYNC)
+ continue;
+
+ action = process_async_request(sr, &ts_now);
+ if (action == ACTION_FREE) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ free(sr);
+ } else if (action == ACTION_TRIGGER &&
+ trigger == NULL) {
+ TAILQ_REMOVE(&pending_requests.requests,
+ sr, next);
+ trigger = sr;
+ }
+ }
+ }
+ pthread_mutex_unlock(&pending_requests.lock);
+ if (trigger) {
+ trigger_async_action(trigger);
+ free(trigger);
+ }
+ };
+
+ RTE_LOG(ERR, EAL, "ERROR: asynchronous requests disabled\n");
+
+ return NULL;
+}
+
+static int
open_socket_fd(void)
{
char peer_name[PATH_MAX] = {0};
@@ -382,7 +593,7 @@ rte_mp_channel_init(void)
char thread_name[RTE_MAX_THREAD_NAME_LEN];
char path[PATH_MAX];
int dir_fd;
- pthread_t tid;
+ pthread_t mp_handle_tid, async_reply_handle_tid;

/* create filter path */
create_socket_path("*", path, sizeof(path));
@@ -419,7 +630,16 @@ rte_mp_channel_init(void)
return -1;
}

- if (pthread_create(&tid, NULL, mp_handle, NULL) < 0) {
+ if (pthread_create(&mp_handle_tid, NULL, mp_handle, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
+ strerror(errno));
+ close(mp_fd);
+ mp_fd = -1;
+ return -1;
+ }
+
+ if (pthread_create(&async_reply_handle_tid, NULL,
+ async_reply_handle, NULL) < 0) {
RTE_LOG(ERR, EAL, "failed to create mp thead: %s\n",
strerror(errno));
close(mp_fd);
@@ -430,7 +650,11 @@ rte_mp_channel_init(void)

/* try best to set thread name */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_handle");
- rte_thread_setname(tid, thread_name);
+ rte_thread_setname(mp_handle_tid, thread_name);
+
+ /* try best to set thread name */
+ snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "rte_mp_async_handle");
+ rte_thread_setname(async_reply_handle_tid, thread_name);

/* unlock the directory */
flock(dir_fd, LOCK_UN);
@@ -602,18 +826,77 @@ rte_mp_sendmsg(struct rte_mp_msg *msg)
}

static int
-mp_request_one(const char *dst, struct rte_mp_msg *req,
+mp_request_async(const char *dst, struct rte_mp_msg *req,
+ struct async_request_param *param)
+{
+ struct rte_mp_msg *reply_msg;
+ struct pending_request *sync_req, *exist;
+ int ret;
+
+ sync_req = malloc(sizeof(*sync_req));
+ reply_msg = malloc(sizeof(*reply_msg));
+ if (sync_req == NULL || reply_msg == NULL) {
+ RTE_LOG(ERR, EAL, "Could not allocate space for sync request\n");
+ rte_errno = ENOMEM;
+ ret = -1;
+ goto fail;
+ }
+
+ memset(sync_req, 0, sizeof(*sync_req));
+ memset(reply_msg, 0, sizeof(*reply_msg));
+
+ sync_req->type = REQUEST_TYPE_ASYNC;
+ strcpy(sync_req->dst, dst);
+ sync_req->request = req;
+ sync_req->reply = reply_msg;
+ sync_req->async.param = param;
+
+ /* queue already locked by caller */
+
+ exist = find_sync_request(dst, req->name);
+ if (!exist) {
+ TAILQ_INSERT_TAIL(&pending_requests.requests, sync_req, next);
+ } else {
+ RTE_LOG(ERR, EAL, "A pending request %s:%s\n", dst, req->name);
+ rte_errno = EEXIST;
+ ret = -1;
+ goto fail;
+ }
+
+ ret = send_msg(dst, req, MP_REQ);
+ if (ret < 0) {
+ RTE_LOG(ERR, EAL, "Fail to send request %s:%s\n",
+ dst, req->name);
+ ret = -1;
+ goto fail;
+ } else if (ret == 0) {
+ ret = 0;
+ goto fail;
+ }
+
+ param->user_reply.nb_sent++;
+
+ return 0;
+fail:
+ free(sync_req);
+ free(reply_msg);
+ return ret;
+}
+
+static int
+mp_request_sync(const char *dst, struct rte_mp_msg *req,
struct rte_mp_reply *reply, const struct timespec *ts)
{
int ret;
struct rte_mp_msg msg, *tmp;
struct pending_request sync_req, *exist;

+ sync_req.type = REQUEST_TYPE_SYNC;
sync_req.reply_received = 0;
strcpy(sync_req.dst, dst);
sync_req.request = req;
sync_req.reply = &msg;
- pthread_cond_init(&sync_req.cond, NULL);
+ pthread_cond_init(&sync_req.sync.cond, NULL);

pthread_mutex_lock(&pending_requests.lock);
exist = find_sync_request(dst, req->name);
@@ -637,7 +920,7 @@ mp_request_one(const char *dst, struct rte_mp_msg *req,
reply->nb_sent++;

do {
- ret = pthread_cond_timedwait(&sync_req.cond,
+ ret = pthread_cond_timedwait(&sync_req.sync.cond,
&pending_requests.lock, ts);
} while (ret != 0 && ret != ETIMEDOUT);

@@ -703,7 +986,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

/* for secondary process, send request to the primary process only */
if (rte_eal_process_type() == RTE_PROC_SECONDARY)
- return mp_request_one(eal_mp_socket_path(), req, reply, &end);
+ return mp_request_sync(eal_mp_socket_path(), req, reply, &end);

/* for primary process, broadcast request, and collect reply 1 by 1 */
mp_dir = opendir(mp_dir_path);
@@ -732,7 +1015,7 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
ent->d_name);

- if (mp_request_one(path, req, reply, &end))
+ if (mp_request_sync(path, req, reply, &end))
ret = -1;
}
/* unlock the directory */
@@ -744,9 +1027,158 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
}

int __rte_experimental
-rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb)
{
+ struct rte_mp_msg *copy;
+ struct pending_request *dummy;
+ struct async_request_param *param;
+ struct rte_mp_reply *reply;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end;
+ bool dummy_used = false;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
+ copy = malloc(sizeof(*copy));
+ dummy = malloc(sizeof(*dummy));
+ param = malloc(sizeof(*param));
+ if (copy == NULL || dummy == NULL || param == NULL) {
+ RTE_LOG(ERR, EAL, "Failed to allocate memory for async reply\n");
+ rte_errno = ENOMEM;
+ goto fail;
+ }
+
+ memset(copy, 0, sizeof(*copy));
+ memset(dummy, 0, sizeof(*dummy));
+ memset(param, 0, sizeof(*param));
+
+ /* copy message */
+ memcpy(copy, req, sizeof(*copy));
+
+ param->n_responses_processed = 0;
+ param->clb = clb;
+ end = &param->end;
+ reply = &param->user_reply;
+
+ end->tv_nsec = (now.tv_usec * 1000 + ts->tv_nsec) % 1000000000;
+ end->tv_sec = now.tv_sec + ts->tv_sec +
+ (now.tv_usec * 1000 + ts->tv_nsec) / 1000000000;
+ reply->nb_sent = 0;
+ reply->nb_received = 0;
+ reply->msgs = NULL;
+
+ /* we have to lock the request queue here, as we will be adding a bunch
+ * of requests to the queue at once, and some of the replies may arrive
+ * before we add all of the requests to the queue.
+ */
+ pthread_mutex_lock(&pending_requests.lock);

+ /* we have to ensure that callback gets triggered even if we don't send
+ * anything, therefore earlier we have allocated a dummy request. fill
+ * it, and put it on the queue if we don't send any requests.
+ */
+ dummy->type = REQUEST_TYPE_ASYNC;
+ dummy->request = copy;
+ dummy->reply = NULL;
+ dummy->async.param = param;
+ dummy->reply_received = 1; /* short-circuit the timeout */
+
+ /* for secondary process, send request to the primary process only */
+ if (rte_eal_process_type() == RTE_PROC_SECONDARY) {
+ ret = mp_request_async(eal_mp_socket_path(), copy, param);
+
+ /* if we didn't send anything, put dummy request on the queue */
+ if (ret == 0 && reply->nb_sent == 0) {
+ TAILQ_INSERT_TAIL(&pending_requests.requests, dummy,
+ next);
+ dummy_used = true;
+ }
+
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* if we couldn't send anything, clean up */
+ if (ret != 0)
+ goto fail;
+ return 0;
+ }
+
+ /* for primary process, broadcast request */
+ mp_dir = opendir(mp_dir_path);
+ if (!mp_dir) {
+ RTE_LOG(ERR, EAL, "Unable to open directory %s\n", mp_dir_path);
+ rte_errno = errno;
+ goto unlock_fail;
+ }
+ dir_fd = dirfd(mp_dir);
+
+ /* lock the directory to prevent processes spinning up while we send */
+ if (flock(dir_fd, LOCK_EX)) {
+ RTE_LOG(ERR, EAL, "Unable to lock directory %s\n",
+ mp_dir_path);
+ rte_errno = errno;
+ goto closedir_fail;
+ }
+
+ while ((ent = readdir(mp_dir))) {
+ char path[PATH_MAX];
+
+ if (fnmatch(mp_filter, ent->d_name, 0) != 0)
+ continue;
+
+ snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+ ent->d_name);
+
+ if (mp_request_async(path, copy, param))
+ ret = -1;
+ }
+ /* if we didn't send anything, put dummy request on the queue */
+ if (ret == 0 && reply->nb_sent == 0) {
+ TAILQ_INSERT_HEAD(&pending_requests.requests, dummy, next);
+ dummy_used = true;
+ }
+
+ /* trigger async request thread wake up */
+ pthread_cond_signal(&pending_requests.async_cond);
+
+ /* finally, unlock the queue */
+ pthread_mutex_unlock(&pending_requests.lock);
+
+ /* unlock the directory */
+ flock(dir_fd, LOCK_UN);
+
+ /* dir_fd automatically closed on closedir */
+ closedir(mp_dir);
+
+ /* if dummy was unused, free it */
+ if (!dummy_used)
+ free(dummy);
+
+ return ret;
+closedir_fail:
+ closedir(mp_dir);
+unlock_fail:
+ pthread_mutex_unlock(&pending_requests.lock);
+fail:
+ free(dummy);
+ free(param);
+ free(copy);
+ return -1;
+}
+
+int __rte_experimental
+rte_mp_reply(struct rte_mp_msg *msg, const char *peer)
+{
RTE_LOG(DEBUG, EAL, "reply: %s\n", msg->name);

if (check_input(msg) == false)
diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h
index 044474e..87ebfd0 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -230,6 +230,16 @@ struct rte_mp_reply {
typedef int (*rte_mp_t)(const struct rte_mp_msg *msg, const void *peer);

/**
+ * Asynchronous reply function typedef used by other components.
+ *
+ * As we create socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming responses to asynchronous
+ * requests.
+ */
+typedef int (*rte_mp_async_reply_t)(const struct rte_mp_msg *request,
+ const struct rte_mp_reply *reply);
+
+/**
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
@@ -321,6 +331,32 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
* @warning
* @b EXPERIMENTAL: this API may change without prior notice
*
+ * Send a request to the peer process and expect a reply in a separate callback.
+ *
+ * This function sends a request message to the peer process, and will not
+ * block. Instead, reply will be received in a separate callback.
+ *
+ * @param req
+ * The req argument contains the customized request message.
+ *
+ * @param ts
+ * The ts argument specifies how long we can wait for the peer(s) to reply.
+ *
+ * @param clb
+ * The callback to trigger when all responses for this request have arrived.
+ *
+ * @return
+ * - On success, return 0.
+ * - On failure, return -1, and the reason will be stored in rte_errno.
+ */
+int __rte_experimental
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts,
+ rte_mp_async_reply_t clb);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
* Send a reply to the peer process.
*
* This function will send a reply message in response to a request message
diff --git a/lib/librte_eal/rte_eal_version.map b/lib/librte_eal/rte_eal_version.map
index d123602..328a0be 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -225,6 +225,7 @@ EXPERIMENTAL {
rte_mp_action_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
--
2.7.4
Thomas Monjalon
2018-03-27 16:33:54 UTC
Permalink
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Post by Anatoly Burakov
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -225,6 +225,7 @@ EXPERIMENTAL {
rte_mp_action_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
So there is rte_mp_request and rte_mp_request_async?
You should rename rte_mp_request, I guess.
Post by Anatoly Burakov
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
Tan, Jianfeng
2018-03-28 02:08:56 UTC
Permalink
Hi Thomas ,
-----Original Message-----
Sent: Wednesday, March 28, 2018 12:34 AM
To: Burakov, Anatoly
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to
DPDK IPC
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not add so many threads in a library.

I was considering merging all the threads into the interrupt thread; however, we don't have an interrupt thread in FreeBSD. Further, we don't implement the alarm API in FreeBSD. That's why I lean towards the current implementation, to be optimized later.

For rte_service, it may not be a good idea to rely on it, as it needs explicit API calls to set up.
Post by Anatoly Burakov
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -225,6 +225,7 @@ EXPERIMENTAL {
rte_mp_action_unregister;
rte_mp_sendmsg;
rte_mp_request;
+ rte_mp_request_async;
So there is rte_mp_request and rte_mp_request_async?
You should rename rte_mp_request, I guess.
+1.

Thanks,
Jianfeng
Post by Anatoly Burakov
rte_mp_reply;
rte_service_attr_get;
rte_service_attr_reset_all;
Thomas Monjalon
2018-03-28 07:29:35 UTC
Permalink
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread, however, we don't have an interrupt thread in freebsd. Further, we don't implement alarm API in freebsd. That's why I tend to current implementation, and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
Van Haaren, Harry
2018-03-28 08:22:41 UTC
Permalink
Sent: Wednesday, March 28, 2018 8:30 AM
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to
DPDK IPC
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current implementation,
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
Although I do like to see new services, if we want to enable "core" dpdk functionality with Services, we need a properly designed solution for that. Service cores are not intended for "occasional" work - there is no method to block and sleep on a specific service until work becomes available, so this would imply busy-polling. Using a service (hence busy-polling) for rte_malloc()-based memory mapping requests is inefficient, and total overkill :)

For this patch I suggest to use some blocking-read capable mechanism.

The above said, in the longer term it would be good to have a design that allows new file-descriptors to be added to a "dpdk core" thread, which performs occasional lengthy work if the FD has data available.
Tan, Jianfeng
2018-03-28 08:55:07 UTC
Permalink
Hi Thomas and Harry,
Post by Van Haaren, Harry
Sent: Wednesday, March 28, 2018 8:30 AM
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to
DPDK IPC
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current implementation,
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
My concern is that not every DPDK application sets up rte_service, but
IPC will be used for very fundamental functions, like memory allocation.
We could not possibly ask all DPDK applications to add rte_service now.

And also take Harry's comments below into consideration, most likely, we
will move these threads into interrupt thread now by adding
Post by Van Haaren, Harry
Although I do like to see new services, if we want to enable "core" dpdk functionality with Services, we need a proper designed solution for that. Service cores is not intended for "occasional" work - there is no method to block and sleep on a specific service until work becomes available, so this would imply a busy-polling. Using a service (hence busy polling) for rte_malloc()-based memory mapping requests is inefficient, and total overkill :)
For this patch I suggest to use some blocking-read capable mechanism.
The problem here is that we add too many threads; blocking-read does not
decrease # of threads.
Post by Van Haaren, Harry
The above said, in the longer term it would be good to have a design that allows new file-descriptors to be added to a "dpdk core" thread, which performs occasional lengthy work if the FD has data available.
Interrupt thread vs rte_service, which is the direction to go? We
actually have some other threads, in vhost and even virtio-user; we can
also avoid those threads if we have a clear direction.

Thanks,
Jianfeng
Van Haaren, Harry
2018-03-28 09:10:12 UTC
Permalink
-----Original Message-----
From: Tan, Jianfeng
Sent: Wednesday, March 28, 2018 9:55 AM
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to
DPDK IPC
Hi Thomas and Harry,
Hey,
Post by Van Haaren, Harry
Sent: Wednesday, March 28, 2018 8:30 AM
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API
to
Post by Van Haaren, Harry
DPDK IPC
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current
implementation,
Post by Van Haaren, Harry
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
My concern is that not every DPDK application sets up rte_service, but
IPC will be used for very fundamental functions, like memory allocation.
We could not possibly ask all DPDK applications to add rte_service now.
And also take Harry's comments below into consideration, most likely, we
will move these threads into interrupt thread now by adding
I don't suggest moving everything into interrupt thread.

We need to ensure the interrupt thread (aka, link status interrupt thread) is not
busy for too long. Certain work may cause un-acceptable delay in the interrupt
thread, hence perhaps we should have 2 core DPDK threads:

1) EAL interrupt thread: handles only interrupts, so is ~always blocked on some FDs

2) EAL "core" thread: handles the IPC work here, and any other irregular work that
can take some time to complete. Adding all core DPDK work to this thread enables
us to refactor and create a scalable / coherent design.
Post by Van Haaren, Harry
Although I do like to see new services, if we want to enable "core" dpdk
functionality with Services, we need a proper designed solution for that.
Service cores is not intended for "occasional" work - there is no method to
block and sleep on a specific service until work becomes available, so this
would imply a busy-polling. Using a service (hence busy polling) for
rte_malloc()-based memory mapping requests is inefficient, and total
overkill :)
Post by Van Haaren, Harry
For this patch I suggest to use some blocking-read capable mechanism.
The problem here is that we add too many threads; blocking-read does not
decrease # of threads.
To correct my statement just above - some method capable of blocking reads (as opposed to
busy polling). In my mind, this method *must* allow waiting on multiple FDs,
as to *require* only 1 thread. We could use more if it makes sense to do so.
Post by Van Haaren, Harry
The above said, in the longer term it would be good to have a design that
allows new file-descriptors to be added to a "dpdk core" thread, which
performs occasional lengthy work if the FD has data available.
Interrupt thread vs rte_service, which is the direction to go? We
actually have some others threads, in vhost and even virtio-user; we can
also avoid those threads if we have a clear direction.
I don't think that service is the correct hammer for this problem.

As to interrupt thread or creating a new thread, what makes more sense given
the current codebase? Is the current implementation an acceptable short-term
solution, that gets reworked to be more generic in future?
Burakov, Anatoly
2018-03-28 09:21:23 UTC
Permalink
Post by Tan, Jianfeng
Hi Thomas and Harry,
Post by Van Haaren, Harry
Sent: Wednesday, March 28, 2018 8:30 AM
Subject: Re: [dpdk-dev] [PATCH v6 2/2] eal: add asynchronous request API to
DPDK IPC
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current
implementation,
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
My concern is that not every DPDK application sets up rte_service, but
IPC will be used for very fundamental functions, like memory allocation.
We could not possibly ask all DPDK applications to add rte_service now.
And also take Harry's comments below into consideration, most likely, we
will move these threads into interrupt thread now by adding
Post by Van Haaren, Harry
Although I do like to see new services, if we want to enable "core"
dpdk functionality with Services, we need a proper designed solution
for that. Service cores is not intended for "occasional" work - there
is no method to block and sleep on a specific service until work
becomes available, so this would imply a busy-polling. Using a service
(hence busy polling) for rte_malloc()-based memory mapping requests is
inefficient, and total overkill :)
For this patch I suggest to use some blocking-read capable mechanism.
The problem here is that we add too many threads; blocking-read does not
decrease # of threads.
Post by Van Haaren, Harry
The above said, in the longer term it would be good to have a design
that allows new file-descriptors to be added to a "dpdk core" thread,
which performs occasional lengthy work if the FD has data available.
Interrupt thread vs rte_service, which is the direction to go? We
actually have some others threads, in vhost and even virtio-user; we can
also avoid those threads if we have a clear direction.
Thanks,
Jianfeng
Hi all,

First of all, @Thomas, this is not a "new library" - it's part of EAL.
We're going to be removing a few threads from EAL as it is because of
IPC (Jianfeng has already submitted patches for those), so i don't think
it's such a big deal to have two IPC threads instead of one. I'm open to
suggestions on how to make this work without a second thread, but i
don't see it.

We've discussed possibility of using rte_service internally, but decided
against it for reasons already outlined by Harry - it's not a suitable
mechanism for this kind of thing, not as it is.

Using interrupt thread for this _will_ work, however this will require
a lot more changes, as currently alarm API allocates everything through
rte_malloc, while we want to use IPC for rte_malloc (which would make it
a circular dependency). So it'll probably be more API and more
complexity for dealing with malloc vs rte_malloc allocations. Hence the
least-bad approach taken here: a new thread.
--
Thanks,
Anatoly
Thomas Monjalon
2018-03-28 09:53:20 UTC
Permalink
Post by Burakov, Anatoly
Post by Tan, Jianfeng
Post by Van Haaren, Harry
Post by Tan, Jianfeng
Post by Tan, Jianfeng
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current implementation,
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
My concern is that not every DPDK application sets up rte_service, but
IPC will be used for very fundamental functions, like memory allocation.
We could not possibly ask all DPDK applications to add rte_service now.
And also take Harry's comments below into consideration, most likely, we
will move these threads into interrupt thread now by adding
Post by Van Haaren, Harry
Although I do like to see new services, if we want to enable "core"
dpdk functionality with Services, we need a proper designed solution
for that. Service cores is not intended for "occasional" work - there
is no method to block and sleep on a specific service until work
becomes available, so this would imply a busy-polling. Using a service
(hence busy polling) for rte_malloc()-based memory mapping requests is
inefficient, and total overkill :)
For this patch I suggest to use some blocking-read capable mechanism.
The problem here is that we add too many threads; blocking-read does not
decrease # of threads.
Post by Van Haaren, Harry
The above said, in the longer term it would be good to have a design
that allows new file-descriptors to be added to a "dpdk core" thread,
which performs occasional lengthy work if the FD has data available.
Interrupt thread vs rte_service, which is the direction to go? We
actually have some others threads, in vhost and even virtio-user; we can
also avoid those threads if we have a clear direction.
Thanks,
Jianfeng
Hi all,
I did not say it is a new library.
Post by Burakov, Anatoly
We're going to be removing a few threads from EAL as it is because of
IPC (Jianfeng has already submitted patches for those),
I don't understand.
Which threads are you going to remove? Which patch?
Post by Burakov, Anatoly
so i don't think
it's such a big deal to have two IPC threads instead of one. I'm open to
suggestions on how to make this work without a second thread, but i
don't see it.
I am not against the second thread.
I am against both threads :)
Post by Burakov, Anatoly
We've discussed possibility of using rte_service internally, but decided
against it for reasons already outlined by Harry - it's not a suitable
mechanism for this kind of thing, not as it is.
Using interrupt thread for this _will_ work, however this will require a
a lot more changes, as currently alarm API allocates everything through
rte_malloc, while we want to use IPC for rte_malloc (which would make it
a circular dependency). So it'll probably be more API and more
complexity for dealing with malloc vs rte_malloc allocations. Hence the
least-bad approach taken here: a new thread.
If everybody is happy enough with "least bad" design and not trying
to improve the core design, what can I say?
Burakov, Anatoly
2018-03-28 10:42:23 UTC
Permalink
Post by Thomas Monjalon
Post by Burakov, Anatoly
Post by Tan, Jianfeng
Post by Van Haaren, Harry
Post by Tan, Jianfeng
Post by Tan, Jianfeng
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agree that we'd better not adding so many threads in a library.
I was considering to merge all the threads into the interrupt thread,
however, we don't have an interrupt thread in freebsd. Further, we don't
implement alarm API in freebsd. That's why I tend to current implementation,
and optimize it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
Post by Tan, Jianfeng
For rte_service, it may be not a good idea to reply on it as it needs
explicit API calls to setup.
I don't see the issue of the explicit API.
The IPC is a new service.
My concern is that not every DPDK application sets up rte_service, but
IPC will be used for very fundamental functions, like memory allocation.
We could not possibly ask all DPDK applications to add rte_service now.
And also take Harry's comments below into consideration, most likely, we
will move these threads into interrupt thread now by adding
Post by Van Haaren, Harry
Although I do like to see new services, if we want to enable "core"
dpdk functionality with Services, we need a proper designed solution
for that. Service cores is not intended for "occasional" work - there
is no method to block and sleep on a specific service until work
becomes available, so this would imply a busy-polling. Using a service
(hence busy polling) for rte_malloc()-based memory mapping requests is
inefficient, and total overkill :)
For this patch I suggest to use some blocking-read capable mechanism.
The problem here is that we add too many threads; blocking-read does not
decrease # of threads.
Post by Van Haaren, Harry
The above said, in the longer term it would be good to have a design
that allows new file-descriptors to be added to a "dpdk core" thread,
which performs occasional lengthy work if the FD has data available.
Interrupt thread vs rte_service, which is the direction to go? We
actually have some others threads, in vhost and even virtio-user; we can
also avoid those threads if we have a clear direction.
Thanks,
Jianfeng
Hi all,
I did not say it is a new library.
Post by Burakov, Anatoly
We're going to be removing a few threads from EAL as it is because of
IPC (Jianfeng has already submitted patches for those),
I don't understand.
Which threads are you going to remove? Which patch?
There are separate patches that get rid of some EAL threads we have from
before (e.g. VFIO thread) - this is the reason IPC was created in the
first place. These aren't relevant to this patchset per se, i'm just
saying that it's not like we're adding to the pile of already existing
threads without taking anything away :)
Post by Thomas Monjalon
Post by Burakov, Anatoly
so i don't think
it's such a big deal to have two IPC threads instead of one. I'm open to
suggestions on how to make this work without a second thread, but i
don't see it.
I am not against the second thread.
I am against both threads :)
Well, the first thread is already in DPDK. To provide some context,
first implementation for DPDK IPC was suggested for 17.11, and (without
many conceptual changes) was merged in 18.02. I think it's a bit late to
be against both threads :)
Post by Thomas Monjalon
Post by Burakov, Anatoly
We've discussed possibility of using rte_service internally, but decided
against it for reasons already outlined by Harry - it's not a suitable
mechanism for this kind of thing, not as it is.
Using interrupt thread for this _will_ work, however this will require a
a lot more changes, as currently alarm API allocates everything through
rte_malloc, while we want to use IPC for rte_malloc (which would make it
a circular dependency). So it'll probably be more API and more
complexity for dealing with malloc vs rte_malloc allocations. Hence the
least-bad approach taken here: a new thread.
If everybody is happy enough with "least bad" design and not trying
to improve the core design, what can I say?
I'm not against trying to improve the core design. I'm just saying that,
had this kind of feedback been provided just a bit earlier, I would've
had time to fix it in time for deadlines. However, because memory rework
patchset depends on this API, i would suggest merging it in now, as is,
and commit to a roadmap of improvements for next release(s).

For starters, we could plan on removing alarm thread's dependency on
rte_malloc and just use regular malloc API's in there, and rework
asynchronous IPC API to use that instead. This shouldn't be much work,
and will presumably make you halfway happy, as one of the threads will
be gone :)

We can then look into removing the second thread and moving the entirety
of DPDK IPC into the interrupt thread. I'm not too sure how would that
work, but i haven't looked at it in any detail, so maybe it is feasible.

Can we agree on this? It would be great to do everything perfectly from
the first try, but having a goal in sight and working towards it is fine
too, even if not all of the steps we take are perfect.
--
Thanks,
Anatoly
Thomas Monjalon
2018-03-28 11:26:03 UTC
Permalink
Post by Burakov, Anatoly
Post by Thomas Monjalon
Post by Burakov, Anatoly
so i don't think
it's such a big deal to have two IPC threads instead of one. I'm open to
suggestions on how to make this work without a second thread, but i
don't see it.
I am not against the second thread.
I am against both threads :)
Well, the first thread is already in DPDK. To provide some context,
first implementation for DPDK IPC was suggested for 17.11, and (without
many conceptual changes) was merged in 18.02. I think it's a bit late to
be against both threads :)
No, it's never too late to discuss.
Merging a patch does not prevent discussing it :)
Post by Burakov, Anatoly
Post by Thomas Monjalon
Post by Burakov, Anatoly
We've discussed possibility of using rte_service internally, but decided
against it for reasons already outlined by Harry - it's not a suitable
mechanism for this kind of thing, not as it is.
Using interrupt thread for this _will_ work, however this will require a
a lot more changes, as currently alarm API allocates everything through
rte_malloc, while we want to use IPC for rte_malloc (which would make it
a circular dependency). So it'll probably be more API and more
complexity for dealing with malloc vs rte_malloc allocations. Hence the
least-bad approach taken here: a new thread.
If everybody is happy enough with "least bad" design and not trying
to improve the core design, what can I say?
I'm not against trying to improve the core design. I'm just saying that,
had this kind of feedback been provided just a bit earlier, I would've
had time to fix it in time for deadlines. However, because memory rework
patchset depends on this API, i would suggest merging it in now, as is,
and commit to a roadmap of improvements for next release(s).
Actually, you had the feedback yourself from the beginning.
You decided to give up on the interrupt thread because its implementation
is not complete (and maybe far from perfect).
There are some communities where it is not acceptable to workaround
core issues because of timing issues. I think we accept it in DPDK,
but I continue to question it, in order to be sure that everybody is OK
with this kind of tradeoff.
Post by Burakov, Anatoly
For starters, we could plan on removing alarm thread's dependency on
rte_malloc and just use regular malloc API's in there, and rework
asynchronous IPC API to use that instead. This shouldn't be much work,
and will presumably make you halfway happy, as one of the threads will
be gone :)
We can then look into removing the second thread and moving the entirety
of DPDK IPC into the interrupt thread. I'm not too sure how would that
work, but i haven't looked at it in any detail, so maybe it is feasible.
Can we agree on this? It would be great to do everything perfectly from
the first try, but having a goal in sight and working towards it is fine
too, even if not all of the steps we take are perfect.
The main concern is API.
If all these changes are internal only, and do not involve any major
API change, then I guess it is OK to postpone them to the next release.
Burakov, Anatoly
2018-03-28 12:21:30 UTC
Permalink
Post by Thomas Monjalon
Post by Burakov, Anatoly
I'm not against trying to improve the core design. I'm just saying that,
had this kind of feedback been provided just a bit earlier, I would've
had time to fix it in time for deadlines. However, because memory rework
patchset depends on this API, i would suggest merging it in now, as is,
and commit to a roadmap of improvements for next release(s).
Actually, you had the feedback yourself from the beginning.
You decided to gave up with interrupt thread because its implementation
is not complete (and maybe far from perfect).
That's not quite how i see it, but OK, suppose so.
Post by Thomas Monjalon
There are some communities where it is not acceptable to workaround
core issues because of timing issues. I think we accept it in DPDK,
but I continue to question it, in order to be sure that everybody is OK
with this kind of tradeoff.
The way i see it, not all API's are equal; some are more important than
others. This is a new, experimental API that is not core to any DPDK
function - it's not used on any hotpaths nor is it even that demanding
(the two threads will be sleeping 99.999% of the time anyway). I think
we're allowed to experiment on it before settling on an implementation
that satisfies everyone :)
Post by Thomas Monjalon
Post by Burakov, Anatoly
For starters, we could plan on removing alarm thread's dependency on
rte_malloc and just use regular malloc API's in there, and rework
asynchronous IPC API to use that instead. This shouldn't be much work,
and will presumably make you halfway happy, as one of the threads will
be gone :)
We can then look into removing the second thread and moving the entirety
of DPDK IPC into the interrupt thread. I'm not too sure how would that
work, but i haven't looked at it in any detail, so maybe it is feasible.
Can we agree on this? It would be great to do everything perfectly from
the first try, but having a goal in sight and working towards it is fine
too, even if not all of the steps we take are perfect.
The main concern is API.
If all these changes are internal only, and do not involve any major
API change, then I guess it is OK to postpone them to the next release.
Yes, all of this is/will be internal to DPDK IPC - no externally visible
changes whatsoever.
--
Thanks,
Anatoly
Bruce Richardson
2018-03-28 09:11:58 UTC
Permalink
Post by Thomas Monjalon
Post by Tan, Jianfeng
Hi Thomas ,
Post by Thomas Monjalon
Post by Anatoly Burakov
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer.
I really don't like that a library is creating a thread.
We don't even know where the thread is created (which core).
Can it be a rte_service? or in the interrupt thread?
Agreed that we'd better not add so many threads in a library.
I was considering merging all the threads into the interrupt thread; however, we don't have an interrupt thread in FreeBSD. Further, we don't implement the alarm API in FreeBSD. That's why I lean towards the current implementation, and optimizing it later.
I would prefer we improve the current code now instead of polluting more
with more uncontrolled threads.
+1
I think it would be worthwhile adding an interrupt thread to BSD, and it
should not be a massive amount of work. Having a single interrupt thread
has a lot of benefits, I think.

/Bruce
Thomas Monjalon
2018-03-27 16:27:07 UTC
Permalink
You probably have a good explanation for this change.
Please explain in the commit message.

Generally speaking, every commit should have an explanation,
it is a basic quality rule.
Thanks
Burakov, Anatoly
2018-03-28 09:15:49 UTC
Permalink
Post by Thomas Monjalon
You probably have a good explanation for this change.
Please explain in the commit message.
Generally speaking, every commit should have an explanation,
it is a basic quality rule.
Thanks
This is just to reduce noise in the following patch.
--
Thanks,
Anatoly
Thomas Monjalon
2018-03-28 10:08:29 UTC
Permalink
Post by Burakov, Anatoly
Post by Thomas Monjalon
You probably have a good explanation for this change.
Please explain in the commit message.
Generally speaking, every commit should have an explanation,
it is a basic quality rule.
Thanks
This is just to reduce noise in the following patch.
No Anatoly, you should explain why a rename is needed.
Burakov, Anatoly
2018-03-28 10:57:20 UTC
Permalink
Post by Thomas Monjalon
Post by Burakov, Anatoly
Post by Thomas Monjalon
You probably have a good explanation for this change.
Please explain in the commit message.
Generally speaking, every commit should have an explanation,
it is a basic quality rule.
Thanks
This is just to reduce noise in the following patch.
No Anatoly, you should explain why a rename is needed.
I know :) I'm going to be submitting a v7 with function renames anyway,
so i'll fix this as well.
--
Thanks,
Anatoly
Tan, Jianfeng
2018-04-02 05:09:03 UTC
Permalink
-----Original Message-----
From: Burakov, Anatoly
Sent: Sunday, April 1, 2018 1:06 AM
Subject: [PATCH v7 2/3] eal: rename mp_request to mp_request_sync
Rename rte_mp_request to rte_mp_request_sync to indicate
that this request will be done synchronously (as opposed to
asynchronous request, which comes in next patch).
Also, fix alphabetical ordering for .map file.
Acked-by: Jianfeng Tan <***@intel.com>

Thanks!
---
- Added this patch
lib/librte_eal/common/eal_common_proc.c | 2 +-
lib/librte_eal/common/include/rte_eal.h | 2 +-
lib/librte_eal/rte_eal_version.map | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/lib/librte_eal/common/eal_common_proc.c
b/lib/librte_eal/common/eal_common_proc.c
index 52b6ab2..b704f5a 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -674,7 +674,7 @@ mp_request_one(const char *dst, struct rte_mp_msg
*req,
}
int __rte_experimental
-rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
+rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply
*reply,
const struct timespec *ts)
{
int dir_fd, ret = 0;
diff --git a/lib/librte_eal/common/include/rte_eal.h
b/lib/librte_eal/common/include/rte_eal.h
index 044474e..d1cc89e 100644
--- a/lib/librte_eal/common/include/rte_eal.h
+++ b/lib/librte_eal/common/include/rte_eal.h
@@ -314,7 +314,7 @@ rte_mp_sendmsg(struct rte_mp_msg *msg);
* - On failure, return -1, and the reason will be stored in rte_errno.
*/
int __rte_experimental
-rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
+rte_mp_request_sync(struct rte_mp_msg *req, struct rte_mp_reply
*reply,
const struct timespec *ts);
/**
diff --git a/lib/librte_eal/rte_eal_version.map
b/lib/librte_eal/rte_eal_version.map
index d123602..7b66c73 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -223,9 +223,9 @@ EXPERIMENTAL {
rte_eal_mbuf_user_pool_ops;
rte_mp_action_register;
rte_mp_action_unregister;
- rte_mp_sendmsg;
- rte_mp_request;
rte_mp_reply;
+ rte_mp_request_sync;
+ rte_mp_sendmsg;
rte_service_attr_get;
rte_service_attr_reset_all;
rte_service_component_register;
--
2.7.4
Stephen Hemminger
2018-03-02 18:48:25 UTC
Permalink
On Tue, 27 Feb 2018 14:59:29 +0000
Post by Anatoly Burakov
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
{
+ struct sync_request *dummy;
+ struct async_request_shared_param *param = NULL;
+ struct rte_mp_reply *reply = NULL;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end = NULL;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
gettimeofday is not a good API to use in DPDK.
It gets changed by NTP; if you have to use system time you want monotonic clock
Burakov, Anatoly
2018-03-03 12:29:47 UTC
Permalink
Post by Stephen Hemminger
On Tue, 27 Feb 2018 14:59:29 +0000
Post by Anatoly Burakov
+rte_mp_request_async(struct rte_mp_msg *req, const struct timespec *ts)
{
+ struct sync_request *dummy;
+ struct async_request_shared_param *param = NULL;
+ struct rte_mp_reply *reply = NULL;
+ int dir_fd, ret = 0;
+ DIR *mp_dir;
+ struct dirent *ent;
+ struct timeval now;
+ struct timespec *end = NULL;
+
+ RTE_LOG(DEBUG, EAL, "request: %s\n", req->name);
+
+ if (check_input(req) == false)
+ return -1;
+ if (gettimeofday(&now, NULL) < 0) {
+ RTE_LOG(ERR, EAL, "Faile to get current time\n");
+ rte_errno = errno;
+ return -1;
+ }
gettimeofday is not a good API to use in DPDK.
It gets changed by NTP; if you have to use system time you want monotonic clock
We need current time because pthread_cond_timedwait() accepts current
time. So it's either that, or reimplementing pthread_cond_timedwait() in
DPDK using monotonic clock :) Unless, of course, there already are
alternatives that use monotonic clock and that don't need other DPDK
machinery (e.g. rte_malloc) to work (like rte_alarm callbacks).
--
Thanks,
Anatoly
Stephen Hemminger
2018-03-02 18:51:32 UTC
Permalink
On Tue, 27 Feb 2018 14:59:29 +0000
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).
The problem with this callback model is it makes it possible to
have a single wait for multiple events model (like epoll) which
is the most scalable way to write applications.
Burakov, Anatoly
2018-03-03 13:44:59 UTC
Permalink
Post by Stephen Hemminger
On Tue, 27 Feb 2018 14:59:29 +0000
Post by Anatoly Burakov
This API is similar to the blocking API that is already present,
but reply will be received in a separate callback by the caller.
Under the hood, we create a separate thread to deal with replies to
asynchronous requests, that will just wait to be notified by the
main thread, or woken up on a timer (it'll wake itself up every
minute regardless of whether it was called, but if there are no
requests in the queue, nothing will be done and it'll go to sleep
for another minute).
The problem with this callback model is it makes it possible to
have a single wait for multiple events model (like epoll) which
is the most scalable way to write applications.
I assume there's a typo in there somewhere, because the way it's written
makes it look like an advantage, not a problem :) Some more details on
what exactly is the issue would be welcome though.
--
Thanks,
Anatoly
Continue reading on narkive:
Loading...