WARNING - OLD ARCHIVES

This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
   
 
 
Xen 
 
Home Products Support Community News
 
   
 

xen-devel

[Xen-devel] [PATCH 15/17] Bypass support, for both frontend and backend.

A bypass is an auxiliary ring attached to a netchannel2 interface
which is used to communicate with a particular remote guest,
completely bypassing the bridge in dom0.  This is quite a bit faster,
and can also help to prevent dom0 from becoming a bottleneck on large
systems.

Bypasses are inherently incompatible with packet filtering in domain
0.  Filtering on the bridge is a moderately unusual configuration
(there'll usually be a firewall protecting the dom0 host stack, but
bridge filtering is less common), and we rely on the user turning
bypasses off if they do filter on the bridge.

Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx>
---
 drivers/net/Kconfig                              |   18 +
 drivers/net/xen-netchannel2/Makefile             |    8 +
 drivers/net/xen-netchannel2/bypass.c             |  793 ++++++++++++++++++++++
 drivers/net/xen-netchannel2/bypassee.c           |  738 ++++++++++++++++++++
 drivers/net/xen-netchannel2/chan.c               |  137 ++++-
 drivers/net/xen-netchannel2/netback2.c           |  128 ++++
 drivers/net/xen-netchannel2/netchannel2_core.h   |  279 ++++++++-
 drivers/net/xen-netchannel2/netchannel2_uspace.h |   17 +
 drivers/net/xen-netchannel2/netfront2.c          |   25 +
 drivers/net/xen-netchannel2/recv_packet.c        |    9 +
 drivers/net/xen-netchannel2/xmit_packet.c        |   17 +-
 include/xen/interface/io/netchannel2.h           |  138 ++++
 12 files changed, 2290 insertions(+), 17 deletions(-)
 create mode 100644 drivers/net/xen-netchannel2/bypass.c
 create mode 100644 drivers/net/xen-netchannel2/bypassee.c
 create mode 100644 drivers/net/xen-netchannel2/netchannel2_uspace.h

diff --git a/drivers/net/Kconfig b/drivers/net/Kconfig
index 3f599eb..9ac12a8 100644
--- a/drivers/net/Kconfig
+++ b/drivers/net/Kconfig
@@ -2799,6 +2799,24 @@ config XEN_NETDEV2_FRONTEND
        depends on XEN_NETCHANNEL2
        default y
 
+config XEN_NETDEV2_BYPASSABLE
+       bool "Net channel 2 bypassee support"
+       depends on XEN_NETDEV2_BACKEND
+       default y
+       help
+         This option allows net channel 2 endpoints in this domain to
+         be bypassed.  If this domain is acting as a bridge between
+         domains on a single host, bypass support will allow faster
+         inter-domain communication and reduce load in this domain.
+
+config XEN_NETDEV2_BYPASS_ENDPOINT
+       bool "Net channel 2 bypass endpoint support"
+       depends on XEN_NETDEV2_BACKEND && XEN_NETDEV2_FRONTEND
+       default y
+       help
+         Support for acting as the endpoint of a netchannel2 bypass.
+         Bypasses allow faster inter-domain communication, provided
+         every VM supports them.
 
 config ISERIES_VETH
        tristate "iSeries Virtual Ethernet driver support"
diff --git a/drivers/net/xen-netchannel2/Makefile b/drivers/net/xen-netchannel2/Makefile
index d6fb796..5aa3410 100644
--- a/drivers/net/xen-netchannel2/Makefile
+++ b/drivers/net/xen-netchannel2/Makefile
@@ -11,3 +11,11 @@ endif
 ifeq ($(CONFIG_XEN_NETDEV2_FRONTEND),y)
 netchannel2-objs += netfront2.o
 endif
+
+ifeq ($(CONFIG_XEN_NETDEV2_BYPASSABLE),y)
+netchannel2-objs += bypassee.o
+endif
+
+ifeq ($(CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT),y)
+netchannel2-objs += bypass.o
+endif
diff --git a/drivers/net/xen-netchannel2/bypass.c b/drivers/net/xen-netchannel2/bypass.c
new file mode 100644
index 0000000..05cb4d5
--- /dev/null
+++ b/drivers/net/xen-netchannel2/bypass.c
@@ -0,0 +1,793 @@
+#include <linux/kernel.h>
+#include <linux/kthread.h>
+#include <linux/interrupt.h>
+#include <linux/delay.h>
+#include <xen/events.h>
+#include "netchannel2_core.h"
+
+/* Can we send this packet on this bypass?  True if the destination
+   MAC address matches. */
+static int can_bypass_packet(struct nc2_alternate_ring *ncr,
+                            struct sk_buff *skb)
+{
+       struct ethhdr *eh;
+
+       if (skb_headlen(skb) < sizeof(*eh))
+               return 0;
+       eh = (struct ethhdr *)skb->data;
+       if (memcmp(eh->h_dest, ncr->rings.remote_mac, ETH_ALEN))
+               return 0;
+       else
+               return 1;
+}
+
+/* Called from the netdev start_xmit method.  We're holding the master
+   nc ring lock, but not the bypass ring lock. */
+int bypass_xmit_packet(struct netchannel2 *nc,
+                      struct nc2_alternate_ring *ncr,
+                      struct sk_buff *skb)
+{
+       struct netchannel2_ring_pair *rings = &ncr->rings;
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       size_t msg_size;
+       enum transmit_policy policy;
+       int r;
+
+       if (!can_bypass_packet(ncr, skb))
+               return 0;
+
+       spin_lock(&rings->lock);
+       if (ncr->state != nc2_alt_ring_ready) {
+               spin_unlock(&rings->lock);
+               return 0;
+       }
+       /* We're now committed to either transmitting this packet on
+          this ring or dropping it outright. */
+       if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb)) {
+               r = prepare_xmit_allocate_small(rings, skb);
+               policy = transmit_policy_small;
+       } else {
+               r = prepare_xmit_allocate_grant(rings, skb, 1);
+               policy = transmit_policy_grant;
+       }
+       if (r < 0) {
+               spin_unlock(&rings->lock);
+               dev_kfree_skb(skb);
+               return 1;
+       }
+
+       skb_co->policy = policy;
+       msg_size = get_transmitted_packet_msg_size(skb);
+       if (!nc2_reserve_payload_bytes(&rings->prod_ring, msg_size)) {
+               /* Uh oh. */
+               release_tx_packet(rings, skb);
+               spin_unlock(&rings->lock);
+               return 1;
+       }
+
+       queue_packet_to_interface(skb, rings);
+
+       spin_unlock(&rings->lock);
+
+       return 1;
+}
+
+void nc2_aux_ring_start_disable_sequence(struct nc2_alternate_ring *nar)
+{
+       spin_lock(&nar->rings.lock);
+       if (nar->state < nc2_alt_ring_disabling) {
+               nar->state = nc2_alt_ring_disabling;
+               nc2_kick(&nar->rings);
+       }
+       spin_unlock(&nar->rings.lock);
+}
+
+static void start_detach_worker(struct work_struct *ws)
+{
+       struct nc2_alternate_ring *ncr =
+               container_of(ws, struct nc2_alternate_ring, detach_work_item);
+
+       /* Detach from the ring.  Note that it may still be running at
+          this point.  In that case, we need to stop it and then go
+          and discard any outstanding messages on it. */
+
+       /* Stop the IRQ and change state.  This will prevent us from
+          being added to the schedule list again, but we may still be
+          on it for other reasons, so we need to get back into the
+          worker thread to finish up. */
+
+       /* We defer actually unmapping the rings to
+          nc2_advertise_rings(), since that's on the worker thread
+          and we therefore know we're not going to race anything
+          doing it there. */
+
+       if (ncr->rings.irq >= 0)
+               unbind_from_irqhandler(ncr->rings.irq, &ncr->rings);
+       ncr->rings.irq = -1;
+
+       spin_lock_bh(&ncr->rings.lock);
+       ncr->state = nc2_alt_ring_detached_pending;
+       ncr->rings.interface->need_aux_ring_state_machine = 1;
+       nc2_kick(&ncr->rings.interface->rings);
+       spin_unlock_bh(&ncr->rings.lock);
+}
+
+void nc2_aux_ring_start_detach_sequence(struct nc2_alternate_ring *nar)
+{
+       spin_lock(&nar->rings.lock);
+       if (nar->state >= nc2_alt_ring_detaching) {
+               spin_unlock(&nar->rings.lock);
+               return;
+       }
+       nar->state = nc2_alt_ring_detaching;
+       spin_unlock(&nar->rings.lock);
+
+       /* We can't do unbind_from_irqhandler() from a tasklet, so
+          punt it to a workitem. */
+       INIT_WORK(&nar->detach_work_item,
+                 start_detach_worker);
+       schedule_work(&nar->detach_work_item);
+}
+
+/* Crank through the auxiliary ring state machine.  Called holding the
+ * master ring lock. */
+void _nc2_crank_aux_ring_state_machine(struct netchannel2 *nc)
+{
+       struct nc2_alternate_ring *nar;
+       struct nc2_alternate_ring *next_nar;
+       struct netchannel2_msg_bypass_disabled disabled_msg;
+       struct netchannel2_msg_bypass_detached detached_msg;
+       struct netchannel2_msg_bypass_frontend_ready frontend_ready_msg;
+
+       memset(&disabled_msg, 0, sizeof(disabled_msg));
+       memset(&detached_msg, 0, sizeof(detached_msg));
+       memset(&frontend_ready_msg, 0, sizeof(frontend_ready_msg));
+
+       if (nc->pending_bypass_error) {
+               if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring,
+                                               sizeof(frontend_ready_msg)))
+                       return;
+               frontend_ready_msg.port = -1;
+               nc2_send_message(&nc->rings.prod_ring,
+                                NETCHANNEL2_MSG_BYPASS_FRONTEND_READY,
+                                0,
+                                &frontend_ready_msg,
+                                sizeof(frontend_ready_msg));
+               nc->rings.pending_time_sensitive_messages = 1;
+               nc->pending_bypass_error = 0;
+       }
+
+       list_for_each_entry_safe(nar, next_nar, &nc->alternate_rings,
+                                rings_by_interface) {
+
+               spin_lock(&nar->rings.lock);
+               if (nar->state == nc2_alt_ring_frontend_send_ready_pending) {
+                       if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring,
+                                                       sizeof(frontend_ready_msg))) {
+                               spin_unlock(&nar->rings.lock);
+                               return;
+                       }
+                       frontend_ready_msg.port = nar->rings.evtchn;
+                       nc2_send_message(&nc->rings.prod_ring,
+                                        NETCHANNEL2_MSG_BYPASS_FRONTEND_READY,
+                                        0,
+                                        &frontend_ready_msg,
+                                        sizeof(frontend_ready_msg));
+                       nar->state = nc2_alt_ring_frontend_sent_ready;
+                       nc->rings.pending_time_sensitive_messages = 1;
+               }
+               if (nar->state == nc2_alt_ring_disabled_pending) {
+                       if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring,
+                                                       sizeof(disabled_msg))) {
+                               spin_unlock(&nar->rings.lock);
+                               return;
+                       }
+                       disabled_msg.handle = nar->handle;
+                       nc2_send_message(&nc->rings.prod_ring,
+                                        NETCHANNEL2_MSG_BYPASS_DISABLED,
+                                        0,
+                                        &disabled_msg,
+                                        sizeof(disabled_msg));
+                       nar->state = nc2_alt_ring_disabled;
+                       nc->rings.pending_time_sensitive_messages = 1;
+               }
+               if (nar->state == nc2_alt_ring_detached_pending) {
+                       if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring,
+                                                       sizeof(detached_msg))) {
+                               spin_unlock(&nar->rings.lock);
+                               return;
+                       }
+
+                       /* If we get here then we know that nobody
+                          else is going to touch the ring, because
+                          that's what detached_pending means. */
+                       /* Deferred from start_detach_worker() */
+                       nc2_unmap_grants(&nar->prod_mapper);
+                       nc2_unmap_grants(&nar->cons_mapper);
+                       nc2_unmap_grants(&nar->control_mapper);
+
+                       detached_msg.handle = nar->handle;
+                       nc2_send_message(&nc->rings.prod_ring,
+                                        NETCHANNEL2_MSG_BYPASS_DETACHED,
+                                        0,
+                                        &detached_msg,
+                                        sizeof(detached_msg));
+                       nc->rings.pending_time_sensitive_messages = 1;
+
+                       list_del(&nar->rings_by_interface);
+
+                       spin_unlock(&nar->rings.lock);
+
+                       kfree(nar);
+               } else {
+                       spin_unlock(&nar->rings.lock);
+               }
+       }
+       nc->need_aux_ring_state_machine = 0;
+}
+
+static int map_rings_common(struct nc2_alternate_ring *ncr,
+                           struct netchannel2_msg_bypass_common *msg)
+{
+       int err;
+
+       if (msg->ring_domid == DOMID_SELF)
+               msg->ring_domid = ncr->rings.interface->rings.otherend_id;
+
+       err = nc2_map_grants(&ncr->prod_mapper,
+                            ncr->prod_grefs,
+                            msg->ring_pages,
+                            msg->ring_domid);
+       if (err < 0) {
+               printk(KERN_ERR "%d mapping producer ring", err);
+               return err;
+       }
+
+       err = nc2_map_grants(&ncr->cons_mapper,
+                            ncr->cons_grefs,
+                            msg->ring_pages,
+                            msg->ring_domid);
+       if (err < 0) {
+               printk(KERN_ERR "%d mapping consumer ring", err);
+               return err;
+       }
+
+       err = nc2_map_grants(&ncr->control_mapper,
+                            &msg->control_gref,
+                            1,
+                            msg->ring_domid);
+       if (err < 0)
+               printk(KERN_ERR "%d mapping control ring", err);
+       return err;
+}
+
+static int map_rings_frontend(struct nc2_alternate_ring *ncr)
+{
+       struct netchannel2_frontend_shared *nfs;
+       struct netchannel2_sring_prod *prod_sring;
+       struct netchannel2_sring_cons *cons_sring;
+       int err;
+
+       err = map_rings_common(ncr, &ncr->frontend_setup_msg.common);
+       if (err < 0)
+               return err;
+
+       nfs = ncr->control_mapper.mapping->addr;
+       cons_sring = &nfs->cons;
+       prod_sring = &nfs->prod;
+       _nc2_attach_rings(&ncr->rings,
+                         cons_sring,
+                         ncr->cons_mapper.mapping->addr,
+                         ncr->frontend_setup_msg.common.ring_pages * PAGE_SIZE,
+                         prod_sring,
+                         ncr->prod_mapper.mapping->addr,
+                         ncr->frontend_setup_msg.common.ring_pages * PAGE_SIZE,
+                         ncr->frontend_setup_msg.common.peer_domid);
+
+       return 0;
+}
+
+static int map_rings_backend(struct nc2_alternate_ring *ncr)
+{
+       struct netchannel2_backend_shared *nbs;
+       struct netchannel2_sring_prod *prod_sring;
+       struct netchannel2_sring_cons *cons_sring;
+       int err;
+
+       err = map_rings_common(ncr, &ncr->backend_setup_msg.common);
+       if (err < 0)
+               return err;
+
+       nbs = ncr->control_mapper.mapping->addr;
+       cons_sring = &nbs->cons;
+       prod_sring = &nbs->prod;
+       _nc2_attach_rings(&ncr->rings,
+                         cons_sring,
+                         ncr->cons_mapper.mapping->addr,
+                         ncr->backend_setup_msg.common.ring_pages * PAGE_SIZE,
+                         prod_sring,
+                         ncr->prod_mapper.mapping->addr,
+                         ncr->backend_setup_msg.common.ring_pages * PAGE_SIZE,
+                         ncr->backend_setup_msg.common.peer_domid);
+
+       return 0;
+}
+
+static void send_ready_message(struct nc2_alternate_ring *ncr)
+{
+       struct netchannel2_msg_bypass_ready msg;
+
+       memset(&msg, 0, sizeof(msg));
+       if (nc2_can_send_payload_bytes(&ncr->rings.prod_ring, sizeof(msg))) {
+               nc2_send_message(&ncr->rings.prod_ring,
+                                NETCHANNEL2_MSG_BYPASS_READY,
+                                0, &msg, sizeof(msg));
+               if (nc2_flush_ring(&ncr->rings.prod_ring))
+                       notify_remote_via_irq(ncr->rings.irq);
+       } else {
+               /* This shouldn't happen, because the producer ring
+                  should be essentially empty at this stage.  If it
+                  does, it probably means the other end is playing
+                  silly buggers with the ring indexes.  Drop the
+                  message. */
+               printk(KERN_WARNING "Failed to send bypass ring ready message.\n");
+       }
+}
+
+void nc2_handle_bypass_ready(struct netchannel2 *nc,
+                            struct netchannel2_ring_pair *ncrp,
+                            struct netchannel2_msg_hdr *hdr)
+{
+       struct nc2_alternate_ring *ncr;
+
+       if (ncrp == &nc->rings) {
+               pr_debug("bypass ready on principal interface?\n");
+               return;
+       }
+       ncr = container_of(ncrp, struct nc2_alternate_ring, rings);
+       /* We're now allowed to start sending packets over this
+        * ring. */
+       if (ncr->state == nc2_alt_ring_frontend_sent_ready)
+               ncr->state = nc2_alt_ring_ready;
+}
+
+/* Called holding the aux ring lock. */
+void _nc2_alternate_ring_disable_finish(struct nc2_alternate_ring *ncr)
+{
+       /* No more packets will ever come out of this ring -> it is
+          now disabled. */
+       ncr->state = nc2_alt_ring_disabled_pending;
+       ncr->rings.interface->need_aux_ring_state_machine = 1;
+       nc2_kick(&ncr->rings.interface->rings);
+}
+
+static void initialise_bypass_frontend_work_item(struct work_struct *ws)
+{
+       struct nc2_alternate_ring *ncr =
+               container_of(ws, struct nc2_alternate_ring, work_item);
+       struct netchannel2 *interface = ncr->rings.interface;
+       int err;
+
+       memcpy(&ncr->rings.remote_mac,
+              ncr->frontend_setup_msg.common.remote_mac, 6);
+       err = map_rings_frontend(ncr);
+       if (err < 0)
+               goto err;
+
+       BUG_ON(ncr->rings.cons_ring.sring == NULL);
+
+       err = xen_alloc_evtchn(ncr->rings.otherend_id, &ncr->rings.evtchn);
+       if (err)
+               goto err;
+       err = bind_evtchn_to_irqhandler(ncr->rings.evtchn, nc2_int,
+                                       0, "netchannel2_bypass",
+                                       &ncr->rings);
+       if (err < 0)
+               goto err;
+       ncr->rings.irq = err;
+
+       /* Get it going. */
+       nc2_kick(&ncr->rings);
+
+       /* And get the master ring to send a FRONTEND_READY message */
+       ncr->state = nc2_alt_ring_frontend_send_ready_pending;
+       spin_lock_bh(&interface->rings.lock);
+       interface->need_aux_ring_state_machine = 1;
+       nc2_kick(&interface->rings);
+       spin_unlock_bh(&interface->rings.lock);
+
+       return;
+
+err:
+       printk(KERN_ERR "Error %d setting up bypass ring!\n", err);
+
+       spin_lock_bh(&interface->rings.lock);
+       interface->pending_bypass_error = 1;
+       interface->need_aux_ring_state_machine = 1;
+       nc2_kick(&interface->rings);
+       list_del(&ncr->rings_by_interface);
+       spin_unlock_bh(&interface->rings.lock);
+
+       nc2_unmap_grants(&ncr->prod_mapper);
+       nc2_unmap_grants(&ncr->cons_mapper);
+       nc2_unmap_grants(&ncr->control_mapper);
+       kfree(ncr);
+       return;
+}
+
+static void initialise_bypass_backend_work_item(struct work_struct *ws)
+{
+       struct nc2_alternate_ring *ncr =
+               container_of(ws, struct nc2_alternate_ring, work_item);
+       struct netchannel2 *interface = ncr->rings.interface;
+       int err;
+
+       memcpy(&ncr->rings.remote_mac,
+              ncr->backend_setup_msg.common.remote_mac, 6);
+       err = map_rings_backend(ncr);
+       if (err < 0)
+               goto err;
+
+       err = bind_interdomain_evtchn_to_irqhandler(ncr->rings.otherend_id,
+                                                   ncr->backend_setup_msg.port,
+                                                   nc2_int,
+                                                   0,
+                                                   "netchannel2_bypass",
+                                                   &ncr->rings);
+       if (err < 0)
+               goto err;
+       ncr->rings.irq = err;
+
+       send_ready_message(ncr);
+
+       spin_lock_bh(&ncr->rings.lock);
+       ncr->state = nc2_alt_ring_ready;
+       spin_unlock_bh(&ncr->rings.lock);
+
+       nc2_kick(&ncr->rings);
+
+       return;
+
+err:
+       printk(KERN_ERR "Error %d setting up bypass ring!\n", err);
+
+       spin_lock_bh(&interface->rings.lock);
+       list_del(&ncr->rings_by_interface);
+       spin_unlock_bh(&interface->rings.lock);
+
+       nc2_unmap_grants(&ncr->prod_mapper);
+       nc2_unmap_grants(&ncr->cons_mapper);
+       nc2_unmap_grants(&ncr->control_mapper);
+       kfree(ncr);
+       return;
+}
+
+void nc2_handle_bypass_frontend(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr)
+{
+       struct nc2_alternate_ring *work;
+
+       if (hdr->size < sizeof(work->frontend_setup_msg)) {
+               pr_debug("Bypass message had strange size %d\n", hdr->size);
+               return;
+       }
+       if (ncrp != &nc->rings) {
+               pr_debug("Bypass message on ancillary ring!\n");
+               return;
+       }
+       if (!nc->remote_trusted) {
+               pr_debug("Untrusted domain tried to set up a bypass.\n");
+               return;
+       }
+       if (nc->pending_bypass_error) {
+               pr_debug("Remote tried to establish a bypass when we already had a pending error\n");
+               return;
+       }
+       work = kzalloc(sizeof(*work), GFP_ATOMIC);
+       if (!work) {
+               printk(KERN_WARNING "no memory for alternative ring pair!\n");
+               nc->pending_bypass_error = 1;
+               nc->need_aux_ring_state_machine = 1;
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &work->frontend_setup_msg,
+                          sizeof(work->frontend_setup_msg));
+       if (hdr->size != sizeof(work->frontend_setup_msg) +
+                                 sizeof(uint32_t) * 2 *
+                                 work->frontend_setup_msg.common.ring_pages) {
+               printk(KERN_WARNING "inconsistent bypass message size (%d for %d pages)\n",
+                      hdr->size, work->frontend_setup_msg.common.ring_pages);
+               goto err;
+       }
+       if (work->frontend_setup_msg.common.ring_pages >
+           MAX_BYPASS_RING_PAGES_MAPPABLE) {
+               printk(KERN_WARNING "too many ring pages: %d > %d\n",
+                      work->frontend_setup_msg.common.ring_pages,
+                      MAX_BYPASS_RING_PAGES_MAPPABLE);
+err:
+               kfree(work);
+               nc->pending_bypass_error = 1;
+               nc->need_aux_ring_state_machine = 1;
+               return;
+       }
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              &work->prod_grefs,
+                              sizeof(uint32_t) *
+                                  work->frontend_setup_msg.common.ring_pages,
+                              sizeof(work->frontend_setup_msg));
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              &work->cons_grefs,
+                              sizeof(uint32_t) *
+                                  work->frontend_setup_msg.common.ring_pages,
+                              sizeof(work->frontend_setup_msg) +
+                                  sizeof(uint32_t) *
+                                  work->frontend_setup_msg.common.ring_pages);
+
+       work->state = nc2_alt_ring_frontend_preparing;
+       init_waitqueue_head(&work->eventq);
+       work->handle = work->frontend_setup_msg.common.handle;
+       INIT_WORK(&work->work_item, initialise_bypass_frontend_work_item);
+       if (init_ring_pair(&work->rings, nc) < 0)
+               goto err;
+       work->rings.filter_mac = 1;
+
+       list_add(&work->rings_by_interface, &nc->alternate_rings);
+       schedule_work(&work->work_item);
+}
+
+void nc2_handle_bypass_backend(struct netchannel2 *nc,
+                              struct netchannel2_ring_pair *ncrp,
+                              struct netchannel2_msg_hdr *hdr)
+{
+       struct nc2_alternate_ring *work;
+
+       if (hdr->size < sizeof(work->backend_setup_msg)) {
+               pr_debug("Bypass message had strange size %d\n", hdr->size);
+               return;
+       }
+       if (ncrp != &nc->rings) {
+               pr_debug("Bypass message on ancillary ring!\n");
+               return;
+       }
+       if (!nc->remote_trusted) {
+               pr_debug("Untrusted domain tried to set up a bypass.\n");
+               return;
+       }
+       work = kzalloc(sizeof(*work), GFP_ATOMIC);
+       if (!work) {
+               printk(KERN_WARNING "no memory for alternative ring pair!\n");
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &work->backend_setup_msg,
+                          sizeof(work->backend_setup_msg));
+       if (hdr->size != sizeof(work->backend_setup_msg) +
+                                 sizeof(uint32_t) * 2 *
+                                 work->backend_setup_msg.common.ring_pages) {
+               printk(KERN_WARNING "inconsistent bypass message size (%d for %d pages)\n",
+                      hdr->size, work->backend_setup_msg.common.ring_pages);
+               goto err;
+       }
+       if (work->backend_setup_msg.common.ring_pages >
+           MAX_BYPASS_RING_PAGES_MAPPABLE) {
+               printk(KERN_WARNING "too many ring pages: %d > %d\n",
+                      work->backend_setup_msg.common.ring_pages,
+                      MAX_BYPASS_RING_PAGES_MAPPABLE);
+err:
+               kfree(work);
+               return;
+       }
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              &work->prod_grefs,
+                              sizeof(uint32_t) *
+                                  work->backend_setup_msg.common.ring_pages,
+                              sizeof(work->backend_setup_msg));
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              &work->cons_grefs,
+                              sizeof(uint32_t) *
+                                  work->backend_setup_msg.common.ring_pages,
+                              sizeof(work->backend_setup_msg) +
+                                  sizeof(uint32_t) *
+                                  work->backend_setup_msg.common.ring_pages);
+
+       work->state = nc2_alt_ring_backend_preparing;
+       init_waitqueue_head(&work->eventq);
+       work->handle = work->backend_setup_msg.common.handle;
+       INIT_WORK(&work->work_item, initialise_bypass_backend_work_item);
+       if (init_ring_pair(&work->rings, nc) < 0)
+               goto err;
+       work->rings.filter_mac = 1;
+
+       list_add(&work->rings_by_interface, &nc->alternate_rings);
+       schedule_work(&work->work_item);
+}
+
+/* Called holding the nc master ring lock. */
+static struct nc2_alternate_ring *find_ring_by_handle(struct netchannel2 *nc,
+                                                     uint32_t handle)
+{
+       struct nc2_alternate_ring *nar;
+       list_for_each_entry(nar, &nc->alternate_rings, rings_by_interface) {
+               if (nar->handle == handle)
+                       return nar;
+       }
+       return NULL;
+}
+
+void nc2_handle_bypass_disable(struct netchannel2 *nc,
+                              struct netchannel2_ring_pair *ncrp,
+                              struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_bypass_disable msg;
+       struct nc2_alternate_ring *nar;
+
+       if (ncrp != &nc->rings) {
+               pr_debug("Bypass disable on ancillary ring!\n");
+               return;
+       }
+       if (!nc->remote_trusted) {
+               pr_debug("Untrusted remote requested bypass disable.\n");
+               return;
+       }
+       if (hdr->size != sizeof(msg)) {
+               printk(KERN_WARNING "Strange size bypass disable message; %d != %zd.\n",
+                      hdr->size, sizeof(msg));
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg));
+       nar = find_ring_by_handle(nc, msg.handle);
+       if (nar == NULL) {
+               printk(KERN_WARNING "Request to disable unknown alternate ring %d.\n",
+                      msg.handle);
+               return;
+       }
+       nc2_aux_ring_start_disable_sequence(nar);
+}
+
+/* We've received a BYPASS_DETACH message on the master ring.  Do
+   what's needed to process it. */
+/* Called from the tasklet holding the master ring lock. */
+void nc2_handle_bypass_detach(struct netchannel2 *nc,
+                             struct netchannel2_ring_pair *ncrp,
+                             struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_bypass_detach msg;
+       struct nc2_alternate_ring *nar;
+
+       if (ncrp != &nc->rings) {
+               pr_debug("Bypass detach on wrong ring.\n");
+               return;
+       }
+       if (!nc->remote_trusted) {
+               pr_debug("Detach request from untrusted peer.\n");
+               return;
+       }
+       if (hdr->size != sizeof(msg)) {
+               printk(KERN_WARNING "Strange size bypass detach message; %d != %zd.\n",
+                      hdr->size, sizeof(msg));
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg));
+       nar = find_ring_by_handle(nc, msg.handle);
+       if (nar == NULL) {
+               printk(KERN_WARNING "Request to detach from unknown alternate ring %d.\n",
+                      msg.handle);
+               return;
+       }
+
+       nc2_aux_ring_start_detach_sequence(nar);
+}
+
+/* This is only called once the irqs have been stopped and the
+   interfaces have been de-pended, so it shouldn't have to worry about
+   any async activity. */
+static void release_alt_ring(struct nc2_alternate_ring *nar)
+{
+       flush_scheduled_work();
+
+       nc2_unmap_grants(&nar->prod_mapper);
+       nc2_unmap_grants(&nar->cons_mapper);
+       nc2_unmap_grants(&nar->control_mapper);
+
+       cleanup_ring_pair(&nar->rings);
+}
+
+void nc2_release_alt_rings(struct netchannel2 *nc)
+{
+       struct nc2_alternate_ring *nar, *next_nar;
+
+       list_for_each_entry_safe(nar, next_nar, &nc->alternate_rings,
+                                rings_by_interface) {
+               release_alt_ring(nar);
+       }
+}
+
+/* This is called from a suspend callback just before the VM goes down
+   for suspend/resume. When it returns, we must have unmapped all
+   bypass rings.  There is no possibility of failing.
+
+   Works in two phases.  First, for up to 500 rounds of 10ms each,
+   push every operating ring into the disabling state and wait for
+   the alternate_rings list to drain.  If the list is still not empty
+   after that, force every ring which is not yet detaching into the
+   detaching state, schedule its detach worker, and keep polling
+   (without a bound) until the list is empty.
+
+   Sleeps, so it must be called from process context. */
+void detach_all_bypasses(struct netchannel2 *nc)
+{
+       struct nc2_alternate_ring *nar;
+
+       int cntr;
+
+       spin_lock_bh(&nc->rings.lock);
+       cntr = 0;
+       while (!list_empty(&nc->alternate_rings) && cntr < 500) {
+               list_for_each_entry(nar, &nc->alternate_rings,
+                                   rings_by_interface) {
+                       spin_lock(&nar->rings.lock);
+                       /* If we're currently in an operating state,
+                          pretend we received a DISABLE message, so
+                          we eventually generate a DISABLED message.
+                          The peer will then start the detach state
+                          machine, which will eventually destroy the
+                          bypass. */
+                       /* nc2_alt_ring_frontend_sent_ready is a bit
+                          odd.  We are frontend-like, and we've told
+                          the backend who we are, but we haven't yet
+                          received a READY from the backend.  We
+                          don't necessarily trust the backend, so we
+                          can't wait for it.  The best we can do is
+                          to tell the peer that we've disabled, and
+                          let it drive the backend into shutdown. */
+                       if (nar->state == nc2_alt_ring_frontend_sent_ready ||
+                           nar->state == nc2_alt_ring_ready) {
+                               nar->state = nc2_alt_ring_disabling;
+                               nc2_kick(&nar->rings);
+                       }
+                       spin_unlock(&nar->rings.lock);
+               }
+               /* Drop the lock while sleeping so that the rings can
+                  make progress and be removed from the list. */
+               spin_unlock_bh(&nc->rings.lock);
+               /* Bit of a hack... */
+               msleep(10);
+               cntr++;
+               spin_lock_bh(&nc->rings.lock);
+       }
+       spin_unlock_bh(&nc->rings.lock);
+
+       /* The loop above only stops short of 500 rounds when the
+          list has become empty, i.e. everything detached cleanly. */
+       if (cntr < 500)
+               return;
+
+       /* Okay, doing it the nice way didn't work.  This can happen
+          if the domain at the other end of the bypass isn't picking
+          up messages, so we can't flush through all of our pending
+          packets and disable ourselves cleanly.  Force it through
+          instead, by pretending that we've received a DETACH message
+          from the parent. */
+       printk(KERN_WARNING "timed out trying to disable a bypass nicely, being more forceful\n");
+       spin_lock_bh(&nc->rings.lock);
+       cntr = 0;
+       while (!list_empty(&nc->alternate_rings)) {
+               list_for_each_entry(nar, &nc->alternate_rings,
+                                   rings_by_interface) {
+                       spin_lock(&nar->rings.lock);
+                       if (nar->state >= nc2_alt_ring_detaching) {
+                               /* Okay, we're already detaching, and
+                                  we're waiting either for our work
+                                  item to run or for an opportunity
+                                  to tell the parent that we're
+                                  detached.  The parent is trusted,
+                                  so just wait for whatever it is
+                                  that we're waiting for to
+                                  happen. */
+                               spin_unlock(&nar->rings.lock);
+                               continue;
+                       }
+                       nar->state = nc2_alt_ring_detaching;
+                       spin_unlock(&nar->rings.lock);
+                       INIT_WORK(&nar->detach_work_item,
+                                 start_detach_worker);
+                       schedule_work(&nar->detach_work_item);
+               }
+               spin_unlock_bh(&nc->rings.lock);
+               msleep(10);
+               cntr++;
+               /* Complain once a second (100 rounds of 10ms). */
+               if (cntr % 100 == 0)
+                       printk(KERN_WARNING
+                              "taking a long time to detach from bypasses (%d)\n",
+                              cntr);
+               spin_lock_bh(&nc->rings.lock);
+       }
+       spin_unlock_bh(&nc->rings.lock);
+}
diff --git a/drivers/net/xen-netchannel2/bypassee.c b/drivers/net/xen-netchannel2/bypassee.c
new file mode 100644
index 0000000..f0cda24
--- /dev/null
+++ b/drivers/net/xen-netchannel2/bypassee.c
@@ -0,0 +1,738 @@
+/* All the bits which allow a domain to be bypassed. */
+#include <linux/kernel.h>
+#include <linux/delay.h>
+#include <linux/spinlock.h>
+#include <asm/xen/page.h>
+#include "netchannel2_core.h"
+
+/* Bypass disable is a bit tricky.  Enable is relatively easy:
+
+   1) We decide to establish a bypass between two interfaces.
+   2) We allocate the pages for the rings and grant them to
+      the relevant domains.
+   3) We nominate one endpoint as the ``backend''.
+   4) We send both endpoints BYPASS messages.
+   5) As far as we're concerned, the bypass is now ready.  The
+      endpoints will do the rest of the negotiation without any help
+      from us.
+
+   Disable is harder.  Each bypass endpoint can be in one of three
+   states:
+
+   -- Running normally.
+   -- Disabled.
+   -- Detached.
+
+   A disabled endpoint won't generate any new operations (which means
+   that it can't send packets, but can send FINISHED_PACKET messages
+   and so forth).  A detached endpoint is one which has no longer
+   mapped the ring pages, so it can neither send nor receive.  There
+   is no provision for transitioning ``backwards'' i.e. from Disabled
+   to Running, Detached to Running, or Detached to Disabled.  There
+   are a couple of messages relevant to changing state:
+
+   -- DISABLE -- go to state Disabled if we're in Running.  Ignored in
+      other states (we won't even get an ACK). We send this to the
+      endpoint.
+   -- DISABLED -- endpoint has transitioned to Disabled, whether of
+      its own accord or due to a DISABLE message.  We receive this
+      from the endpoint.
+   -- DETACH -- go to state Detached if we're in Running or Disabled.
+      Ignore in other states (without an ACK). Sent to the endpoint.
+   -- DETACHED -- endpoint has transitioned to DETACHED.  Received
+      from the endpoint.
+
+   A bypass in which both endpoints are Detached can be safely
+   destroyed.
+
+   Once either endpoint has transitioned out of Running, the bypass is
+   pretty useless, so we try to push things so that we go to
+   Detached/Detached as quickly as possible.  In particular:
+
+   A state           B state              Action
+   Running           Disabled             Send A a DISABLE
+   Running           Detached             Send A a DETACH
+   Disabled          Disabled             Send both endpoints DETACH
+   Disabled          Detached             Send A a DETACH
+   Detached          Detached             Destroy the interface
+
+   (And the obvious mirror images)
+
+   There's some filtering so that we never send a given endpoint more
+   than one DISABLE message or more than one DETACH message.  If we
+   want to tear the bypass down from this end, we send both endpoints
+   DISABLE messages and let the state machine take things from
+   there.
+
+   The core state machine is implemented in
+   crank_bypass_state_machine().
+*/
+
+/* A list of all currently-live nc2_bypass interfaces. Only touched
+   from the worker thread. */
+/* NOTE(review): in this file the list is also modified under
+   bypasses_lock by nc2_establish_bypass() and
+   crank_bypass_state_machine() -- confirm the claim above. */
+static LIST_HEAD(all_bypasses);
+
+/* Bottom-half safe lock protecting pretty much all of the bypass
+   state, across all interfaces.  The pending_list_lock is sometimes
+   acquired while this is held.  This lock is itself acquired while
+   holding the ring lock. */
+static DEFINE_SPINLOCK(bypasses_lock);
+
+/* Ask the endpoint to detach at the next opportunity.  Does nothing
+   if the endpoint has already detached, or if a detach is already
+   pending or has already been sent. */
+/* Caller must hold the bypass lock. */
+static void schedule_detach(struct nc2_bypass_endpoint *ep)
+{
+       if (ep->detached || ep->need_detach || ep->detach_sent)
+               return;
+
+       BUG_ON(ep->nc2 == NULL);
+       ep->need_detach = 1;
+       /* Flag the owning interface and kick its rings so that the
+          pending request gets noticed. */
+       ep->nc2->need_advertise_bypasses = 1;
+       nc2_kick(&ep->nc2->rings);
+}
+
+/* Ask the endpoint to disable at the next opportunity.  Does nothing
+   if the endpoint is already disabled, or if a disable is already
+   pending or has already been sent. */
+/* Caller must hold the bypass lock. */
+static void schedule_disable(struct nc2_bypass_endpoint *ep)
+{
+       if (ep->disabled || ep->need_disable || ep->disable_sent)
+               return;
+
+       /* An endpoint which is not disabled must not be detached,
+          and must still be bound to an interface. */
+       BUG_ON(ep->detached);
+       BUG_ON(ep->nc2 == NULL);
+       ep->need_disable = 1;
+       ep->nc2->need_advertise_bypasses = 1;
+       nc2_kick(&ep->nc2->rings);
+}
+
+/* If *@gref is non-zero, try to end foreign access through it.  When
+   gnttab_end_foreign_access_ref() returns non-zero the reference is
+   freed and *@gref is reset to 0; otherwise *@gref is left alone. */
+static void grant_end(grant_ref_t *gref)
+{
+       if (!*gref)
+               return;
+       if (!gnttab_end_foreign_access_ref(*gref, 0))
+               return;
+
+       gnttab_free_grant_reference(*gref);
+       *gref = 0;
+}
+
+/* Free everything owned by @bypass: the grant references, the ring
+   and control pages, and finally the structure itself.  The caller
+   must have made sure that nobody else will touch the bypass again;
+   the reference count has to be zero by now.
+
+   A page is only freed when both grant references which cover it
+   have been cleared by grant_end(). */
+static void release_bypass(struct nc2_bypass *bypass)
+{
+       struct nc2_bypass_endpoint *a = &bypass->ep_a;
+       struct nc2_bypass_endpoint *b = &bypass->ep_b;
+       int idx;
+
+       BUG_ON(atomic_read(&bypass->refcnt) != 0);
+
+       for (idx = 0; idx < bypass->nr_ring_pages; idx++) {
+               grant_end(&a->incoming_grefs[idx]);
+               grant_end(&b->incoming_grefs[idx]);
+               grant_end(&a->outgoing_grefs[idx]);
+               grant_end(&b->outgoing_grefs[idx]);
+
+               /* A's incoming page is checked against A's incoming
+                  gref and B's outgoing gref. */
+               if (a->incoming_pages[idx] &&
+                   !(a->incoming_grefs[idx] || b->outgoing_grefs[idx]))
+                       free_page(a->incoming_pages[idx]);
+
+               /* ...and the mirror image for B's incoming page. */
+               if (b->incoming_pages[idx] &&
+                   !(b->incoming_grefs[idx] || a->outgoing_grefs[idx]))
+                       free_page(b->incoming_pages[idx]);
+       }
+
+       grant_end(&a->control_gref);
+       grant_end(&b->control_gref);
+       if (bypass->control_page &&
+           !(a->control_gref || b->control_gref))
+               free_page(bypass->control_page);
+
+       kfree(bypass);
+}
+
+/* Drop one reference on @bypass, releasing it when that was the last
+   one. */
+static void put_bypass(struct nc2_bypass *bypass)
+{
+       if (!atomic_dec_and_test(&bypass->refcnt))
+               return;
+       release_bypass(bypass);
+}
+
+/* Re-evaluate the bypass after one of its endpoints changed state,
+   and queue whatever DISABLE/DETACH requests are now needed.  Once
+   both endpoints have detached, the bypass is unlinked, waiters on
+   detach_waitq are woken, and one reference is dropped -- so the
+   caller must not touch @bypass afterwards unless it holds a
+   reference of its own. */
+/* Caller must hold the bypass lock. */
+static void crank_bypass_state_machine(struct nc2_bypass *bypass)
+{
+       struct nc2_bypass_endpoint *a = &bypass->ep_a;
+       struct nc2_bypass_endpoint *b = &bypass->ep_b;
+
+       /* Exactly one side is disabled: ask for the other to follow
+          (the request is a no-op for the side already disabled). */
+       if (a->disabled != b->disabled) {
+               schedule_disable(a);
+               schedule_disable(b);
+       }
+
+       /* Both sides disabled: move both on to detached. */
+       if (a->disabled && b->disabled) {
+               schedule_detach(b);
+               schedule_detach(a);
+       }
+
+       /* Exactly one side is detached: ask for the other to
+          follow. */
+       if (a->detached != b->detached) {
+               schedule_detach(b);
+               schedule_detach(a);
+       }
+
+       if (!a->detached || !b->detached)
+               return;
+
+       /* Okay, neither endpoint knows about the bypass any
+          more.  It is therefore dead. */
+       /* XXX: Should there be a concept of zombie bypasses?
+        * i.e. keep the bypass around until userspace
+        * explicitly reaps it, so as to avoid the usual ID
+        * reuse races. */
+       list_del_init(&bypass->list);
+       wake_up_all(&bypass->detach_waitq);
+       put_bypass(bypass);
+}
+
+/* Handle a BYPASS_DISABLED message received on @ncrp, which has to
+   be the main ring of @nc; anything else is logged and ignored, as
+   is a message of the wrong size.  The endpoint of @nc whose bypass
+   handle matches the message is marked disabled and the state
+   machine is cranked. */
+/* Called from the tasklet. */
+void nc2_handle_bypass_disabled(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_bypass_disabled msg;
+       struct nc2_bypass_endpoint *ep;
+       struct nc2_bypass *bypass;
+
+       if (hdr->size != sizeof(msg)) {
+               pr_debug("Strange size bypass disabled message; %d != %zd.\n",
+                        hdr->size, sizeof(msg));
+               return;
+       }
+       if (ncrp != &nc->rings) {
+               pr_debug("bypass_disabled on wrong ring.\n");
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg));
+
+       spin_lock(&bypasses_lock);
+       /* Look for the handle first among the bypasses where @nc is
+          endpoint A, then among those where it is endpoint B. */
+       list_for_each_entry(bypass, &nc->bypasses_a, ep_a.list) {
+               if (bypass->handle == msg.handle) {
+                       ep = &bypass->ep_a;
+                       goto found;
+               }
+       }
+       list_for_each_entry(bypass, &nc->bypasses_b, ep_b.list) {
+               if (bypass->handle == msg.handle) {
+                       ep = &bypass->ep_b;
+                       goto found;
+               }
+       }
+       spin_unlock(&bypasses_lock);
+
+       pr_debug("Disabled message was on the wrong ring (%d)?\n",
+                msg.handle);
+       return;
+
+found:
+       ep->disabled = 1;
+       crank_bypass_state_machine(bypass);
+       spin_unlock(&bypasses_lock);
+}
+
+/* Mark @ep as detached (which implies disabled), unlink it from its
+   interface's bypass list, and drop it from that interface's count
+   of extant bypasses.  Does nothing for an endpoint which is already
+   detached.  The endpoint's nc2 pointer is NULL afterwards. */
+static void detach(struct nc2_bypass_endpoint *ep)
+{
+       if (!ep->detached) {
+               list_del_init(&ep->list);
+               ep->detached = 1;
+               ep->disabled = 1;
+               ep->nc2->extant_bypasses--;
+               ep->nc2 = NULL;
+       }
+}
+
+/* One of our peers has sent us a bypass detached message i.e. it was
+   previously bypassing us, and it's not any more.  Do the appropriate
+   thing.
+
+   The message must be the right size and must arrive on the main
+   ring of @nc (@ncrp == &nc->rings); otherwise it is logged and
+   ignored.  The endpoint of @nc whose bypass handle matches the
+   message is detached and the state machine is cranked, which may
+   destroy the bypass. */
+void nc2_handle_bypass_detached(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_bypass_detached msg;
+       struct nc2_bypass *bypass;
+
+       if (hdr->size != sizeof(msg)) {
+               pr_debug("Strange size bypass detached message; %d != %zd.\n",
+                        hdr->size, sizeof(msg));
+               return;
+       }
+       if (ncrp != &nc->rings) {
+               pr_debug("bypass_detached on wrong ring.\n");
+               return;
+       }
+       nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg));
+       spin_lock(&bypasses_lock);
+       /* detach() unlinks the endpoint from the list being walked,
+          which is fine because we return straight away. */
+       list_for_each_entry(bypass, &nc->bypasses_a, ep_a.list) {
+               if (bypass->handle == msg.handle) {
+                       detach(&bypass->ep_a);
+                       crank_bypass_state_machine(bypass);
+                       spin_unlock(&bypasses_lock);
+                       return;
+               }
+       }
+       list_for_each_entry(bypass, &nc->bypasses_b, ep_b.list) {
+               if (bypass->handle == msg.handle) {
+                       detach(&bypass->ep_b);
+                       crank_bypass_state_machine(bypass);
+                       spin_unlock(&bypasses_lock);
+                       return;
+               }
+       }
+       spin_unlock(&bypasses_lock);
+       pr_debug("Detached message was on the wrong ring (%d)?\n",
+                msg.handle);
+}
+
+/* Queue a BYPASS_DISABLE message for @bypass on the main ring of
+   @nc.  Returns 0 if the message was queued, or 1 if the producer
+   ring does not currently have room for it. */
+static int send_disable_bypass_msg(struct netchannel2 *nc,
+                                  struct nc2_bypass *bypass)
+{
+       struct netchannel2_ring_pair *ncrp = &nc->rings;
+       struct netchannel2_msg_bypass_disable msg = {
+               .handle = bypass->handle
+       };
+
+       if (nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) {
+               nc2_send_message(&ncrp->prod_ring,
+                                NETCHANNEL2_MSG_BYPASS_DISABLE,
+                                0, &msg, sizeof(msg));
+               ncrp->pending_time_sensitive_messages = 1;
+               return 0;
+       }
+       return 1;
+}
+
+/* Queue a BYPASS_DETACH message for @bypass on the main ring of
+   @nc.  Returns 0 if the message was queued, or 1 if the producer
+   ring does not currently have room for it. */
+static int send_detach_bypass_msg(struct netchannel2 *nc,
+                                 struct nc2_bypass *bypass)
+{
+       struct netchannel2_ring_pair *ncrp = &nc->rings;
+       struct netchannel2_msg_bypass_detach msg = {
+               .handle = bypass->handle
+       };
+
+       if (nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) {
+               nc2_send_message(&ncrp->prod_ring,
+                                NETCHANNEL2_MSG_BYPASS_DETACH,
+                                0, &msg, sizeof(msg));
+               ncrp->pending_time_sensitive_messages = 1;
+               return 0;
+       }
+       return 1;
+}
+
+/* Fill in the part of a bypass advertisement which is shared by the
+   frontend and backend variants.
+
+   @msg:     common message part to fill in.
+   @this_ep: endpoint the message is being sent to; supplies the
+             control grant reference.
+   @remote:  interface at the other end of the bypass; supplies the
+             peer domain id, trust flag and MAC address.
+   @bypass:  the bypass being advertised; supplies the ring size and
+             the handle. */
+static void init_bypass_msg_common(struct netchannel2_msg_bypass_common *msg,
+                                  struct nc2_bypass_endpoint *this_ep,
+                                  struct netchannel2 *remote,
+                                  struct nc2_bypass *bypass)
+{
+       msg->control_gref = this_ep->control_gref;
+
+       msg->ring_domid = DOMID_SELF;
+       msg->ring_pages = bypass->nr_ring_pages;
+       msg->peer_domid = remote->rings.otherend_id;
+       msg->peer_trusted = remote->remote_trusted;
+       msg->handle = bypass->handle;
+       memcpy(msg->remote_mac, remote->rings.remote_mac, ETH_ALEN);
+}
+
+/* Send a BYPASS_FRONTEND message for @bypass on the main ring of
+   @nc, which must be endpoint A of the bypass.  The message is the
+   fixed-size structure followed by endpoint A's outgoing grant
+   references and then its incoming grant references, nr_ring_pages
+   of each.  The bypass is recorded in nc->current_bypass_frontend.
+
+   Returns 0 if the message was queued, or 1 if there was no room on
+   the ring. */
+static int advertise_bypass_frontend(struct netchannel2 *nc,
+                                    struct nc2_bypass *bypass)
+{
+       struct netchannel2_msg_bypass_frontend msg;
+       unsigned msg_size;
+
+       BUG_ON(nc != bypass->ep_a.nc2);
+
+       msg_size = sizeof(msg) + bypass->nr_ring_pages * 2 * sizeof(uint32_t);
+       /* NOTE(review): the space check is skipped whenever
+          current_bypass_frontend is already set -- confirm that
+          this is intentional. */
+       if (!nc->current_bypass_frontend &&
+           !nc2_can_send_payload_bytes(&nc->rings.prod_ring, msg_size))
+               return 1;
+
+       memset(&msg, 0, sizeof(msg));
+
+       init_bypass_msg_common(&msg.common, &bypass->ep_a, bypass->ep_b.nc2,
+                              bypass);
+
+       nc->current_bypass_frontend = bypass;
+
+       /* Send the message.  nc2_send_message doesn't support the
+          right kind of scatter gather, so do it by hand. */
+       __nc2_avoid_ring_wrap(&nc->rings.prod_ring, msg_size);
+       msg.hdr.type = NETCHANNEL2_MSG_BYPASS_FRONTEND;
+       msg.hdr.size = msg_size;
+       nc2_copy_to_ring(&nc->rings.prod_ring, &msg, sizeof(msg));
+       /* Outgoing grefs go directly after the fixed part... */
+       nc2_copy_to_ring_off(&nc->rings.prod_ring,
+                            bypass->ep_a.outgoing_grefs,
+                            sizeof(uint32_t) * bypass->nr_ring_pages,
+                            sizeof(msg));
+       /* ...and incoming grefs directly after the outgoing ones. */
+       nc2_copy_to_ring_off(&nc->rings.prod_ring,
+                            bypass->ep_a.incoming_grefs,
+                            sizeof(uint32_t) * bypass->nr_ring_pages,
+                            sizeof(msg) +
+                            sizeof(uint32_t) * bypass->nr_ring_pages);
+       nc->rings.prod_ring.prod_pvt += msg_size;
+       nc->rings.prod_ring.bytes_available -= msg_size;
+       nc->rings.pending_time_sensitive_messages = 1;
+       return 0;
+}
+
+/* Send a BYPASS_BACKEND message for @bypass on the main ring of @nc,
+   which must be endpoint B of the bypass.  The message is the
+   fixed-size structure (including the event channel port) followed
+   by endpoint B's outgoing grant references and then its incoming
+   grant references, nr_ring_pages of each.  The bypass must already
+   have a non-zero evtchn_port.
+
+   Returns 0 if the message was queued, or 1 if there was no room on
+   the ring. */
+static int advertise_bypass_backend(struct netchannel2 *nc,
+                                    struct nc2_bypass *bypass)
+{
+       struct netchannel2_msg_bypass_backend msg;
+       unsigned msg_size;
+
+       BUG_ON(nc != bypass->ep_b.nc2);
+
+       msg_size = sizeof(msg) + bypass->nr_ring_pages * 2 * sizeof(uint32_t);
+       if (!nc2_can_send_payload_bytes(&nc->rings.prod_ring, msg_size))
+               return 1;
+
+       memset(&msg, 0, sizeof(msg));
+
+       init_bypass_msg_common(&msg.common, &bypass->ep_b, bypass->ep_a.nc2,
+                              bypass);
+
+       BUG_ON(bypass->evtchn_port == 0);
+       msg.port = bypass->evtchn_port;
+       /* NOTE(review): unlike advertise_bypass_frontend(), this
+          does not call __nc2_avoid_ring_wrap() before copying --
+          confirm whether that is needed here. */
+       msg.hdr.type = NETCHANNEL2_MSG_BYPASS_BACKEND;
+       msg.hdr.size = msg_size;
+       nc2_copy_to_ring(&nc->rings.prod_ring, &msg, sizeof(msg));
+       /* Outgoing grefs go directly after the fixed part... */
+       nc2_copy_to_ring_off(&nc->rings.prod_ring,
+                            bypass->ep_b.outgoing_grefs,
+                            sizeof(uint32_t) * bypass->nr_ring_pages,
+                            sizeof(msg));
+       /* ...and incoming grefs directly after the outgoing ones. */
+       nc2_copy_to_ring_off(&nc->rings.prod_ring,
+                            bypass->ep_b.incoming_grefs,
+                            sizeof(uint32_t) * bypass->nr_ring_pages,
+                            sizeof(msg) +
+                            sizeof(uint32_t) * bypass->nr_ring_pages);
+       nc->rings.prod_ring.prod_pvt += msg_size;
+       nc->rings.prod_ring.bytes_available -= msg_size;
+       nc->rings.pending_time_sensitive_messages = 1;
+       return 0;
+}
+
+/* Advertise @bypass to @nc, using the frontend message if @nc is
+   endpoint A of the bypass and the backend message otherwise.
+   Passes back the result of whichever helper was used. */
+/* Runs in the tasklet with both the ring lock for nc and the bypass
+   lock held. */
+static int advertise_bypass(struct netchannel2 *nc, struct nc2_bypass *bypass)
+{
+       if (bypass->ep_a.nc2 != nc)
+               return advertise_bypass_backend(nc, bypass);
+       return advertise_bypass_frontend(nc, bypass);
+}
+
+/* Send whatever messages are pending for @ep: first the
+   advertisement, then DISABLE, then DETACH.  Stops at the first
+   message which cannot be sent, leaving its need_* flag set so it is
+   retried later.  Returns 1 if nothing is left pending, 0 if a send
+   had to be abandoned. */
+/* Runs in the tasklet with the ring and bypass locks held. */
+static int nc2_do_bypass_advertise_work(struct nc2_bypass_endpoint *ep,
+                                       struct netchannel2 *nc,
+                                       struct nc2_bypass *bypass)
+{
+       if (ep->need_advertise) {
+               if (advertise_bypass(nc, bypass))
+                       goto blocked;
+               ep->need_advertise = 0;
+       }
+
+       if (ep->need_disable) {
+               if (send_disable_bypass_msg(nc, bypass))
+                       goto blocked;
+               ep->need_disable = 0;
+               ep->disable_sent = 1;
+       }
+
+       if (ep->need_detach) {
+               if (send_detach_bypass_msg(nc, bypass))
+                       goto blocked;
+               ep->need_detach = 0;
+               ep->detach_sent = 1;
+       }
+
+       return 1;
+
+blocked:
+       return 0;
+}
+
+/* Run the pending advertise/disable/detach work for every bypass
+   endpoint attached to @nc, on both its A-side and B-side lists.
+   need_advertise_bypasses is only cleared if every endpoint got all
+   of its messages out. */
+/* Runs in the tasklet with the ring lock held. */
+void _nc2_advertise_bypasses(struct netchannel2 *nc)
+{
+       struct nc2_bypass *bypass;
+       int all_sent = 1;
+
+       spin_lock(&bypasses_lock);
+       list_for_each_entry(bypass, &nc->bypasses_a, ep_a.list) {
+               if (!nc2_do_bypass_advertise_work(&bypass->ep_a, nc, bypass))
+                       all_sent = 0;
+       }
+       list_for_each_entry(bypass, &nc->bypasses_b, ep_b.list) {
+               if (!nc2_do_bypass_advertise_work(&bypass->ep_b, nc, bypass))
+                       all_sent = 0;
+       }
+       if (all_sent)
+               nc->need_advertise_bypasses = 0;
+       spin_unlock(&bypasses_lock);
+}
+
+/* Handle a BYPASS_FRONTEND_READY message.  The message is silently
+   ignored unless it has the right size, arrived on the main ring of
+   @nc, and @nc has a frontend advertisement outstanding
+   (current_bypass_frontend).  That outstanding bypass is consumed
+   here.
+
+   If the message carries a port which is not positive, both
+   endpoints are detached, which destroys the bypass.  Otherwise the
+   port is recorded and the backend endpoint is scheduled to be
+   advertised. */
+void nc2_handle_bypass_frontend_ready(struct netchannel2 *nc,
+                                     struct netchannel2_ring_pair *ncrp,
+                                     struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_bypass_frontend_ready msg;
+       struct nc2_bypass *bypass;
+
+       if (hdr->size != sizeof(msg) || ncrp != &nc->rings ||
+           !nc->current_bypass_frontend)
+               return;
+       bypass = nc->current_bypass_frontend;
+       nc->current_bypass_frontend = NULL;
+       nc2_copy_from_ring(&nc->rings.cons_ring, &msg, sizeof(msg));
+       spin_lock(&bypasses_lock);
+       if (msg.port <= 0) {
+               printk(KERN_WARNING "%d from frontend trying to establish bypass\n",
+                      msg.port);
+               detach(&bypass->ep_a);
+               detach(&bypass->ep_b);
+               crank_bypass_state_machine(bypass);
+               spin_unlock(&bypasses_lock);
+               return;
+       }
+
+       /* The frontend side is up; hand the port to the backend
+          endpoint and get its advertisement sent. */
+       bypass->evtchn_port = msg.port;
+       bypass->ep_b.need_advertise = 1;
+       bypass->ep_b.nc2->need_advertise_bypasses = 1;
+       nc2_kick(&bypass->ep_b.nc2->rings);
+       spin_unlock(&bypasses_lock);
+}
+
+/* Allocate one zeroed page and grant it to two domains, taking both
+   grant references from @gref_pool.
+
+   @gref_pool: pool to claim the two grant references from; running
+               out of references here is treated as a bug.
+   @grefp_a:   receives the reference granted to @domid_a.
+   @grefp_b:   receives the reference granted to @domid_b.
+   @pagep:     receives the address of the new page.
+
+   Returns 0 on success, or -ENOMEM if no page could be allocated, in
+   which case none of the outputs are written and nothing is claimed
+   from the pool. */
+/* Called from an ioctl not holding any locks. */
+static int build_bypass_page(int *gref_pool,
+                            int *grefp_a,
+                            int *grefp_b,
+                            domid_t domid_a,
+                            domid_t domid_b,
+                            unsigned long *pagep)
+{
+       unsigned long addr;
+       int ref_a;
+       int ref_b;
+
+       addr = get_zeroed_page(GFP_ATOMIC);
+       if (!addr)
+               return -ENOMEM;
+
+       ref_a = gnttab_claim_grant_reference(gref_pool);
+       ref_b = gnttab_claim_grant_reference(gref_pool);
+       BUG_ON(ref_a < 0);
+       BUG_ON(ref_b < 0);
+
+       /* Same machine frame, one grant per domain. */
+       gnttab_grant_foreign_access_ref(ref_a, domid_a, virt_to_mfn(addr), 0);
+       gnttab_grant_foreign_access_ref(ref_b, domid_b, virt_to_mfn(addr), 0);
+
+       *grefp_a = ref_a;
+       *grefp_b = ref_b;
+       *pagep = addr;
+       return 0;
+}
+
+/* Set up a new bypass between interfaces @a (which becomes endpoint
+   A, and is sent the frontend advertisement) and @b (endpoint B).
+   Allocates and grants the ring and control pages, links the bypass
+   on both interfaces and on the global list, and kicks @a so that
+   the advertisement gets sent.
+
+   Returns the (positive) handle of the new bypass on success, or a
+   negative errno:
+     -EPERM       we are not trusted by both remote endpoints
+     -EOPNOTSUPP  one of the endpoints does not allow bypasses
+     -EMFILE      one of the endpoints already has its maximum
+                  number of bypasses
+     -EINVAL      the negotiated ring size is zero pages
+     -ENOMEM      out of memory
+     -EBUSY       @a still has a frontend advertisement outstanding
+     -EEXIST      a bypass between @a and @b already exists
+   or whatever gnttab_alloc_grant_references() failed with. */
+/* Called from an ioctl or work queue item not holding any locks. */
+int nc2_establish_bypass(struct netchannel2 *a, struct netchannel2 *b)
+{
+       struct nc2_bypass *work;
+       struct nc2_bypass *other_bypass;
+       int err;
+       grant_ref_t gref_pool;
+       int i;
+       static atomic_t next_handle;
+       int handle;
+       unsigned nr_pages;
+
+       /* Can't establish a bypass unless we're trusted by both of
+          the remote endpoints. */
+       if (!a->local_trusted || !b->local_trusted)
+               return -EPERM;
+
+       /* Can't establish a bypass unless it's allowed by both
+        * endpoints. */
+       if (!a->bypass_max_pages || !b->bypass_max_pages)
+               return -EOPNOTSUPP;
+
+       if (a->extant_bypasses >= a->max_bypasses ||
+           b->extant_bypasses >= b->max_bypasses)
+               return -EMFILE;
+
+       /* Use the smallest of the two endpoints' limits and our own
+          grantable maximum. */
+       nr_pages = a->bypass_max_pages;
+       if (nr_pages > b->bypass_max_pages)
+               nr_pages = b->bypass_max_pages;
+       if (nr_pages > MAX_BYPASS_RING_PAGES_GRANTABLE)
+               nr_pages = MAX_BYPASS_RING_PAGES_GRANTABLE;
+       if (nr_pages == 0) {
+               printk(KERN_WARNING "tried to establish a null bypass ring?\n");
+               return -EINVAL;
+       }
+
+       work = kzalloc(sizeof(*work), GFP_ATOMIC);
+       if (!work)
+               return -ENOMEM;
+       atomic_set(&work->refcnt, 1);
+       init_waitqueue_head(&work->detach_waitq);
+
+       work->nr_ring_pages = nr_pages;
+
+       work->ep_a.nc2 = a;
+       work->ep_b.nc2 = b;
+
+       work->ep_a.need_advertise = 1;
+
+       handle = atomic_inc_return(&next_handle);
+       work->handle = handle;
+
+       /* Two rings of nr_ring_pages each, every page granted to
+          both domains, plus the control page granted to both. */
+       err = gnttab_alloc_grant_references(work->nr_ring_pages * 4 + 2,
+                                           &gref_pool);
+       if (err < 0) {
+               /* gref_pool has not been set up, so it must not be
+                  handed to gnttab_free_grant_references(). */
+               goto err_free_work;
+       }
+
+       err = -ENOMEM;
+       for (i = 0; i < work->nr_ring_pages; i++) {
+               err = build_bypass_page(&gref_pool,
+                                       &work->ep_a.incoming_grefs[i],
+                                       &work->ep_b.outgoing_grefs[i],
+                                       a->rings.otherend_id,
+                                       b->rings.otherend_id,
+                                       &work->ep_a.incoming_pages[i]);
+               if (err < 0)
+                       goto err;
+               err = build_bypass_page(&gref_pool,
+                                       &work->ep_b.incoming_grefs[i],
+                                       &work->ep_a.outgoing_grefs[i],
+                                       b->rings.otherend_id,
+                                       a->rings.otherend_id,
+                                       &work->ep_b.incoming_pages[i]);
+               if (err < 0)
+                       goto err;
+       }
+       err = build_bypass_page(&gref_pool,
+                               &work->ep_a.control_gref,
+                               &work->ep_b.control_gref,
+                               a->rings.otherend_id,
+                               b->rings.otherend_id,
+                               &work->control_page);
+       if (err < 0)
+               goto err;
+
+       spin_lock_bh(&bypasses_lock);
+
+       if (work->ep_a.nc2->current_bypass_frontend) {
+               /* We can't establish another bypass until this one
+                  has finished (which might be forever, if the remote
+                  domain is misbehaving, but that's not a
+                  problem). */
+               err = -EBUSY;
+               spin_unlock_bh(&bypasses_lock);
+               goto err;
+       }
+
+       /* Don't allow redundant bypasses, because they'll never be used.
+          This doesn't actually matter all that much, because in order
+          to establish a redundant bypass, either:
+
+          -- The user explicitly requested one, in which case they
+             get what they deserve, or
+          -- They're using the autobypasser, in which case it'll detect
+             that the bypass isn't being used within a few seconds
+             and tear it down.
+
+          Still, it's better to avoid it (if only so the user gets a
+          sensible error message), and so we do a quick check here.
+       */
+       list_for_each_entry(other_bypass, &a->bypasses_a, ep_a.list) {
+               BUG_ON(other_bypass->ep_a.nc2 != a);
+               if (other_bypass->ep_b.nc2 == b) {
+                       err = -EEXIST;
+                       spin_unlock_bh(&bypasses_lock);
+                       goto err;
+               }
+       }
+       list_for_each_entry(other_bypass, &a->bypasses_b, ep_b.list) {
+               BUG_ON(other_bypass->ep_b.nc2 != a);
+               if (other_bypass->ep_a.nc2 == b) {
+                       err = -EEXIST;
+                       spin_unlock_bh(&bypasses_lock);
+                       goto err;
+               }
+       }
+
+       list_add(&work->ep_a.list, &a->bypasses_a);
+       INIT_LIST_HEAD(&work->ep_b.list);
+       a->need_advertise_bypasses = 1;
+       list_add(&work->ep_b.list, &b->bypasses_b);
+       list_add_tail(&work->list, &all_bypasses);
+
+       a->extant_bypasses++;
+       b->extant_bypasses++;
+
+       spin_unlock_bh(&bypasses_lock);
+
+       nc2_kick(&a->rings);
+
+       return handle;
+
+err:
+       /* Hand back whatever is left unclaimed in the pool; the
+          references already claimed are ended and freed when the
+          bypass itself is released below. */
+       gnttab_free_grant_references(gref_pool);
+err_free_work:
+       put_bypass(work);
+       return err;
+}
+
+/* Look @handle up on the global bypass list.  Returns the matching
+   bypass with an extra reference taken (drop it with put_bypass()),
+   or NULL if there is no such bypass. */
+/* Called from an ioctl holding the bypass lock. */
+static struct nc2_bypass *get_bypass(uint32_t handle)
+{
+       struct nc2_bypass *candidate;
+
+       list_for_each_entry(candidate, &all_bypasses, list) {
+               if (candidate->handle != handle)
+                       continue;
+               atomic_inc(&candidate->refcnt);
+               return candidate;
+       }
+       return NULL;
+}
+
+/* Returns non-zero iff both endpoints of @bypass have detached.
+   Takes the bypass lock itself, so the caller must not hold it. */
+static int bypass_fully_detached(struct nc2_bypass *bypass)
+{
+       int both_gone;
+
+       spin_lock_bh(&bypasses_lock);
+       both_gone = bypass->ep_a.detached && bypass->ep_b.detached;
+       spin_unlock_bh(&bypasses_lock);
+
+       return both_gone;
+}
+
+/* Start tearing down the bypass identified by @handle, and wait up
+   to five seconds (interruptibly) for both endpoints to detach.
+
+   Returns -ESRCH if there is no bypass with that handle; otherwise
+   returns the result of wait_event_interruptible_timeout()
+   unchanged.
+
+   NOTE(review): that means a timeout is reported as 0 and success as
+   a positive number of remaining jiffies, which looks unintended for
+   a function whose other results are errnos -- confirm against the
+   callers before changing it. */
+int nc2_destroy_bypass(int handle)
+{
+       struct nc2_bypass *bypass;
+       int r;
+
+       spin_lock_bh(&bypasses_lock);
+       /* get_bypass() takes a reference, which keeps the bypass
+          alive while we sleep below. */
+       bypass = get_bypass(handle);
+       if (bypass == NULL) {
+               spin_unlock_bh(&bypasses_lock);
+               return -ESRCH;
+       }
+       /* Ask both ends to disable; the state machine takes it from
+          there. */
+       schedule_disable(&bypass->ep_a);
+       schedule_disable(&bypass->ep_b);
+       spin_unlock_bh(&bypasses_lock);
+
+       r = wait_event_interruptible_timeout(bypass->detach_waitq,
+                                            bypass_fully_detached(bypass),
+                                            5 * HZ);
+       put_bypass(bypass);
+       if (r < 0) {
+               printk(KERN_WARNING "Failed to destroy a bypass (%d).\n",
+                      r);
+       }
+       return r;
+}
+
+/* Detach @nc from every bypass it is an endpoint of, cranking each
+   bypass's state machine so that the other endpoint gets told to
+   detach too (and the bypass is destroyed if it already has).  Both
+   of @nc's bypass lists are empty afterwards.  Finishes by flushing
+   the shared work queue, so this must be called from a context which
+   is allowed to sleep.
+
+   We're guaranteed to be the only thing accessing @nc at this point,
+   but we don't know what's happening to the other endpoints of any
+   bypasses which it might have attached. */
+void release_bypasses(struct netchannel2 *nc)
+{
+       struct nc2_bypass *bypass, *next_bypass;
+
+       /* bypasses_lock is also taken from tasklets, so bottom
+          halves have to be disabled when taking it from here. */
+       spin_lock_bh(&bypasses_lock);
+       /* detach() unlinks the endpoint from the list being walked,
+          and cranking may free the bypass, hence the _safe
+          iterators. */
+       list_for_each_entry_safe(bypass, next_bypass, &nc->bypasses_a,
+                                ep_a.list) {
+               detach(&bypass->ep_a);
+               crank_bypass_state_machine(bypass);
+       }
+       list_for_each_entry_safe(bypass, next_bypass, &nc->bypasses_b,
+                                ep_b.list) {
+               detach(&bypass->ep_b);
+               crank_bypass_state_machine(bypass);
+       }
+       spin_unlock_bh(&bypasses_lock);
+
+       BUG_ON(!list_empty(&nc->bypasses_a));
+       BUG_ON(!list_empty(&nc->bypasses_b));
+
+       flush_scheduled_work();
+}
diff --git a/drivers/net/xen-netchannel2/chan.c b/drivers/net/xen-netchannel2/chan.c
index e96a8ee..5ceacc3 100644
--- a/drivers/net/xen-netchannel2/chan.c
+++ b/drivers/net/xen-netchannel2/chan.c
@@ -18,7 +18,7 @@
 static int process_ring(struct napi_struct *napi,
                        int work_avail);
 
-static irqreturn_t nc2_int(int irq, void *dev_id)
+irqreturn_t nc2_int(int irq, void *dev_id)
 {
        struct netchannel2_ring_pair *ncr = dev_id;
 
@@ -90,6 +90,30 @@ retry:
                        nc2_handle_set_max_fragments_per_packet(nc, ncrp,
                                                                &hdr);
                        break;
+               case NETCHANNEL2_MSG_BYPASS_FRONTEND:
+                       nc2_handle_bypass_frontend(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_BACKEND:
+                       nc2_handle_bypass_backend(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_FRONTEND_READY:
+                       nc2_handle_bypass_frontend_ready(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_DISABLE:
+                       nc2_handle_bypass_disable(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_DISABLED:
+                       nc2_handle_bypass_disabled(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_DETACH:
+                       nc2_handle_bypass_detach(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_DETACHED:
+                       nc2_handle_bypass_detached(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_BYPASS_READY:
+                       nc2_handle_bypass_ready(nc, ncrp, &hdr);
+                       break;
                case NETCHANNEL2_MSG_PAD:
                        break;
                default:
@@ -144,8 +168,15 @@ static void flush_rings(struct netchannel2_ring_pair *ncrp)
                advertise_max_packets(ncrp);
        if (ncrp->need_advertise_max_fragments_per_packet)
                advertise_max_fragments_per_packet(ncrp);
-       if (nc->need_advertise_offloads)
-               advertise_offloads(nc);
+
+       if (ncrp == &nc->rings) {
+               if (nc->need_advertise_offloads)
+                       advertise_offloads(nc);
+               nc2_advertise_bypasses(nc);
+               nc2_crank_aux_ring_state_machine(nc);
+       } else {
+               nc2_alternate_ring_disable_finish(ncrp);
+       }
 
        need_kick = 0;
        if (nc2_finish_messages(&ncrp->cons_ring)) {
@@ -352,6 +383,9 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd)
        int local_trusted;
        int remote_trusted;
        int filter_mac;
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       int max_bypasses;
+#endif
 
        if (!gnttab_subpage_grants_available()) {
                printk(KERN_ERR "netchannel2 needs version 2 grant tables\n");
@@ -364,6 +398,17 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd)
                local_trusted = 1;
        }
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       max_bypasses = 0;
+       if (local_trusted) {
+               if (xenbus_scanf(XBT_NIL, xd->nodename, "max-bypasses",
+                                "%d", &max_bypasses) != 1) {
+                       printk(KERN_WARNING "Can't get maximum bypass count; 
assuming 0.\n");
+                       max_bypasses = 0;
+               }
+       }
+#endif
+
        if (xenbus_scanf(XBT_NIL, xd->nodename, "remote-trusted",
                         "%d", &remote_trusted) != 1) {
                printk(KERN_WARNING "Can't tell whether local endpoint is 
trusted; assuming it isn't.\n");
@@ -398,6 +443,15 @@ struct netchannel2 *nc2_new(struct xenbus_device *xd)
        /* Default to RX csum on. */
        nc->use_rx_csum = 1;
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       INIT_LIST_HEAD(&nc->bypasses_a);
+       INIT_LIST_HEAD(&nc->bypasses_b);
+       nc->max_bypasses = max_bypasses;
+#endif
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       INIT_LIST_HEAD(&nc->alternate_rings);
+#endif
+
        skb_queue_head_init(&nc->pending_skbs);
        if (init_ring_pair(&nc->rings, nc) < 0) {
                nc2_release(nc);
@@ -452,21 +506,25 @@ void nc2_release(struct netchannel2 *nc)
           we're now the only thing accessing this netchannel2
           structure and we can tear it down with impunity. */
 
+       nc2_release_alt_rings(nc);
+
        cleanup_ring_pair(&nc->rings);
 
        nc2_queue_purge(&nc->rings, &nc->pending_skbs);
 
+       release_bypasses(nc);
+
        free_netdev(nc->net_device);
 }
 
-static void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp,
-                             struct netchannel2_sring_cons *cons_sring,
-                             const volatile void *cons_payload,
-                             size_t cons_size,
-                             struct netchannel2_sring_prod *prod_sring,
-                             void *prod_payload,
-                             size_t prod_size,
-                             domid_t otherend_id)
+void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp,
+                      struct netchannel2_sring_cons *cons_sring,
+                      const volatile void *cons_payload,
+                      size_t cons_size,
+                      struct netchannel2_sring_prod *prod_sring,
+                      void *prod_payload,
+                      size_t prod_size,
+                      domid_t otherend_id)
 {
        BUG_ON(prod_sring == NULL);
        BUG_ON(cons_sring == NULL);
@@ -503,6 +561,28 @@ int nc2_attach_rings(struct netchannel2 *nc,
                     size_t prod_size,
                     domid_t otherend_id)
 {
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       int feature_bypass;
+       int max_bypass_pages;
+
+       if (xenbus_scanf(XBT_NIL, nc->xenbus_device->otherend,
+                        "feature-bypass", "%d", &feature_bypass) < 0)
+               feature_bypass = 0;
+       if (feature_bypass) {
+               if (xenbus_scanf(XBT_NIL, nc->xenbus_device->otherend,
+                                "feature-bypass-max-pages", "%d",
+                                &max_bypass_pages) < 0) {
+                       printk(KERN_WARNING "other end claimed to support 
bypasses, but didn't expose max-pages?\n");
+                       /* Bypasses disabled for this ring. */
+                       nc->max_bypasses = 0;
+               } else {
+                       nc->bypass_max_pages = max_bypass_pages;
+               }
+       } else {
+               nc->max_bypasses = 0;
+       }
+#endif
+
        spin_lock_bh(&nc->rings.lock);
        _nc2_attach_rings(&nc->rings, cons_sring, cons_payload, cons_size,
                          prod_sring, prod_payload, prod_size, otherend_id);
@@ -544,6 +624,24 @@ static void _detach_rings(struct netchannel2_ring_pair 
*ncrp)
        ncrp->is_attached = 0;
 
        spin_unlock_bh(&ncrp->lock);
+
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       {
+               struct nc2_alternate_ring *nar;
+
+               /* Walk the alternate rings list and detach all of
+                  them as well.  This is recursive, but it's only
+                  ever going to recur one deep, so it's okay. */
+               /* Don't need to worry about synchronisation because
+                  the interface has been stopped. */
+               if (ncrp == &ncrp->interface->rings) {
+                       list_for_each_entry(nar,
+                                           &ncrp->interface->alternate_rings,
+                                           rings_by_interface)
+                               _detach_rings(&nar->rings);
+               }
+       }
+#endif
 }
 
 /* Detach from the rings.  This includes unmapping them and stopping
@@ -575,6 +673,20 @@ void nc2_detach_rings(struct netchannel2 *nc)
        if (nc->rings.irq >= 0)
                unbind_from_irqhandler(nc->rings.irq, &nc->rings);
        nc->rings.irq = -1;
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       {
+               struct nc2_alternate_ring *ncr;
+
+               list_for_each_entry(ncr, &nc->alternate_rings,
+                                   rings_by_interface) {
+                       if (ncr->rings.irq >= 0) {
+                               unbind_from_irqhandler(ncr->rings.irq,
+                                                      &ncr->rings);
+                               ncr->rings.irq = -1;
+                       }
+               }
+       }
+#endif
 
        /* Disable all offloads */
        nc->net_device->features &= ~(NETIF_F_IP_CSUM | NETIF_F_SG | 
NETIF_F_TSO);
@@ -646,6 +758,7 @@ int nc2_get_evtchn_port(struct netchannel2 *nc)
 
 void nc2_suspend(struct netchannel2 *nc)
 {
+       detach_all_bypasses(nc); /* Empty stub without BYPASS_ENDPOINT. */
        suspend_receive_map_mode();
 }
 
@@ -687,7 +800,7 @@ static int process_ring(struct napi_struct *napi,
                        release_tx_packet(ncrp, skb);
        }
 
-       if (nc->is_stopped) {
+       if (ncrp == &nc->rings && nc->is_stopped) {
                /* If the other end has processed some messages, there
                   may be space on the ring for a delayed send from
                   earlier.  Process it now. */
diff --git a/drivers/net/xen-netchannel2/netback2.c b/drivers/net/xen-netchannel2/netback2.c
index bd33786..06f2705 100644
--- a/drivers/net/xen-netchannel2/netback2.c
+++ b/drivers/net/xen-netchannel2/netback2.c
@@ -1,18 +1,30 @@
 #include <linux/kernel.h>
 #include <linux/gfp.h>
 #include <linux/vmalloc.h>
+#include <linux/miscdevice.h>
+#include <linux/fs.h>
 #include <xen/grant_table.h>
 #include <xen/xenbus.h>
 #include <xen/interface/io/netchannel2.h>
 
 #include "netchannel2_core.h"
 #include "netchannel2_endpoint.h"
+#include "netchannel2_uspace.h"
+
+static atomic_t next_handle;
+/* A list of all currently-live netback2 interfaces. */
+static LIST_HEAD(all_netbacks);
+/* A lock to protect the above list. */
+static DEFINE_MUTEX(all_netbacks_lock);
 
 #define NETBACK2_MAGIC 0xb5e99485
 struct netback2 {
        unsigned magic;
        struct xenbus_device *xenbus_device;
 
+       int handle;
+       struct list_head list;
+
        struct netchannel2 *chan;
 
        struct grant_mapping b2f_mapping;
@@ -182,6 +194,14 @@ static void frontend_changed(struct xenbus_device *xd,
                xenbus_printf(XBT_NIL, nb->xenbus_device->nodename,
                              "max-sring-pages", "%d", MAX_GRANT_MAP_PAGES);
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+               xenbus_printf(XBT_NIL, nb->xenbus_device->nodename,
+                             "feature-bypass", "1");
+               xenbus_printf(XBT_NIL, nb->xenbus_device->nodename,
+                             "feature-bypass-max-pages", "%d",
+                             MAX_BYPASS_RING_PAGES_GRANTABLE);
+#endif
+
                /* Start the device bring-up bit of the state
                 * machine. */
                xenbus_switch_state(nb->xenbus_device, XenbusStateInitWait);
@@ -296,6 +316,11 @@ static int netback2_probe(struct xenbus_device *xd,
 
        xd->dev.driver_data = nb;
 
+       nb->handle = atomic_inc_return(&next_handle);
+       mutex_lock(&all_netbacks_lock);
+       list_add(&nb->list, &all_netbacks);
+       mutex_unlock(&all_netbacks_lock);
+
        kobject_uevent(&xd->dev.kobj, KOBJ_ONLINE);
 
        return 0;
@@ -315,6 +340,9 @@ static int netback2_remove(struct xenbus_device *xd)
 {
        struct netback2 *nb = xenbus_device_to_nb2(xd);
        kobject_uevent(&xd->dev.kobj, KOBJ_OFFLINE);
+       mutex_lock(&all_netbacks_lock);
+       list_del(&nb->list);
+       mutex_unlock(&all_netbacks_lock);
        if (nb->chan != NULL)
                nc2_release(nb->chan);
        if (nb->have_shutdown_watch)
@@ -341,14 +369,114 @@ static struct xenbus_driver netback2 = {
        .uevent = netback2_uevent,
 };
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+/* Return the netback2 on all_netbacks whose handle is @handle, or
+   NULL if there isn't one.  The list is walked without taking
+   all_netbacks_lock here; the callers in this file hold it. */
+static struct netback2 *find_netback_by_handle_locked(unsigned handle)
+{
+       struct netback2 *cur;
+       struct netback2 *match = NULL;
+
+       list_for_each_entry(cur, &all_netbacks, list) {
+               if (cur->handle != handle)
+                       continue;
+               match = cur;
+               break;
+       }
+       return match;
+}
+
+/* Return the first netback2 on all_netbacks whose channel's remote
+   MAC address equals the ETH_ALEN bytes at @mac, or NULL if none
+   does.  The list is walked without taking all_netbacks_lock here;
+   the caller in this file holds it. */
+static struct netback2 *find_netback_by_remote_mac_locked(const char *mac)
+{
+       struct netback2 *cur;
+       struct netback2 *match = NULL;
+
+       list_for_each_entry(cur, &all_netbacks, list) {
+               if (memcmp(cur->chan->rings.remote_mac, mac, ETH_ALEN))
+                       continue;
+               match = cur;
+               break;
+       }
+       return match;
+}
+
+/* Handle NETCHANNEL2_IOCTL_ESTABLISH_BYPASS.  Copies the request in
+   from userspace, looks up both handles under all_netbacks_lock and,
+   if both name known interfaces, asks nc2_establish_bypass() to
+   connect their channels.  Returns -EFAULT if the request can't be
+   read, -EINVAL if either handle is unknown, and otherwise whatever
+   nc2_establish_bypass() returned. */
+static long netchannel2_ioctl_establish_bypass(
+       struct netchannel2_ioctl_establish_bypass __user *argsp)
+{
+       struct netchannel2_ioctl_establish_bypass req;
+       struct netback2 *first, *second;
+       int res = -EINVAL;
+
+       if (copy_from_user(&req, argsp, sizeof(req)) != 0)
+               return -EFAULT;
+
+       mutex_lock(&all_netbacks_lock);
+       first = find_netback_by_handle_locked(req.handle_a);
+       second = find_netback_by_handle_locked(req.handle_b);
+       if (first != NULL && second != NULL)
+               res = nc2_establish_bypass(first->chan, second->chan);
+       mutex_unlock(&all_netbacks_lock);
+
+       return res;
+}
+
+/* Try to set up a bypass between @a_chan and whichever netback2
+   interface has @mac_b as its remote MAC address.  Nothing happens
+   if no interface matches, and the result of nc2_establish_bypass()
+   is not reported to the caller. */
+void nb2_handle_suggested_bypass(struct netchannel2 *a_chan, const char *mac_b)
+{
+       struct netback2 *peer;
+
+       mutex_lock(&all_netbacks_lock);
+       peer = find_netback_by_remote_mac_locked(mac_b);
+       if (peer)
+               nc2_establish_bypass(a_chan, peer->chan);
+       mutex_unlock(&all_netbacks_lock);
+}
+
+/* Handle NETCHANNEL2_IOCTL_DESTROY_BYPASS.  Returns -EFAULT if the
+   request can't be copied in from userspace, and otherwise the
+   result of nc2_destroy_bypass() on the requested handle. */
+static long netchannel2_ioctl_destroy_bypass(
+       struct netchannel2_ioctl_destroy_bypass __user *argsp)
+{
+       struct netchannel2_ioctl_destroy_bypass req;
+
+       if (copy_from_user(&req, argsp, sizeof(req)) != 0)
+               return -EFAULT;
+
+       return nc2_destroy_bypass(req.handle);
+}
+#endif
+
+/* ioctl entry point for the netback2 control device.  Dispatches the
+   bypass establish/destroy requests when CONFIG_XEN_NETDEV2_BYPASSABLE
+   is enabled; every other command yields -EINVAL. */
+static long misc_dev_unlocked_ioctl(struct file *filp, unsigned cmd,
+                                   unsigned long data)
+{
+       long ret = -EINVAL;
+
+       switch (cmd) {
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       case NETCHANNEL2_IOCTL_ESTABLISH_BYPASS:
+               ret = netchannel2_ioctl_establish_bypass(
+                       (struct netchannel2_ioctl_establish_bypass __user *)
+                       data);
+               break;
+       case NETCHANNEL2_IOCTL_DESTROY_BYPASS:
+               ret = netchannel2_ioctl_destroy_bypass(
+                       (struct netchannel2_ioctl_destroy_bypass __user *)
+                       data);
+               break;
+#endif
+       default:
+               break;
+       }
+       return ret;
+}
+
+/* File operations for the control device: ioctl only. */
+static struct file_operations misc_dev_fops = {
+       .owner          = THIS_MODULE,
+       .unlocked_ioctl = misc_dev_unlocked_ioctl,
+};
+
+/* The "netback2" misc device, with a dynamically allocated minor. */
+static struct miscdevice netback2_misc_dev = {
+       .minor = MISC_DYNAMIC_MINOR,
+       .name  = "netback2",
+       .fops  = &misc_dev_fops,
+};
+
 int __init netback2_init(void)
 {
        int r;
 
+       r = misc_register(&netback2_misc_dev); /* Control device first. */
+       if (r < 0) {
+               printk(KERN_ERR "Error %d registering control device.\n",
+                      r);
+               return r;
+       }
        r = xenbus_register_backend(&netback2);
        if (r < 0) {
                printk(KERN_ERR "error %d registering backend driver.\n",
                       r);
+               misc_deregister(&netback2_misc_dev); /* Undo the above. */
        }
        return r;
 }
diff --git a/drivers/net/xen-netchannel2/netchannel2_core.h b/drivers/net/xen-netchannel2/netchannel2_core.h
index 2572017..a116e5c 100644
--- a/drivers/net/xen-netchannel2/netchannel2_core.h
+++ b/drivers/net/xen-netchannel2/netchannel2_core.h
@@ -43,7 +43,7 @@ enum transmit_policy {
        transmit_policy_last = transmit_policy_small
 };
 
-/* When we send a packet message, we need to tag it with an ID.         That
+/* When we send a packet message, we need to tag it with an ID.  That
    ID is an index into the TXP slot array.  Each slot contains either
    a pointer to an sk_buff (if it's in use), or the index of the next
    free slot (if it isn't).  A slot is in use if the contents is >
@@ -108,7 +108,6 @@ static inline struct skb_cb_overlay *get_skb_overlay(struct 
sk_buff *skb)
        return (struct skb_cb_overlay *)skb->cb;
 }
 
-
 /* Packets for which we need to send FINISH_PACKET messages for as
    soon as possible. */
 struct pending_finish_packets {
@@ -154,7 +153,7 @@ struct netchannel2_ring_pair {
 
        /* The IRQ corresponding to the event channel which is
           connected to the other end.  This only changes from the
-          xenbus state change handler.  It is notified from lots of
+          xenbus state change handler.          It is notified from lots of
           other places.  Fortunately, it's safe to notify on an irq
           after it's been released, so the lack of synchronisation
           doesn't matter. */
@@ -248,6 +247,15 @@ struct netchannel2 {
           it's useful for optimisation. */
        int local_trusted;
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       /* Alternate rings for this interface.  Protected by the
+          master rings lock. */
+       struct list_head alternate_rings;
+       uint8_t need_aux_ring_state_machine;
+
+       uint8_t pending_bypass_error;
+#endif
+
        struct netchannel2_ring_pair rings;
 
        /* Packets which we need to transmit soon */
@@ -275,12 +283,261 @@ struct netchannel2 {
           after we receive an interrupt so that we can wake it up */
        uint8_t is_stopped;
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+       /* Bypass support.  */
+       /* There's some unadvertised bypass in one of the lists. */
+       uint8_t need_advertise_bypasses;
+       uint8_t bypass_max_pages;
+       uint16_t max_bypasses;
+       uint16_t extant_bypasses;
+       struct list_head bypasses_a;
+       struct list_head bypasses_b;
+
+       struct nc2_bypass *current_bypass_frontend;
+#endif
+
        /* Updates are protected by the lock.  This can be read at any
         * time without holding any locks, and the rest of Linux is
         * expected to cope. */
        struct net_device_stats stats;
 };
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASSABLE
+#define MAX_BYPASS_RING_PAGES_GRANTABLE 4
+struct nc2_bypass_endpoint { /* One end (ep_a or ep_b) of a struct
+                               nc2_bypass. */
+       struct list_head list; /* Always ``valid'', but won't actually
+                                 be in any list if we're detached (it
+                                 gets set to the empty list). */
+       struct netchannel2 *nc2; /* Valid provided detached isn't
+                                 * set */
+       grant_ref_t incoming_grefs[MAX_BYPASS_RING_PAGES_GRANTABLE];
+       grant_ref_t outgoing_grefs[MAX_BYPASS_RING_PAGES_GRANTABLE];
+       grant_ref_t control_gref;
+       unsigned long incoming_pages[MAX_BYPASS_RING_PAGES_GRANTABLE];
+
+       uint8_t need_advertise; /* NOTE(review): the flags from here
+                                  down look like per-endpoint
+                                  progress markers for the advertise,
+                                  disable and detach steps; confirm
+                                  against bypass.c. */
+       uint8_t need_disable;
+       uint8_t disable_sent;
+       uint8_t disabled;
+       uint8_t need_detach;
+       uint8_t detach_sent;
+       uint8_t detached; /* bypass.c combines this flag from both
+                            ep_a and ep_b while holding
+                            bypasses_lock. */
+};
+
+/* This is the representation of a bypass in the bypassed domain. */
+struct nc2_bypass {
+       /* Cleared to an empty list if both endpoints are detached. */
+       struct list_head list;
+
+       /* Reference count.  Being on the big list, threaded through
+          @list, counts as a single reference. */
+       atomic_t refcnt;
+
+       struct nc2_bypass_endpoint ep_a; /* The two ends of the bypass. */
+       struct nc2_bypass_endpoint ep_b;
+       unsigned long control_page;
+       unsigned nr_ring_pages;
+
+       unsigned handle; /* NOTE(review): nc2_destroy_bypass() takes
+                           its handle as an int; confirm that the
+                           signedness mismatch is harmless. */
+       int evtchn_port;
+
+       wait_queue_head_t detach_waitq; /* nc2_destroy_bypass() sleeps
+                                          here until both endpoints
+                                          have detached, or until it
+                                          gives up. */
+};
+
+/* Creation and destruction of bypasses, as driven from the netback2
+   control device. */
+int nc2_establish_bypass(struct netchannel2 *a, struct netchannel2 *b);
+int nc2_destroy_bypass(int handle);
+
+void _nc2_advertise_bypasses(struct netchannel2 *nc);
+/* Cheap inline check so that the out-of-line function is only called
+   when @nc has flagged that it has something to advertise. */
+static inline void nc2_advertise_bypasses(struct netchannel2 *nc)
+{
+       if (nc->need_advertise_bypasses)
+               _nc2_advertise_bypasses(nc);
+}
+
+/* Handlers for the bypass control messages which the ring message
+   loop in chan.c dispatches to the bypassed domain. */
+void nc2_handle_bypass_frontend_ready(struct netchannel2 *nc,
+                                     struct netchannel2_ring_pair *ncrp,
+                                     struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_disabled(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_detached(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr);
+
+void release_bypasses(struct netchannel2 *nc);
+void nb2_handle_suggested_bypass(struct netchannel2 *a_chan,
+                                const char *mac_b);
+#else
+/* Without CONFIG_XEN_NETDEV2_BYPASSABLE these all compile away to
+   nothing, so callers don't need their own #ifdefs. */
+static inline void release_bypasses(struct netchannel2 *nc)
+{
+}
+
+static inline void nc2_advertise_bypasses(struct netchannel2 *nc)
+{
+}
+
+static inline void nc2_handle_bypass_frontend_ready(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_disabled(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_detached(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+#endif
+
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+#define MAX_BYPASS_RING_PAGES_MAPPABLE 4
+/* This is the representation of a bypass from the point of view of
+   one of the endpoint domains. */
+struct nc2_alternate_ring {
+       /* List of all alternate rings on a given interface.  Dangles
+        * off of alternate_rings in struct netchannel2.  Protected by
+        * the netchannel2 master ring lock. */
+       struct list_head rings_by_interface;
+       /* The state of the alternate ring.  This only ever goes
+        * forwards.  It is protected by the auxiliary ring lock. */
+       enum {
+               /* This is a frontend, it's just been allocated and
+                  doesn't yet have a port. */
+               nc2_alt_ring_frontend_preparing = 0xf001,
+               /* This is a frontend, it has a port but hasn't told
+                  the parent yet. */
+               nc2_alt_ring_frontend_send_ready_pending,
+               /* We've sent the FRONTEND_READY message and are
+                  waiting for the backend to say it's ready. */
+               nc2_alt_ring_frontend_sent_ready,
+               /* This is a backend.  In theory, we know what port to
+                  use, but we haven't tried to bind to it yet. */
+               nc2_alt_ring_backend_preparing,
+               /* Running normally */
+               nc2_alt_ring_ready,
+               /* Can't be used for more PACKETs, will disable as
+                  soon as all FINISHes arrive. */
+               nc2_alt_ring_disabling,
+               /* All FINISHes arrived, waiting to send DISABLED */
+               nc2_alt_ring_disabled_pending,
+               /* DISABLED sent. */
+               nc2_alt_ring_disabled,
+               /* DETACH received */
+               nc2_alt_ring_detaching,
+               /* Ring has been detached, waiting to send the
+                  DETACHED message. */
+               nc2_alt_ring_detached_pending
+       } state;
+       struct work_struct work_item;
+       struct work_struct detach_work_item;
+
+       struct grant_mapping prod_mapper;
+       struct grant_mapping cons_mapper;
+       struct grant_mapping control_mapper;
+
+       struct netchannel2_ring_pair rings; /* container_of() on this
+                                              member is how a ring
+                                              pair pointer is mapped
+                                              back to its alternate
+                                              ring. */
+
+       /* A lower bound on the number of times we've called
+          disable_irq() on the irq.  The interrupt handler guarantees
+          to notify the eventq quickly if this increases.  It
+          increases whenever there is work for the worker thread to
+          do. */
+       atomic_t irq_disable_count;
+       wait_queue_head_t eventq;
+       uint32_t handle;
+
+       struct netchannel2_msg_bypass_frontend frontend_setup_msg;
+       struct netchannel2_msg_bypass_backend backend_setup_msg;
+       uint32_t cons_grefs[MAX_BYPASS_RING_PAGES_MAPPABLE];
+       uint32_t prod_grefs[MAX_BYPASS_RING_PAGES_MAPPABLE];
+};
+
+int bypass_xmit_packet(struct netchannel2 *nc,
+                      struct nc2_alternate_ring *ncr,
+                      struct sk_buff *skb);
+
+void _nc2_alternate_ring_disable_finish(struct nc2_alternate_ring *ncr);
+/* Finish disabling an alternate ring once it is in the disabling
+   state and has no transmitted packets outstanding.  @ncrp must be
+   the rings member of a struct nc2_alternate_ring, never the
+   interface's main ring pair, because container_of() is applied to
+   it. */
+static inline void nc2_alternate_ring_disable_finish(
+       struct netchannel2_ring_pair *ncrp)
+{
+       struct nc2_alternate_ring *nar;
+       nar = container_of(ncrp, struct nc2_alternate_ring, rings);
+       if (nar->state == nc2_alt_ring_disabling &&
+           ncrp->nr_tx_packets_outstanding == 0)
+               _nc2_alternate_ring_disable_finish(nar);
+}
+
+void _nc2_crank_aux_ring_state_machine(struct netchannel2 *nc);
+/* Cheap inline check so that the out-of-line state machine only runs
+   when @nc has flagged that it is needed. */
+static inline void nc2_crank_aux_ring_state_machine(struct netchannel2 *nc)
+{
+       if (nc->need_aux_ring_state_machine)
+               _nc2_crank_aux_ring_state_machine(nc);
+}
+
+void nc2_release_alt_rings(struct netchannel2 *nc);
+void detach_all_bypasses(struct netchannel2 *nc);
+
+/* Handlers for the bypass control messages which the ring message
+   loop in chan.c dispatches to an endpoint domain. */
+void nc2_handle_bypass_frontend(struct netchannel2 *nc,
+                               struct netchannel2_ring_pair *ncrp,
+                               struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_backend(struct netchannel2 *nc,
+                              struct netchannel2_ring_pair *ncrp,
+                              struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_disable(struct netchannel2 *nc,
+                              struct netchannel2_ring_pair *ncrp,
+                              struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_detach(struct netchannel2 *nc,
+                             struct netchannel2_ring_pair *ncrp,
+                             struct netchannel2_msg_hdr *hdr);
+void nc2_handle_bypass_ready(struct netchannel2 *nc,
+                            struct netchannel2_ring_pair *ncrp,
+                            struct netchannel2_msg_hdr *hdr);
+
+void nc2_aux_ring_start_disable_sequence(struct nc2_alternate_ring *nar);
+void nc2_aux_ring_start_detach_sequence(struct nc2_alternate_ring *nar);
+#else
+/* Without CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT these all compile away
+   to nothing, so callers don't need their own #ifdefs. */
+static inline void detach_all_bypasses(struct netchannel2 *nc)
+{
+}
+
+static inline void nc2_crank_aux_ring_state_machine(struct netchannel2 *nc)
+{
+}
+
+static inline void nc2_alternate_ring_disable_finish(
+       struct netchannel2_ring_pair *ncrp)
+{
+}
+
+static inline void nc2_release_alt_rings(struct netchannel2 *nc)
+{
+}
+
+static inline void nc2_handle_bypass_frontend(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_backend(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_disable(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_detach(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+
+static inline void nc2_handle_bypass_ready(
+       struct netchannel2 *nc,
+       struct netchannel2_ring_pair *ncrp,
+       struct netchannel2_msg_hdr *hdr)
+{
+}
+#endif
+
+
 static inline void flush_prepared_grant_copies(struct hypercall_batcher *hb,
                                               void (*on_fail)(void *ctxt,
                                                               struct 
gnttab_copy *gop))
@@ -371,10 +628,24 @@ int nc2_map_grants(struct grant_mapping *gm,
                   unsigned nr_grefs,
                   domid_t remote_domain);
 void nc2_unmap_grants(struct grant_mapping *gm);
-
+void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp,
+                      struct netchannel2_sring_cons *cons_sring,
+                      const volatile void *cons_payload,
+                      size_t cons_size,
+                      struct netchannel2_sring_prod *prod_sring,
+                      void *prod_payload,
+                      size_t prod_size,
+                      domid_t otherend_id);
 void queue_packet_to_interface(struct sk_buff *skb,
                               struct netchannel2_ring_pair *ncrp);
 
+unsigned get_transmitted_packet_msg_size(struct sk_buff *skb);
+int init_ring_pair(struct netchannel2_ring_pair *ncrp,
+                  struct netchannel2 *nc);
+
+irqreturn_t nc2_int(int irq, void *dev_id);
+
+void cleanup_ring_pair(struct netchannel2_ring_pair *ncrp);
 void nc2_rscb_on_gntcopy_fail(void *ctxt, struct gnttab_copy *gop);
 
 int init_receive_map_mode(void);
diff --git a/drivers/net/xen-netchannel2/netchannel2_uspace.h b/drivers/net/xen-netchannel2/netchannel2_uspace.h
new file mode 100644
index 0000000..f4d06ca
--- /dev/null
+++ b/drivers/net/xen-netchannel2/netchannel2_uspace.h
@@ -0,0 +1,17 @@
+#ifndef NETCHANNEL2_USPACE_H__
+#define NETCHANNEL2_USPACE_H__
+
+#include <linux/ioctl.h>
+
+/* Argument of NETCHANNEL2_IOCTL_ESTABLISH_BYPASS: the handles of the
+   two netback2 interfaces which should be joined by a bypass. */
+struct netchannel2_ioctl_establish_bypass {
+       unsigned int handle_a;
+       unsigned int handle_b;
+};
+#define NETCHANNEL2_IOCTL_ESTABLISH_BYPASS \
+       _IOW('N', 0, struct netchannel2_ioctl_establish_bypass)
+
+/* Argument of NETCHANNEL2_IOCTL_DESTROY_BYPASS: the handle of the
+   bypass to tear down. */
+struct netchannel2_ioctl_destroy_bypass {
+       unsigned int handle;
+};
+#define NETCHANNEL2_IOCTL_DESTROY_BYPASS \
+       _IOW('N', 1, struct netchannel2_ioctl_destroy_bypass)
+
+#endif /* !NETCHANNEL2_USPACE_H__ */
diff --git a/drivers/net/xen-netchannel2/netfront2.c b/drivers/net/xen-netchannel2/netfront2.c
index 6c88484..f360b43 100644
--- a/drivers/net/xen-netchannel2/netfront2.c
+++ b/drivers/net/xen-netchannel2/netfront2.c
@@ -181,6 +181,19 @@ again:
                goto abort;
        }
 
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                           "feature-bypass", "1");
+       if (!err)
+               err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                                   "feature-bypass-max-pages", "%d",
+                                   MAX_BYPASS_RING_PAGES_MAPPABLE);
+       if (err) {
+               msg = "publishing bypass info";
+               goto abort;
+       }
+#endif
+
        err = xenbus_transaction_end(xbt, 0);
        if (err) {
                if (err == -EAGAIN)
@@ -439,6 +452,17 @@ err:
        return -ENOMEM;
 }
 
+/* xenbus suspend hook.  We're about to suspend, so do the minimum
+   amount of work needed to make that safe: hand the channel to
+   nc2_suspend().  Always returns 0. */
+static int netfront_suspend(struct xenbus_device *xd)
+{
+       nc2_suspend(xenbus_device_to_nf2(xd)->chan);
+       return 0;
+}
+
 static int netfront_resume(struct xenbus_device *xd)
 {
        /* We've been suspended and come back.  The rings are
@@ -476,6 +500,7 @@ static struct xenbus_driver netfront2 = {
        .remove = __devexit_p(netfront_remove),
        .otherend_changed = backend_changed,
        .resume = netfront_resume,
+       .suspend = netfront_suspend,
 };
 
 int __init netfront2_init(void)
diff --git a/drivers/net/xen-netchannel2/recv_packet.c b/drivers/net/xen-netchannel2/recv_packet.c
index 8c38788..749c70e 100644
--- a/drivers/net/xen-netchannel2/recv_packet.c
+++ b/drivers/net/xen-netchannel2/recv_packet.c
@@ -80,6 +80,15 @@ void nc2_handle_packet_msg(struct netchannel2 *nc,
 
        nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
 
+       if (msg.type != NC2_PACKET_TYPE_receiver_copy &&
+           msg.type != NC2_PACKET_TYPE_small &&
+           ncrp != &nc->rings) {
+               pr_debug("Received strange packet type %d on bypass ring.\n",
+                        msg.type);
+               nc->stats.tx_errors++;
+               return;
+       }
+
        frags_bytes = hdr->size - sizeof(msg) - msg.prefix_size;
        nr_frags = frags_bytes / sizeof(struct netchannel2_fragment);
 
diff --git a/drivers/net/xen-netchannel2/xmit_packet.c b/drivers/net/xen-netchannel2/xmit_packet.c
index eb4090b..03deb65 100644
--- a/drivers/net/xen-netchannel2/xmit_packet.c
+++ b/drivers/net/xen-netchannel2/xmit_packet.c
@@ -40,7 +40,7 @@ enum prepare_xmit_result prepare_xmit_allocate_small(
 }
 
 /* Figure out how much space @tp will take up on the ring. */
-static unsigned get_transmitted_packet_msg_size(struct sk_buff *skb)
+unsigned get_transmitted_packet_msg_size(struct sk_buff *skb)
 {
        struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
        return (sizeof(struct netchannel2_msg_packet) +
@@ -236,6 +236,21 @@ int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev)
 
        spin_lock_bh(&nc->rings.lock);
 
+       /* If we have a bypass suitable for this packet then we prefer
+        * that to the main ring pair. */
+#ifdef CONFIG_XEN_NETDEV2_BYPASS_ENDPOINT
+       {
+               struct nc2_alternate_ring *ncr;
+               list_for_each_entry(ncr, &nc->alternate_rings,
+                                   rings_by_interface) {
+                       if (bypass_xmit_packet(nc, ncr, skb)) {
+                               spin_unlock_bh(&nc->rings.lock);
+                               return NETDEV_TX_OK;
+                       }
+               }
+       }
+#endif
+
        if (!nc->rings.is_attached)
                goto out_drop;
 
diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h
index f264995..f3cabe8 100644
--- a/include/xen/interface/io/netchannel2.h
+++ b/include/xen/interface/io/netchannel2.h
@@ -188,4 +188,142 @@ struct netchannel2_msg_set_max_fragments_per_packet {
        uint32_t max_frags_per_packet;
 };
 
+/* Attach to a bypass ring as a frontend.  The receiving domain should
+ * map the bypass ring (which will be in the sending domain's memory)
+ * and attach to it in the same way as it attached to the original ring.
+ * This bypass ring will, once it's been successfully set up, be used
+ * for all packets destined for @remote_mac (excluding broadcasts).
+ *
+ * @ring_domid indicates which domain allocated the ring pages, and
+ * hence which domain should be specified when grant mapping
+ * @control_gref, @prod_grefs, and @cons_grefs.  It can be set to
+ * DOMID_SELF, in which case the domain ID of the domain sending the
+ * message should be used.
+ *
+ * @peer_domid indicates the domain ID of the domain on the other end
+ * of the ring.
+ *
+ * @handle gives a unique handle for the bypass which will be used in
+ * future messages.
+ *
+ * @peer_trusted is true if the peer should be trusted by the domain
+ * which sent the bypass message.
+ *
+ * @ring_pages gives the number of valid grefs in the @prod_grefs and
+ * @cons_grefs arrays.
+ *
+ * @is_backend_like indicates which ring attach the receiving domain
+ * should use.  If @is_backend_like is set, the receiving domain
+ * should interpret the control area as a netchannel2_backend_shared.
+ * Otherwise, it's a netchannel2_frontend_shared.  Also, a
+ * backend-like endpoint should receive an event channel from the peer
+ * domain, while a frontend-like one should send one.  Once
+ * established, the ring is symmetrical.
+ *
+ *
+ * BYPASS messages can only be sent by a trusted endpoint.  They may
+ * not be sent over bypass rings.
+ *
+ * No packets may be sent over the ring until a READY message is
+ * received.  Until that point, all packets must be sent over the
+ * parent ring.
+ */
+struct netchannel2_msg_bypass_common {
+       uint16_t ring_domid;
+       uint16_t peer_domid;
+       uint32_t handle;
+
+       uint8_t remote_mac[6];
+       uint8_t peer_trusted;
+       uint8_t ring_pages;
+
+       uint32_t control_gref;
+       uint32_t pad;
+
+       /* Followed by a run of @ring_pages uint32_t producer ring
+          grant references, then a run of @ring_pages uint32_t
+          consumer ring grant references */
+};
+
+#define NETCHANNEL2_MSG_BYPASS_FRONTEND 9
+struct netchannel2_msg_bypass_frontend {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t pad;
+       struct netchannel2_msg_bypass_common common;
+};
+
+#define NETCHANNEL2_MSG_BYPASS_BACKEND 10
+struct netchannel2_msg_bypass_backend {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t port;
+       struct netchannel2_msg_bypass_common common;
+};
+
+#define NETCHANNEL2_MSG_BYPASS_FRONTEND_READY 11
+struct netchannel2_msg_bypass_frontend_ready {
+       struct netchannel2_msg_hdr hdr;
+       int32_t port;
+};
+
+/* This message is sent on a bypass ring once the sending domain is
+ * ready to receive packets.  Until it has been received, the bypass
+ * ring cannot be used to transmit packets.  It may only be sent once.
+ *
+ * Note that it is valid to send packet messages before *sending* a
+ * BYPASS_READY message, provided a BYPASS_READY message has been
+ * *received*.
+ *
+ * This message can only be sent on a bypass ring.
+ */
+#define NETCHANNEL2_MSG_BYPASS_READY 12
+struct netchannel2_msg_bypass_ready {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t pad;
+};
+
+/* Disable an existing bypass. This is sent over the *parent* ring,
+ * in the same direction as the original BYPASS message, when the
+ * bypassed domain wishes to disable the ring. The receiving domain
+ * should stop sending PACKET messages over the ring, wait for FINISH
+ * messages for any outstanding PACKETs, and then acknowledge this
+ * message with a DISABLED message.
+ *
+ * This message may not be sent on bypass rings.
+ */
+#define NETCHANNEL2_MSG_BYPASS_DISABLE 13
+struct netchannel2_msg_bypass_disable {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t handle;
+};
+#define NETCHANNEL2_MSG_BYPASS_DISABLED 14
+struct netchannel2_msg_bypass_disabled {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t handle;
+};
+
+/* Detach from an existing bypass.  This is sent over the *parent* ring in
+ * the same direction as the original BYPASS message, when the
+ * bypassed domain wishes to destroy the ring. The receiving domain
+ * should immediately unmap the ring and respond with a DETACHED
+ * message.  Any PACKET messages which haven't already received a
+ * FINISH message are dropped.
+ *
+ * During a normal shutdown, this message will be sent after DISABLED
+ * messages have been received from both endpoints.  However, it can
+ * also be sent without a preceding DISABLE message if the other
+ * endpoint appears to be misbehaving or has crashed.
+ *
+ * This message may not be sent on bypass rings.
+ */
+#define NETCHANNEL2_MSG_BYPASS_DETACH 15
+struct netchannel2_msg_bypass_detach {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t handle;
+};
+#define NETCHANNEL2_MSG_BYPASS_DETACHED 16
+struct netchannel2_msg_bypass_detached {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t handle;
+};
+
 #endif /* !__NETCHANNEL2_H__ */
-- 
1.6.3.1


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel

<Prev in Thread] Current Thread [Next in Thread>