From patchwork Fri Apr 19 18:05:50 2019
X-Patchwork-Submitter: "Hefty, Sean"
X-Patchwork-Id: 10909621
From: "Hefty, Sean"
To: "linux-rdma (linux-rdma@vger.kernel.org)"
Subject: [PATCH rdma-core 4/5] rsockets: Wake-up all waiting threads on poll events
Date: Fri, 19 Apr 2019 18:05:50 +0000
Message-ID: <1828884A29C6694DAF28B7E6B8A82373B3E1F928@ORSMSX109.amr.corp.intel.com>
X-Mailing-List: linux-rdma@vger.kernel.org

In order to have rpoll() block the calling thread until the desired event occurs, we map rpoll() to poll() on the CQ fd (or on the rdma cm fd if the rsocket is not yet connected). However, poll() waits for an event to become readable on the CQ fd; it does not wait directly on the state of the rsocket. This can result in rpoll() behaving differently than poll() in multi-threaded situations.
For example, suppose two threads call rpoll(), one waiting for POLLIN and the other for POLLOUT. When a new completion is written to the CQ, an event is written to the fd. It is possible for one thread (say the POLLOUT one) to wake up, read the CQ event, and process the completion. The completion could report that data is now available to be read from the rsocket and that POLLIN should be signaled. But when the POLLIN thread wakes up, it finds the CQ fd empty, so it continues blocking on poll() in the kernel. The thread never exits the kernel, and the application hangs.

Threads blocked in rpoll() should return based on changes to the state of the rsocket(s) they are monitoring, and the state of an rsocket may be modified by another thread. To handle this, whenever the state of an rsocket _may_ have changed, we wake up all threads blocked in poll() so that they can re-check the state of their rsocket(s). Note that it is easier to have all threads re-check the rsocket states than to perform more complex state tracking, because of the difficulty of tracking which rsockets have been passed into multiple rpoll() calls.

Rpoll() is modified as follows. When a thread has processed any event, it halts future calls into poll(). It then writes to a socketpair to signal all threads blocked in poll() to wake up. Only after all threads have returned from the kernel are they allowed to resume calling poll().

Signed-off-by: Sean Hefty
---
 librdmacm/rsocket.c | 147 +++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 130 insertions(+), 17 deletions(-)

diff --git a/librdmacm/rsocket.c b/librdmacm/rsocket.c
index 9ac881bc..ed5e0fe3 100644
--- a/librdmacm/rsocket.c
+++ b/librdmacm/rsocket.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008-2014 Intel Corporation. All rights reserved.
+ * Copyright (c) 2008-2019 Intel Corporation. All rights reserved.
  *
  * This software is available to you under a choice of one of two
  * licenses.  You may choose to be licensed under the terms of the GNU
@@ -118,6 +118,10 @@ static struct rs_svc listen_svc = {
 	.run = listen_svc_run
 };
 
+static uint32_t pollcnt;
+static bool suspendpoll;
+static int pollsignal[2];
+
 static uint16_t def_iomap_size = 0;
 static uint16_t def_inline = 64;
 static uint16_t def_sqsize = 384;
@@ -469,8 +473,8 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
 	msg.cmd = cmd;
 	msg.status = EINVAL;
 	msg.rs = rs;
-	write_all(svc->sock[0], &msg, sizeof msg);
-	read_all(svc->sock[0], &msg, sizeof msg);
+	write_all(svc->sock[0], &msg, sizeof(msg));
+	read_all(svc->sock[0], &msg, sizeof(msg));
 	ret = rdma_seterrno(msg.status);
 	if (svc->cnt)
 		goto unlock;
@@ -2983,19 +2987,123 @@ static uint64_t rs_time_us(void)
 	return now.tv_sec * 1000000 + now.tv_usec;
 }
 
+/* When mapping rpoll to poll, the events reported on the RDMA
+ * fd are independent from the events rpoll may be looking for.
+ * To avoid threads hanging in poll, whenever any event occurs,
+ * we need to wake up all threads in poll, so that they can check
+ * if there has been a change on the rsockets they are monitoring.
+ * To support this, we 'gate' threads entering and leaving rpoll.
+ */
+static int rs_pollinit(void)
+{
+	int ret = 0;
+
+	pthread_mutex_lock(&mut);
+	if (pollsignal[0] || pollsignal[1])
+		goto unlock;
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, pollsignal);
+	if (ret)
+		goto unlock;
+
+	/* Avoid hangs clearing the signal (reading fd) */
+	ret = set_fd_nonblock(pollsignal[0], true);
+	if (ret) {
+		close(pollsignal[0]);
+		close(pollsignal[1]);
+		pollsignal[0] = 0;
+		pollsignal[1] = 0;
+	}
+unlock:
+	pthread_mutex_unlock(&mut);
+	return ret;
+}
+
+/* When an event occurs, we must wait until the state of all rsockets
+ * has settled.  Then we need to re-check the rsocket state prior to
+ * blocking on poll().
+ */
+static int rs_poll_enter(void)
+{
+	pthread_mutex_lock(&mut);
+	if (suspendpoll) {
+		pthread_mutex_unlock(&mut);
+		pthread_yield();
+		return -EBUSY;
+	}
+
+	pollcnt++;
+	pthread_mutex_unlock(&mut);
+	return 0;
+}
+
+static void rs_poll_exit(void)
+{
+	ssize_t __attribute__((unused)) rc;
+	char c;
+
+	pthread_mutex_lock(&mut);
+	if (!--pollcnt) {
+		rc = read(pollsignal[0], &c, sizeof(c));
+		suspendpoll = 0;
+	}
+	pthread_mutex_unlock(&mut);
+}
+
+/* When an event occurs, it's possible for a single thread blocked in
+ * poll to return from the kernel, read the event, and update the state
+ * of an rsocket.  However, that can leave threads blocked in the kernel
+ * on poll (trying to read the CQ fd), which have had their rsocket
+ * state set.  To avoid those threads remaining blocked in the kernel,
+ * we must wake them up and ensure that they all return to user space,
+ * in order to re-check the state of their rsockets.
+ *
+ * Because poll is racy wrt updating the rsocket states, we need to
+ * signal state checks whenever a thread updates the state of a
+ * monitored rsocket, independent of whether that thread actually
+ * reads an event from an fd.  In other words, we must wake up all
+ * polling threads whenever poll() indicates that there is a new
+ * completion to process, and when rpoll() will return a successful
+ * value after having blocked.
+ */
+static void rs_poll_stop(void)
+{
+	ssize_t __attribute__((unused)) rc;
+	char c = 5;
+
+	pthread_mutex_lock(&mut);
+	if (!--pollcnt) {
+		rc = read(pollsignal[0], &c, sizeof(c));
+		suspendpoll = 0;
+	} else if (!suspendpoll) {
+		suspendpoll = 1;
+		write_all(pollsignal[1], &c, sizeof(c));
+	}
+	pthread_mutex_unlock(&mut);
+}
+
+/* We always add the pollsignal read fd to the poll fd set, so
+ * that we can signal any blocked threads.
+ */
 static struct pollfd *rs_fds_alloc(nfds_t nfds)
 {
 	static __thread struct pollfd *rfds;
 	static __thread nfds_t rnfds;
 
-	if (nfds > rnfds) {
+	if (nfds + 1 > rnfds) {
 		if (rfds)
 			free(rfds);
+		else if (rs_pollinit())
+			return NULL;
 
-		rfds = malloc(sizeof(*rfds) * nfds);
-		rnfds = rfds ? nfds : 0;
+		rfds = malloc(sizeof(*rfds) * (nfds + 1));
+		rnfds = rfds ? nfds + 1 : 0;
 	}
 
+	if (rfds) {
+		rfds[nfds].fd = pollsignal[0];
+		rfds[nfds].events = POLLIN;
+	}
 	return rfds;
 }
@@ -3120,17 +3228,16 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
 	int i, cnt = 0;
 
 	for (i = 0; i < nfds; i++) {
-		if (!rfds[i].revents)
-			continue;
-
 		rs = idm_lookup(&idm, fds[i].fd);
 		if (rs) {
-			fastlock_acquire(&rs->cq_wait_lock);
-			if (rs->type == SOCK_STREAM)
-				rs_get_cq_event(rs);
-			else
-				ds_get_cq_event(rs);
-			fastlock_release(&rs->cq_wait_lock);
+			if (rfds[i].revents) {
+				fastlock_acquire(&rs->cq_wait_lock);
+				if (rs->type == SOCK_STREAM)
+					rs_get_cq_event(rs);
+				else
+					ds_get_cq_event(rs);
+				fastlock_release(&rs->cq_wait_lock);
+			}
 			fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
 		} else {
 			fds[i].revents = rfds[i].revents;
@@ -3174,17 +3281,23 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
 		if (ret)
 			break;
 
+		if (rs_poll_enter())
+			continue;
+
 		if (timeout >= 0) {
 			timeout -= (int) ((rs_time_us() - start_time) / 1000);
 			if (timeout <= 0)
 				return 0;
 		}
 
-		ret = poll(rfds, nfds, timeout);
-		if (ret <= 0)
+		ret = poll(rfds, nfds + 1, timeout);
+		if (ret <= 0) {
+			rs_poll_exit();
 			break;
+		}
 
 		ret = rs_poll_events(rfds, fds, nfds);
+		rs_poll_stop();
 	} while (!ret);
 
 	return ret;