Discussion:
[dpdk-dev] [PATCH v4 00/10] A Distributed Software Event Device
(too old to reply)
Mattias Rönnblom
2018-09-18 12:45:04 UTC
Permalink
v4:
* Reworded DSW introduction in the documentation.
* Updated the MAINTAINERS file and the 18.11 release notes.

v3:
* Fixed incorrect headline prefixes.
* Removed dummy dsw_event_schedule() function.
* Removed redundant output buffer flush.

v2:
* Added support for Meson builds.
* Eventdev 'xstats' support is now mandatory.
* Added check in dsw_probe() to allow secondary processes.
* rte_event_dev_stop() now runs the flush callback.
* Added documentation.
* Fixed uninitialized-use warning in ‘dsw_port_consider_migration'.
* Removed some dead (#if 0) debugging code.
* Version .map file is bumped to 18.11.
* Header files sorted in alphabetic order, newline after declarations
and various other coding style-related improvements.

This is the Distributed Software (DSW) event device, which distributes
the task of scheduling events among all the eventdev ports and their
lcore threads.

DSW is primarily designed for atomic-only queues, but also supports
single-link and parallel queues.

(DSW would be more accurately described as 'parallel', but since that
term is used to describe an eventdev queue type, it's referred to as
'distributed', to avoid suggesting it's somehow biased toward parallel
queues.)

Event Scheduling
================

Internally, DSW hashes an eventdev flow id to a 15-bit "flow
hash". For each eventdev queue, there's a table mapping a flow hash to
an eventdev port. That port is considered the owner of the
flow. Owners are randomly picked at initialization time, among the
ports serving (i.e. are linked to) that queue.

The scheduling of an event to a port is done (by the sender port) at
time of the enqueue operation, and in most cases simply consists of
hashing the flow id and performing a lookup in the destination queue's
table. Each port has an MP/SC event ring to which the events are
enqueued. This means events go directly port-to-port, typically
meaning core-to-core.

Port Load Measurement
=====================

DSW includes a concept of port load. The event device keeps track of
transitions between "idle" and "busy" (or vice versa) on a per-port
basis, compares this to the wall time passed, and computes to what
extent the port was busy (for a certain interval). A port transitions
to "busy" on a non-zero dequeue, and again back to "idle" at the point
it performs a dequeue operation returning zero events.

Flow Migration
==============

Periodically, hidden to the API user and as a part of a normal
enqueue/dequeue operations, a port updates its load estimate, and in
case the load has reached a certain threshold, considers moving one of
its flow to a different, more lightly loaded, port. This process is
called migration.

Migration Strategy
~~~~~~~~~~~~~~~~~~

The DSW migration strategy is to move a small, but yet active flow. To
quickly find which are the active flows (w/o resorting to scanning
through the tables and/or keeping per-event counters), each port
maintains a list of the last 128 events it has dequeued. If there are
lightly-loaded enough target ports, it will attempt to migrate one of
those flows, starting with the smallest. The size is estimated by the
number of events seen on that flow, in that small sample of events.

A good migration strategy, based on reasonably good estimates of port
and current flow event rates, is key for proper load balancing in a
DSW-style event device.

Migration Process
~~~~~~~~~~~~~~~~~

If the prerequisites are met, and a migration target flow and port is
found, the owning (source) port will initiate the migration
process. For parallel queues it's a very straightforward operation -
simply a table update. For atomic queues, in order to maintain their
semantics, it's a fair bit more elaborate a procedure.

A high-level view the migration process is available[1] in the form a
sequence diagram.

Much simplified, it consist of the source port sending messages to all
ports configured, asking them to "pause" the to-be-migrated flow. Such
ports will flush their output buffers and provide a confirmation back
to the source port.

Each port holds a list of which flows are paused. Upon the enqueue of
an event belonging to a paused flow, it will be accepted into the
machinery, but kept in a paused-events buffer located on the sending
port.

After receiving confirmations from all ports, the source port will
make sure its application-level user has finished processing of all
events related to the migrating flow, update the relevant queue's
table, and forward all unprocessed events (in its input event ring) to
the new target port.

The source port will then send out a request to "unpause" the flow to
all ports. Upon receiving such a request, the port will flush any
buffered (paused) events related to the paused flow, and provide a
confirmation.

All the signaling are done on regular DPDK rings (separate from the
event-carrying rings), and are pulled as a part of normal
enqueue/dequeue operation.

The migrations can be made fairly rapidly (in the range of a couple
hundred us, or even faster), but the algorithm, load measurement and
migration interval parameters must be carefully chosen not to cause
the system to oscillate or otherwise misbehave.

The migration rate is primarily limited by eventdev enqueue/dequeue
function call rate, which in turn in the typical application is
limited by event burst sizes and event processing latency.

Migration API Implications
~~~~~~~~~~~~~~~~~~~~~~~~~~

The migration process puts an addition requirement on the application
beyond the regular eventdev API, which is to not leave ports
'unattended'. Unattended here means a port on that neither enqueue nor
dequeue operations are performed within a reasonable time frame. What
is 'reasonable' depends on the migration latency requirements, which
in turns depends on the degree of variation in the workload. For
enqueue-only ports, which might well come into situations where no
events are enqueued for long duration of time, DSW includes an
less-than-elegant solution, allowing zero-sized enqueue operations,
which serve no other purpose that to drive the migration machinery.

Workload Suitability
====================

DSW operates under the assumption that an active flow will remain so
for a duration which is significantly longer than the migration
latency.

DSW should do well with a larger number of small flows, and also large
flows that increase their rates at a pace which is low-enough for the
migration process to move away smaller flows to make room on that
port. TCP slow-start kind of traffic, with end-to-end latencies on the
ms level, should be possible to handle, even though their exponential
nature - but all of this is speculation.

DSW won't be able to properly load balance workloads with few, highly
bursty, and high intensity flows.

Compared to the SW event device, DSW allows scaling to higher-core
count machines, with its substantially higher throughput and avoiding
a single bottleneck core, especially for long pipelines, or systems
with very short pipeline stages.

In addition, it also scales down to configurations with very few or
even just a single core, avoiding the issue with SW has with running
application work and event scheduling on the same core.

The downsides is that DSW doesn't have SW's near-immediate load
balancing flow-rerouting capability, but instead relies on flows
changing their inter-event time at a pace which isn't too high for the
migration process to handle.

Backpressure
============

Like any event device, DSW provides a backpressure mechanism to
prevent event producers flooding it.

DSW employs a credit system, with a central pool equal to the
configured max number of in-flight events. The ports are allowed to
take loans from this central pool, and may also return credits, so
that consumer-heavy ports don't end up draining the pool.

A port will, at the time of enqueue, make sure it has enough credits
(one per event) to allow the events into DSW. If not, the port will
attempt to retrieve more from the central pool. If this fails, the
enqueue operation fails. For efficiency reasons, at least 64 credits
are take from the pool (even if fewer might be needed).

A port will, at the time of dequeue, gain as many credits as the
number of events it dequeued. A port will not return credits until
they reach 128, and will always keep 64 credits.

All this in a similar although not identical manner to the SW event
device.

Output Buffering
================

Upon a successful enqueue operation, DSW will not immediately send the
events to their destination ports' event input rings. Events will
however - unless paused - be assigned a destination port and enqueued
on a buffer on the sending port. Such buffered events are considered
accepted into the event device, and is so handled from a migration and
in-flight credit system point of view.

Upon reaching a certain threshold, buffered events will be flushed,
and enqueued on the destination port's input ring.

The output buffers make the DSW ports use longer bursts against the
receiving port rings, much improving event ring efficiency.

To avoid having buffered events lingering too long (or even endlessly)
in these buffers, DSW has a schema where it only allows a certain
number of enqueue/dequeue operations ('ops') to be performed, before
the buffers are flushed.

A side effect of how 'ops' are counted is that in case a port goes
idle, it will likely perform many dequeue operations to pull new work,
and thus quickly up the 'ops' to a level it's output buffers are
flushed. That will cause lower ring efficiency, but this is of no
major concern since the worker is idling anyways.

This allows for single-event enqueue operations to be efficient,
although credit system and statistics update overhead will still make
them slower than burst enqueues.

Output Buffering API Implications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The output buffering schema puts the same requirement on the
application as the migration process in that it disallows unattended ports.

In addition, DSW also implement a schema (maybe more accurately
described as a hack) where the application can force a buffer flush by
doing a zero-sized enqueue.

Alternative Approach
~~~~~~~~~~~~~~~~~~~~

An alternative to the DSW-internal output buffering is to have the
application to use burst enqueues, preferably with very large bursts
(the more cores, the larger bursts are preferred).

Depending on how the application is structured, this might well lead
to it having an internal buffer to which it does efficient,
single-event enqueue operations to, and then flushes it on a regular
basis.

However, since the DSW output buffering happens after the scheduling
is performed, the buffers can actually be flushed earlier than if
buffering happens in the application, if a large fraction of the
events are scheduled to a particular port (since the output buffer
limit is on a per-destination port basis).

In addition, since events in the output buffer are considered accepted
into DSW, migration can happen somewhat more quickly, since those
events can be flushed on migrations, as oppose to an
application-controlled buffer.

Statistics
==========

DSW supports the eventdev 'xstats' interface. It provides a large,
most port-based, set of counters, including things like port load,
number of migrations and migration latency, number of events dequeued
and enqueued, and on which queue, the average event processing latency
and a timestamp to allow the detection of unattended ports.

DSW xstats also allows reading the current global total and port
credits, making it possible to give a rough estimate of how many
events are in flight.

Performance Indications
=======================

The primary source of performance metrics comes from a test
application implementing a simulate pipeline. With zero work in each
pipe line stage, running on single socket x86_64 system, fourteen 2.4
GHz worker cores can sustain 300-400 million event/s. With a pipeline
with 1000 clock cycles of work per stage, the average event device
overhead is somewhere 50-150 clock cycles/event.

The benchmark is run when the system is fully loaded (i.e. there are
always events available on the pipeline ingress), and thus the event
device will benefit from batching effects, which are crucial for
performance. Also beneficial for DSW efficiency is the fact that the
"dummy" application work cycles has a very small memory working set,
leaving all the caches to DSW.

The simulated load has flows with a fixed event rate, causing very few
migrations - and more importantly - allows DSW to provide near-ideal
load balancing. So inefficienes due to imperfect load balancing is
also not accounted for.

The flow-to-port/thread/core affinity of DSW should provide for some
caching benefits for the application, for flow-related data
structures, compared to an event device where the flows move around
the ports in a more stochastic manner.

[1] http://www.lysator.liu.se/~hofors/dsw/migration-sequence.svg

Mattias Rönnblom (10):
event/dsw: add DSW device registration and build system
event/dsw: add DSW device and queue configuration
event/dsw: add DSW port configuration
event/dsw: add support in DSW for linking/unlinking ports
event/dsw: add DSW event scheduling and device start/stop
event/dsw: add DSW port load measurements
event/dsw: add load balancing to the DSW event device
event/dsw: let DSW event device sort events on dequeue
event/dsw: implement eventdev 'xstats' counters in DSW
event/dsw: include DSW event device documentation

MAINTAINERS | 5 +
config/common_base | 5 +
doc/guides/eventdevs/dsw.rst | 96 ++
doc/guides/eventdevs/index.rst | 1 +
doc/guides/rel_notes/release_18_11.rst | 8 +
drivers/event/Makefile | 1 +
drivers/event/dsw/Makefile | 27 +
drivers/event/dsw/dsw_evdev.c | 435 ++++++
drivers/event/dsw/dsw_evdev.h | 279 ++++
drivers/event/dsw/dsw_event.c | 1253 +++++++++++++++++
drivers/event/dsw/dsw_sort.h | 48 +
drivers/event/dsw/dsw_xstats.c | 288 ++++
drivers/event/dsw/meson.build | 6 +
.../event/dsw/rte_pmd_dsw_event_version.map | 3 +
drivers/event/meson.build | 2 +-
mk/rte.app.mk | 1 +
16 files changed, 2457 insertions(+), 1 deletion(-)
create mode 100644 doc/guides/eventdevs/dsw.rst
create mode 100644 drivers/event/dsw/Makefile
create mode 100644 drivers/event/dsw/dsw_evdev.c
create mode 100644 drivers/event/dsw/dsw_evdev.h
create mode 100644 drivers/event/dsw/dsw_event.c
create mode 100644 drivers/event/dsw/dsw_sort.h
create mode 100644 drivers/event/dsw/dsw_xstats.c
create mode 100644 drivers/event/dsw/meson.build
create mode 100644 drivers/event/dsw/rte_pmd_dsw_event_version.map
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:06 UTC
Permalink
Allow queue- and device-level configuration for and retrieval of
contextual information from a DSW event device.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 87 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 28 +++++++++++
2 files changed, 115 insertions(+)

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 6990bbc9e..1500d2426 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -9,6 +9,91 @@

#define EVENTDEV_NAME_DSW_PMD event_dsw

+static int
+dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
+ const struct rte_event_queue_conf *conf)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+
+ if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
+ return -ENOTSUP;
+
+ if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
+ return -ENOTSUP;
+
+ /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
+ * avoid the "fake" TYPE_PARALLEL flow_id assignment. Since
+ * the queue will only have a single serving port, no
+ * migration will ever happen, so the extra TYPE_ATOMIC
+ * migration overhead is avoided.
+ */
+ if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
+ queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
+ else /* atomic or parallel */
+ queue->schedule_type = conf->schedule_type;
+
+ queue->num_serving_ports = 0;
+
+ return 0;
+}
+
+static void
+dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
+ uint8_t queue_id __rte_unused,
+ struct rte_event_queue_conf *queue_conf)
+{
+ *queue_conf = (struct rte_event_queue_conf) {
+ .nb_atomic_flows = 4096,
+ .schedule_type = RTE_SCHED_TYPE_ATOMIC,
+ .priority = RTE_EVENT_DEV_PRIORITY_NORMAL
+ };
+}
+
+static void
+dsw_queue_release(struct rte_eventdev *dev __rte_unused,
+ uint8_t queue_id __rte_unused)
+{
+}
+
+static void
+dsw_info_get(struct rte_eventdev *dev __rte_unused,
+ struct rte_event_dev_info *info)
+{
+ *info = (struct rte_event_dev_info) {
+ .driver_name = DSW_PMD_NAME,
+ .max_event_queues = DSW_MAX_QUEUES,
+ .max_event_queue_flows = DSW_MAX_FLOWS,
+ .max_event_queue_priority_levels = 1,
+ .max_event_priority_levels = 1,
+ .max_event_ports = DSW_MAX_PORTS,
+ .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
+ .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
+ .max_num_events = DSW_MAX_EVENTS,
+ .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
+ };
+}
+
+static int
+dsw_configure(const struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+
+ dsw->num_queues = conf->nb_event_queues;
+
+ return 0;
+}
+
+static struct rte_eventdev_ops dsw_evdev_ops = {
+ .queue_setup = dsw_queue_setup,
+ .queue_def_conf = dsw_queue_def_conf,
+ .queue_release = dsw_queue_release,
+ .dev_infos_get = dsw_info_get,
+ .dev_configure = dsw_configure,
+};
+
static int
dsw_probe(struct rte_vdev_device *vdev)
{
@@ -23,6 +108,8 @@ dsw_probe(struct rte_vdev_device *vdev)
if (dev == NULL)
return -EFAULT;

+ dev->dev_ops = &dsw_evdev_ops;
+
if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;

diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 9a0f4c357..5eda8d114 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -9,8 +9,36 @@

#define DSW_PMD_NAME RTE_STR(event_dsw)

+/* Code changes are required to allow more ports. */
+#define DSW_MAX_PORTS (64)
+#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_ENQUEUE_DEPTH (128)
+
+#define DSW_MAX_QUEUES (16)
+
+#define DSW_MAX_EVENTS (16384)
+
+/* Code changes are required to allow more flows than 32k. */
+#define DSW_MAX_FLOWS_BITS (15)
+#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
+#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)
+
+struct dsw_queue {
+ uint8_t schedule_type;
+ uint16_t num_serving_ports;
+};
+
struct dsw_evdev {
struct rte_eventdev_data *data;
+
+ struct dsw_queue queues[DSW_MAX_QUEUES];
+ uint8_t num_queues;
};

+static inline struct dsw_evdev *
+dsw_pmd_priv(const struct rte_eventdev *eventdev)
+{
+ return eventdev->data->dev_private;
+}
+
#endif
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:08 UTC
Permalink
Added support for linking and unlinking ports to queues in a DSW event
device.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 67 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 1 +
2 files changed, 68 insertions(+)

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 91b1a2449..5dccc232a 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -2,6 +2,8 @@
* Copyright(c) 2018 Ericsson AB
*/

+#include <stdbool.h>
+
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>

@@ -112,6 +114,69 @@ dsw_queue_release(struct rte_eventdev *dev __rte_unused,
{
}

+static void
+queue_add_port(struct dsw_queue *queue, uint16_t port_id)
+{
+ queue->serving_ports[queue->num_serving_ports] = port_id;
+ queue->num_serving_ports++;
+}
+
+static bool
+queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
+{
+ uint16_t i;
+
+ for (i = 0; i < queue->num_serving_ports; i++)
+ if (queue->serving_ports[i] == port_id) {
+ uint16_t last_idx = queue->num_serving_ports - 1;
+ if (i != last_idx)
+ queue->serving_ports[i] =
+ queue->serving_ports[last_idx];
+ queue->num_serving_ports--;
+ return true;
+ }
+ return false;
+}
+
+static int
+dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
+ const uint8_t queues[], uint16_t num, bool link)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_port *p = port;
+ uint16_t i;
+ uint16_t count = 0;
+
+ for (i = 0; i < num; i++) {
+ uint8_t qid = queues[i];
+ struct dsw_queue *q = &dsw->queues[qid];
+ if (link) {
+ queue_add_port(q, p->id);
+ count++;
+ } else {
+ bool removed = queue_remove_port(q, p->id);
+ if (removed)
+ count++;
+ }
+ }
+
+ return count;
+}
+
+static int
+dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
+ const uint8_t priorities[] __rte_unused, uint16_t num)
+{
+ return dsw_port_link_unlink(dev, port, queues, num, true);
+}
+
+static int
+dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
+ uint16_t num)
+{
+ return dsw_port_link_unlink(dev, port, queues, num, false);
+}
+
static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
struct rte_event_dev_info *info)
@@ -150,6 +215,8 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.queue_setup = dsw_queue_setup,
.queue_def_conf = dsw_queue_def_conf,
.queue_release = dsw_queue_release,
+ .port_link = dsw_port_link,
+ .port_unlink = dsw_port_unlink,
.dev_infos_get = dsw_info_get,
.dev_configure = dsw_configure,
};
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 2a4f10421..ad0f857cc 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -51,6 +51,7 @@ struct dsw_port {

struct dsw_queue {
uint8_t schedule_type;
+ uint8_t serving_ports[DSW_MAX_PORTS];
uint16_t num_serving_ports;
};
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:07 UTC
Permalink
Allow port setup and release in the DSW event device.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 60 +++++++++++++++++++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 28 ++++++++++++++++
2 files changed, 88 insertions(+)

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 1500d2426..91b1a2449 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -9,6 +9,62 @@

#define EVENTDEV_NAME_DSW_PMD event_dsw

+static int
+dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
+ const struct rte_event_port_conf *conf)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ struct dsw_port *port;
+ struct rte_event_ring *in_ring;
+ char ring_name[RTE_RING_NAMESIZE];
+
+ port = &dsw->ports[port_id];
+
+ *port = (struct dsw_port) {
+ .id = port_id,
+ .dsw = dsw,
+ .dequeue_depth = conf->dequeue_depth,
+ .enqueue_depth = conf->enqueue_depth,
+ .new_event_threshold = conf->new_event_threshold
+ };
+
+ snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
+ port_id);
+
+ in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
+ dev->data->socket_id,
+ RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+ if (in_ring == NULL)
+ return -ENOMEM;
+
+ port->in_ring = in_ring;
+
+ dev->data->ports[port_id] = port;
+
+ return 0;
+}
+
+static void
+dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
+ uint8_t port_id __rte_unused,
+ struct rte_event_port_conf *port_conf)
+{
+ *port_conf = (struct rte_event_port_conf) {
+ .new_event_threshold = 1024,
+ .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
+ .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
+ };
+}
+
+static void
+dsw_port_release(void *p)
+{
+ struct dsw_port *port = p;
+
+ rte_event_ring_free(port->in_ring);
+}
+
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
const struct rte_event_queue_conf *conf)
@@ -81,12 +137,16 @@ dsw_configure(const struct rte_eventdev *dev)
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
const struct rte_event_dev_config *conf = &dev->data->dev_conf;

+ dsw->num_ports = conf->nb_event_ports;
dsw->num_queues = conf->nb_event_queues;

return 0;
}

static struct rte_eventdev_ops dsw_evdev_ops = {
+ .port_setup = dsw_port_setup,
+ .port_def_conf = dsw_port_def_conf,
+ .port_release = dsw_port_release,
.queue_setup = dsw_queue_setup,
.queue_def_conf = dsw_queue_def_conf,
.queue_release = dsw_queue_release,
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 5eda8d114..2a4f10421 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -5,6 +5,7 @@
#ifndef _DSW_EVDEV_H_
#define _DSW_EVDEV_H_

+#include <rte_event_ring.h>
#include <rte_eventdev.h>

#define DSW_PMD_NAME RTE_STR(event_dsw)
@@ -23,6 +24,31 @@
#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)

+/* The rings are dimensioned so that all in-flight events can reside
+ * on any one of the port rings, to avoid the trouble of having to
+ * care about the case where there's no room on the destination port's
+ * input ring.
+ */
+#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)
+
+struct dsw_port {
+ uint16_t id;
+
+ /* Keeping a pointer here to avoid container_of() calls, which
+ * are expensive since they are very frequent and will result
+ * in an integer multiplication (since the port id is an index
+ * into the dsw_evdev port array).
+ */
+ struct dsw_evdev *dsw;
+
+ uint16_t dequeue_depth;
+ uint16_t enqueue_depth;
+
+ int32_t new_event_threshold;
+
+ struct rte_event_ring *in_ring __rte_cache_aligned;
+} __rte_cache_aligned;
+
struct dsw_queue {
uint8_t schedule_type;
uint16_t num_serving_ports;
@@ -31,6 +57,8 @@ struct dsw_queue {
struct dsw_evdev {
struct rte_eventdev_data *data;

+ struct dsw_port ports[DSW_MAX_PORTS];
+ uint16_t num_ports;
struct dsw_queue queues[DSW_MAX_QUEUES];
uint8_t num_queues;
};
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:09 UTC
Permalink
With this patch, the DSW event device can be started and stopped,
and also supports scheduling events between ports.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/Makefile | 2 +-
drivers/event/dsw/dsw_evdev.c | 125 ++++++++++++
drivers/event/dsw/dsw_evdev.h | 56 ++++++
drivers/event/dsw/dsw_event.c | 359 ++++++++++++++++++++++++++++++++++
drivers/event/dsw/meson.build | 2 +-
5 files changed, 542 insertions(+), 2 deletions(-)
create mode 100644 drivers/event/dsw/dsw_event.c

diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index 5cbf488ff..6374a454e 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -21,6 +21,6 @@ LIBABIVER := 1

EXPORT_MAP := rte_pmd_dsw_event_version.map

-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c

include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 5dccc232a..40a7435be 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -6,6 +6,7 @@

#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
+#include <rte_random.h>

#include "dsw_evdev.h"

@@ -201,10 +202,125 @@ dsw_configure(const struct rte_eventdev *dev)
{
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
const struct rte_event_dev_config *conf = &dev->data->dev_conf;
+ int32_t min_max_in_flight;

dsw->num_ports = conf->nb_event_ports;
dsw->num_queues = conf->nb_event_queues;

+ /* Avoid a situation where consumer ports are holding all the
+ * credits, without making use of them.
+ */
+ min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;
+
+ dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);
+
+ return 0;
+}
+
+
+static void
+initial_flow_to_port_assignment(struct dsw_evdev *dsw)
+{
+ uint8_t queue_id;
+ for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint16_t flow_hash;
+ for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
+ uint8_t port_idx =
+ rte_rand() % queue->num_serving_ports;
+ uint8_t port_id =
+ queue->serving_ports[port_idx];
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ port_id;
+ }
+ }
+}
+
+static int
+dsw_start(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ rte_atomic32_init(&dsw->credits_on_loan);
+
+ initial_flow_to_port_assignment(dsw);
+
+ return 0;
+}
+
+static void
+dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t i;
+
+ for (i = 0; i < buf_len; i++)
+ flush(dev_id, buf[i], flush_arg);
+}
+
+static void
+dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t dport_id;
+
+ for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
+ if (dport_id != port->id)
+ dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
+ port->out_buffer_len[dport_id],
+ flush, flush_arg);
+}
+
+static void
+dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ struct rte_event ev;
+
+ while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
+ flush(dev_id, ev, flush_arg);
+}
+
+static void
+dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ uint16_t port_id;
+
+ if (flush == NULL)
+ return;
+
+ for (port_id = 0; port_id < dsw->num_ports; port_id++) {
+ struct dsw_port *port = &dsw->ports[port_id];
+
+ dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+ dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
+ }
+}
+
+static void
+dsw_stop(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ uint8_t dev_id;
+ eventdev_stop_flush_t flush;
+ void *flush_arg;
+
+ dev_id = dev->data->dev_id;
+ flush = dev->dev_ops->dev_stop_flush;
+ flush_arg = dev->data->dev_stop_flush_arg;
+
+ dsw_drain(dev_id, dsw, flush, flush_arg);
+}
+
+static int
+dsw_close(struct rte_eventdev *dev)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ dsw->num_ports = 0;
+ dsw->num_queues = 0;
+
return 0;
}

@@ -219,6 +335,9 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.port_unlink = dsw_port_unlink,
.dev_infos_get = dsw_info_get,
.dev_configure = dsw_configure,
+ .dev_start = dsw_start,
+ .dev_stop = dsw_stop,
+ .dev_close = dsw_close
};

static int
@@ -236,6 +355,12 @@ dsw_probe(struct rte_vdev_device *vdev)
return -EFAULT;

dev->dev_ops = &dsw_evdev_ops;
+ dev->enqueue = dsw_event_enqueue;
+ dev->enqueue_burst = dsw_event_enqueue_burst;
+ dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
+ dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
+ dev->dequeue = dsw_event_dequeue;
+ dev->dequeue_burst = dsw_event_dequeue_burst;

if (rte_eal_process_type() != RTE_PROC_PRIMARY)
return 0;
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index ad0f857cc..f8e94e4a4 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -14,6 +14,7 @@
#define DSW_MAX_PORTS (64)
#define DSW_MAX_PORT_DEQUEUE_DEPTH (128)
#define DSW_MAX_PORT_ENQUEUE_DEPTH (128)
+#define DSW_MAX_PORT_OUT_BUFFER (32)

#define DSW_MAX_QUEUES (16)

@@ -24,6 +25,24 @@
#define DSW_MAX_FLOWS (1<<(DSW_MAX_FLOWS_BITS))
#define DSW_MAX_FLOWS_MASK (DSW_MAX_FLOWS-1)

+/* Eventdev RTE_SCHED_TYPE_PARALLEL doesn't have a concept of flows,
+ * but the 'dsw' scheduler (more or less) randomly assign flow id to
+ * events on parallel queues, to be able to reuse some of the
+ * migration mechanism and scheduling logic from
+ * RTE_SCHED_TYPE_ATOMIC. By moving one of the parallel "flows" from a
+ * particular port, the likely-hood of events being scheduled to this
+ * port is reduced, and thus a kind of statistical load balancing is
+ * achieved.
+ */
+#define DSW_PARALLEL_FLOWS (1024)
+
+/* Avoid making small 'loans' from the central in-flight event credit
+ * pool, to improve efficiency.
+ */
+#define DSW_MIN_CREDIT_LOAN (64)
+#define DSW_PORT_MAX_CREDITS (2*DSW_MIN_CREDIT_LOAN)
+#define DSW_PORT_MIN_CREDITS (DSW_MIN_CREDIT_LOAN)
+
/* The rings are dimensioned so that all in-flight events can reside
* on any one of the port rings, to avoid the trouble of having to
* care about the case where there's no room on the destination port's
@@ -44,8 +63,17 @@ struct dsw_port {
uint16_t dequeue_depth;
uint16_t enqueue_depth;

+ int32_t inflight_credits;
+
int32_t new_event_threshold;

+ uint16_t pending_releases;
+
+ uint16_t next_parallel_flow_id;
+
+ uint16_t out_buffer_len[DSW_MAX_PORTS];
+ struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];
+
struct rte_event_ring *in_ring __rte_cache_aligned;
} __rte_cache_aligned;

@@ -53,6 +81,8 @@ struct dsw_queue {
uint8_t schedule_type;
uint8_t serving_ports[DSW_MAX_PORTS];
uint16_t num_serving_ports;
+
+ uint8_t flow_to_port_map[DSW_MAX_FLOWS] __rte_cache_aligned;
};

struct dsw_evdev {
@@ -62,12 +92,38 @@ struct dsw_evdev {
uint16_t num_ports;
struct dsw_queue queues[DSW_MAX_QUEUES];
uint8_t num_queues;
+ int32_t max_inflight;
+
+ rte_atomic32_t credits_on_loan __rte_cache_aligned;
};

+uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
+uint16_t dsw_event_enqueue_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+uint16_t dsw_event_enqueue_new_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+uint16_t dsw_event_enqueue_forward_burst(void *port,
+ const struct rte_event events[],
+ uint16_t events_len);
+
+uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
+uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
+ uint16_t num, uint64_t wait);
+
static inline struct dsw_evdev *
dsw_pmd_priv(const struct rte_eventdev *eventdev)
{
return eventdev->data->dev_private;
}

+#define DSW_LOG_DP(level, fmt, args...) \
+ RTE_LOG_DP(level, EVENTDEV, "[%s] %s() line %u: " fmt, \
+ DSW_PMD_NAME, \
+ __func__, __LINE__, ## args)
+
+#define DSW_LOG_DP_PORT(level, port_id, fmt, args...) \
+ DSW_LOG_DP(level, "<Port %d> " fmt, port_id, ## args)
+
#endif
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
new file mode 100644
index 000000000..4a3af8ecd
--- /dev/null
+++ b/drivers/event/dsw/dsw_event.c
@@ -0,0 +1,359 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+
+#include <stdbool.h>
+
+#include <rte_atomic.h>
+#include <rte_random.h>
+
+static bool
+dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+ int32_t credits)
+{
+ int32_t inflight_credits = port->inflight_credits;
+ int32_t missing_credits = credits - inflight_credits;
+ int32_t total_on_loan;
+ int32_t available;
+ int32_t acquired_credits;
+ int32_t new_total_on_loan;
+
+ if (likely(missing_credits <= 0)) {
+ port->inflight_credits -= credits;
+ return true;
+ }
+
+ total_on_loan = rte_atomic32_read(&dsw->credits_on_loan);
+ available = dsw->max_inflight - total_on_loan;
+ acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
+
+ if (available < acquired_credits)
+ return false;
+
+ /* This is a race, no locks are involved, and thus some other
+ * thread can allocate tokens in between the check and the
+ * allocation.
+ */
+ new_total_on_loan = rte_atomic32_add_return(&dsw->credits_on_loan,
+ acquired_credits);
+
+ if (unlikely(new_total_on_loan > dsw->max_inflight)) {
+ /* Some other port took the last credits */
+ rte_atomic32_sub(&dsw->credits_on_loan, acquired_credits);
+ return false;
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
+ acquired_credits);
+
+ port->inflight_credits += acquired_credits;
+ port->inflight_credits -= credits;
+
+ return true;
+}
+
+static void
+dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
+ int32_t credits)
+{
+ port->inflight_credits += credits;
+
+ if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
+ int32_t leave_credits = DSW_PORT_MIN_CREDITS;
+ int32_t return_credits =
+ port->inflight_credits - leave_credits;
+
+ port->inflight_credits = leave_credits;
+
+ rte_atomic32_sub(&dsw->credits_on_loan, return_credits);
+
+ DSW_LOG_DP_PORT(DEBUG, port->id,
+ "Returned %d tokens to pool.\n",
+ return_credits);
+ }
+}
+
+static uint8_t
+dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
+{
+ struct dsw_queue *queue = &dsw->queues[queue_id];
+ uint8_t port_id;
+
+ if (queue->num_serving_ports > 1)
+ port_id = queue->flow_to_port_map[flow_hash];
+ else
+ /* A single-link queue, or atomic/ordered/parallel but
+ * with just a single serving port.
+ */
+ port_id = queue->serving_ports[0];
+
+ DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
+ "to port %d.\n", queue_id, flow_hash, port_id);
+
+ return port_id;
+}
+
+static void
+dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t dest_port_id)
+{
+ struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
+ uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+ struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+ uint16_t enqueued = 0;
+
+ if (*buffer_len == 0)
+ return;
+
+ /* The rings are dimensioned to fit all in-flight events (even
+ * on a single ring), so looping will work.
+ */
+ do {
+ enqueued +=
+ rte_event_ring_enqueue_burst(dest_port->in_ring,
+ buffer+enqueued,
+ *buffer_len-enqueued,
+ NULL);
+ } while (unlikely(enqueued != *buffer_len));
+
+ (*buffer_len) = 0;
+}
+
+static uint16_t
+dsw_port_get_parallel_flow_id(struct dsw_port *port)
+{
+ uint16_t flow_id = port->next_parallel_flow_id;
+
+ port->next_parallel_flow_id =
+ (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
+
+ return flow_id;
+}
+
+static void
+dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t dest_port_id, const struct rte_event *event)
+{
+ struct rte_event *buffer = source_port->out_buffer[dest_port_id];
+ uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
+
+ if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
+ dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+
+ buffer[*buffer_len] = *event;
+
+ (*buffer_len)++;
+}
+
+#define DSW_FLOW_ID_BITS (24)
+static uint16_t
+dsw_flow_id_hash(uint32_t flow_id)
+{
+ uint16_t hash = 0;
+ uint16_t offset = 0;
+
+ do {
+ hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
+ offset += DSW_MAX_FLOWS_BITS;
+ } while (offset < DSW_FLOW_ID_BITS);
+
+ return hash;
+}
+
+static void
+dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ struct rte_event event)
+{
+ uint8_t dest_port_id;
+
+ event.flow_id = dsw_port_get_parallel_flow_id(source_port);
+
+ dest_port_id = dsw_schedule(dsw, event.queue_id,
+ dsw_flow_id_hash(event.flow_id));
+
+ dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
+}
+
+static void
+dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ const struct rte_event *event)
+{
+ uint16_t flow_hash;
+ uint8_t dest_port_id;
+
+ if (unlikely(dsw->queues[event->queue_id].schedule_type ==
+ RTE_SCHED_TYPE_PARALLEL)) {
+ dsw_port_buffer_parallel(dsw, source_port, *event);
+ return;
+ }
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
+
+ dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
+{
+ uint16_t dest_port_id;
+
+ for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
+ dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
+}
+
+uint16_t
+dsw_event_enqueue(void *port, const struct rte_event *ev)
+{
+ return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
+}
+
+static __rte_always_inline uint16_t
+dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
+ uint16_t events_len, bool op_types_known,
+ uint16_t num_new, uint16_t num_release,
+ uint16_t num_non_release)
+{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+ bool enough_credits;
+ uint16_t i;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
+ "events to port %d.\n", events_len, source_port->id);
+
+ /* XXX: For performance (=ring efficiency) reasons, the
+ * scheduler relies on internal non-ring buffers instead of
+ * immediately sending the event to the destination ring. For
+ * a producer that doesn't intend to produce or consume any
+ * more events, the scheduler provides a way to flush the
+ * buffer, by means of doing an enqueue of zero events. In
+ * addition, a port cannot be left "unattended" (e.g. unused)
+ * for long periods of time, since that would stall
+ * migration. Eventdev API extensions to provide a cleaner way
+ * to archieve both of these functions should be
+ * considered.
+ */
+ if (unlikely(events_len == 0)) {
+ dsw_port_flush_out_buffers(dsw, source_port);
+ return 0;
+ }
+
+ if (unlikely(events_len > source_port->enqueue_depth))
+ events_len = source_port->enqueue_depth;
+
+ if (!op_types_known)
+ for (i = 0; i < events_len; i++) {
+ switch (events[i].op) {
+ case RTE_EVENT_OP_RELEASE:
+ num_release++;
+ break;
+ case RTE_EVENT_OP_NEW:
+ num_new++;
+ /* Falls through. */
+ default:
+ num_non_release++;
+ break;
+ }
+ }
+
+ /* Technically, we could allow the non-new events up to the
+ * first new event in the array into the system, but for
+ * simplicity reasons, we deny the whole burst if the port is
+ * above the water mark.
+ */
+ if (unlikely(num_new > 0 && rte_atomic32_read(&dsw->credits_on_loan) >
+ source_port->new_event_threshold))
+ return 0;
+
+ enough_credits = dsw_port_acquire_credits(dsw, source_port,
+ num_non_release);
+ if (unlikely(!enough_credits))
+ return 0;
+
+ source_port->pending_releases -= num_release;
+
+ for (i = 0; i < events_len; i++) {
+ const struct rte_event *event = &events[i];
+
+ if (likely(num_release == 0 ||
+ event->op != RTE_EVENT_OP_RELEASE))
+ dsw_port_buffer_event(dsw, source_port, event);
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
+ "accepted.\n", num_non_release);
+
+ return num_non_release;
+}
+
+uint16_t
+dsw_event_enqueue_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, false,
+ 0, 0, 0);
+}
+
+uint16_t
+dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+ events_len, 0, events_len);
+}
+
+uint16_t
+dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
+ uint16_t events_len)
+{
+ return dsw_event_enqueue_burst_generic(port, events, events_len, true,
+ 0, 0, events_len);
+}
+
+uint16_t
+dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
+{
+ return dsw_event_dequeue_burst(port, events, 1, wait);
+}
+
+static uint16_t
+dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
+ uint16_t num)
+{
+ return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
+}
+
+uint16_t
+dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
+ uint64_t wait __rte_unused)
+{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+ uint16_t dequeued;
+
+ source_port->pending_releases = 0;
+
+ if (unlikely(num > source_port->dequeue_depth))
+ num = source_port->dequeue_depth;
+
+ dequeued = dsw_port_dequeue_burst(source_port, events, num);
+
+ source_port->pending_releases = dequeued;
+
+ if (dequeued > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
+ dequeued);
+
+ dsw_port_return_credits(dsw, source_port, dequeued);
+ }
+ /* XXX: Assuming the port can't produce any more work,
+ * consider flushing the output buffer, on dequeued ==
+ * 0.
+ */
+
+ return dequeued;
+}
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
index 275d051c3..bd2e4c809 100644
--- a/drivers/event/dsw/meson.build
+++ b/drivers/event/dsw/meson.build
@@ -3,4 +3,4 @@

allow_experimental_apis = true
deps += ['bus_vdev']
-sources = files('dsw_evdev.c')
+sources = files('dsw_evdev.c', 'dsw_event.c')
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:11 UTC
Permalink
The DSW event device will now attempt to migrate (move) flows between
ports in order to balance the load.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 27 ++
drivers/event/dsw/dsw_evdev.h | 80 ++++
drivers/event/dsw/dsw_event.c | 735 +++++++++++++++++++++++++++++++++-
3 files changed, 838 insertions(+), 4 deletions(-)

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index bcfa17bab..2ecb365ba 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -20,6 +20,7 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
struct dsw_port *port;
struct rte_event_ring *in_ring;
+ struct rte_ring *ctl_in_ring;
char ring_name[RTE_RING_NAMESIZE];

port = &dsw->ports[port_id];
@@ -42,13 +43,29 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
if (in_ring == NULL)
return -ENOMEM;

+ snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
+ dev->data->dev_id, port_id);
+
+ ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
+ dev->data->socket_id,
+ RING_F_SC_DEQ|RING_F_EXACT_SZ);
+
+ if (ctl_in_ring == NULL) {
+ rte_event_ring_free(in_ring);
+ return -ENOMEM;
+ }
+
port->in_ring = in_ring;
+ port->ctl_in_ring = ctl_in_ring;

rte_atomic16_init(&port->load);

port->load_update_interval =
(DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;

+ port->migration_interval =
+ (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
dev->data->ports[port_id] = port;

return 0;
@@ -72,6 +89,7 @@ dsw_port_release(void *p)
struct dsw_port *port = p;

rte_event_ring_free(port->in_ring);
+ rte_ring_free(port->ctl_in_ring);
}

static int
@@ -272,6 +290,14 @@ dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
flush(dev_id, buf[i], flush_arg);
}

+static void
+dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
+ eventdev_stop_flush_t flush, void *flush_arg)
+{
+ dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
+ flush, flush_arg);
+}
+
static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
eventdev_stop_flush_t flush, void *flush_arg)
@@ -308,6 +334,7 @@ dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
struct dsw_port *port = &dsw->ports[port_id];

dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
+ dsw_port_drain_paused(dev_id, port, flush, flush_arg);
dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
}
}
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index a5399dda5..783c418bf 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -73,7 +73,37 @@
#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
#define DSW_OLD_LOAD_WEIGHT (1)

+/* The minimum time (in us) between two flow migrations. What puts an
+ * upper limit on the actual migration rate is primarily the pace in
+ * which the ports send and receive control messages, which in turn is
+ * largely a function of how much cycles are spent the processing of
+ * an event burst.
+ */
#define DSW_MIGRATION_INTERVAL (1000)
+#define DSW_MIN_SOURCE_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(70))
+#define DSW_MAX_TARGET_LOAD_FOR_MIGRATION (DSW_LOAD_FROM_PERCENT(95))
+
+#define DSW_MAX_EVENTS_RECORDED (128)
+
+/* Only one outstanding migration per port is allowed */
+#define DSW_MAX_PAUSED_FLOWS (DSW_MAX_PORTS)
+
+/* Enough room for paus request/confirm and unpaus request/confirm for
+ * all possible senders.
+ */
+#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)
+
+struct dsw_queue_flow {
+ uint8_t queue_id;
+ uint16_t flow_hash;
+};
+
+enum dsw_migration_state {
+ DSW_MIGRATION_STATE_IDLE,
+ DSW_MIGRATION_STATE_PAUSING,
+ DSW_MIGRATION_STATE_FORWARDING,
+ DSW_MIGRATION_STATE_UNPAUSING
+};

struct dsw_port {
uint16_t id;
@@ -98,6 +128,7 @@ struct dsw_port {

uint16_t ops_since_bg_task;

+ /* most recent 'background' processing */
uint64_t last_bg;

/* For port load measurement. */
@@ -108,11 +139,46 @@ struct dsw_port {
uint64_t busy_cycles;
uint64_t total_busy_cycles;

+ /* For the ctl interface and flow migration mechanism. */
+ uint64_t next_migration;
+ uint64_t migration_interval;
+ enum dsw_migration_state migration_state;
+
+ uint64_t migration_start;
+ uint64_t migrations;
+ uint64_t migration_latency;
+
+ uint8_t migration_target_port_id;
+ struct dsw_queue_flow migration_target_qf;
+ uint8_t cfm_cnt;
+
+ uint16_t paused_flows_len;
+ struct dsw_queue_flow paused_flows[DSW_MAX_PAUSED_FLOWS];
+
+ /* In a very contrived worst case all inflight events can be
+ * laying around paused here.
+ */
+ uint16_t paused_events_len;
+ struct rte_event paused_events[DSW_MAX_EVENTS];
+
+ uint16_t seen_events_len;
+ uint16_t seen_events_idx;
+ struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];

+ uint16_t in_buffer_len;
+ uint16_t in_buffer_start;
+ /* This buffer may contain events that were read up from the
+ * in_ring during the flow migration process.
+ */
+ struct rte_event in_buffer[DSW_MAX_EVENTS];
+
struct rte_event_ring *in_ring __rte_cache_aligned;

+ struct rte_ring *ctl_in_ring __rte_cache_aligned;
+
/* Estimate of current port load. */
rte_atomic16_t load __rte_cache_aligned;
} __rte_cache_aligned;
@@ -137,6 +203,20 @@ struct dsw_evdev {
rte_atomic32_t credits_on_loan __rte_cache_aligned;
};

+#define DSW_CTL_PAUS_REQ (0)
+#define DSW_CTL_UNPAUS_REQ (1)
+#define DSW_CTL_CFM (2)
+
+/* sizeof(struct dsw_ctl_msg) must be equal or less than
+ * sizeof(void *), to fit on the control ring.
+ */
+struct dsw_ctl_msg {
+ uint8_t type:2;
+ uint8_t originating_port_id:6;
+ uint8_t queue_id;
+ uint16_t flow_hash;
+} __rte_packed;
+
uint16_t dsw_event_enqueue(void *port, const struct rte_event *event);
uint16_t dsw_event_enqueue_burst(void *port,
const struct rte_event events[],
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index f326147c9..f0347592d 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -5,9 +5,11 @@
#include "dsw_evdev.h"

#include <stdbool.h>
+#include <string.h>

#include <rte_atomic.h>
#include <rte_cycles.h>
+#include <rte_memcpy.h>
#include <rte_random.h>

static bool
@@ -140,6 +142,269 @@ dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
dsw_port_load_update(port, now);
}

+static void
+dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+ void *raw_msg;
+
+ memcpy(&raw_msg, msg, sizeof(*msg));
+
+ /* there's always room on the ring */
+ while (rte_ring_enqueue(port->ctl_in_ring, raw_msg) != 0)
+ rte_pause();
+}
+
+static int
+dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
+{
+ void *raw_msg;
+ int rc;
+
+ rc = rte_ring_dequeue(port->ctl_in_ring, &raw_msg);
+
+ if (rc == 0)
+ memcpy(msg, &raw_msg, sizeof(*msg));
+
+ return rc;
+}
+
+static void
+dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
+ uint8_t type, uint8_t queue_id, uint16_t flow_hash)
+{
+ uint16_t port_id;
+ struct dsw_ctl_msg msg = {
+ .type = type,
+ .originating_port_id = source_port->id,
+ .queue_id = queue_id,
+ .flow_hash = flow_hash
+ };
+
+ for (port_id = 0; port_id < dsw->num_ports; port_id++)
+ if (port_id != source_port->id)
+ dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
+}
+
+static bool
+dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ uint16_t i;
+
+ for (i = 0; i < port->paused_flows_len; i++) {
+ struct dsw_queue_flow *qf = &port->paused_flows[i];
+ if (qf->queue_id == queue_id &&
+ qf->flow_hash == flow_hash)
+ return true;
+ }
+ return false;
+}
+
+static void
+dsw_port_add_paused_flow(struct dsw_port *port, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ port->paused_flows[port->paused_flows_len] = (struct dsw_queue_flow) {
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+ port->paused_flows_len++;
+}
+
+static void
+dsw_port_remove_paused_flow(struct dsw_port *port, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ uint16_t i;
+
+ for (i = 0; i < port->paused_flows_len; i++) {
+ struct dsw_queue_flow *qf = &port->paused_flows[i];
+
+ if (qf->queue_id == queue_id &&
+ qf->flow_hash == paused_flow_hash) {
+ uint16_t last_idx = port->paused_flows_len-1;
+ if (i != last_idx)
+ port->paused_flows[i] =
+ port->paused_flows[last_idx];
+ port->paused_flows_len--;
+ break;
+ }
+ }
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_handle_pause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ struct dsw_ctl_msg cfm = {
+ .type = DSW_CTL_CFM,
+ .originating_port_id = port->id,
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Pausing queue_id %d flow_hash %d.\n",
+ queue_id, paused_flow_hash);
+
+ /* There might be already-scheduled events belonging to the
+ * paused flow in the output buffers.
+ */
+ dsw_port_flush_out_buffers(dsw, port);
+
+ dsw_port_add_paused_flow(port, queue_id, paused_flow_hash);
+
+ /* Make sure any stores to the original port's in_ring is seen
+ * before the ctl message.
+ */
+ rte_smp_wmb();
+
+ dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+}
+
+static void
+dsw_find_lowest_load_port(uint8_t *port_ids, uint16_t num_port_ids,
+ uint8_t exclude_port_id, int16_t *port_loads,
+ uint8_t *target_port_id, int16_t *target_load)
+{
+ int16_t candidate_port_id = -1;
+ int16_t candidate_load = DSW_MAX_LOAD;
+ uint16_t i;
+
+ for (i = 0; i < num_port_ids; i++) {
+ uint8_t port_id = port_ids[i];
+ if (port_id != exclude_port_id) {
+ int16_t load = port_loads[port_id];
+ if (candidate_port_id == -1 ||
+ load < candidate_load) {
+ candidate_port_id = port_id;
+ candidate_load = load;
+ }
+ }
+ }
+ *target_port_id = candidate_port_id;
+ *target_load = candidate_load;
+}
+
+struct dsw_queue_flow_burst {
+ struct dsw_queue_flow queue_flow;
+ uint16_t count;
+};
+
+static inline int
+dsw_cmp_burst(const void *v_burst_a, const void *v_burst_b)
+{
+ const struct dsw_queue_flow_burst *burst_a = v_burst_a;
+ const struct dsw_queue_flow_burst *burst_b = v_burst_b;
+
+ int a_count = burst_a->count;
+ int b_count = burst_b->count;
+
+ return a_count - b_count;
+}
+
+#define DSW_QF_TO_INT(_qf) \
+ ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
+
+static inline int
+dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
+{
+ const struct dsw_queue_flow *qf_a = v_qf_a;
+ const struct dsw_queue_flow *qf_b = v_qf_b;
+
+ return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
+}
+
+static uint16_t
+dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
+ struct dsw_queue_flow_burst *bursts)
+{
+ uint16_t i;
+ struct dsw_queue_flow_burst *current_burst = NULL;
+ uint16_t num_bursts = 0;
+
+ /* We don't need the stable property, and the list is likely
+ * large enough for qsort() to outperform dsw_stable_sort(),
+ * so we use qsort() here.
+ */
+ qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
+
+ /* arrange the (now-consecutive) events into bursts */
+ for (i = 0; i < qfs_len; i++) {
+ if (i == 0 ||
+ dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
+ current_burst = &bursts[num_bursts];
+ current_burst->queue_flow = qfs[i];
+ current_burst->count = 0;
+ num_bursts++;
+ }
+ current_burst->count++;
+ }
+
+ qsort(bursts, num_bursts, sizeof(bursts[0]), dsw_cmp_burst);
+
+ return num_bursts;
+}
+
+static bool
+dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
+ int16_t load_limit)
+{
+ bool below_limit = false;
+ uint16_t i;
+
+ for (i = 0; i < dsw->num_ports; i++) {
+ int16_t load = rte_atomic16_read(&dsw->ports[i].load);
+ if (load < load_limit)
+ below_limit = true;
+ port_loads[i] = load;
+ }
+ return below_limit;
+}
+
+static bool
+dsw_select_migration_target(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ struct dsw_queue_flow_burst *bursts,
+ uint16_t num_bursts, int16_t *port_loads,
+ int16_t max_load, struct dsw_queue_flow *target_qf,
+ uint8_t *target_port_id)
+{
+ uint16_t source_load = port_loads[source_port->id];
+ uint16_t i;
+
+ for (i = 0; i < num_bursts; i++) {
+ struct dsw_queue_flow *qf = &bursts[i].queue_flow;
+
+ if (dsw_port_is_flow_paused(source_port, qf->queue_id,
+ qf->flow_hash))
+ continue;
+
+ struct dsw_queue *queue = &dsw->queues[qf->queue_id];
+ int16_t target_load;
+
+ dsw_find_lowest_load_port(queue->serving_ports,
+ queue->num_serving_ports,
+ source_port->id, port_loads,
+ target_port_id, &target_load);
+
+ if (target_load < source_load &&
+ target_load < max_load) {
+ *target_qf = *qf;
+ return true;
+ }
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "For the %d flows considered, "
+ "no target port found with load less than %d.\n",
+ num_bursts, DSW_LOAD_TO_PERCENT(max_load));
+
+ return false;
+}
+
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
@@ -197,6 +462,14 @@ dsw_port_get_parallel_flow_id(struct dsw_port *port)
return flow_id;
}

+static void
+dsw_port_buffer_paused(struct dsw_port *port,
+ const struct rte_event *paused_event)
+{
+ port->paused_events[port->paused_events_len] = *paused_event;
+ port->paused_events_len++;
+}
+
static void
dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
uint8_t dest_port_id, const struct rte_event *event)
@@ -256,11 +529,401 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,

flow_hash = dsw_flow_id_hash(event->flow_id);

+ if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
+ flow_hash))) {
+ dsw_port_buffer_paused(source_port, event);
+ return;
+ }
+
dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);

dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint8_t queue_id, uint16_t paused_flow_hash)
+{
+ uint16_t paused_events_len = source_port->paused_events_len;
+ struct rte_event paused_events[paused_events_len];
+ uint8_t dest_port_id;
+ uint16_t i;
+
+ if (paused_events_len == 0)
+ return;
+
+ if (dsw_port_is_flow_paused(source_port, queue_id, paused_flow_hash))
+ return;
+
+ rte_memcpy(paused_events, source_port->paused_events,
+ paused_events_len * sizeof(struct rte_event));
+
+ source_port->paused_events_len = 0;
+
+ dest_port_id = dsw_schedule(dsw, queue_id, paused_flow_hash);
+
+ for (i = 0; i < paused_events_len; i++) {
+ struct rte_event *event = &paused_events[i];
+ uint16_t flow_hash;
+
+ flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ if (event->queue_id == queue_id &&
+ flow_hash == paused_flow_hash)
+ dsw_port_buffer_non_paused(dsw, source_port,
+ dest_port_id, event);
+ else
+ dsw_port_buffer_paused(source_port, event);
+ }
+}
+
+static void
+dsw_port_migration_stats(struct dsw_port *port)
+{
+ uint64_t migration_latency;
+
+ migration_latency = (rte_get_timer_cycles() - port->migration_start);
+ port->migration_latency += migration_latency;
+ port->migrations++;
+}
+
+static void
+dsw_port_end_migration(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ uint8_t queue_id = port->migration_target_qf.queue_id;
+ uint16_t flow_hash = port->migration_target_qf.flow_hash;
+
+ port->migration_state = DSW_MIGRATION_STATE_IDLE;
+ port->seen_events_len = 0;
+
+ dsw_port_migration_stats(port);
+
+ if (dsw->queues[queue_id].schedule_type != RTE_SCHED_TYPE_PARALLEL) {
+ dsw_port_remove_paused_flow(port, queue_id, flow_hash);
+ dsw_port_flush_paused_events(dsw, port, queue_id, flow_hash);
+ }
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for queue_id "
+ "%d flow_hash %d.\n", queue_id, flow_hash);
+}
+
+static void
+dsw_port_consider_migration(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint64_t now)
+{
+ bool any_port_below_limit;
+ struct dsw_queue_flow *seen_events = source_port->seen_events;
+ uint16_t seen_events_len = source_port->seen_events_len;
+ struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
+ uint16_t num_bursts;
+ int16_t source_port_load;
+ int16_t port_loads[dsw->num_ports];
+
+ if (now < source_port->next_migration)
+ return;
+
+ if (dsw->num_ports == 1)
+ return;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering migration.\n");
+
+ /* Randomize interval to avoid having all threads considering
+ * migration at the same in point in time, which might lead to
+ * all choosing the same target port.
+ */
+ source_port->next_migration = now +
+ source_port->migration_interval / 2 +
+ rte_rand() % source_port->migration_interval;
+
+ if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Migration already in progress.\n");
+ return;
+ }
+
+ /* For simplicity, avoid migration in the unlikely case there
+ * is still events to consume in the in_buffer (from the last
+ * migration).
+ */
+ if (source_port->in_buffer_len > 0) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
+ "events in the input buffer.\n");
+ return;
+ }
+
+ source_port_load = rte_atomic16_read(&source_port->load);
+ if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Load %d is below threshold level %d.\n",
+ DSW_LOAD_TO_PERCENT(source_port_load),
+ DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
+ return;
+ }
+
+ /* Avoid starting any expensive operations (sorting etc), in
+ * case of a scenario with all ports above the load limit.
+ */
+ any_port_below_limit =
+ dsw_retrieve_port_loads(dsw, port_loads,
+ DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
+ if (!any_port_below_limit) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id,
+ "Candidate target ports are all too highly "
+ "loaded.\n");
+ return;
+ }
+
+ /* Sort flows into 'bursts' to allow attempting to migrating
+ * small (but still active) flows first - this it to avoid
+ * having large flows moving around the worker cores too much
+ * (to avoid cache misses, among other things). Of course, the
+ * number of recorded events (queue+flow ids) are limited, and
+ * provides only a snapshot, so only so many conclusions can
+ * be drawn from this data.
+ */
+ num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
+ bursts);
+ /* For non-big-little systems, there's no point in moving the
+ * only (known) flow.
+ */
+ if (num_bursts < 2) {
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
+ "queue_id %d flow_hash %d has been seen.\n",
+ bursts[0].queue_flow.queue_id,
+ bursts[0].queue_flow.flow_hash);
+ return;
+ }
+
+ /* The strategy is to first try to find a flow to move to a
+ * port with low load (below the migration-attempt
+ * threshold). If that fails, we try to find a port which is
+ * below the max threshold, and also less loaded than this
+ * port is.
+ */
+ if (!dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MIN_SOURCE_LOAD_FOR_MIGRATION,
+ &source_port->migration_target_qf,
+ &source_port->migration_target_port_id)
+ &&
+ !dsw_select_migration_target(dsw, source_port, bursts, num_bursts,
+ port_loads,
+ DSW_MAX_TARGET_LOAD_FOR_MIGRATION,
+ &source_port->migration_target_qf,
+ &source_port->migration_target_port_id))
+ return;
+
+ DSW_LOG_DP_PORT(DEBUG, source_port->id, "Migrating queue_id %d "
+ "flow_hash %d from port %d to port %d.\n",
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash,
+ source_port->id, source_port->migration_target_port_id);
+
+ /* We have a winner. */
+
+ source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
+ source_port->migration_start = rte_get_timer_cycles();
+
+ /* No need to go through the whole pause procedure for
+ * parallel queues, since atomic/ordered semantics need not to
+ * be maintained.
+ */
+
+ if (dsw->queues[source_port->migration_target_qf.queue_id].schedule_type
+ == RTE_SCHED_TYPE_PARALLEL) {
+ uint8_t queue_id = source_port->migration_target_qf.queue_id;
+ uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+ uint8_t dest_port_id = source_port->migration_target_port_id;
+
+ /* Single byte-sized stores are always atomic. */
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+ rte_smp_wmb();
+
+ dsw_port_end_migration(dsw, source_port);
+
+ return;
+ }
+
+ /* There might be 'loopback' events already scheduled in the
+ * output buffers.
+ */
+ dsw_port_flush_out_buffers(dsw, source_port);
+
+ dsw_port_add_paused_flow(source_port,
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash);
+
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
+ source_port->migration_target_qf.queue_id,
+ source_port->migration_target_qf.flow_hash);
+ source_port->cfm_cnt = 0;
+}
+
+static void
+dsw_port_flush_paused_events(struct dsw_evdev *dsw,
+ struct dsw_port *source_port,
+ uint8_t queue_id, uint16_t paused_flow_hash);
+
+static void
+dsw_port_handle_unpause_flow(struct dsw_evdev *dsw, struct dsw_port *port,
+ uint8_t originating_port_id, uint8_t queue_id,
+ uint16_t paused_flow_hash)
+{
+ struct dsw_ctl_msg cfm = {
+ .type = DSW_CTL_CFM,
+ .originating_port_id = port->id,
+ .queue_id = queue_id,
+ .flow_hash = paused_flow_hash
+ };
+
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Un-pausing queue_id %d flow_hash %d.\n",
+ queue_id, paused_flow_hash);
+
+ dsw_port_remove_paused_flow(port, queue_id, paused_flow_hash);
+
+ rte_smp_rmb();
+
+ dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
+
+ dsw_port_flush_paused_events(dsw, port, queue_id, paused_flow_hash);
+}
+
+#define FORWARD_BURST_SIZE (32)
+
+static void
+dsw_port_forward_migrated_flow(struct dsw_port *source_port,
+ struct rte_event_ring *dest_ring,
+ uint8_t queue_id,
+ uint16_t flow_hash)
+{
+ uint16_t events_left;
+
+ /* Control ring message should been seen before the ring count
+ * is read on the port's in_ring.
+ */
+ rte_smp_rmb();
+
+ events_left = rte_event_ring_count(source_port->in_ring);
+
+ while (events_left > 0) {
+ uint16_t in_burst_size =
+ RTE_MIN(FORWARD_BURST_SIZE, events_left);
+ struct rte_event in_burst[in_burst_size];
+ uint16_t in_len;
+ uint16_t i;
+
+ in_len = rte_event_ring_dequeue_burst(source_port->in_ring,
+ in_burst,
+ in_burst_size, NULL);
+ /* No need to care about bursting forwarded events (to
+ * the destination port's in_ring), since migration
+ * doesn't happen very often, and also the majority of
+ * the dequeued events will likely *not* be forwarded.
+ */
+ for (i = 0; i < in_len; i++) {
+ struct rte_event *e = &in_burst[i];
+ if (e->queue_id == queue_id &&
+ dsw_flow_id_hash(e->flow_id) == flow_hash) {
+ while (rte_event_ring_enqueue_burst(dest_ring,
+ e, 1,
+ NULL) != 1)
+ rte_pause();
+ } else {
+ uint16_t last_idx = source_port->in_buffer_len;
+ source_port->in_buffer[last_idx] = *e;
+ source_port->in_buffer_len++;
+ }
+ }
+
+ events_left -= in_len;
+ }
+}
+
+static void
+dsw_port_move_migrating_flow(struct dsw_evdev *dsw,
+ struct dsw_port *source_port)
+{
+ uint8_t queue_id = source_port->migration_target_qf.queue_id;
+ uint16_t flow_hash = source_port->migration_target_qf.flow_hash;
+ uint8_t dest_port_id = source_port->migration_target_port_id;
+ struct dsw_port *dest_port = &dsw->ports[dest_port_id];
+
+ dsw_port_flush_out_buffers(dsw, source_port);
+
+ rte_smp_wmb();
+
+ dsw->queues[queue_id].flow_to_port_map[flow_hash] =
+ dest_port_id;
+
+ dsw_port_forward_migrated_flow(source_port, dest_port->in_ring,
+ queue_id, flow_hash);
+
+ /* Flow table update and migration destination port's enqueues
+ * must be seen before the control message.
+ */
+ rte_smp_wmb();
+
+ dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ, queue_id,
+ flow_hash);
+ source_port->cfm_cnt = 0;
+ source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
+}
+
+static void
+dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ port->cfm_cnt++;
+
+ if (port->cfm_cnt == (dsw->num_ports-1)) {
+ switch (port->migration_state) {
+ case DSW_MIGRATION_STATE_PAUSING:
+ DSW_LOG_DP_PORT(DEBUG, port->id, "Going into forwarding "
+ "migration state.\n");
+ port->migration_state = DSW_MIGRATION_STATE_FORWARDING;
+ break;
+ case DSW_MIGRATION_STATE_UNPAUSING:
+ dsw_port_end_migration(dsw, port);
+ break;
+ default:
+ RTE_ASSERT(0);
+ break;
+ }
+ }
+}
+
+static void
+dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ struct dsw_ctl_msg msg;
+
+ /* So any table loads happens before the ring dequeue, in the
+ * case of a 'paus' message.
+ */
+ rte_smp_rmb();
+
+ if (dsw_port_ctl_dequeue(port, &msg) == 0) {
+ switch (msg.type) {
+ case DSW_CTL_PAUS_REQ:
+ dsw_port_handle_pause_flow(dsw, port,
+ msg.originating_port_id,
+ msg.queue_id, msg.flow_hash);
+ break;
+ case DSW_CTL_UNPAUS_REQ:
+ dsw_port_handle_unpause_flow(dsw, port,
+ msg.originating_port_id,
+ msg.queue_id,
+ msg.flow_hash);
+ break;
+ case DSW_CTL_CFM:
+ dsw_port_handle_confirm(dsw, port);
+ break;
+ }
+ }
+}
+
static void
dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
{
@@ -270,12 +933,24 @@ dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
port->ops_since_bg_task += (num_events+1);
}

-static void
-dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
-
static void
dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
{
+ if (unlikely(port->migration_state == DSW_MIGRATION_STATE_FORWARDING &&
+ port->pending_releases == 0))
+ dsw_port_move_migrating_flow(dsw, port);
+
+ /* Polling the control ring is relatively inexpensive, and
+ * polling it often helps bringing down migration latency, so
+ * do this for every iteration.
+ */
+ dsw_port_ctl_process(dsw, port);
+
+ /* To avoid considering migration and flushing output buffers
+ * on every dequeue/enqueue call, the scheduler only performs
+ * such 'background' tasks every nth
+ * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
+ */
if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
uint64_t now;

@@ -290,6 +965,8 @@ dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)

dsw_port_consider_load_update(port, now);

+ dsw_port_consider_migration(dsw, port, now);
+
port->ops_since_bg_task = 0;
}
}
@@ -339,7 +1016,6 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
*/
if (unlikely(events_len == 0)) {
dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
- dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}

@@ -423,10 +1099,52 @@ dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
return dsw_event_dequeue_burst(port, events, 1, wait);
}

+static void
+dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
+ uint16_t num)
+{
+ uint16_t i;
+
+ for (i = 0; i < num; i++) {
+ uint16_t l_idx = port->seen_events_idx;
+ struct dsw_queue_flow *qf = &port->seen_events[l_idx];
+ struct rte_event *event = &events[i];
+ qf->queue_id = event->queue_id;
+ qf->flow_hash = dsw_flow_id_hash(event->flow_id);
+
+ port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+ }
+
+ if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
+ port->seen_events_len =
+ RTE_MIN(port->seen_events_len + num,
+ DSW_MAX_EVENTS_RECORDED);
+}
+
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
{
+ struct dsw_port *source_port = port;
+ struct dsw_evdev *dsw = source_port->dsw;
+
+ dsw_port_ctl_process(dsw, source_port);
+
+ if (unlikely(port->in_buffer_len > 0)) {
+ uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
+
+ rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
+ dequeued * sizeof(struct rte_event));
+
+ port->in_buffer_start += dequeued;
+ port->in_buffer_len -= dequeued;
+
+ if (port->in_buffer_len == 0)
+ port->in_buffer_start = 0;
+
+ return dequeued;
+ }
+
return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
}

@@ -458,6 +1176,15 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
dequeued);

dsw_port_return_credits(dsw, source_port, dequeued);
+
+ /* One potential optimization one might think of is to
+ * add a migration state (prior to 'pausing'), and
+ * only record seen events when the port is in this
+ * state (and transit to 'pausing' when enough events
+ * have been gathered). However, that schema doesn't
+ * seem to improve performance.
+ */
+ dsw_port_record_seen_events(port, events, dequeued);
}
/* XXX: Assuming the port can't produce any more work,
* consider flushing the output buffer, on dequeued ==
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:05 UTC
Permalink
This patch contains the Meson and GNU Make build system extensions
required for the Distributed Event Device, and also the initialization
code for the driver itself.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
config/common_base | 5 ++
drivers/event/Makefile | 1 +
drivers/event/dsw/Makefile | 26 ++++++++++
drivers/event/dsw/dsw_evdev.c | 52 +++++++++++++++++++
drivers/event/dsw/dsw_evdev.h | 16 ++++++
drivers/event/dsw/meson.build | 6 +++
.../event/dsw/rte_pmd_dsw_event_version.map | 3 ++
drivers/event/meson.build | 2 +-
mk/rte.app.mk | 1 +
9 files changed, 111 insertions(+), 1 deletion(-)
create mode 100644 drivers/event/dsw/Makefile
create mode 100644 drivers/event/dsw/dsw_evdev.c
create mode 100644 drivers/event/dsw/dsw_evdev.h
create mode 100644 drivers/event/dsw/meson.build
create mode 100644 drivers/event/dsw/rte_pmd_dsw_event_version.map

diff --git a/config/common_base b/config/common_base
index 4bcbaf923..c43f5139d 100644
--- a/config/common_base
+++ b/config/common_base
@@ -614,6 +614,11 @@ CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV_DEBUG=n
#
CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV=y

+#
+# Compile PMD for distributed software event device
+#
+CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV=y
+
#
# Compile PMD for octeontx sso event device
#
diff --git a/drivers/event/Makefile b/drivers/event/Makefile
index f301d8dc2..03ad1b6cb 100644
--- a/drivers/event/Makefile
+++ b/drivers/event/Makefile
@@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk

DIRS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += skeleton
DIRS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += sw
+DIRS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw
DIRS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += octeontx
ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
DIRS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += dpaa
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
new file mode 100644
index 000000000..5cbf488ff
--- /dev/null
+++ b/drivers/event/dsw/Makefile
@@ -0,0 +1,26 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+LIB = librte_pmd_dsw_event.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -Wno-format-nonliteral
+
+LDLIBS += -lrte_eal
+LDLIBS += -lrte_mbuf
+LDLIBS += -lrte_mempool
+LDLIBS += -lrte_ring
+LDLIBS += -lrte_eventdev
+LDLIBS += -lrte_bus_vdev
+
+LIBABIVER := 1
+
+EXPORT_MAP := rte_pmd_dsw_event_version.map
+
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
new file mode 100644
index 000000000..6990bbc9e
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -0,0 +1,52 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include <rte_eventdev_pmd.h>
+#include <rte_eventdev_pmd_vdev.h>
+
+#include "dsw_evdev.h"
+
+#define EVENTDEV_NAME_DSW_PMD event_dsw
+
+static int
+dsw_probe(struct rte_vdev_device *vdev)
+{
+ const char *name;
+ struct rte_eventdev *dev;
+ struct dsw_evdev *dsw;
+
+ name = rte_vdev_device_name(vdev);
+
+ dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
+ rte_socket_id());
+ if (dev == NULL)
+ return -EFAULT;
+
+ if (rte_eal_process_type() != RTE_PROC_PRIMARY)
+ return 0;
+
+ dsw = dev->data->dev_private;
+ dsw->data = dev->data;
+
+ return 0;
+}
+
+static int
+dsw_remove(struct rte_vdev_device *vdev)
+{
+ const char *name;
+
+ name = rte_vdev_device_name(vdev);
+ if (name == NULL)
+ return -EINVAL;
+
+ return rte_event_pmd_vdev_uninit(name);
+}
+
+static struct rte_vdev_driver evdev_dsw_pmd_drv = {
+ .probe = dsw_probe,
+ .remove = dsw_remove
+};
+
+RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
new file mode 100644
index 000000000..9a0f4c357
--- /dev/null
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -0,0 +1,16 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_EVDEV_H_
+#define _DSW_EVDEV_H_
+
+#include <rte_eventdev.h>
+
+#define DSW_PMD_NAME RTE_STR(event_dsw)
+
+struct dsw_evdev {
+ struct rte_eventdev_data *data;
+};
+
+#endif
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
new file mode 100644
index 000000000..275d051c3
--- /dev/null
+++ b/drivers/event/dsw/meson.build
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+allow_experimental_apis = true
+deps += ['bus_vdev']
+sources = files('dsw_evdev.c')
diff --git a/drivers/event/dsw/rte_pmd_dsw_event_version.map b/drivers/event/dsw/rte_pmd_dsw_event_version.map
new file mode 100644
index 000000000..24bd5cdb3
--- /dev/null
+++ b/drivers/event/dsw/rte_pmd_dsw_event_version.map
@@ -0,0 +1,3 @@
+DPDK_18.11 {
+ local: *;
+};
diff --git a/drivers/event/meson.build b/drivers/event/meson.build
index e95119935..4b9fed22b 100644
--- a/drivers/event/meson.build
+++ b/drivers/event/meson.build
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2017 Intel Corporation

-drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw']
+drivers = ['dpaa', 'dpaa2', 'octeontx', 'skeleton', 'sw', 'dsw']
std_deps = ['eventdev', 'kvargs']
config_flag_fmt = '***@0@_EVENTDEV_PMD'
driver_name_fmt = '***@0@_event'
diff --git a/mk/rte.app.mk b/mk/rte.app.mk
index de33883be..682e224eb 100644
--- a/mk/rte.app.mk
+++ b/mk/rte.app.mk
@@ -236,6 +236,7 @@ endif # CONFIG_RTE_LIBRTE_COMPRESSDEV
ifeq ($(CONFIG_RTE_LIBRTE_EVENTDEV),y)
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SKELETON_EVENTDEV) += -lrte_pmd_skeleton_event
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_SW_EVENTDEV) += -lrte_pmd_sw_event
+_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += -lrte_pmd_dsw_event
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_OCTEONTX_SSOVF) += -lrte_pmd_octeontx_ssovf
ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
_LDLIBS-$(CONFIG_RTE_LIBRTE_PMD_DPAA_EVENTDEV) += -lrte_pmd_dpaa_event
--
2.17.1
Ferruh Yigit
2018-10-02 13:43:18 UTC
Permalink
Post by Mattias Rönnblom
This patch contains the Meson and GNU Make build system extensions
required for the Distributed Event Device, and also the initialization
code for the driver itself.
<...>
Post by Mattias Rönnblom
@@ -0,0 +1,26 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2018 Ericsson AB
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+LIB = librte_pmd_dsw_event.a
+
+CFLAGS += -DALLOW_EXPERIMENTAL_API
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -Wno-format-nonliteral
This is causing build warning with icc [1], can you please add the flag
conditionally as done in other samples of this flag [2].

Since this patch has already merged, can you please send an incremental fix patch?


[1]
icc: command line warning #10148: option '-Wno-format-nonliteral' not supported

[2]
https://git.dpdk.org/dpdk/tree/drivers/net/qede/Makefile?h=v18.08#n40
https://git.dpdk.org/dpdk/tree/drivers/net/i40e/Makefile?h=v18.08#n38
Mattias Rönnblom
2018-10-04 11:21:13 UTC
Permalink
Make the -Wno-format-nonliteral flag conditional, and only set in
clang and gcc builds, since this flag is not supported (nor needed)
when building dsw with icc.

Fixes: 46a186b1f0c5 ("event/dsw: add device registration and build system")

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
Reviewed-by: Ferruh Yigit <***@intel.com>
---
drivers/event/dsw/Makefile | 2 ++
1 file changed, 2 insertions(+)

diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index ea1e5259a..490ed0b98 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -8,7 +8,9 @@ LIB = librte_pmd_dsw_event.a
CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += -O3
CFLAGS += $(WERROR_FLAGS)
+ifneq ($(CONFIG_RTE_TOOLCHAIN_ICC),y)
CFLAGS += -Wno-format-nonliteral
+endif

LDLIBS += -lrte_eal
LDLIBS += -lrte_mbuf
--
2.17.1
Jerin Jacob
2018-10-04 14:17:59 UTC
Permalink
-----Original Message-----
Date: Thu, 4 Oct 2018 13:21:13 +0200
Subject: [PATCH v2] event/dsw: fix icc build
X-Mailer: git-send-email 2.17.1
Make the -Wno-format-nonliteral flag conditional, and only set in
clang and gcc builds, since this flag is not supported (nor needed)
when building dsw with icc.
Fixes: 46a186b1f0c5 ("event/dsw: add device registration and build system")
---
Applied to dpdk-next-eventdev/master. Thanks.
drivers/event/dsw/Makefile | 2 ++
1 file changed, 2 insertions(+)
diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index ea1e5259a..490ed0b98 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -8,7 +8,9 @@ LIB = librte_pmd_dsw_event.a
CFLAGS += -DALLOW_EXPERIMENTAL_API
CFLAGS += -O3
CFLAGS += $(WERROR_FLAGS)
+ifneq ($(CONFIG_RTE_TOOLCHAIN_ICC),y)
CFLAGS += -Wno-format-nonliteral
+endif
LDLIBS += -lrte_eal
LDLIBS += -lrte_mbuf
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:12 UTC
Permalink
With this patch, the DSW event device will (optionally) sort the event
burst before giving it to the application. The sorting will primarily
be on queue id, and secondary on flow id.

The sorting is an attempt to optimize data and instruction cache usage
for the application, at the cost of additional event device overhead.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.h | 11 ++++++++
drivers/event/dsw/dsw_event.c | 23 +++++++++++++++++
drivers/event/dsw/dsw_sort.h | 48 +++++++++++++++++++++++++++++++++++
3 files changed, 82 insertions(+)
create mode 100644 drivers/event/dsw/dsw_sort.h

diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index 783c418bf..f6f8f0454 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -93,6 +93,17 @@
*/
#define DSW_CTL_IN_RING_SIZE ((DSW_MAX_PORTS-1)*4)

+/* With DSW_SORT_DEQUEUED enabled, the scheduler will, at the point of
+ * dequeue(), arrange events so that events with the same flow id on
+ * the same queue forms a back-to-back "burst", and also so that such
+ * bursts of different flow ids, but on the same queue, also come
+ * consecutively. All this in an attempt to improve data and
+ * instruction cache usage for the application, at the cost of a
+ * scheduler overhead increase.
+ */
+
+/* #define DSW_SORT_DEQUEUED */
+
struct dsw_queue_flow {
uint8_t queue_id;
uint16_t flow_hash;
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index f0347592d..a84b19c33 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -4,6 +4,10 @@

#include "dsw_evdev.h"

+#ifdef DSW_SORT_DEQUEUED
+#include "dsw_sort.h"
+#endif
+
#include <stdbool.h>
#include <string.h>

@@ -1121,6 +1125,21 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
DSW_MAX_EVENTS_RECORDED);
}

+#ifdef DSW_SORT_DEQUEUED
+
+#define DSW_EVENT_TO_INT(_event) \
+ ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
+
+static inline int
+dsw_cmp_event(const void *v_event_a, const void *v_event_b)
+{
+ const struct rte_event *event_a = v_event_a;
+ const struct rte_event *event_b = v_event_b;
+
+ return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
+}
+#endif
+
static uint16_t
dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
uint16_t num)
@@ -1191,5 +1210,9 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
* 0.
*/

+#ifdef DSW_SORT_DEQUEUED
+ dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
+#endif
+
return dequeued;
}
diff --git a/drivers/event/dsw/dsw_sort.h b/drivers/event/dsw/dsw_sort.h
new file mode 100644
index 000000000..609767fdf
--- /dev/null
+++ b/drivers/event/dsw/dsw_sort.h
@@ -0,0 +1,48 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#ifndef _DSW_SORT_
+#define _DSW_SORT_
+
+#include <string.h>
+
+#include <rte_common.h>
+
+#define DSW_ARY_ELEM_PTR(_ary, _idx, _elem_size) \
+ RTE_PTR_ADD(_ary, (_idx) * (_elem_size))
+
+#define DSW_ARY_ELEM_SWAP(_ary, _a_idx, _b_idx, _elem_size) \
+ do { \
+ char tmp[_elem_size]; \
+ void *_a_ptr = DSW_ARY_ELEM_PTR(_ary, _a_idx, _elem_size); \
+ void *_b_ptr = DSW_ARY_ELEM_PTR(_ary, _b_idx, _elem_size); \
+ memcpy(tmp, _a_ptr, _elem_size); \
+ memcpy(_a_ptr, _b_ptr, _elem_size); \
+ memcpy(_b_ptr, tmp, _elem_size); \
+ } while (0)
+
+static inline void
+dsw_insertion_sort(void *ary, uint16_t len, uint16_t elem_size,
+ int (*cmp_fn)(const void *, const void *))
+{
+ uint16_t i;
+
+ for (i = 1; i < len; i++) {
+ uint16_t j;
+ for (j = i; j > 0 &&
+ cmp_fn(DSW_ARY_ELEM_PTR(ary, j-1, elem_size),
+ DSW_ARY_ELEM_PTR(ary, j, elem_size)) > 0;
+ j--)
+ DSW_ARY_ELEM_SWAP(ary, j, j-1, elem_size);
+ }
+}
+
+static inline void
+dsw_stable_sort(void *ary, uint16_t len, uint16_t elem_size,
+ int (*cmp_fn)(const void *, const void *))
+{
+ dsw_insertion_sort(ary, len, elem_size, cmp_fn);
+}
+
+#endif
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:10 UTC
Permalink
The DSW event device port now attempts to estimate its load (i.e. how
busy it is). This is required for load balancing to work (although
load balancing is not included in this patch), and may also be useful
for debugging purposes.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/dsw_evdev.c | 14 +++++
drivers/event/dsw/dsw_evdev.h | 40 +++++++++++++
drivers/event/dsw/dsw_event.c | 109 ++++++++++++++++++++++++++++++++++
3 files changed, 163 insertions(+)

diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 40a7435be..bcfa17bab 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -4,6 +4,7 @@

#include <stdbool.h>

+#include <rte_cycles.h>
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>
@@ -43,6 +44,11 @@ dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,

port->in_ring = in_ring;

+ rte_atomic16_init(&port->load);
+
+ port->load_update_interval =
+ (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;
+
dev->data->ports[port_id] = port;

return 0;
@@ -240,11 +246,19 @@ static int
dsw_start(struct rte_eventdev *dev)
{
struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ uint16_t i;
+ uint64_t now;

rte_atomic32_init(&dsw->credits_on_loan);

initial_flow_to_port_assignment(dsw);

+ now = rte_get_timer_cycles();
+ for (i = 0; i < dsw->num_ports; i++) {
+ dsw->ports[i].measurement_start = now;
+ dsw->ports[i].busy_start = now;
+ }
+
return 0;
}

diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index f8e94e4a4..a5399dda5 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -36,6 +36,15 @@
*/
#define DSW_PARALLEL_FLOWS (1024)

+/* 'Background tasks' are polling the control rings for *
+ * migration-related messages, or flush the output buffer (so
+ * buffered events doesn't linger too long). Shouldn't be too low,
+ * since the system won't benefit from the 'batching' effects from
+ * the output buffer, and shouldn't be too high, since it will make
+ * buffered events linger too long in case the port goes idle.
+ */
+#define DSW_MAX_PORT_OPS_PER_BG_TASK (128)
+
/* Avoid making small 'loans' from the central in-flight event credit
* pool, to improve efficiency.
*/
@@ -50,6 +59,22 @@
*/
#define DSW_IN_RING_SIZE (DSW_MAX_EVENTS)

+#define DSW_MAX_LOAD (INT16_MAX)
+#define DSW_LOAD_FROM_PERCENT(x) ((int16_t)(((x)*DSW_MAX_LOAD)/100))
+#define DSW_LOAD_TO_PERCENT(x) ((100*x)/DSW_MAX_LOAD)
+
+/* The thought behind keeping the load update interval shorter than
+ * the migration interval is that the load from newly migrated flows
+ * should 'show up' on the load measurement before new migrations are
+ * considered. This is to avoid having too many flows, from too many
+ * source ports, to be migrated too quickly to a lightly loaded port -
+ * in particular since this might cause the system to oscillate.
+ */
+#define DSW_LOAD_UPDATE_INTERVAL (DSW_MIGRATION_INTERVAL/4)
+#define DSW_OLD_LOAD_WEIGHT (1)
+
+#define DSW_MIGRATION_INTERVAL (1000)
+
struct dsw_port {
uint16_t id;

@@ -71,10 +96,25 @@ struct dsw_port {

uint16_t next_parallel_flow_id;

+ uint16_t ops_since_bg_task;
+
+ uint64_t last_bg;
+
+ /* For port load measurement. */
+ uint64_t next_load_update;
+ uint64_t load_update_interval;
+ uint64_t measurement_start;
+ uint64_t busy_start;
+ uint64_t busy_cycles;
+ uint64_t total_busy_cycles;
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];

struct rte_event_ring *in_ring __rte_cache_aligned;
+
+ /* Estimate of current port load. */
+ rte_atomic16_t load __rte_cache_aligned;
} __rte_cache_aligned;

struct dsw_queue {
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index 4a3af8ecd..f326147c9 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -7,6 +7,7 @@
#include <stdbool.h>

#include <rte_atomic.h>
+#include <rte_cycles.h>
#include <rte_random.h>

static bool
@@ -75,6 +76,70 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
}
}

+static void
+dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
+{
+ if (dequeued > 0 && port->busy_start == 0)
+ /* work period begins */
+ port->busy_start = rte_get_timer_cycles();
+ else if (dequeued == 0 && port->busy_start > 0) {
+ /* work period ends */
+ uint64_t work_period =
+ rte_get_timer_cycles() - port->busy_start;
+ port->busy_cycles += work_period;
+ port->busy_start = 0;
+ }
+}
+
+static int16_t
+dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
+{
+ uint64_t passed = now - port->measurement_start;
+ uint64_t busy_cycles = port->busy_cycles;
+
+ if (port->busy_start > 0) {
+ busy_cycles += (now - port->busy_start);
+ port->busy_start = now;
+ }
+
+ int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
+
+ port->measurement_start = now;
+ port->busy_cycles = 0;
+
+ port->total_busy_cycles += busy_cycles;
+
+ return load;
+}
+
+static void
+dsw_port_load_update(struct dsw_port *port, uint64_t now)
+{
+ int16_t old_load;
+ int16_t period_load;
+ int16_t new_load;
+
+ old_load = rte_atomic16_read(&port->load);
+
+ period_load = dsw_port_load_close_period(port, now);
+
+ new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
+ (DSW_OLD_LOAD_WEIGHT+1);
+
+ rte_atomic16_set(&port->load, new_load);
+}
+
+static void
+dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
+{
+ if (now < port->next_load_update)
+ return;
+
+ port->next_load_update = now + port->load_update_interval;
+
+ dsw_port_load_update(port, now);
+}
+
static uint8_t
dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
{
@@ -196,6 +261,39 @@ dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
}

+static void
+dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
+{
+ /* To pull the control ring reasonbly often on busy ports,
+ * each dequeued/enqueued event is considered an 'op' too.
+ */
+ port->ops_since_bg_task += (num_events+1);
+}
+
+static void
+dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
+
+static void
+dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
+{
+ if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
+ uint64_t now;
+
+ now = rte_get_timer_cycles();
+
+ port->last_bg = now;
+
+ /* Logic to avoid having events linger in the output
+ * buffer too long.
+ */
+ dsw_port_flush_out_buffers(dsw, port);
+
+ dsw_port_consider_load_update(port, now);
+
+ port->ops_since_bg_task = 0;
+ }
+}
+
static void
dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
{
@@ -225,6 +323,8 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
"events to port %d.\n", events_len, source_port->id);

+ dsw_port_bg_process(dsw, source_port);
+
/* XXX: For performance (=ring efficiency) reasons, the
* scheduler relies on internal non-ring buffers instead of
* immediately sending the event to the destination ring. For
@@ -238,6 +338,7 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
* considered.
*/
if (unlikely(events_len == 0)) {
+ dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
dsw_port_flush_out_buffers(dsw, source_port);
return 0;
}
@@ -245,6 +346,8 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],
if (unlikely(events_len > source_port->enqueue_depth))
events_len = source_port->enqueue_depth;

+ dsw_port_note_op(source_port, events_len);
+
if (!op_types_known)
for (i = 0; i < events_len; i++) {
switch (events[i].op) {
@@ -337,6 +440,8 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,

source_port->pending_releases = 0;

+ dsw_port_bg_process(dsw, source_port);
+
if (unlikely(num > source_port->dequeue_depth))
num = source_port->dequeue_depth;

@@ -344,6 +449,10 @@ dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,

source_port->pending_releases = dequeued;

+ dsw_port_load_record(source_port, dequeued);
+
+ dsw_port_note_op(source_port, dequeued);
+
if (dequeued > 0) {
DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
dequeued);
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:14 UTC
Permalink
The DSW event device is documented in DPDK Programmer's Guide.

The MAINTAINERS file and the 18.11 release notes are updated.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
MAINTAINERS | 5 ++
doc/guides/eventdevs/dsw.rst | 96 ++++++++++++++++++++++++++
doc/guides/eventdevs/index.rst | 1 +
doc/guides/rel_notes/release_18_11.rst | 8 +++
4 files changed, 110 insertions(+)
create mode 100644 doc/guides/eventdevs/dsw.rst

diff --git a/MAINTAINERS b/MAINTAINERS
index 9fd258fad..cbfa7e106 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -919,6 +919,11 @@ F: doc/guides/eventdevs/sw.rst
F: examples/eventdev_pipeline/
F: doc/guides/sample_app_ug/eventdev_pipeline.rst

+Distributed Software Eventdev PMD
+M: Mattias Rönnblom <***@ericsson.com>
+F: drivers/event/dsw/
+F: doc/guides/eventdevs/dsw.rst
+
Software OPDL Eventdev PMD
M: Liang Ma <***@intel.com>
M: Peter Mccarthy <***@intel.com>
diff --git a/doc/guides/eventdevs/dsw.rst b/doc/guides/eventdevs/dsw.rst
new file mode 100644
index 000000000..6653f501c
--- /dev/null
+++ b/doc/guides/eventdevs/dsw.rst
@@ -0,0 +1,96 @@
+.. SPDX-License-Identifier: BSD-3-Clause
+ Copyright(c) 2018 Ericsson AB
+
+Distributed Software Eventdev Poll Mode Driver
+==============================================
+
+The distributed software event device is an eventdev driver which
+distributes the task of scheduling events among all the eventdev ports
+and the lcore threads using them.
+
+Features
+--------
+
+Queues
+ * Atomic
+ * Parallel
+ * Single-Link
+
+Ports
+ * Load balanced (for Atomic, Ordered, Parallel queues)
+ * Single Link (for single-link queues)
+
+Configuration and Options
+-------------------------
+
+The distributed software eventdev is a vdev device, and as such can be
+created from the application code, or from the EAL command line:
+
+* Call ``rte_vdev_init("event_dsw0")`` from the application
+
+* Use ``--vdev="event_dsw0"`` in the EAL options, which will call
+ rte_vdev_init() internally
+
+Example:
+
+.. code-block:: console
+
+ ./your_eventdev_application --vdev="event_dsw0"
+
+Limitations
+-----------
+
+Unattended Ports
+~~~~~~~~~~~~~~~~
+
+The distributed software eventdev uses an internal signaling schema
+between the ports to achieve load balancing. In order for this to
+work, the application must perform enqueue and/or dequeue operations
+on all ports.
+
+Producer-only ports which currently have no events to enqueue should
+periodically call rte_event_enqueue_burst() with a zero-sized burst.
+
+Ports left unattended for longer periods of time will prevent load
+balancing, and also cause traffic interruptions on the flows which
+are in the process of being migrated.
+
+Output Buffering
+~~~~~~~~~~~~~~~~
+
+For efficiency reasons, the distributed software eventdev might not
+send enqueued events immediately to the destination port, but instead
+store them in an internal buffer in the source port.
+
+In case no more events are enqueued on a port with buffered events,
+these events will be sent after the application has performed a number
+of enqueue and/or dequeue operations.
+
+For explicit flushing, an application may call
+rte_event_enqueue_burst() with a zero-sized burst.
+
+
+Priorities
+~~~~~~~~~~
+
+The distributed software eventdev does not support event priorities.
+
+Ordered Queues
+~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support the ordered queue type.
+
+
+"All Types" Queues
+~~~~~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support queues of type
+RTE_EVENT_QUEUE_CFG_ALL_TYPES, which allow both atomic, ordered, and
+parallel events on the same queue.
+
+Dynamic Link/Unlink
+~~~~~~~~~~~~~~~~~~~
+
+The distributed software eventdev does not support calls to
+rte_event_port_link() or rte_event_port_unlink() after
+rte_event_dev_start() has been called.
diff --git a/doc/guides/eventdevs/index.rst b/doc/guides/eventdevs/index.rst
index 18ec8e462..984eea5f4 100644
--- a/doc/guides/eventdevs/index.rst
+++ b/doc/guides/eventdevs/index.rst
@@ -14,5 +14,6 @@ application trough the eventdev API.
dpaa
dpaa2
sw
+ dsw
octeontx
opdl
diff --git a/doc/guides/rel_notes/release_18_11.rst b/doc/guides/rel_notes/release_18_11.rst
index 3ae6b3f58..9c6b1fd0b 100644
--- a/doc/guides/rel_notes/release_18_11.rst
+++ b/doc/guides/rel_notes/release_18_11.rst
@@ -54,6 +54,14 @@ New Features
Also, make sure to start the actual text at the margin.
=========================================================

+* **Added Distributed Software Eventdev PMD.**
+
+ Added the new Distributed Software Event Device (DSW), which is a
+ pure-software eventdev driver distributing the work of scheduling
+ among all eventdev ports and the lcores using them. DSW, compared to
+ the SW eventdev PMD, sacrifices load balancing performance to
+ gain better event scheduling throughput and scalability.
+

API Changes
-----------
--
2.17.1
Mattias Rönnblom
2018-09-18 12:45:13 UTC
Permalink
The DSW event device now implements the 'xstats' interface and a
number of port- and device-level counters.

Signed-off-by: Mattias Rönnblom <***@ericsson.com>
---
drivers/event/dsw/Makefile | 3 +-
drivers/event/dsw/dsw_evdev.c | 5 +-
drivers/event/dsw/dsw_evdev.h | 19 +++
drivers/event/dsw/dsw_event.c | 35 ++++
drivers/event/dsw/dsw_xstats.c | 288 +++++++++++++++++++++++++++++++++
drivers/event/dsw/meson.build | 2 +-
6 files changed, 349 insertions(+), 3 deletions(-)
create mode 100644 drivers/event/dsw/dsw_xstats.c

diff --git a/drivers/event/dsw/Makefile b/drivers/event/dsw/Makefile
index 6374a454e..ea1e5259a 100644
--- a/drivers/event/dsw/Makefile
+++ b/drivers/event/dsw/Makefile
@@ -21,6 +21,7 @@ LIBABIVER := 1

EXPORT_MAP := rte_pmd_dsw_event_version.map

-SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += dsw_evdev.c dsw_event.c
+SRCS-$(CONFIG_RTE_LIBRTE_PMD_DSW_EVENTDEV) += \
+ dsw_evdev.c dsw_event.c dsw_xstats.c

include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/event/dsw/dsw_evdev.c b/drivers/event/dsw/dsw_evdev.c
index 2ecb365ba..33ba13647 100644
--- a/drivers/event/dsw/dsw_evdev.c
+++ b/drivers/event/dsw/dsw_evdev.c
@@ -378,7 +378,10 @@ static struct rte_eventdev_ops dsw_evdev_ops = {
.dev_configure = dsw_configure,
.dev_start = dsw_start,
.dev_stop = dsw_stop,
- .dev_close = dsw_close
+ .dev_close = dsw_close,
+ .xstats_get = dsw_xstats_get,
+ .xstats_get_names = dsw_xstats_get_names,
+ .xstats_get_by_name = dsw_xstats_get_by_name
};

static int
diff --git a/drivers/event/dsw/dsw_evdev.h b/drivers/event/dsw/dsw_evdev.h
index f6f8f0454..dc28ab125 100644
--- a/drivers/event/dsw/dsw_evdev.h
+++ b/drivers/event/dsw/dsw_evdev.h
@@ -176,6 +176,14 @@ struct dsw_port {
uint16_t seen_events_idx;
struct dsw_queue_flow seen_events[DSW_MAX_EVENTS_RECORDED];

+ uint64_t new_enqueued;
+ uint64_t forward_enqueued;
+ uint64_t release_enqueued;
+ uint64_t queue_enqueued[DSW_MAX_QUEUES];
+
+ uint64_t dequeued;
+ uint64_t queue_dequeued[DSW_MAX_QUEUES];
+
uint16_t out_buffer_len[DSW_MAX_PORTS];
struct rte_event out_buffer[DSW_MAX_PORTS][DSW_MAX_PORT_OUT_BUFFER];

@@ -243,6 +251,17 @@ uint16_t dsw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait);
uint16_t dsw_event_dequeue_burst(void *port, struct rte_event *events,
uint16_t num, uint64_t wait);

+int dsw_xstats_get_names(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode,
+ uint8_t queue_port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size);
+int dsw_xstats_get(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n);
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+ const char *name, unsigned int *id);
+
static inline struct dsw_evdev *
dsw_pmd_priv(const struct rte_eventdev *eventdev)
{
diff --git a/drivers/event/dsw/dsw_event.c b/drivers/event/dsw/dsw_event.c
index a84b19c33..61a66fabf 100644
--- a/drivers/event/dsw/dsw_event.c
+++ b/drivers/event/dsw/dsw_event.c
@@ -82,6 +82,33 @@ dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
}
}

+static void
+dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
+ uint16_t num_forward, uint16_t num_release)
+{
+ port->new_enqueued += num_new;
+ port->forward_enqueued += num_forward;
+ port->release_enqueued += num_release;
+}
+
+static void
+dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+ source_port->queue_enqueued[queue_id]++;
+}
+
+static void
+dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
+{
+ port->dequeued += num;
+}
+
+static void
+dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
+{
+ source_port->queue_dequeued[queue_id]++;
+}
+
static void
dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
{
@@ -1059,12 +1086,16 @@ dsw_event_enqueue_burst_generic(void *port, const struct rte_event events[],

source_port->pending_releases -= num_release;

+ dsw_port_enqueue_stats(source_port, num_new,
+ num_non_release-num_new, num_release);
+
for (i = 0; i < events_len; i++) {
const struct rte_event *event = &events[i];

if (likely(num_release == 0 ||
event->op != RTE_EVENT_OP_RELEASE))
dsw_port_buffer_event(dsw, source_port, event);
+ dsw_port_queue_enqueue_stats(source_port, event->queue_id);
}

DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
@@ -1109,6 +1140,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
{
uint16_t i;

+ dsw_port_dequeue_stats(port, num);
+
for (i = 0; i < num; i++) {
uint16_t l_idx = port->seen_events_idx;
struct dsw_queue_flow *qf = &port->seen_events[l_idx];
@@ -1117,6 +1150,8 @@ dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
qf->flow_hash = dsw_flow_id_hash(event->flow_id);

port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
+
+ dsw_port_queue_dequeued_stats(port, event->queue_id);
}

if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
diff --git a/drivers/event/dsw/dsw_xstats.c b/drivers/event/dsw/dsw_xstats.c
new file mode 100644
index 000000000..bf2eec527
--- /dev/null
+++ b/drivers/event/dsw/dsw_xstats.c
@@ -0,0 +1,288 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2018 Ericsson AB
+ */
+
+#include "dsw_evdev.h"
+
+#include <stdbool.h>
+#include <string.h>
+
+#include <rte_debug.h>
+
+/* The high bits in the xstats id is used to store an additional
+ * parameter (beyond the queue or port id already in the xstats
+ * interface).
+ */
+#define DSW_XSTATS_ID_PARAM_BITS (8)
+#define DSW_XSTATS_ID_STAT_BITS \
+ (sizeof(unsigned int)*CHAR_BIT - DSW_XSTATS_ID_PARAM_BITS)
+#define DSW_XSTATS_ID_STAT_MASK ((1 << DSW_XSTATS_ID_STAT_BITS) - 1)
+
+#define DSW_XSTATS_ID_GET_PARAM(id) \
+ ((id)>>DSW_XSTATS_ID_STAT_BITS)
+
+#define DSW_XSTATS_ID_GET_STAT(id) \
+ ((id) & DSW_XSTATS_ID_STAT_MASK)
+
+#define DSW_XSTATS_ID_CREATE(id, param_value) \
+ (((param_value) << DSW_XSTATS_ID_STAT_BITS) | id)
+
+typedef
+uint64_t (*dsw_xstats_dev_get_value_fn)(struct dsw_evdev *dsw);
+
+struct dsw_xstat_dev {
+ const char *name;
+ dsw_xstats_dev_get_value_fn get_value_fn;
+};
+
+typedef
+uint64_t (*dsw_xstats_port_get_value_fn)(struct dsw_evdev *dsw,
+ uint8_t port_id, uint8_t queue_id);
+
+struct dsw_xstats_port {
+ const char *name_fmt;
+ dsw_xstats_port_get_value_fn get_value_fn;
+ bool per_queue;
+};
+
+static uint64_t
+dsw_xstats_dev_credits_on_loan(struct dsw_evdev *dsw)
+{
+ return rte_atomic32_read(&dsw->credits_on_loan);
+}
+
+static struct dsw_xstat_dev dsw_dev_xstats[] = {
+ { "dev_credits_on_loan", dsw_xstats_dev_credits_on_loan }
+};
+
+#define DSW_GEN_PORT_ACCESS_FN(_variable) \
+ static uint64_t \
+ dsw_xstats_port_get_ ## _variable(struct dsw_evdev *dsw, \
+ uint8_t port_id, \
+ uint8_t queue_id __rte_unused) \
+ { \
+ return dsw->ports[port_id]._variable; \
+ }
+
+DSW_GEN_PORT_ACCESS_FN(new_enqueued)
+DSW_GEN_PORT_ACCESS_FN(forward_enqueued)
+DSW_GEN_PORT_ACCESS_FN(release_enqueued)
+
+static uint64_t
+dsw_xstats_port_get_queue_enqueued(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id)
+{
+ return dsw->ports[port_id].queue_enqueued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(dequeued)
+
+static uint64_t
+dsw_xstats_port_get_queue_dequeued(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id)
+{
+ return dsw->ports[port_id].queue_dequeued[queue_id];
+}
+
+DSW_GEN_PORT_ACCESS_FN(migrations)
+
+static uint64_t
+dsw_xstats_port_get_migration_latency(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ uint64_t total_latency = dsw->ports[port_id].migration_latency;
+ uint64_t num_migrations = dsw->ports[port_id].migrations;
+
+ return num_migrations > 0 ? total_latency / num_migrations : 0;
+}
+
+static uint64_t
+dsw_xstats_port_get_event_proc_latency(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ uint64_t total_busy_cycles =
+ dsw->ports[port_id].total_busy_cycles;
+ uint64_t dequeued =
+ dsw->ports[port_id].dequeued;
+
+ return dequeued > 0 ? total_busy_cycles / dequeued : 0;
+}
+
+DSW_GEN_PORT_ACCESS_FN(inflight_credits)
+
+static uint64_t
+dsw_xstats_port_get_load(struct dsw_evdev *dsw, uint8_t port_id,
+ uint8_t queue_id __rte_unused)
+{
+ int16_t load;
+
+ load = rte_atomic16_read(&dsw->ports[port_id].load);
+
+ return DSW_LOAD_TO_PERCENT(load);
+}
+
+DSW_GEN_PORT_ACCESS_FN(last_bg)
+
+static struct dsw_xstats_port dsw_port_xstats[] = {
+ { "port_%u_new_enqueued", dsw_xstats_port_get_new_enqueued,
+ false },
+ { "port_%u_forward_enqueued", dsw_xstats_port_get_forward_enqueued,
+ false },
+ { "port_%u_release_enqueued", dsw_xstats_port_get_release_enqueued,
+ false },
+ { "port_%u_queue_%u_enqueued", dsw_xstats_port_get_queue_enqueued,
+ true },
+ { "port_%u_dequeued", dsw_xstats_port_get_dequeued,
+ false },
+ { "port_%u_queue_%u_dequeued", dsw_xstats_port_get_queue_dequeued,
+ true },
+ { "port_%u_migrations", dsw_xstats_port_get_migrations,
+ false },
+ { "port_%u_migration_latency", dsw_xstats_port_get_migration_latency,
+ false },
+ { "port_%u_event_proc_latency", dsw_xstats_port_get_event_proc_latency,
+ false },
+ { "port_%u_inflight_credits", dsw_xstats_port_get_inflight_credits,
+ false },
+ { "port_%u_load", dsw_xstats_port_get_load,
+ false },
+ { "port_%u_last_bg", dsw_xstats_port_get_last_bg,
+ false }
+};
+
+static int
+dsw_xstats_dev_get_names(struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ unsigned int i;
+
+ for (i = 0; i < RTE_DIM(dsw_dev_xstats) && i < size; i++) {
+ ids[i] = i;
+ strcpy(xstats_names[i].name, dsw_dev_xstats[i].name);
+ }
+
+ return i;
+}
+
+static int
+dsw_xstats_port_get_names(struct dsw_evdev *dsw, uint8_t port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ uint8_t queue_id = 0;
+ unsigned int id_idx;
+ unsigned int stat_idx;
+
+ for (id_idx = 0, stat_idx = 0;
+ id_idx < size && stat_idx < RTE_DIM(dsw_port_xstats);
+ id_idx++) {
+ struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+
+ if (xstat->per_queue) {
+ ids[id_idx] = DSW_XSTATS_ID_CREATE(stat_idx, queue_id);
+ snprintf(xstats_names[id_idx].name,
+ RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+ dsw_port_xstats[stat_idx].name_fmt, port_id,
+ queue_id);
+ queue_id++;
+ } else {
+ ids[id_idx] = stat_idx;
+ snprintf(xstats_names[id_idx].name,
+ RTE_EVENT_DEV_XSTATS_NAME_SIZE,
+ dsw_port_xstats[stat_idx].name_fmt, port_id);
+ }
+
+ if (!(xstat->per_queue && queue_id < dsw->num_queues)) {
+ stat_idx++;
+ queue_id = 0;
+ }
+ }
+ return id_idx;
+}
+
+int
+dsw_xstats_get_names(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode,
+ uint8_t queue_port_id,
+ struct rte_event_dev_xstats_name *xstats_names,
+ unsigned int *ids, unsigned int size)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+
+ switch (mode) {
+ case RTE_EVENT_DEV_XSTATS_DEVICE:
+ return dsw_xstats_dev_get_names(xstats_names, ids, size);
+ case RTE_EVENT_DEV_XSTATS_PORT:
+ return dsw_xstats_port_get_names(dsw, queue_port_id,
+ xstats_names, ids, size);
+ case RTE_EVENT_DEV_XSTATS_QUEUE:
+ return 0;
+ default:
+ RTE_ASSERT(false);
+ return -1;
+ }
+}
+
+static int
+dsw_xstats_dev_get(const struct rte_eventdev *dev,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ unsigned int i;
+
+ for (i = 0; i < n; i++) {
+ unsigned int id = ids[i];
+ struct dsw_xstat_dev *xstat = &dsw_dev_xstats[id];
+ values[i] = xstat->get_value_fn(dsw);
+ }
+ return n;
+}
+
+static int
+dsw_xstats_port_get(const struct rte_eventdev *dev, uint8_t port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ struct dsw_evdev *dsw = dsw_pmd_priv(dev);
+ unsigned int i;
+
+ for (i = 0; i < n; i++) {
+ unsigned int id = ids[i];
+ unsigned int stat_idx = DSW_XSTATS_ID_GET_STAT(id);
+ struct dsw_xstats_port *xstat = &dsw_port_xstats[stat_idx];
+ uint8_t queue_id = 0;
+
+ if (xstat->per_queue)
+ queue_id = DSW_XSTATS_ID_GET_PARAM(id);
+
+ values[i] = xstat->get_value_fn(dsw, port_id, queue_id);
+ }
+ return n;
+}
+
+int
+dsw_xstats_get(const struct rte_eventdev *dev,
+ enum rte_event_dev_xstats_mode mode, uint8_t queue_port_id,
+ const unsigned int ids[], uint64_t values[], unsigned int n)
+{
+ switch (mode) {
+ case RTE_EVENT_DEV_XSTATS_DEVICE:
+ return dsw_xstats_dev_get(dev, ids, values, n);
+ case RTE_EVENT_DEV_XSTATS_PORT:
+ return dsw_xstats_port_get(dev, queue_port_id, ids, values, n);
+ case RTE_EVENT_DEV_XSTATS_QUEUE:
+ return 0;
+ default:
+ RTE_ASSERT(false);
+ return -1;
+ }
+ return 0;
+}
+
+uint64_t dsw_xstats_get_by_name(const struct rte_eventdev *dev,
+ const char *name, unsigned int *id)
+{
+ RTE_SET_USED(dev);
+ RTE_SET_USED(name);
+ RTE_SET_USED(id);
+ return 0;
+}
diff --git a/drivers/event/dsw/meson.build b/drivers/event/dsw/meson.build
index bd2e4c809..a6b7bfa59 100644
--- a/drivers/event/dsw/meson.build
+++ b/drivers/event/dsw/meson.build
@@ -3,4 +3,4 @@

allow_experimental_apis = true
deps += ['bus_vdev']
-sources = files('dsw_evdev.c', 'dsw_event.c')
+sources = files('dsw_evdev.c', 'dsw_event.c', 'dsw_xstats.c')
--
2.17.1
Jerin Jacob
2018-09-19 11:53:46 UTC
Permalink
-----Original Message-----
Date: Tue, 18 Sep 2018 14:45:04 +0200
Subject: [PATCH v4 00/10] A Distributed Software Event Device
X-Mailer: git-send-email 2.17.1
* Reworded DSW introduction in the documentation.
* Updated the MAINTAINERS file and the 18.11 release notes.
# Rebased to latest dpdk-next-eventdev tree
# Added the missing Acked-by:

Applied to dpdk-next-eventdev/master. Thanks.
Continue reading on narkive:
Loading...