# HG changeset patch # User David Scott # Date 1282565148 -3600 # Node ID f9725bc799887fb205bf1da8cf3474288529dc90 # Parent 23fa063db91d4eae0179fb29179002a85a75cf31 CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler to copy between hosts. Signed-off-by: David Scott diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/import_raw_vdi.ml --- a/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:05:47 2010 +0100 +++ b/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:05:48 2010 +0100 @@ -23,14 +23,16 @@ open Sparse_encoding open Unixext open Pervasiveext +open Client let receive_chunks (s: Unix.file_descr) (fd: Unix.file_descr) = Chunk.fold (fun () -> Chunk.write fd) () s let vdi_of_req ~__context (req: request) = + let all = req.Http.query @ req.Http.cookie in let vdi = - if List.mem_assoc "vdi" req.Http.query - then List.assoc "vdi" req.Http.query + if List.mem_assoc "vdi" all + then List.assoc "vdi" all else raise (Failure "Missing vdi query parameter") in if Db_cache.DBCache.is_valid_ref vdi then Ref.of_string vdi @@ -40,21 +42,25 @@ req.close <- true; Xapi_http.with_context "Importing raw VDI" req s (fun __context -> + let all = req.Http.query @ req.Http.cookie in let vdi = vdi_of_req ~__context req in - let chunked = List.mem_assoc "chunked" req.Http.query in - try - match req.transfer_encoding, req.content_length with - | Some "chunked", _ -> + let chunked = List.mem_assoc "chunked" all in + let task_id = Context.get_task_id __context in + debug "import_raw_vdi task_id = %s vdi = %s; chunked = %b" (Ref.string_of task_id) (Ref.string_of vdi) chunked; + try + match req.transfer_encoding with + | Some "chunked" -> error "Chunked encoding not yet implemented in the import code"; Http_svr.headers s http_403_forbidden; raise (Failure "import code cannot handle chunked encoding") - | None, Some len -> + | None -> let headers = Http.http_200_ok ~keep_alive:false () @ - [ Http.task_id_hdr ^ ":" ^ (Ref.string_of (Context.get_task_id __context)); + [ Http.task_id_hdr ^ ":" ^ (Ref.string_of task_id); content_type ] in Http_svr.headers s headers; - + Server_helpers.exec_with_new_task "VDI.import" + (fun __context -> Sm_fs_ops.with_block_attached_device __context rpc session_id vdi `RW (fun device -> let fd = Unix.openfile device [ Unix.O_WRONLY ] 0 in @@ -63,16 +69,18 @@ try if chunked then receive_chunks s fd - else ignore(Unixext.copy_file ~limit:len s fd); - Unixext.fsync fd + else ignore(Unixext.copy_file ?limit:req.content_length s fd); + Unixext.fsync fd; with Unix.Unix_error(Unix.EIO, _, _) -> raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O errors"])) ) (fun () -> Unix.close fd) - ); - - TaskHelper.complete ~__context [] + ) + ); + TaskHelper.complete ~__context []; with e -> + error "Caught exception: %s" (ExnHelper.string_of_exn e); + log_backtrace (); TaskHelper.failed ~__context (Api_errors.internal_error, ["Caught exception: " ^ (ExnHelper.string_of_exn e)]); raise e) diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/message_forwarding.ml --- a/ocaml/xapi/message_forwarding.ml Mon Aug 23 13:05:47 2010 +0100 +++ b/ocaml/xapi/message_forwarding.ml Mon Aug 23 13:05:48 2010 +0100 @@ -2862,9 +2862,9 @@ let copy ~__context ~vdi ~sr = info "VDI.copy: VDI = '%s'; SR = '%s'" (vdi_uuid ~__context vdi) (sr_uuid ~__context sr); let local_fn = Local.VDI.copy ~vdi ~sr in - let src_sr = Db.VDI.get_SR ~__context ~self:vdi in (* No need to lock the VDI because the VBD.plug will do that for us *) - SR.forward_sr_multiple_op ~local_fn ~__context ~srs:[src_sr;sr] + (* Forward the request to a host which can read the source VDI *) + forward_vdi_op ~local_fn ~__context ~self:vdi (fun session_id rpc -> Client.VDI.copy rpc session_id vdi sr) let resize ~__context ~vdi ~size = diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sm_fs_ops.ml --- a/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:05:47 2010 +0100 +++ b/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:05:48 2010 +0100 @@ -41,6 +41,35 @@ | vdi :: vdis -> with_block_attached_device __context rpc session_id vdi mode (fun path -> loop (path :: acc) vdis) in loop [] vdis +(** Open an import_raw_vdi HTTP connection and run [f] with the socket *) +let with_import_vdi __context rpc session_id vdi f = + let subtask_of = Context.get_task_id __context in + Server_helpers.exec_with_new_task "VDI.import" + (fun __context -> + (* Find a suitable host for the SR containing the VDI *) + let sr = Db.VDI.get_SR ~__context ~self:vdi in + let host = Importexport.find_host_for_sr ~__context sr in + let address = Db.Host.get_address ~__context ~self:host in + let importtask = Client.Task.create rpc session_id "VDI.import" "" in + + let headers = Xapi_http.http_request + ~cookie:(["session_id", Ref.string_of session_id; + "task_id", Ref.string_of importtask; + "vdi", Ref.string_of vdi; + "chunked", "true"]) + Http.Put address Constants.import_raw_vdi_uri in + let writer _ _ sock = f sock; true in + if not (Xmlrpcclient.do_secure_http_rpc ~use_stunnel_cache:false + ~task_id:(Ref.string_of (Context.get_task_id __context)) + ~host:address ~port:Xapi_globs.default_ssl_port ~headers ~body:"" writer) + then failwith "with_import_vdi"; + debug "Waiting for import task (%s) to complete" (Ref.string_of importtask); + (* wait for the task to complete before cleaning anything up *) + while Client.Task.get_status rpc session_id importtask = `pending do + Thread.delay 1.; + done; + Client.Task.destroy rpc session_id importtask; + ) (** Catch those smint exceptions and convert into meaningful internal errors *) let with_api_errors f x = @@ -141,8 +170,37 @@ exception Cancelled exception NonZero +(** The copying routine can operate on anything which looks like a file-descriptor/Stream *) +module type Stream = sig + type stream + val write: stream -> int64 -> string -> int -> int -> unit +end + +(** Writes directly to a file *) +module FileStream = struct + type stream = Unix.file_descr + let write stream stream_offset buf off len = + let newoff = Unix.LargeFile.lseek stream stream_offset Unix.SEEK_SET in + (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n" (String.length buf) offset len; *) + let n = Unix.write stream buf off len in + if n < len then failwith "Short write" +end + +(** Marshals data across the network in chunks *) +module NetworkStream = struct + open Sparse_encoding + type stream = Unix.file_descr + let write stream stream_offset buf off len = + let copy = String.create len in + String.blit buf off copy 0 len; + let x = { Chunk.start = stream_offset; data = copy } in + Chunk.marshal stream x +end + +module DD(Output: Stream) = struct + (* dd with sparseness check *) -let sparse_dd refresh_session ~__context sparse ifd ofd size bs = +let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit = let round v = int_of_float (v *. 50.0) in let update = let oldvalue = ref (-1.0) in @@ -178,20 +236,22 @@ begin let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in Unixext.really_read ifd buf 0 this_chunk; - begin - if sparse && (allzero buf this_chunk) - then - ignore(Unix.LargeFile.lseek ofd (Int64.of_int this_chunk) Unix.SEEK_CUR) - else - let n = Unix.write ofd buf 0 this_chunk in - (if n - with_block_attached_device __context rpc session_id vdi_dst `RW - (fun device_dst -> - let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 - and ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in - finally - (fun () -> - try - dd ifd ofd size blocksize; - with - | Unix.Unix_error(Unix.EIO, _, _) -> - raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O error"])) - | e -> - debug "Caught exception %s" (ExnHelper.string_of_exn e); - log_backtrace ()) - (fun () -> - Unix.close ifd; - Unix.close ofd) - ) - ) - ) +try + with_block_attached_device __context rpc session_id vdi_src `RO + (fun device_src -> + let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in + finally + (fun () -> + if local_copy + then with_block_attached_device __context rpc session_id vdi_dst `RW + (fun device_dst -> + let ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600 in + finally + (fun () -> + local_dd ifd ofd size blocksize + ) + (fun () -> Unix.close ofd) + ) + else with_import_vdi __context rpc session_id vdi_dst + (fun ofd -> + remote_dd ifd ofd size blocksize + ) + ) + (fun () -> Unix.close ifd) + ) +with +| Unix.Unix_error(Unix.EIO, _, _) -> + raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O error"])) +| e -> + debug "Caught exception %s" (ExnHelper.string_of_exn e); + log_backtrace (); + raise e +) diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sparse_encoding.ml --- a/ocaml/xapi/sparse_encoding.ml Mon Aug 23 13:05:47 2010 +0100 +++ b/ocaml/xapi/sparse_encoding.ml Mon Aug 23 13:05:48 2010 +0100 @@ -85,13 +85,14 @@ } let really_write fd offset buf off len = - ignore(Unix.LargeFile.lseek fd offset Unix.SEEK_SET); let n = Unix.write fd buf off len in if n < len then failwith "Short write: attempted to write %d bytes at %Ld, only wrote %d" len offset n (** Writes a single block of data to the output device *) - let write fd x = really_write fd x.start x.data 0 (String.length x.data) + let write fd x = + ignore(Unix.LargeFile.lseek fd x.start Unix.SEEK_SET); + really_write fd x.start x.data 0 (String.length x.data) (** Reads a type t from a file descriptor *) let unmarshal fd = diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/xapi_vdi.ml --- a/ocaml/xapi/xapi_vdi.ml Mon Aug 23 13:05:47 2010 +0100 +++ b/ocaml/xapi/xapi_vdi.ml Mon Aug 23 13:05:48 2010 +0100 @@ -460,7 +460,6 @@ raise e) let copy ~__context ~vdi ~sr = - Sm.assert_pbd_is_plugged ~__context ~sr; Xapi_vdi_helpers.assert_managed ~__context ~vdi; let task_id = Ref.string_of (Context.get_task_id __context) in @@ -489,8 +488,10 @@ dst with e -> - destroy ~__context ~self:dst; - raise e + Helpers.call_api_functions ~__context + (fun rpc session_id -> Client.VDI.destroy rpc session_id dst); + raise e + let force_unlock ~__context ~vdi =