Discussion:
[dpdk-dev] [PATCH 0/5 for 2.3] vhost rxtx refactor
Yuanhan Liu
2015-12-03 06:06:08 UTC
The vhost rxtx code is derived from the vhost-switch example, which is
very likely the messiest code in DPDK. Unfortunately, the move also
brought over its flaws: twisted logic and poor comments.

When I first joined this team, I was quite scared off by the long and
messy vhost rxtx code. While adding vhost-user live migration support,
I had to make a few changes to it, so I ventured to look at it again,
to understand it better and, in the meantime, to see if I could
refactor it.

And, here you go.

The first 3 patches refactor the 3 major functions in vhost_rxtx.c,
respectively. They simplify the code logic, making it more readable.
They also reduce the code size, since a lot of duplicated code is
removed.

Patch 4 gets rid of the rte_memcpy for the virtio_hdr copy, which saves
nearly 12K bytes of code size!

Up to this point, the code size has been greatly reduced: from 39348
to 24179 bytes.

Patch 5 removes "unlikely" for VIRTIO_NET_F_MRG_RXBUF detection.

Note that the code could be further simplified or reduced. However,
since this is a first try and this is the *key* data path, I guess
it's okay not to be radical and to stop here for now.

Another note is that we should add more security checks on the rxtx
side. That could be a standalone task for v2.3; this series is about
refactoring, hence I leave it for a future enhancement.

---
Yuanhan Liu (5):
vhost: refactor rte_vhost_dequeue_burst
vhost: refactor virtio_dev_rx
vhost: refactor virtio_dev_merge_rx
vhost: do not use rte_memcpy for virtio_hdr copy
vhost: don't use unlikely for VIRTIO_NET_F_MRG_RXBUF detection

lib/librte_vhost/vhost_rxtx.c | 959 ++++++++++++++++++------------------------
1 file changed, 407 insertions(+), 552 deletions(-)
--
1.9.0
Yuanhan Liu
2015-12-03 06:06:09 UTC
The current rte_vhost_dequeue_burst() implementation is a bit messy
and its logic is twisted. You can also see repeated code here and
there: it invokes rte_pktmbuf_alloc() at three different places!

However, rte_vhost_dequeue_burst() actually does a simple job: copy
the packet data from the vring desc to an mbuf. What's tricky here is:

- The desc buffers could be chained (by the desc->next field), so you
need to fetch the next one when the current one is wholly drained.

- One mbuf might not be big enough to hold all the desc buffers, hence
you need to chain mbufs as well, by the mbuf->next field.

Even so, the logic can be simple. Here is the pseudo code.

    while (this_desc_is_not_drained_totally || has_next_desc) {
        if (this_desc_has_drained_totally) {
            this_desc = next_desc();
        }

        if (mbuf_has_no_room) {
            mbuf = allocate_a_new_mbuf();
        }

        COPY(mbuf, desc);
    }

And this is how I refactored the function in this patch.
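
The pseudo code above can be sketched as a small standalone C program.
This is only an illustration under stated assumptions, not the DPDK
code: src_seg and dst_seg are hypothetical stand-ins for a descriptor
chain and an mbuf chain, DST_CAP is an arbitrary tiny capacity chosen
so that chaining is exercised, and skip plays the role of the virtio
header length.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a chain of vring descriptors. */
struct src_seg {
    const uint8_t *data;
    uint32_t len;
    struct src_seg *next;
};

#define DST_CAP 8u  /* arbitrary, tiny on purpose */

/* Hypothetical stand-in for a chain of mbufs. */
struct dst_seg {
    uint8_t data[DST_CAP];
    uint32_t data_len;
    struct dst_seg *next;
};

static void free_chain(struct dst_seg *head)
{
    while (head != NULL) {
        struct dst_seg *next = head->next;
        free(head);
        head = next;
    }
}

/*
 * Copy a source chain into a newly allocated destination chain,
 * skipping the first `skip` bytes of the first source segment.
 * Returns NULL on allocation failure, bad input, or empty payload.
 */
static struct dst_seg *copy_chain(const struct src_seg *src, uint32_t skip)
{
    struct dst_seg *head = NULL, *cur = NULL;
    uint32_t src_avail, src_off, dst_avail = 0;

    if (src == NULL || src->len < skip)
        return NULL;
    src_off = skip;
    src_avail = src->len - skip;

    while (src_avail != 0 || src->next != NULL) {
        /* Source segment drained: move to the next one. */
        if (src_avail == 0) {
            src = src->next;
            src_off = 0;
            src_avail = src->len;
            continue;
        }

        /* Destination segment full (or none yet): chain a new one. */
        if (dst_avail == 0) {
            struct dst_seg *seg = calloc(1, sizeof(*seg));
            if (seg == NULL) {
                free_chain(head);
                return NULL;
            }
            if (head == NULL)
                head = seg;
            else
                cur->next = seg;
            cur = seg;
            dst_avail = DST_CAP;
        }

        uint32_t n = src_avail < dst_avail ? src_avail : dst_avail;
        memcpy(cur->data + cur->data_len, src->data + src_off, n);
        cur->data_len += n;
        dst_avail -= n;
        src_off += n;
        src_avail -= n;
    }
    return head;
}
```

Unlike the pseudo code, the sketch re-checks the loop condition right
after fetching the next source segment, so a zero-length segment never
triggers an allocation.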

Note that the old code has special handling for skipping the virtio
header. However, that can be done simply by adjusting the desc_avail
and desc_offset variables:

/* Discard virtio header */
desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;

This refactor makes the code much more readable (IMO), and it also
reduces the code size (by nearly 2K):

# without this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  39348       0       0   39348    99b4 /path/to/vhost_rxtx.o

# with this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  37435       0       0   37435    923b /path/to/vhost_rxtx.o

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 287 +++++++++++++++++-------------------------
1 file changed, 113 insertions(+), 174 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 9322ce6..b4c6cab 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -43,6 +43,18 @@

#define MAX_PKT_BURST 32

+#define COPY(dst, src) do { \
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail); \
+ rte_memcpy((void *)(uintptr_t)(dst), \
+ (const void *)(uintptr_t)(src), cpy_len); \
+ \
+ mbuf_avail -= cpy_len; \
+ mbuf_offset += cpy_len; \
+ desc_avail -= cpy_len; \
+ desc_offset += cpy_len; \
+} while(0)
+
+
static bool
is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
{
@@ -568,19 +580,89 @@ rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
return virtio_dev_rx(dev, queue_id, pkts, count);
}

+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Discard virtio header */
+ desc_avail = desc->len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
+ if (!head) {
+ head = cur;
+ } else {
+ prev->next = cur;
+ prev->data_len = mbuf_offset;
+ head->nb_segs += 1;
+ }
+ head->pkt_len += mbuf_offset;
+ prev = cur;
+
+ mbuf_offset = 0;
+ mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+ }
+
+ COPY(rte_pktmbuf_mtod_offset(cur, uint64_t, mbuf_offset),
+ desc_addr + desc_offset);
+ }
+ prev->data_len = mbuf_offset;
+ head->pkt_len += mbuf_offset;
+
+ return head;
+}
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
+ struct rte_mbuf *m;

if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -594,192 +676,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;

avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
return 0;

- LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
- dev->device_fh);
+ LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);

- /* Prefetch available ring to retrieve head indexes. */
- rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+ used_idx = vq->last_used_idx & (vq->size -1);

- /*get the number of free entries in the ring*/
- free_entries = (avail_idx - vq->last_used_idx);
+ /* Prefetch available ring to retrieve head indexes. */
+ rte_prefetch0(&vq->avail->ring[used_idx]);

- free_entries = RTE_MIN(free_entries, count);
- /* Limit to MAX_PKT_BURST. */
- free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
+ count = RTE_MIN(count, MAX_PKT_BURST);
+ count = RTE_MIN(count, free_entries);
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequene %u buffers\n",
+ dev->device_fh, count);

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
- dev->device_fh, free_entries);
/* Retrieve all of the head indexes first to avoid caching issues. */
- for (i = 0; i < free_entries; i++)
- head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+ (vq->size - 1)];
+ }

/* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success]]);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);

- while (entry_success < free_entries) {
- uint32_t vb_avail, vb_offset;
- uint32_t seg_avail, seg_offset;
- uint32_t cpy_len;
- uint32_t seg_num = 0;
- struct rte_mbuf *cur;
- uint8_t alloc_err = 0;
-
- desc = &vq->desc[head[entry_success]];
-
- /* Discard first buffer as it is the virtio header */
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- vb_offset = 0;
- vb_avail = desc->len;
- } else {
- vb_offset = vq->vhost_hlen;
- vb_avail = desc->len - vb_offset;
- }
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
-
- used_idx = vq->last_used_idx & (vq->size - 1);
-
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
-
- /* Update used index buffer information. */
- vq->used->ring[used_idx].id = head[entry_success];
- vq->used->ring[used_idx].len = 0;
-
- /* Allocate an mbuf and populate the structure. */
- m = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(m == NULL)) {
- RTE_LOG(ERR, VHOST_DATA,
- "Failed to allocate memory for mbuf.\n");
- break;
- }
- seg_offset = 0;
- seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
- seg_num++;
- cur = m;
- prev = m;
- while (cpy_len != 0) {
- rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
- (void *)((uintptr_t)(vb_addr + vb_offset)),
- cpy_len);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- vb_avail -= cpy_len;
- seg_avail -= cpy_len;
-
- if (vb_avail != 0) {
- /*
- * The segment reachs to its end,
- * while the virtio buffer in TX vring has
- * more data to be copied.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /* Allocate mbuf and populate the structure. */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR, VHOST_DATA, "Failed to "
- "allocate memory for mbuf.\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
-
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- } else {
- if (desc->flags & VRING_DESC_F_NEXT) {
- /*
- * There are more virtio buffers in
- * same vring entry need to be copied.
- */
- if (seg_avail == 0) {
- /*
- * The current segment hasn't
- * room to accomodate more
- * data.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /*
- * Allocate an mbuf and
- * populate the structure.
- */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR,
- VHOST_DATA,
- "Failed to "
- "allocate memory "
- "for mbuf\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- }
-
- desc = &vq->desc[desc->next];
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = desc->len;
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr,
- desc->len, 0);
- } else {
- /* The whole packet completes. */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- vb_avail = 0;
- }
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- }
-
- if (unlikely(alloc_err == 1))
+ for (i = 0; i < count; i++) {
+ m = copy_desc_to_mbuf(dev, vq, desc_indexes[i], mbuf_pool);
+ if (m == NULL)
break;
+ pkts[i] = m;

- m->nb_segs = seg_num;
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
}

rte_compiler_barrier();
- vq->used->idx += entry_success;
+ vq->used->idx += i;
+
/* Kick guest if required. */
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
eventfd_write(vq->callfd, (eventfd_t)1);
- return entry_success;
+
+ return i;
}
--
1.9.0
Stephen Hemminger
2015-12-03 07:02:44 UTC
On Thu, 3 Dec 2015 14:06:09 +0800
Post by Yuanhan Liu
+#define COPY(dst, src) do { \
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail); \
+ rte_memcpy((void *)(uintptr_t)(dst), \
+ (const void *)(uintptr_t)(src), cpy_len); \
+ \
+ mbuf_avail -= cpy_len; \
+ mbuf_offset += cpy_len; \
+ desc_avail -= cpy_len; \
+ desc_offset += cpy_len; \
+} while(0)
+
I see lots of issues here.

All those void * casts are unnecessary, C casts arguments already.
rte_memcpy is slower for constant size values than memcpy()

This macro violates the rule that there should be no hidden variables
in a macro. I.e., you are assuming cpy_len, desc_avail, and mbuf_avail
are defined in all code using the macro.

Why use an un-typed macro when an inline function would be just
as fast and give type safety?
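
To make the suggestion concrete, here is one possible shape for such an
inline helper. It is a sketch, not code from the patch: the copy_cursor
struct and the copy_and_advance name are hypothetical, and plain
memcpy() stands in for rte_memcpy() so that the example builds without
DPDK. Grouping each avail/offset pair into a struct keeps the call site
short while making every updated variable visible to the caller.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical cursor: bytes left and current offset within one buffer. */
struct copy_cursor {
    uint32_t avail;
    uint32_t offset;
};

/*
 * Copy as many bytes as both sides allow and advance both cursors.
 * Returns the number of bytes copied.
 */
static inline uint32_t
copy_and_advance(uint8_t *dst_base, struct copy_cursor *dst,
                 const uint8_t *src_base, struct copy_cursor *src)
{
    uint32_t cpy_len = dst->avail < src->avail ? dst->avail : src->avail;

    memcpy(dst_base + dst->offset, src_base + src->offset, cpy_len);

    dst->avail -= cpy_len;
    dst->offset += cpy_len;
    src->avail -= cpy_len;
    src->offset += cpy_len;

    return cpy_len;
}
```

A caller would keep one cursor for the descriptor and one for the mbuf
and call copy_and_advance(dst_buf, &dst_cur, src_buf, &src_cur) inside
its loop.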
Yuanhan Liu
2015-12-03 07:25:01 UTC
Post by Stephen Hemminger
On Thu, 3 Dec 2015 14:06:09 +0800
Post by Yuanhan Liu
+#define COPY(dst, src) do { \
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail); \
+ rte_memcpy((void *)(uintptr_t)(dst), \
+ (const void *)(uintptr_t)(src), cpy_len); \
+ \
+ mbuf_avail -= cpy_len; \
+ mbuf_offset += cpy_len; \
+ desc_avail -= cpy_len; \
+ desc_offset += cpy_len; \
+} while(0)
+
I see lots of issues here.
All those void * casts are unnecessary, C casts arguments already.
Hi Stephen,

Without the cast, it will not compile, as dst is actually of
uint64_t type.
Post by Stephen Hemminger
rte_memcpy is slower for constant size values than memcpy()
Sorry, what does that have to do with this patch?
Post by Stephen Hemminger
This macro violates the rule that there should be no hidden variables
in a macro. I.e., you are assuming cpy_len, desc_avail, and mbuf_avail
are defined in all code using the macro.
Yes, I'm aware of that. And I agree that it's not good usage
_in general_.

But I think it's okay to use it here, since the three major
functions have quite similar logic. And if my memory serves me
right, there is also quite a lot of code like that in the Linux
kernel.
Post by Stephen Hemminger
Why use an un-typed macro when an inline function would be just
as fast and give type safety?
It references too many variables, up to 6, and 4 of them need
to be updated. Therefore, making it a function would make the
caller long and ugly. That's why I was thinking of using a macro
to remove a few lines of repeated code.

So, if a hidden-variable macro is forbidden here, I guess I would
go with just unfolding those lines of code, rather than
introducing a helper function.

--yliu
Stephen Hemminger
2015-12-03 07:03:16 UTC
On Thu, 3 Dec 2015 14:06:09 +0800
Post by Yuanhan Liu
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
Another unnecessary set of casts.
Rich Lane
2015-12-12 06:55:48 UTC
Post by Yuanhan Liu
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
...
Post by Yuanhan Liu
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Discard virtio header */
+ desc_avail = desc->len - vq->vhost_hlen;
If desc->len is zero (happens all the time when restarting DPDK apps in the
guest) then desc_avail will be huge.
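
The wrap-around is ordinary unsigned arithmetic, and can be shown in a
few lines. This is a sketch only: payload_len is a hypothetical helper,
and 12 is used merely as an example header length.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Compute the payload length left after the header, refusing
 * descriptors that are shorter than the header itself.
 * Returns 0 on success and -1 if desc_len < hdr_len.
 */
static int payload_len(uint32_t desc_len, uint32_t hdr_len, uint32_t *out)
{
    if (desc_len < hdr_len)
        return -1;

    *out = desc_len - hdr_len;
    return 0;
}
```

Without the check, a zero desc_len and a 12-byte header give
(uint32_t)(0 - 12), which is 4294967284, so the copy loop would believe
that almost 4 GiB of data is available.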
Post by Yuanhan Liu
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
Post by Yuanhan Liu
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
Need to check desc->next against vq->size.

There should be a limit on the number of descriptors in a chain to prevent
an infinite loop.
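
Both checks can be sketched together as follows. This is an
illustration with simplified, hypothetical types rather than the vhost
structures: struct desc mirrors the field layout of a vring descriptor,
DESC_F_NEXT has the same bit value as VRING_DESC_F_NEXT, and the chain
length is capped at the ring size, which is one possible choice of
limit.

```c
#include <assert.h>
#include <stdint.h>

#define DESC_F_NEXT 1u  /* same bit value as VRING_DESC_F_NEXT */

/* Simplified descriptor with the same fields as a vring descriptor. */
struct desc {
    uint64_t addr;
    uint32_t len;
    uint16_t flags;
    uint16_t next;
};

/*
 * Walk a descriptor chain starting at `head`.
 * Returns the number of descriptors in the chain, or -1 if an index
 * is out of range or the chain is longer than the ring (a loop).
 */
static int walk_chain(const struct desc *table, uint16_t size, uint16_t head)
{
    uint16_t idx = head;
    uint32_t seen = 0;

    for (;;) {
        if (idx >= size)
            return -1;      /* index points outside the table */
        if (++seen > size)
            return -1;      /* more hops than entries: a loop */
        if (!(table[idx].flags & DESC_F_NEXT))
            return (int)seen;
        idx = table[idx].next;
    }
}
```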

uint16_t
Post by Yuanhan Liu
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
...
avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
return 0;
A common problem is that avail_idx goes backwards when the guest zeroes its
virtqueues. This function could check for free_entries > vq->size and reset
vq->last_used_idx.
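
A sketch of that check, using a hypothetical helper named ring_entries:
the indexes are free-running 16-bit counters, so the subtraction wraps
correctly in normal operation, and a result larger than the ring size
can only mean that avail_idx moved backwards. What the caller does in
that case (for instance resetting last_used_idx, as suggested above) is
left out here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Compute how many entries the guest has made available.
 * Returns false if the result exceeds the ring size, which indicates
 * that avail_idx went backwards relative to last_used_idx.
 */
static bool ring_entries(uint16_t avail_idx, uint16_t last_used_idx,
                         uint16_t ring_size, uint16_t *entries)
{
    uint16_t n = (uint16_t)(avail_idx - last_used_idx);

    if (n > ring_size)
        return false;

    *entries = n;
    return true;
}
```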

+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequene %u buffers\n",
Post by Yuanhan Liu
+ dev->device_fh, count);
Typo at "dequene".
Yuanhan Liu
2015-12-14 01:55:37 UTC
Post by Yuanhan Liu
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+                 uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
... 
+
+       desc = &vq->desc[desc_idx];
+       desc_addr = gpa_to_vva(dev, desc->addr);
+       rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+       /* Discard virtio header */
+       desc_avail  = desc->len - vq->vhost_hlen;
If desc->len is zero (happens all the time when restarting DPDK apps in the
guest) then desc_avail will be huge.
I'm aware of it; I have already noted that in the cover letter. This
patch set is just a code refactor. Things like that will be done in a
later patch set. (The thing is that Huawei is very cagey about making
any changes to the vhost rxtx code, as it's the hot data path. So, I
will not make any further changes based on this refactor unless it's
applied.)
Post by Yuanhan Liu
 
+       desc_offset = vq->vhost_hlen;
+
+       mbuf_avail  = 0;
+       mbuf_offset = 0;
+       while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) { 
+               /* This desc reachs to its end, get the next one */
+               if (desc_avail == 0) {
+                       desc = &vq->desc[desc->next];
Need to check desc->next against vq->size.
Thanks for the reminder.
Post by Yuanhan Liu
There should be a limit on the number of descriptors in a chain to prevent an
infinite loop.
 uint16_t
 rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
        struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t
count)
 {
...
        avail_idx =  *((volatile uint16_t *)&vq->avail->idx);
-
-       /* If there are no available buffers then return. */
-       if (vq->last_used_idx == avail_idx)
+       free_entries = avail_idx - vq->last_used_idx;
+       if (free_entries == 0)
                return 0;
A common problem is that avail_idx goes backwards when the guest zeroes its
virtqueues. This function could check for free_entries > vq->size and reset
vq->last_used_idx.
Thanks, but ditto, will consider it in another patch set.
Post by Yuanhan Liu
+       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequene %u buffers\n",
+                       dev->device_fh, count);
Typo at "dequene".
Oops; thanks.

--yliu
Xie, Huawei
2016-01-26 10:30:12 UTC
Post by Yuanhan Liu
---
lib/librte_vhost/vhost_rxtx.c | 287 +++++++++++++++++-------------------------
1 file changed, 113 insertions(+), 174 deletions(-)
Prefer to unroll copy_mbuf_to_desc and your COPY macro. It prevents us
processing descriptors in a burst way in future.
Yuanhan Liu
2016-01-27 03:26:15 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
---
lib/librte_vhost/vhost_rxtx.c | 287 +++++++++++++++++-------------------------
1 file changed, 113 insertions(+), 174 deletions(-)
Prefer to unroll copy_mbuf_to_desc and your COPY macro. It prevents us
I'm okay with unrolling the COPY macro. But for copy_mbuf_to_desc, I
prefer not to do that unless there is a good reason.
Post by Xie, Huawei
processing descriptors in a burst way in future.
So, do you have a plan?

--yliu
Xie, Huawei
2016-01-27 06:12:22 UTC
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
---
lib/librte_vhost/vhost_rxtx.c | 287 +++++++++++++++++-------------------------
1 file changed, 113 insertions(+), 174 deletions(-)
Prefer to unroll copy_mbuf_to_desc and your COPY macro. It prevents us
I'm okay with unrolling the COPY macro. But for copy_mbuf_to_desc, I
prefer not to do that unless there is a good reason.
Post by Xie, Huawei
processing descriptors in a burst way in future.
So, do you have a plan?
I think it is OK. If we need to unroll in the future, we could do that
then. I am open to this. Just my preference. I understand that wrapping
makes the code more readable.
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-01-27 06:16:32 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
---
lib/librte_vhost/vhost_rxtx.c | 287 +++++++++++++++++-------------------------
1 file changed, 113 insertions(+), 174 deletions(-)
Prefer to unroll copy_mbuf_to_desc and your COPY macro. It prevents us
I'm okay with unrolling the COPY macro. But for copy_mbuf_to_desc, I
prefer not to do that unless there is a good reason.
Post by Xie, Huawei
processing descriptors in a burst way in future.
So, do you have a plan?
I think it is OK. If we need to unroll in the future, we could do that
then. I am open to this. Just my preference. I understand that wrapping
makes the code more readable.
Okay, let's consider it then: unrolling would be easy after all.

--yliu
Yuanhan Liu
2015-12-03 06:06:10 UTC
This is a simple refactor, as there isn't any twisted logic in the old
code. Here I just broke the code up and introduced two helper
functions, reserve_avail_buf() and copy_mbuf_to_desc(), to make the
code more readable.
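
For readers unfamiliar with the reservation step, the idea behind
reserve_avail_buf() is a compare-and-set retry loop. The sketch below
illustrates that pattern with C11 atomics instead of
rte_atomic16_cmpset(); the reserve_entries name and its parameters are
hypothetical, and unlike the real helper it takes avail_idx as an
argument rather than re-reading it from the ring on every retry.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/*
 * Reserve up to `want` entries in [*reserved_idx, avail_idx).
 * On success, *start receives the first reserved index and the
 * number of reserved entries is returned; 0 means nothing is free.
 */
static uint16_t reserve_entries(_Atomic uint16_t *reserved_idx,
                                uint16_t avail_idx, uint16_t want,
                                uint16_t *start)
{
    uint16_t begin = atomic_load(reserved_idx);

    for (;;) {
        uint16_t free_entries = (uint16_t)(avail_idx - begin);
        uint16_t count = want < free_entries ? want : free_entries;

        if (count == 0)
            return 0;

        /*
         * Publish the new reservation only if nobody else moved the
         * index in the meantime. On failure, `begin` is refreshed
         * with the current value and the loop tries again.
         */
        if (atomic_compare_exchange_weak(reserved_idx, &begin,
                                         (uint16_t)(begin + count))) {
            *start = begin;
            return count;
        }
    }
}
```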

It saves nearly 1K bytes of code size:

# without this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  37435       0       0   37435    923b /path/to/vhost_rxtx.o

# with this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  36539       0       0   36539    8ebb /path/to/vhost_rxtx.o

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 275 ++++++++++++++++++++----------------------
1 file changed, 129 insertions(+), 146 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index b4c6cab..cb29459 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -61,6 +61,107 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
return (is_tx ^ (idx & 1)) == 0 && idx < qp_nb * VIRTIO_QNUM;
}

+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_offset = vq->vhost_hlen;
+ desc_avail = desc->len - vq->vhost_hlen;
+
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (m == NULL)
+ break;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
+ /* done with current desc buf, fetch next */
+ if (desc_avail == 0) {
+ if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+ /* Room in vring buffer is not enough */
+ return -1;
+ }
+
+ desc = &vq->desc[desc->next];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ desc_offset = 0;
+ desc_avail = desc->len;
+ }
+
+ COPY(desc_addr + desc_offset,
+ rte_pktmbuf_mtod_offset(m, uint64_t, mbuf_offset));
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);
+ }
+ *copied = rte_pktmbuf_pkt_len(m);
+
+ return 0;
+}
+
+/*
+ * As many data cores may want access to available buffers
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_end_idx;
+ uint16_t avail_idx;
+ uint16_t free_entries;
+ int success;
+
+ count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+ res_start_idx = vq->last_used_idx_res;
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+ free_entries = (avail_idx - res_start_idx);
+ count = RTE_MIN(count, free_entries);
+ if (count == 0)
+ return 0;
+
+ res_end_idx = res_start_idx + count;
+
+ /*
+ * update vq->last_used_idx_res atomically; try again if failed.
+ *
+ * TODO: Allow to disable cmpset if no concurrency in application.
+ */
+ success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_end_idx);
+ if (!success)
+ goto again;
+
+ *start = res_start_idx;
+ *end = res_end_idx;
+
+ return count;
+}
+
/**
* This function adds buffers to the virtio devices RX virtqueue. Buffers can
* be received from the physical port or from another virtio device. A packet
@@ -70,21 +171,12 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t qp_nb)
*/
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- struct rte_mbuf *buff;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
- uint64_t buff_addr = 0;
- uint64_t buff_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
- uint32_t head_idx, packet_success = 0;
- uint16_t avail_idx, res_cur_idx;
- uint16_t res_base_idx, res_end_idx;
- uint16_t free_entries;
- uint8_t success = 0;
+ uint16_t res_start_idx, res_end_idx;
+ uint16_t desc_indexes[MAX_PKT_BURST];
+ uint32_t i;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -98,152 +190,43 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
if (unlikely(vq->enabled == 0))
return 0;

- count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
-
- /*
- * As many data cores may want access to available buffers,
- * they need to be reserved.
- */
- do {
- res_base_idx = vq->last_used_idx_res;
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- free_entries = (avail_idx - res_base_idx);
- /*check that we have enough buffers*/
- if (unlikely(count > free_entries))
- count = free_entries;
-
- if (count == 0)
- return 0;
-
- res_end_idx = res_base_idx + count;
- /* vq->last_used_idx_res is atomically updated. */
- /* TODO: Allow to disable cmpset if no concurrency in application. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx, res_end_idx);
- } while (unlikely(success == 0));
- res_cur_idx = res_base_idx;
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
- dev->device_fh, res_cur_idx, res_end_idx);
-
- /* Prefetch available ring to retrieve indexes. */
- rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);
-
- /* Retrieve all of the head indexes first to avoid caching issues. */
- for (head_idx = 0; head_idx < count; head_idx++)
- head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) &
- (vq->size - 1)];
-
- /*Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
-
- while (res_cur_idx != res_end_idx) {
- uint32_t offset = 0, vb_offset = 0;
- uint32_t pkt_len, len_to_cpy, data_len, total_copied = 0;
- uint8_t hdr = 0, uncompleted_pkt = 0;
+ count = reserve_avail_buf(vq, count, &res_start_idx, &res_end_idx);
+ if (count == 0)
+ return 0;

- /* Get descriptor from available ring */
- desc = &vq->desc[head[packet_success]];
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") res_start_idx %d| res_end_idx Index %d\n",
+ dev->device_fh, res_start_idx, res_end_idx);

- buff = pkts[packet_success];
+ /* Retrieve all of the desc indexes first to avoid caching issues. */
+ rte_prefetch0(&vq->avail->ring[res_start_idx & (vq->size - 1)]);
+ for (i = 0; i < count; i++)
+ desc_indexes[i] = vq->avail->ring[(res_start_idx + i) & (vq->size - 1)];

- /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
- buff_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)buff_addr);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
+ for (i = 0; i < count; i++) {
+ uint16_t desc_idx = desc_indexes[i];
+ uint16_t used_idx = (res_start_idx + i) & (vq->size - 1);
+ uint32_t copied;
+ int err;

- /* Copy virtio_hdr to packet and increment buffer address */
- buff_hdr_addr = buff_addr;
+ err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx, &copied);

- /*
- * If the descriptors are chained the header and data are
- * placed in separate buffers.
- */
- if ((desc->flags & VRING_DESC_F_NEXT) &&
- (desc->len == vq->vhost_hlen)) {
- desc = &vq->desc[desc->next];
- /* Buffer address translation. */
- buff_addr = gpa_to_vva(dev, desc->addr);
+ vq->used->ring[used_idx].id = desc_idx;
+ if (unlikely(err)) {
+ vq->used->ring[used_idx].len = vq->vhost_hlen;
} else {
- vb_offset += vq->vhost_hlen;
- hdr = 1;
- }
-
- pkt_len = rte_pktmbuf_pkt_len(buff);
- data_len = rte_pktmbuf_data_len(buff);
- len_to_cpy = RTE_MIN(data_len,
- hdr ? desc->len - vq->vhost_hlen : desc->len);
- while (total_copied < pkt_len) {
- /* Copy mbuf data to buffer */
- rte_memcpy((void *)(uintptr_t)(buff_addr + vb_offset),
- rte_pktmbuf_mtod_offset(buff, const void *, offset),
- len_to_cpy);
- PRINT_PACKET(dev, (uintptr_t)(buff_addr + vb_offset),
- len_to_cpy, 0);
-
- offset += len_to_cpy;
- vb_offset += len_to_cpy;
- total_copied += len_to_cpy;
-
- /* The whole packet completes */
- if (total_copied == pkt_len)
- break;
-
- /* The current segment completes */
- if (offset == data_len) {
- buff = buff->next;
- offset = 0;
- data_len = rte_pktmbuf_data_len(buff);
- }
-
- /* The current vring descriptor done */
- if (vb_offset == desc->len) {
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- buff_addr = gpa_to_vva(dev, desc->addr);
- vb_offset = 0;
- } else {
- /* Room in vring buffer is not enough */
- uncompleted_pkt = 1;
- break;
- }
- }
- len_to_cpy = RTE_MIN(data_len - offset, desc->len - vb_offset);
+ vq->used->ring[used_idx].len = copied + vq->vhost_hlen;
}

- /* Update used ring with desc information */
- vq->used->ring[res_cur_idx & (vq->size - 1)].id =
- head[packet_success];
-
- /* Drop the packet if it is uncompleted */
- if (unlikely(uncompleted_pkt == 1))
- vq->used->ring[res_cur_idx & (vq->size - 1)].len =
- vq->vhost_hlen;
- else
- vq->used->ring[res_cur_idx & (vq->size - 1)].len =
- pkt_len + vq->vhost_hlen;
-
- res_cur_idx++;
- packet_success++;
-
- if (unlikely(uncompleted_pkt == 1))
- continue;
-
- rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
-
- PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
-
- if (res_cur_idx < res_end_idx) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
- }
+ if (i + 1 < count)
+ rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
}

rte_compiler_barrier();

/* Wait until it's our turn to add our buffer to the used ring. */
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != res_start_idx))
rte_pause();

*(volatile uint16_t *)&vq->used->idx += count;
--
1.9.0
Rich Lane
2015-12-11 20:42:33 UTC
Post by Yuanhan Liu
+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
...
+ while (1) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (m == NULL)
+ break;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
+ /* done with current desc buf, fetch next */
+ if (desc_avail == 0) {
+ if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+ /* Room in vring buffer is not enough */
+ return -1;
+ }
+
+ desc = &vq->desc[desc->next];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ desc_offset = 0;
+ desc_avail = desc->len;
+ }
+
+ COPY(desc_addr + desc_offset,
+ rte_pktmbuf_mtod_offset(m, uint64_t, mbuf_offset));
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);
+ }
+ *copied = rte_pktmbuf_pkt_len(m);
AFAICT m will always be NULL at this point, so the call to
rte_pktmbuf_pkt_len will segfault.
Yuanhan Liu
2015-12-14 01:47:16 UTC
Permalink
Post by Yuanhan Liu
+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+                 struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
...
+       while (1) {
+               /* done with current mbuf, fetch next */
+               if (mbuf_avail == 0) {
+                       m = m->next;
+                       if (m == NULL)
+                               break;
+
+                       mbuf_offset = 0;
+                       mbuf_avail  = rte_pktmbuf_data_len(m);
+               }
+
+               /* done with current desc buf, fetch next */
+               if (desc_avail == 0) {
+                       if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+                               /* Room in vring buffer is not enough */
+                               return -1;
+                       }
+
+                       desc = &vq->desc[desc->next];
+                       desc_addr   = gpa_to_vva(dev, desc->addr);
+                       desc_offset = 0;
+                       desc_avail  = desc->len;
+               }
+
+               COPY(desc_addr + desc_offset,
+                       rte_pktmbuf_mtod_offset(m, uint64_t, mbuf_offset));
+               PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+                            cpy_len, 0);
+       }
+       *copied = rte_pktmbuf_pkt_len(m);
AFAICT m will always be NULL at this point, so the call to
rte_pktmbuf_pkt_len will segfault.
Right, I should move it to the beginning of this function.
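
A minimal sketch of that fix, not the actual v2 code: `struct seg` is a hypothetical stand-in for `struct rte_mbuf`, and `walk_chain` only mimics the shape of the copy loop. The point is that the packet length is read while `m` still points at the first segment.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for struct rte_mbuf: only the fields used here. */
struct seg {
	struct seg *next;   /* next segment in the chain, NULL at the end */
	uint32_t data_len;  /* bytes held by this segment */
	uint32_t pkt_len;   /* total packet length, valid on the first segment */
};

/*
 * Walk the whole chain the way the copy loop does. The packet length is
 * read up front; after the loop 'm' is NULL and must not be dereferenced.
 * Returns the number of bytes walked.
 */
static uint32_t
walk_chain(struct seg *m, uint32_t *copied)
{
	uint32_t walked = 0;

	assert(m != NULL && copied != NULL);
	*copied = m->pkt_len;   /* read before the loop advances 'm' */

	while (m != NULL) {
		walked += m->data_len;
		m = m->next;
	}
	return walked;
}
```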

--yliu
Jérôme Jutteau
2016-01-21 13:50:01 UTC
Permalink
Hi Yuanhan,
Post by Yuanhan Liu
Right, I should move it to the beginning of this function.
Any news about this refactoring?
--
Jérôme Jutteau,
Tel : 0826.206.307 (poste 304)
Yuanhan Liu
2016-01-27 03:27:34 UTC
Permalink
Post by Jérôme Jutteau
Hi Yuanhan,
Post by Yuanhan Liu
Right, I should move it to the beginning of this function.
Any news about this refactoring?
Hi Jérôme,

Thanks for showing interest in this patch set; I was waiting for
Huawei's comments. And fortunately, he has started making comments.

--yliu
Yuanhan Liu
2015-12-03 06:06:11 UTC
Permalink
The current virtio_dev_merge_rx looks just like the old
rte_vhost_dequeue_burst: twisted logic, with the same code block
repeated in many places.

However, the logic of virtio_dev_merge_rx is quite similar to that of
virtio_dev_rx. The big difference is that the mergeable one could
allocate more than one available entry to hold the data. Fetching all
available entries into vec_buf at once makes the difference a bit bigger.

Anyway, it could be simpler, just like what we did for virtio_dev_rx(). The
difference is that we need to update the used ring properly, as there could
be more than one entry:

    while (1) {
        if (this_desc_has_no_room) {
            this_desc = fetch_next_from_vec_buf();

            if (it is the last of a desc chain) {
                update_used_ring();
            }
        }

        if (this_mbuf_has_drained_totally) {
            this_mbuf = fetch_next_mbuf();

            if (this_mbuf == NULL)
                break;
        }

        COPY(this_desc, this_mbuf);
    }
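
The loop above can be sketched in standalone C. This is an illustration under assumptions, not the patch itself: `struct seg` and `struct vec_buf` are hypothetical stand-ins for `rte_mbuf` and a `buf_vec` entry, the virtio header is left out, and `used_len[]` stands in for the used ring. Unlike the pseudo code, the sketch checks the mbuf first, so a packet that ends exactly at a buffer boundary does not fetch an extra buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct seg {                 /* hypothetical stand-in for struct rte_mbuf */
	struct seg *next;
	const uint8_t *data;
	uint32_t data_len;
};

struct vec_buf {             /* hypothetical stand-in for a buf_vec entry */
	uint8_t *addr;
	uint32_t len;
	int last_in_chain;       /* 1 if the descriptor has no NEXT flag */
};

/*
 * Copy a segment chain into vec[]; used_len[] receives one length per
 * completed descriptor chain. Returns the number of used entries.
 * The caller must have reserved enough room in vec[] beforehand.
 */
static uint32_t
copy_mergeable(struct vec_buf *vec, struct seg *pkt, uint32_t *used_len)
{
	uint32_t vec_idx = 0, nr_used = 0, chain_len = 0, cpy_len;
	uint32_t desc_off = 0, desc_avail = vec[0].len;
	uint32_t mbuf_off = 0, mbuf_avail = pkt->data_len;

	while (1) {
		/* done with the current segment, fetch the next one */
		if (mbuf_avail == 0) {
			pkt = pkt->next;
			if (pkt == NULL)
				break;
			mbuf_off = 0;
			mbuf_avail = pkt->data_len;
			continue;
		}

		/* done with the current buffer, fetch the next one */
		if (desc_avail == 0) {
			if (vec[vec_idx].last_in_chain) {
				used_len[nr_used++] = chain_len;
				chain_len = 0;
			}
			vec_idx++;
			desc_off = 0;
			desc_avail = vec[vec_idx].len;
			continue;
		}

		cpy_len = desc_avail < mbuf_avail ? desc_avail : mbuf_avail;
		memcpy(vec[vec_idx].addr + desc_off, pkt->data + mbuf_off,
		       cpy_len);
		desc_off += cpy_len;
		desc_avail -= cpy_len;
		mbuf_off += cpy_len;
		mbuf_avail -= cpy_len;
		chain_len += cpy_len;
	}

	/* the packet ended inside the current descriptor chain */
	used_len[nr_used++] = chain_len;
	return nr_used;
}
```

With two 8-byte buffers and an 11-byte packet split into 5 + 6 bytes, the sketch reports two used entries, of 8 and 3 bytes.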

It removes quite a few lines of code, but it saves only 368 bytes of
code size (which is expected, btw):

# without this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  36539       0       0   36539    8ebb /path/to/vhost_rxtx.o

# with this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  36171       0       0   36171    8d4b /path/to/vhost_rxtx.o

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 389 +++++++++++++++++-------------------------
1 file changed, 158 insertions(+), 231 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index cb29459..7464b6b 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -241,235 +241,194 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
return count;
}

+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+ uint32_t *allocated, uint32_t *vec_idx)
+{
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
+
+ while (1) {
+ if (vec_id >= BUF_VECTOR_MAX)
+ return -1;
+
+ len += vq->desc[idx].len;
+ vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+ vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
+ vq->buf_vec[vec_id].desc_idx = idx;
+ vec_id++;
+
+ if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+ break;
+
+ idx = vq->desc[idx].next;
+ }
+
+ *allocated = len;
+ *vec_idx = vec_id;
+
+ return 0;
+}
+
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_cur_idx;
+ uint16_t avail_idx;
+ uint32_t allocated;
+ uint32_t vec_idx;
+ uint16_t tries;
+
+again:
+ res_start_idx = vq->last_used_idx_res;
+ res_cur_idx = res_start_idx;
+
+ allocated = 0;
+ vec_idx = 0;
+ tries = 0;
+ while (1) {
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ if (unlikely(res_cur_idx == avail_idx)) {
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Failed "
+ "to get enough desc from vring\n",
+ dev->device_fh);
+ return -1;
+ }
+
+ if (fill_vec_buf(vq, res_cur_idx, &allocated, &vec_idx) < 0)
+ return -1;
+
+ res_cur_idx++;
+ tries++;
+
+ if (allocated >= size)
+ break;
+
+ /*
+ * if we tried all available ring items, and still
+ * can't get enough buf, it means something abnormal
+ * happened.
+ */
+ if (tries >= vq->size)
+ return -1;
+ }
+
+ /*
+ * update vq->last_used_idx_res atomically.
+ * retry again if failed.
+ */
+ if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_cur_idx) == 0)
+ goto again;
+
+ *start = res_start_idx;
+ *end = res_cur_idx;
+ return 0;
+}
+
static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
- uint16_t res_base_idx, uint16_t res_end_idx,
- struct rte_mbuf *pkt)
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *pkt)
{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
uint32_t vec_idx = 0;
- uint32_t entry_success = 0;
- struct vhost_virtqueue *vq;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
- {0, 0, 0, 0, 0, 0}, 0};
- uint16_t cur_idx = res_base_idx;
- uint64_t vb_addr = 0;
- uint64_t vb_hdr_addr = 0;
- uint32_t seg_offset = 0;
- uint32_t vb_offset = 0;
- uint32_t seg_avail;
- uint32_t vb_avail;
- uint32_t cpy_len, entry_len;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;
+ uint32_t nr_used = 0;

if (pkt == NULL)
return 0;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
- "End Index %d\n",
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") Current Index %d| End Index %d\n",
dev->device_fh, cur_idx, res_end_idx);

- /*
- * Convert from gpa to vva
- * (guest physical addr -> vhost virtual addr)
- */
- vq = dev->virtqueue[queue_id];
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);

- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
- vb_hdr_addr = vb_addr;
-
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);

- virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+ virtio_hdr.num_buffers = res_end_idx - res_start_idx;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
dev->device_fh, virtio_hdr.num_buffers);

- rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
+ rte_memcpy((void *)(uintptr_t)desc_addr,
(const void *)&virtio_hdr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

- PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;

- seg_avail = rte_pktmbuf_data_len(pkt);
- vb_offset = vq->vhost_hlen;
- vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ mbuf_avail = rte_pktmbuf_data_len(pkt);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current desc buf, get the next one */
+ if (desc_avail == 0) {
+ desc_idx = vq->buf_vec[vec_idx].desc_idx;

- entry_len = vq->vhost_hlen;
+ if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 0) {
+ /* Update used ring with desc information */
+ used_idx = cur_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;

- if (vb_avail == 0) {
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
+ nr_used++;
+ }

- if ((vq->desc[desc_idx].flags
- & VRING_DESC_F_NEXT) == 0) {
- /* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
+ vec_idx++;
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);

- entry_len = 0;
- cur_idx++;
- entry_success++;
+ /* Prefetch buffer address. */
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+ desc_offset = 0;
+ desc_avail = vq->buf_vec[vec_idx].buf_len;
}

- vec_idx++;
- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
-
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- while (cpy_len > 0) {
- /* Copy mbuf data to vring buffer */
- rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
- rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
- cpy_len);
-
- PRINT_PACKET(dev,
- (uintptr_t)(vb_addr + vb_offset),
- cpy_len, 0);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- seg_avail -= cpy_len;
- vb_avail -= cpy_len;
- entry_len += cpy_len;
-
- if (seg_avail != 0) {
- /*
- * The virtio buffer in this vring
- * entry reach to its end.
- * But the segment doesn't complete.
- */
- if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
- /* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
- entry_len = 0;
- cur_idx++;
- entry_success++;
- }
-
- vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This current segment complete, need continue to
- * check if the whole packet complete or not.
- */
+ /* done with current mbuf, get the next one */
+ if (mbuf_avail == 0) {
pkt = pkt->next;
- if (pkt != NULL) {
- /*
- * There are more segments.
- */
- if (vb_avail == 0) {
- /*
- * This current buffer from vring is
- * used up, need fetch next buffer
- * from buf_vec.
- */
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
-
- if ((vq->desc[desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
- uint16_t wrapped_idx =
- cur_idx & (vq->size - 1);
- /*
- * Update used ring with the
- * descriptor information
- */
- vq->used->ring[wrapped_idx].id
- = desc_idx;
- vq->used->ring[wrapped_idx].len
- = entry_len;
- entry_success++;
- entry_len = 0;
- cur_idx++;
- }
-
- /* Get next buffer from buf_vec. */
- vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_avail =
- vq->buf_vec[vec_idx].buf_len;
- vb_offset = 0;
- }
-
- seg_offset = 0;
- seg_avail = rte_pktmbuf_data_len(pkt);
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This whole packet completes.
- */
- /* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
- entry_success++;
+ if (!pkt)
break;
- }
- }
- }
-
- return entry_success;
-}

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
- uint32_t *secure_len, uint32_t *vec_idx)
-{
- uint16_t wrapped_idx = id & (vq->size - 1);
- uint32_t idx = vq->avail->ring[wrapped_idx];
- uint8_t next_desc;
- uint32_t len = *secure_len;
- uint32_t vec_id = *vec_idx;
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(pkt);
+ }

- do {
- next_desc = 0;
- len += vq->desc[idx].len;
- vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
- vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
- vq->buf_vec[vec_id].desc_idx = idx;
- vec_id++;
+ COPY(desc_addr + desc_offset,
+ rte_pktmbuf_mtod_offset(pkt, uint64_t, mbuf_offset));
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);
+ }

- if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
- idx = vq->desc[idx].next;
- next_desc = 1;
- }
- } while (next_desc);
+ used_idx = cur_idx & (vq->size - 1);
+ vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
+ nr_used++;

- *secure_len = len;
- *vec_idx = vec_id;
+ return nr_used;
}

-/*
- * This function works for mergeable RX.
- */
static inline uint32_t __attribute__((always_inline))
virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- uint32_t pkt_idx = 0, entry_success = 0;
- uint16_t avail_idx;
- uint16_t res_base_idx, res_cur_idx;
- uint8_t success = 0;
+ uint32_t pkt_idx = 0, nr_used = 0;
+ uint16_t start, end;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
dev->device_fh);
@@ -485,62 +444,30 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
return 0;

count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
if (count == 0)
return 0;

for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

- do {
- /*
- * As many data cores may want access to available
- * buffers, they need to be reserved.
- */
- uint32_t secure_len = 0;
- uint32_t vec_idx = 0;
-
- res_base_idx = vq->last_used_idx_res;
- res_cur_idx = res_base_idx;
-
- do {
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- if (unlikely(res_cur_idx == avail_idx)) {
- LOG_DEBUG(VHOST_DATA,
- "(%"PRIu64") Failed "
- "to get enough desc from "
- "vring\n",
- dev->device_fh);
- goto merge_rx_exit;
- } else {
- update_secure_len(vq, res_cur_idx, &secure_len, &vec_idx);
- res_cur_idx++;
- }
- } while (pkt_len > secure_len);
-
- /* vq->last_used_idx_res is atomically updated. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx,
- res_cur_idx);
- } while (success == 0);
-
- entry_success = copy_from_mbuf_to_vring(dev, queue_id,
- res_base_idx, res_cur_idx, pkts[pkt_idx]);
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;

+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
rte_compiler_barrier();

/*
* Wait until it's our turn to add our buffer
* to the used ring.
*/
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != start))
rte_pause();

- *(volatile uint16_t *)&vq->used->idx += entry_success;
- vq->last_used_idx = res_cur_idx;
+ *(volatile uint16_t *)&vq->used->idx += nr_used;
+ vq->last_used_idx = end;
}

-merge_rx_exit:
if (likely(pkt_idx)) {
/* flush used->idx update before we read avail->flags. */
rte_mb();
--
1.9.0
Yuanhan Liu
2015-12-03 06:06:12 UTC
Permalink
First of all, rte_memcpy() is mostly useful for copying big packets
by leveraging advanced hardware instructions like AVX. But for the virtio
net hdr, which is 12 bytes at most, invoking rte_memcpy() will not
introduce any performance boost.

And, to my surprise, rte_memcpy() is huge. Since rte_memcpy() is
inlined, it takes more space every time we call it at a different
place. Replacing the two rte_memcpy() calls with direct copies saves
nearly 12K bytes of code size!

# without this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  36171       0       0   36171    8d4b /path/to/vhost_rxtx.o

# with this patch
$ size /path/to/vhost_rxtx.o
   text    data     bss     dec     hex filename
  24179       0       0   24179    5e73 /path/to/vhost_rxtx.o
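
As a standalone illustration of the direct copy, here is a minimal sketch. The two struct types are local to the example (their field layout follows the virtio net header), and `write_hdr` is a hypothetical helper, not a function from vhost_rxtx.c. The destination is assumed to be suitably aligned for the struct.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local copies of the header layout; the type names are made up here. */
struct net_hdr {
	uint8_t  flags;
	uint8_t  gso_type;
	uint16_t hdr_len;
	uint16_t gso_size;
	uint16_t csum_start;
	uint16_t csum_offset;
};

struct net_hdr_mrg {
	struct net_hdr hdr;
	uint16_t num_buffers;
};

/*
 * Write a zeroed header of 'hlen' bytes to 'dst' with a plain struct
 * assignment, which the compiler can lower to a few stores instead of
 * an inlined general-purpose copy routine.
 */
static void
write_hdr(void *dst, size_t hlen, uint16_t num_buffers)
{
	struct net_hdr_mrg hdr = { {0, 0, 0, 0, 0, 0}, 0 };

	hdr.num_buffers = num_buffers;

	if (hlen == sizeof(struct net_hdr_mrg))
		*(struct net_hdr_mrg *)dst = hdr;
	else
		*(struct net_hdr *)dst = hdr.hdr;
}
```

On common ABIs the two sizes are 10 and 12 bytes, so the short form leaves `num_buffers` in the destination untouched.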

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 7464b6b..1e0a24e 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -70,14 +70,17 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
uint32_t cpy_len;
struct vring_desc *desc;
uint64_t desc_addr;
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ struct virtio_net_hdr_mrg_rxbuf hdr = {{0, 0, 0, 0, 0, 0}, 0};

desc = &vq->desc[desc_idx];
desc_addr = gpa_to_vva(dev, desc->addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);

- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

desc_offset = vq->vhost_hlen;
@@ -340,7 +343,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
uint16_t res_start_idx, uint16_t res_end_idx,
struct rte_mbuf *pkt)
{
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ struct virtio_net_hdr_mrg_rxbuf hdr = {{0, 0, 0, 0, 0, 0}, 0};
uint32_t vec_idx = 0;
uint16_t cur_idx = res_start_idx;
uint64_t desc_addr;
@@ -361,13 +364,16 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,

rte_prefetch0((void *)(uintptr_t)desc_addr);

- virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+ hdr.num_buffers = res_end_idx - res_start_idx;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
dev->device_fh, virtio_hdr.num_buffers);

- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
--
1.9.0
Xie, Huawei
2016-01-27 02:46:39 UTC
Permalink
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
Yuanhan Liu
2016-01-27 03:22:45 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
What's this "simplification" for? Not to mention that we will add
TSO support, which modifies a few more fields, such as csum_start: resetting
only the first two fields is wrong here.

--yliu
Xie, Huawei
2016-01-27 05:56:56 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
What's this "simplification" for? Not to mention that we will add
TSO support, which modifies a few more fields, such as csum_start: resetting
only the first two fields is wrong here.
I knew about TSO before commenting, but at least in this implementation and
this specific patch, I guess zeroing two fields is enough.

What is wrong with resetting only two fields?
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-01-27 06:02:36 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
What's this "simplification" for? Not to mention that we will add
TSO support, which modifies a few more fields, such as csum_start: resetting
only the first two fields is wrong here.
I knew about TSO before commenting, but at least in this implementation and
this specific patch, I guess zeroing two fields is enough.
What is wrong with resetting only two fields?
I then have to ask "What is the benefit of resetting only two fields"?
If we do so, we have to change it back for TSO. My proposal requires no
extra change when adding TSO support.

--yliu
Xie, Huawei
2016-01-27 06:16:37 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
What's this "simplification" for? Not to mention that we will add
TSO support, which modifies a few more fields, such as csum_start: resetting
only the first two fields is wrong here.
I knew about TSO before commenting, but at least in this implementation and
this specific patch, I guess zeroing two fields is enough.
What is wrong with resetting only two fields?
I then have to ask "What is the benefit of resetting only two fields"?
If we do so, we have to change it back for TSO. My proposal requires no
extra change when adding TSO support.
The benefit is that we save four unnecessary stores.
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-01-27 06:35:58 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
Thanks!
We might simplify this further. Just reset the first two fields flags
and gso_type.
What's this "simplification" for? Not to mention that we will add
TSO support, which modifies a few more fields, such as csum_start: resetting
only the first two fields is wrong here.
I knew about TSO before commenting, but at least in this implementation and
this specific patch, I guess zeroing two fields is enough.
What is wrong with resetting only two fields?
I then have to ask "What is the benefit of resetting only two fields"?
If we do so, we have to change it back for TSO. My proposal requires no
extra change when adding TSO support.
The benefit is that we save four unnecessary stores.
Hmm..., the hdr size is 12 bytes at most. I mean, does it really matter,
copying 3 bytes, or copying 12 bytes in a row?

--yliu
Yuanhan Liu
2015-12-03 06:06:13 UTC
Permalink
VIRTIO_NET_F_MRG_RXBUF is a default feature supported by vhost.
Adding unlikely for VIRTIO_NET_F_MRG_RXBUF detection doesn't
make sense to me at all.
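
For readers unfamiliar with the hint, a minimal sketch of what `unlikely` does. The macro below has the usual `__builtin_expect` shape (a GCC and Clang builtin); `F_MRG_RXBUF` mirrors the bit position of VIRTIO_NET_F_MRG_RXBUF, and the function names are made up for this example.

```c
#include <assert.h>
#include <stdint.h>

/* Usual shape of the branch hint; relies on a GCC/Clang builtin. */
#define unlikely(x) __builtin_expect(!!(x), 0)

#define F_MRG_RXBUF 15   /* bit position of VIRTIO_NET_F_MRG_RXBUF */

/* Returns 1 for the mergeable path, 0 for the plain path. */
static int
pick_rx_path(uint64_t features)
{
	/* No hint: the feature is on by default, so neither branch is rare. */
	if (features & (1ULL << F_MRG_RXBUF))
		return 1;
	return 0;
}

/* Same decision, but tells the compiler the mergeable path is rare. */
static int
pick_rx_path_hinted(uint64_t features)
{
	if (unlikely(features & (1ULL << F_MRG_RXBUF)))
		return 1;
	return 0;
}

/* The hint changes code layout only, never the result. */
static void
check_same_result(uint64_t features)
{
	assert(pick_rx_path(features) == pick_rx_path_hinted(features));
}
```

Since the hint only steers code layout, a wrong hint costs performance on the common path without changing behaviour.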

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 1e0a24e..7d96cd4 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -490,7 +490,7 @@ uint16_t
rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint16_t count)
{
- if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
+ if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
return virtio_dev_merge_rx(dev, queue_id, pkts, count);
else
return virtio_dev_rx(dev, queue_id, pkts, count);
--
1.9.0
Thomas Monjalon
2016-02-17 22:50:22 UTC
Permalink
Hi Yuanhan,
Post by Yuanhan Liu
Vhost rxtx code is derived from vhost-switch example, which is very
likely the most messy code in DPDK. Unluckily, the move also brings
over the bad merits: twisted logic, bad comments.
When I joined this team firstly, I was quite scared off by the messy
and long vhost rxtx code. While adding the vhost-user live migration
support, that I have to make fews changes to it, I then ventured to
look at it again, to understand it better, in the meantime, to see if
I can refactor it.
And, here you go.
There were several comments and a typo detected.
Please, what is the status of this patchset?
Yuanhan Liu
2016-02-18 04:09:13 UTC
Permalink
Post by Thomas Monjalon
Hi Yuanhan,
Post by Yuanhan Liu
Vhost rxtx code is derived from vhost-switch example, which is very
likely the most messy code in DPDK. Unluckily, the move also brings
over the bad merits: twisted logic, bad comments.
When I joined this team firstly, I was quite scared off by the messy
and long vhost rxtx code. While adding the vhost-user live migration
support, that I have to make fews changes to it, I then ventured to
look at it again, to understand it better, in the meantime, to see if
I can refactor it.
And, here you go.
There were several comments and a typo detected.
Please, what is the status of this patchset?
Hi Thomas,

It was delayed; I will address those comments and send out a new
version soon.

--yliu
Yuanhan Liu
2016-02-18 13:49:12 UTC
Permalink
A malicious guest may easily forge an illegal vring desc buf.
To make our vhost robust, we need to make sure desc->next will not
go beyond the vq->desc[] array.
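
A minimal standalone sketch of such a check. `struct desc` mirrors the split-ring descriptor layout, `DESC_F_NEXT` has the same value as VRING_DESC_F_NEXT, and `next_desc_idx` is a hypothetical helper rather than a function from the patch.

```c
#include <assert.h>
#include <stdint.h>

#define DESC_F_NEXT 1   /* same value as VRING_DESC_F_NEXT */

struct desc {           /* same layout as a split-ring descriptor */
	uint64_t addr;
	uint32_t len;
	uint16_t flags;
	uint16_t next;
};

/*
 * Look up the successor of table[cur].
 * Returns 1 and stores the index in *next when there is a valid successor,
 * 0 when the chain ends here, and -1 when 'next' points outside the table.
 */
static int
next_desc_idx(const struct desc *table, uint16_t size, uint16_t cur,
	      uint16_t *next)
{
	assert(cur < size);

	if ((table[cur].flags & DESC_F_NEXT) == 0)
		return 0;
	if (table[cur].next >= size)   /* forged index: refuse to follow it */
		return -1;

	*next = table[cur].next;
	return 1;
}
```

Returning three distinct values lets the caller tell a normal end of chain from a forged index.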

Suggested-by: Rich Lane <***@bigswitch.com>
Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index c2adcd9..b0c0c94 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -148,6 +148,8 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
/* Room in vring buffer is not enough */
return -1;
}
+ if (unlikely(desc->next >= vq->size))
+ return -1;

desc = &vq->desc[desc->next];
desc_addr = gpa_to_vva(dev, desc->addr);
@@ -302,7 +304,7 @@ fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
uint32_t len = *allocated;

while (1) {
- if (vec_id >= BUF_VECTOR_MAX)
+ if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
return -1;

len += vq->desc[idx].len;
@@ -671,6 +673,8 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
/* This desc reachs to its end, get the next one */
if (desc_avail == 0) {
+ if (unlikely(desc->next >= vq->size))
+ goto fail;
desc = &vq->desc[desc->next];

desc_addr = gpa_to_vva(dev, desc->addr);
@@ -691,9 +695,7 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
if (unlikely(!cur)) {
RTE_LOG(ERR, VHOST_DATA, "Failed to "
"allocate memory for mbuf.\n");
- if (head)
- rte_pktmbuf_free(head);
- return NULL;
+ goto fail;
}
if (!head) {
head = cur;
@@ -729,6 +731,11 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
}

return head;
+
+fail:
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
}

uint16_t
--
1.9.0
Xie, Huawei
2016-03-07 03:10:43 UTC
Permalink
Post by Yuanhan Liu
+ if (unlikely(desc->next >= vq->size))
+ goto fail;
Desc chains could be forged into a loop; vhost would then spin in an
endless loop until it exhausts all mbuf memory.
Yuanhan Liu
2016-03-07 06:57:04 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
+ if (unlikely(desc->next >= vq->size))
+ goto fail;
Desc chains could be forged into a loop; vhost would then spin in an
endless loop until it exhausts all mbuf memory.
Good point. Any elegant solution to avoid that?
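
One common way to bound such a walk, offered only as a sketch and not as something agreed in this thread: count the descriptors visited and give up once the count exceeds the ring size, since a valid chain can never be longer than the table. `struct desc` and `chain_len` are made up for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DESC_F_NEXT 1   /* same value as VRING_DESC_F_NEXT */

struct desc {           /* same layout as a split-ring descriptor */
	uint64_t addr;
	uint32_t len;
	uint16_t flags;
	uint16_t next;
};

/*
 * Sum the buffer lengths of the chain starting at 'head'.
 * Returns -1 if an index is out of range or if more than 'size'
 * descriptors are visited, which can only happen when the chain loops.
 */
static int64_t
chain_len(const struct desc *table, uint16_t size, uint16_t head)
{
	uint32_t nr = 0;
	uint64_t total = 0;
	uint16_t idx = head;

	assert(table != NULL);

	while (1) {
		if (idx >= size || ++nr > size)
			return -1;

		total += table[idx].len;
		if ((table[idx].flags & DESC_F_NEXT) == 0)
			break;

		idx = table[idx].next;
	}
	return (int64_t)total;
}
```

The counter costs one increment and one compare per descriptor and needs no extra memory, unlike marking visited entries.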

--yliu
Yuanhan Liu
2016-02-18 13:49:10 UTC
Permalink
VIRTIO_NET_F_MRG_RXBUF is a default feature supported by vhost.
Adding unlikely for VIRTIO_NET_F_MRG_RXBUF detection doesn't
make sense to me at all.

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 97690c3..04af9b3 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -538,7 +538,7 @@ uint16_t
rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint16_t count)
{
- if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
+ if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
return virtio_dev_merge_rx(dev, queue_id, pkts, count);
else
return virtio_dev_rx(dev, queue_id, pkts, count);
--
1.9.0
Yuanhan Liu
2016-02-18 13:49:06 UTC
Permalink
The current rte_vhost_dequeue_burst() implementation is a bit messy
and its logic is twisted. You can see repeated code here and there: it
invokes rte_pktmbuf_alloc() three times, at three different places!

However, rte_vhost_dequeue_burst() actually does a simple job: copy
the packet data from vring desc to mbuf. What's tricky here is:

- desc buff could be chained (by desc->next field), so that you need to
fetch the next one if the current one is wholly drained.

- One mbuf might not be big enough to hold all desc buff, hence you
need to chain the mbuf as well, by the mbuf->next field.

Even so, the logic could be simple. Here is the pseudo code.

    while (this_desc_is_not_drained_totally || has_next_desc) {
        if (this_desc_has_drained_totally) {
            this_desc = next_desc();
        }

        if (mbuf_has_no_room) {
            mbuf = allocate_a_new_mbuf();
        }

        COPY(mbuf, desc);
    }

And this is how I refactored rte_vhost_dequeue_burst.

Note that the old code has special handling for skipping the virtio
header. However, that can be done simply by adjusting the desc_avail
and desc_offset variables:

    desc_avail = desc->len - vq->vhost_hlen;
    desc_offset = vq->vhost_hlen;
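
The pseudo code and the header skip can be tried out in a standalone sketch. Everything here is an assumption made for illustration: `struct seg` stands in for `rte_mbuf`, `struct buf` for a descriptor buffer, `SEG_ROOM` is an artificially small capacity, and `calloc` replaces the mempool.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SEG_ROOM 8   /* tiny capacity so the example chains segments */

struct seg {                       /* hypothetical stand-in for rte_mbuf */
	struct seg *next;
	uint32_t data_len;
	uint8_t data[SEG_ROOM];
};

struct buf {                       /* one descriptor buffer in a chain */
	const uint8_t *addr;
	uint32_t len;
	const struct buf *next;        /* NULL when the NEXT flag is clear */
};

static void
free_chain(struct seg *s)
{
	while (s != NULL) {
		struct seg *n = s->next;
		free(s);
		s = n;
	}
}

/*
 * Copy a buffer chain into a freshly allocated segment chain, skipping
 * 'hlen' header bytes of the first buffer. Returns NULL on allocation
 * failure, on a too-short first buffer, or when there is no payload.
 */
static struct seg *
copy_chain(const struct buf *desc, uint32_t hlen)
{
	struct seg *head = NULL, *cur = NULL;
	uint32_t desc_avail, desc_off, seg_avail = 0, cpy_len;

	if (desc->len < hlen)
		return NULL;
	desc_avail = desc->len - hlen;   /* skip the header by offset only */
	desc_off = hlen;

	while (desc_avail != 0 || desc->next != NULL) {
		if (desc_avail == 0) {       /* this buffer is drained */
			desc = desc->next;
			desc_off = 0;
			desc_avail = desc->len;
			continue;
		}
		if (seg_avail == 0) {        /* this segment is full */
			struct seg *n = calloc(1, sizeof(*n));
			if (n == NULL) {
				free_chain(head);
				return NULL;
			}
			if (head == NULL)
				head = n;
			else
				cur->next = n;
			cur = n;
			seg_avail = SEG_ROOM;
		}
		cpy_len = desc_avail < seg_avail ? desc_avail : seg_avail;
		memcpy(cur->data + cur->data_len, desc->addr + desc_off,
		       cpy_len);
		cur->data_len += cpy_len;
		seg_avail -= cpy_len;
		desc_off += cpy_len;
		desc_avail -= cpy_len;
	}
	return head;
}
```

With a 2-byte header, two 7-byte buffers yield 12 payload bytes, which land in one full 8-byte segment followed by a 4-byte one.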

This refactor makes the code much more readable (IMO), and it also
reduces the binary code size (by nearly 2K).

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---

v2: - fix potential NULL dereference bug of var "prev" and "head"
---
lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
1 file changed, 116 insertions(+), 181 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
}
}

+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
+ if (!head) {
+ head = cur;
+ } else {
+ prev->next = cur;
+ prev->data_len = mbuf_offset;
+ head->nb_segs += 1;
+ }
+ head->pkt_len += mbuf_offset;
+ prev = cur;
+
+ mbuf_offset = 0;
+ mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
+ (void *)((uintptr_t)(desc_addr + desc_offset)),
+ cpy_len);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ if (prev) {
+ prev->data_len = mbuf_offset;
+ head->pkt_len += mbuf_offset;
+
+ if (hdr->flags != 0 || hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE)
+ vhost_dequeue_offload(hdr, head);
+ }
+
+ return head;
+}
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;
+ struct rte_mbuf *m;

if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;

avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
return 0;

- LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
- dev->device_fh);
+ LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);

- /* Prefetch available ring to retrieve head indexes. */
- rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+ used_idx = vq->last_used_idx & (vq->size -1);

- /*get the number of free entries in the ring*/
- free_entries = (avail_idx - vq->last_used_idx);
+ /* Prefetch available ring to retrieve head indexes. */
+ rte_prefetch0(&vq->avail->ring[used_idx]);

- free_entries = RTE_MIN(free_entries, count);
- /* Limit to MAX_PKT_BURST. */
- free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
+ count = RTE_MIN(count, MAX_PKT_BURST);
+ count = RTE_MIN(count, free_entries);
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequeue %u buffers\n",
+ dev->device_fh, count);

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
- dev->device_fh, free_entries);
/* Retrieve all of the head indexes first to avoid caching issues. */
- for (i = 0; i < free_entries; i++)
- head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+ (vq->size - 1)];
+ }

/* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success]]);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);

- while (entry_success < free_entries) {
- uint32_t vb_avail, vb_offset;
- uint32_t seg_avail, seg_offset;
- uint32_t cpy_len;
- uint32_t seg_num = 0;
- struct rte_mbuf *cur;
- uint8_t alloc_err = 0;
-
- desc = &vq->desc[head[entry_success]];
-
- vb_net_hdr_addr = gpa_to_vva(dev, desc->addr);
- hdr = (struct virtio_net_hdr *)((uintptr_t)vb_net_hdr_addr);
-
- /* Discard first buffer as it is the virtio header */
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- vb_offset = 0;
- vb_avail = desc->len;
- } else {
- vb_offset = vq->vhost_hlen;
- vb_avail = desc->len - vb_offset;
- }
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
-
- used_idx = vq->last_used_idx & (vq->size - 1);
-
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
-
- /* Update used index buffer information. */
- vq->used->ring[used_idx].id = head[entry_success];
- vq->used->ring[used_idx].len = 0;
-
- /* Allocate an mbuf and populate the structure. */
- m = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(m == NULL)) {
- RTE_LOG(ERR, VHOST_DATA,
- "Failed to allocate memory for mbuf.\n");
- break;
- }
- seg_offset = 0;
- seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
- seg_num++;
- cur = m;
- prev = m;
- while (cpy_len != 0) {
- rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
- (void *)((uintptr_t)(vb_addr + vb_offset)),
- cpy_len);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- vb_avail -= cpy_len;
- seg_avail -= cpy_len;
-
- if (vb_avail != 0) {
- /*
- * The segment reachs to its end,
- * while the virtio buffer in TX vring has
- * more data to be copied.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /* Allocate mbuf and populate the structure. */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR, VHOST_DATA, "Failed to "
- "allocate memory for mbuf.\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
-
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- } else {
- if (desc->flags & VRING_DESC_F_NEXT) {
- /*
- * There are more virtio buffers in
- * same vring entry need to be copied.
- */
- if (seg_avail == 0) {
- /*
- * The current segment hasn't
- * room to accomodate more
- * data.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /*
- * Allocate an mbuf and
- * populate the structure.
- */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR,
- VHOST_DATA,
- "Failed to "
- "allocate memory "
- "for mbuf\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- }
-
- desc = &vq->desc[desc->next];
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = desc->len;
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr,
- desc->len, 0);
- } else {
- /* The whole packet completes. */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- vb_avail = 0;
- }
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- }
-
- if (unlikely(alloc_err == 1))
+ for (i = 0; i < count; i++) {
+ m = copy_desc_to_mbuf(dev, vq, desc_indexes[i], mbuf_pool);
+ if (m == NULL)
break;
+ pkts[i] = m;

- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
}

rte_compiler_barrier();
- vq->used->idx += entry_success;
+ vq->used->idx += i;
+
/* Kick guest if required. */
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
eventfd_write(vq->callfd, (eventfd_t)1);
- return entry_success;
+
+ return i;
}
--
1.9.0
Xie, Huawei
2016-03-03 16:21:19 UTC
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
and logic twisted. And you could see repeat code here and there: it
invokes rte_pktmbuf_alloc() three times at three different places!
However, rte_vhost_dequeue_burst() acutally does a simple job: copy
- desc buff could be chained (by desc->next field), so that you need
fetch next one if current is wholly drained.
- One mbuf could not be big enough to hold all desc buff, hence you
need to chain the mbuf as well, by the mbuf->next field.
Even though, the logic could be simple. Here is the pseudo code.
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
And this is how I refactored rte_vhost_dequeue_burst.
Note that the old patch does a special handling for skipping virtio
header. However, that could be simply done by adjusting desc_avail
desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
---
v2: - fix potential NULL dereference bug of var "prev" and "head"
---
lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
1 file changed, 116 insertions(+), 181 deletions(-)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
}
}
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
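There is a serious bug here: desc->len - vq->vhost_hlen could underflow
when desc->len is smaller than the header length, and the unsigned result
wraps around to a huge value. A VM could easily create this case. Let us
fix it here.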
Post by Yuanhan Liu
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
+ if (!head) {
+ head = cur;
+ } else {
+ prev->next = cur;
+ prev->data_len = mbuf_offset;
+ head->nb_segs += 1;
+ }
+ head->pkt_len += mbuf_offset;
+ prev = cur;
+
+ mbuf_offset = 0;
+ mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
+ (void *)((uintptr_t)(desc_addr + desc_offset)),
+ cpy_len);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ if (prev) {
+ prev->data_len = mbuf_offset;
+ head->pkt_len += mbuf_offset;
+
+ if (hdr->flags != 0 || hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE)
+ vhost_dequeue_offload(hdr, head);
+ }
+
+ return head;
+}
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;
+ struct rte_mbuf *m;
if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;
avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
return 0;
- LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
- dev->device_fh);
+ LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);
- /* Prefetch available ring to retrieve head indexes. */
- rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+ used_idx = vq->last_used_idx & (vq->size -1);
- /*get the number of free entries in the ring*/
- free_entries = (avail_idx - vq->last_used_idx);
+ /* Prefetch available ring to retrieve head indexes. */
+ rte_prefetch0(&vq->avail->ring[used_idx]);
- free_entries = RTE_MIN(free_entries, count);
- /* Limit to MAX_PKT_BURST. */
- free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
+ count = RTE_MIN(count, MAX_PKT_BURST);
+ count = RTE_MIN(count, free_entries);
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequeue %u buffers\n",
+ dev->device_fh, count);
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
- dev->device_fh, free_entries);
/* Retrieve all of the head indexes first to avoid caching issues. */
- for (i = 0; i < free_entries; i++)
- head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+ (vq->size - 1)];
+ }
/* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success]]);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
- while (entry_success < free_entries) {
- uint32_t vb_avail, vb_offset;
- uint32_t seg_avail, seg_offset;
- uint32_t cpy_len;
- uint32_t seg_num = 0;
- struct rte_mbuf *cur;
- uint8_t alloc_err = 0;
-
- desc = &vq->desc[head[entry_success]];
-
- vb_net_hdr_addr = gpa_to_vva(dev, desc->addr);
- hdr = (struct virtio_net_hdr *)((uintptr_t)vb_net_hdr_addr);
-
- /* Discard first buffer as it is the virtio header */
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- vb_offset = 0;
- vb_avail = desc->len;
- } else {
- vb_offset = vq->vhost_hlen;
- vb_avail = desc->len - vb_offset;
- }
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
-
- used_idx = vq->last_used_idx & (vq->size - 1);
-
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
-
- /* Update used index buffer information. */
- vq->used->ring[used_idx].id = head[entry_success];
- vq->used->ring[used_idx].len = 0;
-
- /* Allocate an mbuf and populate the structure. */
- m = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(m == NULL)) {
- RTE_LOG(ERR, VHOST_DATA,
- "Failed to allocate memory for mbuf.\n");
- break;
- }
- seg_offset = 0;
- seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
- seg_num++;
- cur = m;
- prev = m;
- while (cpy_len != 0) {
- rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
- (void *)((uintptr_t)(vb_addr + vb_offset)),
- cpy_len);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- vb_avail -= cpy_len;
- seg_avail -= cpy_len;
-
- if (vb_avail != 0) {
- /*
- * The segment reachs to its end,
- * while the virtio buffer in TX vring has
- * more data to be copied.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /* Allocate mbuf and populate the structure. */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR, VHOST_DATA, "Failed to "
- "allocate memory for mbuf.\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
-
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- } else {
- if (desc->flags & VRING_DESC_F_NEXT) {
- /*
- * There are more virtio buffers in
- * same vring entry need to be copied.
- */
- if (seg_avail == 0) {
- /*
- * The current segment hasn't
- * room to accomodate more
- * data.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /*
- * Allocate an mbuf and
- * populate the structure.
- */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR,
- VHOST_DATA,
- "Failed to "
- "allocate memory "
- "for mbuf\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- }
-
- desc = &vq->desc[desc->next];
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = desc->len;
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr,
- desc->len, 0);
- } else {
- /* The whole packet completes. */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- vb_avail = 0;
- }
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- }
-
- if (unlikely(alloc_err == 1))
+ for (i = 0; i < count; i++) {
+ m = copy_desc_to_mbuf(dev, vq, desc_indexes[i], mbuf_pool);
+ if (m == NULL)
break;
+ pkts[i] = m;
- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
}
rte_compiler_barrier();
- vq->used->idx += entry_success;
+ vq->used->idx += i;
+
/* Kick guest if required. */
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
eventfd_write(vq->callfd, (eventfd_t)1);
- return entry_success;
+
+ return i;
}
Yuanhan Liu
2016-03-04 02:21:18 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
and logic twisted. And you could see repeat code here and there: it
invokes rte_pktmbuf_alloc() three times at three different places!
However, rte_vhost_dequeue_burst() acutally does a simple job: copy
- desc buff could be chained (by desc->next field), so that you need
fetch next one if current is wholly drained.
- One mbuf could not be big enough to hold all desc buff, hence you
need to chain the mbuf as well, by the mbuf->next field.
Even though, the logic could be simple. Here is the pseudo code.
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
And this is how I refactored rte_vhost_dequeue_burst.
Note that the old patch does a special handling for skipping virtio
header. However, that could be simply done by adjusting desc_avail
desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
---
v2: - fix potential NULL dereference bug of var "prev" and "head"
---
lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
1 file changed, 116 insertions(+), 181 deletions(-)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
}
}
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
There is a serious bug here, desc->len - vq->vhost_len could overflow.
VM could easily create this case. Let us fix it here.
Nope, this issue has been there since the beginning, and this patch
is a refactor: we should not bring any functional changes. Therefore,
we should not fix it here.

And actually, it's been fixed in the 6th patch in this series:

[PATCH v2 6/7] vhost: do sanity check for desc->len

--yliu
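
To make the wraparound concrete, here is a small sketch. The two helper names are hypothetical and this is not the code from the sanity-check patch mentioned above. It only shows what the unchecked subtraction yields for a too-short descriptor and what a guard could look like, assuming 32-bit unsigned lengths.

```c
#include <stdbool.h>
#include <stdint.h>

/* What the unchecked subtraction yields: unsigned arithmetic wraps. */
static uint32_t
payload_len_unchecked(uint32_t desc_len, uint32_t hdr_len)
{
	return desc_len - hdr_len;
}

/*
 * Hypothetical guarded variant: reject a first descriptor that is shorter
 * than the virtio-net header instead of computing a wrapped length.
 */
static bool
payload_len_checked(uint32_t desc_len, uint32_t hdr_len, uint32_t *payload)
{
	if (desc_len < hdr_len)
		return false;   /* guest-supplied length is invalid */
	*payload = desc_len - hdr_len;
	return true;
}
```

With a 4-byte descriptor and a 12-byte header, the unchecked version returns a value just below 4 GiB, which the copy loop would then treat as bytes still to be copied.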
Xie, Huawei
2016-03-07 02:19:54 UTC
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
and logic twisted. And you could see repeat code here and there: it
invokes rte_pktmbuf_alloc() three times at three different places!
However, rte_vhost_dequeue_burst() acutally does a simple job: copy
- desc buff could be chained (by desc->next field), so that you need
fetch next one if current is wholly drained.
- One mbuf could not be big enough to hold all desc buff, hence you
need to chain the mbuf as well, by the mbuf->next field.
Even though, the logic could be simple. Here is the pseudo code.
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
And this is how I refactored rte_vhost_dequeue_burst.
Note that the old patch does a special handling for skipping virtio
header. However, that could be simply done by adjusting desc_avail
desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
---
v2: - fix potential NULL dereference bug of var "prev" and "head"
---
lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
1 file changed, 116 insertions(+), 181 deletions(-)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
}
}
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
There is a serious bug here, desc->len - vq->vhost_len could overflow.
VM could easily create this case. Let us fix it here.
Nope, this issue has been there since the beginning, and this patch
is a refactor: we should not bring any functional changes. Therefore,
we should not fix it here.
No, I don't mean fixing it in exactly this patch, but in this series.

Besides, from a refactoring perspective, we could actually make things
much simpler and more readable still. Both the desc chains and the mbuf
chains could be converted into iovecs; then both dequeue (copy_desc_to_mbuf)
and enqueue (copy_mbuf_to_desc) could use the commonly used iovec copying
algorithms. As the data path is performance oriented, let us stop here.
Will check that.
Post by Yuanhan Liu
[PATCH v2 6/7] vhost: do sanity check for desc->len
--yliu
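
As an illustration of the iovec idea raised above, here is a minimal sketch of a generic iovec-to-iovec copy. It uses the POSIX `struct iovec` from `<sys/uio.h>`; the function name `iov_copy` is hypothetical, and how desc chains or mbuf chains would be turned into iovec arrays is left out.

```c
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/*
 * Copy bytes from the src iovec array into the dst iovec array until
 * either side is exhausted. Returns the number of bytes copied.
 */
static size_t
iov_copy(const struct iovec *dst, size_t dst_cnt,
	 const struct iovec *src, size_t src_cnt)
{
	size_t di = 0, si = 0;      /* current element on each side */
	size_t doff = 0, soff = 0;  /* offset inside the current element */
	size_t total = 0;

	while (di < dst_cnt && si < src_cnt) {
		size_t dleft = dst[di].iov_len - doff;
		size_t sleft = src[si].iov_len - soff;
		size_t n = dleft < sleft ? dleft : sleft;

		if (n != 0)
			memcpy((char *)dst[di].iov_base + doff,
			       (const char *)src[si].iov_base + soff, n);
		total += n;
		doff += n;
		soff += n;

		/* Advance whichever side has been used up. */
		if (doff == dst[di].iov_len) {
			di++;
			doff = 0;
		}
		if (soff == src[si].iov_len) {
			si++;
			soff = 0;
		}
	}

	return total;
}
```

The copy itself is symmetric, so one routine could serve both directions. The cost is the extra pass that builds the iovec arrays, which is the performance concern mentioned above.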
Yuanhan Liu
2016-03-07 02:44:06 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
and logic twisted. And you could see repeat code here and there: it
invokes rte_pktmbuf_alloc() three times at three different places!
However, rte_vhost_dequeue_burst() acutally does a simple job: copy
- desc buff could be chained (by desc->next field), so that you need
fetch next one if current is wholly drained.
- One mbuf could not be big enough to hold all desc buff, hence you
need to chain the mbuf as well, by the mbuf->next field.
Even though, the logic could be simple. Here is the pseudo code.
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
And this is how I refactored rte_vhost_dequeue_burst.
Note that the old patch does a special handling for skipping virtio
header. However, that could be simply done by adjusting desc_avail
desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
---
v2: - fix potential NULL dereference bug of var "prev" and "head"
---
lib/librte_vhost/vhost_rxtx.c | 297 +++++++++++++++++-------------------------
1 file changed, 116 insertions(+), 181 deletions(-)
diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 5e7e5b1..d5cd0fa 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -702,21 +702,104 @@ vhost_dequeue_offload(struct virtio_net_hdr *hdr, struct rte_mbuf *m)
}
}
+static inline struct rte_mbuf *
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t desc_idx, struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *head = NULL;
+ struct rte_mbuf *cur = NULL, *prev = NULL;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
There is a serious bug here, desc->len - vq->vhost_len could overflow.
VM could easily create this case. Let us fix it here.
Nope, this issue has been there since the beginning, and this patch
is a refactor: we should not bring any functional changes. Therefore,
we should not fix it here.
No, I don't mean exactly fixing in this patch but in this series.
Besides, from refactoring's perspective, actually we could make things
further much simpler and more readable. Both the desc chains and mbuf
could be converted into iovec, then both dequeue(copy_desc_to_mbuf) and
enqueue(copy_mbuf_to_desc) could use the commonly used iovec copying
algorithms As data path are performance oriented, let us stop here.
Agreed, I had the same performance concern about further simplification,
therefore I didn't go further.
Post by Xie, Huawei
Will check that.
Do you have any comments on the other patches? I'm considering sending
a new version soon, maybe tomorrow.

--yliu
Xie, Huawei
2016-03-03 16:30:42 UTC
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
We could always allocate the head mbuf before the loop; that saves the
following branch and makes the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation. One solution is to pass the head mbuf in through an
additional parameter.
Btw, put unlikely() around the check of mbuf_avail, and around similar
checks elsewhere.
Post by Yuanhan Liu
+ if (!head) {
+ head = cur;
+ } else {
+ prev->next = cur;
+ prev->data_len = mbuf_offset;
+ head->nb_segs += 1;
+ }
+ head->pkt_len += mbuf_offset;
+ prev = cur;
+
+ mbuf_offset = 0;
+ mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
+ (void *)((uintptr_t)(desc_addr + desc_offset)),
+ cpy_len);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
Yuanhan Liu
2016-03-04 02:17:05 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
We could always allocate the head mbuf before the loop, then we save the
following branch and make the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation, one solution is we pass the head mbuf from an additional
parameter.
Yep, that's also something I have thought of.
Post by Xie, Huawei
Btw, put unlikely before the check of mbuf_avail and checks elsewhere.
I don't think so. It would benefit small packets. But what about when
TSO or jumbo frames are enabled and we have big packets?

--yliu
Xie, Huawei
2016-03-07 02:32:46 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
We could always allocate the head mbuf before the loop, then we save the
following branch and make the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation, one solution is we pass the head mbuf from an additional
parameter.
Yep, that's also something I have thought of.
Post by Xie, Huawei
Btw, put unlikely before the check of mbuf_avail and checks elsewhere.
I don't think so. It would benefit small packets. But what about when
TSO or jumbo frames are enabled and we have big packets?
I prefer to favor the path where the packet fits in one mbuf.

Btw, not specific to this: for "m = copy_desc_to_mbuf(dev, vq,
desc_indexes[i], mbuf_pool)", the failure case is unlikely to happen, so
add unlikely to the if (m == NULL) check there. Please check all
branches elsewhere.
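For readers unfamiliar with the hint under discussion, here is a minimal sketch of how such a macro is typically built on the GCC/Clang builtin __builtin_expect, and how it wraps a rarely taken failure check. sketch_unlikely() and collect_until_null() are hypothetical names used only for illustration; they are not code from the patch, and DPDK provides its own unlikely() macro.

```c
#include <stddef.h>

/* Hypothetical stand-in for a branch hint; requires GCC or Clang. */
#define sketch_unlikely(x) __builtin_expect(!!(x), 0)

/*
 * Copy pointers from in[] to out[] until the first NULL entry.
 * Returns the number of entries stored. The NULL case is marked as
 * the cold path, which is the pattern suggested for "m == NULL".
 */
static size_t
collect_until_null(void *const *in, size_t n, void **out)
{
	size_t i;

	for (i = 0; i < n; i++) {
		if (sketch_unlikely(in[i] == NULL))
			break;
		out[i] = in[i];
	}

	return i;
}
```

The hint only affects code layout and static prediction; whether it pays off depends on how often the branch is really taken, which is exactly the point being argued for the mbuf_avail check.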
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-03-07 02:48:38 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
We could always allocate the head mbuf before the loop, then we save the
following branch and make the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation, one solution is we pass the head mbuf from an additional
parameter.
Yep, that's also something I have thought of.
Post by Xie, Huawei
Btw, put unlikely before the check of mbuf_avail and checks elsewhere.
I don't think so. It would benefit small packets. But what about when
TSO or jumbo frames are enabled and we have big packets?
I prefer to favor the path where the packet fits in one mbuf.
Hmmm, why? While I know that TSO and mergeable buf are disabled by
default in our vhost example vhost-switch, they are enabled by default
in real life.
Post by Xie, Huawei
Btw, not specially to this, return "m = copy_desc_to_mbuf(dev, vq,
desc_indexes[i], mbuf_pool)", failure case is unlikely to happen, so add
unlikely for the check if (m == NULL) there. Please check all branches
elsewhere.
Thanks for the reminder; I will check in detail.

--yliu
Xie, Huawei
2016-03-07 02:59:55 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
+ while (desc_avail || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reachs to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reachs to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(!cur)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ if (head)
+ rte_pktmbuf_free(head);
+ return NULL;
+ }
We could always allocate the head mbuf before the loop, then we save the
following branch and make the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation, one solution is we pass the head mbuf from an additional
parameter.
Yep, that's also something I have thought of.
Post by Xie, Huawei
Btw, put unlikely before the check of mbuf_avail and checks elsewhere.
I don't think so. It would benefit small packets. But what about when
TSO or jumbo frames are enabled and we have big packets?
I prefer to favor the path where the packet fits in one mbuf.
Hmmm, why? While I know that TSO and mergeable buf are disabled by
default in our vhost example vhost-switch, they are enabled by default
in real life.
Mergeable is only meaningful in the RX path.
If you mean big packets in the TX path, I personally favor the path
where the packet fits in one mbuf.
Post by Yuanhan Liu
Post by Xie, Huawei
Btw, not specially to this, return "m = copy_desc_to_mbuf(dev, vq,
desc_indexes[i], mbuf_pool)", failure case is unlikely to happen, so add
unlikely for the check if (m == NULL) there. Please check all branches
elsewhere.
Thanks for the remind, will have a detail check.
--yliu
Yuanhan Liu
2016-03-07 06:14:24 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Xie, Huawei
We could always allocate the head mbuf before the loop, then we save the
following branch and make the code more streamlined.
It reminds me that this change prevents the possibility of mbuf bulk
allocation, one solution is we pass the head mbuf from an additional
parameter.
Yep, that's also something I have thought of.
Post by Xie, Huawei
Btw, put unlikely before the check of mbuf_avail and checks elsewhere.
I don't think so. It would benefit small packets. But what about when
TSO or jumbo frames are enabled and we have big packets?
I prefer to favor the path where the packet fits in one mbuf.
Hmmm, why? While I know that TSO and mergeable buf are disabled by
default in our vhost example vhost-switch, they are enabled by default
in real life.
Mergeable is only meaningful in the RX path.
If you mean big packets in the TX path,
Sorry, and yes, I meant that.
Post by Xie, Huawei
I personally favor the path
where the packet fits in one mbuf.
Sorry, that will not convince me.

--yliu
Xie, Huawei
2016-03-03 17:19:42 UTC
Permalink
Post by Yuanhan Liu
[...]
CCed Changchun, the author of the chained handling of desc and mbuf.
The change makes the code more readable, but I think the following
commit message is simple and sufficient.
Post by Yuanhan Liu
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
[...]
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
I guess the reduced binary code size comes from reduced inline calls to
mbuf allocation.
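The pseudo code quoted above can be sketched as a self-contained loop over plain buffers. This is an illustration under assumptions, not the patch itself: struct src_buf stands in for a vring descriptor chain, struct dst_seg for an mbuf chain, and "allocation" is simply taking the next entry of a caller-provided array. All names are hypothetical.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for a descriptor chain and an mbuf chain. */
struct src_buf { const uint8_t *data; size_t len; };
struct dst_seg { uint8_t *data; size_t cap; size_t used; };

/*
 * Copy a chain of source buffers into fixed-capacity segments.
 * Returns the number of segments used, or -1 if they run out.
 */
static int
copy_chain(const struct src_buf *src, size_t nb_src,
	   struct dst_seg *dst, size_t nb_dst)
{
	size_t si = 0, di = 0;
	size_t src_avail, src_off = 0;
	size_t dst_avail = 0, dst_off = 0;
	struct dst_seg *cur = NULL;

	if (nb_src == 0)
		return 0;
	src_avail = src[0].len;

	while (src_avail != 0 || si + 1 < nb_src) {
		/* This source buffer is drained: move to the next one. */
		if (src_avail == 0) {
			si++;
			src_off = 0;
			src_avail = src[si].len;
		}

		/* No room left in the current segment: take a new one. */
		if (dst_avail == 0) {
			if (di == nb_dst)
				return -1;
			cur = &dst[di++];
			cur->used = 0;
			dst_off = 0;
			dst_avail = cur->cap;
		}

		size_t cpy_len = src_avail < dst_avail ? src_avail : dst_avail;
		memcpy(cur->data + dst_off, src[si].data + src_off, cpy_len);

		src_avail -= cpy_len;
		src_off += cpy_len;
		dst_avail -= cpy_len;
		dst_off += cpy_len;
		cur->used = dst_off;
	}

	return (int)di;
}
```

There is a single place where a new segment is taken, which matches the stated goal of removing the repeated allocation sites.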
Yuanhan Liu
2016-03-04 02:11:54 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
[...]
CCed changchun, the author for the chained handling of desc and mbuf.
The change makes the code more readable, but i think the following
commit message is simple and enough.
Hmm, my commit log tells a full story:

- What is the issue? (messy/logic twisted code)

- What does the code do? (And what are the challenges: a few tricky places)

- What is the proposed solution to fix it? (the pseudo code below)

And you are suggesting that I drop the first 2 items and leave only the
3rd item (a solution)?

--yliu
Post by Xie, Huawei
Post by Yuanhan Liu
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
[...]
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
I guess the reduced binary code size comes from reduced inline calls to
mbuf allocation.
Xie, Huawei
2016-03-07 02:55:54 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
[...]
CCed changchun, the author for the chained handling of desc and mbuf.
The change makes the code more readable, but i think the following
commit message is simple and enough.
- What is the issue? (messy/logic twisted code)
- What the code does? (And what are the challenges: few tricky places)
- What's the proposed solution to fix it. (the below pseudo code)
And you suggest me to get rid of the first 2 items and leave 3rd item
(a solution) only?
The following is simple and sufficient, with just one additional
statement about the repeated mbuf allocation or the twisted logic. The
rest of the commit message is overly duplicated. Just my personal
opinion. Up to you.

For this particular case, for example, we could turn both the mbuf and
vring_desc chains into iovecs, then use a common iovec copy algorithm
for both dequeue and enqueue, which would make the code even simpler
and more readable. For this change, one or two sentences are clear
enough to me.
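A rough sketch of what such a shared iovec copy could look like, assuming both chains have already been described as arrays of struct iovec (from <sys/uio.h>). iov_copy() is a hypothetical helper, not an existing DPDK or vhost function.

```c
#include <stddef.h>
#include <string.h>
#include <sys/uio.h>

/*
 * Copy bytes from the src iovec array into the dst iovec array until
 * either side is exhausted. Returns the number of bytes copied.
 */
static size_t
iov_copy(const struct iovec *dst, size_t nb_dst,
	 const struct iovec *src, size_t nb_src)
{
	size_t di = 0, si = 0;
	size_t dst_off = 0, src_off = 0;
	size_t total = 0;

	while (di < nb_dst && si < nb_src) {
		size_t dst_avail = dst[di].iov_len - dst_off;
		size_t src_avail = src[si].iov_len - src_off;
		size_t len = dst_avail < src_avail ? dst_avail : src_avail;

		memcpy((char *)dst[di].iov_base + dst_off,
		       (const char *)src[si].iov_base + src_off, len);
		total += len;
		dst_off += len;
		src_off += len;

		/* Advance whichever side has been fully consumed. */
		if (dst_off == dst[di].iov_len) {
			di++;
			dst_off = 0;
		}
		if (src_off == src[si].iov_len) {
			si++;
			src_off = 0;
		}
	}

	return total;
}
```

The same function would serve both directions by swapping the roles of the two arrays; the cost is the extra pass needed to build the iovec arrays first.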
Post by Yuanhan Liu
--yliu
Post by Xie, Huawei
Post by Yuanhan Liu
while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}
if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}
COPY(mbuf, desc);
}
[...]
This refactor makes the code much more readable (IMO), yet it reduces
binary code size (nearly 2K).
I guess the reduced binary code size comes from reduced inline calls to
mbuf allocation.
Xie, Huawei
2016-03-03 17:40:14 UTC
Permalink
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
[...]
Post by Yuanhan Liu
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
indices
Post by Yuanhan Liu
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;
+ struct rte_mbuf *m;
if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;
avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
return 0;
- LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
- dev->device_fh);
+ LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);
- /* Prefetch available ring to retrieve head indexes. */
- rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+ used_idx = vq->last_used_idx & (vq->size -1);
- /*get the number of free entries in the ring*/
- free_entries = (avail_idx - vq->last_used_idx);
+ /* Prefetch available ring to retrieve head indexes. */
+ rte_prefetch0(&vq->avail->ring[used_idx]);
- free_entries = RTE_MIN(free_entries, count);
- /* Limit to MAX_PKT_BURST. */
- free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
+ count = RTE_MIN(count, MAX_PKT_BURST);
+ count = RTE_MIN(count, free_entries);
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequeue %u buffers\n",
+ dev->device_fh, count);
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
- dev->device_fh, free_entries);
/* Retrieve all of the head indexes first to avoid caching issues. */
- for (i = 0; i < free_entries; i++)
- head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+ (vq->size - 1)];
+ }
/* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success]]);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);
- while (entry_success < free_entries) {
- uint32_t vb_avail, vb_offset;
- uint32_t seg_avail, seg_offset;
- uint32_t cpy_len;
- uint32_t seg_num = 0;
- struct rte_mbuf *cur;
- uint8_t alloc_err = 0;
-
- desc = &vq->desc[head[entry_success]];
-
- vb_net_hdr_addr = gpa_to_vva(dev, desc->addr);
- hdr = (struct virtio_net_hdr *)((uintptr_t)vb_net_hdr_addr);
-
- /* Discard first buffer as it is the virtio header */
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- vb_offset = 0;
- vb_avail = desc->len;
- } else {
- vb_offset = vq->vhost_hlen;
- vb_avail = desc->len - vb_offset;
- }
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
-
- used_idx = vq->last_used_idx & (vq->size - 1);
-
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
Why is this prefetch silently dropped in the patch?
Post by Yuanhan Liu
-
- /* Update used index buffer information. */
- vq->used->ring[used_idx].id = head[entry_success];
- vq->used->ring[used_idx].len = 0;
-
- /* Allocate an mbuf and populate the structure. */
- m = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(m == NULL)) {
- RTE_LOG(ERR, VHOST_DATA,
- "Failed to allocate memory for mbuf.\n");
- break;
- }
- seg_offset = 0;
- seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
- seg_num++;
- cur = m;
- prev = m;
- while (cpy_len != 0) {
- rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
- (void *)((uintptr_t)(vb_addr + vb_offset)),
- cpy_len);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- vb_avail -= cpy_len;
- seg_avail -= cpy_len;
-
- if (vb_avail != 0) {
- /*
- * The segment reachs to its end,
- * while the virtio buffer in TX vring has
- * more data to be copied.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /* Allocate mbuf and populate the structure. */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR, VHOST_DATA, "Failed to "
- "allocate memory for mbuf.\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
-
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- } else {
- if (desc->flags & VRING_DESC_F_NEXT) {
- /*
- * There are more virtio buffers in
- * same vring entry need to be copied.
- */
- if (seg_avail == 0) {
- /*
- * The current segment hasn't
- * room to accomodate more
- * data.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /*
- * Allocate an mbuf and
- * populate the structure.
- */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR,
- VHOST_DATA,
- "Failed to "
- "allocate memory "
- "for mbuf\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- }
-
- desc = &vq->desc[desc->next];
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = desc->len;
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr,
- desc->len, 0);
- } else {
- /* The whole packet completes. */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- vb_avail = 0;
- }
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- }
-
- if (unlikely(alloc_err == 1))
+ for (i = 0; i < count; i++) {
+ m = copy_desc_to_mbuf(dev, vq, desc_indexes[i], mbuf_pool);
+ if (m == NULL)
Add unlikely for every case that is not expected to happen.
Post by Yuanhan Liu
break;
+ pkts[i] = m;
- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
What is the correct value for ring[used_idx].len, the packet length or 0?
Post by Yuanhan Liu
}
rte_compiler_barrier();
- vq->used->idx += entry_success;
+ vq->used->idx += i;
+
/* Kick guest if required. */
if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT))
eventfd_write(vq->callfd, (eventfd_t)1);
- return entry_success;
+
+ return i;
}
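The index arithmetic in the hunk above relies on two properties: the avail and used indexes are free-running 16-bit counters, so their difference stays correct across wraparound, and the ring size is a power of two, so masking with (size - 1) yields the slot. A small sketch with hypothetical helper names:

```c
#include <stdint.h>

/* Number of entries the guest has made available but we have not used. */
static inline uint16_t
ring_free_entries(uint16_t avail_idx, uint16_t last_used_idx)
{
	/* Unsigned subtraction modulo 2^16 handles counter wraparound. */
	return (uint16_t)(avail_idx - last_used_idx);
}

/* Map a free-running index to a ring slot; ring_size must be a power of two. */
static inline uint16_t
ring_slot(uint16_t idx, uint16_t ring_size)
{
	return idx & (uint16_t)(ring_size - 1);
}
```

For example, with avail_idx = 2 and last_used_idx = 65534 the difference is 4, even though the counter has wrapped.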
Yuanhan Liu
2016-03-04 02:32:05 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
[...]
Post by Yuanhan Liu
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
indices
http://dictionary.reference.com/browse/index

index
noun, plural indexes, indices
Post by Xie, Huawei
Post by Yuanhan Liu
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;
+ struct rte_mbuf *m;
if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
Why is this prefetch silently dropped in the patch?
Oops, good catch. Will fix it. Thanks.
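As a reminder of what the dropped lines did, here is a rough sketch of prefetching the next entry while working on the current one. It uses the GCC/Clang builtin __builtin_prefetch() instead of rte_prefetch0(), and sum_indirect() is a hypothetical function, not vhost code.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Sum table entries selected by idx[]. While entry i is being read,
 * the cache line of entry i + 1 is requested ahead of time.
 */
static uint64_t
sum_indirect(const uint64_t *table, const uint16_t *idx, size_t n)
{
	uint64_t sum = 0;

	for (size_t i = 0; i < n; i++) {
		if (i + 1 < n)
			__builtin_prefetch(&table[idx[i + 1]], 0, 3);
		sum += table[idx[i]];
	}

	return sum;
}
```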
Post by Xie, Huawei
Post by Yuanhan Liu
break;
+ pkts[i] = m;
- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
What is the correct value for ring[used_idx].len, the packet length or 0?
Good question. I didn't notice that before. Sounds buggy to me. However,
that's from the old code. Will check it.

--yliu
Xie, Huawei
2016-03-07 03:02:04 UTC
Permalink
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
The current rte_vhost_dequeue_burst() implementation is a bit messy
[...]
Post by Yuanhan Liu
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
indices
http://dictionary.reference.com/browse/index
index
noun, plural indexes, indices
OK, I see both are used.
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
uint32_t used_idx;
uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;
+ struct rte_mbuf *m;
if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -730,197 +813,49 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
return 0;
- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
- }
Why is this prefetch silently dropped in the patch?
Oops, good catching. Will fix it. Thanks.
Post by Xie, Huawei
Post by Yuanhan Liu
break;
+ pkts[i] = m;
- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
What is the correct value for ring[used_idx].len, the packet length or 0?
Good question. I didn't notice that before. Sounds buggy to me. However,
that's from the old code. Will check it.
Yes, I knew it is in the old code as well. Thanks.
Post by Yuanhan Liu
--yliu
Xie, Huawei
2016-03-07 03:03:52 UTC
Permalink
Post by Yuanhan Liu
+ mbuf_avail = 0;
+ mbuf_offset = 0;
One coding-style nit: put the initialization at the definition.
Yuanhan Liu
2016-02-18 13:49:07 UTC
Permalink
This is a simple refactor, as there isn't any twisted logic in the old
code. Here I just broke the code up and introduced two helper functions,
reserve_avail_buf() and copy_mbuf_to_desc(), to make the code more
readable.

Also, it saves nearly 1K bytes of binary code size.

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---

v2: - fix NULL dereference bug found by Rich.
---
lib/librte_vhost/vhost_rxtx.c | 286 ++++++++++++++++++++----------------------
1 file changed, 137 insertions(+), 149 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index d5cd0fa..d3775ad 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -92,6 +92,115 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
return;
}

+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_offset = vq->vhost_hlen;
+ desc_avail = desc->len - vq->vhost_hlen;
+
+ *copied = rte_pktmbuf_pkt_len(m);
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (m == NULL)
+ break;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
+ /* done with current desc buf, fetch next */
+ if (desc_avail == 0) {
+ if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+ /* Room in vring buffer is not enough */
+ return -1;
+ }
+
+ desc = &vq->desc[desc->next];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ desc_offset = 0;
+ desc_avail = desc->len;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+ rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+ cpy_len);
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ return 0;
+}
+
+/*
+ * As many data cores may want to access available buffers
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_end_idx;
+ uint16_t avail_idx;
+ uint16_t free_entries;
+ int success;
+
+ count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+ res_start_idx = vq->last_used_idx_res;
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+ free_entries = (avail_idx - res_start_idx);
+ count = RTE_MIN(count, free_entries);
+ if (count == 0)
+ return 0;
+
+ res_end_idx = res_start_idx + count;
+
+ /*
+ * update vq->last_used_idx_res atomically; try again if failed.
+ *
+ * TODO: Allow to disable cmpset if no concurrency in application.
+ */
+ success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_end_idx);
+ if (!success)
+ goto again;
+
+ *start = res_start_idx;
+ *end = res_end_idx;
+
+ return count;
+}
+
/**
* This function adds buffers to the virtio devices RX virtqueue. Buffers can
* be received from the physical port or from another virtio device. A packet
@@ -101,21 +210,12 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
*/
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- struct rte_mbuf *buff, *first_buff;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
- uint64_t buff_addr = 0;
- uint64_t buff_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
- uint32_t head_idx, packet_success = 0;
- uint16_t avail_idx, res_cur_idx;
- uint16_t res_base_idx, res_end_idx;
- uint16_t free_entries;
- uint8_t success = 0;
+ uint16_t res_start_idx, res_end_idx;
+ uint16_t desc_indexes[MAX_PKT_BURST];
+ uint32_t i;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -129,155 +229,43 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
if (unlikely(vq->enabled == 0))
return 0;

- count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
-
- /*
- * As many data cores may want access to available buffers,
- * they need to be reserved.
- */
- do {
- res_base_idx = vq->last_used_idx_res;
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- free_entries = (avail_idx - res_base_idx);
- /*check that we have enough buffers*/
- if (unlikely(count > free_entries))
- count = free_entries;
-
- if (count == 0)
- return 0;
-
- res_end_idx = res_base_idx + count;
- /* vq->last_used_idx_res is atomically updated. */
- /* TODO: Allow to disable cmpset if no concurrency in application. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx, res_end_idx);
- } while (unlikely(success == 0));
- res_cur_idx = res_base_idx;
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
- dev->device_fh, res_cur_idx, res_end_idx);
-
- /* Prefetch available ring to retrieve indexes. */
- rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);
-
- /* Retrieve all of the head indexes first to avoid caching issues. */
- for (head_idx = 0; head_idx < count; head_idx++)
- head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) &
- (vq->size - 1)];
-
- /*Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
-
- while (res_cur_idx != res_end_idx) {
- uint32_t offset = 0, vb_offset = 0;
- uint32_t pkt_len, len_to_cpy, data_len, total_copied = 0;
- uint8_t hdr = 0, uncompleted_pkt = 0;
+ count = reserve_avail_buf(vq, count, &res_start_idx, &res_end_idx);
+ if (count == 0)
+ return 0;

- /* Get descriptor from available ring */
- desc = &vq->desc[head[packet_success]];
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") res_start_idx %d| res_end_idx Index %d\n",
+ dev->device_fh, res_start_idx, res_end_idx);

- buff = pkts[packet_success];
- first_buff = buff;
+ /* Retrieve all of the desc indexes first to avoid caching issues. */
+ rte_prefetch0(&vq->avail->ring[res_start_idx & (vq->size - 1)]);
+ for (i = 0; i < count; i++)
+ desc_indexes[i] = vq->avail->ring[(res_start_idx + i) & (vq->size - 1)];

- /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
- buff_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)buff_addr);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
+ for (i = 0; i < count; i++) {
+ uint16_t desc_idx = desc_indexes[i];
+ uint16_t used_idx = (res_start_idx + i) & (vq->size - 1);
+ uint32_t copied;
+ int err;

- /* Copy virtio_hdr to packet and increment buffer address */
- buff_hdr_addr = buff_addr;
+ err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx, &copied);

- /*
- * If the descriptors are chained the header and data are
- * placed in separate buffers.
- */
- if ((desc->flags & VRING_DESC_F_NEXT) &&
- (desc->len == vq->vhost_hlen)) {
- desc = &vq->desc[desc->next];
- /* Buffer address translation. */
- buff_addr = gpa_to_vva(dev, desc->addr);
+ vq->used->ring[used_idx].id = desc_idx;
+ if (unlikely(err)) {
+ vq->used->ring[used_idx].len = vq->vhost_hlen;
} else {
- vb_offset += vq->vhost_hlen;
- hdr = 1;
- }
-
- pkt_len = rte_pktmbuf_pkt_len(buff);
- data_len = rte_pktmbuf_data_len(buff);
- len_to_cpy = RTE_MIN(data_len,
- hdr ? desc->len - vq->vhost_hlen : desc->len);
- while (total_copied < pkt_len) {
- /* Copy mbuf data to buffer */
- rte_memcpy((void *)(uintptr_t)(buff_addr + vb_offset),
- rte_pktmbuf_mtod_offset(buff, const void *, offset),
- len_to_cpy);
- PRINT_PACKET(dev, (uintptr_t)(buff_addr + vb_offset),
- len_to_cpy, 0);
-
- offset += len_to_cpy;
- vb_offset += len_to_cpy;
- total_copied += len_to_cpy;
-
- /* The whole packet completes */
- if (total_copied == pkt_len)
- break;
-
- /* The current segment completes */
- if (offset == data_len) {
- buff = buff->next;
- offset = 0;
- data_len = rte_pktmbuf_data_len(buff);
- }
-
- /* The current vring descriptor done */
- if (vb_offset == desc->len) {
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- buff_addr = gpa_to_vva(dev, desc->addr);
- vb_offset = 0;
- } else {
- /* Room in vring buffer is not enough */
- uncompleted_pkt = 1;
- break;
- }
- }
- len_to_cpy = RTE_MIN(data_len - offset, desc->len - vb_offset);
+ vq->used->ring[used_idx].len = copied + vq->vhost_hlen;
}

- /* Update used ring with desc information */
- vq->used->ring[res_cur_idx & (vq->size - 1)].id =
- head[packet_success];
-
- /* Drop the packet if it is uncompleted */
- if (unlikely(uncompleted_pkt == 1))
- vq->used->ring[res_cur_idx & (vq->size - 1)].len =
- vq->vhost_hlen;
- else
- vq->used->ring[res_cur_idx & (vq->size - 1)].len =
- pkt_len + vq->vhost_hlen;
-
- res_cur_idx++;
- packet_success++;
-
- if (unlikely(uncompleted_pkt == 1))
- continue;
-
- virtio_enqueue_offload(first_buff, &virtio_hdr.hdr);
-
- rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
-
- PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
-
- if (res_cur_idx < res_end_idx) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
- }
+ if (i + 1 < count)
+ rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
}

rte_compiler_barrier();

/* Wait until it's our turn to add our buffer to the used ring. */
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != res_start_idx))
rte_pause();

*(volatile uint16_t *)&vq->used->idx += count;
--
1.9.0
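The reservation step in reserve_avail_buf() is a compare-and-set retry loop. The sketch below shows the same idea with C11 atomics instead of rte_atomic16_cmpset(); reserve_slots(), MAX_BURST and the parameter names are assumptions made for illustration and are not the patch's code.

```c
#include <stdatomic.h>
#include <stdint.h>

#define MAX_BURST 32u /* hypothetical burst limit */

/*
 * Reserve up to "count" ring entries in [*start, *end) for this caller.
 * Returns the number of entries reserved, or 0 if none are available.
 */
static uint32_t
reserve_slots(_Atomic uint16_t *reserved_idx, _Atomic uint16_t *avail_idx,
	      uint32_t count, uint16_t *start, uint16_t *end)
{
	uint16_t res_start = atomic_load(reserved_idx);
	uint16_t free_entries;
	uint32_t n;

	for (;;) {
		free_entries = (uint16_t)(atomic_load(avail_idx) - res_start);
		n = count < MAX_BURST ? count : MAX_BURST;
		if (n > free_entries)
			n = free_entries;
		if (n == 0)
			return 0;

		/* On failure, res_start is reloaded with the current value. */
		if (atomic_compare_exchange_weak(reserved_idx, &res_start,
						 (uint16_t)(res_start + n)))
			break;
	}

	*start = res_start;
	*end = (uint16_t)(res_start + n);
	return n;
}
```

Each caller that wins the compare-and-set owns its range exclusively, which is why the later "wait until it's our turn" loop on last_used_idx is still needed before publishing to the used ring.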
Xie, Huawei
2016-03-07 03:34:53 UTC
Permalink
Post by Yuanhan Liu
+ while (1) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (m == NULL)
+ break;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
You could use while (mbuf_avail || m->next) to align with the style of
copy_desc_to_mbuf?
Yuanhan Liu
2016-03-08 12:27:11 UTC
Permalink
Post by Xie, Huawei
Post by Yuanhan Liu
+ while (1) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (m == NULL)
+ break;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
You could use while (mbuf_avail || m->next) to align with the style of
copy_desc_to_mbuf?
Good suggestion, will do that.

Thanks.

--yliu
Yuanhan Liu
2016-02-18 13:49:11 UTC
We need to make sure that desc->len is not smaller than the size of the
virtio net header; otherwise, unexpected behaviour might happen because
"desc_avail" would become a huge number in the following code:

desc_avail = desc->len - vq->vhost_hlen;

For the dequeue code path, it will try to allocate enough mbufs to hold a
desc buf of that size, which ends up consuming all mbufs, so that no
free mbuf is available. Therefore, you might see an error message:

Failed to allocate memory for mbuf.

Also, for both the dequeue and enqueue code paths, while copying data
from/to the desc buf, the big "desc_avail" would result in accessing memory
that does not belong to the desc buf, which could lead to memory access errors.

A malicious guest could easily forge such a malformed vring desc buf.
Restarting an interrupted DPDK application inside the guest would also
trigger this issue every time, as all huge pages are reset to 0 during
DPDK re-init, leading to desc->len being 0.

Therefore, this patch does a sanity check for desc->len, to make vhost
robust.
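
To make the failure mode concrete, here is a minimal standalone sketch of
the unsigned wrap-around. It is not part of the patch: the helper names
desc_payload_len() and unchecked_payload_len() are hypothetical, and only
the subtraction and the length check mirror the code described above.

```c
#include <stdint.h>

/*
 * Hypothetical helper, for illustration only: stores the room left in a
 * descriptor buffer after the virtio net header and returns 0, or returns
 * -1 if the buffer is too small to hold the header at all.
 */
static inline int
desc_payload_len(uint32_t desc_len, uint32_t hdr_len, uint32_t *avail)
{
	/* Same kind of check the patch adds: reject too-short buffers. */
	if (desc_len < hdr_len)
		return -1;

	*avail = desc_len - hdr_len;
	return 0;
}

/* Without the check, the unsigned subtraction wraps around. */
static inline uint32_t
unchecked_payload_len(uint32_t desc_len, uint32_t hdr_len)
{
	return desc_len - hdr_len;
}
```

With desc_len = 0 and hdr_len = 12, the unchecked variant yields
4294967284 (2^32 - 12), while the checked variant returns -1.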

Reported-by: Rich Lane <***@bigswitch.com>
Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 9 +++++++++
1 file changed, 9 insertions(+)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 04af9b3..c2adcd9 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -115,6 +115,9 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};

desc = &vq->desc[desc_idx];
+ if (unlikely(desc->len < vq->vhost_hlen))
+ return -1;
+
desc_addr = gpa_to_vva(dev, desc->addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);

@@ -406,6 +409,9 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
"(%"PRIu64") Current Index %d| End Index %d\n",
dev->device_fh, cur_idx, res_end_idx);

+ if (vq->buf_vec[vec_idx].buf_len < vq->vhost_hlen)
+ return -1;
+
desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);

@@ -649,6 +655,9 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct virtio_net_hdr *hdr;

desc = &vq->desc[desc_idx];
+ if (unlikely(desc->len < vq->vhost_hlen))
+ return NULL;
+
desc_addr = gpa_to_vva(dev, desc->addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);
--
1.9.0
Yuanhan Liu
2016-02-18 13:49:09 UTC
First of all, rte_memcpy() is mostly useful for copying big packets
by leveraging advanced hardware instructions like AVX. But for the virtio
net hdr, which is 12 bytes at most, invoking rte_memcpy() will not
introduce any performance boost.

And, to my surprise, rte_memcpy() is VERY huge. Since rte_memcpy()
is inlined, it increases the binary code size linearly every time
we call it at a different place. Replacing the two rte_memcpy() calls
with a direct copy saves nearly 12K bytes of code size!
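
A minimal standalone sketch of a fixed-size header copy follows. It is an
illustration under assumptions, not the patch itself: struct net_hdr and
struct net_hdr_mrg are local stand-ins that mirror the 10-byte and 12-byte
virtio net header layouts, copy_hdr() is a hypothetical name, and the
sketch uses memcpy() with a compile-time constant size where the patch
uses a struct assignment through a cast pointer.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Local stand-in mirroring the basic virtio net header (10 bytes). */
struct net_hdr {
	uint8_t  flags;
	uint8_t  gso_type;
	uint16_t hdr_len;
	uint16_t gso_size;
	uint16_t csum_start;
	uint16_t csum_offset;
};

/* Local stand-in mirroring the mergeable-rxbuf header (12 bytes). */
struct net_hdr_mrg {
	struct net_hdr hdr;
	uint16_t num_buffers;
};

/*
 * Copy either the basic or the mergeable header into dst, selected by
 * hlen. Both branches copy a size known at compile time, which compilers
 * typically expand into a few plain loads and stores.
 */
static inline void
copy_hdr(void *dst, const struct net_hdr_mrg *src, size_t hlen)
{
	if (hlen == sizeof(struct net_hdr_mrg))
		memcpy(dst, src, sizeof(struct net_hdr_mrg));
	else
		memcpy(dst, &src->hdr, sizeof(struct net_hdr));
}
```

Taking the destination as void * keeps the sketch free of pointer casts at
the copy site; whether that is preferable to the cast-and-assign form is a
style choice.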

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 3909584..97690c3 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -92,6 +92,17 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
return;
}

+static inline void
+copy_virtio_net_hdr(struct vhost_virtqueue *vq, uint64_t desc_addr,
+ struct virtio_net_hdr_mrg_rxbuf hdr)
+{
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
+}
+
static inline int __attribute__((always_inline))
copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
@@ -108,8 +119,7 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
rte_prefetch0((void *)(uintptr_t)desc_addr);

virtio_enqueue_offload(m, &virtio_hdr.hdr);
- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ copy_virtio_net_hdr(vq, desc_addr, virtio_hdr);
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

desc_offset = vq->vhost_hlen;
@@ -404,8 +414,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
dev->device_fh, virtio_hdr.num_buffers);

virtio_enqueue_offload(m, &virtio_hdr.hdr);
- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ copy_virtio_net_hdr(vq, desc_addr, virtio_hdr);
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
--
1.9.0
Xie, Huawei
2016-03-07 01:20:45 UTC
Acked-by: Huawei Xie <***@intel.com>
Stephen Hemminger
2016-03-07 04:20:00 UTC
On Thu, 18 Feb 2016 21:49:09 +0800
Post by Yuanhan Liu
+static inline void
+copy_virtio_net_hdr(struct vhost_virtqueue *vq, uint64_t desc_addr,
+ struct virtio_net_hdr_mrg_rxbuf hdr)
+{
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
+}
+
Don't use {} around single statements.
Since you are doing all this casting, why not just use regular old memcpy,
which will be inlined by GCC into the same instructions anyway?
And since you are always casting desc_addr, why not pass a type that
doesn't need the additional cast (like void *)?
Xie, Huawei
2016-03-07 05:24:50 UTC
Post by Stephen Hemminger
On Thu, 18 Feb 2016 21:49:09 +0800
Post by Yuanhan Liu
+static inline void
+copy_virtio_net_hdr(struct vhost_virtqueue *vq, uint64_t desc_addr,
+ struct virtio_net_hdr_mrg_rxbuf hdr)
+{
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
+}
+
Don't use {} around single statements.
There are other cs issues, like
used_idx = vq->last_used_idx & (vq->size -1);
^ space needed
Please run checkpatch against your patch.
Post by Stephen Hemminger
Since you are doing all this casting, why not just use regular old memcpy,
which will be inlined by GCC into the same instructions anyway?
And since you are always casting desc_addr, why not pass a type that
doesn't need the additional cast (like void *)?
Yuanhan Liu
2016-03-07 06:21:00 UTC
Post by Stephen Hemminger
On Thu, 18 Feb 2016 21:49:09 +0800
Post by Yuanhan Liu
+static inline void
+copy_virtio_net_hdr(struct vhost_virtqueue *vq, uint64_t desc_addr,
+ struct virtio_net_hdr_mrg_rxbuf hdr)
+{
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf)) {
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ } else {
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+ }
+}
+
Don't use {} around single statements.
Oh, I was thinking that it's a personal preference. Okay, I will remove
them.
Post by Stephen Hemminger
Since you are doing all this casting, why not just use regular old memcpy,
which will be inlined by GCC into the same instructions anyway?
I thought there are some (tiny) differences: memcpy() is not an inlined
function. And I was thinking it generates some slightly more complicated
instructions.
Post by Stephen Hemminger
And since you are always casting desc_addr, why not pass a type that
doesn't need the additional cast (like void *)?
You would have to cast it from "uint64_t" to "void *" as well when calling
it. So, that makes no difference.

--yliu
Yuanhan Liu
2016-02-18 13:49:08 UTC
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block repeated in many different places.

However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
could allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
difference a bit bigger.

Anyway, it could be simpler, just like what we did for virtio_dev_rx().
One difference is that we need to update the used ring properly.

The pseudo code looks like below:

while (1) {
if (this_desc_has_no_room) {
this_desc = fetch_next_from_vec_buf();

if (it is the last of a desc chain) {
update_used_ring();
}
}

if (this_mbuf_has_drained_totally) {
this_mbuf = fetch_next_mbuf();

if (this_mbuf == NULL)
break;
}

COPY(this_desc, this_mbuf);
}

This patch removes quite a few lines of code and therefore makes the
function much more readable.
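
The loop shape in the pseudo code can be exercised outside of vhost with a
small standalone sketch. Everything here is a hypothetical stand-in:
struct seg plays the role of an mbuf segment chain, struct dst_buf plays
the role of one vec_buf entry, and copy_chain() is not a DPDK function.
The used ring update is left out, and the source is checked before the
destination so that an exactly filled last buffer does not trigger one
more fetch.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for a chain of mbuf segments. */
struct seg {
	const uint8_t *data;
	uint32_t len;
	struct seg *next;
};

/* Hypothetical stand-in for one destination buffer entry. */
struct dst_buf {
	uint8_t *addr;
	uint32_t len;
};

/*
 * Copy a segment chain into an array of destination buffers.
 * Returns the number of destination buffers touched, or -1 if the
 * destination runs out of room before the chain is drained.
 */
static int
copy_chain(struct dst_buf *vec, uint32_t nr_vec, const struct seg *s)
{
	uint32_t vec_idx = 0;
	uint32_t dst_off = 0, dst_avail;
	uint32_t src_off = 0, src_avail;
	uint32_t cpy_len;

	if (s == NULL || nr_vec == 0)
		return -1;

	dst_avail = vec[0].len;
	src_avail = s->len;

	while (1) {
		/* done with current source segment, fetch the next one */
		if (src_avail == 0) {
			s = s->next;
			if (s == NULL)
				break;
			src_off = 0;
			src_avail = s->len;
			continue;	/* the new segment may be empty */
		}

		/* done with current destination buffer, fetch the next one */
		if (dst_avail == 0) {
			if (++vec_idx >= nr_vec)
				return -1;
			dst_off = 0;
			dst_avail = vec[vec_idx].len;
			continue;	/* the new buffer may be empty */
		}

		cpy_len = src_avail < dst_avail ? src_avail : dst_avail;
		memcpy(vec[vec_idx].addr + dst_off, s->data + src_off, cpy_len);

		src_avail -= cpy_len;
		src_off   += cpy_len;
		dst_avail -= cpy_len;
		dst_off   += cpy_len;
	}

	return (int)(vec_idx + 1);
}
```

Copying the two segments "hello" and " world" into buffers of 4, 4 and 8
bytes touches all three buffers; with only the first two buffers the
function returns -1.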

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 390 ++++++++++++++++++------------------------
1 file changed, 163 insertions(+), 227 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index d3775ad..3909584 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -280,237 +280,200 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
return count;
}

-static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
- uint16_t res_base_idx, uint16_t res_end_idx,
- struct rte_mbuf *pkt)
+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+ uint32_t *allocated, uint32_t *vec_idx)
{
- uint32_t vec_idx = 0;
- uint32_t entry_success = 0;
- struct vhost_virtqueue *vq;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
- {0, 0, 0, 0, 0, 0}, 0};
- uint16_t cur_idx = res_base_idx;
- uint64_t vb_addr = 0;
- uint64_t vb_hdr_addr = 0;
- uint32_t seg_offset = 0;
- uint32_t vb_offset = 0;
- uint32_t seg_avail;
- uint32_t vb_avail;
- uint32_t cpy_len, entry_len;
-
- if (pkt == NULL)
- return 0;
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
- "End Index %d\n",
- dev->device_fh, cur_idx, res_end_idx);
+ while (1) {
+ if (vec_id >= BUF_VECTOR_MAX)
+ return -1;

- /*
- * Convert from gpa to vva
- * (guest physical addr -> vhost virtual addr)
- */
- vq = dev->virtqueue[queue_id];
+ len += vq->desc[idx].len;
+ vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+ vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
+ vq->buf_vec[vec_id].desc_idx = idx;
+ vec_id++;

- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
- vb_hdr_addr = vb_addr;
+ if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+ break;

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
+ idx = vq->desc[idx].next;
+ }

- virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+ *allocated = len;
+ *vec_idx = vec_id;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
- dev->device_fh, virtio_hdr.num_buffers);
+ return 0;
+}

- virtio_enqueue_offload(pkt, &virtio_hdr.hdr);
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_cur_idx;
+ uint16_t avail_idx;
+ uint32_t allocated;
+ uint32_t vec_idx;
+ uint16_t tries;

- rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+again:
+ res_start_idx = vq->last_used_idx_res;
+ res_cur_idx = res_start_idx;

- PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+ allocated = 0;
+ vec_idx = 0;
+ tries = 0;
+ while (1) {
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ if (unlikely(res_cur_idx == avail_idx)) {
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Failed "
+ "to get enough desc from vring\n",
+ dev->device_fh);
+ return -1;
+ }

- seg_avail = rte_pktmbuf_data_len(pkt);
- vb_offset = vq->vhost_hlen;
- vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ if (fill_vec_buf(vq, res_cur_idx, &allocated, &vec_idx) < 0)
+ return -1;

- entry_len = vq->vhost_hlen;
+ res_cur_idx++;
+ tries++;

- if (vb_avail == 0) {
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
+ if (allocated >= size)
+ break;

- if ((vq->desc[desc_idx].flags
- & VRING_DESC_F_NEXT) == 0) {
- /* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
+ /*
+ * if we tried all available ring items, and still
+ * can't get enough buf, it means something abnormal
+ * happened.
+ */
+ if (tries >= vq->size)
+ return -1;
+ }

- entry_len = 0;
- cur_idx++;
- entry_success++;
- }
+ /*
+ * update vq->last_used_idx_res atomically.
+ * retry again if failed.
+ */
+ if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_cur_idx) == 0)
+ goto again;

- vec_idx++;
- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ *start = res_start_idx;
+ *end = res_cur_idx;
+ return 0;
+}

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;
+ uint32_t nr_used = 0;

- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ if (m == NULL)
+ return 0;

- while (cpy_len > 0) {
- /* Copy mbuf data to vring buffer */
- rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
- rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
- cpy_len);
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") Current Index %d| End Index %d\n",
+ dev->device_fh, cur_idx, res_end_idx);

- PRINT_PACKET(dev,
- (uintptr_t)(vb_addr + vb_offset),
- cpy_len, 0);
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
+ dev->device_fh, virtio_hdr.num_buffers);

- seg_offset += cpy_len;
- vb_offset += cpy_len;
- seg_avail -= cpy_len;
- vb_avail -= cpy_len;
- entry_len += cpy_len;
-
- if (seg_avail != 0) {
- /*
- * The virtio buffer in this vring
- * entry reach to its end.
- * But the segment doesn't complete.
- */
- if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current desc buf, get the next one */
+ if (desc_avail == 0) {
+ desc_idx = vq->buf_vec[vec_idx].desc_idx;
+
+ if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 0) {
/* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
- entry_len = 0;
- cur_idx++;
- entry_success++;
+ used_idx = cur_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
+
+ nr_used++;
}

vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This current segment complete, need continue to
- * check if the whole packet complete or not.
- */
- pkt = pkt->next;
- if (pkt != NULL) {
- /*
- * There are more segments.
- */
- if (vb_avail == 0) {
- /*
- * This current buffer from vring is
- * used up, need fetch next buffer
- * from buf_vec.
- */
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
-
- if ((vq->desc[desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
- uint16_t wrapped_idx =
- cur_idx & (vq->size - 1);
- /*
- * Update used ring with the
- * descriptor information
- */
- vq->used->ring[wrapped_idx].id
- = desc_idx;
- vq->used->ring[wrapped_idx].len
- = entry_len;
- entry_success++;
- entry_len = 0;
- cur_idx++;
- }
-
- /* Get next buffer from buf_vec. */
- vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_avail =
- vq->buf_vec[vec_idx].buf_len;
- vb_offset = 0;
- }
-
- seg_offset = 0;
- seg_avail = rte_pktmbuf_data_len(pkt);
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This whole packet completes.
- */
- /* Update used ring with desc information */
- vq->used->ring[cur_idx & (vq->size - 1)].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[cur_idx & (vq->size - 1)].len
- = entry_len;
- entry_success++;
- break;
- }
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+
+ /* Prefetch buffer address. */
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+ desc_offset = 0;
+ desc_avail = vq->buf_vec[vec_idx].buf_len;
}
- }

- return entry_success;
-}
+ /* done with current mbuf, get the next one */
+ if (mbuf_avail == 0) {
+ m = m->next;
+ if (!m)
+ break;

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
- uint32_t *secure_len, uint32_t *vec_idx)
-{
- uint16_t wrapped_idx = id & (vq->size - 1);
- uint32_t idx = vq->avail->ring[wrapped_idx];
- uint8_t next_desc;
- uint32_t len = *secure_len;
- uint32_t vec_id = *vec_idx;
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }

- do {
- next_desc = 0;
- len += vq->desc[idx].len;
- vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
- vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
- vq->buf_vec[vec_id].desc_idx = idx;
- vec_id++;
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+ rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+ cpy_len);
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);

- if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
- idx = vq->desc[idx].next;
- next_desc = 1;
- }
- } while (next_desc);
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ used_idx = cur_idx & (vq->size - 1);
+ vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
+ nr_used++;

- *secure_len = len;
- *vec_idx = vec_id;
+ return nr_used;
}

-/*
- * This function works for mergeable RX.
- */
static inline uint32_t __attribute__((always_inline))
virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- uint32_t pkt_idx = 0, entry_success = 0;
- uint16_t avail_idx;
- uint16_t res_base_idx, res_cur_idx;
- uint8_t success = 0;
+ uint32_t pkt_idx = 0, nr_used = 0;
+ uint16_t start, end;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
dev->device_fh);
@@ -526,57 +489,30 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
return 0;

count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
if (count == 0)
return 0;

for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

- do {
- /*
- * As many data cores may want access to available
- * buffers, they need to be reserved.
- */
- uint32_t secure_len = 0;
- uint32_t vec_idx = 0;
-
- res_base_idx = vq->last_used_idx_res;
- res_cur_idx = res_base_idx;
-
- do {
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- if (unlikely(res_cur_idx == avail_idx))
- goto merge_rx_exit;
-
- update_secure_len(vq, res_cur_idx,
- &secure_len, &vec_idx);
- res_cur_idx++;
- } while (pkt_len > secure_len);
-
- /* vq->last_used_idx_res is atomically updated. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx,
- res_cur_idx);
- } while (success == 0);
-
- entry_success = copy_from_mbuf_to_vring(dev, queue_id,
- res_base_idx, res_cur_idx, pkts[pkt_idx]);
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;

+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
rte_compiler_barrier();

/*
* Wait until it's our turn to add our buffer
* to the used ring.
*/
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != start))
rte_pause();

- *(volatile uint16_t *)&vq->used->idx += entry_success;
- vq->last_used_idx = res_cur_idx;
+ *(volatile uint16_t *)&vq->used->idx += nr_used;
+ vq->last_used_idx = end;
}

-merge_rx_exit:
if (likely(pkt_idx)) {
/* flush used->idx update before we read avail->flags. */
rte_mb();
--
1.9.0
Xie, Huawei
2016-03-07 06:22:25 UTC
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
Yuanhan Liu
2016-03-07 06:36:39 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".

--yliu
Xie, Huawei
2016-03-07 06:38:42 UTC
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]

the idx retrieved from avail->ring.
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-03-07 06:51:30 UTC
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]
the idx retrieved from avail->ring.
Hmm.. I saw quite a few similar lines of code retrieving an index from
avail->ring, but none of them actually uses "volatile". So, a bug?

--yliu
Xie, Huawei
2016-03-07 07:03:09 UTC
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]
the idx retrieved from avail->ring.
Hmm.. I saw quite a few similar lines of code retrieving an index from
avail->ring, but none of them actually uses "volatile". So, a bug?
Others are not. This function is inline, and is in one translation unit
with its caller.
Post by Yuanhan Liu
--yliu
Xie, Huawei
2016-03-07 07:16:39 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]
the idx retrieved from avail->ring.
Hmm.. I saw quite a few similar lines of code retrieving an index from
avail->ring, but none of them actually uses "volatile". So, a bug?
Others are not. This function is inline, and is in one translation unit
with its caller.
Oh, my fault. For the avail idx, we should be careful about whether to
use volatile.
Post by Xie, Huawei
Post by Yuanhan Liu
--yliu
Yuanhan Liu
2016-03-07 08:20:18 UTC
Post by Xie, Huawei
Post by Xie, Huawei
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;
There is a bug: volatile is not used to retrieve the avail idx.
avail_idx? This is actually from "vq->last_used_idx_res".
uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]
the idx retrieved from avail->ring.
Hmm.. I saw quite a few similar lines of code retrieving an index from
avail->ring, but none of them actually uses "volatile". So, a bug?
Others are not. This function is inline, and is in one translation unit
with its caller.
Oh, my fault. For the avail idx, we should be careful about whether to
use volatile.
I will keep it as it is. If there are any issues with it, let's fix it
in another patch, but not in this refactor patch.
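
For readers following the volatile discussion, here is a minimal
standalone sketch of the idiom the patch already uses when it reads
avail->idx. The struct and function names are hypothetical, and the sketch
takes no position on whether the avail->ring[] read needs the same
treatment.

```c
#include <stdint.h>

/* Hypothetical, cut-down stand-in for the avail ring shared with the guest. */
struct avail_ring {
	uint16_t idx;
	uint16_t ring[8];
};

/*
 * Read the producer index through a volatile-qualified pointer so the
 * compiler performs a real load on every call instead of reusing a value
 * it kept in a register. This gives no atomicity or ordering guarantee.
 */
static inline uint16_t
read_avail_idx(const struct avail_ring *r)
{
	return *(const volatile uint16_t *)&r->idx;
}

/* Returns 1 if the producer has published entries beyond last_seen. */
static inline int
ring_has_new_entries(const struct avail_ring *r, uint16_t last_seen)
{
	return read_avail_idx(r) != last_seen;
}
```

The volatile access only constrains the compiler; any ordering between
this load and later reads of the ring contents still needs a barrier.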

--yliu
Xie, Huawei
2016-03-07 07:52:22 UTC
Post by Yuanhan Liu
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block repeated in many different places.
However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
could allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
[...]
Post by Yuanhan Liu
- }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;
+ uint32_t nr_used = 0;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ if (m == NULL)
+ return 0;
Is this inherited from old code? Let us remove the unnecessary check.
Caller ensures it is not NULL.
Post by Yuanhan Liu
- while (cpy_len > 0) {
- /* Copy mbuf data to vring buffer */
- rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
- rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
- cpy_len);
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") Current Index %d| End Index %d\n",
+ dev->device_fh, cur_idx, res_end_idx);
- PRINT_PACKET(dev,
- (uintptr_t)(vb_addr + vb_offset),
- cpy_len, 0);
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
+ dev->device_fh, virtio_hdr.num_buffers);
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- seg_avail -= cpy_len;
- vb_avail -= cpy_len;
- entry_len += cpy_len;
-
- if (seg_avail != 0) {
- /*
- * The virtio buffer in this vring
- * entry reach to its end.
- * But the segment doesn't complete.
- */
- if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
As we know we are in merge-able path, use sizeof(virtio_net_hdr) to save
one load for the header len.
Post by Yuanhan Liu
+
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current desc buf, get the next one */
+
[...]
Post by Yuanhan Liu
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;
+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
In which case couldn't we get nr_used from start and end?
Post by Yuanhan Liu
rte_compiler_barrier();
/*
* Wait until it's our turn to add our buffer
* to the used ring.
*/
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != start))
rte_pause();
- *(volatile uint16_t *)&vq->used->idx += entry_success;
- vq->last_used_idx = res_cur_idx;
+ *(volatile uint16_t *)&vq->used->idx += nr_used;
+ vq->last_used_idx = end;
}
if (likely(pkt_idx)) {
/* flush used->idx update before we read avail->flags. */
rte_mb();
Yuanhan Liu
2016-03-07 08:38:45 UTC
Post by Xie, Huawei
Post by Yuanhan Liu
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block repeated in many different places.
However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
could allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
[...]
Post by Yuanhan Liu
- }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;
+ uint32_t nr_used = 0;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ if (m == NULL)
+ return 0;
Is this inherited from old code?
Yes.
Post by Xie, Huawei
Let us remove the unnecessary check.
Caller ensures it is not NULL.
...
Post by Xie, Huawei
Post by Yuanhan Liu
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
As we know we are in merge-able path, use sizeof(virtio_net_hdr) to save
one load for the header len.
Please, it's a refactor patch series. You have mentioned quite a few
trivial issues here and there, which I don't care about too much and
which I don't think matter much. In addition, they actually come from
the old code.
Post by Xie, Huawei
Post by Yuanhan Liu
+
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current desc buf, get the next one */
+
[...]
Post by Yuanhan Liu
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;
+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
In which case couldn't we get nr_used from start and end?
When pkts[pkt_idx] is NULL. Though you suggest removing it, the check
is still here.

--yliu
Xie, Huawei
2016-03-07 09:27:46 UTC
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block repeated in many different places.
However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
could allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
[...]
Post by Yuanhan Liu
- }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;
+ uint32_t nr_used = 0;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ if (m == NULL)
+ return 0;
Is this inherited from old code?
Yes.
Post by Xie, Huawei
Let us remove the unnecessary check.
Caller ensures it is not NULL.
...
Post by Xie, Huawei
Post by Yuanhan Liu
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
As we know we are in merge-able path, use sizeof(virtio_net_hdr) to save
one load for the header len.
Please, it's a refactor patch series. You have mentioned quite a few
trivial issues here and there, which I don't care much about and which
I don't think matter. In addition, they actually come from the old
code.
In ordinary code it would be better to use vq->vhost_hlen, for example for
future compatibility. In DPDK, though, we avoid wasting cycles wherever
possible, especially since vhost is the centralized bottleneck.
The m == NULL check should be removed: it not only occupies branch
prediction resources unnecessarily but also makes the nr_used return
value of copy_mbuf_to_desc_mergeable confusing.
It is OK if you don't want to fix this in this patchset.
Post by Yuanhan Liu
Post by Xie, Huawei
Post by Yuanhan Liu
+
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (1) {
+ /* done with current desc buf, get the next one */
+
[...]
Post by Yuanhan Liu
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;
+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
In which case couldn't we get nr_used from start and end?
When pkts[pkt_idx] is NULL, though you suggest to remove it, the check
is here.
--yliu
Thomas Monjalon
2016-02-29 16:06:27 UTC
Permalink
Hi Yuanhan
Here is a patchset for refactoring vhost rxtx code, mainly for
improving readability.
This series needs to be rebased.

And maybe you could check also the series about numa_realloc.

Thanks
Yuanhan Liu
2016-03-01 06:01:08 UTC
Permalink
Post by Thomas Monjalon
Hi Yuanhan
Here is a patchset for refactoring vhost rxtx code, mainly for
improving readability.
This series needs to be rebased.
And maybe you could check also the series about numa_realloc.
Hi Thomas,

Sure, I will. And since you are considering merging it, I will do
more tests, especially on this patchset (it touches the vhost-user
core). Thus, I may send the new version out a bit late, say, next
week.

--yliu
Yuanhan Liu
2016-03-10 04:32:38 UTC
Permalink
v3: - quite a few minor changes, including using likely/unlikely
when possible.

- Added a new patch 8 to avoid an endless loop on a desc chain

The first 3 patches refactor 3 major functions in vhost_rxtx.c.
They simplify the code logic, making it more readable. They also
reduce binary code size, because a lot of duplicate code is
removed and some huge inline functions are trimmed down.

Patch 4 gets rid of the rte_memcpy for virtio_hdr copy, which
saves nearly 12K bytes of binary code size!

Patch 5 removes "unlikely" for VIRTIO_NET_F_MRG_RXBUF detection.

Patches 6, 7 and 8 add sanity checks for two desc fields, to make
vhost robust against a malicious guest or abnormal use
cases.
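
The kind of checks described for patches 6 to 8 can be pictured with a
small stand-alone sketch. This is not the code from those patches; the
names sketch_desc, SKETCH_DESC_F_NEXT and chain_is_sane, and the hdr_len
parameter, are assumptions made only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_DESC_F_NEXT 1u /* hypothetical stand-in for VRING_DESC_F_NEXT */

/* Hypothetical, simplified descriptor: only the fields the checks need. */
struct sketch_desc {
    uint32_t len;
    uint16_t flags;
    uint16_t next;
};

/*
 * Walk a guest-supplied descriptor chain and reject it if
 *  - the head index is outside the ring,
 *  - the first descriptor is too short to hold the virtio header,
 *  - a next index points outside the ring, or
 *  - the chain visits more descriptors than the ring holds (a loop).
 */
static bool
chain_is_sane(const struct sketch_desc *ring, uint16_t ring_size,
              uint16_t head, uint32_t hdr_len)
{
    uint32_t visited = 1;
    uint16_t idx = head;

    assert(ring != NULL && ring_size != 0);

    if (head >= ring_size || ring[head].len < hdr_len)
        return false;

    while (ring[idx].flags & SKETCH_DESC_F_NEXT) {
        idx = ring[idx].next;
        if (idx >= ring_size)
            return false;
        if (++visited > ring_size)
            return false;
    }

    return true;
}
```

Bounding the walk by the ring size is one simple way to detect a chain
that points back into itself without keeping a per-descriptor visited map.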

---
Yuanhan Liu (8):
vhost: refactor rte_vhost_dequeue_burst
vhost: refactor virtio_dev_rx
vhost: refactor virtio_dev_merge_rx
vhost: do not use rte_memcpy for virtio_hdr copy
vhost: don't use unlikely for VIRTIO_NET_F_MRG_RXBUF detection
vhost: do sanity check for desc->len
vhost: do sanity check for desc->next against with vq->size
vhost: avoid dead loop chain.

lib/librte_vhost/vhost_rxtx.c | 1027 ++++++++++++++++++-----------------------
1 file changed, 453 insertions(+), 574 deletions(-)
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:39 UTC
Permalink
The current rte_vhost_dequeue_burst() implementation is a bit messy
and its logic is twisted, with repeated code here and there.

However, rte_vhost_dequeue_burst() actually does a simple job: copy
the packet data from vring desc to mbuf. What's tricky here is:

- A desc buf could be chained (by the desc->next field), so you need
to fetch the next one when the current one is wholly drained.

- One mbuf might not be big enough to hold all the desc buf data, hence
you need to chain the mbuf as well, by the mbuf->next field.

The simplified code looks like the following:

while (this_desc_is_not_drained_totally || has_next_desc) {
if (this_desc_has_drained_totally) {
this_desc = next_desc();
}

if (mbuf_has_no_room) {
mbuf = allocate_a_new_mbuf();
}

COPY(mbuf, desc);
}

Note that the old code does special handling to skip the virtio
header. However, that can be done simply by adjusting the desc_avail
and desc_offset variables:

desc_avail = desc->len - vq->vhost_hlen;
desc_offset = vq->vhost_hlen;
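
For readers who want to run the loop outside of DPDK, here is a
stand-alone sketch of the same idea. It is not the patch code: the
sketch_desc, sketch_seg and sketch_pool types, the SEG_CAP size and the
helper names are assumptions that stand in for vring descriptors, mbufs
and the mempool.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SEG_CAP 8 /* hypothetical capacity of one destination segment */

struct sketch_desc {                /* stand-in for a vring descriptor */
    const uint8_t *data;
    uint32_t len;
    const struct sketch_desc *next; /* NULL when the chain ends */
};

struct sketch_seg {                 /* stand-in for one mbuf segment */
    uint8_t buf[SEG_CAP];
    uint32_t data_len;
    struct sketch_seg *next;
};

struct sketch_pool {                /* stand-in for the mbuf mempool */
    struct sketch_seg *segs;
    uint32_t nb_segs;
    uint32_t used;
};

static struct sketch_seg *
pool_get(struct sketch_pool *pool)
{
    struct sketch_seg *seg;

    if (pool->used == pool->nb_segs)
        return NULL;
    seg = &pool->segs[pool->used++];
    seg->data_len = 0;
    seg->next = NULL;
    return seg;
}

/* Returns the number of payload bytes copied, or -1 on failure. */
static int64_t
copy_chain_to_segs(const struct sketch_desc *desc, uint32_t hdr_len,
                   struct sketch_seg *head, struct sketch_pool *pool)
{
    struct sketch_seg *cur = head;
    uint32_t desc_avail, desc_offset;
    uint32_t seg_avail = SEG_CAP, seg_offset = 0;
    uint32_t cpy_len;
    int64_t total = 0;

    assert(desc != NULL && head != NULL && pool != NULL);
    if (desc->len < hdr_len)
        return -1;

    /* Skip the header just by starting past it. */
    desc_avail = desc->len - hdr_len;
    desc_offset = hdr_len;

    while (desc_avail != 0 || desc->next != NULL) {
        if (desc_avail == 0) {      /* this desc is drained */
            desc = desc->next;
            desc_offset = 0;
            desc_avail = desc->len;
        }
        if (seg_avail == 0) {       /* this segment is full */
            struct sketch_seg *seg = pool_get(pool);

            if (seg == NULL)
                return -1;
            cur->data_len = seg_offset;
            cur->next = seg;
            cur = seg;
            seg_offset = 0;
            seg_avail = SEG_CAP;
        }

        cpy_len = desc_avail < seg_avail ? desc_avail : seg_avail;
        memcpy(cur->buf + seg_offset, desc->data + desc_offset, cpy_len);

        seg_avail -= cpy_len;
        seg_offset += cpy_len;
        desc_avail -= cpy_len;
        desc_offset += cpy_len;
        total += cpy_len;
    }

    cur->data_len = seg_offset;
    return total;
}
```

With a 4-byte header, a 7-byte first desc and a 9-byte second desc, the
12 payload bytes end up as one full 8-byte segment chained to a second
segment holding the remaining 4 bytes.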

This refactor makes the code much more readable (IMO), and it also
reduces binary code size.

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---

v2: - fix potential NULL dereference bug of var "prev" and "head"

v3: - add back missing prefetches reported by Huawei

- Passing head mbuf as an arg, instead of allocating it at
copy_desc_to_mbuf().
---
lib/librte_vhost/vhost_rxtx.c | 301 +++++++++++++++++-------------------------
1 file changed, 121 insertions(+), 180 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 9d23eb1..e12e9ba 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -801,21 +801,97 @@ make_rarp_packet(struct rte_mbuf *rarp_mbuf, const struct ether_addr *mac)
return 0;
}

+static inline int __attribute__((always_inline))
+copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ struct rte_mbuf *m, uint16_t desc_idx,
+ struct rte_mempool *mbuf_pool)
+{
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct rte_mbuf *cur = m, *prev = m;
+ struct virtio_net_hdr *hdr;
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ /* Retrieve virtio net header */
+ hdr = (struct virtio_net_hdr *)((uintptr_t)desc_addr);
+ desc_avail = desc->len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;
+
+ mbuf_offset = 0;
+ mbuf_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
+ while (desc_avail != 0 || (desc->flags & VRING_DESC_F_NEXT) != 0) {
+ /* This desc reaches to its end, get the next one */
+ if (desc_avail == 0) {
+ desc = &vq->desc[desc->next];
+
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ desc_offset = 0;
+ desc_avail = desc->len;
+
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, desc->len, 0);
+ }
+
+ /*
+ * This mbuf reaches to its end, get a new one
+ * to hold more data.
+ */
+ if (mbuf_avail == 0) {
+ cur = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(cur == NULL)) {
+ RTE_LOG(ERR, VHOST_DATA, "Failed to "
+ "allocate memory for mbuf.\n");
+ return -1;
+ }
+
+ prev->next = cur;
+ prev->data_len = mbuf_offset;
+ m->nb_segs += 1;
+ m->pkt_len += mbuf_offset;
+ prev = cur;
+
+ mbuf_offset = 0;
+ mbuf_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
+ (void *)((uintptr_t)(desc_addr + desc_offset)),
+ cpy_len);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ prev->data_len = mbuf_offset;
+ m->pkt_len += mbuf_offset;
+
+ if (hdr->flags != 0 || hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE)
+ vhost_dequeue_offload(hdr, m);
+
+ return 0;
+}
+
uint16_t
rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
{
- struct rte_mbuf *m, *prev, *rarp_mbuf = NULL;
+ struct rte_mbuf *rarp_mbuf = NULL;
struct vhost_virtqueue *vq;
- struct vring_desc *desc;
- uint64_t vb_addr = 0;
- uint64_t vb_net_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
+ uint32_t desc_indexes[MAX_PKT_BURST];
uint32_t used_idx;
- uint32_t i;
- uint16_t free_entries, entry_success = 0;
+ uint32_t i = 0;
+ uint16_t free_entries;
uint16_t avail_idx;
- struct virtio_net_hdr *hdr = NULL;

if (unlikely(!is_valid_virt_queue_idx(queue_id, 1, dev->virt_qp_nb))) {
RTE_LOG(ERR, VHOST_DATA,
@@ -852,198 +928,63 @@ rte_vhost_dequeue_burst(struct virtio_net *dev, uint16_t queue_id,
}

avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- /* If there are no available buffers then return. */
- if (vq->last_used_idx == avail_idx)
+ free_entries = avail_idx - vq->last_used_idx;
+ if (free_entries == 0)
goto out;

- LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__,
- dev->device_fh);
+ LOG_DEBUG(VHOST_DATA, "%s (%"PRIu64")\n", __func__, dev->device_fh);

/* Prefetch available ring to retrieve head indexes. */
- rte_prefetch0(&vq->avail->ring[vq->last_used_idx & (vq->size - 1)]);
+ used_idx = vq->last_used_idx & (vq->size - 1);
+ rte_prefetch0(&vq->avail->ring[used_idx]);

- /*get the number of free entries in the ring*/
- free_entries = (avail_idx - vq->last_used_idx);
+ count = RTE_MIN(count, MAX_PKT_BURST);
+ count = RTE_MIN(count, free_entries);
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") about to dequeue %u buffers\n",
+ dev->device_fh, count);

- free_entries = RTE_MIN(free_entries, count);
- /* Limit to MAX_PKT_BURST. */
- free_entries = RTE_MIN(free_entries, MAX_PKT_BURST);
-
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Buffers available %d\n",
- dev->device_fh, free_entries);
/* Retrieve all of the head indexes first to avoid caching issues. */
- for (i = 0; i < free_entries; i++)
- head[i] = vq->avail->ring[(vq->last_used_idx + i) & (vq->size - 1)];
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(vq->last_used_idx + i) &
+ (vq->size - 1)];
+ }

/* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success]]);
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
rte_prefetch0(&vq->used->ring[vq->last_used_idx & (vq->size - 1)]);

- while (entry_success < free_entries) {
- uint32_t vb_avail, vb_offset;
- uint32_t seg_avail, seg_offset;
- uint32_t cpy_len;
- uint32_t seg_num = 0;
- struct rte_mbuf *cur;
- uint8_t alloc_err = 0;
-
- desc = &vq->desc[head[entry_success]];
-
- vb_net_hdr_addr = gpa_to_vva(dev, desc->addr);
- hdr = (struct virtio_net_hdr *)((uintptr_t)vb_net_hdr_addr);
-
- /* Discard first buffer as it is the virtio header */
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- vb_offset = 0;
- vb_avail = desc->len;
- } else {
- vb_offset = vq->vhost_hlen;
- vb_avail = desc->len - vb_offset;
- }
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
-
- used_idx = vq->last_used_idx & (vq->size - 1);
+ for (i = 0; i < count; i++) {
+ int err;

- if (entry_success < (free_entries - 1)) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[entry_success+1]]);
- rte_prefetch0(&vq->used->ring[(used_idx + 1) & (vq->size - 1)]);
+ if (likely(i + 1 < count)) {
+ rte_prefetch0(&vq->desc[desc_indexes[i + 1]]);
+ rte_prefetch0(&vq->used->ring[(used_idx + 1) &
+ (vq->size - 1)]);
}

- /* Update used index buffer information. */
- vq->used->ring[used_idx].id = head[entry_success];
- vq->used->ring[used_idx].len = 0;
- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[used_idx]),
- sizeof(vq->used->ring[used_idx]));
-
- /* Allocate an mbuf and populate the structure. */
- m = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(m == NULL)) {
+ pkts[i] = rte_pktmbuf_alloc(mbuf_pool);
+ if (unlikely(pkts[i] == NULL)) {
RTE_LOG(ERR, VHOST_DATA,
"Failed to allocate memory for mbuf.\n");
break;
}
- seg_offset = 0;
- seg_avail = m->buf_len - RTE_PKTMBUF_HEADROOM;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr, desc->len, 0);
-
- seg_num++;
- cur = m;
- prev = m;
- while (cpy_len != 0) {
- rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, seg_offset),
- (void *)((uintptr_t)(vb_addr + vb_offset)),
- cpy_len);
-
- seg_offset += cpy_len;
- vb_offset += cpy_len;
- vb_avail -= cpy_len;
- seg_avail -= cpy_len;
-
- if (vb_avail != 0) {
- /*
- * The segment reachs to its end,
- * while the virtio buffer in TX vring has
- * more data to be copied.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /* Allocate mbuf and populate the structure. */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR, VHOST_DATA, "Failed to "
- "allocate memory for mbuf.\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
-
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- } else {
- if (desc->flags & VRING_DESC_F_NEXT) {
- /*
- * There are more virtio buffers in
- * same vring entry need to be copied.
- */
- if (seg_avail == 0) {
- /*
- * The current segment hasn't
- * room to accomodate more
- * data.
- */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- /*
- * Allocate an mbuf and
- * populate the structure.
- */
- cur = rte_pktmbuf_alloc(mbuf_pool);
- if (unlikely(cur == NULL)) {
- RTE_LOG(ERR,
- VHOST_DATA,
- "Failed to "
- "allocate memory "
- "for mbuf\n");
- rte_pktmbuf_free(m);
- alloc_err = 1;
- break;
- }
- seg_num++;
- prev->next = cur;
- prev = cur;
- seg_offset = 0;
- seg_avail = cur->buf_len - RTE_PKTMBUF_HEADROOM;
- }
-
- desc = &vq->desc[desc->next];
-
- /* Buffer address translation. */
- vb_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = desc->len;
-
- PRINT_PACKET(dev, (uintptr_t)vb_addr,
- desc->len, 0);
- } else {
- /* The whole packet completes. */
- cur->data_len = seg_offset;
- m->pkt_len += seg_offset;
- vb_avail = 0;
- }
- }
-
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- }
-
- if (unlikely(alloc_err == 1))
+ err = copy_desc_to_mbuf(dev, vq, pkts[i], desc_indexes[i],
+ mbuf_pool);
+ if (unlikely(err)) {
+ rte_pktmbuf_free(pkts[i]);
break;
+ }

- m->nb_segs = seg_num;
- if ((hdr->flags != 0) || (hdr->gso_type != VIRTIO_NET_HDR_GSO_NONE))
- vhost_dequeue_offload(hdr, m);
-
- pkts[entry_success] = m;
- vq->last_used_idx++;
- entry_success++;
+ used_idx = vq->last_used_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_indexes[i];
+ vq->used->ring[used_idx].len = 0;
+ vhost_log_used_vring(dev, vq,
+ offsetof(struct vring_used, ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));
}

rte_compiler_barrier();
- vq->used->idx += entry_success;
+ vq->used->idx += i;
vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
sizeof(vq->used->idx));

@@ -1057,10 +998,10 @@ out:
* Inject it to the head of "pkts" array, so that switch's mac
* learning table will get updated first.
*/
- memmove(&pkts[1], pkts, entry_success * sizeof(m));
+ memmove(&pkts[1], pkts, i * sizeof(struct rte_mbuf *));
pkts[0] = rarp_mbuf;
- entry_success += 1;
+ i += 1;
}

- return entry_success;
+ return i;
}
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:40 UTC
Permalink
This is a simple refactor, as there isn't any twisted logic in the old
code. Here I just broke the code up and introduced two helper functions,
reserve_avail_buf() and copy_mbuf_to_desc(), to make the code more
readable.

Also, it saves nearly 1K bytes of binary code size.
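
The reservation step can be illustrated on its own. The sketch below is
not the patch code: it uses C11 atomics where the patch uses
rte_atomic16_cmpset(), it leaves out the MAX_PKT_BURST cap, and the
sketch_vq type and reserve_entries name are assumptions.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

struct sketch_vq {
    _Atomic uint16_t avail_idx;         /* advanced by the guest */
    _Atomic uint16_t last_used_idx_res; /* next entry not yet reserved */
};

/*
 * Reserve up to "want" entries for the calling core. Returns how many
 * entries were reserved; [*start, *end) is the reserved range of
 * free-running 16-bit indexes, which may wrap around.
 */
static uint16_t
reserve_entries(struct sketch_vq *vq, uint16_t want,
                uint16_t *start, uint16_t *end)
{
    uint16_t res_start, free_entries, count;

    assert(vq != NULL && start != NULL && end != NULL);

    res_start = atomic_load(&vq->last_used_idx_res);
    do {
        /* uint16_t subtraction stays correct across wrap-around. */
        free_entries = (uint16_t)(atomic_load(&vq->avail_idx) - res_start);
        count = want < free_entries ? want : free_entries;
        if (count == 0)
            return 0;
        /* On failure res_start is reloaded and the loop retries. */
    } while (!atomic_compare_exchange_weak(&vq->last_used_idx_res,
                                           &res_start,
                                           (uint16_t)(res_start + count)));

    *start = res_start;
    *end = (uint16_t)(res_start + count);
    return count;
}
```

Because both indexes are free-running 16-bit counters, a reservation that
starts at 65534 with avail_idx at 3 yields 5 entries and ends at index 3.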

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---

v2: - fix NULL dereference bug found by Rich.

v3: - use while (mbuf_avail || m->next) to align with the style of
copy_desc_to_mbuf() -- suggested by Huawei

- use likely/unlikely when possible
---
lib/librte_vhost/vhost_rxtx.c | 296 ++++++++++++++++++++----------------------
1 file changed, 141 insertions(+), 155 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index e12e9ba..0df0612 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -129,6 +129,115 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
return;
}

+static inline int __attribute__((always_inline))
+copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
+{
+ uint32_t desc_avail, desc_offset;
+ uint32_t mbuf_avail, mbuf_offset;
+ uint32_t cpy_len;
+ struct vring_desc *desc;
+ uint64_t desc_addr;
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+
+ desc = &vq->desc[desc_idx];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ vhost_log_write(dev, desc->addr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_offset = vq->vhost_hlen;
+ desc_avail = desc->len - vq->vhost_hlen;
+
+ *copied = rte_pktmbuf_pkt_len(m);
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (mbuf_avail != 0 || m->next != NULL) {
+ /* done with current mbuf, fetch next */
+ if (mbuf_avail == 0) {
+ m = m->next;
+
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }
+
+ /* done with current desc buf, fetch next */
+ if (desc_avail == 0) {
+ if ((desc->flags & VRING_DESC_F_NEXT) == 0) {
+ /* Room in vring buffer is not enough */
+ return -1;
+ }
+
+ desc = &vq->desc[desc->next];
+ desc_addr = gpa_to_vva(dev, desc->addr);
+ desc_offset = 0;
+ desc_avail = desc->len;
+ }
+
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+ rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+ cpy_len);
+ vhost_log_write(dev, desc->addr + desc_offset, cpy_len);
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);
+
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ return 0;
+}
+
+/*
+ * As many data cores may want to access available buffers
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_end_idx;
+ uint16_t avail_idx;
+ uint16_t free_entries;
+ int success;
+
+ count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+ res_start_idx = vq->last_used_idx_res;
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+ free_entries = avail_idx - res_start_idx;
+ count = RTE_MIN(count, free_entries);
+ if (count == 0)
+ return 0;
+
+ res_end_idx = res_start_idx + count;
+
+ /*
+ * update vq->last_used_idx_res atomically; try again if failed.
+ *
+ * TODO: Allow to disable cmpset if no concurrency in application.
+ */
+ success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_end_idx);
+ if (unlikely(!success))
+ goto again;
+
+ *start = res_start_idx;
+ *end = res_end_idx;
+
+ return count;
+}
+
/**
* This function adds buffers to the virtio devices RX virtqueue. Buffers can
* be received from the physical port or from another virtio device. A packet
@@ -138,21 +247,12 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
*/
static inline uint32_t __attribute__((always_inline))
virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- struct vring_desc *desc, *hdr_desc;
- struct rte_mbuf *buff, *first_buff;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
- uint64_t buff_addr = 0;
- uint64_t buff_hdr_addr = 0;
- uint32_t head[MAX_PKT_BURST];
- uint32_t head_idx, packet_success = 0;
- uint16_t avail_idx, res_cur_idx;
- uint16_t res_base_idx, res_end_idx;
- uint16_t free_entries;
- uint8_t success = 0;
+ uint16_t res_start_idx, res_end_idx;
+ uint16_t desc_indexes[MAX_PKT_BURST];
+ uint32_t i;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -166,161 +266,47 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
if (unlikely(vq->enabled == 0))
return 0;

- count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;
-
- /*
- * As many data cores may want access to available buffers,
- * they need to be reserved.
- */
- do {
- res_base_idx = vq->last_used_idx_res;
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
- free_entries = (avail_idx - res_base_idx);
- /*check that we have enough buffers*/
- if (unlikely(count > free_entries))
- count = free_entries;
-
- if (count == 0)
- return 0;
-
- res_end_idx = res_base_idx + count;
- /* vq->last_used_idx_res is atomically updated. */
- /* TODO: Allow to disable cmpset if no concurrency in application. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx, res_end_idx);
- } while (unlikely(success == 0));
- res_cur_idx = res_base_idx;
- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
- dev->device_fh, res_cur_idx, res_end_idx);
-
- /* Prefetch available ring to retrieve indexes. */
- rte_prefetch0(&vq->avail->ring[res_cur_idx & (vq->size - 1)]);
-
- /* Retrieve all of the head indexes first to avoid caching issues. */
- for (head_idx = 0; head_idx < count; head_idx++)
- head[head_idx] = vq->avail->ring[(res_cur_idx + head_idx) &
- (vq->size - 1)];
-
- /*Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
-
- while (res_cur_idx != res_end_idx) {
- uint32_t offset = 0, vb_offset = 0;
- uint32_t pkt_len, len_to_cpy, data_len, total_copied = 0;
- uint8_t hdr = 0, uncompleted_pkt = 0;
- uint16_t idx;
-
- /* Get descriptor from available ring */
- desc = &vq->desc[head[packet_success]];
-
- buff = pkts[packet_success];
- first_buff = buff;
-
- /* Convert from gpa to vva (guest physical addr -> vhost virtual addr) */
- buff_addr = gpa_to_vva(dev, desc->addr);
- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)buff_addr);
-
- /* Copy virtio_hdr to packet and increment buffer address */
- buff_hdr_addr = buff_addr;
- hdr_desc = desc;
-
- /*
- * If the descriptors are chained the header and data are
- * placed in separate buffers.
- */
- if ((desc->flags & VRING_DESC_F_NEXT) &&
- (desc->len == vq->vhost_hlen)) {
- desc = &vq->desc[desc->next];
- /* Buffer address translation. */
- buff_addr = gpa_to_vva(dev, desc->addr);
- } else {
- vb_offset += vq->vhost_hlen;
- hdr = 1;
- }
+ count = reserve_avail_buf(vq, count, &res_start_idx, &res_end_idx);
+ if (count == 0)
+ return 0;

- pkt_len = rte_pktmbuf_pkt_len(buff);
- data_len = rte_pktmbuf_data_len(buff);
- len_to_cpy = RTE_MIN(data_len,
- hdr ? desc->len - vq->vhost_hlen : desc->len);
- while (total_copied < pkt_len) {
- /* Copy mbuf data to buffer */
- rte_memcpy((void *)(uintptr_t)(buff_addr + vb_offset),
- rte_pktmbuf_mtod_offset(buff, const void *, offset),
- len_to_cpy);
- vhost_log_write(dev, desc->addr + vb_offset, len_to_cpy);
- PRINT_PACKET(dev, (uintptr_t)(buff_addr + vb_offset),
- len_to_cpy, 0);
-
- offset += len_to_cpy;
- vb_offset += len_to_cpy;
- total_copied += len_to_cpy;
-
- /* The whole packet completes */
- if (total_copied == pkt_len)
- break;
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") res_start_idx %d| res_end_idx Index %d\n",
+ dev->device_fh, res_start_idx, res_end_idx);

- /* The current segment completes */
- if (offset == data_len) {
- buff = buff->next;
- offset = 0;
- data_len = rte_pktmbuf_data_len(buff);
- }
+ /* Retrieve all of the desc indexes first to avoid caching issues. */
+ rte_prefetch0(&vq->avail->ring[res_start_idx & (vq->size - 1)]);
+ for (i = 0; i < count; i++) {
+ desc_indexes[i] = vq->avail->ring[(res_start_idx + i) &
+ (vq->size - 1)];
+ }

- /* The current vring descriptor done */
- if (vb_offset == desc->len) {
- if (desc->flags & VRING_DESC_F_NEXT) {
- desc = &vq->desc[desc->next];
- buff_addr = gpa_to_vva(dev, desc->addr);
- vb_offset = 0;
- } else {
- /* Room in vring buffer is not enough */
- uncompleted_pkt = 1;
- break;
- }
- }
- len_to_cpy = RTE_MIN(data_len - offset, desc->len - vb_offset);
- }
+ rte_prefetch0(&vq->desc[desc_indexes[0]]);
+ for (i = 0; i < count; i++) {
+ uint16_t desc_idx = desc_indexes[i];
+ uint16_t used_idx = (res_start_idx + i) & (vq->size - 1);
+ uint32_t copied;
+ int err;

- /* Update used ring with desc information */
- idx = res_cur_idx & (vq->size - 1);
- vq->used->ring[idx].id = head[packet_success];
+ err = copy_mbuf_to_desc(dev, vq, pkts[i], desc_idx, &copied);

- /* Drop the packet if it is uncompleted */
- if (unlikely(uncompleted_pkt == 1))
- vq->used->ring[idx].len = vq->vhost_hlen;
+ vq->used->ring[used_idx].id = desc_idx;
+ if (unlikely(err))
+ vq->used->ring[used_idx].len = vq->vhost_hlen;
else
- vq->used->ring[idx].len = pkt_len + vq->vhost_hlen;
-
+ vq->used->ring[used_idx].len = copied + vq->vhost_hlen;
vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
+ offsetof(struct vring_used, ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));

- res_cur_idx++;
- packet_success++;
-
- if (unlikely(uncompleted_pkt == 1))
- continue;
-
- virtio_enqueue_offload(first_buff, &virtio_hdr.hdr);
-
- rte_memcpy((void *)(uintptr_t)buff_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
- vhost_log_write(dev, hdr_desc->addr, vq->vhost_hlen);
-
- PRINT_PACKET(dev, (uintptr_t)buff_hdr_addr, vq->vhost_hlen, 1);
-
- if (res_cur_idx < res_end_idx) {
- /* Prefetch descriptor index. */
- rte_prefetch0(&vq->desc[head[packet_success]]);
- }
+ if (i + 1 < count)
+ rte_prefetch0(&vq->desc[desc_indexes[i+1]]);
}

rte_compiler_barrier();

/* Wait until it's our turn to add our buffer to the used ring. */
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != res_start_idx))
rte_pause();

*(volatile uint16_t *)&vq->used->idx += count;
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:41 UTC
Permalink
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block appearing in many different places.

However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
may allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
difference a bit bigger.

The refactored code looks like below:

while (mbuf_has_not_drained_totally || mbuf_has_next) {
if (this_desc_has_no_room) {
this_desc = fetch_next_from_vec_buf();

if (it is the last of a desc chain)
update_used_ring();
}

if (this_mbuf_has_drained_totally)
mbuf = fetch_next_mbuf();

COPY(this_desc, this_mbuf);
}
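
The same loop can be tried stand-alone with the sketch below. It is not
the patch code: the virtio header and the used ring are left out, and
sketch_seg, sketch_vec, last_in_chain, used_len and scatter_to_vec are
assumed names that stand in for the mbuf chain, the vec_buf entries and
the per-entry used length.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct sketch_seg {                 /* stand-in for one mbuf segment */
    const uint8_t *data;
    uint32_t len;
    const struct sketch_seg *next;
};

struct sketch_vec {                 /* stand-in for one vec_buf entry */
    uint8_t *buf;
    uint32_t len;
    int last_in_chain;              /* non-zero: desc has no NEXT flag */
};

/*
 * Copy the segment chain into vec[]. used_len[k] receives the number of
 * bytes written into the k-th descriptor chain. Returns the number of
 * chains consumed, or 0 if vec[] is too small for the data.
 */
static uint32_t
scatter_to_vec(const struct sketch_seg *m, struct sketch_vec *vec,
               uint32_t nr_vec, uint32_t *used_len)
{
    uint32_t vec_idx = 0, nr_used = 0, entry_len = 0;
    uint32_t seg_avail, seg_offset = 0;
    uint32_t vec_avail, vec_offset = 0;
    uint32_t cpy_len;

    assert(m != NULL && vec != NULL && used_len != NULL);
    if (nr_vec == 0)
        return 0;

    seg_avail = m->len;
    vec_avail = vec[0].len;

    while (seg_avail != 0 || m->next != NULL) {
        if (vec_avail == 0) {       /* this buffer has no room left */
            if (vec[vec_idx].last_in_chain) {
                used_len[nr_used++] = entry_len;
                entry_len = 0;
            }
            if (++vec_idx == nr_vec)
                return 0;
            vec_offset = 0;
            vec_avail = vec[vec_idx].len;
        }
        if (seg_avail == 0) {       /* this segment is drained */
            m = m->next;
            seg_offset = 0;
            seg_avail = m->len;
        }

        cpy_len = vec_avail < seg_avail ? vec_avail : seg_avail;
        memcpy(vec[vec_idx].buf + vec_offset, m->data + seg_offset,
               cpy_len);

        vec_avail -= cpy_len;
        vec_offset += cpy_len;
        seg_avail -= cpy_len;
        seg_offset += cpy_len;
        entry_len += cpy_len;
    }

    /* Account for the chain that holds the tail of the packet. */
    used_len[nr_used++] = entry_len;
    return nr_used;
}
```

The return value plays the role of the number of merged buffers: an
8-byte packet scattered over a 4-byte chain and a 2+4-byte chain uses two
chains with 4 bytes each.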

This patch removes quite a few lines of code and therefore makes it
much more readable.

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 404 +++++++++++++++++-------------------------
1 file changed, 166 insertions(+), 238 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 0df0612..9be3593 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -324,251 +324,204 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
return count;
}

-static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
- uint16_t res_base_idx, uint16_t res_end_idx,
- struct rte_mbuf *pkt)
+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+ uint32_t *allocated, uint32_t *vec_idx)
{
- uint32_t vec_idx = 0;
- uint32_t entry_success = 0;
- struct vhost_virtqueue *vq;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
- {0, 0, 0, 0, 0, 0}, 0};
- uint16_t cur_idx = res_base_idx;
- uint64_t vb_addr = 0;
- uint64_t vb_hdr_addr = 0;
- uint32_t seg_offset = 0;
- uint32_t vb_offset = 0;
- uint32_t seg_avail;
- uint32_t vb_avail;
- uint32_t cpy_len, entry_len;
- uint16_t idx;
-
- if (pkt == NULL)
- return 0;
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
- "End Index %d\n",
- dev->device_fh, cur_idx, res_end_idx);
+ while (1) {
+ if (vec_id >= BUF_VECTOR_MAX)
+ return -1;

- /*
- * Convert from gpa to vva
- * (guest physical addr -> vhost virtual addr)
- */
- vq = dev->virtqueue[queue_id];
+ len += vq->desc[idx].len;
+ vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+ vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
+ vq->buf_vec[vec_id].desc_idx = idx;
+ vec_id++;

- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
- vb_hdr_addr = vb_addr;
+ if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+ break;

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
+ idx = vq->desc[idx].next;
+ }

- virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+ *allocated = len;
+ *vec_idx = vec_id;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
- dev->device_fh, virtio_hdr.num_buffers);
+ return 0;
+}

- virtio_enqueue_offload(pkt, &virtio_hdr.hdr);
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_cur_idx;
+ uint16_t avail_idx;
+ uint32_t allocated;
+ uint32_t vec_idx;
+ uint16_t tries;

- rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
- vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr, vq->vhost_hlen);
+again:
+ res_start_idx = vq->last_used_idx_res;
+ res_cur_idx = res_start_idx;
+
+ allocated = 0;
+ vec_idx = 0;
+ tries = 0;
+ while (1) {
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ if (unlikely(res_cur_idx == avail_idx)) {
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Failed "
+ "to get enough desc from vring\n",
+ dev->device_fh);
+ return -1;
+ }

- PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+ if (fill_vec_buf(vq, res_cur_idx, &allocated, &vec_idx) < 0)
+ return -1;

- seg_avail = rte_pktmbuf_data_len(pkt);
- vb_offset = vq->vhost_hlen;
- vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ res_cur_idx++;
+ tries++;

- entry_len = vq->vhost_hlen;
+ if (allocated >= size)
+ break;

- if (vb_avail == 0) {
- uint32_t desc_idx = vq->buf_vec[vec_idx].desc_idx;
+ /*
+ * if we tried all available ring items, and still
+ * can't get enough buf, it means something abnormal
+ * happened.
+ */
+ if (tries >= vq->size)
+ return -1;
+ }

- if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 0) {
- idx = cur_idx & (vq->size - 1);
+ /*
+ * update vq->last_used_idx_res atomically.
+ * retry again if failed.
+ */
+ if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_cur_idx) == 0)
+ goto again;

- /* Update used ring with desc information */
- vq->used->ring[idx].id = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
+ *start = res_start_idx;
+ *end = res_cur_idx;
+ return 0;
+}

- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;

- entry_len = 0;
- cur_idx++;
- entry_success++;
- }
+ if (unlikely(m == NULL))
+ return 0;

- vec_idx++;
- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") Current Index %d| End Index %d\n",
+ dev->device_fh, cur_idx, res_end_idx);

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- }
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);

- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
+ dev->device_fh, virtio_hdr.num_buffers);

- while (cpy_len > 0) {
- /* Copy mbuf data to vring buffer */
- rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
- rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
- cpy_len);
- vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr + vb_offset,
- cpy_len);
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

- PRINT_PACKET(dev,
- (uintptr_t)(vb_addr + vb_offset),
- cpy_len, 0);
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;

- seg_offset += cpy_len;
- vb_offset += cpy_len;
- seg_avail -= cpy_len;
- vb_avail -= cpy_len;
- entry_len += cpy_len;
-
- if (seg_avail != 0) {
- /*
- * The virtio buffer in this vring
- * entry reach to its end.
- * But the segment doesn't complete.
- */
- if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (mbuf_avail != 0 || m->next != NULL) {
+ /* done with current desc buf, get the next one */
+ if (desc_avail == 0) {
+ desc_idx = vq->buf_vec[vec_idx].desc_idx;
+
+ if (!(vq->desc[desc_idx].flags & VRING_DESC_F_NEXT)) {
/* Update used ring with desc information */
- idx = cur_idx & (vq->size - 1);
- vq->used->ring[idx].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
+ used_idx = cur_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_len = 0;
- cur_idx++;
- entry_success++;
+ offsetof(struct vring_used,
+ ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));
}

vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This current segment complete, need continue to
- * check if the whole packet complete or not.
- */
- pkt = pkt->next;
- if (pkt != NULL) {
- /*
- * There are more segments.
- */
- if (vb_avail == 0) {
- /*
- * This current buffer from vring is
- * used up, need fetch next buffer
- * from buf_vec.
- */
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
-
- if ((vq->desc[desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
- idx = cur_idx & (vq->size - 1);
- /*
- * Update used ring with the
- * descriptor information
- */
- vq->used->ring[idx].id
- = desc_idx;
- vq->used->ring[idx].len
- = entry_len;
- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_success++;
- entry_len = 0;
- cur_idx++;
- }
-
- /* Get next buffer from buf_vec. */
- vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_avail =
- vq->buf_vec[vec_idx].buf_len;
- vb_offset = 0;
- }
-
- seg_offset = 0;
- seg_avail = rte_pktmbuf_data_len(pkt);
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This whole packet completes.
- */
- /* Update used ring with desc information */
- idx = cur_idx & (vq->size - 1);
- vq->used->ring[idx].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_success++;
- break;
- }
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+
+ /* Prefetch buffer address. */
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+ desc_offset = 0;
+ desc_avail = vq->buf_vec[vec_idx].buf_len;
}
- }

- return entry_success;
-}
+ /* done with current mbuf, get the next one */
+ if (mbuf_avail == 0) {
+ m = m->next;

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
- uint32_t *secure_len, uint32_t *vec_idx)
-{
- uint16_t wrapped_idx = id & (vq->size - 1);
- uint32_t idx = vq->avail->ring[wrapped_idx];
- uint8_t next_desc;
- uint32_t len = *secure_len;
- uint32_t vec_id = *vec_idx;
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }

- do {
- next_desc = 0;
- len += vq->desc[idx].len;
- vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
- vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
- vq->buf_vec[vec_id].desc_idx = idx;
- vec_id++;
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+ rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+ cpy_len);
+ vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr + desc_offset,
+ cpy_len);
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);

- if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
- idx = vq->desc[idx].next;
- next_desc = 1;
- }
- } while (next_desc);
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }
+
+ used_idx = cur_idx & (vq->size - 1);
+ vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
+ vhost_log_used_vring(dev, vq,
+ offsetof(struct vring_used, ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));

- *secure_len = len;
- *vec_idx = vec_id;
+ return res_end_idx - res_start_idx;
}

-/*
- * This function works for mergeable RX.
- */
static inline uint32_t __attribute__((always_inline))
virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- uint32_t pkt_idx = 0, entry_success = 0;
- uint16_t avail_idx;
- uint16_t res_base_idx, res_cur_idx;
- uint8_t success = 0;
+ uint32_t pkt_idx = 0, nr_used = 0;
+ uint16_t start, end;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
dev->device_fh);
@@ -584,57 +537,32 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
return 0;

count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
if (count == 0)
return 0;

for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

- do {
- /*
- * As many data cores may want access to available
- * buffers, they need to be reserved.
- */
- uint32_t secure_len = 0;
- uint32_t vec_idx = 0;
-
- res_base_idx = vq->last_used_idx_res;
- res_cur_idx = res_base_idx;
-
- do {
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- if (unlikely(res_cur_idx == avail_idx))
- goto merge_rx_exit;
-
- update_secure_len(vq, res_cur_idx,
- &secure_len, &vec_idx);
- res_cur_idx++;
- } while (pkt_len > secure_len);
-
- /* vq->last_used_idx_res is atomically updated. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx,
- res_cur_idx);
- } while (success == 0);
-
- entry_success = copy_from_mbuf_to_vring(dev, queue_id,
- res_base_idx, res_cur_idx, pkts[pkt_idx]);
+ if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+ break;

+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
rte_compiler_barrier();

/*
* Wait until it's our turn to add our buffer
* to the used ring.
*/
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != start))
rte_pause();

- *(volatile uint16_t *)&vq->used->idx += entry_success;
- vq->last_used_idx = res_cur_idx;
+ *(volatile uint16_t *)&vq->used->idx += nr_used;
+ vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
+ sizeof(vq->used->idx));
+ vq->last_used_idx = end;
}

-merge_rx_exit:
if (likely(pkt_idx)) {
/* flush used->idx update before we read avail->flags. */
rte_mb();
--
1.9.0
Thomas Monjalon
2016-03-11 16:18:54 UTC
This patch does not compile:
lib/librte_vhost/vhost_rxtx.c:386:5: error: ‘dev’ undeclared
Yuanhan Liu
2016-03-14 06:45:30 UTC
Post by Thomas Monjalon
lib/librte_vhost/vhost_rxtx.c:386:5: error: ‘dev’ undeclared
Oops, I actually did a basic compile test on every commit and after
every rebase. I had a quick look, and it turned out to be a build
error that only shows up when RTE_LIBRTE_VHOST_DEBUG is enabled. It is
disabled by default, which is why I didn't catch it.

Thanks for the report, I will fix it soon.

--yliu
Yuanhan Liu
2016-03-14 07:35:22 UTC
The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): full of twisted logic, with the same
code block repeated in many different places.

However, the logic of virtio_dev_merge_rx() is quite similar to
virtio_dev_rx(). The big difference is that the mergeable one
can allocate more than one available entry to hold the data.
Fetching all available entries to vec_buf at once makes the
difference a bit bigger.

The refactored code looks like this:

    while (mbuf_has_not_drained_totally || mbuf_has_next) {
            if (this_desc_has_no_room) {
                    this_desc = fetch_next_from_vec_buf();

                    if (it is the last of a desc chain)
                            update_used_ring();
            }

            if (this_mbuf_has_drained_totally)
                    mbuf = fetch_next_mbuf();

            COPY(this_desc, this_mbuf);
    }

This patch removes quite a few lines of code, and therefore makes the
function much more readable.
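
For readers who want to experiment with the loop shape outside DPDK, here
is a minimal standalone sketch. It is not the vhost code: struct seg,
struct buf and copy_chain() are hypothetical stand-ins for an mbuf chain,
the buf_vec entries and the copy routine, and plain memcpy() replaces
rte_memcpy(). Used-ring updates and dirty logging are left out.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for one mbuf segment. */
struct seg {
	const uint8_t *data;
	uint32_t len;
	struct seg *next;
};

/* Hypothetical stand-in for one guest buffer (a buf_vec entry). */
struct buf {
	uint8_t *addr;
	uint32_t len;
};

/*
 * Copy a chain of segments into an array of buffers.
 * Returns the number of buffers used, or 0 if there is nothing to
 * copy or the buffers run out before the chain is drained.
 */
uint32_t
copy_chain(const struct seg *s, struct buf *bufs, uint32_t nr_bufs)
{
	uint32_t b = 0;
	uint32_t buf_off = 0, buf_avail;
	uint32_t seg_off = 0, seg_avail;
	uint32_t cpy_len;

	if (s == NULL || nr_bufs == 0)
		return 0;

	buf_avail = bufs[0].len;
	seg_avail = s->len;

	while (seg_avail != 0 || s->next != NULL) {
		/* done with current buffer, get the next one */
		if (buf_avail == 0) {
			if (++b >= nr_bufs)
				return 0;
			buf_off = 0;
			buf_avail = bufs[b].len;
		}

		/* done with current segment, get the next one */
		if (seg_avail == 0) {
			s = s->next;
			seg_off = 0;
			seg_avail = s->len;
		}

		cpy_len = buf_avail < seg_avail ? buf_avail : seg_avail;
		memcpy(bufs[b].addr + buf_off, s->data + seg_off, cpy_len);

		seg_avail -= cpy_len;
		seg_off += cpy_len;
		buf_avail -= cpy_len;
		buf_off += cpy_len;
	}

	return b + 1;
}
```

The point of the shape is that "buffer exhausted" and "segment exhausted"
are each handled in exactly one place, so the copy itself appears once.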

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---

v4: fix build error when DEBUG is enabled
---
lib/librte_vhost/vhost_rxtx.c | 406 +++++++++++++++++-------------------------
1 file changed, 168 insertions(+), 238 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 0df0612..3ebecee 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -324,251 +324,201 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
return count;
}

-static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
- uint16_t res_base_idx, uint16_t res_end_idx,
- struct rte_mbuf *pkt)
+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+ uint32_t *allocated, uint32_t *vec_idx)
{
- uint32_t vec_idx = 0;
- uint32_t entry_success = 0;
- struct vhost_virtqueue *vq;
- /* The virtio_hdr is initialised to 0. */
- struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
- {0, 0, 0, 0, 0, 0}, 0};
- uint16_t cur_idx = res_base_idx;
- uint64_t vb_addr = 0;
- uint64_t vb_hdr_addr = 0;
- uint32_t seg_offset = 0;
- uint32_t vb_offset = 0;
- uint32_t seg_avail;
- uint32_t vb_avail;
- uint32_t cpy_len, entry_len;
- uint16_t idx;
-
- if (pkt == NULL)
- return 0;
+ uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+ uint32_t vec_id = *vec_idx;
+ uint32_t len = *allocated;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
- "End Index %d\n",
- dev->device_fh, cur_idx, res_end_idx);
+ while (1) {
+ if (vec_id >= BUF_VECTOR_MAX)
+ return -1;

- /*
- * Convert from gpa to vva
- * (guest physical addr -> vhost virtual addr)
- */
- vq = dev->virtqueue[queue_id];
+ len += vq->desc[idx].len;
+ vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+ vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
+ vq->buf_vec[vec_id].desc_idx = idx;
+ vec_id++;

- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
- vb_hdr_addr = vb_addr;
+ if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+ break;

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
+ idx = vq->desc[idx].next;
+ }

- virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+ *allocated = len;
+ *vec_idx = vec_id;

- LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
- dev->device_fh, virtio_hdr.num_buffers);
+ return 0;
+}

- virtio_enqueue_offload(pkt, &virtio_hdr.hdr);
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+ uint16_t *start, uint16_t *end)
+{
+ uint16_t res_start_idx;
+ uint16_t res_cur_idx;
+ uint16_t avail_idx;
+ uint32_t allocated;
+ uint32_t vec_idx;
+ uint16_t tries;

- rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
- vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr, vq->vhost_hlen);
+again:
+ res_start_idx = vq->last_used_idx_res;
+ res_cur_idx = res_start_idx;

- PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+ allocated = 0;
+ vec_idx = 0;
+ tries = 0;
+ while (1) {
+ avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+ if (unlikely(res_cur_idx == avail_idx))
+ return -1;

- seg_avail = rte_pktmbuf_data_len(pkt);
- vb_offset = vq->vhost_hlen;
- vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ if (unlikely(fill_vec_buf(vq, res_cur_idx, &allocated,
+ &vec_idx) < 0))
+ return -1;

- entry_len = vq->vhost_hlen;
+ res_cur_idx++;
+ tries++;

- if (vb_avail == 0) {
- uint32_t desc_idx = vq->buf_vec[vec_idx].desc_idx;
+ if (allocated >= size)
+ break;

- if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 0) {
- idx = cur_idx & (vq->size - 1);
+ /*
+ * if we tried all available ring items, and still
+ * can't get enough buf, it means something abnormal
+ * happened.
+ */
+ if (unlikely(tries >= vq->size))
+ return -1;
+ }

- /* Update used ring with desc information */
- vq->used->ring[idx].id = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
+ /*
+ * update vq->last_used_idx_res atomically.
+ * retry again if failed.
+ */
+ if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+ res_start_idx, res_cur_idx) == 0)
+ goto again;

- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
+ *start = res_start_idx;
+ *end = res_cur_idx;
+ return 0;
+}

- entry_len = 0;
- cur_idx++;
- entry_success++;
- }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+ uint16_t res_start_idx, uint16_t res_end_idx,
+ struct rte_mbuf *m)
+{
+ struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+ uint32_t vec_idx = 0;
+ uint16_t cur_idx = res_start_idx;
+ uint64_t desc_addr;
+ uint32_t mbuf_offset, mbuf_avail;
+ uint32_t desc_offset, desc_avail;
+ uint32_t cpy_len;
+ uint16_t desc_idx, used_idx;

- vec_idx++;
- vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ if (unlikely(m == NULL))
+ return 0;

- /* Prefetch buffer address. */
- rte_prefetch0((void *)(uintptr_t)vb_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- }
+ LOG_DEBUG(VHOST_DATA,
+ "(%"PRIu64") Current Index %d| End Index %d\n",
+ dev->device_fh, cur_idx, res_end_idx);

- cpy_len = RTE_MIN(vb_avail, seg_avail);
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+ rte_prefetch0((void *)(uintptr_t)desc_addr);

- while (cpy_len > 0) {
- /* Copy mbuf data to vring buffer */
- rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
- rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
- cpy_len);
- vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr + vb_offset,
- cpy_len);
+ virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+ LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
+ dev->device_fh, virtio_hdr.num_buffers);

- PRINT_PACKET(dev,
- (uintptr_t)(vb_addr + vb_offset),
- cpy_len, 0);
+ virtio_enqueue_offload(m, &virtio_hdr.hdr);
+ rte_memcpy((void *)(uintptr_t)desc_addr,
+ (const void *)&virtio_hdr, vq->vhost_hlen);
+ vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr, vq->vhost_hlen);
+ PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+ desc_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+ desc_offset = vq->vhost_hlen;

- seg_offset += cpy_len;
- vb_offset += cpy_len;
- seg_avail -= cpy_len;
- vb_avail -= cpy_len;
- entry_len += cpy_len;
-
- if (seg_avail != 0) {
- /*
- * The virtio buffer in this vring
- * entry reach to its end.
- * But the segment doesn't complete.
- */
- if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ mbuf_offset = 0;
+ while (mbuf_avail != 0 || m->next != NULL) {
+ /* done with current desc buf, get the next one */
+ if (desc_avail == 0) {
+ desc_idx = vq->buf_vec[vec_idx].desc_idx;
+
+ if (!(vq->desc[desc_idx].flags & VRING_DESC_F_NEXT)) {
/* Update used ring with desc information */
- idx = cur_idx & (vq->size - 1);
- vq->used->ring[idx].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
+ used_idx = cur_idx++ & (vq->size - 1);
+ vq->used->ring[used_idx].id = desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_len = 0;
- cur_idx++;
- entry_success++;
+ offsetof(struct vring_used,
+ ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));
}

vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_offset = 0;
- vb_avail = vq->buf_vec[vec_idx].buf_len;
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This current segment complete, need continue to
- * check if the whole packet complete or not.
- */
- pkt = pkt->next;
- if (pkt != NULL) {
- /*
- * There are more segments.
- */
- if (vb_avail == 0) {
- /*
- * This current buffer from vring is
- * used up, need fetch next buffer
- * from buf_vec.
- */
- uint32_t desc_idx =
- vq->buf_vec[vec_idx].desc_idx;
-
- if ((vq->desc[desc_idx].flags &
- VRING_DESC_F_NEXT) == 0) {
- idx = cur_idx & (vq->size - 1);
- /*
- * Update used ring with the
- * descriptor information
- */
- vq->used->ring[idx].id
- = desc_idx;
- vq->used->ring[idx].len
- = entry_len;
- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_success++;
- entry_len = 0;
- cur_idx++;
- }
-
- /* Get next buffer from buf_vec. */
- vec_idx++;
- vb_addr = gpa_to_vva(dev,
- vq->buf_vec[vec_idx].buf_addr);
- vb_avail =
- vq->buf_vec[vec_idx].buf_len;
- vb_offset = 0;
- }
-
- seg_offset = 0;
- seg_avail = rte_pktmbuf_data_len(pkt);
- cpy_len = RTE_MIN(vb_avail, seg_avail);
- } else {
- /*
- * This whole packet completes.
- */
- /* Update used ring with desc information */
- idx = cur_idx & (vq->size - 1);
- vq->used->ring[idx].id
- = vq->buf_vec[vec_idx].desc_idx;
- vq->used->ring[idx].len = entry_len;
- vhost_log_used_vring(dev, vq,
- offsetof(struct vring_used, ring[idx]),
- sizeof(vq->used->ring[idx]));
- entry_success++;
- break;
- }
+ desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+
+ /* Prefetch buffer address. */
+ rte_prefetch0((void *)(uintptr_t)desc_addr);
+ desc_offset = 0;
+ desc_avail = vq->buf_vec[vec_idx].buf_len;
}
- }

- return entry_success;
-}
+ /* done with current mbuf, get the next one */
+ if (mbuf_avail == 0) {
+ m = m->next;

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
- uint32_t *secure_len, uint32_t *vec_idx)
-{
- uint16_t wrapped_idx = id & (vq->size - 1);
- uint32_t idx = vq->avail->ring[wrapped_idx];
- uint8_t next_desc;
- uint32_t len = *secure_len;
- uint32_t vec_id = *vec_idx;
+ mbuf_offset = 0;
+ mbuf_avail = rte_pktmbuf_data_len(m);
+ }

- do {
- next_desc = 0;
- len += vq->desc[idx].len;
- vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
- vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
- vq->buf_vec[vec_id].desc_idx = idx;
- vec_id++;
+ cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+ rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+ rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+ cpy_len);
+ vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr + desc_offset,
+ cpy_len);
+ PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+ cpy_len, 0);

- if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
- idx = vq->desc[idx].next;
- next_desc = 1;
- }
- } while (next_desc);
+ mbuf_avail -= cpy_len;
+ mbuf_offset += cpy_len;
+ desc_avail -= cpy_len;
+ desc_offset += cpy_len;
+ }

- *secure_len = len;
- *vec_idx = vec_id;
+ used_idx = cur_idx & (vq->size - 1);
+ vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+ vq->used->ring[used_idx].len = desc_offset;
+ vhost_log_used_vring(dev, vq,
+ offsetof(struct vring_used, ring[used_idx]),
+ sizeof(vq->used->ring[used_idx]));
+
+ return res_end_idx - res_start_idx;
}

-/*
- * This function works for mergeable RX.
- */
static inline uint32_t __attribute__((always_inline))
virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint32_t count)
{
struct vhost_virtqueue *vq;
- uint32_t pkt_idx = 0, entry_success = 0;
- uint16_t avail_idx;
- uint16_t res_base_idx, res_cur_idx;
- uint8_t success = 0;
+ uint32_t pkt_idx = 0, nr_used = 0;
+ uint16_t start, end;

LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
dev->device_fh);
@@ -584,57 +534,37 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
return 0;

count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
if (count == 0)
return 0;

for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

- do {
- /*
- * As many data cores may want access to available
- * buffers, they need to be reserved.
- */
- uint32_t secure_len = 0;
- uint32_t vec_idx = 0;
-
- res_base_idx = vq->last_used_idx_res;
- res_cur_idx = res_base_idx;
-
- do {
- avail_idx = *((volatile uint16_t *)&vq->avail->idx);
- if (unlikely(res_cur_idx == avail_idx))
- goto merge_rx_exit;
-
- update_secure_len(vq, res_cur_idx,
- &secure_len, &vec_idx);
- res_cur_idx++;
- } while (pkt_len > secure_len);
-
- /* vq->last_used_idx_res is atomically updated. */
- success = rte_atomic16_cmpset(&vq->last_used_idx_res,
- res_base_idx,
- res_cur_idx);
- } while (success == 0);
-
- entry_success = copy_from_mbuf_to_vring(dev, queue_id,
- res_base_idx, res_cur_idx, pkts[pkt_idx]);
+ if (unlikely(reserve_avail_buf_mergeable(vq, pkt_len,
+ &start, &end) < 0)) {
+ LOG_DEBUG(VHOST_DATA,
+ "(%" PRIu64 ") Failed to get enough desc from vring\n",
+ dev->device_fh);
+ break;
+ }

+ nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+ pkts[pkt_idx]);
rte_compiler_barrier();

/*
* Wait until it's our turn to add our buffer
* to the used ring.
*/
- while (unlikely(vq->last_used_idx != res_base_idx))
+ while (unlikely(vq->last_used_idx != start))
rte_pause();

- *(volatile uint16_t *)&vq->used->idx += entry_success;
- vq->last_used_idx = res_cur_idx;
+ *(volatile uint16_t *)&vq->used->idx += nr_used;
+ vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
+ sizeof(vq->used->idx));
+ vq->last_used_idx = end;
}

-merge_rx_exit:
if (likely(pkt_idx)) {
/* flush used->idx update before we read avail->flags. */
rte_mb();
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:42 UTC
First of all, rte_memcpy() is mostly useful for copying big packets
by leveraging advanced hardware instructions like AVX. But for the
virtio net hdr, which is 12 bytes at most, invoking rte_memcpy() will
not introduce any performance boost.

And, to my surprise, rte_memcpy() is VERY huge. Since rte_memcpy()
is inlined, it increases the binary code size linearly every time
we call it at a different place. Replacing the two rte_memcpy()
calls with a direct copy saves nearly 12K bytes of code size!
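
As a standalone illustration of the direct copy, here is a sketch using
plain struct assignment. The type and function names (net_hdr,
net_hdr_mrg, copy_hdr) are hypothetical; the layouts mirror the 10-byte
basic header and the 12-byte mergeable header. It assumes the destination
is suitably aligned and large enough, which the sketch does not check.

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative layout of the basic header: 10 bytes. */
struct net_hdr {
	uint8_t flags;
	uint8_t gso_type;
	uint16_t hdr_len;
	uint16_t gso_size;
	uint16_t csum_start;
	uint16_t csum_offset;
};

/* Illustrative layout of the mergeable header: 12 bytes. */
struct net_hdr_mrg {
	struct net_hdr hdr;
	uint16_t num_buffers;
};

/*
 * Copy either the full mergeable header or only the basic header,
 * depending on the header length in use. Both copies have a size
 * known at compile time, so the compiler can emit a few plain moves.
 */
void
copy_hdr(void *dst, size_t hlen, struct net_hdr_mrg hdr)
{
	if (hlen == sizeof(struct net_hdr_mrg))
		*(struct net_hdr_mrg *)dst = hdr;
	else
		*(struct net_hdr *)dst = hdr.hdr;
}
```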

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 9be3593..bafcb52 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -129,6 +129,16 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
return;
}

+static inline void
+copy_virtio_net_hdr(struct vhost_virtqueue *vq, uint64_t desc_addr,
+ struct virtio_net_hdr_mrg_rxbuf hdr)
+{
+ if (vq->vhost_hlen == sizeof(struct virtio_net_hdr_mrg_rxbuf))
+ *(struct virtio_net_hdr_mrg_rxbuf *)(uintptr_t)desc_addr = hdr;
+ else
+ *(struct virtio_net_hdr *)(uintptr_t)desc_addr = hdr.hdr;
+}
+
static inline int __attribute__((always_inline))
copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct rte_mbuf *m, uint16_t desc_idx, uint32_t *copied)
@@ -145,8 +155,7 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
rte_prefetch0((void *)(uintptr_t)desc_addr);

virtio_enqueue_offload(m, &virtio_hdr.hdr);
- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ copy_virtio_net_hdr(vq, desc_addr, virtio_hdr);
vhost_log_write(dev, desc->addr, vq->vhost_hlen);
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);

@@ -447,8 +456,7 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
dev->device_fh, virtio_hdr.num_buffers);

virtio_enqueue_offload(m, &virtio_hdr.hdr);
- rte_memcpy((void *)(uintptr_t)desc_addr,
- (const void *)&virtio_hdr, vq->vhost_hlen);
+ copy_virtio_net_hdr(vq, desc_addr, virtio_hdr);
vhost_log_write(dev, vq->buf_vec[vec_idx].buf_addr, vq->vhost_hlen);
PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:43 UTC
VIRTIO_NET_F_MRG_RXBUF is a default feature supported by vhost.
Adding unlikely for VIRTIO_NET_F_MRG_RXBUF detection doesn't
make sense to me at all.
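
For context, a branch hint of this kind is typically built on the GCC/Clang
builtin __builtin_expect(). The sketch below is illustrative only:
hint_unlikely, F_MRG_RXBUF, enum rx_path and both functions are
hypothetical names, not DPDK's. The two functions return the same result;
the hint only affects how the compiler lays out the branches.

```c
#include <stdint.h>

/* GCC/Clang branch hint; a plain test on other compilers. */
#if defined(__GNUC__)
#define hint_unlikely(x) __builtin_expect(!!(x), 0)
#else
#define hint_unlikely(x) (x)
#endif

#define F_MRG_RXBUF 15 /* bit number of VIRTIO_NET_F_MRG_RXBUF */

enum rx_path { RX_PATH_SIMPLE, RX_PATH_MERGEABLE };

/* No hint: neither branch is declared rare. */
enum rx_path
pick_rx_path(uint64_t features)
{
	if (features & (1ULL << F_MRG_RXBUF))
		return RX_PATH_MERGEABLE;
	return RX_PATH_SIMPLE;
}

/* Hinted: the compiler treats the mergeable path as the rare one. */
enum rx_path
pick_rx_path_hinted(uint64_t features)
{
	if (hint_unlikely(features & (1ULL << F_MRG_RXBUF)))
		return RX_PATH_MERGEABLE;
	return RX_PATH_SIMPLE;
}
```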

Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bafcb52..50f449f 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -587,7 +587,7 @@ uint16_t
rte_vhost_enqueue_burst(struct virtio_net *dev, uint16_t queue_id,
struct rte_mbuf **pkts, uint16_t count)
{
- if (unlikely(dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)))
+ if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
return virtio_dev_merge_rx(dev, queue_id, pkts, count);
else
return virtio_dev_rx(dev, queue_id, pkts, count);
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:44 UTC
We need to make sure that desc->len is no smaller than the size of the
virtio net header; otherwise, unexpected behaviour might happen because
"desc_avail" would become a huge number in the following code:

    desc_avail = desc->len - vq->vhost_hlen;

For the dequeue code path, it will try to allocate enough mbufs to hold
a desc buf of that size, which ends up consuming all mbufs, leaving no
free mbuf available. Therefore, you might see an error message:

    Failed to allocate memory for mbuf.

Also, for both the dequeue and enqueue code paths, while data is copied
from/to the desc buf, the big "desc_avail" would result in accesses to
memory that does not belong to the desc buf, which could lead to memory
access errors.

A malicious guest could easily forge such a malformed vring desc buf.
Restarting an interrupted DPDK application inside the guest would also
trigger this issue every time, as all huge pages are reset to 0 during
DPDK re-init, leading to desc->len being 0.

Therefore, this patch does a sanity check for desc->len, to make vhost
robust.
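
The wrap-around is easy to reproduce in isolation. In the sketch below,
room_unchecked() and room_checked() are hypothetical helpers, not vhost
functions. With a zeroed descriptor (length 0) and a 12-byte header, the
unchecked subtraction yields 4294967284, that is 2^32 - 12, because the
arithmetic is done on unsigned 32-bit values.

```c
#include <stdint.h>

/* Unchecked form: wraps around when desc_len < hdr_len. */
uint32_t
room_unchecked(uint32_t desc_len, uint32_t hdr_len)
{
	return desc_len - hdr_len;
}

/*
 * Checked form: returns -1 if the descriptor cannot even hold the
 * header, otherwise stores the remaining room in *avail and returns 0.
 */
int
room_checked(uint32_t desc_len, uint32_t hdr_len, uint32_t *avail)
{
	if (desc_len < hdr_len)
		return -1;
	*avail = desc_len - hdr_len;
	return 0;
}
```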

Reported-by: Rich Lane <***@bigswitch.com>
Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 9 +++++++++
1 file changed, 9 insertions(+)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 50f449f..86e4d1a 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -151,6 +151,9 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};

desc = &vq->desc[desc_idx];
+ if (unlikely(desc->len < vq->vhost_hlen))
+ return -1;
+
desc_addr = gpa_to_vva(dev, desc->addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);

@@ -448,6 +451,9 @@ copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
"(%"PRIu64") Current Index %d| End Index %d\n",
dev->device_fh, cur_idx, res_end_idx);

+ if (vq->buf_vec[vec_idx].buf_len < vq->vhost_hlen)
+ return -1;
+
desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);

@@ -737,6 +743,9 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
struct virtio_net_hdr *hdr;

desc = &vq->desc[desc_idx];
+ if (unlikely(desc->len < vq->vhost_hlen))
+ return -1;
+
desc_addr = gpa_to_vva(dev, desc->addr);
rte_prefetch0((void *)(uintptr_t)desc_addr);
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:45 UTC
A malicious guest may easily forge an illegal vring desc buf.
To make our vhost robust, we need to make sure desc->next will not
go beyond the vq->desc[] array.
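
A minimal standalone sketch of the check follows. struct desc mirrors the
vring descriptor layout, while follow_next() is a hypothetical helper
rather than a vhost function. The guest-supplied index is compared against
the table size before it is ever used as an array subscript.

```c
#include <stdint.h>

/* Same field layout as a vring descriptor. */
struct desc {
	uint64_t addr;
	uint32_t len;
	uint16_t flags;
	uint16_t next;
};

/*
 * Validate the "next" index stored in table[cur]. The caller is
 * expected to have checked cur already and to know that the
 * descriptor actually has a successor.
 * Returns 0 and stores the index in *next, or -1 if it is out of range.
 */
int
follow_next(const struct desc *table, uint16_t size, uint16_t cur,
	    uint16_t *next)
{
	uint16_t idx = table[cur].next;

	if (idx >= size)
		return -1;

	*next = idx;
	return 0;
}
```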

Suggested-by: Rich Lane <***@bigswitch.com>
Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 86e4d1a..43db6c7 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -183,6 +183,8 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct vhost_virtqueue *vq,
/* Room in vring buffer is not enough */
return -1;
}
+ if (unlikely(desc->next >= vq->size))
+ return -1;

desc = &vq->desc[desc->next];
desc_addr = gpa_to_vva(dev, desc->addr);
@@ -345,7 +347,7 @@ fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
uint32_t len = *allocated;

while (1) {
- if (vec_id >= BUF_VECTOR_MAX)
+ if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size))
return -1;

len += vq->desc[idx].len;
@@ -759,6 +761,8 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
while (desc_avail != 0 || (desc->flags & VRING_DESC_F_NEXT) != 0) {
/* This desc reaches to its end, get the next one */
if (desc_avail == 0) {
+ if (unlikely(desc->next >= vq->size))
+ return -1;
desc = &vq->desc[desc->next];

desc_addr = gpa_to_vva(dev, desc->addr);
--
1.9.0
Yuanhan Liu
2016-03-10 04:32:46 UTC
If a malicious guest forges a dead loop chain, it could lead to a dead
loop of copying the desc buf to mbuf, which results in all mbufs being
exhausted.

Add a variable, nr_desc, to avoid such a case.
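
The idea can be tried on its own with the sketch below. It is illustrative
only: chain_length() and DESC_F_NEXT are hypothetical names, and the exact
bound is an assumption. The sketch accepts chains of up to "size"
descriptors, whereas the patch compares with ">=" and is slightly
stricter. A chain that revisits a descriptor must eventually exceed the
table size, so the counter turns an endless walk into an error.

```c
#include <stdint.h>

#define DESC_F_NEXT 1 /* same value as VRING_DESC_F_NEXT */

/* Same field layout as a vring descriptor. */
struct desc {
	uint64_t addr;
	uint32_t len;
	uint16_t flags;
	uint16_t next;
};

/*
 * Walk a descriptor chain starting at head.
 * Returns the number of descriptors in the chain, or -1 if an index
 * is out of range or the chain is longer than the table itself,
 * which can only happen if it loops.
 */
int
chain_length(const struct desc *table, uint16_t size, uint16_t head)
{
	uint32_t nr_desc = 1;
	uint16_t idx = head;

	if (idx >= size)
		return -1;

	while (table[idx].flags & DESC_F_NEXT) {
		idx = table[idx].next;
		if (idx >= size || ++nr_desc > size)
			return -1;
	}

	return (int)nr_desc;
}
```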

Suggested-by: Huawei Xie <***@intel.com>
Signed-off-by: Yuanhan Liu <***@linux.intel.com>
---
lib/librte_vhost/vhost_rxtx.c | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 43db6c7..73fab7e 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -743,6 +743,8 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
uint32_t cpy_len;
struct rte_mbuf *cur = m, *prev = m;
struct virtio_net_hdr *hdr;
+ /* A counter to avoid desc dead loop chain */
+ uint32_t nr_desc = 1;

desc = &vq->desc[desc_idx];
if (unlikely(desc->len < vq->vhost_hlen))
@@ -761,7 +763,8 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vhost_virtqueue *vq,
while (desc_avail != 0 || (desc->flags & VRING_DESC_F_NEXT) != 0) {
/* This desc reaches to its end, get the next one */
if (desc_avail == 0) {
- if (unlikely(desc->next >= vq->size))
+ if (unlikely(desc->next >= vq->size ||
+ ++nr_desc >= vq->size))
return -1;
desc = &vq->desc[desc->next];
--
1.9.0
Thomas Monjalon
2016-03-14 23:09:34 UTC
Post by Yuanhan Liu
v3: - quite a few minor changes, including using likely/unlikely
      when possible.
    - Added a new patch 8 to avoid desc dead loop chain

The first 3 patches refactor 3 major functions at vhost_rxtx.c.
They simplify the code logic, making it more readable. OTOH, they
reduce binary code size, because a lot of duplicate code is
removed and some huge inline functions are shrunk.

Patch 4 gets rid of the rte_memcpy for virtio_hdr copy, which
saves nearly 12K bytes of binary code size!

Patch 5 removes "unlikely" for VIRTIO_NET_F_MRG_RXBUF detection.

Patch 6, 7 and 8 do some sanity checks for two desc fields, to make
vhost robust and protected from malicious guests or abnormal use
cases.

---
vhost: refactor rte_vhost_dequeue_burst
vhost: refactor virtio_dev_rx
vhost: refactor virtio_dev_merge_rx
vhost: do not use rte_memcpy for virtio_hdr copy
vhost: don't use unlikely for VIRTIO_NET_F_MRG_RXBUF detection
vhost: do sanity check for desc->len
vhost: do sanity check for desc->next against with vq->size
vhost: avoid dead loop chain.

Applied with 3/8 v4, thanks.
