From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from eggs.gnu.org ([2001:4830:134:3::10]:38295) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1ZRf5b-0005oz-J8 for qemu-devel@nongnu.org; Tue, 18 Aug 2015 07:30:01 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1ZRf5W-0006JE-M2 for qemu-devel@nongnu.org; Tue, 18 Aug 2015 07:29:59 -0400 Received: from szxga01-in.huawei.com ([58.251.152.64]:49188) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1ZRf5V-00069N-35 for qemu-devel@nongnu.org; Tue, 18 Aug 2015 07:29:54 -0400 References: <1434450415-11339-1-git-send-email-dgilbert@redhat.com> <1434450415-11339-16-git-send-email-dgilbert@redhat.com> <55C1C421.10708@huawei.com> <20150818104521.GB2314@work-vm> From: zhanghailiang Message-ID: <55D316FE.2040500@huawei.com> Date: Tue, 18 Aug 2015 19:29:02 +0800 MIME-Version: 1.0 In-Reply-To: <20150818104521.GB2314@work-vm> Content-Type: text/plain; charset="windows-1252"; format=flowed Content-Transfer-Encoding: 7bit Subject: Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: "Dr. David Alan Gilbert" Cc: aarcange@redhat.com, yamahata@private.email.ne.jp, quintela@redhat.com, liang.z.li@intel.com, peter.huangpeng@huawei.com, qemu-devel@nongnu.org, luis@cs.umu.se, amit.shah@redhat.com, pbonzini@redhat.com, david@gibson.dropbear.id.au On 2015/8/18 18:45, Dr. David Alan Gilbert wrote: > * zhanghailiang (zhang.zhanghailiang@huawei.com) wrote: >> Hi Dave, >> >> On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote: >>> From: "Dr. David Alan Gilbert" >>> >>> Open a return path, and handle messages that are received upon it. >>> >>> Signed-off-by: Dr. 
David Alan Gilbert >>> --- >>> include/migration/migration.h | 8 ++ >>> migration/migration.c | 177 +++++++++++++++++++++++++++++++++++++++++- >>> trace-events | 12 +++ >>> 3 files changed, 196 insertions(+), 1 deletion(-) >>> >>> diff --git a/include/migration/migration.h b/include/migration/migration.h >>> index 36caab9..868f59a 100644 >>> --- a/include/migration/migration.h >>> +++ b/include/migration/migration.h >>> @@ -77,6 +77,14 @@ struct MigrationState >>> >>> int state; >>> MigrationParams params; >>> + >>> + /* State related to return path */ >>> + struct { >>> + QEMUFile *file; >> >> There is already a 'file' member in MigrationState, >> and since for migration, there is only one path direction, just from source side >> to destination side, so it is ok to use that name. >> >> But for post-copy and COLO, we need two-way communication, >> So we can rename the original 'file' member of MigrationState to 'output_file', >> and add a new 'input_file' member. For MigrationIncomingState struct, rename its original >> 'file' member to 'input_file', and add a new 'output_file'. >> IMHO, this will make things more clear. 
> > Would the following be clearer: > Yes, it is clearer and more graceful :) > On the source make the existing migration file: > QEMUFile *to_dst_file; > and for the return path > QEMUFile *from_dst_dile; > ^ from_dst_file > and then on the destination, the incoming migration stream: > QEMUFile *from_src_file; > and then the return path on the destionation: > QEMUFile *to_src_file; > > Dave > >> Thanks, >> zhanghailiang >> >> >>> + QemuThread rp_thread; >>> + bool error; >>> + } rp_state; >>> + >>> double mbps; >>> int64_t total_time; >>> int64_t downtime; >>> diff --git a/migration/migration.c b/migration/migration.c >>> index afb19a1..fb2f491 100644 >>> --- a/migration/migration.c >>> +++ b/migration/migration.c >>> @@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error **errp) >>> return params; >>> } >>> >>> +/* >>> + * Return true if we're already in the middle of a migration >>> + * (i.e. any of the active or setup states) >>> + */ >>> +static bool migration_already_active(MigrationState *ms) >>> +{ >>> + switch (ms->state) { >>> + case MIGRATION_STATUS_ACTIVE: >>> + case MIGRATION_STATUS_SETUP: >>> + return true; >>> + >>> + default: >>> + return false; >>> + >>> + } >>> +} >>> + >>> static void get_xbzrle_cache_stats(MigrationInfo *info) >>> { >>> if (migrate_use_xbzrle()) { >>> @@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) >>> } >>> } >>> >>> +static void migrate_fd_cleanup_src_rp(MigrationState *ms) >>> +{ >>> + QEMUFile *rp = ms->rp_state.file; >>> + >>> + /* >>> + * When stuff goes wrong (e.g. 
failing destination) on the rp, it can get >>> + * cleaned up from a few threads; make sure not to do it twice in parallel >>> + */ >>> + rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL); >>> + if (rp) { >>> + trace_migrate_fd_cleanup_src_rp(); >>> + qemu_fclose(rp); >>> + } >>> +} >>> + >>> static void migrate_fd_cleanup(void *opaque) >>> { >>> MigrationState *s = opaque; >>> @@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque) >>> qemu_bh_delete(s->cleanup_bh); >>> s->cleanup_bh = NULL; >>> >>> + migrate_fd_cleanup_src_rp(s); >>> + >>> if (s->file) { >>> trace_migrate_fd_cleanup(); >>> qemu_mutex_unlock_iothread(); >>> @@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s) >>> QEMUFile *f = migrate_get_current()->file; >>> trace_migrate_fd_cancel(); >>> >>> + if (s->rp_state.file) { >>> + /* shutdown the rp socket, so causing the rp thread to shutdown */ >>> + qemu_file_shutdown(s->rp_state.file); >>> + } >>> + >>> do { >>> old_state = s->state; >>> if (old_state != MIGRATION_STATUS_SETUP && >>> @@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void) >>> return s->xbzrle_cache_size; >>> } >>> >>> -/* migration thread support */ >>> +/* >>> + * Something bad happened to the RP stream, mark an error >>> + * The caller shall print something to indicate why >>> + */ >>> +static void source_return_path_bad(MigrationState *s) >>> +{ >>> + s->rp_state.error = true; >>> + migrate_fd_cleanup_src_rp(s); >>> +} >>> + >>> +/* >>> + * Handles messages sent on the return path towards the source VM >>> + * >>> + */ >>> +static void *source_return_path_thread(void *opaque) >>> +{ >>> + MigrationState *ms = opaque; >>> + QEMUFile *rp = ms->rp_state.file; >>> + uint16_t expected_len, header_len, header_type; >>> + const int max_len = 512; >>> + uint8_t buf[max_len]; >>> + uint32_t tmp32; >>> + int res; >>> + >>> + trace_source_return_path_thread_entry(); >>> + while (rp && !qemu_file_get_error(rp) && >>> + migration_already_active(ms)) { >>> + 
trace_source_return_path_thread_loop_top(); >>> + header_type = qemu_get_be16(rp); >>> + header_len = qemu_get_be16(rp); >>> + >>> + switch (header_type) { >>> + case MIG_RP_MSG_SHUT: >>> + case MIG_RP_MSG_PONG: >>> + expected_len = 4; >>> + break; >>> + >>> + default: >>> + error_report("RP: Received invalid message 0x%04x length 0x%04x", >>> + header_type, header_len); >>> + source_return_path_bad(ms); >>> + goto out; >>> + } >>> >>> + if (header_len > expected_len) { >>> + error_report("RP: Received message 0x%04x with" >>> + "incorrect length %d expecting %d", >>> + header_type, header_len, >>> + expected_len); >>> + source_return_path_bad(ms); >>> + goto out; >>> + } >>> + >>> + /* We know we've got a valid header by this point */ >>> + res = qemu_get_buffer(rp, buf, header_len); >>> + if (res != header_len) { >>> + trace_source_return_path_thread_failed_read_cmd_data(); >>> + source_return_path_bad(ms); >>> + goto out; >>> + } >>> + >>> + /* OK, we have the message and the data */ >>> + switch (header_type) { >>> + case MIG_RP_MSG_SHUT: >>> + tmp32 = be32_to_cpup((uint32_t *)buf); >>> + trace_source_return_path_thread_shut(tmp32); >>> + if (tmp32) { >>> + error_report("RP: Sibling indicated error %d", tmp32); >>> + source_return_path_bad(ms); >>> + } >>> + /* >>> + * We'll let the main thread deal with closing the RP >>> + * we could do a shutdown(2) on it, but we're the only user >>> + * anyway, so there's nothing gained. 
>>> + */ >>> + goto out; >>> + >>> + case MIG_RP_MSG_PONG: >>> + tmp32 = be32_to_cpup((uint32_t *)buf); >>> + trace_source_return_path_thread_pong(tmp32); >>> + break; >>> + >>> + default: >>> + break; >>> + } >>> + } >>> + if (rp && qemu_file_get_error(rp)) { >>> + trace_source_return_path_thread_bad_end(); >>> + source_return_path_bad(ms); >>> + } >>> + >>> + trace_source_return_path_thread_end(); >>> +out: >>> + return NULL; >>> +} >>> + >>> +__attribute__ (( unused )) /* Until later in patch series */ >>> +static int open_return_path_on_source(MigrationState *ms) >>> +{ >>> + >>> + ms->rp_state.file = qemu_file_get_return_path(ms->file); >>> + if (!ms->rp_state.file) { >>> + return -1; >>> + } >>> + >>> + trace_open_return_path_on_source(); >>> + qemu_thread_create(&ms->rp_state.rp_thread, "return path", >>> + source_return_path_thread, ms, QEMU_THREAD_JOINABLE); >>> + >>> + trace_open_return_path_on_source_continue(); >>> + >>> + return 0; >>> +} >>> + >>> +__attribute__ (( unused )) /* Until later in patch series */ >>> +/* Returns 0 if the RP was ok, otherwise there was an error on the RP */ >>> +static int await_return_path_close_on_source(MigrationState *ms) >>> +{ >>> + /* >>> + * If this is a normal exit then the destination will send a SHUT and the >>> + * rp_thread will exit, however if there's an error we need to cause >>> + * it to exit, which we can do by a shutdown. >>> + * (canceling must also shutdown to stop us getting stuck here if >>> + * the destination died at just the wrong place) >>> + */ >>> + if (qemu_file_get_error(ms->file) && ms->rp_state.file) { >>> + qemu_file_shutdown(ms->rp_state.file); >>> + } >>> + trace_await_return_path_close_on_source_joining(); >>> + qemu_thread_join(&ms->rp_state.rp_thread); >>> + trace_await_return_path_close_on_source_close(); >>> + return ms->rp_state.error; >>> +} >>> + >>> +/* >>> + * Master migration thread on the source VM. >>> + * It drives the migration and pumps the data down the outgoing channel. 
>>> + */ >>> static void *migration_thread(void *opaque) >>> { >>> MigrationState *s = opaque; >>> diff --git a/trace-events b/trace-events >>> index 5738e3f..282cde1 100644 >>> --- a/trace-events >>> +++ b/trace-events >>> @@ -1394,12 +1394,24 @@ flic_no_device_api(int err) "flic: no Device Contral API support %d" >>> flic_reset_failed(int err) "flic: reset failed %d" >>> >>> # migration.c >>> +await_return_path_close_on_source_close(void) "" >>> +await_return_path_close_on_source_joining(void) "" >>> migrate_set_state(int new_state) "new state %d" >>> migrate_fd_cleanup(void) "" >>> +migrate_fd_cleanup_src_rp(void) "" >>> migrate_fd_error(void) "" >>> migrate_fd_cancel(void) "" >>> migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64 >>> migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" >>> +open_return_path_on_source(void) "" >>> +open_return_path_on_source_continue(void) "" >>> +source_return_path_thread_bad_end(void) "" >>> +source_return_path_thread_end(void) "" >>> +source_return_path_thread_entry(void) "" >>> +source_return_path_thread_failed_read_cmd_data(void) "" >>> +source_return_path_thread_loop_top(void) "" >>> +source_return_path_thread_pong(uint32_t val) "%x" >>> +source_return_path_thread_shut(uint32_t val) "%x" >>> migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64 >>> >>> # migration/rdma.c >>> >> >> > -- > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK > > . >