# HG changeset patch
# User David Scott <dave.scott@xxxxxxxxxxxxx>
# Date 1282565148 -3600
# Node ID f9725bc799887fb205bf1da8cf3474288529dc90
# Parent 23fa063db91d4eae0179fb29179002a85a75cf31
CP-1884: implementation of VDI.copy can now use the import_raw_vdi HTTP handler
to copy between hosts.
Signed-off-by: David Scott <dave.scott@xxxxxxxxxxxxx>
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/import_raw_vdi.ml
--- a/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/import_raw_vdi.ml Mon Aug 23 13:05:48 2010 +0100
@@ -23,14 +23,16 @@
open Sparse_encoding
open Unixext
open Pervasiveext
+open Client
let receive_chunks (s: Unix.file_descr) (fd: Unix.file_descr) =
Chunk.fold (fun () -> Chunk.write fd) () s
let vdi_of_req ~__context (req: request) =
+ let all = req.Http.query @ req.Http.cookie in
let vdi =
- if List.mem_assoc "vdi" req.Http.query
- then List.assoc "vdi" req.Http.query
+ if List.mem_assoc "vdi" all
+ then List.assoc "vdi" all
else raise (Failure "Missing vdi query parameter") in
if Db_cache.DBCache.is_valid_ref vdi
then Ref.of_string vdi
@@ -40,21 +42,25 @@
req.close <- true;
Xapi_http.with_context "Importing raw VDI" req s
(fun __context ->
+ let all = req.Http.query @ req.Http.cookie in
let vdi = vdi_of_req ~__context req in
- let chunked = List.mem_assoc "chunked" req.Http.query in
- try
- match req.transfer_encoding, req.content_length with
- | Some "chunked", _ ->
+ let chunked = List.mem_assoc "chunked" all in
+ let task_id = Context.get_task_id __context in
+ debug "import_raw_vdi task_id = %s vdi = %s; chunked = %b"
(Ref.string_of task_id) (Ref.string_of vdi) chunked;
+ try
+ match req.transfer_encoding with
+ | Some "chunked" ->
error "Chunked encoding not yet implemented in the import code";
Http_svr.headers s http_403_forbidden;
raise (Failure "import code cannot handle chunked encoding")
- | None, Some len ->
+ | None ->
let headers = Http.http_200_ok ~keep_alive:false () @
- [ Http.task_id_hdr ^ ":" ^ (Ref.string_of (Context.get_task_id
__context));
+ [ Http.task_id_hdr ^ ":" ^ (Ref.string_of task_id);
content_type ] in
Http_svr.headers s headers;
-
+ Server_helpers.exec_with_new_task "VDI.import"
+ (fun __context ->
Sm_fs_ops.with_block_attached_device __context rpc session_id
vdi `RW
(fun device ->
let fd = Unix.openfile device [ Unix.O_WRONLY ] 0 in
@@ -63,16 +69,18 @@
try
if chunked
then receive_chunks s fd
- else ignore(Unixext.copy_file ~limit:len s fd);
- Unixext.fsync fd
+ else ignore(Unixext.copy_file
?limit:req.content_length s fd);
+ Unixext.fsync fd;
with Unix.Unix_error(Unix.EIO, _, _) ->
raise (Api_errors.Server_error
(Api_errors.vdi_io_error, ["Device I/O errors"]))
)
(fun () -> Unix.close fd)
- );
-
- TaskHelper.complete ~__context []
+ )
+ );
+ TaskHelper.complete ~__context [];
with e ->
+ error "Caught exception: %s" (ExnHelper.string_of_exn e);
+ log_backtrace ();
TaskHelper.failed ~__context (Api_errors.internal_error, ["Caught
exception: " ^ (ExnHelper.string_of_exn e)]);
raise e)
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/message_forwarding.ml
--- a/ocaml/xapi/message_forwarding.ml Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/message_forwarding.ml Mon Aug 23 13:05:48 2010 +0100
@@ -2862,9 +2862,9 @@
let copy ~__context ~vdi ~sr =
info "VDI.copy: VDI = '%s'; SR = '%s'" (vdi_uuid ~__context vdi)
(sr_uuid ~__context sr);
let local_fn = Local.VDI.copy ~vdi ~sr in
- let src_sr = Db.VDI.get_SR ~__context ~self:vdi in
(* No need to lock the VDI because the VBD.plug will do that for us *)
- SR.forward_sr_multiple_op ~local_fn ~__context ~srs:[src_sr;sr]
+ (* Forward the request to a host which can read the source VDI *)
+ forward_vdi_op ~local_fn ~__context ~self:vdi
(fun session_id rpc -> Client.VDI.copy rpc session_id vdi sr)
let resize ~__context ~vdi ~size =
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sm_fs_ops.ml
--- a/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sm_fs_ops.ml Mon Aug 23 13:05:48 2010 +0100
@@ -41,6 +41,35 @@
| vdi :: vdis -> with_block_attached_device __context rpc session_id vdi
mode (fun path -> loop (path :: acc) vdis) in
loop [] vdis
+(** Open an import_raw_vdi HTTP connection and run [f] with the socket *)
+let with_import_vdi __context rpc session_id vdi f =
+ let subtask_of = Context.get_task_id __context in
+ Server_helpers.exec_with_new_task "VDI.import"
+ (fun __context ->
+ (* Find a suitable host for the SR containing the VDI *)
+ let sr = Db.VDI.get_SR ~__context ~self:vdi in
+ let host = Importexport.find_host_for_sr ~__context sr in
+ let address = Db.Host.get_address ~__context ~self:host in
+ let importtask = Client.Task.create rpc session_id "VDI.import"
"" in
+
+ let headers = Xapi_http.http_request
+ ~cookie:(["session_id", Ref.string_of
session_id;
+ "task_id", Ref.string_of importtask;
+ "vdi", Ref.string_of vdi;
+ "chunked", "true"])
+ Http.Put address Constants.import_raw_vdi_uri in
+ let writer _ _ sock = f sock; true in
+ if not (Xmlrpcclient.do_secure_http_rpc
~use_stunnel_cache:false
+ ~task_id:(Ref.string_of (Context.get_task_id
__context))
+ ~host:address ~port:Xapi_globs.default_ssl_port
~headers ~body:"" writer)
+ then failwith "with_import_vdi";
+ debug "Waiting for import task (%s) to complete"
(Ref.string_of importtask);
+ (* wait for the task to complete before cleaning
anything up *)
+ while Client.Task.get_status rpc session_id importtask
= `pending do
+ Thread.delay 1.;
+ done;
+ Client.Task.destroy rpc session_id importtask;
+ )
(** Catch those smint exceptions and convert into meaningful internal errors *)
let with_api_errors f x =
@@ -141,8 +170,37 @@
exception Cancelled
exception NonZero
+(** The copying routine can operate on anything which looks like a
file-descriptor/Stream *)
+module type Stream = sig
+ type stream
+ val write: stream -> int64 -> string -> int -> int -> unit
+end
+
+(** Writes directly to a file *)
+module FileStream = struct
+ type stream = Unix.file_descr
+ let write stream stream_offset buf off len =
+ let newoff = Unix.LargeFile.lseek stream stream_offset
Unix.SEEK_SET in
+ (* Printf.printf "Unix.write buf len %d; offset %d; len %d\n"
(String.length buf) offset len; *)
+ let n = Unix.write stream buf off len in
+ if n < len then failwith "Short write"
+end
+
+(** Marshals data across the network in chunks *)
+module NetworkStream = struct
+ open Sparse_encoding
+ type stream = Unix.file_descr
+ let write stream stream_offset buf off len =
+ let copy = String.create len in
+ String.blit buf off copy 0 len;
+ let x = { Chunk.start = stream_offset; data = copy } in
+ Chunk.marshal stream x
+end
+
+module DD(Output: Stream) = struct
+
(* dd with sparseness check *)
-let sparse_dd refresh_session ~__context sparse ifd ofd size bs =
+let sparse_dd refresh_session ~__context sparse ifd stream size bs : unit =
let round v = int_of_float (v *. 50.0) in
let update =
let oldvalue = ref (-1.0) in
@@ -178,20 +236,22 @@
begin
let this_chunk = Int64.to_int (min remaining (Int64.of_int bs)) in
Unixext.really_read ifd buf 0 this_chunk;
- begin
- if sparse && (allzero buf this_chunk)
- then
- ignore(Unix.LargeFile.lseek ofd (Int64.of_int this_chunk)
Unix.SEEK_CUR)
- else
- let n = Unix.write ofd buf 0 this_chunk in
- (if n<this_chunk then failwith "Error!")
- end;
+
+ if not sparse || (not (allzero buf this_chunk))
+ then Output.write stream offset buf 0 this_chunk;
+
do_block (Int64.add offset (Int64.of_int this_chunk))
end
in
do_block 0L;
+ Output.write stream 0L "" 0 0; (* end of stream is a zero-sized chunk *)
update 1.0
+end
+
+module LocalDD = DD(FileStream)
+module RemoteDD = DD(NetworkStream)
+
(* SCTX-286: thin provisioning is thrown away over VDI.copy,
VM.import(VM.export).
Return true if the newly created vdi must have zeroes written into it;
default to false
under the assumption that "proper" storage devices (ie not our legacy LVM
stuff) always
@@ -232,33 +292,47 @@
(* Use the sparse copy unless we must write zeroes into the new VDI *)
let sparse = not (must_write_zeroes_into_new_vdi ~__context vdi_dst) in
+ (* Copy locally unless this host can't see the destination SR *)
+ let sr_dst = Db.VDI.get_SR ~__context ~self:vdi_dst in
+ let local_copy = Importexport.check_sr_availability ~__context sr_dst in
+
let size = Db.VDI.get_virtual_size ~__context ~self:vdi_src in
let blocksize = 1024*1024 in
- debug "Sm_fs_ops.copy_vdi: copying %Ld in blocks of %d%s preserving
sparseness" size blocksize (if sparse then "" else " NOT");
+ debug "Sm_fs_ops.copy_vdi: %s-copying %Ld in blocks of %d%s preserving
sparseness" (if local_copy then "locally" else "remotely") size blocksize (if
sparse then "" else " NOT");
- let dd = sparse_dd refresh_session ~__context sparse in
+ let local_dd = LocalDD.sparse_dd refresh_session ~__context sparse in
+ let remote_dd = RemoteDD.sparse_dd refresh_session ~__context sparse in
- with_block_attached_device __context rpc session_id vdi_src `RO
- (fun device_src ->
- with_block_attached_device __context rpc session_id vdi_dst `RW
- (fun device_dst ->
- let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600
- and ofd=Unix.openfile device_dst [Unix.O_WRONLY; Unix.O_SYNC] 0o600
in
- finally
- (fun () ->
- try
- dd ifd ofd size blocksize;
- with
- | Unix.Unix_error(Unix.EIO, _, _) ->
- raise (Api_errors.Server_error (Api_errors.vdi_io_error,
["Device I/O error"]))
- | e ->
- debug "Caught exception %s" (ExnHelper.string_of_exn e);
- log_backtrace ())
- (fun () ->
- Unix.close ifd;
- Unix.close ofd)
- )
- )
- )
+try
+ with_block_attached_device __context rpc session_id vdi_src `RO
+ (fun device_src ->
+ let ifd=Unix.openfile device_src [Unix.O_RDONLY] 0o600 in
+ finally
+ (fun () ->
+ if local_copy
+ then with_block_attached_device __context rpc
session_id vdi_dst `RW
+ (fun device_dst ->
+ let ofd=Unix.openfile device_dst
[Unix.O_WRONLY; Unix.O_SYNC] 0o600 in
+ finally
+ (fun () ->
+ local_dd ifd ofd size blocksize
+ )
+ (fun () -> Unix.close ofd)
+ )
+ else with_import_vdi __context rpc session_id vdi_dst
+ (fun ofd ->
+ remote_dd ifd ofd size blocksize
+ )
+ )
+ (fun () -> Unix.close ifd)
+ )
+with
+| Unix.Unix_error(Unix.EIO, _, _) ->
+ raise (Api_errors.Server_error (Api_errors.vdi_io_error, ["Device I/O
error"]))
+| e ->
+ debug "Caught exception %s" (ExnHelper.string_of_exn e);
+ log_backtrace ();
+ raise e
+)
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/sparse_encoding.ml
--- a/ocaml/xapi/sparse_encoding.ml Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/sparse_encoding.ml Mon Aug 23 13:05:48 2010 +0100
@@ -85,13 +85,14 @@
}
let really_write fd offset buf off len =
- ignore(Unix.LargeFile.lseek fd offset Unix.SEEK_SET);
let n = Unix.write fd buf off len in
if n < len
then failwith "Short write: attempted to write %d bytes at %Ld,
only wrote %d" len offset n
(** Writes a single block of data to the output device *)
- let write fd x = really_write fd x.start x.data 0 (String.length x.data)
+ let write fd x =
+ ignore(Unix.LargeFile.lseek fd x.start Unix.SEEK_SET);
+ really_write fd x.start x.data 0 (String.length x.data)
(** Reads a type t from a file descriptor *)
let unmarshal fd =
diff -r 23fa063db91d -r f9725bc79988 ocaml/xapi/xapi_vdi.ml
--- a/ocaml/xapi/xapi_vdi.ml Mon Aug 23 13:05:47 2010 +0100
+++ b/ocaml/xapi/xapi_vdi.ml Mon Aug 23 13:05:48 2010 +0100
@@ -460,7 +460,6 @@
raise e)
let copy ~__context ~vdi ~sr =
- Sm.assert_pbd_is_plugged ~__context ~sr;
Xapi_vdi_helpers.assert_managed ~__context ~vdi;
let task_id = Ref.string_of (Context.get_task_id __context) in
@@ -489,8 +488,10 @@
dst
with
e ->
- destroy ~__context ~self:dst;
- raise e
+ Helpers.call_api_functions ~__context
+ (fun rpc session_id -> Client.VDI.destroy rpc session_id dst);
+ raise e
+
let force_unlock ~__context ~vdi =
ocaml/xapi/import_raw_vdi.ml | 36 ++++++---
ocaml/xapi/message_forwarding.ml | 4 +-
ocaml/xapi/sm_fs_ops.ml | 140 +++++++++++++++++++++++++++++---------
ocaml/xapi/sparse_encoding.ml | 5 +-
ocaml/xapi/xapi_vdi.ml | 7 +-
5 files changed, 138 insertions(+), 54 deletions(-)
xen-api.hg-2.patch
Description: Text Data
_______________________________________________
xen-api mailing list
xen-api@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/mailman/listinfo/xen-api
|