Packets destined for the guest are now sent on the RX queue paired with
the TX queue on which the flow's packets arrived from the guest, rather
than always on queue 0.

Note: flows initiated from the host side (via sockets, udp_flow_from_sock())
have no associated incoming queue, so they default to queue 0 until the
guest sends a packet on the flow, at which point the queue is updated.
Signed-off-by: Laurent Vivier
---
flow.c | 32 ++++++++++++++++++++++
flow.h | 10 +++++++
icmp.c | 23 +++++++++-------
icmp.h | 2 +-
tap.c | 77 +++++++++++++++++++++++++++------------------------
tap.h | 5 ++--
tcp.c | 79 +++++++++++++++++++++++++++++------------------------
tcp.h | 2 +-
tcp_vu.c | 8 ++++--
udp.c | 33 ++++++++++++----------
udp.h | 12 ++++----
udp_flow.c | 8 +++++-
udp_flow.h | 2 +-
udp_vu.c | 4 ++-
vu_common.c | 4 +--
15 files changed, 187 insertions(+), 114 deletions(-)
diff --git a/flow.c b/flow.c
index 278a9cf0ac6d..8e9d7e5e1847 100644
--- a/flow.c
+++ b/flow.c
@@ -405,6 +405,37 @@ void flow_epollid_register(int epollid, int epollfd)
epoll_id_to_fd[epollid] = epollfd;
}
+/**
+ * flow_rx_virtqueue() - Get the RX virtqueue index to use for a flow
+ * @f:		Flow to query (may be NULL)
+ *
+ * Return: RX (even) virtqueue index of the pair assigned to @f, that is,
+ *	   twice the pair number, or 0 if @f is NULL or has no pair assigned
+ */
+int flow_rx_virtqueue(const struct flow_common *f)
+{
+	if (f && f->queueid != FLOW_QUEUEID_INVALID)
+		return f->queueid * 2;
+	return 0;
+}
+
+/**
+ * flow_queue_set() - Assign a flow to a virtqueue pair
+ * @f:		Flow to update
+ * @queueid:	Index of either virtqueue of the pair (RX is even, TX is RX + 1),
+ *		so both the RX and the TX index select the same pair
+ */
+void flow_queue_set(struct flow_common *f, int queueid)
+{
+	ASSERT(queueid >= 0 && (queueid >> 1) < FLOW_QUEUEID_MAX);
+	queueid >>= 1;
+	if (f->queueid == queueid)	/* Called per packet: skip no-op update */
+		return;
+	flow_trace((union flow *)f, "updating queue from %d to %d",
+		   f->queueid, queueid);
+	f->queueid = queueid;
+}
+
/**
* flow_initiate_() - Move flow to INI, setting pif[INISIDE]
* @flow: Flow to change state
@@ -609,6 +640,7 @@ union flow *flow_alloc(void)
flow_new_entry = flow;
memset(flow, 0, sizeof(*flow));
flow_epollid_clear(&flow->f);
+ flow->f.queueid = FLOW_QUEUEID_INVALID;
flow_set_state(&flow->f, FLOW_STATE_NEW);
return flow;
diff --git a/flow.h b/flow.h
index b43b0b1dd7f2..44ab4ae8fd6a 100644
--- a/flow.h
+++ b/flow.h
@@ -179,6 +179,8 @@ int flowside_connect(const struct ctx *c, int s,
* @side[]: Information for each side of the flow
* @tap_omac: MAC address of remote endpoint as seen from the guest
* @epollid: epollfd identifier, or EPOLLFD_ID_INVALID
+ * @queueid: Queue pair number assigned to this flow
+ * (FLOW_QUEUEID_INVALID if not assigned)
*/
struct flow_common {
#ifdef __GNUC__
@@ -199,6 +201,8 @@ struct flow_common {
#define EPOLLFD_ID_BITS 8
unsigned int epollid:EPOLLFD_ID_BITS;
+#define FLOW_QUEUEID_BITS 5
+ unsigned int queueid:FLOW_QUEUEID_BITS;
};
#define EPOLLFD_ID_DEFAULT 0
@@ -206,6 +210,10 @@ struct flow_common {
#define EPOLLFD_ID_MAX (EPOLLFD_ID_SIZE - 1)
#define EPOLLFD_ID_INVALID EPOLLFD_ID_MAX
+#define FLOW_QUEUEID_SIZE (1 << FLOW_QUEUEID_BITS)
+#define FLOW_QUEUEID_MAX (FLOW_QUEUEID_SIZE - 1)
+#define FLOW_QUEUEID_INVALID FLOW_QUEUEID_MAX
+
#define FLOW_INDEX_BITS 17 /* 128k - 1 */
#define FLOW_MAX MAX_FROM_BITS(FLOW_INDEX_BITS)
@@ -266,6 +274,8 @@ int flow_epollfd(const struct flow_common *f);
void flow_epollid_set(struct flow_common *f, int epollid);
void flow_epollid_clear(struct flow_common *f);
void flow_epollid_register(int epollid, int epollfd);
+int flow_rx_virtqueue(const struct flow_common *f);
+void flow_queue_set(struct flow_common *f, int queueid);
void flow_defer_handler(const struct ctx *c, const struct timespec *now);
int flow_migrate_source_early(struct ctx *c, const struct migrate_stage *stage,
int fd);
diff --git a/icmp.c b/icmp.c
index d58499c3bf5c..80e8753072fa 100644
--- a/icmp.c
+++ b/icmp.c
@@ -132,13 +132,13 @@ void icmp_sock_handler(const struct ctx *c, union epoll_ref ref)
const struct in_addr *daddr = inany_v4(&ini->eaddr);
ASSERT(saddr && daddr); /* Must have IPv4 addresses */
- tap_icmp4_send(c, VHOST_USER_RX_QUEUE, *saddr, *daddr, buf,
+ tap_icmp4_send(c, flow_rx_virtqueue(&pingf->f), *saddr, *daddr, buf,
pingf->f.tap_omac, n);
} else if (pingf->f.type == FLOW_PING6) {
const struct in6_addr *saddr = &ini->oaddr.a6;
const struct in6_addr *daddr = &ini->eaddr.a6;
- tap_icmp6_send(c, VHOST_USER_RX_QUEUE, saddr, daddr, buf,
+ tap_icmp6_send(c, flow_rx_virtqueue(&pingf->f), saddr, daddr, buf,
pingf->f.tap_omac, n);
}
return;
@@ -238,17 +238,18 @@ cancel:
/**
* icmp_tap_handler() - Handle packets from tap
- * @c: Execution context
- * @pif: pif on which the packet is arriving
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address
- * @daddr: Destination address
- * @data: Single packet with ICMP/ICMPv6 header
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @pif: pif on which the packet is arriving
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address
+ * @daddr: Destination address
+ * @data: Single packet with ICMP/ICMPv6 header
+ * @now: Current timestamp
*
* Return: count of consumed packets (always 1, even if malformed)
*/
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
+int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
struct iov_tail *data, const struct timespec *now)
{
@@ -309,6 +310,8 @@ int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
else if (!(pingf = icmp_ping_new(c, af, id, saddr, daddr)))
return 1;
+ flow_queue_set(&pingf->f, incoming_queue);
+
tgt = &pingf->f.side[TGTSIDE];
ASSERT(flow_proto[pingf->f.type] == proto);
diff --git a/icmp.h b/icmp.h
index 1a0e6205f087..6d6d6358bb33 100644
--- a/icmp.h
+++ b/icmp.h
@@ -10,7 +10,7 @@ struct ctx;
struct icmp_ping_flow;
void icmp_sock_handler(const struct ctx *c, union epoll_ref ref);
-int icmp_tap_handler(const struct ctx *c, uint8_t pif, sa_family_t af,
+int icmp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
struct iov_tail *data, const struct timespec *now);
void icmp_init(void);
diff --git a/tap.c b/tap.c
index 1308d49242e8..9a4399d947a3 100644
--- a/tap.c
+++ b/tap.c
@@ -702,15 +702,17 @@ static bool tap4_is_fragment(const struct iphdr *iph,
/**
* tap4_handler() - IPv4 and ARP packet handler for tap file descriptor
- * @c: Execution context
- * @in: Ingress packet pool, packets with Ethernet headers
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @in: Ingress packet pool, packets with Ethernet headers
+ * @now: Current timestamp
*
* Return: count of packets consumed by handlers
*/
-static int tap4_handler(struct ctx *c, const struct pool *in,
- const struct timespec *now)
+static int tap4_handler(struct ctx *c, int incoming_queue,
+ const struct pool *in, const struct timespec *now)
{
+ int outgoing_queue = incoming_queue & ~1;
unsigned int i, j, seq_count;
struct tap4_l4_t *seq;
@@ -736,7 +738,7 @@ resume:
if (!eh)
continue;
if (ntohs(eh->h_proto) == ETH_P_ARP) {
- arp(c, VHOST_USER_RX_QUEUE, &data);
+ arp(c, outgoing_queue, &data);
continue;
}
@@ -783,7 +785,7 @@ resume:
tap_packet_debug(iph, NULL, NULL, 0, NULL, 1);
- icmp_tap_handler(c, PIF_TAP, AF_INET,
+ icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
&iph->saddr, &iph->daddr,
&data, now);
continue;
@@ -797,7 +799,7 @@ resume:
struct iov_tail eh_data;
packet_get(in, i, &eh_data);
- if (dhcp(c, VHOST_USER_RX_QUEUE, &eh_data))
+ if (dhcp(c, outgoing_queue, &eh_data))
continue;
}
@@ -860,14 +862,14 @@ append:
if (c->no_tcp)
continue;
for (k = 0; k < p->count; )
- k += tcp_tap_handler(c, PIF_TAP, AF_INET,
+ k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
&seq->saddr, &seq->daddr,
0, p, k, now);
} else if (seq->protocol == IPPROTO_UDP) {
if (c->no_udp)
continue;
for (k = 0; k < p->count; )
- k += udp_tap_handler(c, PIF_TAP, AF_INET,
+ k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET,
&seq->saddr, &seq->daddr,
seq->ttl, p, k, now);
}
@@ -881,15 +883,17 @@ append:
/**
* tap6_handler() - IPv6 packet handler for tap file descriptor
- * @c: Execution context
- * @in: Ingress packet pool, packets with Ethernet headers
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @in: Ingress packet pool, packets with Ethernet headers
+ * @now: Current timestamp
*
* Return: count of packets consumed by handlers
*/
-static int tap6_handler(struct ctx *c, const struct pool *in,
- const struct timespec *now)
+static int tap6_handler(struct ctx *c, int incoming_queue,
+ const struct pool *in, const struct timespec *now)
{
+ int outgoing_queue = incoming_queue & ~1;
unsigned int i, j, seq_count = 0;
struct tap6_l4_t *seq;
@@ -965,12 +969,12 @@ resume:
continue;
ndp_data = data;
- if (ndp(c, VHOST_USER_RX_QUEUE, saddr, &ndp_data))
+ if (ndp(c, outgoing_queue, saddr, &ndp_data))
continue;
tap_packet_debug(NULL, ip6h, NULL, proto, NULL, 1);
- icmp_tap_handler(c, PIF_TAP, AF_INET6,
+ icmp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
saddr, daddr, &data, now);
continue;
}
@@ -984,8 +988,7 @@ resume:
if (proto == IPPROTO_UDP) {
struct iov_tail uh_data = data;
- if (dhcpv6(c, VHOST_USER_RX_QUEUE, &uh_data, saddr,
- daddr))
+ if (dhcpv6(c, outgoing_queue, &uh_data, saddr, daddr))
continue;
}
@@ -1053,14 +1056,14 @@ append:
if (c->no_tcp)
continue;
for (k = 0; k < p->count; )
- k += tcp_tap_handler(c, PIF_TAP, AF_INET6,
+ k += tcp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
&seq->saddr, &seq->daddr,
seq->flow_lbl, p, k, now);
} else if (seq->protocol == IPPROTO_UDP) {
if (c->no_udp)
continue;
for (k = 0; k < p->count; )
- k += udp_tap_handler(c, PIF_TAP, AF_INET6,
+ k += udp_tap_handler(c, incoming_queue, PIF_TAP, AF_INET6,
&seq->saddr, &seq->daddr,
seq->hop_limit, p, k, now);
}
@@ -1083,22 +1086,24 @@ void tap_flush_pools(void)
/**
* tap_handler() - IPv4/IPv6 and ARP packet handler for tap file descriptor
- * @c: Execution context
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @now: Current timestamp
*/
-void tap_handler(struct ctx *c, const struct timespec *now)
+void tap_handler(struct ctx *c, int incoming_queue, const struct timespec *now)
{
- tap4_handler(c, pool_tap4, now);
- tap6_handler(c, pool_tap6, now);
+ tap4_handler(c, incoming_queue, pool_tap4, now);
+ tap6_handler(c, incoming_queue, pool_tap6, now);
}
/**
* tap_add_packet() - Queue/capture packet, update notion of guest MAC address
- * @c: Execution context
- * @data: Packet to add to the pool
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @data: Packet to add to the pool
+ * @now: Current timestamp
*/
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
const struct timespec *now)
{
struct ethhdr eh_storage;
@@ -1123,14 +1128,14 @@ void tap_add_packet(struct ctx *c, struct iov_tail *data,
case ETH_P_ARP:
case ETH_P_IP:
if (!pool_can_fit(pool_tap4, data)) {
- tap4_handler(c, pool_tap4, now);
+ tap4_handler(c, incoming_queue, pool_tap4, now);
pool_flush(pool_tap4);
}
packet_add(pool_tap4, data);
break;
case ETH_P_IPV6:
if (!pool_can_fit(pool_tap6, data)) {
- tap6_handler(c, pool_tap6, now);
+ tap6_handler(c, incoming_queue, pool_tap6, now);
pool_flush(pool_tap6);
}
packet_add(pool_tap6, data);
@@ -1217,7 +1222,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
n -= sizeof(uint32_t);
data = IOV_TAIL_FROM_BUF(p, l2len, 0);
- tap_add_packet(c, &data, now);
+ tap_add_packet(c, 0, &data, now);
p += l2len;
n -= l2len;
@@ -1226,7 +1231,7 @@ static void tap_passt_input(struct ctx *c, const struct timespec *now)
partial_len = n;
partial_frame = p;
- tap_handler(c, now);
+ tap_handler(c, 0, now);
}
/**
@@ -1285,10 +1290,10 @@ static void tap_pasta_input(struct ctx *c, const struct timespec *now)
continue;
data = IOV_TAIL_FROM_BUF(pkt_buf + n, len, 0);
- tap_add_packet(c, &data, now);
+ tap_add_packet(c, 0, &data, now);
}
- tap_handler(c, now);
+ tap_handler(c, 0, now);
}
/**
diff --git a/tap.h b/tap.h
index 76403a43edbc..fe9455ffcf4b 100644
--- a/tap.h
+++ b/tap.h
@@ -119,7 +119,8 @@ int tap_sock_unix_open(char *sock_path);
void tap_sock_reset(struct ctx *c);
void tap_backend_init(struct ctx *c);
void tap_flush_pools(void);
-void tap_handler(struct ctx *c, const struct timespec *now);
-void tap_add_packet(struct ctx *c, struct iov_tail *data,
+void tap_handler(struct ctx *c, int incoming_queue,
+ const struct timespec *now);
+void tap_add_packet(struct ctx *c, int incoming_queue, struct iov_tail *data,
const struct timespec *now);
#endif /* TAP_H */
diff --git a/tcp.c b/tcp.c
index 5ce34baa8a5a..78494a2dc69f 100644
--- a/tcp.c
+++ b/tcp.c
@@ -1496,21 +1496,23 @@ static void tcp_bind_outbound(const struct ctx *c,
/**
* tcp_conn_from_tap() - Handle connection request (SYN segment) from tap
- * @c: Execution context
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address, pointer to in_addr or in6_addr
- * @daddr: Destination address, pointer to in_addr or in6_addr
- * @th: TCP header from tap: caller MUST ensure it's there
- * @opts: Pointer to start of options
- * @optlen: Bytes in options: caller MUST ensure available length
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: TX queue number for the flow
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address, pointer to in_addr or in6_addr
+ * @daddr: Destination address, pointer to in_addr or in6_addr
+ * @th: TCP header from tap: caller MUST ensure it's there
+ * @opts: Pointer to start of options
+ * @optlen: Bytes in options: caller MUST ensure available length
+ * @now: Current timestamp
*
* #syscalls:vu getsockname
*/
-static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
- const void *saddr, const void *daddr,
- const struct tcphdr *th, const char *opts,
- size_t optlen, const struct timespec *now)
+static void tcp_conn_from_tap(const struct ctx *c, int incoming_queue,
+ sa_family_t af, const void *saddr,
+ const void *daddr, const struct tcphdr *th,
+ const char *opts, size_t optlen,
+ const struct timespec *now)
{
in_port_t srcport = ntohs(th->source);
in_port_t dstport = ntohs(th->dest);
@@ -1622,6 +1624,7 @@ static void tcp_conn_from_tap(const struct ctx *c, sa_family_t af,
conn_event(c, conn, TAP_SYN_ACK_SENT);
}
+ flow_queue_set(&conn->f, incoming_queue);
tcp_epoll_ctl(c, conn);
if (c->mode == MODE_VU) { /* To rebind to same oport after migration */
@@ -1983,16 +1986,16 @@ static void tcp_conn_from_sock_finish(const struct ctx *c,
/**
* tcp_rst_no_conn() - Send RST in response to a packet with no connection
- * @c: Execution context
- * @queue: Queue to use to send the reply
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address of the packet we're responding to
- * @daddr: Destination address of the packet we're responding to
- * @flow_lbl: IPv6 flow label (ignored for IPv4)
- * @th: TCP header of the packet we're responding to
- * @l4len: Packet length, including TCP header
- */
-static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
+ * @c: Execution context
+ * @outgoing_queue: Queue to use to send the reply
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address of the packet we're responding to
+ * @daddr: Destination address of the packet we're responding to
+ * @flow_lbl: IPv6 flow label (ignored for IPv4)
+ * @th: TCP header of the packet we're responding to
+ * @l4len: Packet length, including TCP header
+ */
+static void tcp_rst_no_conn(const struct ctx *c, int outgoing_queue, int af,
const void *saddr, const void *daddr,
uint32_t flow_lbl,
const struct tcphdr *th, size_t l4len)
@@ -2050,24 +2053,25 @@ static void tcp_rst_no_conn(const struct ctx *c, int queue, int af,
tcp_update_csum(psum, rsth, &payload);
rst_l2len = ((char *)rsth - buf) + sizeof(*rsth);
- tap_send_single(c, queue, buf, rst_l2len);
+ tap_send_single(c, outgoing_queue, buf, rst_l2len);
}
/**
* tcp_tap_handler() - Handle packets from tap and state transitions
- * @c: Execution context
- * @pif: pif on which the packet is arriving
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address
- * @daddr: Destination address
- * @flow_lbl: IPv6 flow label (ignored for IPv4)
- * @p: Pool of TCP packets, with TCP headers
- * @idx: Index of first packet in pool to process
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @pif: pif on which the packet is arriving
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address
+ * @daddr: Destination address
+ * @flow_lbl: IPv6 flow label (ignored for IPv4)
+ * @p: Pool of TCP packets, with TCP headers
+ * @idx: Index of first packet in pool to process
+ * @now: Current timestamp
*
* Return: count of consumed packets
*/
-int tcp_tap_handler(const struct ctx *c, uint8_t pif,
+int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint32_t flow_lbl, const struct pool *p, int idx,
const struct timespec *now)
@@ -2107,11 +2111,11 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
/* New connection from tap */
if (!flow) {
if (opts && th->syn && !th->ack)
- tcp_conn_from_tap(c, af, saddr, daddr, th,
+ tcp_conn_from_tap(c, incoming_queue, af, saddr, daddr, th,
opts, optlen, now);
else
- tcp_rst_no_conn(c, VHOST_USER_RX_QUEUE, af, saddr,
- daddr, flow_lbl, th, l4len);
+ tcp_rst_no_conn(c, incoming_queue & ~1, af, saddr, daddr,
+ flow_lbl, th, l4len);
return 1;
}
@@ -2119,6 +2123,9 @@ int tcp_tap_handler(const struct ctx *c, uint8_t pif,
ASSERT(pif_at_sidx(sidx) == PIF_TAP);
conn = &flow->tcp;
+ /* update queue */
+ flow_queue_set(&flow->f, incoming_queue);
+
flow_trace(conn, "packet length %zu from tap", l4len);
if (th->rst) {
diff --git a/tcp.h b/tcp.h
index 320683ce5679..cddd36cadc97 100644
--- a/tcp.h
+++ b/tcp.h
@@ -15,7 +15,7 @@ void tcp_listen_handler(const struct ctx *c, union epoll_ref ref,
const struct timespec *now);
void tcp_sock_handler(const struct ctx *c,
union epoll_ref ref, uint32_t events);
-int tcp_tap_handler(const struct ctx *c, uint8_t pif,
+int tcp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint32_t flow_lbl, const struct pool *p, int idx,
const struct timespec *now);
diff --git a/tcp_vu.c b/tcp_vu.c
index 1c81ce376dad..40f552087bc5 100644
--- a/tcp_vu.c
+++ b/tcp_vu.c
@@ -70,15 +70,16 @@ static size_t tcp_vu_hdrlen(bool v6)
*/
int tcp_vu_send_flag(const struct ctx *c, struct tcp_tap_conn *conn, int flags)
{
+ int rx_queue = flow_rx_virtqueue(&conn->f);
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
- size_t optlen, hdrlen;
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
struct vu_virtq_element flags_elem[2];
struct ipv6hdr *ip6h = NULL;
struct iphdr *ip4h = NULL;
struct iovec flags_iov[2];
struct tcp_syn_opts *opts;
struct iov_tail payload;
+ size_t optlen, hdrlen;
struct tcphdr *th;
struct ethhdr *eh;
uint32_t seq;
@@ -348,8 +349,9 @@ static void tcp_vu_prepare(const struct ctx *c, struct tcp_tap_conn *conn,
int tcp_vu_data_from_sock(const struct ctx *c, struct tcp_tap_conn *conn)
{
uint32_t wnd_scaled = conn->wnd_from_tap << conn->ws_from_tap;
+ int rx_queue = flow_rx_virtqueue(&conn->f);
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
ssize_t len, previous_dlen;
int i, iov_cnt, head_cnt;
size_t hdrlen, fillsize;
diff --git a/udp.c b/udp.c
index 868ffebb5802..a0eb719888cd 100644
--- a/udp.c
+++ b/udp.c
@@ -636,14 +636,15 @@ static int udp_sock_recverr(const struct ctx *c, int s, flow_sidx_t sidx,
if (hdr->cmsg_level == IPPROTO_IP &&
(o4 = inany_v4(&otap)) && inany_v4(&toside->eaddr)) {
dlen = MIN(dlen, ICMP4_MAX_DLEN);
- udp_send_tap_icmp4(c, VHOST_USER_RX_QUEUE, ee, toside, *o4,
- data, dlen);
+ udp_send_tap_icmp4(c, flow_rx_virtqueue(&uflow->f), ee, toside,
+ *o4, data, dlen);
return 1;
}
if (hdr->cmsg_level == IPPROTO_IPV6 && !inany_v4(&toside->eaddr)) {
- udp_send_tap_icmp6(c, VHOST_USER_RX_QUEUE, ee, toside,
- &otap.a6, data, dlen, FLOW_IDX(uflow));
+ udp_send_tap_icmp6(c, flow_rx_virtqueue(&uflow->f), ee,
+ toside, &otap.a6, data, dlen,
+ FLOW_IDX(uflow));
return 1;
}
@@ -971,25 +972,27 @@ fail:
/**
* udp_tap_handler() - Handle packets from tap
- * @c: Execution context
- * @pif: pif on which the packet is arriving
- * @af: Address family, AF_INET or AF_INET6
- * @saddr: Source address
- * @daddr: Destination address
- * @ttl: TTL or hop limit for packets to be sent in this call
- * @p: Pool of UDP packets, with UDP headers
- * @idx: Index of first packet to process
- * @now: Current timestamp
+ * @c: Execution context
+ * @incoming_queue: Incoming queue number
+ * @pif: pif on which the packet is arriving
+ * @af: Address family, AF_INET or AF_INET6
+ * @saddr: Source address
+ * @daddr: Destination address
+ * @ttl: TTL or hop limit for packets to be sent in this call
+ * @p: Pool of UDP packets, with UDP headers
+ * @idx: Index of first packet to process
+ * @now: Current timestamp
*
* Return: count of consumed packets
*
* #syscalls sendmmsg
*/
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint8_t ttl, const struct pool *p, int idx,
const struct timespec *now)
{
+ int outgoing_queue = incoming_queue & ~1;
const struct flowside *toside;
struct mmsghdr mm[UIO_MAXIOV];
union sockaddr_inany to_sa;
@@ -1019,7 +1022,7 @@ int udp_tap_handler(const struct ctx *c, uint8_t pif,
src = ntohs(uh->source);
dst = ntohs(uh->dest);
- tosidx = udp_flow_from_tap(c, pif, af, saddr, daddr, src, dst, now);
+ tosidx = udp_flow_from_tap(c, outgoing_queue, pif, af, saddr, daddr, src, dst, now);
if (!(uflow = udp_at_sidx(tosidx))) {
char sstr[INET6_ADDRSTRLEN], dstr[INET6_ADDRSTRLEN];
diff --git a/udp.h b/udp.h
index f1d83f380b3f..8ba4ccfe646a 100644
--- a/udp.h
+++ b/udp.h
@@ -7,11 +7,13 @@
#define UDP_H
void udp_portmap_clear(void);
-void udp_listen_sock_handler(const struct ctx *c, union epoll_ref ref,
- uint32_t events, const struct timespec *now);
-void udp_sock_handler(const struct ctx *c, union epoll_ref ref,
- uint32_t events, const struct timespec *now);
-int udp_tap_handler(const struct ctx *c, uint8_t pif,
+void udp_listen_sock_handler(const struct ctx *c,
+ union epoll_ref ref, uint32_t events,
+ const struct timespec *now);
+void udp_sock_handler(const struct ctx *c,
+ union epoll_ref ref, uint32_t events,
+ const struct timespec *now);
+int udp_tap_handler(const struct ctx *c, int incoming_queue, uint8_t pif,
sa_family_t af, const void *saddr, const void *daddr,
uint8_t ttl, const struct pool *p, int idx,
const struct timespec *now);
diff --git a/udp_flow.c b/udp_flow.c
index 8907f2f72741..b4a709b8d976 100644
--- a/udp_flow.c
+++ b/udp_flow.c
@@ -266,17 +266,19 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
/**
* udp_flow_from_tap() - Find or create UDP flow for tap packets
* @c: Execution context
+ * @queue: RX queue number for the flow
* @pif: pif on which the packet is arriving
* @af: Address family, AF_INET or AF_INET6
* @saddr: Source address on guest side
* @daddr: Destination address guest side
* @srcport: Source port on guest side
* @dstport: Destination port on guest side
+ * @now: Current timestamp
*
* Return: sidx for the destination side of the flow for this packet, or
* FLOW_SIDX_NONE if we couldn't find or create a flow.
*/
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
in_port_t srcport, in_port_t dstport,
@@ -293,6 +295,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
srcport, dstport);
if ((uflow = udp_at_sidx(sidx))) {
uflow->ts = now->tv_sec;
+ /* update queue */
+ flow_queue_set(&uflow->f, queue);
return flow_sidx_opposite(sidx);
}
@@ -316,6 +320,8 @@ flow_sidx_t udp_flow_from_tap(const struct ctx *c,
return FLOW_SIDX_NONE;
}
+ flow_queue_set(&flow->f, queue);
+
return udp_flow_new(c, flow, now);
}
diff --git a/udp_flow.h b/udp_flow.h
index 4c528e95ca66..4a057a9d44a8 100644
--- a/udp_flow.h
+++ b/udp_flow.h
@@ -36,7 +36,7 @@ flow_sidx_t udp_flow_from_sock(const struct ctx *c, uint8_t pif,
const union inany_addr *dst, in_port_t port,
const union sockaddr_inany *s_in,
const struct timespec *now);
-flow_sidx_t udp_flow_from_tap(const struct ctx *c,
+flow_sidx_t udp_flow_from_tap(const struct ctx *c, int queue,
uint8_t pif, sa_family_t af,
const void *saddr, const void *daddr,
in_port_t srcport, in_port_t dstport,
diff --git a/udp_vu.c b/udp_vu.c
index 099677f914e7..cd2c9c516d44 100644
--- a/udp_vu.c
+++ b/udp_vu.c
@@ -202,9 +202,11 @@ static void udp_vu_csum(const struct flowside *toside, int iov_used)
void udp_vu_sock_to_tap(const struct ctx *c, int s, int n, flow_sidx_t tosidx)
{
const struct flowside *toside = flowside_at_sidx(tosidx);
+ const struct udp_flow *uflow = udp_at_sidx(tosidx);
bool v6 = !(inany_v4(&toside->eaddr) && inany_v4(&toside->oaddr));
+ int rx_queue = flow_rx_virtqueue(&uflow->f);
struct vu_dev *vdev = c->vdev;
- struct vu_virtq *vq = &vdev->vq[VHOST_USER_RX_QUEUE];
+ struct vu_virtq *vq = &vdev->vq[rx_queue];
int i;
for (i = 0; i < n; i++) {
diff --git a/vu_common.c b/vu_common.c
index 8904403e66af..56f26317b192 100644
--- a/vu_common.c
+++ b/vu_common.c
@@ -196,11 +196,11 @@ static void vu_handle_tx(struct vu_dev *vdev, int index,
data = IOV_TAIL(elem[count].out_sg, elem[count].out_num, 0);
if (IOV_DROP_HEADER(&data, struct virtio_net_hdr_mrg_rxbuf))
- tap_add_packet(vdev->context, &data, now);
+ tap_add_packet(vdev->context, index, &data, now);
count++;
}
- tap_handler(vdev->context, now);
+ tap_handler(vdev->context, index, now);
if (count) {
int i;
--
2.51.0