This is an archived copy of the Xen.org mailing list, which we have preserved to ensure that existing links to archives are not broken. The live archive, which contains the latest emails, can be found at http://lists.xen.org/
Home Products Support Community News


[Xen-API] [PATCH 2 of 3] CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler to copy between hosts

To: xen-api@xxxxxxxxxxxxxxxxxxx
Subject: [Xen-API] [PATCH 2 of 3] CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler to copy between hosts
From: David Scott <dave.scott@xxxxxxxxxxxxx>
Date: Mon, 23 Aug 2010 13:06:41 +0100
Delivery-date: Mon, 23 Aug 2010 05:29:00 -0700
Envelope-to: www-data@xxxxxxxxxxxxxxxxxxx
In-reply-to: <patchbomb.1282565199@ely>
List-help: <mailto:xen-api-request@lists.xensource.com?subject=help>
List-id: Discussion of API issues surrounding Xen <xen-api.lists.xensource.com>
List-post: <mailto:xen-api@lists.xensource.com>
List-subscribe: <http://lists.xensource.com/mailman/listinfo/xen-api>, <mailto:xen-api-request@lists.xensource.com?subject=subscribe>
List-unsubscribe: <http://lists.xensource.com/mailman/listinfo/xen-api>, <mailto:xen-api-request@lists.xensource.com?subject=unsubscribe>
References: <patchbomb.1282565199@ely>
Sender: xen-api-bounces@xxxxxxxxxxxxxxxxxxx
User-agent: Mercurial-patchbomb/1.4.3
# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1282565148 -3600
# Node ID f9725bc799887fb205bf1da8cf3474288529dc90
# Parent  23fa063db91d4eae0179fb29179002a85a75cf31
CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler 
to copy between hosts.

Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>

diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml      Mon Aug 23 13:05:48 2010 +0100
@@ -23,14 +23,16 @@
 open Sparse_encoding
 open Unixext
 open Pervasiveext
+open Client
 let receive_chunks (s: Unix.file_descr) (fd: Unix.file_descr) = 
        Chunk.fold (fun () -> Chunk.write fd) () s
 let vdi_of_req ~__context (req: request) = 
+       let all = req.Http.query @ req.Http.cookie in
        let vdi = 
-               if List.mem_assoc "vdi" req.Http.query
-               then List.assoc "vdi" req.Http.query
+               if List.mem_assoc "vdi" all
+               then List.assoc "vdi" all
                else raise (Failure "Missing vdi query parameter") in
        if Db_cache.DBCache.is_valid_ref vdi 
        then Ref.of_string vdi 
@@ -40,21 +42,25 @@
   req.close <- true;
   Xapi_http.with_context "Importing raw VDI" req s
     (fun __context ->
+       let all = req.Http.query @ req.Http.cookie in
       let vdi = vdi_of_req ~__context req in
-      let chunked = List.mem_assoc "chunked" req.Http.query in
-      try
-       match req.transfer_encoding, req.content_length with
-       | Some "chunked", _ ->
+      let chunked = List.mem_assoc "chunked" all in
+      let task_id = Context.get_task_id __context in
+        debug "import_raw_vdi task_id = %s vdi = %s; chunked = %b" 
(Ref.string_of task_id) (Ref.string_of vdi) chunked;
+        try
+       match req.transfer_encoding with
+       | Some "chunked" ->
            error "Chunked encoding not yet implemented in the import code";
            Http_svr.headers s http_403_forbidden;
            raise (Failure "import code cannot handle chunked encoding")
-       | None, Some len ->
+       | None ->
            let headers = Http.http_200_ok ~keep_alive:false () @
-             [ Http.task_id_hdr ^ ":" ^ (Ref.string_of (Context.get_task_id 
+             [ Http.task_id_hdr ^ ":" ^ (Ref.string_of task_id);
                content_type ] in
             Http_svr.headers s headers;
+               Server_helpers.exec_with_new_task "VDI.import" 
+               (fun __context -> 
                 Sm_fs_ops.with_block_attached_device __context rpc session_id 
vdi `RW
                   (fun device ->
                      let fd = Unix.openfile device  [ Unix.O_WRONLY ] 0 in
@@ -63,16 +69,18 @@
                             if chunked
                             then receive_chunks s fd
-                            else ignore(Unixext.copy_file ~limit:len s fd);
-                            Unixext.fsync fd
+                            else ignore(Unixext.copy_file 
?limit:req.content_length s fd);
+                            Unixext.fsync fd;
                           with Unix.Unix_error(Unix.EIO, _, _) ->
                             raise (Api_errors.Server_error 
(Api_errors.vdi_io_error, ["Device I/O errors"]))
                        (fun () -> Unix.close fd)
-                  );
-           TaskHelper.complete ~__context []
+                  )
+           );
+           TaskHelper.complete ~__context [];
       with e ->
+       error "Caught exception: %s" (ExnHelper.string_of_exn e);
+       log_backtrace ();
        TaskHelper.failed ~__context (Api_errors.internal_error, ["Caught 
exception: " ^ (ExnHelper.string_of_exn e)]);
        raise e)
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/message_forwarding.ml
--- a/ocaml/xapi/message_forwarding.ml  Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/message_forwarding.ml  Mon Aug 23 13:05:48 2010 +0100
@@ -2862,9 +2862,9 @@
     let copy ~__context ~vdi ~sr =
       info "VDI.copy: VDI = '%s'; SR = '%s'" (vdi_uuid ~__context vdi) 
(sr_uuid ~__context sr);
       let local_fn = Local.VDI.copy ~vdi ~sr in
-      let src_sr = Db.VDI.get_SR ~__context ~self:vdi in
       (* No need to lock the VDI because the VBD.plug will do that for us *)
-      SR.forward_sr_multiple_op ~local_fn ~__context ~srs:[src_sr;sr] 
+      (* Forward the request to a host which can read the source VDI *)
+      forward_vdi_op ~local_fn ~__context ~self:vdi
        (fun session_id rpc -> Client.VDI.copy rpc session_id vdi sr)
     let resize ~__context ~vdi ~size =
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml   Mon Aug 23 13:05:48 2010 +0100
@@ -41,6 +41,35 @@
     | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi 
mode (fun path -> loop (path :: acc) vdis) in
   loop [] vdis
+(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
+let with_import_vdi __context rpc session_id vdi f = 
+       let subtask_of = Context.get_task_id __context in
+       Server_helpers.exec_with_new_task "VDI.import" 
+       (fun __context -> 
+               (* Find a suitable host for the SR containing the VDI *)
+               let sr = Db.VDI.get_SR ~__context ~self:vdi in
+               let host = Importexport.find_host_for_sr ~__context sr in
+               let address = Db.Host.get_address ~__context ~self:host in
+               let importtask = Client.Task.create rpc session_id "VDI.import" 
"" in
+                       let headers = Xapi_http.http_request 
+                               ~cookie:(["session_id", Ref.string_of 
+                                         "task_id", Ref.string_of importtask;
+                                         "vdi", Ref.string_of vdi;
+                                         "chunked", "true"])
+                               Http.Put address Constants.import_raw_vdi_uri in
+                       let writer _ _ sock = f sock; true in
+                       if not (Xmlrpcclient.do_secure_http_rpc 
+                               ~task_id:(Ref.string_of (Context.get_task_id 
+                               ~host:address ~port:Xapi_globs.default_ssl_port 
~headers ~body:"" writer)
+                       then failwith "with_import_vdi";
+                       debug "Waiting for import task (%s) to complete" 
(Ref.string_of importtask);
+                       (* wait for the task to complete before cleaning anything up *)
+                       while Client.Task.get_status rpc session_id importtask 
= `pending do
+                               Thread.delay 1.;
+                       done;
+                       Client.Task.destroy rpc session_id importtask;
+       )
 (** Catch those smint exceptions and convert into meaningful internal errors *)
 let with_api_errors f x = 
@@ -141,8 +170,37 @@
 exception Cancelled
 exception NonZero
+(** The copying routine can operate on anything which looks like a file-descriptor/Stream *)
+module type Stream = sig
+       type stream
+       val write: stream -> int64 -> string -> int -> int -> unit
+(** Writes directly to a file *)
+module FileStream = struct
+       type stream = Unix.file_descr
+       let write stream stream_offset buf off len =
+               let newoff = Unix.LargeFile.lseek stream stream_offset 
Unix.SEEK_SET in
+               (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" 
(String.length buf) offset len; *)
+               let n = Unix.write stream buf off len in
+               if n < len then failwith "Short write"
+(** Marshals data across the network in chunks *)
+module NetworkStream = struct
+       open Sparse_encoding
+       type stream = Unix.file_descr
+       let write stream stream_offset buf off len =
+               let copy = String.create len in
+               String.blit buf off copy 0 len;
+               let x = { Chunk.start = stream_offset; data = copy } in
+               Chunk.marshal stream x
+module DD(Output: Stream) = struct
 (* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd ofd size bs =
+let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
   let round v = int_of_float (v *. 50.0) in
   let update = 
     let oldvalue = ref (-1.0) in
@@ -178,20 +236,22 @@
        let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
        Unixext.really_read ifd buf 0 this_chunk;
-       begin
-         if sparse && (allzero buf this_chunk)
-          then
-           ignore(Unix.LargeFile.lseek ofd (Int64.of_int this_chunk) 
-         else
-           let n = Unix.write ofd buf 0 this_chunk in
-           (if n<this_chunk then failwith "Error!")
-       end;
+       if not sparse || (not (allzero buf this_chunk))
+       then Output.write stream offset buf 0 this_chunk;
        do_block (Int64.add offset (Int64.of_int this_chunk))
   do_block 0L;
+  Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
   update 1.0
+module LocalDD = DD(FileStream)
+module RemoteDD = DD(NetworkStream)
 (* SCTX-286: thin provisioning is thrown away over VDI.copy, 
    Return true if the newly created vdi must have zeroes written into it; 
default to false
    under the assumption that "proper" storage devices (ie not our legacy LVM 
stuff) always 
@@ -232,33 +292,47 @@
   (* Use the sparse copy unless we must write zeroes into the new VDI *)
   let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
+  (* Copy locally unless this host can't see the destination SR *)
+  let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
+  let local_copy = Importexport.check_sr_availability ~__context sr_dst in
   let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
   let blocksize = 1024*1024 in
-  debug "Sm_fs_ops.copy_vdi: copying %Ld in blocks of %d%s preserving 
sparseness" size blocksize (if sparse then "" else " NOT");
+  debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving 
sparseness" (if local_copy then "locally" else "remotely") size blocksize (if 
sparse then "" else " NOT");
-  let dd = sparse_dd refresh_session ~__context sparse in
+  let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
+  let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
-  with_block_attached_device __context rpc session_id vdi_src `RO
-    (fun device_src ->
-       with_block_attached_device __context rpc session_id vdi_dst `RW
-        (fun device_dst ->
-           let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 
-           and ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 
-           finally
-             (fun () ->
-                try
-                  dd ifd ofd size blocksize;
-                with
-                  | Unix.Unix_error(Unix.EIO, _, _) ->
-                      raise (Api_errors.Server_error (Api_errors.vdi_io_error, 
["Device I/O error"]))
-                  | e ->
-                      debug "Caught exception %s" (ExnHelper.string_of_exn e);
-                      log_backtrace ())
-             (fun () ->
-                Unix.close ifd;
-                Unix.close ofd)
-        )
-    )
-  )
+       with_block_attached_device __context rpc session_id vdi_src `RO
+       (fun device_src ->
+               let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
+               finally
+               (fun () ->
+                       if local_copy 
+                       then with_block_attached_device __context rpc 
session_id vdi_dst `RW
+                               (fun device_dst ->
+                                       let ofd=Unix.openfile device_dst 
[Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
+                                       finally
+                                       (fun () ->
+                                               local_dd ifd ofd size blocksize
+                                       )
+                                       (fun () -> Unix.close ofd)
+                               )
+                       else with_import_vdi __context rpc session_id vdi_dst
+                               (fun ofd ->
+                                       remote_dd ifd ofd size blocksize
+                               )
+               )
+               (fun () -> Unix.close ifd)
+       )
+| Unix.Unix_error(Unix.EIO, _, _) ->
+       raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O 
+| e ->
+       debug "Caught exception %s" (ExnHelper.string_of_exn e);
+       log_backtrace ();
+       raise e
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sparse_encoding.ml
--- a/ocaml/xapi/sparse_encoding.ml     Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sparse_encoding.ml     Mon Aug 23 13:05:48 2010 +0100
@@ -85,13 +85,14 @@
        let really_write fd offset buf off len = 
-               ignore(Unix.LargeFile.lseek fd offset Unix.SEEK_SET);
                let n = Unix.write fd buf off len in
                if n < len 
                then failwith "Short write: attempted to write %d bytes at %Ld, 
only wrote %d" len offset n
        (** Writes a single block of data to the output device *)
-       let write fd x = really_write fd x.start x.data 0 (String.length x.data)
+       let write fd x = 
+               ignore(Unix.LargeFile.lseek fd x.start Unix.SEEK_SET);
+               really_write fd x.start x.data 0 (String.length x.data)
        (** Reads a type t from a file descriptor *)
        let unmarshal fd = 
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/xapi_vdi.ml
--- a/ocaml/xapi/xapi_vdi.ml    Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/xapi_vdi.ml    Mon Aug 23 13:05:48 2010 +0100
@@ -460,7 +460,6 @@
        raise e)
 let copy ~__context ~vdi ~sr =
-  Sm.assert_pbd_is_plugged ~__context ~sr;
   Xapi_vdi_helpers.assert_managed ~__context ~vdi;
   let task_id = Ref.string_of (Context.get_task_id __context) in
@@ -489,8 +488,10 @@
       e -> 
-       destroy ~__context ~self:dst;
-       raise e
+      Helpers.call_api_functions ~__context
+      (fun rpc session_id -> Client.VDI.destroy rpc session_id dst);
+      raise e
 let force_unlock ~__context ~vdi = 
 ocaml/xapi/import_raw_vdi.ml     |   36 ++++++---
 ocaml/xapi/message_forwarding.ml |    4 +-
 ocaml/xapi/sm_fs_ops.ml          |  140 +++++++++++++++++++++++++++++---------
 ocaml/xapi/sparse_encoding.ml    |    5 +-
 ocaml/xapi/xapi_vdi.ml           |    7 +-
 5 files changed, 138 insertions(+), 54 deletions(-)

Attachment: xen-api.hg-2.patch
Description: Text Data

xen-api mailing list