
[Xen-devel] [PATCH net-next] xen-netback: improve guest-receive-side flow control



The flow control code relies on a double pass of the skb: a first pass to
count the number of ring slots needed to supply sufficient grant references,
and a second to actually consume those references and build the grant copy
operations. It transpires that, in some scenarios, the initial count and the
number of references actually consumed differ and, after this happens a
number of times, flow control is completely broken.

This patch simplifies things by making a very quick static analysis of the
minimal number of ring slots needed for an skb and only stopping the queue
if that minimal number of slots is not available. The worker thread does a
worst-case analysis and only processes what will definitely fit, leaving any
remaining skbs on the internal queue. This patch also gets rid of the drop
that occurs if an skb won't fit (that does not matter, because we always
internally queue *at least* what we can put in the shared ring) and adds
some hysteresis to waking the external queue: it only gets woken when at
least half the shared ring is available.
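
To make the accounting concrete, here is a stand-alone user-space sketch of
the two checks described above. It is illustrative only: the ring structure
and ring size are simplified stand-ins for the real xen-netback types, the
arithmetic mirrors the min_slots_needed estimate in xenvif_start_xmit() and
xenvif_rx_ring_slots_available() in the patch below, and the real check also
re-arms req_event and re-checks after a memory barrier, which is omitted
here.

#include <stdbool.h>
#include <stdio.h>

#define RING_SIZE 256   /* stand-in for XEN_NETIF_RX_RING_SIZE */

struct rx_ring {
        unsigned int req_prod;  /* requests posted by the frontend */
        unsigned int req_cons;  /* requests consumed by the backend */
};

/* Cheap lower bound on the slots an skb will need: one for the linear
 * header, one per frag, plus one if GSO metadata must be sent.
 */
static int min_slots_needed(int nr_frags, bool is_gso)
{
        return 1 + nr_frags + (is_gso ? 1 : 0);
}

/* Simplified xenvif_rx_ring_slots_available(): true if more than 'needed'
 * unconsumed requests are outstanding in the ring.
 */
static bool rx_ring_slots_available(const struct rx_ring *ring, int needed)
{
        return ring->req_prod - ring->req_cons > needed;
}

int main(void)
{
        struct rx_ring ring = { .req_prod = 10, .req_cons = 7 };
        int needed = min_slots_needed(2, true); /* 2 frags + GSO => 4 slots */

        /* start_xmit side: stop the queue if the skb can't possibly fit. */
        if (!rx_ring_slots_available(&ring, needed))
                printf("stop queue: only %u slots posted, need %d\n",
                       ring.req_prod - ring.req_cons, needed);

        /* Hysteresis on wake-up: the worker only restarts the queue once at
         * least half the ring's worth of requests is available again.
         */
        ring.req_prod += RING_SIZE / 2 + 1;
        if (rx_ring_slots_available(&ring, RING_SIZE / 2))
                printf("wake queue: %u slots now available\n",
                       ring.req_prod - ring.req_cons);

        return 0;
}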

Without this patch I can trivially stall netback permanently just by doing
a large guest-to-guest file copy between two Windows Server 2008R2 VMs on a
single host.

Signed-off-by: Paul Durrant <paul.durrant@xxxxxxxxxx>
Cc: Wei Liu <wei.liu2@xxxxxxxxxx>
Cc: Ian Campbell <ian.campbell@xxxxxxxxxx>
Cc: David Vrabel <david.vrabel@xxxxxxxxxx>
---
 drivers/net/xen-netback/common.h    |   26 ++---
 drivers/net/xen-netback/interface.c |   45 ++++----
 drivers/net/xen-netback/netback.c   |  202 ++++++++++-------------------------
 3 files changed, 87 insertions(+), 186 deletions(-)
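
For reference, the worst-case slot count computed by the worker thread in
xenvif_rx_action() before any grant copy operations are built can be
modelled in isolation as follows. This is an illustrative sketch only: frag
page orders are passed in directly here, whereas the netback.c hunk below
derives them from each frag's page via compound_order().

#include <stdbool.h>
#include <stdio.h>

/* Worst-case slot estimate for one skb, mirroring xenvif_rx_action():
 * start from 2 for the linear area, add one slot per page of each frag
 * (frag_orders[] stands in for compound_order() of the frag's page),
 * plus one slot for GSO metadata if needed.
 */
static int max_slots_needed(const int *frag_orders, int nr_frags, bool is_gso)
{
        int slots = 2;
        int i;

        for (i = 0; i < nr_frags; i++)
                slots += 1 << frag_orders[i];

        if (is_gso)
                slots++;

        return slots;
}

int main(void)
{
        int orders[] = { 0, 2 };  /* one single-page frag, one order-2 frag */

        /* 2 + 1 + 4 + 1 = 8 slots in the worst case */
        printf("max slots: %d\n", max_slots_needed(orders, 2, true));
        return 0;
}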

diff --git a/drivers/net/xen-netback/common.h b/drivers/net/xen-netback/common.h
index 08ae01b..d7888db 100644
--- a/drivers/net/xen-netback/common.h
+++ b/drivers/net/xen-netback/common.h
@@ -136,12 +136,7 @@ struct xenvif {
        char rx_irq_name[IFNAMSIZ+4]; /* DEVNAME-rx */
        struct xen_netif_rx_back_ring rx;
        struct sk_buff_head rx_queue;
-
-       /* Allow xenvif_start_xmit() to peek ahead in the rx request
-        * ring.  This is a prediction of what rx_req_cons will be
-        * once all queued skbs are put on the ring.
-        */
-       RING_IDX rx_req_cons_peek;
+       bool rx_event;
 
        /* Given MAX_BUFFER_OFFSET of 4096 the worst case is that each
         * head/fragment page uses 2 copy operations because it
@@ -198,8 +193,6 @@ void xenvif_xenbus_fini(void);
 
 int xenvif_schedulable(struct xenvif *vif);
 
-int xenvif_rx_ring_full(struct xenvif *vif);
-
 int xenvif_must_stop_queue(struct xenvif *vif);
 
 /* (Un)Map communication rings. */
@@ -211,21 +204,20 @@ int xenvif_map_frontend_rings(struct xenvif *vif,
 /* Check for SKBs from frontend and schedule backend processing */
 void xenvif_check_rx_xenvif(struct xenvif *vif);
 
-/* Queue an SKB for transmission to the frontend */
-void xenvif_queue_tx_skb(struct xenvif *vif, struct sk_buff *skb);
-/* Notify xenvif that ring now has space to send an skb to the frontend */
-void xenvif_notify_tx_completion(struct xenvif *vif);
-
 /* Prevent the device from generating any further traffic. */
 void xenvif_carrier_off(struct xenvif *vif);
 
-/* Returns number of ring slots required to send an skb to the frontend */
-unsigned int xenvif_count_skb_slots(struct xenvif *vif, struct sk_buff *skb);
-
 int xenvif_tx_action(struct xenvif *vif, int budget);
-void xenvif_rx_action(struct xenvif *vif);
 
 int xenvif_kthread(void *data);
+void xenvif_kick_thread(struct xenvif *vif);
+
+/* Determine whether the needed number of slots (req) is available,
+ * and set req_event if not.
+ */
+bool xenvif_rx_ring_slots_available(struct xenvif *vif, int needed);
+
+void xenvif_stop_queue(struct xenvif *vif);
 
 extern bool separate_tx_rx_irq;
 
diff --git a/drivers/net/xen-netback/interface.c b/drivers/net/xen-netback/interface.c
index b78ee10..dbd01c7 100644
--- a/drivers/net/xen-netback/interface.c
+++ b/drivers/net/xen-netback/interface.c
@@ -46,11 +46,6 @@ int xenvif_schedulable(struct xenvif *vif)
        return netif_running(vif->dev) && netif_carrier_ok(vif->dev);
 }
 
-static int xenvif_rx_schedulable(struct xenvif *vif)
-{
-       return xenvif_schedulable(vif) && !xenvif_rx_ring_full(vif);
-}
-
 static irqreturn_t xenvif_tx_interrupt(int irq, void *dev_id)
 {
        struct xenvif *vif = dev_id;
@@ -104,8 +99,8 @@ static irqreturn_t xenvif_rx_interrupt(int irq, void *dev_id)
 {
        struct xenvif *vif = dev_id;
 
-       if (xenvif_rx_schedulable(vif))
-               netif_wake_queue(vif->dev);
+       vif->rx_event = true;
+       xenvif_kick_thread(vif);
 
        return IRQ_HANDLED;
 }
@@ -121,6 +116,7 @@ static irqreturn_t xenvif_interrupt(int irq, void *dev_id)
 static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
 {
        struct xenvif *vif = netdev_priv(dev);
+       int min_slots_needed;
 
        BUG_ON(skb->dev != dev);
 
@@ -128,17 +124,27 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
        if (vif->task == NULL)
                goto drop;
 
-       /* Drop the packet if the target domain has no receive buffers. */
-       if (!xenvif_rx_schedulable(vif))
-               goto drop;
+       /* At best we'll need one slot for the header and one for each
+        * frag.
+        */
+       min_slots_needed = 1 + skb_shinfo(skb)->nr_frags;
 
-       /* Reserve ring slots for the worst-case number of fragments. */
-       vif->rx_req_cons_peek += xenvif_count_skb_slots(vif, skb);
+       /* If the skb is GSO then we'll also need an extra slot for the
+        * metadata.
+        */
+       if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV4 ||
+           skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6)
+               min_slots_needed++;
 
-       if (vif->can_queue && xenvif_must_stop_queue(vif))
-               netif_stop_queue(dev);
+       /* If the skb can't possibly fit in the remaining slots
+        * then turn off the queue to give the ring a chance to
+        * drain.
+        */
+       if (!xenvif_rx_ring_slots_available(vif, min_slots_needed))
+               xenvif_stop_queue(vif);
 
-       xenvif_queue_tx_skb(vif, skb);
+       skb_queue_tail(&vif->rx_queue, skb);
+       xenvif_kick_thread(vif);
 
        return NETDEV_TX_OK;
 
@@ -148,12 +154,6 @@ static int xenvif_start_xmit(struct sk_buff *skb, struct net_device *dev)
        return NETDEV_TX_OK;
 }
 
-void xenvif_notify_tx_completion(struct xenvif *vif)
-{
-       if (netif_queue_stopped(vif->dev) && xenvif_rx_schedulable(vif))
-               netif_wake_queue(vif->dev);
-}
-
 static struct net_device_stats *xenvif_get_stats(struct net_device *dev)
 {
        struct xenvif *vif = netdev_priv(dev);
@@ -378,6 +378,8 @@ int xenvif_connect(struct xenvif *vif, unsigned long tx_ring_ref,
        if (err < 0)
                goto err;
 
+       init_waitqueue_head(&vif->wq);
+
        if (tx_evtchn == rx_evtchn) {
                /* feature-split-event-channels == 0 */
                err = bind_interdomain_evtchn_to_irqhandler(
@@ -410,7 +412,6 @@ int xenvif_connect(struct xenvif *vif, unsigned long tx_ring_ref,
                disable_irq(vif->rx_irq);
        }
 
-       init_waitqueue_head(&vif->wq);
        vif->task = kthread_create(xenvif_kthread,
                                   (void *)vif, "%s", vif->dev->name);
        if (IS_ERR(vif->task)) {
diff --git a/drivers/net/xen-netback/netback.c b/drivers/net/xen-netback/netback.c
index 919b650..37bb994 100644
--- a/drivers/net/xen-netback/netback.c
+++ b/drivers/net/xen-netback/netback.c
@@ -137,36 +137,19 @@ static inline pending_ring_idx_t nr_pending_reqs(struct xenvif *vif)
                vif->pending_prod + vif->pending_cons;
 }
 
-static int max_required_rx_slots(struct xenvif *vif)
+bool xenvif_rx_ring_slots_available(struct xenvif *vif, int needed)
 {
-       int max = DIV_ROUND_UP(vif->dev->mtu, PAGE_SIZE);
-
-       /* XXX FIXME: RX path dependent on MAX_SKB_FRAGS */
-       if (vif->can_sg || vif->gso_mask || vif->gso_prefix_mask)
-               max += MAX_SKB_FRAGS + 1; /* extra_info + frags */
-
-       return max;
-}
-
-int xenvif_rx_ring_full(struct xenvif *vif)
-{
-       RING_IDX peek   = vif->rx_req_cons_peek;
-       RING_IDX needed = max_required_rx_slots(vif);
-
-       return ((vif->rx.sring->req_prod - peek) < needed) ||
-              ((vif->rx.rsp_prod_pvt + XEN_NETIF_RX_RING_SIZE - peek) < needed);
-}
+       if (vif->rx.sring->req_prod - vif->rx.req_cons > needed)
+               return true;
 
-int xenvif_must_stop_queue(struct xenvif *vif)
-{
-       if (!xenvif_rx_ring_full(vif))
-               return 0;
+       vif->rx.sring->req_event = vif->rx.req_cons + 1;
 
-       vif->rx.sring->req_event = vif->rx_req_cons_peek +
-               max_required_rx_slots(vif);
-       mb(); /* request notification /then/ check the queue */
+       /* Make the new req_event visible to the frontend before checking
+        * again.
+        */
+       mb();
 
-       return xenvif_rx_ring_full(vif);
+       return vif->rx.sring->req_prod - vif->rx.req_cons > needed;
 }
 
 /*
@@ -209,93 +192,6 @@ static bool start_new_rx_buffer(int offset, unsigned long size, int head)
        return false;
 }
 
-struct xenvif_count_slot_state {
-       unsigned long copy_off;
-       bool head;
-};
-
-unsigned int xenvif_count_frag_slots(struct xenvif *vif,
-                                    unsigned long offset, unsigned long size,
-                                    struct xenvif_count_slot_state *state)
-{
-       unsigned count = 0;
-
-       offset &= ~PAGE_MASK;
-
-       while (size > 0) {
-               unsigned long bytes;
-
-               bytes = PAGE_SIZE - offset;
-
-               if (bytes > size)
-                       bytes = size;
-
-               if (start_new_rx_buffer(state->copy_off, bytes, state->head)) {
-                       count++;
-                       state->copy_off = 0;
-               }
-
-               if (state->copy_off + bytes > MAX_BUFFER_OFFSET)
-                       bytes = MAX_BUFFER_OFFSET - state->copy_off;
-
-               state->copy_off += bytes;
-
-               offset += bytes;
-               size -= bytes;
-
-               if (offset == PAGE_SIZE)
-                       offset = 0;
-
-               state->head = false;
-       }
-
-       return count;
-}
-
-/*
- * Figure out how many ring slots we're going to need to send @skb to
- * the guest. This function is essentially a dry run of
- * xenvif_gop_frag_copy.
- */
-unsigned int xenvif_count_skb_slots(struct xenvif *vif, struct sk_buff *skb)
-{
-       struct xenvif_count_slot_state state;
-       unsigned int count;
-       unsigned char *data;
-       unsigned i;
-
-       state.head = true;
-       state.copy_off = 0;
-
-       /* Slot for the first (partial) page of data. */
-       count = 1;
-
-       /* Need a slot for the GSO prefix for GSO extra data? */
-       if (skb_shinfo(skb)->gso_size)
-               count++;
-
-       data = skb->data;
-       while (data < skb_tail_pointer(skb)) {
-               unsigned long offset = offset_in_page(data);
-               unsigned long size = PAGE_SIZE - offset;
-
-               if (data + size > skb_tail_pointer(skb))
-                       size = skb_tail_pointer(skb) - data;
-
-               count += xenvif_count_frag_slots(vif, offset, size, &state);
-
-               data += size;
-       }
-
-       for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
-               unsigned long size = skb_frag_size(&skb_shinfo(skb)->frags[i]);
-               unsigned long offset = skb_shinfo(skb)->frags[i].page_offset;
-
-               count += xenvif_count_frag_slots(vif, offset, size, &state);
-       }
-       return count;
-}
-
 struct netrx_pending_operations {
        unsigned copy_prod, copy_cons;
        unsigned meta_prod, meta_cons;
@@ -556,12 +452,12 @@ struct skb_cb_overlay {
        int meta_slots_used;
 };
 
-static void xenvif_kick_thread(struct xenvif *vif)
+void xenvif_kick_thread(struct xenvif *vif)
 {
        wake_up(&vif->wq);
 }
 
-void xenvif_rx_action(struct xenvif *vif)
+static void xenvif_rx_action(struct xenvif *vif)
 {
        s8 status;
        u16 flags;
@@ -570,8 +466,6 @@ void xenvif_rx_action(struct xenvif *vif)
        struct sk_buff *skb;
        LIST_HEAD(notify);
        int ret;
-       int nr_frags;
-       int count;
        unsigned long offset;
        struct skb_cb_overlay *sco;
        int need_to_notify = 0;
@@ -583,23 +477,36 @@ void xenvif_rx_action(struct xenvif *vif)
 
        skb_queue_head_init(&rxq);
 
-       count = 0;
-
        while ((skb = skb_dequeue(&vif->rx_queue)) != NULL) {
-               vif = netdev_priv(skb->dev);
-               nr_frags = skb_shinfo(skb)->nr_frags;
+               int max_slots_needed;
+               int i;
 
-               sco = (struct skb_cb_overlay *)skb->cb;
-               sco->meta_slots_used = xenvif_gop_skb(skb, &npo);
+               /* We need a cheap worst case estimate for the number of
+                * slots we'll use.
+                */
 
-               count += nr_frags + 1;
+               max_slots_needed = 2;
+               for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+                       struct page *page;
 
-               __skb_queue_tail(&rxq, skb);
+                       page = skb_frag_page(&skb_shinfo(skb)->frags[i]);
+                       max_slots_needed += 1 << compound_order(page);
+               }
+               if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV4 ||
+                   skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6)
+                       max_slots_needed++;
 
-               /* Filled the batch queue? */
-               /* XXX FIXME: RX path dependent on MAX_SKB_FRAGS */
-               if (count + MAX_SKB_FRAGS >= XEN_NETIF_RX_RING_SIZE)
+               /* If the skb may not fit then bail out now */
+               if (!xenvif_rx_ring_slots_available(vif, max_slots_needed)) {
+                       skb_queue_head(&vif->rx_queue, skb);
                        break;
+               }
+
+               sco = (struct skb_cb_overlay *)skb->cb;
+               sco->meta_slots_used = xenvif_gop_skb(skb, &npo);
+               BUG_ON(sco->meta_slots_used > max_slots_needed);
+
+               __skb_queue_tail(&rxq, skb);
        }
 
        BUG_ON(npo.meta_prod > ARRAY_SIZE(vif->meta));
@@ -613,8 +520,6 @@ void xenvif_rx_action(struct xenvif *vif)
        while ((skb = __skb_dequeue(&rxq)) != NULL) {
                sco = (struct skb_cb_overlay *)skb->cb;
 
-               vif = netdev_priv(skb->dev);
-
                if ((1 << vif->meta[npo.meta_cons].gso_type) &
                    vif->gso_prefix_mask) {
                        resp = RING_GET_RESPONSE(&vif->rx,
@@ -680,25 +585,12 @@ void xenvif_rx_action(struct xenvif *vif)
                if (ret)
                        need_to_notify = 1;
 
-               xenvif_notify_tx_completion(vif);
-
                npo.meta_cons += sco->meta_slots_used;
                dev_kfree_skb(skb);
        }
 
        if (need_to_notify)
                notify_remote_via_irq(vif->rx_irq);
-
-       /* More work to do? */
-       if (!skb_queue_empty(&vif->rx_queue))
-               xenvif_kick_thread(vif);
-}
-
-void xenvif_queue_tx_skb(struct xenvif *vif, struct sk_buff *skb)
-{
-       skb_queue_tail(&vif->rx_queue, skb);
-
-       xenvif_kick_thread(vif);
 }
 
 void xenvif_check_rx_xenvif(struct xenvif *vif)
@@ -1765,7 +1657,7 @@ static struct xen_netif_rx_response *make_rx_response(struct xenvif *vif,
 
 static inline int rx_work_todo(struct xenvif *vif)
 {
-       return !skb_queue_empty(&vif->rx_queue);
+       return !skb_queue_empty(&vif->rx_queue) || vif->rx_event;
 }
 
 static inline int tx_work_todo(struct xenvif *vif)
@@ -1815,8 +1707,6 @@ int xenvif_map_frontend_rings(struct xenvif *vif,
        rxs = (struct xen_netif_rx_sring *)addr;
        BACK_RING_INIT(&vif->rx, rxs, PAGE_SIZE);
 
-       vif->rx_req_cons_peek = 0;
-
        return 0;
 
 err:
@@ -1824,20 +1714,38 @@ err:
        return err;
 }
 
+void xenvif_stop_queue(struct xenvif *vif)
+{
+       if (vif->can_queue)
+               netif_stop_queue(vif->dev);
+}
+
+static void xenvif_start_queue(struct xenvif *vif)
+{
+       if (xenvif_schedulable(vif))
+               netif_wake_queue(vif->dev);
+}
+
 int xenvif_kthread(void *data)
 {
        struct xenvif *vif = data;
+       const int threshold = XEN_NETIF_RX_RING_SIZE / 2;
 
        while (!kthread_should_stop()) {
+
                wait_event_interruptible(vif->wq,
                                         rx_work_todo(vif) ||
                                         kthread_should_stop());
                if (kthread_should_stop())
                        break;
 
-               if (rx_work_todo(vif))
+               if (!skb_queue_empty(&vif->rx_queue))
                        xenvif_rx_action(vif);
 
+               vif->rx_event = false;
+               if (xenvif_rx_ring_slots_available(vif, threshold))
+                       xenvif_start_queue(vif);
+
                cond_resched();
        }
 
-- 
1.7.10.4

