From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:53502) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Z4o5i-0006Oq-Pt for qemu-devel@nongnu.org; Tue, 16 Jun 2015 06:27:40 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1Z4o5h-0000yN-2F for qemu-devel@nongnu.org; Tue, 16 Jun 2015 06:27:38 -0400 Received: from mx1.redhat.com ([209.132.183.28]:54914) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Z4o5g-0000yB-Q0 for qemu-devel@nongnu.org; Tue, 16 Jun 2015 06:27:36 -0400 From: "Dr. David Alan Gilbert (git)" Date: Tue, 16 Jun 2015 11:26:28 +0100 Message-Id: <1434450415-11339-16-git-send-email-dgilbert@redhat.com> In-Reply-To: <1434450415-11339-1-git-send-email-dgilbert@redhat.com> References: <1434450415-11339-1-git-send-email-dgilbert@redhat.com> Subject: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: qemu-devel@nongnu.org Cc: aarcange@redhat.com, yamahata@private.email.ne.jp, quintela@redhat.com, liang.z.li@intel.com, luis@cs.umu.se, amit.shah@redhat.com, pbonzini@redhat.com, david@gibson.dropbear.id.au From: "Dr. David Alan Gilbert" Open a return path, and handle messages that are received upon it. Signed-off-by: Dr. 
David Alan Gilbert --- include/migration/migration.h | 8 ++ migration/migration.c | 181 +++++++++++++++++++++++++++++++++++++++++- trace-events | 12 +++ 3 files changed, 200 insertions(+), 1 deletion(-) diff --git a/include/migration/migration.h b/include/migration/migration.h index 36caab9..868f59a 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -77,6 +77,14 @@ struct MigrationState int state; MigrationParams params; + + /* State related to return path */ + struct { + QEMUFile *file; + QemuThread rp_thread; + bool error; + } rp_state; + double mbps; int64_t total_time; int64_t downtime; diff --git a/migration/migration.c b/migration/migration.c index afb19a1..fb2f491 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp) return params; } +/* + * Return true if we're already in the middle of a migration + * (i.e. any of the active or setup states) + */ +static bool migration_already_active(MigrationState *ms) +{ + switch (ms->state) { + case MIGRATION_STATUS_ACTIVE: + case MIGRATION_STATUS_SETUP: + return true; + + default: + return false; + + } +} + static void get_xbzrle_cache_stats(MigrationInfo *info) { if (migrate_use_xbzrle()) { @@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) } } +static void migrate_fd_cleanup_src_rp(MigrationState *ms) +{ + QEMUFile *rp = ms->rp_state.file; + + /* + * When stuff goes wrong (e.g.
 failing destination) on the rp, it can get + * cleaned up from a few threads; make sure not to do it twice in parallel + */ + rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL); + if (rp) { + trace_migrate_fd_cleanup_src_rp(); + qemu_fclose(rp); + } +} + static void migrate_fd_cleanup(void *opaque) { MigrationState *s = opaque; @@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque) qemu_bh_delete(s->cleanup_bh); s->cleanup_bh = NULL; + migrate_fd_cleanup_src_rp(s); + if (s->file) { trace_migrate_fd_cleanup(); qemu_mutex_unlock_iothread(); @@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s) QEMUFile *f = migrate_get_current()->file; trace_migrate_fd_cancel(); + if (s->rp_state.file) { + /* shutdown the rp socket, so causing the rp thread to shutdown */ + qemu_file_shutdown(s->rp_state.file); + } + do { old_state = s->state; if (old_state != MIGRATION_STATUS_SETUP && @@ -801,8 +840,148 @@ int64_t migrate_xbzrle_cache_size(void) return s->xbzrle_cache_size; } -/* migration thread support */ +/* + * Something bad happened to the RP stream, mark an error + * The caller shall print something to indicate why + */ +static void source_return_path_bad(MigrationState *s) +{ + s->rp_state.error = true; + migrate_fd_cleanup_src_rp(s); +} + +/* + * Handles messages sent on the return path towards the source VM + * until SHUT, a bad message or stream error, or migration is no longer active + */ +static void *source_return_path_thread(void *opaque) +{ + MigrationState *ms = opaque; + QEMUFile *rp = ms->rp_state.file; + uint16_t expected_len, header_len, header_type; + const int max_len = 512; + uint8_t buf[max_len]; + uint32_t tmp32; + int res; + + trace_source_return_path_thread_entry(); + while (rp && !qemu_file_get_error(rp) && + migration_already_active(ms)) { + trace_source_return_path_thread_loop_top(); + header_type = qemu_get_be16(rp); + header_len = qemu_get_be16(rp); + if (qemu_file_get_error(rp)) { + /* Stream failed or was shut down; handled after the loop */ + break; + } + + switch (header_type) { + case MIG_RP_MSG_SHUT: + case MIG_RP_MSG_PONG: + expected_len = 4; + break; + + default: + error_report("RP: Received invalid
 message 0x%04x length 0x%04x", + header_type, header_len); + source_return_path_bad(ms); + goto out; + } + if (header_len != expected_len) { + error_report("RP: Received message 0x%04x with " + "incorrect length %d expecting %d", + header_type, header_len, + expected_len); + source_return_path_bad(ms); + goto out; + } + + /* We know we've got a valid header by this point */ + res = qemu_get_buffer(rp, buf, header_len); + if (res != header_len) { + trace_source_return_path_thread_failed_read_cmd_data(); + source_return_path_bad(ms); + goto out; + } + + /* OK, we have the message and the data */ + switch (header_type) { + case MIG_RP_MSG_SHUT: + tmp32 = ldl_be_p(buf); + trace_source_return_path_thread_shut(tmp32); + if (tmp32) { + error_report("RP: Sibling indicated error %d", tmp32); + source_return_path_bad(ms); + } + /* + * We'll let the main thread deal with closing the RP + * we could do a shutdown(2) on it, but we're the only user + * anyway, so there's nothing gained.
 + */ + goto out; + + case MIG_RP_MSG_PONG: + tmp32 = ldl_be_p(buf); + trace_source_return_path_thread_pong(tmp32); + break; + + default: + break; + } + } + if (rp && qemu_file_get_error(rp)) { + trace_source_return_path_thread_bad_end(); + source_return_path_bad(ms); + } + +out: + trace_source_return_path_thread_end(); + return NULL; +} + +__attribute__ (( unused )) /* Until later in patch series */ +static int open_return_path_on_source(MigrationState *ms) +{ + + ms->rp_state.file = qemu_file_get_return_path(ms->file); + if (!ms->rp_state.file) { + return -1; + } + + trace_open_return_path_on_source(); + qemu_thread_create(&ms->rp_state.rp_thread, "return path", + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); + + trace_open_return_path_on_source_continue(); + + return 0; +} + +__attribute__ (( unused )) /* Until later in patch series */ +/* Returns 0 if the RP was ok, otherwise there was an error on the RP */ +static int await_return_path_close_on_source(MigrationState *ms) +{ + /* + * If this is a normal exit then the destination will send a SHUT and the + * rp_thread will exit, however if there's an error we need to cause + * it to exit, which we can do by a shutdown. + * (canceling must also shutdown to stop us getting stuck here if + * the destination died at just the wrong place) + */ + if (qemu_file_get_error(ms->file) && ms->rp_state.file) { + qemu_file_shutdown(ms->rp_state.file); + } + trace_await_return_path_close_on_source_joining(); + qemu_thread_join(&ms->rp_state.rp_thread); + trace_await_return_path_close_on_source_close(); + return ms->rp_state.error; +} + +/* + * Master migration thread on the source VM. + * It drives the migration and pumps the data down the outgoing channel.
 + */ static void *migration_thread(void *opaque) { MigrationState *s = opaque; diff --git a/trace-events b/trace-events index 5738e3f..282cde1 100644 --- a/trace-events +++ b/trace-events @@ -1394,12 +1394,24 @@ flic_no_device_api(int err) "flic: no Device Contral API support %d" flic_reset_failed(int err) "flic: reset failed %d" # migration.c +await_return_path_close_on_source_close(void) "" +await_return_path_close_on_source_joining(void) "" migrate_set_state(int new_state) "new state %d" migrate_fd_cleanup(void) "" +migrate_fd_cleanup_src_rp(void) "" migrate_fd_error(void) "" migrate_fd_cancel(void) "" migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64 migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" +open_return_path_on_source(void) "" +open_return_path_on_source_continue(void) "" +source_return_path_thread_bad_end(void) "" +source_return_path_thread_end(void) "" +source_return_path_thread_entry(void) "" +source_return_path_thread_failed_read_cmd_data(void) "" +source_return_path_thread_loop_top(void) "" +source_return_path_thread_pong(uint32_t val) "%x" +source_return_path_thread_shut(uint32_t val) "%x" migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64 # migration/rdma.c -- 2.4.3