For multiqueue support, we need to ensure that packets sent to the
guest are placed on the RX queue matching the TX queue the guest used
for the same flow. This requires tracking the queue pair associated
with each flow.
Add a qpair field to struct flow_common to store the queue pair number
for each flow (FLOW_QPAIR_INVALID if not assigned). The field uses 5
bits, allowing support for up to 31 queue pairs (index 31 is reserved
for FLOW_QPAIR_INVALID), which we verify is sufficient for
VHOST_USER_MAX_VQS via static assertion.
Introduce flow_qp() to retrieve the queue pair for a flow (returning 0
for NULL flows or flows without a valid assignment), and flow_setqp()
to assign queue pairs. Pass the queue pair on which a packet was
received down through the ICMP and UDP tap handlers and to
tcp_conn_from_tap() (tcp_tap_handler() already takes it), and assign
it to the corresponding flows using FLOW_SETQP().
The implementation updates the queue pair assignment on every packet
received on a TX queue. This follows the virtio specification's guidance
for automatic receive steering: "After the driver transmitted a packet
of a flow on transmitqX, the device SHOULD cause incoming packets for
that flow to be steered to receiveqX." By tracking the most recent TX
queue for each flow, we ensure return traffic is directed to the
corresponding RX queue, maintaining flow affinity across queue pairs.
The vhost-user code now uses FLOW_QP() to select the appropriate RX
queue when sending packets, ensuring they're routed based on the
originating TX queue rather than always using queue 0.
Note that flows initiated from the host side (via sockets, for example
udp_flow_from_sock()) currently default to queue pair 0, as they don't
have an associated incoming queue to derive the assignment from. They
are moved to the guest's queue pair as soon as the guest sends a packet
for them.
Signed-off-by: Laurent Vivier
---
flow.c | 33 +++++++++++++++++++++++++++++++++
flow.h | 17 +++++++++++++++++
icmp.c | 23 +++++++++++++----------
icmp.h | 4 ++--
tap.c | 8 ++++----
tcp.c | 33 +++++++++++++++++++--------------
tcp_vu.c | 8 +++++---
udp.c | 29 ++++++++++++++++-------------
udp.h | 2 +-
udp_flow.c | 8 +++++++-
udp_flow.h | 2 +-
udp_vu.c | 4 +++-
12 files changed, 121 insertions(+), 50 deletions(-)
diff --git a/flow.c b/flow.c
index 278a9cf0ac6d..fd6c4752ec81 100644
--- a/flow.c
+++ b/flow.c
@@ -405,6 +405,38 @@ void flow_epollid_register(int epollid, int epollfd)
epoll_id_to_fd[epollid] = epollfd;
}
+/**
+ * flow_qp() - Get the queue pair for a flow
+ * @f: Flow to query (may be NULL)
+ *
+ * Return: queue pair number for the flow, or 0 if flow is NULL or has no
+ * valid queue pair assignment
+ */
+unsigned int flow_qp(const struct flow_common *f)
+{
+ if (f == NULL || f->qpair == FLOW_QPAIR_INVALID)
+ return 0;
+ return f->qpair;
+}
+
+/**
+ * flow_setqp() - Set queue pair assignment for a flow
+ * @f: Flow to update
+ * @qpair: Queue pair number to assign, must be below FLOW_QPAIR_MAX
+ */
+void flow_setqp(struct flow_common *f, unsigned int qpair)
+{
+ ASSERT(qpair < FLOW_QPAIR_MAX);
+
+ if (f->qpair == qpair)
+ return;
+
+ flow_trace((union flow *)f, "updating queue pair from %u to %u",
+ f->qpair, qpair);
+
+ f->qpair = qpair;
+}
+
/**
* flow_initiate_() - Move flow to INI, setting pif[INISIDE]
* @flow: Flow to change state
@@ -609,6 +641,7 @@ union flow *flow_alloc(void)
flow_new_entry = flow;
memset(flow, 0, sizeof(*flow));
flow_epollid_clear(&flow->f);
+ flow->f.qpair = FLOW_QPAIR_INVALID;
flow_set_state(&flow->f, FLOW_STATE_NEW);
return flow;
diff --git a/flow.h b/flow.h
index b43b0b1dd7f2..a4a1e680227c 100644
--- a/flow.h
+++ b/flow.h
@@ -179,6 +179,8 @@ int flowside_connect(const struct ctx *c, int s,
* @side[]: Information for each side of the flow
* @tap_omac: MAC address of remote endpoint as seen from the guest
* @epollid: epollfd identifier, or EPOLLFD_ID_INVALID
+ * @qpair: Queue pair the guest last sent packets of this flow on
+ * (FLOW_QPAIR_INVALID if not assigned)
*/
struct flow_common {
#ifdef __GNUC__
@@ -199,6 +201,8 @@ struct flow_common {
#define EPOLLFD_ID_BITS 8
unsigned int epollid:EPOLLFD_ID_BITS;
+#define FLOW_QPAIR_BITS 5
+ unsigned int qpair:FLOW_QPAIR_BITS;
};
#define EPOLLFD_ID_DEFAULT 0
@@ -206,6 +210,13 @@ struct flow_common {
#define EPOLLFD_ID_MAX (EPOLLFD_ID_SIZE - 1)
#define EPOLLFD_ID_INVALID EPOLLFD_ID_MAX
+#define FLOW_QPAIR_NUM (1 << FLOW_QPAIR_BITS)
+#define FLOW_QPAIR_MAX (FLOW_QPAIR_NUM - 1)
+#define FLOW_QPAIR_INVALID FLOW_QPAIR_MAX
+
+static_assert(VHOST_USER_MAX_VQS <= FLOW_QPAIR_MAX * 2,
+ "FLOW_QPAIR_BITS too small for VHOST_USER_MAX_VQS");
+
#define FLOW_INDEX_BITS 17 /* 128k - 1 */
#define FLOW_MAX MAX_FROM_BITS(FLOW_INDEX_BITS)
@@ -266,6 +277,13 @@ int flow_epollfd(const struct flow_common *f);
void flow_epollid_set(struct flow_common *f, int epollid);
void flow_epollid_clear(struct flow_common *f);
void flow_epollid_register(int epollid, int epollfd);
+unsigned int flow_qp(const struct flow_common *f);
+#define FLOW_QP(flow_) \
+ (flow_qp(&(flow_)->f))
+void flow_setqp(struct flow_common *f, unsigned int qpair);
+#define FLOW_SETQP(flow_, qpair_) \
+ (flow_setqp(&(flow_)->f, (qpair_)))
+
void flow_defer_handler(const struct ctx *c, const struct timespec *now);
int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
int fd);
diff --git a/icmp.c b/icmp.c
index a9f0518c2f61..744b0ec9edae 100644
--- a/icmp.c
+++ b/icmp.c
@@ -132,13 +132,13 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
const struct in_addr *daddr = inany_v4(&ini->eaddr);
ASSERT(saddr && daddr); /* Must have IPv4 addresses */
- tap_icmp4_send(c, 0, *saddr, *daddr, buf,
+ tap_icmp4_send(c, FLOW_QP(pingf), *saddr, *daddr, buf,
pingf->f.tap_omac, n);
} else if (pingf->f.type == FLOW_PING6) {
const struct in6_addr *saddr = &ini->oaddr.a6;
const struct in6_addr *daddr = &ini->eaddr.a6;
- tap_icmp6_send(c, 0, saddr, daddr, buf,
+ tap_icmp6_send(c, FLOW_QP(pingf), saddr, daddr, buf,
pingf->f.tap_omac, n);
}
return;
@@ -238,17 +238,18 @@ cancel:
/**
* icmp_tap_handler() - Handle packets from tap
- * @c: Execution context
- * @pif: pif on which the packet is arriving
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address
- * @daddr: Destination address
- * @data: Single packet with ICMP/ICMPv6 header
- * @now: Current timestamp
+ * @c: Execution context
+ * @qpair: Queue pair on which the packet was received
+ * @pif: pif on which the packet is arriving
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address
+ * @daddr: Destination address
+ * @data: Single packet with ICMP/ICMPv6 header
+ * @now: Current timestamp
*
* Return: count of consumed packets (always 1, even if malformed)
*/
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
- const void *saddr, const void *daddr,
+int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+ sa_family_t af, const void *saddr, const void *daddr,
struct iov_tail *data, const struct timespec *now)
{
@@ -309,6 +310,8 @@ int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
else if (!(pingf = icmp_ping_new(c, af, id, saddr, daddr)))
return 1;
+ FLOW_SETQP(pingf, qpair);
+
tgt = &pingf->f.side[TGTSIDE];
ASSERT(flow_proto[pingf->f.type] == proto);
diff --git a/icmp.h b/icmp.h
index 1a0e6205f087..7b9982529fd1 100644
--- a/icmp.h
+++ b/icmp.h
@@ -10,8 +10,8 @@ struct ctx;
struct icmp_ping_flow;
void icmp_sock_handler(const struct ctx *c, union epoll_ref ref);
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
- const void *saddr, const void *daddr,
+int icmp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
+ sa_family_t af, const void *saddr, const void *daddr,
struct iov_tail *data, const struct timespec *now);
void icmp_init(void);
diff --git a/tap.c b/tap.c
index c56afb73fd7e..c603b48ea4b5 100644
--- a/tap.c
+++ b/tap.c
@@ -787,7 +787,7 @@ resume:
tap_packet_debug(iph, NULL, NULL, 0, NULL, 1);
- icmp_tap_handler(c, PIF_TAP, AF_INET,
+ icmp_tap_handler(c, qpair, PIF_TAP, AF_INET,
&iph->saddr, &iph->daddr,
&data, now);
continue;
@@ -871,7 +871,7 @@ append:
if (c->no_udp)
continue;
for (k = 0; k < p->count; )
- k += udp_tap_handler(c, PIF_TAP, AF_INET,
+ k += udp_tap_handler(c, qpair, PIF_TAP, AF_INET,
&seq->saddr, &seq->daddr,
seq->ttl, p, k, now);
}
@@ -973,7 +973,7 @@ resume:
tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
- icmp_tap_handler(c, PIF_TAP, AF_INET6,
+ icmp_tap_handler(c, qpair, PIF_TAP, AF_INET6,
saddr, daddr, &data, now);
continue;
}
@@ -1062,7 +1062,7 @@ append:
if (c->no_udp)
continue;
for (k = 0; k < p->count; )
- k += udp_tap_handler(c, PIF_TAP, AF_INET6,
+ k += udp_tap_handler(c, qpair, PIF_TAP, AF_INET6,
&seq->saddr, &seq->daddr,
seq->hop_limit, p, k, now);
}
diff --git a/tcp.c b/tcp.c
index 4c84f0e621b8..3b7322a655bc 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1497,21 +1497,23 @@ static void tcp_bind_outbound(const struct ctx *c,
/**
* tcp_conn_from_tap() - Handle connection request (SYN segment) from tap
- * @c: Execution context
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address, pointer to in_addr or in6_addr
- * @daddr: Destination address, pointer to in_addr or in6_addr
- * @th: TCP header from tap: caller MUST ensure it's there
- * @opts: Pointer to start of options
- * @optlen: Bytes in options: caller MUST ensure available length
- * @now: Current timestamp
+ * @c: Execution context
+ * @qpair: Queue pair for the flow
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address, pointer to in_addr or in6_addr
+ * @daddr: Destination address, pointer to in_addr or in6_addr
+ * @th: TCP header from tap: caller MUST ensure it's there
+ * @opts: Pointer to start of options
+ * @optlen: Bytes in options: caller MUST ensure available length
+ * @now: Current timestamp
*
* #syscalls:vu getsockname
*/
-static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
- const void *saddr, const void *daddr,
- const struct tcphdr *th, const char *opts,
- size_t optlen, const struct timespec *now)
+static void tcp_conn_from_tap(const struct ctx *c, unsigned int qpair,
+ sa_family_t af, const void *saddr,
+ const void *daddr, const struct tcphdr *th,
+ const char *opts, size_t optlen,
+ const struct timespec *now)
{
in_port_t srcport = ntohs(th->source);
in_port_t dstport = ntohs(th->dest);
@@ -1623,6 +1625,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
conn_event(c, conn, TAP_SYN_ACK_SENT);
}
+ FLOW_SETQP(conn, qpair);
tcp_epoll_ctl(c, conn);
if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
@@ -2057,7 +2060,7 @@ static void tcp_rst_no_conn(const struct ctx *c, unsigned int qpair, int af,
/**
* tcp_tap_handler() - Handle packets from tap and state transitions
* @c: Execution context
- * @qpair: Queue pair on which to send packets
+ * @qpair: Queue pair the packets were received on, assigned to the flow
* @pif: pif on which the packet is arriving
* @af: Address family, AF_INET or AF_INET6
* @saddr: Source address
@@ -2109,7 +2112,7 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
/* New connection from tap */
if (!flow) {
if (opts && th->syn && !th->ack)
- tcp_conn_from_tap(c, af, saddr, daddr, th,
+ tcp_conn_from_tap(c, qpair, af, saddr, daddr, th,
opts, optlen, now);
else
tcp_rst_no_conn(c, qpair, af, saddr, daddr, flow_lbl, th,
@@ -2121,6 +2124,9 @@ int tcp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
ASSERT(pif_at_sidx(sidx) == PIF_TAP);
conn = &flow->tcp;
+ /* Steer replies to the queue pair the guest last used for this flow */
+ FLOW_SETQP(flow, qpair);
+
flow_trace(conn, "packet length %zu from tap", l4len);
if (th->rst) {
diff --git a/tcp_vu.c b/tcp_vu.c
index 1c81ce376dad..1044491d404c 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -71,14 +71,15 @@ static size_t tcp_vu_hdrlen(bool v6)
int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
{
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
- size_t optlen, hdrlen;
+ unsigned int rx_queue = VHOST_USER_RX_QUEUE + 2 * FLOW_QP(conn);
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
struct vu_virtq_element flags_elem[2];
struct ipv6hdr *ip6h = NULL;
struct iphdr *ip4h = NULL;
struct iovec flags_iov[2];
struct tcp_syn_opts *opts;
struct iov_tail payload;
+ size_t optlen, hdrlen;
struct tcphdr *th;
struct ethhdr *eh;
uint32_t seq;
@@ -349,7 +350,8 @@ int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
{
uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+ unsigned int rx_queue = VHOST_USER_RX_QUEUE + 2 * FLOW_QP(conn);
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
ssize_t len, previous_dlen;
int i, iov_cnt, head_cnt;
size_t hdrlen, fillsize;
diff --git a/udp.c b/udp.c
index cbc2d7055647..e83daaaaf0d5 100644
--- a/udp.c
+++ b/udp.c
@@ -636,12 +636,14 @@ static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
if (hdr->cmsg_level == IPPROTO_IP &&
(o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
dlen = MIN(dlen, ICMP4_MAX_DLEN);
- udp_send_tap_icmp4(c, 0, ee, toside, *o4, data, dlen);
+ udp_send_tap_icmp4(c, FLOW_QP(uflow), ee, toside,
+ *o4, data, dlen);
return 1;
}
if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
- udp_send_tap_icmp6(c, 0, ee, toside, &otap.a6, data, dlen,
+ udp_send_tap_icmp6(c, FLOW_QP(uflow), ee,
+ toside, &otap.a6, data, dlen,
FLOW_IDX(uflow));
return 1;
}
@@ -970,21 +972,22 @@ fail:
/**
* udp_tap_handler() - Handle packets from tap
- * @c: Execution context
- * @pif: pif on which the packet is arriving
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address
- * @daddr: Destination address
- * @ttl: TTL or hop limit for packets to be sent in this call
- * @p: Pool of UDP packets, with UDP headers
- * @idx: Index of first packet to process
- * @now: Current timestamp
+ * @c: Execution context
+ * @qpair: Queue pair on which the packets were received
+ * @pif: pif on which the packet is arriving
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address
+ * @daddr: Destination address
+ * @ttl: TTL or hop limit for packets to be sent in this call
+ * @p: Pool of UDP packets, with UDP headers
+ * @idx: Index of first packet to process
+ * @now: Current timestamp
*
* Return: count of consumed packets
*
* #syscalls sendmmsg
*/
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+int udp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint8_t ttl, const struct pool *p, int idx,
const struct timespec *now)
@@ -1018,7 +1021,8 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
src = ntohs(uh->source);
dst = ntohs(uh->dest);
- tosidx = udp_flow_from_tap(c, pif, af, saddr, daddr, src, dst, now);
+ tosidx = udp_flow_from_tap(c, qpair, pif, af, saddr, daddr,
+ src, dst, now);
if (!(uflow = udp_at_sidx(tosidx))) {
char sstr[INET6_ADDRSTRLEN], dstr[INET6_ADDRSTRLEN];
diff --git a/udp.h b/udp.h
index 7d3cd59d9a42..b2f2dc8b5ac4 100644
--- a/udp.h
+++ b/udp.h
@@ -11,7 +11,7 @@ void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
uint32_t events, const struct timespec *now);
void udp_sock_handler(const struct ctx *c, union epoll_ref ref, uint32_t events,
const struct timespec *now);
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+int udp_tap_handler(const struct ctx *c, unsigned int qpair, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint8_t ttl, const struct pool *p, int idx,
const struct timespec *now);
diff --git a/udp_flow.c b/udp_flow.c
index 8907f2f72741..35014c3692a9 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -266,17 +266,19 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
/**
* udp_flow_from_tap() - Find or create UDP flow for tap packets
* @c: Execution context
+ * @qpair: Queue pair for the flow
* @pif: pif on which the packet is arriving
* @af: Address family, AF_INET or AF_INET6
* @saddr: Source address on guest side
* @daddr: Destination address guest side
* @srcport: Source port on guest side
* @dstport: Destination port on guest side
+ * @now: Current timestamp
*
* Return: sidx for the destination side of the flow for this packet, or
* FLOW_SIDX_NONE if we couldn't find or create a flow.
*/
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
in_port_t srcport, in_port_t dstport,
@@ -293,6 +295,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
srcport, dstport);
if ((uflow = udp_at_sidx(sidx))) {
uflow->ts = now->tv_sec;
+ /* Steer replies to the queue pair the guest last used for this flow */
+ FLOW_SETQP(uflow, qpair);
return flow_sidx_opposite(sidx);
}
@@ -316,6 +320,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
return FLOW_SIDX_NONE;
}
+ FLOW_SETQP(flow, qpair);
+
return udp_flow_new(c, flow, now);
}
diff --git a/udp_flow.h b/udp_flow.h
index 4c528e95ca66..03e6ecdcbaf2 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -36,7 +36,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
const union inany_addr *dst, in_port_t port,
const union sockaddr_inany *s_in,
const struct timespec *now);
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, unsigned int qpair,
uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
in_port_t srcport, in_port_t dstport,
index 099677f914e7..f3cf97393d0a 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -202,9 +202,11 @@ static void udp_vu_csum(const struct flowside *toside, int iov_used)
void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
{
const struct flowside *toside = flowside_at_sidx(tosidx);
+ const struct udp_flow *uflow = udp_at_sidx(tosidx);
bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+ int rx_queue = FLOW_QP(uflow) * 2;
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
int i;
for (i = 0; i < n; i++) {
--
2.51.1