diff mbox

msgr: Correctly handle half-open connections.

Message ID 1291318895-11879-1-git-send-email-jaschut@sandia.gov (mailing list archive)
State New, archived
Headers show

Commit Message

Jim Schutt Dec. 2, 2010, 7:41 p.m. UTC
None
diff mbox

Patch

diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc
index 97ed4c3..0c9a0bd 100644
--- a/src/msg/SimpleMessenger.cc
+++ b/src/msg/SimpleMessenger.cc
@@ -1875,7 +1875,7 @@  int SimpleMessenger::Pipe::read_message(Message **pm)
 	
     while (left > 0) {
       // wait for data
-      if (tcp_wait(sd, messenger->timeout) < 0)
+      if (tcp_read_wait(sd, messenger->timeout) < 0)
 	goto out_dethrottle;
 
       // get a buffer
diff --git a/src/msg/tcp.cc b/src/msg/tcp.cc
index c1be756..71d85f1 100644
--- a/src/msg/tcp.cc
+++ b/src/msg/tcp.cc
@@ -13,9 +13,6 @@  int tcp_read(int sd, char *buf, int len, int timeout)
   if (sd < 0)
     return -1;
 
-  struct pollfd pfd;
-  pfd.fd = sd;
-  pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
   while (len > 0) {
 
     if (g_conf.ms_inject_socket_failures && sd >= 0) {
@@ -25,24 +22,14 @@  int tcp_read(int sd, char *buf, int len, int timeout)
       }
     }
 
-    if (poll(&pfd, 1, timeout) <= 0)
+    if (tcp_read_wait(sd, timeout) < 0)
       return -1;
 
-    if (!(pfd.revents & POLLIN))
-      return -1;
+    int got = tcp_read_nonblocking(sd, buf, len);
 
-    /*
-     * although we turn on the MSG_DONTWAIT flag, we don't expect
-     * receivng an EAGAIN, as we polled on the socket, so there
-     * should be data waiting for us.
-     */
-    int got = ::recv( sd, buf, len, MSG_DONTWAIT );
-    if (got <= 0) {
-      //char buf[100];
-      //generic_dout(0) << "tcp_read socket " << sd << " returned " << got
-      //<< " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+    if (got < 0)
       return -1;
-    }
+
     len -= got;
     buf += got;
     //generic_dout(DBL) << "tcp_read got " << got << ", " << len << " left" << dendl;
@@ -50,26 +37,51 @@  int tcp_read(int sd, char *buf, int len, int timeout)
   return len;
 }
 
-int tcp_wait(int sd, int timeout) 
+int tcp_read_wait(int sd, int timeout) 
 {
   if (sd < 0)
     return -1;
   struct pollfd pfd;
   pfd.fd = sd;
-  pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
+  pfd.events = POLLIN | POLLRDHUP;
 
   if (poll(&pfd, 1, timeout) <= 0)
     return -1;
 
+  if (pfd.revents & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))
+    return -1;
+
   if (!(pfd.revents & POLLIN))
     return -1;
 
   return 0;
 }
 
+/* This function can only be called if poll/select says there is
+ * data available.  Otherwise we cannot properly interpret a
+ * read of 0 bytes.
+ */
 int tcp_read_nonblocking(int sd, char *buf, int len)
 {
-  return ::recv(sd, buf, len, MSG_DONTWAIT);
+again:
+  int got = ::recv( sd, buf, len, MSG_DONTWAIT );
+  if (got < 0) {
+    if (errno == EAGAIN || errno == EINTR) {
+      goto again;
+    } else {
+      char buf[100];
+      generic_dout(10) << "tcp_read_nonblocking socket " << sd << " returned "
+        << got << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+      return -1;
+    }
+  } else if (got == 0) {
+    /* poll() said there was data, but we didn't read any - peer
+     * sent a FIN.  Maybe POLLRDHUP signals this, but this is
+     * standard socket behavior as documented by Stevens.
+     */
+    return -1;
+  }
+  return got;
 }
 
 int tcp_write(int sd, const char *buf, int len)
diff --git a/src/msg/tcp.h b/src/msg/tcp.h
index 31ae967..bccdbda 100644
--- a/src/msg/tcp.h
+++ b/src/msg/tcp.h
@@ -26,7 +26,7 @@  inline ostream& operator<<(ostream& out, const sockaddr_storage &ss)
 }
 
 extern int tcp_read(int sd, char *buf, int len, int timeout=-1);
-extern int tcp_wait(int sd, int timeout);
+extern int tcp_read_wait(int sd, int timeout);
 extern int tcp_read_nonblocking(int sd, char *buf, int len);
 extern int tcp_write(int sd, const char *buf, int len);