[Xen-devel] [PATCH 09/22] Add a very basic netchannel2 implementation.

This is functional, in the sense that packets can be sent and
received, but lacks any advanced features.

Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx>
---
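
As a rough orientation, a backend endpoint drives the core interface
declared in netchannel2_endpoint.h along these lines.  This is an
illustrative sketch only: the function name and the pre-mapped
ring/event-channel parameters are invented for the example, and the
real negotiation (xenstore reads, grant mapping, error reporting) is
in attach_to_frontend() in netback2.c below.

/* Sketch only -- not part of the patch.  Assumes the control page and
 * both ring payloads have already been mapped, and that
 * netchannel2_endpoint.h plus the xenbus headers are included. */
static int example_backend_attach(struct xenbus_device *xd,
                                  struct netchannel2_backend_shared *ctrl,
                                  void *f2b_ring, size_t f2b_size,
                                  void *b2f_ring, size_t b2f_size,
                                  int evtchn)
{
        struct netchannel2 *chan;
        int err;

        chan = nc2_new(xd);     /* allocates and registers the net device */
        if (!chan)
                return -ENOMEM;

        /* The frontend->backend ring is our consumer ring; the
         * backend->frontend ring is our producer ring. */
        err = nc2_attach_rings(chan,
                               &ctrl->cons, f2b_ring, f2b_size,
                               &ctrl->prod, b2f_ring, b2f_size,
                               xd->otherend_id);
        if (err < 0)
                goto fail;

        err = nc2_connect_evtchn(chan, xd->otherend_id, evtchn);
        if (err < 0)
                goto fail;
        return 0;

fail:
        nc2_release(chan);
        return err;
}

The frontend side is symmetric, except that it allocates and grants the
rings itself and uses nc2_listen_evtchn() instead of nc2_connect_evtchn().
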
 drivers/xen/Kconfig                            |   24 +
 drivers/xen/Makefile                           |    1 +
 drivers/xen/netchannel2/Makefile               |   12 +
 drivers/xen/netchannel2/chan.c                 |  659 ++++++++++++++++++++++++
 drivers/xen/netchannel2/netback2.c             |  354 +++++++++++++
 drivers/xen/netchannel2/netchan2.c             |   32 ++
 drivers/xen/netchannel2/netchannel2_core.h     |  351 +++++++++++++
 drivers/xen/netchannel2/netchannel2_endpoint.h |   63 +++
 drivers/xen/netchannel2/netfront2.c            |  488 ++++++++++++++++++
 drivers/xen/netchannel2/recv_packet.c          |  216 ++++++++
 drivers/xen/netchannel2/rscb.c                 |  385 ++++++++++++++
 drivers/xen/netchannel2/util.c                 |  230 +++++++++
 drivers/xen/netchannel2/xmit_packet.c          |  318 ++++++++++++
 include/xen/interface/io/netchannel2.h         |  106 ++++
 include/xen/interface/io/uring.h               |  426 +++++++++++++++
 15 files changed, 3665 insertions(+), 0 deletions(-)
 create mode 100644 drivers/xen/netchannel2/Makefile
 create mode 100644 drivers/xen/netchannel2/chan.c
 create mode 100644 drivers/xen/netchannel2/netback2.c
 create mode 100644 drivers/xen/netchannel2/netchan2.c
 create mode 100644 drivers/xen/netchannel2/netchannel2_core.h
 create mode 100644 drivers/xen/netchannel2/netchannel2_endpoint.h
 create mode 100644 drivers/xen/netchannel2/netfront2.c
 create mode 100644 drivers/xen/netchannel2/recv_packet.c
 create mode 100644 drivers/xen/netchannel2/rscb.c
 create mode 100644 drivers/xen/netchannel2/util.c
 create mode 100644 drivers/xen/netchannel2/xmit_packet.c
 create mode 100644 include/xen/interface/io/netchannel2.h
 create mode 100644 include/xen/interface/io/uring.h

diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
index ed4b89b..a081b73 100644
--- a/drivers/xen/Kconfig
+++ b/drivers/xen/Kconfig
@@ -210,6 +210,30 @@ config XEN_SCSI_FRONTEND
          The SCSI frontend driver allows the kernel to access SCSI Devices
          within another guest OS.
 
+config XEN_NETCHANNEL2
+       tristate "Net channel 2 support"
+       depends on XEN && NET
+       default y
+       help
+         Xen netchannel2 driver support.  This allows a domain to act as
+         either the backend or frontend part of a netchannel2 connection.
+         Unless you are building a dedicated device-driver domain, you
+         almost certainly want to say Y here.
+
+         If you say Y or M here, you should also say Y to one or both of
+         ``Net channel 2 backend support'' and ``Net channel 2 frontend
+         support'', below.
+
+config XEN_NETDEV2_BACKEND
+       bool "Net channel 2 backend support"
+       depends on XEN_BACKEND && XEN_NETCHANNEL2
+       default XEN_BACKEND
+
+config XEN_NETDEV2_FRONTEND
+       bool "Net channel 2 frontend support"
+       depends on XEN_NETCHANNEL2
+       default y
+
 config XEN_GRANT_DEV
        tristate "User-space granted page access driver"
        default XEN_PRIVILEGED_GUEST
diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
index 873e5a3..68eb231 100644
--- a/drivers/xen/Makefile
+++ b/drivers/xen/Makefile
@@ -30,4 +30,5 @@ obj-$(CONFIG_XEN_GRANT_DEV)   += gntdev/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_UTIL)                += sfc_netutil/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_FRONTEND)    += sfc_netfront/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_BACKEND)     += sfc_netback/
+obj-$(CONFIG_XEN_NETCHANNEL2)          += netchannel2/
 obj-$(CONFIG_XEN_ACPI_WMI_WRAPPER)              += acpi-wmi/
diff --git a/drivers/xen/netchannel2/Makefile b/drivers/xen/netchannel2/Makefile
new file mode 100644
index 0000000..bdad6da
--- /dev/null
+++ b/drivers/xen/netchannel2/Makefile
@@ -0,0 +1,12 @@
+obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o
+
+netchannel2-objs := chan.o netchan2.o rscb.o util.o \
+       xmit_packet.o recv_packet.o
+
+ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y)
+netchannel2-objs += netback2.o
+endif
+
+ifeq ($(CONFIG_XEN_NETDEV2_FRONTEND),y)
+netchannel2-objs += netfront2.o
+endif
diff --git a/drivers/xen/netchannel2/chan.c b/drivers/xen/netchannel2/chan.c
new file mode 100644
index 0000000..e3ad981
--- /dev/null
+++ b/drivers/xen/netchannel2/chan.c
@@ -0,0 +1,659 @@
+#include <linux/kernel.h>
+#include <linux/kthread.h>
+#include <linux/gfp.h>
+#include <linux/etherdevice.h>
+#include <linux/interrupt.h>
+#include <linux/netdevice.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/delay.h>
+#include <linux/version.h>
+#include <xen/evtchn.h>
+#include <xen/xenbus.h>
+
+#include "netchannel2_endpoint.h"
+#include "netchannel2_core.h"
+
+static int process_ring(struct napi_struct *napi,
+                       int work_avail);
+
+static irqreturn_t nc2_int(int irq, void *dev_id)
+{
+       struct netchannel2_ring_pair *ncr = dev_id;
+
+       if (ncr->irq == -1)
+               return IRQ_HANDLED;
+       if (ncr->cons_ring.sring->prod != ncr->cons_ring.cons_pvt ||
+           ncr->interface->is_stopped)
+               nc2_kick(ncr);
+       return IRQ_HANDLED;
+}
+
+/* Process all incoming messages.  The function is given an
+   IRQ-disabled reference for the interface, and must dispose of it
+   (either by enabling the IRQ or re-introducing it to the pending
+   list).  Alternatively, the function can stop the ring being
+   processed again by leaking the reference (e.g. when the remote
+   endpoint is misbehaving). */
+/* Returns -1 if we used all the available work without finishing, or
+   the amount of work used otherwise. */
+static int process_messages(struct netchannel2_ring_pair *ncrp,
+                           int work_avail,
+                           struct sk_buff_head *pending_rx_queue)
+{
+       struct netchannel2_msg_hdr hdr;
+       RING_IDX prod;
+       struct netchannel2 *nc = ncrp->interface;
+       int work_done;
+
+       work_done = 1;
+
+retry:
+       prod = ncrp->cons_ring.sring->prod;
+       rmb();
+       while (work_done < work_avail &&
+              prod != ncrp->cons_ring.cons_pvt) {
+               nc2_copy_from_ring(&ncrp->cons_ring, &hdr, sizeof(hdr));
+               if (hdr.size < sizeof(hdr)) {
+                       printk(KERN_WARNING "Other end sent too-small message (%d)\n",
+                              hdr.size);
+                       goto done;
+               }
+               if (hdr.size > ncrp->cons_ring.payload_bytes) {
+                       /* This one message is bigger than the whole
+                          ring -> other end is clearly misbehaving.
+                          We won't take any more messages from this
+                          ring. */
+                       printk(KERN_WARNING "Other end sent enormous message (%d > %zd)\n",
+                              hdr.size,
+                              ncrp->cons_ring.payload_bytes);
+                       goto done;
+               }
+
+               switch (hdr.type) {
+               case NETCHANNEL2_MSG_SET_MAX_PACKETS:
+                       nc2_handle_set_max_packets_msg(ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_PACKET:
+                       nc2_handle_packet_msg(nc, ncrp, &hdr,
+                                             pending_rx_queue);
+                       break;
+               case NETCHANNEL2_MSG_FINISH_PACKET:
+                       nc2_handle_finish_packet_msg(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_PAD:
+                       break;
+               default:
+                       /* Drop bad messages.  We should arguably stop
+                          processing the ring at this point, because
+                          the ring is probably corrupt.  However, if
+                          it is corrupt then one of the other checks
+                          will hit soon enough, and doing it this way
+                          should make it a bit easier to add new
+                          message types in future. */
+                       pr_debug("Bad message type %d from peer!\n",
+                                hdr.type);
+                       break;
+               }
+               hdr.size = (hdr.size + 7) & ~7;
+               ncrp->cons_ring.cons_pvt += hdr.size;
+
+               work_done++;
+               if (work_done == work_avail)
+                       return -1;
+       }
+
+       if (unlikely(prod != ncrp->cons_ring.sring->prod))
+               goto retry;
+
+       /* Dispose of our IRQ-disable reference. */
+done:
+       napi_complete(&ncrp->napi);
+       enable_irq(ncrp->irq);
+
+       if (nc2_final_check_for_messages(&ncrp->cons_ring,
+                                        prod)) {
+               /* More work to do still. */
+               nc2_kick(ncrp);
+       }
+
+       return work_done;
+}
+
+/* Flush out all pending metadata messages on ring @ncrp, and then
+   update the ring pointers to indicate that we've done so.  Fire the
+   event channel if necessary. */
+static void flush_rings(struct netchannel2_ring_pair *ncrp)
+{
+       int need_kick;
+
+       flush_hypercall_batcher(&ncrp->pending_rx_hypercalls,
+                               nc2_rscb_on_gntcopy_fail);
+       send_finish_packet_messages(ncrp);
+       if (ncrp->need_advertise_max_packets)
+               advertise_max_packets(ncrp);
+
+       need_kick = 0;
+       if (nc2_finish_messages(&ncrp->cons_ring)) {
+               need_kick = 1;
+               /* If we need an event on the consumer ring, we always
+                  need to notify the other end, even if we don't have
+                  any messages which would normally be considered
+                  urgent. */
+               ncrp->pending_time_sensitive_messages = 1;
+       }
+       if (nc2_flush_ring(&ncrp->prod_ring))
+               need_kick = 1;
+       if (need_kick ||
+           (ncrp->delayed_kick && ncrp->pending_time_sensitive_messages)) {
+               if (ncrp->pending_time_sensitive_messages) {
+                       notify_remote_via_irq(ncrp->irq);
+                       ncrp->delayed_kick = 0;
+               } else {
+                       ncrp->delayed_kick = 1;
+               }
+               ncrp->pending_time_sensitive_messages = 0;
+       }
+}
+
+/* Process incoming messages, and then flush outgoing metadata
+ * messages.  We also try to unjam the xmit queue if any of the
+ * incoming messages would give us permission to send more stuff. */
+/* This is given an IRQ-disable reference, and must dispose of it. */
+static int nc2_poll(struct netchannel2_ring_pair *ncrp, int work_avail,
+                   struct sk_buff_head *rx_queue)
+{
+       int work_done;
+
+       if (!ncrp->is_attached) {
+               napi_complete(&ncrp->napi);
+               enable_irq(ncrp->irq);
+               return 0;
+       }
+
+       work_done = process_messages(ncrp, work_avail, rx_queue);
+
+       flush_rings(ncrp);
+
+       if (work_done < 0)
+               return work_avail;
+       else
+               return work_done;
+}
+
+/* Like skb_queue_purge(), but use release_tx_packet() rather than
+   kfree_skb() */
+void nc2_queue_purge(struct netchannel2_ring_pair *ncrp,
+                    struct sk_buff_head *queue)
+{
+       struct sk_buff *skb;
+
+       while (!skb_queue_empty(queue)) {
+               skb = skb_dequeue(queue);
+               release_tx_packet(ncrp, skb);
+       }
+}
+
+/* struct net_device stop() method. */
+static int nc2_stop(struct net_device *nd)
+{
+       struct netchannel2 *nc = netdev_priv(nd);
+
+       spin_lock_bh(&nc->rings.lock);
+       nc->stats.tx_dropped += skb_queue_len(&nc->pending_skbs);
+       nc2_queue_purge(&nc->rings, &nc->pending_skbs);
+       spin_unlock_bh(&nc->rings.lock);
+
+       return 0;
+}
+
+/* Kick a netchannel2 interface so that the poll() method runs
+ * soon. */
+/* This has semi release-like semantics, so you can set flags
+   lock-free and be guaranteed that the poll() method will eventually
+   run and see the flag set, without doing any explicit locking. */
+void nc2_kick(struct netchannel2_ring_pair *ncrp)
+{
+       if (napi_schedule_prep(&ncrp->napi)) {
+               disable_irq_nosync(ncrp->irq);
+               __napi_schedule(&ncrp->napi);
+       }
+}
+
+static int nc2_open(struct net_device *nd)
+{
+       struct netchannel2 *nc = netdev_priv(nd);
+
+       nc2_kick(&nc->rings);
+       return 0;
+}
+
+/* Read a MAC address from the xenstore node @prefix/@node.
+ * Call without holding locks.  Returns 0 on success or <0 on error. */
+static int read_mac_address(const char *prefix, const char *node,
+                           unsigned char *addr)
+{
+       int err;
+       unsigned mac[6];
+       int i;
+
+       err = xenbus_scanf(XBT_NIL, prefix, node,
+                          "%x:%x:%x:%x:%x:%x",
+                          &mac[0],
+                          &mac[1],
+                          &mac[2],
+                          &mac[3],
+                          &mac[4],
+                          &mac[5]);
+       if (err < 0)
+               return err;
+       if (err != 6)
+               return -EINVAL;
+       for (i = 0; i < 6; i++) {
+               if (mac[i] >= 0x100)
+                       return -EINVAL;
+               addr[i] = mac[i];
+       }
+       return 0;
+}
+
+/* Release resources associated with a ring pair.  It is assumed that
+   the ring pair has already been detached (which stops the IRQ and
+   un-pends the ring). */
+void cleanup_ring_pair(struct netchannel2_ring_pair *ncrp)
+{
+       BUG_ON(ncrp->prod_ring.sring);
+       BUG_ON(ncrp->cons_ring.sring);
+
+       drop_pending_tx_packets(ncrp);
+       nc2_queue_purge(ncrp, &ncrp->release_on_flush_batcher);
+       if (ncrp->gref_pool != 0)
+               gnttab_free_grant_references(ncrp->gref_pool);
+       netif_napi_del(&ncrp->napi);
+}
+
+int init_ring_pair(struct netchannel2_ring_pair *ncrp,
+                  struct netchannel2 *nc)
+{
+       unsigned x;
+
+       ncrp->interface = nc;
+       spin_lock_init(&ncrp->lock);
+       ncrp->irq = -1;
+
+       for (x = 0; x < NR_TX_PACKETS - 1; x++)
+               txp_set_next_free(ncrp->tx_packets + x, x + 1);
+       txp_set_next_free(ncrp->tx_packets + x, INVALID_TXP_INDEX);
+       ncrp->head_free_tx_packet = 0;
+
+       skb_queue_head_init(&ncrp->pending_tx_queue);
+       skb_queue_head_init(&ncrp->release_on_flush_batcher);
+
+       if (gnttab_alloc_grant_references(NR_TX_PACKETS,
+                                         &ncrp->gref_pool) < 0)
+               return -1;
+
+       netif_napi_add(ncrp->interface->net_device, &ncrp->napi,
+                      process_ring, 64);
+       napi_enable(&ncrp->napi);
+
+       return 0;
+}
+
+static struct net_device_stats *nc2_get_stats(struct net_device *nd)
+{
+       struct netchannel2 *nc = netdev_priv(nd);
+
+       return &nc->stats;
+}
+
+/* Create a new netchannel2 structure. Call with no locks held.
+   Returns NULL on error.  The xenbus device must remain valid for as
+   long as the netchannel2 structure does.  The core does not take out
+   any kind of reference count on it, but will refer to it throughout
+   the returned netchannel2's life. */
+struct netchannel2 *nc2_new(struct xenbus_device *xd)
+{
+       struct net_device *netdev;
+       struct netchannel2 *nc;
+       int err;
+       int local_trusted;
+       int remote_trusted;
+       int filter_mac;
+
+       if (!gnttab_subpage_grants_available()) {
+               printk(KERN_ERR "netchannel2 needs version 2 grant tables\n");
+               return NULL;
+       }
+
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "local-trusted",
+                        "%d", &local_trusted) != 1) {
+               printk(KERN_WARNING "Can't tell whether local endpoint is trusted; assuming it is.\n");
+               local_trusted = 1;
+       }
+
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "remote-trusted",
+                        "%d", &remote_trusted) != 1) {
+               printk(KERN_WARNING "Can't tell whether remote endpoint is trusted; assuming it isn't.\n");
+               remote_trusted = 0;
+       }
+
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "filter-mac",
+                        "%d", &filter_mac) != 1) {
+               if (remote_trusted) {
+                       printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering off.\n");
+                       filter_mac = 0;
+               } else {
+                       printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering on.\n");
+                       filter_mac = 1;
+               }
+       }
+
+       netdev = alloc_etherdev(sizeof(*nc));
+       if (netdev == NULL)
+               return NULL;
+
+       nc = netdev_priv(netdev);
+       memset(nc, 0, sizeof(*nc));
+       nc->magic = NETCHANNEL2_MAGIC;
+       nc->net_device = netdev;
+       nc->xenbus_device = xd;
+
+       nc->remote_trusted = remote_trusted;
+       nc->local_trusted = local_trusted;
+       nc->rings.filter_mac = filter_mac;
+
+       skb_queue_head_init(&nc->pending_skbs);
+       if (init_ring_pair(&nc->rings, nc) < 0) {
+               nc2_release(nc);
+               return NULL;
+       }
+
+       netdev->open = nc2_open;
+       netdev->stop = nc2_stop;
+       netdev->hard_start_xmit = nc2_start_xmit;
+       netdev->get_stats = nc2_get_stats;
+
+       /* We need to hold the ring lock in order to send messages
+          anyway, so there's no point in Linux doing additional
+          synchronisation. */
+       netdev->features = NETIF_F_LLTX;
+
+       SET_NETDEV_DEV(netdev, &xd->dev);
+
+       err = read_mac_address(xd->nodename, "remote-mac",
+                              nc->rings.remote_mac);
+       if (err == 0)
+               err = read_mac_address(xd->nodename, "mac", netdev->dev_addr);
+       if (err == 0)
+               err = register_netdev(netdev);
+
+       if (err != 0) {
+               nc2_release(nc);
+               return NULL;
+       }
+
+       return nc;
+}
+
+/* Release a netchannel2 structure previously allocated with
+ * nc2_new().  Call with no locks held.  The rings will be
+ * automatically detached if necessary. */
+void nc2_release(struct netchannel2 *nc)
+{
+       netif_carrier_off(nc->net_device);
+
+       unregister_netdev(nc->net_device);
+
+       nc2_detach_rings(nc);
+
+       /* Unregistering the net device stops any netdev methods from
+          running, and detaching the rings stops the napi methods, so
+          we're now the only thing accessing this netchannel2
+          structure and we can tear it down with impunity. */
+
+       cleanup_ring_pair(&nc->rings);
+
+       nc2_queue_purge(&nc->rings, &nc->pending_skbs);
+
+       free_netdev(nc->net_device);
+}
+
+static void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp,
+                             struct netchannel2_sring_cons *cons_sring,
+                             const volatile void *cons_payload,
+                             size_t cons_size,
+                             struct netchannel2_sring_prod *prod_sring,
+                             void *prod_payload,
+                             size_t prod_size,
+                             domid_t otherend_id)
+{
+       BUG_ON(prod_sring == NULL);
+       BUG_ON(cons_sring == NULL);
+
+       ncrp->prod_ring.sring = prod_sring;
+       ncrp->prod_ring.payload_bytes = prod_size;
+       ncrp->prod_ring.prod_pvt = 0;
+       ncrp->prod_ring.payload = prod_payload;
+
+       ncrp->cons_ring.sring = cons_sring;
+       ncrp->cons_ring.payload_bytes = cons_size;
+       ncrp->cons_ring.sring->prod_event = ncrp->cons_ring.sring->prod + 1;
+       ncrp->cons_ring.cons_pvt = 0;
+       ncrp->cons_ring.payload = cons_payload;
+
+       ncrp->otherend_id = otherend_id;
+
+       ncrp->is_attached = 1;
+
+       ncrp->need_advertise_max_packets = 1;
+}
+
+/* Attach a netchannel2 structure to a ring pair.  The endpoint is
+   also expected to set up an event channel after calling this, before
+   using the interface.  Returns 0 on success or <0 on error. */
+int nc2_attach_rings(struct netchannel2 *nc,
+                    struct netchannel2_sring_cons *cons_sring,
+                    const volatile void *cons_payload,
+                    size_t cons_size,
+                    struct netchannel2_sring_prod *prod_sring,
+                    void *prod_payload,
+                    size_t prod_size,
+                    domid_t otherend_id)
+{
+       spin_lock_bh(&nc->rings.lock);
+       _nc2_attach_rings(&nc->rings, cons_sring, cons_payload, cons_size,
+                         prod_sring, prod_payload, prod_size, otherend_id);
+
+       spin_unlock_bh(&nc->rings.lock);
+
+       netif_carrier_on(nc->net_device);
+
+       /* Kick it to get it going. */
+       nc2_kick(&nc->rings);
+
+       return 0;
+}
+
+static void _detach_rings(struct netchannel2_ring_pair *ncrp)
+{
+       spin_lock_bh(&ncrp->lock);
+       /* We need to release all of the pending transmission packets,
+          because they're never going to complete now that we've lost
+          the ring. */
+       drop_pending_tx_packets(ncrp);
+
+       disable_irq(ncrp->irq);
+
+       BUG_ON(ncrp->nr_tx_packets_outstanding);
+       ncrp->max_tx_packets_outstanding = 0;
+
+       /* No way of sending pending finish messages now; drop
+        * them. */
+       ncrp->pending_finish.prod = 0;
+       ncrp->pending_finish.cons = 0;
+
+       ncrp->cons_ring.sring = NULL;
+       ncrp->prod_ring.sring = NULL;
+       ncrp->is_attached = 0;
+
+       spin_unlock_bh(&ncrp->lock);
+}
+
+/* Detach from the rings.  This includes unmapping them and stopping
+   the interrupt. */
+/* Careful: the netdev methods may still be running at this point. */
+/* This is not allowed to wait for the other end, because it might
+   have gone away (e.g. over suspend/resume). */
+static void nc2_detach_ring(struct netchannel2_ring_pair *ncrp)
+{
+       if (!ncrp->is_attached)
+               return;
+
+       napi_disable(&ncrp->napi);
+       _detach_rings(ncrp);
+}
+
+/* Trivial wrapper around nc2_detach_ring().  Make the ring no longer
+   used. */
+void nc2_detach_rings(struct netchannel2 *nc)
+{
+       nc2_detach_ring(&nc->rings);
+
+       /* Okay, all async access to the ring is stopped.  Kill the
+          irqhandlers.  (It might be better to do this from the
+          _detach_ring() functions, but you're not allowed to
+          free_irq() from interrupt context, and tasklets are close
+          enough to cause problems). */
+
+       if (nc->rings.irq >= 0)
+               unbind_from_irqhandler(nc->rings.irq, &nc->rings);
+       nc->rings.irq = -1;
+}
+
+#if defined(CONFIG_XEN_NETDEV2_BACKEND)
+/* Connect to an event channel port in a remote domain.         Returns 0 on
+   success or <0 on error.  The port is automatically disconnected
+   when the channel is released or if the rings are detached.  This
+   should not be called if the port is already open. */
+int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid,
+                      int evtchn)
+{
+       int err;
+
+       BUG_ON(nc->rings.irq >= 0);
+
+       err = bind_interdomain_evtchn_to_irqhandler(domid,
+                                                   evtchn,
+                                                   nc2_int,
+                                                   IRQF_SAMPLE_RANDOM,
+                                                   "netchannel2",
+                                                   &nc->rings);
+       if (err >= 0) {
+               nc->rings.irq = err;
+               nc->rings.evtchn = irq_to_evtchn_port(err);
+               return 0;
+       } else {
+               return err;
+       }
+}
+#endif
+
+#if defined(CONFIG_XEN_NETDEV2_FRONTEND)
+/* Listen for incoming event channel connections from domain domid.
+   Similar semantics to nc2_connect_evtchn(). */
+int nc2_listen_evtchn(struct netchannel2 *nc, domid_t domid)
+{
+       int err;
+
+       BUG_ON(nc->rings.irq >= 0);
+
+       err = bind_listening_port_to_irqhandler(domid,
+                                               nc2_int,
+                                               IRQF_SAMPLE_RANDOM,
+                                               "netchannel2",
+                                               &nc->rings);
+       if (err >= 0) {
+               nc->rings.irq = err;
+               nc->rings.evtchn = irq_to_evtchn_port(err);
+               return 0;
+       } else {
+               return err;
+       }
+}
+#endif
+
+/* Find the local event channel port which was allocated by
+ * nc2_listen_evtchn() or nc2_connect_evtchn().         It is an error to
+ * call this when there is no event channel connected. */
+int nc2_get_evtchn_port(struct netchannel2 *nc)
+{
+       BUG_ON(nc->rings.irq < 0);
+       return nc->rings.evtchn;
+}
+
+/* @ncrp has been recently nc2_kick()ed.  Do all of the necessary
+   stuff. */
+static int process_ring(struct napi_struct *napi,
+                       int work_avail)
+{
+       struct netchannel2_ring_pair *ncrp =
+               container_of(napi, struct netchannel2_ring_pair, napi);
+       struct netchannel2 *nc = ncrp->interface;
+       struct sk_buff *skb;
+       int work_done;
+       struct sk_buff_head rx_queue;
+
+       skb_queue_head_init(&rx_queue);
+
+       spin_lock(&ncrp->lock);
+
+       /* Pick up incoming messages. */
+       work_done = nc2_poll(ncrp, work_avail, &rx_queue);
+
+       /* Transmit pending packets. */
+       if (!skb_queue_empty(&ncrp->pending_tx_queue)) {
+               skb = __skb_dequeue(&ncrp->pending_tx_queue);
+               do {
+                       if (!nc2_really_start_xmit(ncrp, skb)) {
+                               /* Requeue the packet so that we will try
+                                  when the ring is less busy */
+                               __skb_queue_head(&ncrp->pending_tx_queue, skb);
+                               break;
+                       }
+                       skb = __skb_dequeue(&ncrp->pending_tx_queue);
+               } while (skb != NULL);
+
+               flush_rings(ncrp);
+
+               while ((skb = __skb_dequeue(&ncrp->release_on_flush_batcher)))
+                       release_tx_packet(ncrp, skb);
+       }
+
+       if (nc->is_stopped) {
+               /* If the other end has processed some messages, there
+                  may be space on the ring for a delayed send from
+                  earlier.  Process it now. */
+               while (1) {
+                       skb = skb_peek_tail(&nc->pending_skbs);
+                       if (!skb)
+                               break;
+                       if (prepare_xmit_allocate_resources(nc, skb) < 0) {
+                               /* Still stuck */
+                               break;
+                       }
+                       __skb_unlink(skb, &nc->pending_skbs);
+                       queue_packet_to_interface(skb, ncrp);
+               }
+               if (skb_queue_empty(&nc->pending_skbs)) {
+                       nc->is_stopped = 0;
+                       netif_wake_queue(nc->net_device);
+               }
+       }
+
+       spin_unlock(&ncrp->lock);
+
+       receive_pending_skbs(&rx_queue);
+
+       return work_done;
+}
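
(Aside, not part of the patch: messages on the ring are variable-sized
and padded to an 8-byte boundary, which is why process_messages()
advances cons_pvt by the rounded-up size rather than by hdr.size
itself.  The rounding step, pulled out as a hypothetical helper:)

/* Illustrative only: the ring keeps every message 8-byte aligned,
 * matching "hdr.size = (hdr.size + 7) & ~7" in process_messages(). */
static inline unsigned int nc2_ring_msg_space(unsigned int msg_size)
{
        return (msg_size + 7) & ~7u;    /* e.g. 66 -> 72, 72 -> 72 */
}
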
diff --git a/drivers/xen/netchannel2/netback2.c b/drivers/xen/netchannel2/netback2.c
new file mode 100644
index 0000000..fd6f238
--- /dev/null
+++ b/drivers/xen/netchannel2/netback2.c
@@ -0,0 +1,354 @@
+#include <linux/kernel.h>
+#include <linux/gfp.h>
+#include <linux/vmalloc.h>
+#include <xen/gnttab.h>
+#include <xen/xenbus.h>
+#include <xen/interface/io/netchannel2.h>
+
+#include "netchannel2_core.h"
+#include "netchannel2_endpoint.h"
+
+#define NETBACK2_MAGIC 0xb5e99485
+struct netback2 {
+       unsigned magic;
+       struct xenbus_device *xenbus_device;
+
+       struct netchannel2 *chan;
+
+       struct grant_mapping b2f_mapping;
+       struct grant_mapping f2b_mapping;
+       struct grant_mapping control_mapping;
+
+       int attached;
+
+       struct xenbus_watch shutdown_watch;
+       int have_shutdown_watch;
+};
+
+static struct netback2 *xenbus_device_to_nb2(struct xenbus_device *xd)
+{
+       struct netback2 *nb = xd->dev.driver_data;
+       BUG_ON(nb->magic != NETBACK2_MAGIC);
+       return nb;
+}
+
+/* Read a range of grants out of xenstore and map them in gm.  Any
+   existing mapping in gm is released. Returns 0 on success or <0 on
+   error.  On error, gm is preserved, and xenbus_dev_fatal() is
+   called. */
+static int map_grants(struct netback2 *nd, const char *prefix,
+                     struct grant_mapping *gm)
+{
+       struct xenbus_device *xd = nd->xenbus_device;
+       int err;
+       char buf[32];
+       int i;
+       unsigned nr_pages;
+       grant_ref_t grefs[MAX_GRANT_MAP_PAGES];
+
+       sprintf(buf, "%s-nr-pages", prefix);
+       err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u", &nr_pages);
+       if (err == -ENOENT) {
+               nr_pages = 1;
+       } else if (err != 1) {
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err, "reading %s", buf);
+                       return err;
+               } else {
+                       xenbus_dev_fatal(xd, err, "reading %s as integer",
+                                        buf);
+                       return -EINVAL;
+               }
+       }
+
+       for (i = 0; i < nr_pages; i++) {
+               sprintf(buf, "%s-ref-%d", prefix, i);
+               err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u",
+                                  &grefs[i]);
+               if (err != 1) {
+                       if (err < 0) {
+                               xenbus_dev_fatal(xd,
+                                                err,
+                                                "reading gref %d from %s/%s",
+                                                i,
+                                                xd->otherend,
+                                                buf);
+                       } else {
+                               xenbus_dev_fatal(xd,
+                                                -EINVAL,
+                                                "expected an integer at %s/%s",
+                                                xd->otherend,
+                                                buf);
+                               err = -EINVAL;
+                       }
+                       return err;
+               }
+       }
+
+       err = nc2_map_grants(gm, grefs, nr_pages, xd->otherend_id);
+       if (err < 0)
+               xenbus_dev_fatal(xd, err, "mapping ring %s from %s",
+                                prefix, xd->otherend);
+       return err;
+}
+
+/* Undo the effects of attach_to_frontend */
+static void detach_from_frontend(struct netback2 *nb)
+{
+       if (!nb->attached)
+               return;
+       nc2_detach_rings(nb->chan);
+       nc2_unmap_grants(&nb->b2f_mapping);
+       nc2_unmap_grants(&nb->f2b_mapping);
+       nc2_unmap_grants(&nb->control_mapping);
+       nb->attached = 0;
+}
+
+static int attach_to_frontend(struct netback2 *nd)
+{
+       int err;
+       int evtchn;
+       struct xenbus_device *xd = nd->xenbus_device;
+       struct netchannel2 *nc = nd->chan;
+       struct netchannel2_backend_shared *nbs;
+
+       if (nd->attached)
+               return 0;
+
+       /* Attach the shared memory bits */
+       err = map_grants(nd, "b2f-ring", &nd->b2f_mapping);
+       if (err)
+               return err;
+       err = map_grants(nd, "f2b-ring", &nd->f2b_mapping);
+       if (err)
+               return err;
+       err = map_grants(nd, "control", &nd->control_mapping);
+       if (err)
+               return err;
+       nbs = nd->control_mapping.mapping->addr;
+       err = nc2_attach_rings(nc,
+                              &nbs->cons,
+                              nd->f2b_mapping.mapping->addr,
+                              nd->f2b_mapping.nr_pages * PAGE_SIZE,
+                              &nbs->prod,
+                              nd->b2f_mapping.mapping->addr,
+                              nd->b2f_mapping.nr_pages * PAGE_SIZE,
+                              xd->otherend_id);
+       if (err < 0) {
+               xenbus_dev_fatal(xd, err, "attaching to rings");
+               return err;
+       }
+
+       /* Connect the event channel. */
+       err = xenbus_scanf(XBT_NIL, xd->otherend, "event-channel", "%u",
+                          &evtchn);
+       if (err < 0) {
+               xenbus_dev_fatal(xd, err,
+                       "reading %s/event-channel or {t,r}x-sring-pages",
+                       xd->otherend);
+               return err;
+       }
+       err = nc2_connect_evtchn(nd->chan, xd->otherend_id, evtchn);
+       if (err < 0) {
+               xenbus_dev_fatal(xd, err, "binding to event channel");
+               return err;
+       }
+
+       /* All done */
+       nd->attached = 1;
+
+       return 0;
+}
+
+static void frontend_changed(struct xenbus_device *xd,
+                            enum xenbus_state frontend_state)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+       int err;
+
+       switch (frontend_state) {
+       case XenbusStateInitialising:
+               /* If the frontend does a kexec following a crash, we
+                  can end up bounced back here even though we're
+                  attached.  Try to recover by detaching from the old
+                  rings. */
+               /* (A normal shutdown, and even a normal kexec, would
+                * have gone through Closed first, so we'll already be
+                * detached, and this is pointless but harmless.) */
+               detach_from_frontend(nb);
+
+               /* Tell the frontend what sort of rings we're willing
+                  to accept. */
+               xenbus_printf(XBT_NIL, nb->xenbus_device->nodename,
+                             "max-sring-pages", "%d", MAX_GRANT_MAP_PAGES);
+
+               /* Start the device bring-up bit of the state
+                * machine. */
+               xenbus_switch_state(nb->xenbus_device, XenbusStateInitWait);
+               break;
+
+       case XenbusStateInitWait:
+               /* Frontend doesn't use this state */
+               xenbus_dev_fatal(xd, EINVAL,
+                                "unexpected frontend state InitWait");
+               break;
+
+       case XenbusStateInitialised:
+       case XenbusStateConnected:
+               /* Frontend has advertised its rings to us */
+               err = attach_to_frontend(nb);
+               if (err >= 0)
+                       xenbus_switch_state(xd, XenbusStateConnected);
+               break;
+
+       case XenbusStateClosing:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               break;
+
+       case XenbusStateClosed:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               if (!xenbus_dev_is_online(xd))
+                       device_unregister(&xd->dev);
+               break;
+
+       case XenbusStateUnknown:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               device_unregister(&xd->dev);
+               break;
+
+       default:
+               /* Ignore transitions to unknown states */
+               break;
+       }
+}
+
+static int netback2_uevent(struct xenbus_device *xd,
+                          struct kobj_uevent_env *env)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+
+       add_uevent_var(env, "vif=%s", nb->chan->net_device->name);
+
+       return 0;
+}
+
+static void netback2_shutdown(struct xenbus_device *xd)
+{
+       xenbus_switch_state(xd, XenbusStateClosing);
+}
+
+static void shutdown_watch_callback(struct xenbus_watch *watch,
+                                   const char **vec,
+                                   unsigned int len)
+{
+       struct netback2 *nb =
+               container_of(watch, struct netback2, shutdown_watch);
+       char *type;
+
+       type = xenbus_read(XBT_NIL, nb->xenbus_device->nodename,
+                          "shutdown-request", NULL);
+       if (IS_ERR(type)) {
+               if (PTR_ERR(type) != -ENOENT)
+                       printk(KERN_WARNING "Cannot read %s/%s: %ld\n",
+                              nb->xenbus_device->nodename, "shutdown-request",
+                              PTR_ERR(type));
+               return;
+       }
+       if (strcmp(type, "force") == 0) {
+               detach_from_frontend(nb);
+               xenbus_switch_state(nb->xenbus_device, XenbusStateClosed);
+       } else if (strcmp(type, "normal") == 0) {
+               netback2_shutdown(nb->xenbus_device);
+       } else {
+               printk(KERN_WARNING "Unrecognised shutdown request %s from tools\n",
+                      type);
+       }
+       xenbus_rm(XBT_NIL, nb->xenbus_device->nodename, "shutdown-request");
+       kfree(type);
+}
+
+static int netback2_probe(struct xenbus_device *xd,
+                         const struct xenbus_device_id *id)
+{
+       struct netback2 *nb;
+
+       nb = kzalloc(sizeof(*nb), GFP_KERNEL);
+       if (nb == NULL)
+               goto err;
+       nb->magic = NETBACK2_MAGIC;
+       nb->xenbus_device = xd;
+
+       nb->shutdown_watch.node = kasprintf(GFP_KERNEL, "%s/shutdown-request",
+                                           xd->nodename);
+       if (nb->shutdown_watch.node == NULL)
+               goto err;
+       nb->shutdown_watch.callback = shutdown_watch_callback;
+       if (register_xenbus_watch(&nb->shutdown_watch))
+               goto err;
+       nb->have_shutdown_watch = 1;
+
+       nb->chan = nc2_new(xd);
+       if (!nb->chan)
+               goto err;
+
+       xd->dev.driver_data = nb;
+
+       kobject_uevent(&xd->dev.kobj, KOBJ_ONLINE);
+
+       return 0;
+
+err:
+       if (nb != NULL) {
+               if (nb->have_shutdown_watch)
+                       unregister_xenbus_watch(&nb->shutdown_watch);
+               kfree(nb->shutdown_watch.node);
+               kfree(nb);
+       }
+       xenbus_dev_fatal(xd, ENOMEM, "probing netdev");
+       return -ENOMEM;
+}
+
+static int netback2_remove(struct xenbus_device *xd)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+       kobject_uevent(&xd->dev.kobj, KOBJ_OFFLINE);
+       if (nb->chan != NULL)
+               nc2_release(nb->chan);
+       if (nb->have_shutdown_watch)
+               unregister_xenbus_watch(&nb->shutdown_watch);
+       kfree(nb->shutdown_watch.node);
+       nc2_unmap_grants(&nb->b2f_mapping);
+       nc2_unmap_grants(&nb->f2b_mapping);
+       nc2_unmap_grants(&nb->control_mapping);
+       kfree(nb);
+       return 0;
+}
+
+static const struct xenbus_device_id netback2_ids[] = {
+       { "vif2" },
+       { "" }
+};
+
+static struct xenbus_driver netback2 = {
+       .name = "vif2",
+       .ids = netback2_ids,
+       .probe = netback2_probe,
+       .remove = netback2_remove,
+       .otherend_changed = frontend_changed,
+       .uevent = netback2_uevent,
+};
+
+int __init netback2_init(void)
+{
+       int r;
+
+       r = xenbus_register_backend(&netback2);
+       if (r < 0) {
+               printk(KERN_ERR "error %d registering backend driver.\n",
+                      r);
+       }
+       return r;
+}
diff --git a/drivers/xen/netchannel2/netchan2.c b/drivers/xen/netchannel2/netchan2.c
new file mode 100644
index 0000000..b23b7e4
--- /dev/null
+++ b/drivers/xen/netchannel2/netchan2.c
@@ -0,0 +1,32 @@
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include "netchannel2_endpoint.h"
+
+static int __init netchan2_init(void)
+{
+       int r;
+
+       r = nc2_init();
+       if (r < 0)
+               return r;
+       r = netfront2_init();
+       if (r < 0)
+               return r;
+       r = netback2_init();
+       if (r < 0)
+               netfront2_exit();
+       return r;
+}
+module_init(netchan2_init);
+
+/* We can't unload if we're acting as a backend. */
+#ifndef CONFIG_XEN_NETDEV2_BACKEND
+static void __exit netchan2_exit(void)
+{
+       netfront2_exit();
+       nc2_exit();
+}
+module_exit(netchan2_exit);
+#endif
+
+MODULE_LICENSE("GPL");
diff --git a/drivers/xen/netchannel2/netchannel2_core.h b/drivers/xen/netchannel2/netchannel2_core.h
new file mode 100644
index 0000000..6ae273d
--- /dev/null
+++ b/drivers/xen/netchannel2/netchannel2_core.h
@@ -0,0 +1,351 @@
+#ifndef NETCHANNEL2_CORE_H__
+#define NETCHANNEL2_CORE_H__
+
+#include <xen/interface/xen.h>
+#include <xen/gnttab.h>
+#include <xen/interface/io/netchannel2.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+
+/* After we send this number of frags, we request the other end to
+ * notify us when sending the corresponding finish packet message */
+#define MAX_MAX_COUNT_FRAGS_NO_EVENT 192
+
+/* Very small packets (e.g. TCP pure acks) are sent inline in the
+ * ring, to avoid the hypercall overhead.  This is the largest packet
+ * which will be sent small, in bytes. It should be big enough to
+ * cover the normal headers (i.e. ethernet + IP + TCP = 66 bytes) plus
+ * a little bit of slop for options etc. */
+#define PACKET_PREFIX_SIZE 96
+
+/* How many packets can we have outstanding at any one time?  This
+ * must be small enough that it won't be confused with an sk_buff
+ * pointer; see the txp_slot stuff later. */
+#define NR_TX_PACKETS 256
+
+/* A way of keeping track of a mapping of a bunch of grant references
+   into a contiguous chunk of virtual address space.  This is used for
+   things like multi-page rings. */
+#define MAX_GRANT_MAP_PAGES 4
+struct grant_mapping {
+       unsigned nr_pages;
+       grant_handle_t handles[MAX_GRANT_MAP_PAGES];
+       struct vm_struct *mapping;
+};
+
+enum transmit_policy {
+       transmit_policy_unknown = 0,
+       transmit_policy_first = 0xf001,
+       transmit_policy_grant = transmit_policy_first,
+       transmit_policy_small,
+       transmit_policy_last = transmit_policy_small
+};
+
+/* When we send a packet message, we need to tag it with an ID.         That
+   ID is an index into the TXP slot array.  Each slot contains either
+   a pointer to an sk_buff (if it's in use), or the index of the next
+   free slot (if it isn't).  A slot is in use if the contents is >
+   NR_TX_PACKETS, and free otherwise. */
+struct txp_slot {
+       unsigned long __contents;
+};
+
+typedef uint32_t nc2_txp_index_t;
+
+#define INVALID_TXP_INDEX ((nc2_txp_index_t)NR_TX_PACKETS)
+
+static inline int txp_slot_in_use(struct txp_slot *slot)
+{
+       if (slot->__contents <= NR_TX_PACKETS)
+               return 0;
+       else
+               return 1;
+}
+
+static inline void txp_set_skb(struct txp_slot *slot, struct sk_buff *skb)
+{
+       slot->__contents = (unsigned long)skb;
+}
+
+static inline struct sk_buff *txp_get_skb(struct txp_slot *slot)
+{
+       if (txp_slot_in_use(slot))
+               return (struct sk_buff *)slot->__contents;
+       else
+               return NULL;
+}
+
+static inline void txp_set_next_free(struct txp_slot *slot,
+                                    nc2_txp_index_t idx)
+{
+       slot->__contents = idx;
+}
+
+static inline nc2_txp_index_t txp_get_next_free(struct txp_slot *slot)
+{
+       return (nc2_txp_index_t)slot->__contents;
+}
+
+/* This goes in struct sk_buff::cb */
+struct skb_cb_overlay {
+       struct txp_slot *tp;
+       unsigned nr_fragments;
+       grant_ref_t gref_pool;
+       enum transmit_policy policy;
+       uint8_t failed;
+       uint8_t expecting_finish;
+       uint8_t type;
+       uint16_t inline_prefix_size;
+};
+
+#define CASSERT(x) typedef unsigned __cassert_ ## __LINE__ [(x)-1]
+CASSERT(sizeof(struct skb_cb_overlay) <= sizeof(((struct sk_buff *)0)->cb));
+
+static inline struct skb_cb_overlay *get_skb_overlay(struct sk_buff *skb)
+{
+       return (struct skb_cb_overlay *)skb->cb;
+}
+
+
+/* Packets for which we need to send FINISH_PACKET messages for as
+   soon as possible. */
+struct pending_finish_packets {
+#define MAX_PENDING_FINISH_PACKETS 256
+       uint32_t ids[MAX_PENDING_FINISH_PACKETS];
+       RING_IDX prod;
+       RING_IDX cons;
+};
+
+#define RX_GRANT_COPY_BATCH 32
+struct hypercall_batcher {
+       unsigned nr_pending_gops;
+       gnttab_copy_t gops[RX_GRANT_COPY_BATCH];
+       void *ctxt[RX_GRANT_COPY_BATCH];
+};
+
+struct netchannel2_ring_pair {
+       struct netchannel2 *interface;
+       /* Main ring lock.  Acquired from bottom halves. */
+       spinlock_t lock;
+
+       struct napi_struct napi;
+
+       /* Protected by the lock.  Initialised at attach_ring() time
+          and de-initialised at detach_ring() time. */
+       struct netchannel2_prod_ring prod_ring;
+       struct netchannel2_cons_ring cons_ring;
+       uint8_t is_attached; /* True if the rings are currently safe to
+                               access. */
+
+       unsigned max_count_frags_no_event;
+       unsigned expected_finish_messages;
+
+       domid_t otherend_id;
+
+       grant_ref_t gref_pool;
+
+       /* The IRQ corresponding to the event channel which is
+          connected to the other end.  This only changes from the
+          xenbus state change handler.  It is notified from lots of
+          other places.  Fortunately, it's safe to notify on an irq
+          after it's been released, so the lack of synchronisation
+          doesn't matter. */
+       int irq;
+       int evtchn;
+
+       /* The MAC address of our peer. */
+       unsigned char remote_mac[ETH_ALEN];
+
+       /* Set if we need to check the source MAC address on incoming
+          packets. */
+       int filter_mac;
+
+       /* A pool of free transmitted_packet structures, threaded on
+          the list member.  Protected by the lock. */
+       nc2_txp_index_t head_free_tx_packet;
+
+       /* Total number of packets on the allocated list.  Protected
+          by the lock. */
+       unsigned nr_tx_packets_outstanding;
+       /* Maximum number of packets which the other end will allow us
+          to keep outstanding at one time.  Valid whenever
+          is_attached is set. */
+       unsigned max_tx_packets_outstanding;
+
+       /* Count of the number of frags we have sent to the other side.
+          When this reaches a maximum value, we request that the other
+          end send an event when it sends the corresponding finish
+          message. */
+       unsigned count_frags_no_event;
+
+       /* Set if we need to send a SET_MAX_PACKETS message.
+          Protected by the lock. */
+       uint8_t need_advertise_max_packets;
+
+       /* Set if there are messages on the ring which are considered
+          time-sensitive, so that it's necessary to notify the remote
+          endpoint as soon as possible. */
+       uint8_t pending_time_sensitive_messages;
+
+       /* Set if we've previously suppressed a remote notification
+          because none of the messages pending at the time of the
+          flush were time-sensitive.  The remote should be notified
+          as soon as the ring is flushed, even if the normal
+          filtering rules would suppress the event. */
+       uint8_t delayed_kick;
+
+       /* A list of packet IDs which we need to return to the other
+          end as soon as there is space on the ring.  Protected by
+          the lock. */
+       struct pending_finish_packets pending_finish;
+
+       /* transmitted_packet structures which are to be transmitted
+          next time the TX tasklet looks at this interface.
+          Protected by the lock. */
+       struct sk_buff_head pending_tx_queue;
+
+       /* Packets which we'll have finished transmitting as soon as
+          we flush the hypercall batcher.  Protected by the lock. */
+       struct sk_buff_head release_on_flush_batcher;
+
+       struct hypercall_batcher pending_rx_hypercalls;
+
+       /* A pre-allocated pool of TX packets.  Free slots are threaded
+          on the head_free_tx_packet list, and the array can also be
+          directly indexed by packet ID.  Protected by the lock. */
+       struct txp_slot tx_packets[NR_TX_PACKETS];
+};
+
+struct netchannel2 {
+#define NETCHANNEL2_MAGIC 0x57c68c1d
+       unsigned magic;
+
+       /* Set when the structure is created and never changed */
+       struct net_device *net_device;
+       struct xenbus_device *xenbus_device;
+
+       /* Set if we trust the remote endpoint. */
+       int remote_trusted;
+       /* Set if the remote endpoint is expected to trust us.
+          There's no guarantee that this is actually correct, but
+          it's useful for optimisation. */
+       int local_trusted;
+
+       struct netchannel2_ring_pair rings;
+
+       /* Packets which we need to transmit soon */
+       struct sk_buff_head pending_skbs;
+
+       /* Flag to indicate that the interface is stopped.  When the
+          interface is stopped, we need to run the tasklet after we
+          receive an interrupt so that we can wake it up. */
+       uint8_t is_stopped;
+
+       /* Updates are protected by the lock.  This can be read at any
+        * time without holding any locks, and the rest of Linux is
+        * expected to cope. */
+       struct net_device_stats stats;
+};
+
+static inline void flush_prepared_grant_copies(struct hypercall_batcher *hb,
+                                              void (*on_fail)(void *ctxt,
+                                                              gnttab_copy_t *gop))
+{
+       unsigned x;
+
+       if (hb->nr_pending_gops == 0)
+               return;
+       if (HYPERVISOR_grant_table_op(GNTTABOP_copy, hb->gops,
+                                     hb->nr_pending_gops))
+               BUG();
+       for (x = 0; x < hb->nr_pending_gops; x++)
+               if (hb->gops[x].status != GNTST_okay)
+                       on_fail(hb->ctxt[x], &hb->gops[x]);
+       hb->nr_pending_gops = 0;
+}
+
+static inline gnttab_copy_t *hypercall_batcher_grant_copy(struct hypercall_batcher *hb,
+                                                         void *ctxt,
+                                                         void (*on_fail)(void *,
+                                                                         gnttab_copy_t *gop))
+{
+       if (hb->nr_pending_gops == ARRAY_SIZE(hb->gops))
+               flush_prepared_grant_copies(hb, on_fail);
+       hb->ctxt[hb->nr_pending_gops] = ctxt;
+       return &hb->gops[hb->nr_pending_gops++];
+}
+
+static inline void flush_hypercall_batcher(struct hypercall_batcher *hb,
+                                          void (*on_fail)(void *,
+                                                          gnttab_copy_t *gop))
+{
+       flush_prepared_grant_copies(hb, on_fail);
+}
+
+struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
+                                           struct netchannel2_ring_pair *ncrp,
+                                           struct netchannel2_msg_packet *msg,
+                                           struct netchannel2_msg_hdr *hdr,
+                                           unsigned nr_frags,
+                                           unsigned frags_off);
+
+int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp,
+                                                      struct sk_buff *skb);
+int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb);
+void xmit_grant(struct netchannel2_ring_pair *ncrp,
+               struct sk_buff *skb,
+               volatile void *msg);
+
+void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
+                                uint32_t id, uint8_t flags);
+
+int allocate_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb);
+void release_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb);
+/* Releases the txp slot, the grant pool, and the skb */
+void release_tx_packet(struct netchannel2_ring_pair *ncrp,
+                      struct sk_buff *skb);
+
+void fetch_fragment(struct netchannel2_ring_pair *ncrp,
+                   unsigned idx,
+                   struct netchannel2_fragment *frag,
+                   unsigned off);
+
+void nc2_kick(struct netchannel2_ring_pair *ncrp);
+
+int nc2_map_grants(struct grant_mapping *gm,
+                  const grant_ref_t *grefs,
+                  unsigned nr_grefs,
+                  domid_t remote_domain);
+void nc2_unmap_grants(struct grant_mapping *gm);
+
+void queue_packet_to_interface(struct sk_buff *skb,
+                              struct netchannel2_ring_pair *ncrp);
+
+void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop);
+
+int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev);
+int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
+                          struct sk_buff *skb);
+int prepare_xmit_allocate_resources(struct netchannel2 *nc,
+                                   struct sk_buff *skb);
+void nc2_handle_finish_packet_msg(struct netchannel2 *nc,
+                                 struct netchannel2_ring_pair *ncrp,
+                                 struct netchannel2_msg_hdr *hdr);
+void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp,
+                                   struct netchannel2_msg_hdr *hdr);
+void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp);
+
+void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp);
+void nc2_handle_packet_msg(struct netchannel2 *nc,
+                          struct netchannel2_ring_pair *ncrp,
+                          struct netchannel2_msg_hdr *hdr,
+                          struct sk_buff_head *pending_rx_queue);
+void advertise_max_packets(struct netchannel2_ring_pair *ncrp);
+void receive_pending_skbs(struct sk_buff_head *rx_queue);
+void nc2_queue_purge(struct netchannel2_ring_pair *ncrp,
+                    struct sk_buff_head *queue);
+
+#endif /* !NETCHANNEL2_CORE_H__ */
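
(Aside, not part of the patch: the txp_slot encoding above works
because valid sk_buff pointers always compare greater than
NR_TX_PACKETS, so a single unsigned long can hold either a pointer
(slot in use) or a small free-list index (slot free).  A standalone
userspace sketch of the same tagging trick, with invented names:)

#include <assert.h>

#define DEMO_NR_TX_PACKETS 256

struct demo_slot {
        unsigned long contents;
};

static int demo_slot_in_use(const struct demo_slot *slot)
{
        return slot->contents > DEMO_NR_TX_PACKETS;
}

int main(void)
{
        struct demo_slot slot;
        static int fake_skb;            /* stands in for a struct sk_buff */

        slot.contents = 17;             /* free: stores the next-free index */
        assert(!demo_slot_in_use(&slot));

        slot.contents = (unsigned long)&fake_skb;  /* in use: stores a pointer */
        assert(demo_slot_in_use(&slot));
        return 0;
}
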
diff --git a/drivers/xen/netchannel2/netchannel2_endpoint.h b/drivers/xen/netchannel2/netchannel2_endpoint.h
new file mode 100644
index 0000000..2525f23
--- /dev/null
+++ b/drivers/xen/netchannel2/netchannel2_endpoint.h
@@ -0,0 +1,63 @@
+/* Interface between the endpoint implementations (netfront2.c,
+   netback2.c) and the netchannel2 core (chan.c and the various
+   transmission modes).         */
+#ifndef NETCHANNEL2_ENDPOINT_H__
+#define NETCHANNEL2_ENDPOINT_H__
+
+#include <linux/init.h>
+#include <xen/interface/xen.h>
+
+struct netchannel2_sring_prod;
+struct netchannel2_sring_cons;
+struct netchannel2;
+struct xenbus_device;
+
+struct netchannel2 *nc2_new(struct xenbus_device *xd);
+void nc2_release(struct netchannel2 *nc);
+
+int nc2_attach_rings(struct netchannel2 *nc,
+                    struct netchannel2_sring_cons *cons_sring,
+                    const volatile void *cons_payload,
+                    size_t cons_size,
+                    struct netchannel2_sring_prod *prod_sring,
+                    void *prod_payload,
+                    size_t prod_size,
+                    domid_t otherend_id);
+void nc2_detach_rings(struct netchannel2 *nc);
+#if defined(CONFIG_XEN_NETDEV2_FRONTEND)
+int nc2_listen_evtchn(struct netchannel2 *nc, domid_t dom);
+#endif
+#if defined(CONFIG_XEN_NETDEV2_BACKEND)
+int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid,
+                      int evtchn);
+#endif
+int nc2_get_evtchn_port(struct netchannel2 *nc);
+void nc2_suspend(struct netchannel2 *nc);
+
+void nc2_set_nr_tx_buffers(struct netchannel2 *nc, unsigned nr_buffers);
+
+/* Interface which the endpoints provide to the core. */
+#ifdef CONFIG_XEN_NETDEV2_FRONTEND
+int __init netfront2_init(void);
+void __exit netfront2_exit(void);
+#else
+static inline int netfront2_init(void)
+{
+       return 0;
+}
+static inline void netfront2_exit(void)
+{
+}
+#endif
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+int __init netback2_init(void);
+#else
+static inline int netback2_init(void)
+{
+       return 0;
+}
+#endif
+int __init nc2_init(void);
+void __exit nc2_exit(void);
+
+#endif /* NETCHANNEL2_ENDPOINT_H__ */
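
For illustration, these entry points would typically be tied together by the common module init (netchan2.c in this series); that file isn't shown in this part of the patch, so the following is only a sketch of the expected wiring, with error unwinding elided:

#include <linux/module.h>
#include "netchannel2_endpoint.h"

static int __init netchan2_module_init(void)
{
	int err;

	err = nc2_init();
	if (err < 0)
		return err;
	err = netfront2_init();
	if (err < 0)
		return err;
	/* netback2_init() collapses to a stub returning 0 when the
	   backend isn't configured. */
	return netback2_init();
}

static void __exit netchan2_module_exit(void)
{
	netfront2_exit();
	nc2_exit();
}

module_init(netchan2_module_init);
module_exit(netchan2_module_exit);
MODULE_LICENSE("GPL");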
diff --git a/drivers/xen/netchannel2/netfront2.c b/drivers/xen/netchannel2/netfront2.c
new file mode 100644
index 0000000..fb5d426
--- /dev/null
+++ b/drivers/xen/netchannel2/netfront2.c
@@ -0,0 +1,488 @@
+#include <linux/kernel.h>
+#include <linux/gfp.h>
+#include <linux/version.h>
+#include <xen/gnttab.h>
+#include <xen/xenbus.h>
+
+#include "netchannel2_core.h"
+#include "netchannel2_endpoint.h"
+
+#define MAX_SRING_PAGES 4
+
+struct netfront2 {
+#define NETFRONT2_MAGIC 0x9268e704
+       unsigned magic;
+       struct xenbus_device *xenbus_device;
+
+       void *f2b_sring;
+       grant_ref_t f2b_grefs[MAX_SRING_PAGES];
+       void *b2f_sring;
+       grant_ref_t b2f_grefs[MAX_SRING_PAGES];
+
+       struct netchannel2_frontend_shared *control_shared;
+       grant_ref_t control_shared_gref;
+
+       int nr_sring_pages;
+       int sring_order;
+
+       grant_ref_t rings_gref_pool; /* Some pre-allocated grant
+                                       references to cover the shared
+                                       rings. */
+
+       struct netchannel2 *chan;
+
+       int attached; /* True if the shared rings are ready to go. */
+};
+
+static struct netfront2 *xenbus_device_to_nf2(struct xenbus_device *xd)
+{
+       struct netfront2 *work = xd->dev.driver_data;
+       BUG_ON(work->magic != NETFRONT2_MAGIC);
+       return work;
+}
+
+/* Try to revoke a bunch of grant references and return the grefs to
+   the rings grefs pool.  Any cleared grefs are set to 0.  Returns 0
+   on success or <0 on error.  Ignores zero entries in the @grefs
+   list, and zeroes any entries which are successfully ended. */
+static int ungrant_access_to_ring(struct netfront2 *nf,
+                                 grant_ref_t *grefs,
+                                 int nr_pages)
+{
+       int i;
+       int succ;
+       int failed;
+
+       failed = 0;
+
+       for (i = 0; i < nr_pages; i++) {
+               if (grefs[i]) {
+                       succ = gnttab_end_foreign_access_ref(grefs[i]);
+                       if (!succ) {
+                               /* XXX we can't recover when this
+                                * happens.  Try to do something
+                                * vaguely plausible, but the device
+                                * is pretty much doomed. */
+                               printk(KERN_WARNING "Failed to end access to gref %d\n",
+                                      grefs[i]);
+                               failed = 1;
+                               continue;
+                       }
+                       gnttab_release_grant_reference(&nf->rings_gref_pool,
+                                                      grefs[i]);
+                       grefs[i] = 0;
+               }
+       }
+
+       if (failed)
+               return -EBUSY;
+       else
+               return 0;
+}
+
+/* Allocate and initialise grant references to cover a bunch of pages.
+   @ring should be in the direct-mapped region.         The rings_gref_pool
+   on nf should contain at least @nr_pages references.
+   Already-populated slots in the @grefs list are left unchanged. */
+static void grant_access_to_ring(struct netfront2 *nf,
+                                domid_t otherend,
+                                void *ring,
+                                int *grefs,
+                                int nr_pages)
+{
+       void *p;
+       int i;
+       grant_ref_t ref;
+
+       for (i = 0; i < nr_pages; i++) {
+
+               if (grefs[i] != 0)
+                       continue;
+
+               p = (void *)((unsigned long)ring + PAGE_SIZE * i);
+
+               ref = gnttab_claim_grant_reference(&nf->rings_gref_pool);
+               /* There should be enough grefs in the pool to handle
+                  the rings. */
+               BUG_ON(ref < 0);
+               gnttab_grant_foreign_access_ref(ref,
+                                               otherend,
+                                               virt_to_mfn(p),
+                                               0);
+               grefs[i] = ref;
+       }
+}
+
+/* Push an already-granted ring into xenstore. */
+static int publish_ring(struct xenbus_transaction xbt,
+                       struct netfront2 *nf,
+                       const char *prefix,
+                       const int *grefs,
+                       int nr_grefs)
+{
+       int i;
+       char buf[32];
+       int err;
+
+       sprintf(buf, "%s-nr-pages", prefix);
+       err = xenbus_printf(xbt, nf->xenbus_device->nodename, buf,
+                           "%u", nr_grefs);
+       if (err)
+               return err;
+
+       for (i = 0; i < nr_grefs; i++) {
+               BUG_ON(grefs[i] == 0);
+               sprintf(buf, "%s-ref-%u", prefix, i);
+               err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                                   buf, "%u", grefs[i]);
+               if (err)
+                       return err;
+       }
+       return 0;
+}
+
+static int publish_rings(struct netfront2 *nf)
+{
+       int err;
+       struct xenbus_transaction xbt;
+       const char *msg;
+
+again:
+       err = xenbus_transaction_start(&xbt);
+       if (err) {
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "starting transaction");
+               return err;
+       }
+
+       err = publish_ring(xbt, nf, "f2b-ring", nf->f2b_grefs,
+                          nf->nr_sring_pages);
+       if (err) {
+               msg = "publishing f2b-ring";
+               goto abort;
+       }
+       err = publish_ring(xbt, nf, "b2f-ring", nf->b2f_grefs,
+                          nf->nr_sring_pages);
+       if (err) {
+               msg = "publishing b2f-ring";
+               goto abort;
+       }
+       err = publish_ring(xbt, nf, "control", &nf->control_shared_gref, 1);
+       if (err) {
+               msg = "publishing control";
+               goto abort;
+       }
+       err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                           "event-channel", "%u",
+                           nc2_get_evtchn_port(nf->chan));
+       if (err) {
+               msg = "publishing event channel";
+               goto abort;
+       }
+
+       err = xenbus_transaction_end(xbt, 0);
+       if (err) {
+               if (err == -EAGAIN)
+                       goto again;
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "completing transaction");
+       }
+
+       return err;
+
+abort:
+       xenbus_transaction_end(xbt, 1);
+       xenbus_dev_fatal(nf->xenbus_device, err, msg);
+       return err;
+}
+
+/* Release the rings.  WARNING: This will leak memory if the other end
+   still has the rings mapped. There isn't really anything we can do
+   about that; the alternative (giving the other end access to
+   whatever Linux puts in the memory after we released it) is probably
+   worse. */
+static void release_rings(struct netfront2 *nf)
+{
+       int have_outstanding_grants;
+
+       have_outstanding_grants = 0;
+
+       if (nf->f2b_sring) {
+               if (ungrant_access_to_ring(nf, nf->f2b_grefs,
+                                          nf->nr_sring_pages) >= 0) {
+                       free_pages((unsigned long)nf->f2b_sring,
+                                  nf->sring_order);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->f2b_sring = NULL;
+       }
+
+       if (nf->b2f_sring) {
+               if (ungrant_access_to_ring(nf, nf->b2f_grefs,
+                                          nf->nr_sring_pages) >= 0) {
+                       free_pages((unsigned long)nf->b2f_sring,
+                                  nf->sring_order);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->b2f_sring = NULL;
+       }
+
+       if (nf->control_shared) {
+               if (ungrant_access_to_ring(nf, &nf->control_shared_gref,
+                                          1) >= 0) {
+                       free_page((unsigned long)nf->control_shared);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->control_shared = NULL;
+       }
+
+       if (have_outstanding_grants != 0) {
+               printk(KERN_WARNING
+                      "Released shared rings while the backend still had them mapped; leaking memory\n");
+       }
+
+       /* We can't release the gref pool if there are still
+          references outstanding against it. */
+       if (!have_outstanding_grants) {
+               if (nf->rings_gref_pool)
+                       gnttab_free_grant_references(nf->rings_gref_pool);
+               nf->rings_gref_pool = 0;
+       }
+
+       nf->attached = 0;
+}
+
+static int allocate_rings(struct netfront2 *nf, domid_t otherend)
+{
+       int err;
+       int max_sring_pages;
+       int sring_order;
+       int nr_sring_pages;
+       size_t sring_size;
+
+       /* Figure out how big our shared rings are going to be. */
+       err = xenbus_scanf(XBT_NIL, nf->xenbus_device->otherend,
+                          "max-sring-pages", "%d", &max_sring_pages);
+       if (err < 0) {
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "reading %s/max-sring-pages",
+                                nf->xenbus_device->otherend);
+               return err;
+       }
+       if (max_sring_pages > MAX_SRING_PAGES)
+               max_sring_pages = MAX_SRING_PAGES;
+       sring_order = order_base_2(max_sring_pages);
+       nr_sring_pages = 1 << sring_order;
+       sring_size = nr_sring_pages * PAGE_SIZE;
+
+       release_rings(nf);
+
+       nf->nr_sring_pages = nr_sring_pages;
+       nf->sring_order = sring_order;
+
+       nf->f2b_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order);
+       if (!nf->f2b_sring)
+               return -ENOMEM;
+       memset(nf->f2b_sring, 0, sring_size);
+
+       nf->b2f_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order);
+       if (!nf->b2f_sring)
+               return -ENOMEM;
+       memset(nf->b2f_sring, 0, sring_size);
+
+       nf->control_shared = (void *)get_zeroed_page(GFP_KERNEL);
+       if (!nf->control_shared)
+               return -ENOMEM;
+
+       /* Pre-allocate enough grant references to be sure that we can
+          grant access to both rings without an error. */
+       err = gnttab_alloc_grant_references(nr_sring_pages * 2 + 1,
+                                           &nf->rings_gref_pool);
+       if (err < 0)
+               return err;
+
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->b2f_sring,
+                            nf->b2f_grefs,
+                            nr_sring_pages);
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->f2b_sring,
+                            nf->f2b_grefs,
+                            nr_sring_pages);
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->control_shared,
+                            &nf->control_shared_gref,
+                            1);
+       err = nc2_listen_evtchn(nf->chan, otherend);
+       if (err < 0)
+               return err;
+
+       nf->attached = 1;
+
+       return 0;
+}
+
+static void backend_changed(struct xenbus_device *xd,
+                           enum xenbus_state backend_state)
+{
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+       int err;
+
+       switch (backend_state) {
+       case XenbusStateInitialising:
+               /* Backend isn't ready yet, don't do anything. */
+               break;
+
+       case XenbusStateInitWait:
+               /* Backend has advertised the ring protocol.  Allocate
+                  the rings, and tell the backend about them. */
+
+               err = 0;
+               if (!nf->attached)
+                       err = allocate_rings(nf, xd->otherend_id);
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err, "allocating shared rings");
+                       break;
+               }
+               err = publish_rings(nf);
+               if (err >= 0)
+                       xenbus_switch_state(xd, XenbusStateInitialised);
+               break;
+
+       case XenbusStateInitialised:
+               /* Backend isn't supposed to use this state. */
+               xenbus_dev_fatal(xd, EINVAL,
+                                "unexpected backend state Initialised");
+               break;
+
+       case XenbusStateConnected:
+               /* All ready */
+               err = nc2_attach_rings(nf->chan,
+                                      &nf->control_shared->cons,
+                                      nf->b2f_sring,
+                                      nf->nr_sring_pages * PAGE_SIZE,
+                                      &nf->control_shared->prod,
+                                      nf->f2b_sring,
+                                      nf->nr_sring_pages * PAGE_SIZE,
+                                      nf->xenbus_device->otherend_id);
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err,
+                                        "failed to attach to rings");
+               } else {
+                       xenbus_switch_state(xd, XenbusStateConnected);
+               }
+               break;
+
+       case XenbusStateClosing:
+               xenbus_switch_state(xd, XenbusStateClosing);
+               break;
+
+       case XenbusStateClosed:
+               /* Tell the tools that it's safe to remove the device
+                  from the bus. */
+               xenbus_frontend_closed(xd);
+               /* Note that we don't release the rings here.  This
+                  means that if the backend moves to a different
+                  domain, we won't be able to reconnect, but it also
+                  limits the amount of memory which can be wasted in
+                  the release_rings() leak if the backend is faulty
+                  or malicious.  It's not obvious which is more
+                  useful, and so I choose the safer but less
+                  featureful approach. */
+               /* This is only a problem if you're using driver
+                  domains and trying to recover from a driver error
+                  by rebooting the backend domain.  The rest of the
+                  tools don't support that, so it's a bit
+                  theoretical.  The memory leaks aren't, though. */
+               break;
+
+       case XenbusStateUnknown:
+               /* The tools have removed the device area from the
+                  store.  Do nothing and rely on xenbus core to call
+                  our remove method. */
+               break;
+
+       default:
+               /* Ignore transitions to unknown states */
+               break;
+       }
+}
+
+static int __devinit netfront_probe(struct xenbus_device *xd,
+                                   const struct xenbus_device_id *id)
+{
+       struct netfront2 *nf;
+
+       nf = kzalloc(sizeof(*nf), GFP_KERNEL);
+       if (nf == NULL)
+               goto err;
+       nf->magic = NETFRONT2_MAGIC;
+       nf->xenbus_device = xd;
+       nf->chan = nc2_new(xd);
+       if (nf->chan == NULL)
+               goto err;
+
+       xd->dev.driver_data = nf;
+
+       return 0;
+
+err:
+       kfree(nf);
+       xenbus_dev_fatal(xd, ENOMEM, "probing netdev");
+       return -ENOMEM;
+}
+
+static int netfront_resume(struct xenbus_device *xd)
+{
+       /* We've been suspended and come back.  The rings are
+          therefore dead.  Tear them down. */
+       /* We rely on the normal xenbus state machine to bring them
+          back to life. */
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+
+       nc2_detach_rings(nf->chan);
+       release_rings(nf);
+
+       return 0;
+}
+
+static int __devexit netfront_remove(struct xenbus_device *xd)
+{
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+       if (nf->chan != NULL)
+               nc2_release(nf->chan);
+       release_rings(nf);
+       kfree(nf);
+       return 0;
+}
+
+static const struct xenbus_device_id netfront_ids[] = {
+       { "vif2" },
+       { "" }
+};
+MODULE_ALIAS("xen:vif2");
+
+static struct xenbus_driver netfront2 = {
+       .name = "vif2",
+       .ids = netfront_ids,
+       .probe = netfront_probe,
+       .remove = __devexit_p(netfront_remove),
+       .otherend_changed = backend_changed,
+       .resume = netfront_resume,
+};
+
+int __init netfront2_init(void)
+{
+       return xenbus_register_frontend(&netfront2);
+}
+
+void __exit netfront2_exit(void)
+{
+       xenbus_unregister_driver(&netfront2);
+}
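
publish_ring() above writes keys of the form <prefix>-nr-pages and <prefix>-ref-<n> under the frontend's node.  As a sketch of what the consuming side looks like (the helper name and error handling here are illustrative, not taken from netback2.c), the backend can read them back with xenbus_scanf():

static int read_ring_grefs(struct xenbus_device *xd, const char *prefix,
			   grant_ref_t *grefs, int max_pages)
{
	char buf[32];
	int nr_pages, i, err;

	sprintf(buf, "%s-nr-pages", prefix);
	err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%d", &nr_pages);
	if (err < 0)
		return err;
	if (nr_pages <= 0 || nr_pages > max_pages)
		return -EINVAL;
	for (i = 0; i < nr_pages; i++) {
		sprintf(buf, "%s-ref-%u", prefix, i);
		err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u",
				   &grefs[i]);
		if (err < 0)
			return err;
	}
	return nr_pages;
}

For example, read_ring_grefs(xd, "f2b-ring", grefs, MAX_SRING_PAGES) would recover the frontend-to-backend ring references.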
diff --git a/drivers/xen/netchannel2/recv_packet.c b/drivers/xen/netchannel2/recv_packet.c
new file mode 100644
index 0000000..4678c28
--- /dev/null
+++ b/drivers/xen/netchannel2/recv_packet.c
@@ -0,0 +1,216 @@
+/* Support for receiving individual packets, and all the stuff which
+ * goes with that. */
+#include <linux/kernel.h>
+#include <linux/etherdevice.h>
+#include <linux/version.h>
+#include "netchannel2_core.h"
+
+/* Send as many finish packet messages as will fit on the ring. */
+void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp)
+{
+       struct pending_finish_packets *pfp = &ncrp->pending_finish;
+       struct netchannel2_msg_finish_packet msg;
+       RING_IDX cons;
+
+       while (pfp->prod != pfp->cons &&
+              nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) {
+               cons = pfp->cons;
+               msg.id = pfp->ids[pfp->cons % MAX_PENDING_FINISH_PACKETS];
+               pfp->cons++;
+               nc2_send_message(&ncrp->prod_ring,
+                                NETCHANNEL2_MSG_FINISH_PACKET,
+                                0,
+                                &msg,
+                                sizeof(msg));
+       }
+}
+
+/* Add a packet ID to the finish packet queue. The caller should
+   arrange for send_finish_packet_messages() to be called soon to
+   flush the requests out. */
+void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
+                                uint32_t id, uint8_t flags)
+{
+       struct pending_finish_packets *pfp = &ncrp->pending_finish;
+       RING_IDX prod;
+
+       prod = pfp->prod;
+       pfp->ids[prod % MAX_PENDING_FINISH_PACKETS] = id;
+       pfp->prod++;
+
+       if (flags & NC2_PACKET_FLAG_need_event)
+               ncrp->pending_time_sensitive_messages = 1;
+}
+
+/* Handle a packet message from the other end.  On success, queues the
+   new skb to the pending skb list.  If the packet is invalid, it is
+   discarded without generating a FINISH message. */
+/* Caution: this drops and re-acquires the ring lock. */
+void nc2_handle_packet_msg(struct netchannel2 *nc,
+                          struct netchannel2_ring_pair *ncrp,
+                          struct netchannel2_msg_hdr *hdr,
+                          struct sk_buff_head *pending_rx_queue)
+{
+       unsigned nr_frags;
+       struct netchannel2_msg_packet msg;
+       struct sk_buff *skb;
+       const unsigned frags_off = sizeof(msg);
+       unsigned frags_bytes;
+
+       if (ncrp->pending_finish.prod - ncrp->pending_finish.cons ==
+           MAX_PENDING_FINISH_PACKETS) {
+               pr_debug("Remote endpoint sent too many packets!\n");
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       if (hdr->size < sizeof(msg)) {
+               pr_debug("Packet message too small (%d < %zd)\n", hdr->size,
+                        sizeof(msg));
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       if (hdr->size & 7) {
+               pr_debug("Packet size in ring not multiple of 8: %d\n",
+                        hdr->size);
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+
+       frags_bytes = hdr->size - sizeof(msg) - msg.prefix_size;
+       nr_frags = frags_bytes / sizeof(struct netchannel2_fragment);
+
+       switch (msg.type) {
+       case NC2_PACKET_TYPE_small:
+               if (nr_frags != 0) {
+                       /* Small packets, by definition, have no
+                        * fragments */
+                       pr_debug("Received small packet with %d frags?\n",
+                                nr_frags);
+                       nc->stats.rx_errors++;
+                       return;
+               }
+               /* Any of the receiver functions can handle small
+                  packets as a trivial special case.  Use receiver
+                  copy, since that's the simplest. */
+               skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                 nr_frags, frags_off);
+               /* No finish message */
+               break;
+       case NC2_PACKET_TYPE_receiver_copy:
+               skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                 nr_frags, frags_off);
+               queue_finish_packet_message(ncrp, msg.id, msg.flags);
+               break;
+       default:
+               pr_debug("Unknown packet type %d\n", msg.type);
+               nc->stats.rx_errors++;
+               skb = NULL;
+               break;
+       }
+       if (skb != NULL) {
+               nc->stats.rx_bytes += skb->len;
+               nc->stats.rx_packets++;
+               skb->dev = nc->net_device;
+
+               if (ncrp->filter_mac &&
+                   skb_headlen(skb) >= sizeof(struct ethhdr) &&
+                   memcmp(((struct ethhdr *)skb->data)->h_source,
+                          ncrp->remote_mac,
+                          ETH_ALEN)) {
+                       /* We're in filter MACs mode and the source
+                          MAC on this packet is wrong.  Drop it. */
+                       /* (We know that any packet big enough to
+                          contain an ethernet header at all will
+                          contain it in the head space because we do
+                          a pull_through at the end of the type
+                          handler.) */
+                       nc->stats.rx_missed_errors++;
+                       goto err;
+               }
+
+               __skb_queue_tail(pending_rx_queue, skb);
+
+               if (ncrp->pending_rx_hypercalls.nr_pending_gops >=
+                   RX_GRANT_COPY_BATCH) {
+                       flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls,
+                                                   nc2_rscb_on_gntcopy_fail);
+                       /* since receive could generate ACKs to the
+                          start_xmit() function we need to release
+                          the ring lock */
+                       spin_unlock(&ncrp->lock);
+                       /* we should receive the packet as soon as the
+                          copy is complete to benefit from cache
+                          locality */
+                       receive_pending_skbs(pending_rx_queue);
+                       spin_lock(&ncrp->lock);
+
+               }
+
+       }
+       return;
+
+err:
+       /* If the receive succeeded part-way, there may be references
+          to the skb in the hypercall batcher.  Flush them out before
+          we release it.  This is a slow path, so we don't care that
+          much about performance. */
+       flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls,
+                                   nc2_rscb_on_gntcopy_fail);
+
+       /* We may need to send a FINISH message here if this was a
+          receiver-map packet.  That should be handled automatically
+          by the kfree_skb(). */
+       kfree_skb(skb);
+       nc->stats.rx_errors++;
+       return;
+}
+
+/* If there is space on the ring, tell the other end how many packets
+   it's allowed to send at one time and clear the
+   need_advertise_max_packets flag. */
+void advertise_max_packets(struct netchannel2_ring_pair *ncrp)
+{
+       struct netchannel2_msg_set_max_packets msg;
+
+       if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg)))
+               return;
+       msg.max_outstanding_packets = MAX_PENDING_FINISH_PACKETS;
+       nc2_send_message(&ncrp->prod_ring,
+                        NETCHANNEL2_MSG_SET_MAX_PACKETS,
+                        0,
+                        &msg,
+                        sizeof(msg));
+       ncrp->need_advertise_max_packets = 0;
+       ncrp->pending_time_sensitive_messages = 1;
+}
+
+void receive_pending_skbs(struct sk_buff_head *pending_rx_queue)
+{
+       struct sk_buff *skb;
+       struct skb_cb_overlay *sco;
+       while (!skb_queue_empty(pending_rx_queue)) {
+               skb = __skb_dequeue(pending_rx_queue);
+               sco = get_skb_overlay(skb);
+               if (unlikely(sco->failed))
+                       kfree_skb(skb);
+               else {
+                       skb->protocol = eth_type_trans(skb, skb->dev);
+                       netif_receive_skb(skb);
+               }
+       }
+}
+
+
+/* These don't really belong here, but it's as good a place as any. */
+int __init nc2_init(void)
+{
+       return 0;
+}
+
+void __exit nc2_exit(void)
+{
+}
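
The pending_finish bookkeeping above uses free-running producer/consumer indices into a fixed-size array of ids, which is why nc2_handle_packet_msg() can treat prod - cons == MAX_PENDING_FINISH_PACKETS as "queue full".  A self-contained userspace toy of the same scheme (names here are made up):

#include <assert.h>
#include <stdint.h>

#define MAX_PENDING 8	/* stand-in for MAX_PENDING_FINISH_PACKETS */

struct pending_ids {
	uint32_t ids[MAX_PENDING];
	uint32_t prod, cons;	/* free-running, never wrapped explicitly */
};

static int pending_full(const struct pending_ids *p)
{
	return p->prod - p->cons == MAX_PENDING;
}

static void pending_push(struct pending_ids *p, uint32_t id)
{
	assert(!pending_full(p));
	p->ids[p->prod % MAX_PENDING] = id;
	p->prod++;
}

static uint32_t pending_pop(struct pending_ids *p)
{
	uint32_t id = p->ids[p->cons % MAX_PENDING];
	p->cons++;
	return id;
}

Unsigned wrap-around of prod and cons is harmless as long as at most MAX_PENDING entries are ever outstanding.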
diff --git a/drivers/xen/netchannel2/rscb.c b/drivers/xen/netchannel2/rscb.c
new file mode 100644
index 0000000..8984f90
--- /dev/null
+++ b/drivers/xen/netchannel2/rscb.c
@@ -0,0 +1,385 @@
+/* Receiver-side copy buffer support */
+#include <linux/kernel.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <linux/version.h>
+#include <xen/gnttab.h>
+#include <xen/live_maps.h>
+
+#include "netchannel2_core.h"
+
+/* -------------------------- Receive -------------------------------- */
+
+/* This is called whenever an RSCB grant copy fails. */
+void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop)
+{
+       struct sk_buff *skb = ctxt;
+       struct skb_cb_overlay *sco = get_skb_overlay(skb);
+       if (!sco->failed && net_ratelimit())
+               printk(KERN_WARNING "Dropping RX packet because of copy error\n");
+       sco->failed = 1;
+}
+
+
+/* Copy @size bytes from @offset in grant ref @gref against domain
+   @domid and shove them on the end of @skb.  Fails if the head
+   does not have enough space or if the copy would span multiple
+   pages. */
+static int nc2_grant_copy(struct netchannel2_ring_pair *ncrp,
+                         struct sk_buff *skb,
+                         unsigned offset,
+                         unsigned size,
+                         grant_ref_t gref,
+                         domid_t domid)
+{
+       gnttab_copy_t *gop;
+       void *tail;
+       void *end;
+
+       if (size > PAGE_SIZE)
+               return 0;
+
+       tail = skb_tail_pointer(skb);
+       end = skb_end_pointer(skb);
+
+       if (unlikely(size > (end-tail)))
+               return 0;
+
+       if (unlikely(offset_in_page(tail) + size > PAGE_SIZE)) {
+               unsigned f1 = PAGE_SIZE - offset_in_page(tail);
+               /* Recursive, but only ever to depth 1, so okay */
+               if (!nc2_grant_copy(ncrp, skb, offset, f1, gref, domid))
+                       return 0;
+               offset += f1;
+               size -= f1;
+               tail += f1;
+       }
+
+       /* Copy this fragment into the header. */
+       gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls,
+                                          skb,
+                                          nc2_rscb_on_gntcopy_fail);
+       gop->flags = GNTCOPY_source_gref;
+       gop->source.domid = domid;
+       gop->source.offset = offset;
+       gop->source.u.ref = gref;
+       gop->dest.domid = DOMID_SELF;
+       gop->dest.offset = offset_in_page(tail);
+       gop->dest.u.gmfn = virt_to_mfn(tail);
+       gop->len = size;
+
+       skb_put(skb, size);
+
+       return 1;
+}
+
+/* We've received a receiver-copy packet message from the remote.
+   Parse it up, build an sk_buff, and return it.  Returns NULL on
+   error. */
+struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
+                                           struct netchannel2_ring_pair *ncrp,
+                                           struct netchannel2_msg_packet *msg,
+                                           struct netchannel2_msg_hdr *hdr,
+                                           unsigned nr_frags,
+                                           unsigned frags_off)
+{
+       struct netchannel2_fragment frag;
+       unsigned nr_bytes;
+       unsigned x;
+       struct sk_buff *skb;
+       unsigned skb_headsize;
+       int first_frag, first_frag_size;
+       gnttab_copy_t *gop;
+       struct skb_shared_info *shinfo;
+       struct page *new_page;
+
+       if (msg->prefix_size > NETCHANNEL2_MAX_INLINE_BYTES) {
+               pr_debug("Inline prefix too big! (%d > %d)\n",
+                        msg->prefix_size, NETCHANNEL2_MAX_INLINE_BYTES);
+               return NULL;
+       }
+
+       /* Count the number of bytes in the packet.  Be careful: the
+          other end can still access the packet on the ring, so the
+          size could change later. */
+       nr_bytes = msg->prefix_size;
+       for (x = 0; x < nr_frags; x++) {
+               fetch_fragment(ncrp, x, &frag, frags_off);
+               nr_bytes += frag.size;
+       }
+       if (nr_bytes > NETCHANNEL2_MAX_PACKET_BYTES) {
+               pr_debug("Packet too big! (%d > %d)\n", nr_bytes,
+                        NETCHANNEL2_MAX_PACKET_BYTES);
+               return NULL;
+       }
+       if (nr_bytes < 64) {
+               /* Linux sometimes has problems with very small SKBs.
+                  Impose a minimum size of 64 bytes. */
+               nr_bytes = 64;
+       }
+
+       first_frag = 0;
+       if (nr_frags > 0) {
+               fetch_fragment(ncrp, 0, &frag, frags_off);
+               first_frag_size = frag.size;
+               first_frag = 1;
+       } else {
+               first_frag_size = 0;
+               first_frag = 0;
+       }
+
+       /* We try to have both prefix and the first frag in the skb head
+          if they do not exceed the page size */
+       skb_headsize = msg->prefix_size + first_frag_size + NET_IP_ALIGN;
+       if (skb_headsize >
+           ((PAGE_SIZE - sizeof(struct skb_shared_info) - NET_SKB_PAD) &
+            ~(SMP_CACHE_BYTES - 1))) {
+               skb_headsize = msg->prefix_size + NET_IP_ALIGN;
+               first_frag = 0;
+       }
+
+       skb = dev_alloc_skb(skb_headsize);
+       if (!skb) {
+               /* Drop the packet. */
+               pr_debug("Couldn't allocate a %d byte skb.\n", nr_bytes);
+               nc->stats.rx_dropped++;
+               return NULL;
+       }
+
+       /* Arrange that the IP header is nicely aligned in memory. */
+       skb_reserve(skb, NET_IP_ALIGN);
+
+       /* The inline prefix should always fit in the SKB head. */
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              skb_put(skb, msg->prefix_size),
+                              msg->prefix_size,
+                              frags_off + nr_frags * sizeof(frag));
+
+       /* copy first frag into skb head if it does not cross a
+          page boundary */
+       if (first_frag == 1) {
+               fetch_fragment(ncrp, 0, &frag, frags_off);
+               if (!nc2_grant_copy(ncrp, skb, frag.off, frag.size,
+                                   frag.receiver_copy.gref,
+                                   ncrp->otherend_id)) {
+                       get_skb_overlay(skb)->failed = 1;
+                       return skb;
+               }
+       }
+
+       shinfo = skb_shinfo(skb);
+       for (x = first_frag; x < nr_frags; x++) {
+               fetch_fragment(ncrp, x, &frag, frags_off);
+
+               /* Allocate a new page for the fragment */
+               new_page = alloc_page(GFP_ATOMIC);
+               if (!new_page) {
+                       get_skb_overlay(skb)->failed = 1;
+                       return skb;
+               }
+
+               gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls,
+                                                  skb,
+                                                  nc2_rscb_on_gntcopy_fail);
+               gop->flags = GNTCOPY_source_gref;
+               gop->source.domid = ncrp->otherend_id;
+               gop->source.offset = frag.off;
+               gop->source.u.ref = frag.receiver_copy.gref;
+               gop->dest.domid = DOMID_SELF;
+               gop->dest.offset = 0;
+               gop->dest.u.gmfn = pfn_to_mfn(page_to_pfn(new_page));
+               gop->len = frag.size;
+
+               shinfo->frags[x-first_frag].page = new_page;
+               shinfo->frags[x-first_frag].page_offset = 0;
+               shinfo->frags[x-first_frag].size = frag.size;
+               shinfo->nr_frags++;
+
+               skb->truesize += frag.size;
+               skb->data_len += frag.size;
+               skb->len += frag.size;
+       }
+       return skb;
+}
+
+
+
+/* ------------------------------- Transmit ---------------------------- */
+
+struct grant_packet_plan {
+       volatile struct netchannel2_fragment *out_fragment;
+       grant_ref_t gref_pool;
+       unsigned prefix_avail;
+};
+
+static inline int nfrags_skb(struct sk_buff *skb, int prefix_size)
+{
+       unsigned long start_grant;
+       unsigned long end_grant;
+
+       if (skb_headlen(skb) <= prefix_size)
+               return skb_shinfo(skb)->nr_frags;
+
+       start_grant = ((unsigned long)skb->data + prefix_size) &
+               ~(PAGE_SIZE-1);
+       end_grant = ((unsigned long)skb->data +
+                    skb_headlen(skb) +  PAGE_SIZE - 1) &
+               ~(PAGE_SIZE-1);
+       return ((end_grant - start_grant) >> PAGE_SHIFT)
+               + skb_shinfo(skb)->nr_frags;
+}
+
+int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       unsigned nr_fragments;
+       grant_ref_t gref_pool;
+       int err;
+       unsigned inline_prefix_size;
+
+       if (allocate_txp_slot(ncrp, skb) < 0)
+               return -1;
+
+       /* We're going to have to get the remote to issue a grant copy
+          hypercall anyway, so there's no real benefit to shoving the
+          headers inline. */
+       /* (very small packets won't go through here, so there's no
+          chance that we could completely eliminate the grant
+          copy.) */
+       inline_prefix_size = sizeof(struct ethhdr);
+
+       if (skb_co->nr_fragments == 0) {
+               nr_fragments = nfrags_skb(skb, inline_prefix_size);
+
+               /* No-fragments packets should be policy small, not
+                * policy grant. */
+               BUG_ON(nr_fragments == 0);
+
+               skb_co->nr_fragments = nr_fragments;
+       }
+
+       /* Grab the grant references. */
+       err = gnttab_suballoc_grant_references(skb_co->nr_fragments,
+                                              &ncrp->gref_pool,
+                                              &gref_pool);
+       if (err < 0) {
+               release_txp_slot(ncrp, skb);
+               /* Leave skb_co->nr_fragments set, so that we don't
+                  have to recompute it next time around. */
+               return -1;
+       }
+       skb_co->gref_pool = gref_pool;
+       skb_co->inline_prefix_size = inline_prefix_size;
+
+       skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+
+       return 0;
+}
+
+static void prepare_subpage_grant(struct netchannel2_ring_pair *ncrp,
+                                 struct page *page,
+                                 unsigned off_in_page,
+                                 unsigned size,
+                                 struct grant_packet_plan *plan)
+{
+       volatile struct netchannel2_fragment *frag;
+       domid_t trans_domid;
+       grant_ref_t trans_gref;
+       grant_ref_t gref;
+
+       if (size <= plan->prefix_avail) {
+               /* This fragment is going to be inline -> nothing to
+                * do. */
+               plan->prefix_avail -= size;
+               return;
+       }
+       if (plan->prefix_avail > 0) {
+               /* Part inline, part in payload. */
+               size -= plan->prefix_avail;
+               off_in_page += plan->prefix_avail;
+               plan->prefix_avail = 0;
+       }
+       frag = plan->out_fragment;
+       gref = gnttab_claim_grant_reference(&plan->gref_pool);
+       frag->receiver_copy.gref = gref;
+       if (page_is_tracked(page)) {
+               lookup_tracker_page(page, &trans_domid, &trans_gref);
+               gnttab_grant_foreign_access_ref_trans(gref,
+                                                     ncrp->otherend_id,
+                                                     GTF_readonly,
+                                                     trans_domid,
+                                                     trans_gref);
+       } else {
+               gnttab_grant_foreign_access_ref_subpage(gref,
+                                                       ncrp->otherend_id,
+                                                       virt_to_mfn(page_address(page)),
+                                                       GTF_readonly,
+                                                       off_in_page,
+                                                       size);
+       }
+
+       frag->off = off_in_page;
+       frag->size = size;
+       plan->out_fragment++;
+}
+
+static int grant_data_area(struct netchannel2_ring_pair *ncrp,
+                          struct sk_buff *skb,
+                          struct grant_packet_plan *plan)
+{
+       void *ptr = skb->data;
+       unsigned len = skb_headlen(skb);
+       unsigned off;
+       unsigned this_time;
+
+       for (off = 0; off < len; off += this_time) {
+               this_time = len - off;
+               if (this_time + offset_in_page(ptr + off) > PAGE_SIZE)
+                       this_time = PAGE_SIZE - offset_in_page(ptr + off);
+               prepare_subpage_grant(ncrp,
+                                     virt_to_page(ptr + off),
+                                     offset_in_page(ptr + off),
+                                     this_time,
+                                     plan);
+       }
+       return 0;
+}
+
+void xmit_grant(struct netchannel2_ring_pair *ncrp,
+               struct sk_buff *skb,
+               volatile void *msg_buf)
+{
+       volatile struct netchannel2_msg_packet *msg = msg_buf;
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct grant_packet_plan plan;
+       unsigned x;
+       struct skb_shared_info *shinfo;
+       skb_frag_t *frag;
+
+       memset(&plan, 0, sizeof(plan));
+       plan.prefix_avail = skb_co->inline_prefix_size;
+       plan.out_fragment = msg->frags;
+       plan.gref_pool = skb_co->gref_pool;
+
+       ncrp->count_frags_no_event += skb_co->nr_fragments;
+       if (ncrp->count_frags_no_event >= ncrp->max_count_frags_no_event) {
+               msg->flags |= NC2_PACKET_FLAG_need_event;
+               ncrp->count_frags_no_event = 0;
+       }
+
+       grant_data_area(ncrp, skb, &plan);
+
+       shinfo = skb_shinfo(skb);
+       for (x = 0; x < shinfo->nr_frags; x++) {
+               frag = &shinfo->frags[x];
+               prepare_subpage_grant(ncrp,
+                                     frag->page,
+                                     frag->page_offset,
+                                     frag->size,
+                                     &plan);
+       }
+
+       skb_co->nr_fragments = plan.out_fragment - msg->frags;
+}
+
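
nfrags_skb() above rounds the linear area (beyond the inline prefix) out to page boundaries to work out how many sub-page grants the skb head needs; the skb's paged frags are then added on top.  A small worked example of just the head arithmetic, assuming a 4096-byte page and made-up addresses:

#include <stdio.h>

#define PAGE_SIZE  4096UL
#define PAGE_SHIFT 12

/* How many pages does [data + prefix, data + headlen) touch? */
static unsigned long head_grants(unsigned long data, unsigned long headlen,
				 unsigned long prefix)
{
	unsigned long start_grant, end_grant;

	if (headlen <= prefix)
		return 0;
	start_grant = (data + prefix) & ~(PAGE_SIZE - 1);
	end_grant = (data + headlen + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1);
	return (end_grant - start_grant) >> PAGE_SHIFT;
}

int main(void)
{
	/* Head starts 100 bytes before a page boundary and is 200 bytes
	   long; after a 14 byte inline prefix it still straddles the
	   boundary, so two grants are needed. */
	printf("%lu\n", head_grants(0x1000 - 100, 200, 14));	/* prints 2 */
	return 0;
}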
diff --git a/drivers/xen/netchannel2/util.c b/drivers/xen/netchannel2/util.c
new file mode 100644
index 0000000..302dfc1
--- /dev/null
+++ b/drivers/xen/netchannel2/util.c
@@ -0,0 +1,230 @@
+#include <linux/kernel.h>
+#include <linux/list.h>
+#include <linux/skbuff.h>
+#include <linux/version.h>
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+#include <xen/driver_util.h>
+#endif
+#include <xen/gnttab.h>
+#include "netchannel2_core.h"
+
+int allocate_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp;
+
+       BUG_ON(skb_co->tp);
+
+       if (ncrp->head_free_tx_packet == INVALID_TXP_INDEX ||
+           ncrp->nr_tx_packets_outstanding ==
+                   ncrp->max_tx_packets_outstanding) {
+               return -1;
+       }
+
+       tp = &ncrp->tx_packets[ncrp->head_free_tx_packet];
+       ncrp->head_free_tx_packet = txp_get_next_free(tp);
+
+       txp_set_skb(tp, skb);
+       skb_co->tp = tp;
+       ncrp->nr_tx_packets_outstanding++;
+       return 0;
+}
+
+static void nc2_free_skb(struct netchannel2 *nc,
+                        struct sk_buff *skb)
+{
+       dev_kfree_skb(skb);
+}
+
+void release_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp = skb_co->tp;
+
+       BUG_ON(txp_get_skb(tp) != skb);
+
+       /* Try to keep the free TX packet list in order as far as
+        * possible, since that gives slightly better cache behaviour.
+        * It's not worth spending a lot of effort getting this right,
+        * though, so just use a simple heuristic: if we're freeing a
+        * packet, and the previous packet is already free, chain this
+        * packet directly after it, rather than putting it at the
+        * head of the list.  This isn't perfect by any means, but
+        * it's enough that you get nice long runs of contiguous
+        * packets in the free list, and that's all we really need.
+        * Runs much bigger than a cache line aren't really very
+        * useful, anyway. */
+       if (tp != ncrp->tx_packets && !txp_slot_in_use(tp - 1)) {
+               txp_set_next_free(tp, txp_get_next_free(tp - 1));
+               txp_set_next_free(tp - 1, tp - ncrp->tx_packets);
+       } else {
+               txp_set_next_free(tp, ncrp->head_free_tx_packet);
+               ncrp->head_free_tx_packet = tp - ncrp->tx_packets;
+       }
+       skb_co->tp = NULL;
+       ncrp->nr_tx_packets_outstanding--;
+}
+
+void release_tx_packet(struct netchannel2_ring_pair *ncrp,
+                      struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp = skb_co->tp;
+       grant_ref_t gref;
+       int r;
+       unsigned cntr;
+
+       if (skb_co->type == NC2_PACKET_TYPE_receiver_copy) {
+               while (1) {
+                       r = gnttab_claim_grant_reference(&skb_co->gref_pool);
+                       if (r == -ENOSPC)
+                               break;
+                       gref = (grant_ref_t)r;
+                       /* It's a subpage grant reference, so Xen
+                          guarantees to release it quickly.  Sit and
+                          wait for it to do so. */
+                       cntr = 0;
+                       while (!gnttab_end_foreign_access_ref(gref)) {
+                               cpu_relax();
+                               if (++cntr % 65536 == 0)
+                                       printk(KERN_WARNING "Having trouble ending gref %d for receiver copy.\n",
+                                              gref);
+                       }
+                       gnttab_release_grant_reference(&ncrp->gref_pool, gref);
+               }
+       } else if (skb_co->gref_pool != 0) {
+               gnttab_subfree_grant_references(skb_co->gref_pool,
+                                               &ncrp->gref_pool);
+       }
+
+       if (tp != NULL)
+               release_txp_slot(ncrp, skb);
+
+       nc2_free_skb(ncrp->interface, skb);
+}
+
+void fetch_fragment(struct netchannel2_ring_pair *ncrp,
+                   unsigned idx,
+                   struct netchannel2_fragment *frag,
+                   unsigned off)
+{
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              frag,
+                              sizeof(*frag),
+                              off + idx * sizeof(*frag));
+}
+
+/* Copy @count bytes from the skb's data area into its head, updating
+ * the pointers as appropriate.         The caller should ensure that there
+ * is actually enough space in the head. */
+void pull_through(struct sk_buff *skb, unsigned count)
+{
+       unsigned frag = 0;
+       unsigned this_frag;
+       void *buf;
+       void *va;
+
+       while (count != 0 && frag < skb_shinfo(skb)->nr_frags) {
+               this_frag = skb_shinfo(skb)->frags[frag].size;
+               if (this_frag > count)
+                       this_frag = count;
+               va = page_address(skb_shinfo(skb)->frags[frag].page);
+               buf = skb->tail;
+               memcpy(buf, va + skb_shinfo(skb)->frags[frag].page_offset,
+                      this_frag);
+               skb->tail += this_frag;
+               BUG_ON(skb->tail > skb->end);
+               skb_shinfo(skb)->frags[frag].size -= this_frag;
+               skb_shinfo(skb)->frags[frag].page_offset += this_frag;
+               skb->data_len -= this_frag;
+               count -= this_frag;
+               frag++;
+       }
+       for (frag = 0;
+            frag < skb_shinfo(skb)->nr_frags &&
+                    skb_shinfo(skb)->frags[frag].size == 0;
+            frag++) {
+               put_page(skb_shinfo(skb)->frags[frag].page);
+       }
+       skb_shinfo(skb)->nr_frags -= frag;
+       memmove(skb_shinfo(skb)->frags,
+               skb_shinfo(skb)->frags+frag,
+               sizeof(skb_shinfo(skb)->frags[0]) *
+               skb_shinfo(skb)->nr_frags);
+}
+
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+
+/* Zap a grant_mapping structure, releasing all mappings and the
+   reserved virtual address space.  Prepare the grant_mapping for
+   re-use. */
+void nc2_unmap_grants(struct grant_mapping *gm)
+{
+       struct gnttab_unmap_grant_ref op[MAX_GRANT_MAP_PAGES];
+       int i;
+
+       if (gm->mapping == NULL)
+               return;
+       for (i = 0; i < gm->nr_pages; i++) {
+               gnttab_set_unmap_op(&op[i],
+                                   (unsigned long)gm->mapping->addr +
+                                           i * PAGE_SIZE,
+                                   GNTMAP_host_map,
+                                   gm->handles[i]);
+       }
+       if (HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref, op, i))
+               BUG();
+       free_vm_area(gm->mapping);
+       memset(gm, 0, sizeof(*gm));
+}
+
+int nc2_map_grants(struct grant_mapping *gm,
+                  const grant_ref_t *grefs,
+                  unsigned nr_grefs,
+                  domid_t remote_domain)
+{
+       struct grant_mapping work;
+       struct gnttab_map_grant_ref op[MAX_GRANT_MAP_PAGES];
+       int i;
+
+       memset(&work, 0, sizeof(work));
+
+       if (nr_grefs > MAX_GRANT_MAP_PAGES || nr_grefs == 0)
+               return -EINVAL;
+
+       if (nr_grefs & (nr_grefs-1)) {
+               /* Must map a power-of-two number of pages. */
+               return -EINVAL;
+       }
+
+       work.nr_pages = nr_grefs;
+       work.mapping = alloc_vm_area(PAGE_SIZE * work.nr_pages);
+       if (!work.mapping)
+               return -ENOMEM;
+       for (i = 0; i < nr_grefs; i++)
+               gnttab_set_map_op(&op[i],
+                                 (unsigned long)work.mapping->addr +
+                                         i * PAGE_SIZE,
+                                 GNTMAP_host_map,
+                                 grefs[i],
+                                 remote_domain);
+
+       if (HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref, op, nr_grefs))
+               BUG();
+
+       for (i = 0; i < nr_grefs; i++) {
+               if (op[i].status) {
+                       work.nr_pages = i;
+                       nc2_unmap_grants(&work);
+                       return -EFAULT;
+               }
+               work.handles[i] = op[i].handle;
+       }
+
+       nc2_unmap_grants(gm);
+       *gm = work;
+       return 0;
+}
+#endif
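
For completeness, a sketch of how the backend side (CONFIG_XEN_NETDEV2_BACKEND) might use the two helpers above once it has collected the frontend's grant references; the wrapper name is invented and real callers in netback2.c may look different:

/* Assumes grefs[] has already been read from xenstore, nr_grefs is a
   power of two no larger than MAX_GRANT_MAP_PAGES, and *gm is zeroed. */
static void *map_frontend_ring(struct grant_mapping *gm,
			       const grant_ref_t *grefs, unsigned nr_grefs,
			       domid_t frontend_domid)
{
	int err;

	err = nc2_map_grants(gm, grefs, nr_grefs, frontend_domid);
	if (err < 0)
		return NULL;
	/* Contiguous nr_grefs * PAGE_SIZE mapping; release it later
	   with nc2_unmap_grants(gm). */
	return gm->mapping->addr;
}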
diff --git a/drivers/xen/netchannel2/xmit_packet.c b/drivers/xen/netchannel2/xmit_packet.c
new file mode 100644
index 0000000..92fbabf
--- /dev/null
+++ b/drivers/xen/netchannel2/xmit_packet.c
@@ -0,0 +1,318 @@
+/* Things related to actually sending packet messages, which are
+   shared across all transmit modes. */
+#include <linux/kernel.h>
+#include <linux/version.h>
+#include "netchannel2_core.h"
+
+/* We limit the number of transmitted packets which can be in flight
+   at any one time, as a somewhat paranoid safety catch. */
+#define MAX_TX_PACKETS MAX_PENDING_FINISH_PACKETS
+
+static enum transmit_policy transmit_policy(struct netchannel2 *nc,
+                                           struct sk_buff *skb)
+{
+       if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb))
+               return transmit_policy_small;
+       else
+               return transmit_policy_grant;
+}
+
+/* Allocate resources for a small packet.  The entire thing will be
+   transmitted in the ring.  This is only called for small, linear
+   SKBs.  It always succeeds, but has an int return type for symmetry
+   with the other prepare_xmit_*() functions. */
+int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+
+       BUG_ON(skb_is_nonlinear(skb));
+       BUG_ON(skb->len > NETCHANNEL2_MAX_INLINE_BYTES);
+
+       skb_co->type = NC2_PACKET_TYPE_small;
+       skb_co->gref_pool = 0;
+       skb_co->inline_prefix_size = skb->len;
+
+       return 0;
+}
+
+/* Figure out how much space @tp will take up on the ring. */
+static unsigned get_transmitted_packet_msg_size(struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       return (sizeof(struct netchannel2_msg_packet) +
+               sizeof(struct netchannel2_fragment) * skb_co->nr_fragments +
+               skb_co->inline_prefix_size + 7) & ~7;
+}
+
+/* Do the minimum amount of work to be certain that when we come to
+   transmit this packet we won't run out of resources. This includes
+   figuring out how we're going to fragment the packet for
+   transmission, which buffers we're going to use, etc. Return <0 if
+   insufficient resources are available right now, or 0 if we
+   succeed. */
+/* Careful: this may allocate e.g. a TXP slot and then discover that
+   it can't reserve ring space.  In that case, the TXP remains
+   allocated.  The expected case is that the caller will arrange for
+   us to retry the allocation later, in which case we'll pick up the
+   already-allocated buffers. */
+int prepare_xmit_allocate_resources(struct netchannel2 *nc,
+                                   struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       enum transmit_policy policy;
+       unsigned msg_size;
+       int r;
+
+       if (skb_co->policy == transmit_policy_unknown) {
+               policy = transmit_policy(nc, skb);
+               switch (policy) {
+               case transmit_policy_small:
+                       r = prepare_xmit_allocate_small(&nc->rings, skb);
+                       break;
+               case transmit_policy_grant:
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb);
+                       break;
+               default:
+                       BUG();
+                       /* Shut the compiler up. */
+                       r = -1;
+               }
+               if (r < 0)
+                       return r;
+               skb_co->policy = policy;
+       }
+
+       msg_size = get_transmitted_packet_msg_size(skb);
+       if (nc2_reserve_payload_bytes(&nc->rings.prod_ring, msg_size))
+               return 0;
+
+       return -1;
+}
+
+/* Transmit a packet which has previously been prepared with
+   prepare_xmit_allocate_resources(). */
+/* Once this has been called, the ring must not be flushed until the
+   TX hypercall batcher is (assuming this ring has a hypercall
+   batcher). */
+int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
+                         struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct netchannel2 *nc = ncrp->interface;
+       unsigned msg_size;
+       volatile struct netchannel2_msg_packet *msg;
+
+       msg_size = get_transmitted_packet_msg_size(skb);
+       /* Un-reserve the space we reserved for the packet. */
+       BUG_ON(ncrp->prod_ring.reserve < msg_size);
+       ncrp->prod_ring.reserve -= msg_size;
+       if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, msg_size)) {
+               /* Aw, crud.  We had to transmit a PAD message at just
+                  the wrong time, and our attempt to reserve ring
+                  space failed.  Delay transmitting this packet and
+                  make sure we redo the space reservation. */
+               ncrp->prod_ring.reserve += msg_size;
+               return 0;
+       }
+       __nc2_avoid_ring_wrap(&ncrp->prod_ring, msg_size);
+
+       /* Set up part of the message.  We do the message header
+          itself and the inline prefix.  The individual xmit_*
+          methods are responsible for the fragments.  They may also
+          set some more msg flags. */
+       msg = __nc2_get_message_ptr(&ncrp->prod_ring);
+       msg->hdr.type = NETCHANNEL2_MSG_PACKET;
+       msg->hdr.flags = 0;
+       msg->id = skb_co->tp - ncrp->tx_packets;
+       msg->type = skb_co->type;
+       msg->flags = 0;
+       msg->prefix_size = skb_co->inline_prefix_size;
+
+       /* We cast away the volatile to avoid compiler warnings, and
+          then use barrier()s to discourage gcc from using msg->frags
+          in CSE or somesuch.  It's kind of unlikely that it would,
+          but better to make sure. */
+       barrier();
+       memcpy((void *)(msg->frags + skb_co->nr_fragments),
+              skb->data,
+              skb_co->inline_prefix_size);
+       barrier();
+
+       switch (skb_co->policy) {
+       case transmit_policy_small:
+               /* Nothing to do */
+               break;
+       case transmit_policy_grant:
+               xmit_grant(ncrp, skb, msg);
+               break;
+       default:
+               BUG();
+       }
+
+       /* The transmission method may have decided not to use all the
+          fragments it reserved, which changes the message size. */
+       msg_size = get_transmitted_packet_msg_size(skb);
+       msg->hdr.size = msg_size;
+
+       ncrp->prod_ring.prod_pvt += msg_size;
+
+       BUG_ON(ncrp->prod_ring.bytes_available < msg_size);
+
+       ncrp->prod_ring.bytes_available -= msg_size;
+
+       ncrp->pending_time_sensitive_messages = 1;
+
+       if (skb_co->tp) {
+               ncrp->expected_finish_messages++;
+               /* We're now ready to accept a FINISH message for this
+                  packet. */
+               skb_co->expecting_finish = 1;
+       } else {
+               /* This packet doesn't need a FINISH message.  Queue
+                  it up to be released as soon as we flush the
+                  hypercall batcher and the ring. */
+               nc->stats.tx_bytes += skb->len;
+               nc->stats.tx_packets++;
+               __skb_queue_tail(&ncrp->release_on_flush_batcher, skb);
+       }
+
+       return 1;
+}
+
+/* Arrange that @skb will be sent on ring @ncrp soon.  Assumes that
+   prepare_xmit_allocate_resources() has been successfully called on
+   @skb already. */
+void queue_packet_to_interface(struct sk_buff *skb,
+                              struct netchannel2_ring_pair *ncrp)
+{
+       __skb_queue_tail(&ncrp->pending_tx_queue, skb);
+       if (ncrp->pending_tx_queue.qlen == 1)
+               nc2_kick(ncrp);
+}
+
+int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev)
+{
+       struct netchannel2 *nc = netdev_priv(dev);
+       struct skb_cb_overlay *sco = get_skb_overlay(skb);
+       int r;
+
+       memset(sco, 0, sizeof(*sco));
+
+       spin_lock_bh(&nc->rings.lock);
+
+       if (!nc->rings.is_attached) {
+               spin_unlock_bh(&nc->rings.lock);
+               dev_kfree_skb(skb);
+               nc->stats.tx_dropped++;
+               return NETDEV_TX_OK;
+       }
+
+       r = prepare_xmit_allocate_resources(nc, skb);
+       if (r < 0)
+               goto out_busy;
+       queue_packet_to_interface(skb, &nc->rings);
+       spin_unlock_bh(&nc->rings.lock);
+
+       return NETDEV_TX_OK;
+
+out_busy:
+       /* Some more buffers may have arrived, so kick the worker
+        * thread to go and have a look. */
+       nc2_kick(&nc->rings);
+
+       __skb_queue_tail(&nc->pending_skbs, skb);
+       nc->is_stopped = 1;
+       netif_stop_queue(dev);
+       spin_unlock_bh(&nc->rings.lock);
+       return NETDEV_TX_OK;
+}
+
+
+void nc2_handle_finish_packet_msg(struct netchannel2 *nc,
+                                 struct netchannel2_ring_pair *ncrp,
+                                 struct netchannel2_msg_hdr *hdr)
+{
+       struct skb_cb_overlay *sco;
+       struct netchannel2_msg_finish_packet msg;
+       struct txp_slot *tp;
+       struct sk_buff *skb;
+
+       if (hdr->size < sizeof(msg)) {
+               pr_debug("Packet finish message had strange size %d\n",
+                        hdr->size);
+               return;
+       }
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+       if (msg.id >= NR_TX_PACKETS) {
+               pr_debug("Other end tried to end bad packet id %d\n",
+                        msg.id);
+               return;
+       }
+       tp = &ncrp->tx_packets[msg.id];
+       skb = txp_get_skb(tp);
+       if (!skb) {
+               pr_debug("Other end tried to end packet id %d which wasn't in use\n",
+                        msg.id);
+               return;
+       }
+       sco = get_skb_overlay(skb);
+       /* Careful: if the remote is malicious, they may try to end a
+          packet after we allocate it but before we send it (e.g. if
+          we've had to back out because we didn't have enough ring
+          space). */
+       if (!sco->expecting_finish) {
+               pr_debug("Other end finished packet before we sent it?\n");
+               return;
+       }
+       nc->stats.tx_bytes += skb->len;
+       nc->stats.tx_packets++;
+       release_tx_packet(ncrp, skb);
+       ncrp->expected_finish_messages--;
+}
+
+
+/* ------------------------ Control-path operations ---------------------- */
+void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp,
+                                   struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_set_max_packets msg;
+
+       if (hdr->size != sizeof(msg)) {
+               pr_debug("Set max packets message had strange size %d\n",
+                        hdr->size);
+               return;
+       }
+       if (ncrp->max_tx_packets_outstanding != 0) {
+               pr_debug("Other end tried to change number of outstanding packets from %d.\n",
+                        ncrp->max_tx_packets_outstanding);
+               return;
+       }
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+       /* Limit the number of outstanding packets to something sane.
+          This is a little bit paranoid (it should be safe to set
+          this arbitrarily high), but limiting it avoids nasty
+          surprises in untested configurations. */
+       if (msg.max_outstanding_packets > MAX_TX_PACKETS) {
+               pr_debug("Other end tried to set max outstanding to %d, limiting to %d.\n",
+                        msg.max_outstanding_packets, MAX_TX_PACKETS);
+               ncrp->max_tx_packets_outstanding = MAX_TX_PACKETS;
+       } else {
+               ncrp->max_tx_packets_outstanding = msg.max_outstanding_packets;
+       }
+}
+
+/* Release all packets on the transmitted and pending_tx lists. */
+void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp)
+{
+       struct sk_buff *skb;
+       unsigned x;
+
+       nc2_queue_purge(ncrp, &ncrp->pending_tx_queue);
+       for (x = 0; x < NR_TX_PACKETS; x++) {
+               skb = txp_get_skb(&ncrp->tx_packets[x]);
+               if (skb)
+                       release_tx_packet(ncrp, skb);
+       }
+}
+
diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h
new file mode 100644
index 0000000..c45963e
--- /dev/null
+++ b/include/xen/interface/io/netchannel2.h
@@ -0,0 +1,106 @@
+#ifndef __NETCHANNEL2_H__
+#define __NETCHANNEL2_H__
+
+#include <xen/interface/io/uring.h>
+
+/* Tell the other end how many packets it's allowed to have
+ * simultaneously outstanding for transmission.  An endpoint must not
+ * send PACKET messages which would take it over this limit.
+ *
+ * The SET_MAX_PACKETS message must be sent before any PACKET
+ * messages.  It should only be sent once, unless the ring is
+ * disconnected and reconnected.
+ */
+#define NETCHANNEL2_MSG_SET_MAX_PACKETS 1
+struct netchannel2_msg_set_max_packets {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t max_outstanding_packets;
+};
+
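For illustration only (this is not part of the patch), a frontend might announce its limit once, right after the ring is connected, using the helpers defined in uring.h below; the function name is made up and the usual kernel headers (<linux/string.h>) are assumed:

static void example_announce_max_packets(struct netchannel2_prod_ring *ring,
                                         uint32_t limit)
{
        struct netchannel2_msg_set_max_packets msg;

        memset(&msg, 0, sizeof(msg));
        msg.max_outstanding_packets = limit;
        if (!nc2_can_send_payload_bytes(ring, sizeof(msg)))
                return;         /* a real driver would wait for ring space */
        /* nc2_send_message() fills in hdr.type, hdr.flags and hdr.size. */
        nc2_send_message(ring, NETCHANNEL2_MSG_SET_MAX_PACKETS, 0,
                         &msg, sizeof(msg));
        /* The message only becomes visible after nc2_flush_ring() and an
           event-channel kick; see the flush sketch further down. */
}
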
+/* Pass a packet to the other end.  The packet consists of a header,
+ * followed by a bunch of fragment descriptors, followed by an inline
+ * packet prefix.  Every fragment descriptor in a packet must be the
+ * same type, and the type is determined by the header.         The receiving
+ * endpoint should respond with a finished_packet message as soon as
+ * possible.  The prefix may be no more than
+ * NETCHANNEL2_MAX_INLINE_BYTES.  Packets may contain no more than
+ * NETCHANNEL2_MAX_PACKET_BYTES bytes of data, including all fragments
+ * and the prefix.
+ */
+#define NETCHANNEL2_MSG_PACKET 2
+#define NETCHANNEL2_MAX_PACKET_BYTES 65536
+#define NETCHANNEL2_MAX_INLINE_BYTES 256
+struct netchannel2_fragment {
+       uint16_t size;
+       /* The offset is always relative to the start of the page.
+          For pre_posted packet types, it is not relative to the
+          start of the buffer (although the fragment range will
+          obviously be within the buffer range). */
+       uint16_t off;
+       union {
+               struct {
+                       grant_ref_t gref;
+               } receiver_copy;
+       };
+};
+struct netchannel2_msg_packet {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t id; /* Opaque ID which is echoed into the finished
+                       packet message. */
+       uint8_t type;
+       uint8_t flags;
+       uint8_t pad0;
+       uint8_t pad1;
+       uint16_t prefix_size;
+       uint16_t pad2;
+       uint16_t pad3;
+       uint16_t pad4;
+       /* Variable-size array.  The number of elements is determined
+          by the size of the message. */
+       /* Until we support scatter-gather, this will be either 0 or 1
+          element. */
+       struct netchannel2_fragment frags[0];
+};
+
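As a sketch of the accounting this layout implies (editorial, not part of the patch): the on-ring size of a PACKET message is the fixed header, one fragment descriptor per fragment, plus the inline prefix, rounded up to preserve 8-byte alignment. The driver's get_transmitted_packet_msg_size() presumably plays this role for a queued skb; the helper below is hypothetical:

static inline size_t example_packet_msg_size(unsigned nr_frags,
                                             unsigned prefix_size)
{
        size_t sz = sizeof(struct netchannel2_msg_packet) +
                nr_frags * sizeof(struct netchannel2_fragment) +
                prefix_size;
        return (sz + 7) & ~(size_t)7;   /* keep hdr.size a multiple of 8 */
}
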
+/* If set, the transmitting domain requires an event urgently when
+ * this packet's finish message is sent.  Otherwise, the event can be
+ * delayed. */
+#define NC2_PACKET_FLAG_need_event 8
+
+/* The mechanism which should be used to receive the data part of
+ * a packet:
+ *
+ * receiver_copy -- The transmitting domain has granted the receiving
+ *                 domain access to the original RX buffers using
+ *                 copy-only grant references.  The receiving domain
+ *                 should copy the data out of the buffers and issue
+ *                 a FINISH message.
+ *
+ *                 Due to backend bugs, it is not safe to use this
+ *                 packet type except on bypass rings.
+ *
+ * small -- The packet does not have any fragment descriptors
+ *         (i.e. the entire thing is inline in the ring).  The receiving
+ *         domain should simply copy the packet out of the ring
+ *         into a locally allocated buffer.  No FINISH message is required
+ *         or allowed.
+ *
+ *         This packet type may be used on any ring.
+ *
+ * All endpoints must be able to receive all packet types, but note
+ * that it is correct to treat receiver_map and small packets as
+ * receiver_copy ones. */
+#define NC2_PACKET_TYPE_receiver_copy 1
+#define NC2_PACKET_TYPE_small 4
+
+/* Tell the other end that we're finished with a message it sent us,
+   and it can release the transmit buffers etc.         This must be sent in
+   response to receiver_copy and receiver_map packets. It must not be
+   sent in response to pre_posted or small packets. */
+#define NETCHANNEL2_MSG_FINISH_PACKET 3
+struct netchannel2_msg_finish_packet {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t id;
+};
+
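A sketch of the receive-side acknowledgement (editorial; the function name is made up): after copying a receiver_copy packet's data out of the granted buffers, the receiver echoes the packet's id back on its producer ring:

static void example_finish_packet(struct netchannel2_prod_ring *ring,
                                  uint32_t id)
{
        struct netchannel2_msg_finish_packet msg;

        memset(&msg, 0, sizeof(msg));
        msg.id = id;    /* must echo the id from the PACKET message */
        if (!nc2_can_send_payload_bytes(ring, sizeof(msg)))
                return; /* a real driver would defer and retry */
        nc2_send_message(ring, NETCHANNEL2_MSG_FINISH_PACKET, 0,
                         &msg, sizeof(msg));
}
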
+#endif /* !__NETCHANNEL2_H__ */
diff --git a/include/xen/interface/io/uring.h b/include/xen/interface/io/uring.h
new file mode 100644
index 0000000..663c3d7
--- /dev/null
+++ b/include/xen/interface/io/uring.h
@@ -0,0 +1,426 @@
+#ifndef __XEN_PUBLIC_IO_URING_H__
+#define __XEN_PUBLIC_IO_URING_H__
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <asm/system.h>
+
+typedef unsigned RING_IDX;
+
+#define NETCHANNEL2_MSG_PAD 255
+
+/* The sring structures themselves.     The _cons and _prod variants are
+   different views of the same bit of shared memory, and are supposed
+   to provide better checking of the expected use patterns.     Fields in
+   the shared ring are owned by either the producer end or the
+   consumer end.  If a field is owned by your end, the other end will
+   never modify it.     If it's owned by the other end, the other end is
+   allowed to modify it whenever it likes, and you can never do so.
+
+   Fields owned by the other end are always const (because you can't
+   change them).  They're also volatile, because there are a bunch
+   of places where we go:
+
+   local_x = sring->x;
+   validate(local_x);
+   use(local_x);
+
+   and it would be very bad if the compiler turned that into:
+
+   local_x = sring->x;
+   validate(sring->x);
+   use(local_x);
+
+   because that contains a potential TOCTOU race (hard to exploit, but
+   still present).     The compiler is only allowed to do that
+   optimisation because it knows that local_x == sring->x at the start
+   of the call to validate(), and it only knows that if it can reorder
+   the read of sring->x over the sequence point at the end of the
+   first statement.     In other words, it can only do the bad
+   optimisation if it knows that reads of sring->x are side-effect
+   free.  volatile stops it from making that assumption.
+
+   We don't need a full memory barrier here, because it's sufficient
+   to copy the volatile data into stable guest-local storage, and
+   volatile achieves that.  I.e. we don't need local_x to be precisely
+   sring->x, but we do need it to be a stable snapshot of some
+   previous value of sring->x.
+
+   Note that there are still plenty of other places where we *do* need
+   full barriers.  volatile just deals with this one, specific, case.
+
+   We could also deal with it by putting compiler barriers in all over
+   the place.  The downside of that approach is that you need to put
+   the barrier()s in lots of different places (basically, everywhere
+   which needs to access these fields), and it's easy to forget one.
+   barrier()s also have somewhat heavier semantics than volatile
+   (because they prevent all reordering, rather than just reordering
+   on this one field), although that's largely irrelevant because
+   gcc usually treats pretty much any volatile access as a call to
+   barrier().
+*/
+
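A minimal illustration of the snapshot-then-validate pattern the comment describes, using the ring types defined below (the function itself is made up and not part of the patch):

static int example_unconsumed_bytes(struct netchannel2_cons_ring *ring)
{
        /* One volatile read; every later use sees the same snapshot. */
        RING_IDX prod = ring->sring->prod;

        if ((RING_IDX)(prod - ring->cons_pvt) > ring->payload_bytes)
                return -1;                      /* validate the snapshot... */
        return prod - ring->cons_pvt;           /* ...then use that same value */
}
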
+/* Messages are sent over sring pairs. Each sring in a pair provides
+ * a unidirectional byte stream which can generate events when either
+ * the producer or consumer pointers cross a particular threshold.
+ *
+ * We define both sring_prod and sring_cons structures.         The two
+ * structures will always map onto the same physical bytes in memory,
+ * but they provide different views of that memory which are
+ * appropriate to either producers or consumers.
+ *
+ * Obviously, the endpoints need to agree on which end produces
+ * messages on which ring.     The endpoint which provided the memory
+ * backing the ring always produces on the first sring, and the one
+ * which just mapped the ring produces on the second.  By convention,
+ * these are known as the frontend and backend, respectively.
+ */
+
+/* For both rings, the producer (consumer) pointers point at the
+ * *next* byte which is going to be produced (consumed).  An endpoint
+ * must generate an event on the event channel port if it moves the
+ * producer pointer (consumer pointer) across prod_event (cons_event).
+ *
+ * I.e. if an endpoint ever updates a pointer so that the old pointer
+ * is strictly less than the event and the new pointer is greater
+ * than or equal to the event, then the remote must be notified.  If
+ * the pointer overflows the ring, treat the new value as if it were
+ * (actual new value) + (1 << 32).
+ */
+struct netchannel2_sring_prod {
+       RING_IDX prod;
+       volatile const RING_IDX cons;
+       volatile const RING_IDX prod_event;
+       RING_IDX cons_event;
+       unsigned char pad[48];
+};
+
+struct netchannel2_sring_cons {
+       volatile const RING_IDX prod;
+       RING_IDX cons;
+       RING_IDX prod_event;
+       volatile const RING_IDX cons_event;
+       unsigned char pad[48];
+};
+
+struct netchannel2_frontend_shared {
+       struct netchannel2_sring_prod prod;
+       struct netchannel2_sring_cons cons;
+};
+
+struct netchannel2_backend_shared {
+       struct netchannel2_sring_cons cons;
+       struct netchannel2_sring_prod prod;
+};
+
+struct netchannel2_prod_ring {
+       struct netchannel2_sring_prod *sring;
+       void *payload;
+       RING_IDX prod_pvt;
+       /* This is the number of bytes available after prod_pvt last
+          time we checked, minus the number of bytes which we've
+          consumed since then.  It's used to avoid a bunch of
+          memory barriers when checking for ring space. */
+       unsigned bytes_available;
+       /* Number of bytes reserved by nc2_reserve_payload_bytes() */
+       unsigned reserve;
+       size_t payload_bytes;
+};
+
+struct netchannel2_cons_ring {
+       struct netchannel2_sring_cons *sring;
+       const volatile void *payload;
+       RING_IDX cons_pvt;
+       size_t payload_bytes;
+};
+
+/* A message header.  There is one of these at the start of every
+ * message.  @type is one of the #defines below, and @size is the
+ * size of the message, including the header and any padding.
+ * The size should be a multiple of 8 so that we avoid unaligned memory
+ * copies; structs defining message formats should have sizes that are
+ * multiples of 8 bytes and should use padding fields if needed.
+ */
+struct netchannel2_msg_hdr {
+       uint8_t type;
+       uint8_t flags;
+       uint16_t size;
+};
+
+/* Copy some bytes from the shared ring to a stable local buffer,
+ * starting at the private consumer pointer.  Does not update the
+ * private consumer pointer.
+ */
+static inline void nc2_copy_from_ring_off(struct netchannel2_cons_ring *ring,
+                                         void *buf,
+                                         size_t nbytes,
+                                         unsigned off)
+{
+       unsigned start, end;
+
+       start = (ring->cons_pvt + off) & (ring->payload_bytes-1);
+       end = (ring->cons_pvt + nbytes + off) & (ring->payload_bytes-1);
+       /* We cast away the volatile modifier to get rid of an
+          irritating compiler warning, and compensate with a
+          barrier() at the end. */
+       memcpy(buf, (const void *)ring->payload + start, nbytes);
+       barrier();
+}
+
+static inline void nc2_copy_from_ring(struct netchannel2_cons_ring *ring,
+                                     void *buf,
+                                     size_t nbytes)
+{
+       nc2_copy_from_ring_off(ring, buf, nbytes, 0);
+}
+
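Purely as an illustration of how the message header and this copy helper fit together (the driver's real dispatch loop lives elsewhere in the patch), a consumer might walk the ring like this; example_poll and the commented-out handler are placeholders:

static void example_poll(struct netchannel2_cons_ring *ring, RING_IDX prod)
{
        struct netchannel2_msg_hdr hdr;

        while (ring->cons_pvt != prod) {
                nc2_copy_from_ring(ring, &hdr, sizeof(hdr));
                switch (hdr.type) {
                case NETCHANNEL2_MSG_PAD:
                        break;          /* nothing to do; just skip it */
                default:
                        /* handle_message(ring, &hdr); */
                        break;
                }
                /* hdr.size includes the header and any padding. */
                ring->cons_pvt += hdr.size;
        }
}
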
+
+/* Copy some bytes to the shared ring, starting at the private
+ * producer pointer.  Does not update the private pointer.
+ */
+static inline void nc2_copy_to_ring_off(struct netchannel2_prod_ring *ring,
+                                       const void *src,
+                                       unsigned nr_bytes,
+                                       unsigned off)
+{
+       unsigned start, end;
+
+       start = (ring->prod_pvt + off) & (ring->payload_bytes-1);
+       end = (ring->prod_pvt + nr_bytes + off) & (ring->payload_bytes-1);
+       memcpy(ring->payload + start, src, nr_bytes);
+}
+
+static inline void nc2_copy_to_ring(struct netchannel2_prod_ring *ring,
+                                   const void *src,
+                                   unsigned nr_bytes)
+{
+       nc2_copy_to_ring_off(ring, src, nr_bytes, 0);
+}
+
+static inline void __nc2_send_pad(struct netchannel2_prod_ring *ring,
+                                 unsigned nr_bytes)
+{
+       struct netchannel2_msg_hdr msg;
+       msg.type = NETCHANNEL2_MSG_PAD;
+       msg.flags = 0;
+       msg.size = nr_bytes;
+       nc2_copy_to_ring(ring, &msg, sizeof(msg));
+       ring->prod_pvt += nr_bytes;
+       ring->bytes_available -= nr_bytes;
+}
+
+static inline int __nc2_ring_would_wrap(struct netchannel2_prod_ring *ring,
+                                       unsigned nr_bytes)
+{
+       RING_IDX mask;
+       mask = ~(ring->payload_bytes - 1);
+       return (ring->prod_pvt & mask) != ((ring->prod_pvt + nr_bytes) & mask);
+}
+
+static inline unsigned __nc2_pad_needed(struct netchannel2_prod_ring *ring)
+{
+       return ring->payload_bytes -
+               (ring->prod_pvt & (ring->payload_bytes - 1));
+}
+
+static inline void __nc2_avoid_ring_wrap(struct netchannel2_prod_ring *ring,
+                                        unsigned nr_bytes)
+{
+       if (!__nc2_ring_would_wrap(ring, nr_bytes))
+               return;
+       __nc2_send_pad(ring, __nc2_pad_needed(ring));
+}
+
+/* Prepare a message for the other end and place it on the shared
+ * ring, updating the private producer pointer.         You need to call
+ * nc2_flush_messages() before the message is actually made visible to
+ * the other end.  It is permissible to send several messages in a
+ * batch and only flush them once.
+ */
+static inline void nc2_send_message(struct netchannel2_prod_ring *ring,
+                                   unsigned type,
+                                   unsigned flags,
+                                   const void *msg,
+                                   size_t size)
+{
+       struct netchannel2_msg_hdr *hdr = (struct netchannel2_msg_hdr *)msg;
+
+       __nc2_avoid_ring_wrap(ring, size);
+
+       hdr->type = type;
+       hdr->flags = flags;
+       hdr->size = size;
+
+       nc2_copy_to_ring(ring, msg, size);
+       ring->prod_pvt += size;
+       BUG_ON(ring->bytes_available < size);
+       ring->bytes_available -= size;
+}
+
+static inline volatile void *__nc2_get_message_ptr(struct netchannel2_prod_ring *ncrp)
+{
+       return (volatile void *)ncrp->payload +
+               (ncrp->prod_pvt & (ncrp->payload_bytes-1));
+}
+
+/* Copy the private producer pointer to the shared producer pointer,
+ * with a suitable memory barrier such that all messages placed on the
+ * ring are stable before we do the copy.  This effectively pushes any
+ * messages which we've just sent out to the other end.         Returns 1 if
+ * we need to notify the other end and 0 otherwise.
+ */
+static inline int nc2_flush_ring(struct netchannel2_prod_ring *ring)
+{
+       RING_IDX old_prod, new_prod;
+
+       old_prod = ring->sring->prod;
+       new_prod = ring->prod_pvt;
+
+       wmb();
+
+       ring->sring->prod = new_prod;
+
+       /* We need the update to prod to happen before we read
+        * event. */
+       mb();
+
+       /* We notify if the producer pointer moves across the event
+        * pointer. */
+       if ((RING_IDX)(new_prod - ring->sring->prod_event) <
+           (RING_IDX)(new_prod - old_prod))
+               return 1;
+       else
+               return 0;
+}
+
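For context, a sketch of how a driver would typically pair this flush with an event-channel notification; notify_remote_via_irq() is the standard helper from <xen/events.h>, and the irq argument is whatever the driver bound to the ring's event channel:

static void example_flush_and_notify(struct netchannel2_prod_ring *ring,
                                     int irq)
{
        if (nc2_flush_ring(ring))
                notify_remote_via_irq(irq);
}
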
+/* Copy the private consumer pointer to the shared consumer pointer,
+ * with a memory barrier so that any previous reads from the ring
+ * complete before the pointer is updated.     This tells the other end
+ * that we're finished with the messages, and that it can re-use the
+ * ring space for more messages.  Returns 1 if we need to notify the
+ * other end and 0 otherwise.
+ */
+static inline int nc2_finish_messages(struct netchannel2_cons_ring *ring)
+{
+       RING_IDX old_cons, new_cons;
+
+       old_cons = ring->sring->cons;
+       new_cons = ring->cons_pvt;
+
+       /* Need to finish reading from the ring before updating
+          cons */
+       mb();
+       ring->sring->cons = ring->cons_pvt;
+
+       /* Need to publish our new consumer pointer before checking
+          event. */
+       mb();
+       if ((RING_IDX)(new_cons - ring->sring->cons_event) <
+           (RING_IDX)(new_cons - old_cons))
+               return 1;
+       else
+               return 0;
+}
+
+/* Check whether there are any unconsumed messages left on the shared
+ * ring.  Returns 1 if there are, and 0 if there aren't.  If there are
+ * no more messages, set the producer event so that we'll get a
+ * notification as soon as another one gets sent.  It is assumed that
+ * all messages up to @prod have been processed, and none of the ones
+ * after it have been. */
+static inline int nc2_final_check_for_messages(struct netchannel2_cons_ring *ring,
+                                              RING_IDX prod)
+{
+       if (prod != ring->sring->prod)
+               return 1;
+       /* Request an event when more stuff gets poked on the ring. */
+       ring->sring->prod_event = prod + 1;
+
+       /* Publish event before final check for responses. */
+       mb();
+       if (prod != ring->sring->prod)
+               return 1;
+       else
+               return 0;
+}
+
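An editorial sketch of the re-check pattern this helper supports: process everything up to a snapshot of the producer pointer, then use nc2_final_check_for_messages() to close the race between "the ring looked empty" and "the producer sent more". The commented-out process_messages() stands in for the driver's dispatch code:

static void example_consume_all(struct netchannel2_cons_ring *ring)
{
        RING_IDX prod;

        do {
                prod = ring->sring->prod;
                rmb();  /* read prod before the payload it covers */
                /* process_messages(ring, prod); consumes up to prod */
        } while (nc2_final_check_for_messages(ring, prod));
}
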
+/* Can we send a message with @nr_bytes payload bytes? Returns 1 if
+ * we can or 0 if we can't.     If there isn't space right now, set the
+ * consumer event so that we'll get notified when space is
+ * available. */
+static inline int nc2_can_send_payload_bytes(struct netchannel2_prod_ring *ring,
+                                            unsigned nr_bytes)
+{
+       unsigned space;
+       RING_IDX cons;
+       BUG_ON(ring->bytes_available > ring->payload_bytes);
+       /* Times 2 because we might need to send a pad message */
+       if (likely(ring->bytes_available > nr_bytes * 2 + ring->reserve))
+               return 1;
+       if (__nc2_ring_would_wrap(ring, nr_bytes))
+               nr_bytes += __nc2_pad_needed(ring);
+retry:
+       cons = ring->sring->cons;
+       space = ring->payload_bytes - (ring->prod_pvt - cons);
+       if (likely(space >= nr_bytes + ring->reserve)) {
+               /* We have enough space to send the message. */
+
+               /* Need to make sure that the read of cons happens
+                  before any following memory writes. */
+               mb();
+
+               ring->bytes_available = space;
+
+               return 1;
+       } else {
+               /* Not enough space available.  Set an event pointer
+                  when cons changes.  We need to be sure that the
+                  @cons used here is the same as the cons used to
+                  calculate @space above, and the volatile modifier
+                  on sring->cons achieves that. */
+               ring->sring->cons_event = cons + 1;
+
+               /* Check whether more space became available while we
+                  were messing about. */
+
+               /* Need the event pointer to be stable before we do
+                  the check. */
+               mb();
+               if (unlikely(cons != ring->sring->cons)) {
+                       /* Cons pointer changed.  Try again. */
+                       goto retry;
+               }
+
+               /* There definitely isn't space on the ring now, and
+                  an event has been set such that we'll be notified
+                  if more space becomes available. */
+               /* XXX we get a notification as soon as any more space
+                  becomes available.  We could maybe optimise by
+                  setting the event such that we only get notified
+                  when we know that enough space is available.  The
+                  main complication is handling the case where you
+                  try to send a message of size A, fail due to lack
+                  of space, and then try to send one of size B, where
+                  B < A.  It's not clear whether you want to set the
+                  event for A bytes or B bytes.  The obvious answer
+                  is B, but that means moving the event pointer
+                  backwards, and it's not clear that that's always
+                  safe.  Always setting for a single byte is safe, so
+                  stick with that for now. */
+               return 0;
+       }
+}
+
+static inline int nc2_reserve_payload_bytes(struct netchannel2_prod_ring *ring,
+                                           unsigned nr_bytes)
+{
+       if (nc2_can_send_payload_bytes(ring, nr_bytes)) {
+               ring->reserve += nr_bytes;
+               return 1;
+       } else {
+               return 0;
+       }
+}
+
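Finally, a sketch of how the reserve counter is intended to be used, mirroring the back-out logic in the transmit path earlier in this patch; the function is illustrative only:

static int example_reserve_then_send(struct netchannel2_prod_ring *ring,
                                     unsigned msg_size)
{
        if (!nc2_reserve_payload_bytes(ring, msg_size))
                return 0;               /* no space; try again later */
        /* ...later, when the message is actually being written: */
        ring->reserve -= msg_size;
        if (!nc2_can_send_payload_bytes(ring, msg_size)) {
                ring->reserve += msg_size;      /* back out and retry later */
                return 0;
        }
        /* nc2_send_message(ring, ..., msg_size); */
        return 1;
}
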
+#endif /* __XEN_PUBLIC_IO_URING_H__ */
-- 
1.6.3.1


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel
