@@ -130,7 +130,7 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
if (ret == 0)
return 0;
else if (ret == SOCKET_ERROR)
- return WSAGetLastError();
+ return DAPL_FD_ERROR;
else if (FD_ISSET(s, &rw_fds))
return event;
else
@@ -161,6 +161,8 @@ static int dapl_socket_errno(void)
case WSAEACCES:
case WSAEADDRINUSE:
return EADDRINUSE;
+ case WSAECONNRESET:
+ return ECONNRESET;
default:
return err;
}
@@ -236,18 +238,17 @@ static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
s, ret, fds.revents);
if (ret == 0)
return 0;
- else if (fds.revents & (POLLERR | POLLHUP | POLLNVAL))
+ else if (ret < 0 || (fds.revents & (POLLERR | POLLHUP | POLLNVAL)))
return DAPL_FD_ERROR;
else
- return fds.revents;
+ return event;
}
static int dapl_select(struct dapl_fd_set *set)
{
int ret;
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n",
- set->index);
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep, fds=%d\n", set->index);
ret = poll(set->set, set->index, -1);
dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup, ret=0x%x\n", ret);
return ret;
@@ -368,13 +369,8 @@ multi_cleanup:
notify_thread:
/* wakeup work thread, if something destroyed */
- if (hca_ptr != NULL) {
- if (send(hca_ptr->ib_trans.scm[1],
- "w", sizeof "w", 0) == -1)
- dapl_log(DAPL_DBG_TYPE_CM,
- " cm_destroy: thread wakeup error = %s\n",
- strerror(errno));
- }
+ if (hca_ptr != NULL)
+ send(hca_ptr->ib_trans.scm[1], "w", sizeof "w", 0);
}
/* queue socket for processing CM work */
@@ -388,10 +384,7 @@ static void dapli_cm_queue(struct ib_cm_handle *cm_ptr)
dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
/* wakeup CM work thread */
- if (send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0) == -1)
- dapl_log(DAPL_DBG_TYPE_CM,
- " cm_queue: thread wakeup error = %s\n",
- strerror(errno));
+ send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
}
/*
@@ -451,10 +444,9 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
err == -1 ? "POLL" : "SOCKOPT",
err == -1 ? strerror(errno) : strerror(err),
inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr),
+ &cm_ptr->addr)->sin_addr),
ntohs(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port));
+ &cm_ptr->addr)->sin_port));
goto bail;
}
@@ -462,7 +454,7 @@ static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
ret = setsockopt(cm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
(char *)&opt, sizeof(opt));
if (ret)
- dapl_log(DAPL_DBG_TYPE_WARN,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" CONN_PENDING: NODELAY setsockopt: %s\n",
strerror(errno));
@@ -522,42 +514,48 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
{
dp_ib_cm_handle_t cm_ptr;
int ret;
+ socklen_t sl;
DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- struct sockaddr_in addr;
+ DAT_RETURN dat_ret = DAT_INSUFFICIENT_RESOURCES;
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",
r_qual, p_size);
cm_ptr = dapls_ib_cm_create(ep_ptr);
if (cm_ptr == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
+ return dat_ret;
/* create, connect, sockopt, and exchange QP information */
if ((cm_ptr->socket =
socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {
- dapl_os_free(cm_ptr, sizeof(*cm_ptr));
- return DAT_INSUFFICIENT_RESOURCES;
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect: socket create ERR %s\n", strerror(errno));
+ goto bail;
}
ret = dapl_config_socket(cm_ptr->socket);
if (ret < 0) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " socket connect: config socket %d ERR %d %s\n",
+ " connect: config socket %d ERR %d %s\n",
cm_ptr->socket, ret, strerror(errno));
+ dat_ret = DAT_INTERNAL_ERROR;
goto bail;
}
- dapl_os_memcpy(&addr, r_addr, sizeof(addr));
- addr.sin_port = htons(r_qual+1000);
- ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&addr,
- sizeof(addr));
+ /* save remote address */
+ dapl_os_memcpy(&cm_ptr->addr, r_addr, sizeof(r_addr));
+
+#ifdef DAPL_DBG
+ /* DBG: Active PID [0], PASSIVE PID [2]*/
+ *(uint16_t*)&cm_ptr->msg.resv[0] = htons((uint16_t)dapl_os_getpid());
+ *(uint16_t*)&cm_ptr->msg.resv[2] = ((struct sockaddr_in *)&cm_ptr->addr)->sin_port;
+#endif
+ ((struct sockaddr_in *)&cm_ptr->addr)->sin_port = htons(r_qual + 1000);
+ ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&cm_ptr->addr,
+ sizeof(cm_ptr->addr));
if (ret && ret != EAGAIN) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " socket connect ERROR: %s -> %s r_qual %d\n",
- strerror(errno),
- inet_ntoa(addr.sin_addr), (unsigned int)r_qual);
- dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
- return DAT_INVALID_ADDRESS;
+ dat_ret = DAT_INVALID_ADDRESS;
+ goto bail;
}
/* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */
@@ -571,9 +569,16 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
/* save references */
cm_ptr->hca = ia_ptr->hca_ptr;
cm_ptr->ep = ep_ptr;
- cm_ptr->msg.daddr.so = ia_ptr->hca_ptr->hca_address;
- ((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port = ntohs((uint16_t)r_qual);
+
+ /* get local address information from socket */
+ sl = sizeof(cm_ptr->msg.daddr.so);
+ if (getsockname(cm_ptr->socket, (struct sockaddr *)&cm_ptr->msg.daddr.so, &sl)) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " connect getsockname ERROR: %s -> %s r_qual %d\n",
+ strerror(errno),
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
+ (unsigned int)r_qual);;
+ }
if (p_size) {
cm_ptr->msg.p_size = htons(p_size);
@@ -591,8 +596,8 @@ dapli_socket_connect(DAPL_EP * ep_ptr,
dapl_dbg_log(DAPL_DBG_TYPE_EP,
" connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",
- inet_ntoa(addr.sin_addr), (unsigned int)r_qual,
- ntohs(cm_ptr->msg.p_size),
+ inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),
+ (unsigned int)r_qual, ntohs(cm_ptr->msg.p_size),
cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);
dapli_cm_queue(cm_ptr);
@@ -606,8 +611,8 @@ bail:
(unsigned int)r_qual);
/* close socket, free cm structure */
- dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
- return DAT_INTERNAL_ERROR;
+ dapls_ib_cm_free(cm_ptr, NULL);
+ return dat_ret;
}
/*
@@ -618,18 +623,32 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
DAPL_EP *ep_ptr = cm_ptr->ep;
int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
ib_cm_events_t event = IB_CME_LOCAL_FAILURE;
+ socklen_t sl;
/* read DST information into cm_ptr, overwrite SRC info */
dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");
len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);
if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU read: ERR %s, rcnt=%d, ver=%d -> %s\n",
- strerror(errno), len, cm_ptr->msg.ver,
- inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.remote_ia_address_ptr)->
- sin_addr));
+ dapl_log(DAPL_DBG_TYPE_WARN,
+ " CONN_RTU read: sk %d ERR %s, rcnt=%d, v=%d -> %s PORT L-%x R-%x PID L-%x R-%x\n",
+ cm_ptr->socket, strerror(errno), len, ntohs(cm_ptr->msg.ver),
+ inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),
+ ntohs(((struct sockaddr_in *)&cm_ptr->msg.daddr.so)->sin_port),
+ ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port),
+ ntohs(*(uint16_t*)&cm_ptr->msg.resv[0]),
+ ntohs(*(uint16_t*)&cm_ptr->msg.resv[2]));
+
+ /* Retry; corner case where server tcp stack resets under load */
+ if (dapl_socket_errno() == ECONNRESET) {
+ closesocket(cm_ptr->socket);
+ cm_ptr->socket = DAPL_INVALID_SOCKET;
+ dapli_socket_connect(cm_ptr->ep, (DAT_IA_ADDRESS_PTR)&cm_ptr->addr,
+ ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port) - 1000,
+ ntohs(cm_ptr->msg.p_size), &cm_ptr->msg.p_data);
+ dapls_ib_cm_free(cm_ptr, NULL);
+ return;
+ }
goto bail;
}
@@ -640,6 +659,10 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
&cm_ptr->msg.daddr.so,
sizeof(union dcm_addr));
+ /* save local address information from socket */
+ sl = sizeof(cm_ptr->addr);
+ getsockname(cm_ptr->socket,(struct sockaddr *)&cm_ptr->addr, &sl);
+
dapl_dbg_log(DAPL_DBG_TYPE_EP,
" CONN_RTU: DST %s %d lid=0x%x,"
" qpn=0x%x, qp_type=%d, psize=%d\n",
@@ -689,15 +712,11 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
if (event != IB_CME_CONNECTED) {
dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: reject from %s\n",
+ " CONN_RTU: reject from %s %x\n",
inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
-#ifdef DAT_EXTENSIONS
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
- goto ud_bail;
- else
-#endif
+ &cm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_port));
goto bail;
}
@@ -709,11 +728,15 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
cm_ptr->msg.saddr.ib.lid,
NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTR ERR %s -> %s\n",
- strerror(errno),
+ " CONN_RTU: QPS_RTR ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
+ strerror(errno), ep_ptr->qp_handle->qp_type,
+ ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
+ ntohl(cm_ptr->msg.saddr.ib.qpn),
+ ntohs(cm_ptr->msg.saddr.ib.lid),
inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
+ &cm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_port));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
@@ -723,11 +746,15 @@ static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
cm_ptr->msg.saddr.ib.lid,
NULL) != DAT_SUCCESS) {
dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTS ERR %s -> %s\n",
- strerror(errno),
+ " CONN_RTU: QPS_RTS ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
+ strerror(errno), ep_ptr->qp_handle->qp_type,
+ ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
+ ntohl(cm_ptr->msg.saddr.ib.qpn),
+ ntohs(cm_ptr->msg.saddr.ib.lid),
inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
+ &cm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &cm_ptr->msg.daddr.so)->sin_port));
dapl_os_unlock(&ep_ptr->header.lock);
goto bail;
}
@@ -753,34 +780,37 @@ ud_bail:
ib_pd_handle_t pd_handle =
((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;
- cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
- ep_ptr->qp_handle,
- cm_ptr->msg.saddr.ib.lid,
- NULL);
- if (!cm_ptr->ah) {
- event = IB_CME_LOCAL_FAILURE;
- goto bail;
- }
-
- dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n",
- cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn));
-
- /* post EVENT, modify_qp created ah */
- xevent.status = 0;
- xevent.type = DAT_IB_UD_REMOTE_AH;
- xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
- dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &ep_ptr->remote_ia_address,
- sizeof(union dcm_addr));
-
- if (event == IB_CME_CONNECTED)
- event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
- else
+ if (event == IB_CME_CONNECTED) {
+ cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
+ ep_ptr->qp_handle,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL);
+ if (cm_ptr->ah) {
+ /* post UD extended EVENT */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_REMOTE_AH;
+ xevent.remote_ah.ah = cm_ptr->ah;
+ xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
+ dapl_os_memcpy(&xevent.remote_ah.ia_addr,
+ &ep_ptr->remote_ia_address,
+ sizeof(union dcm_addr));
+ event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
+
+ dapl_log(DAPL_DBG_TYPE_CM,
+ " CONN_RTU: UD AH %p for lid 0x%x"
+ " qpn 0x%x\n",
+ cm_ptr->ah,
+ ntohs(cm_ptr->msg.saddr.ib.lid),
+ ntohl(cm_ptr->msg.saddr.ib.qpn));
+
+ } else
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
+
+ } else if (event == IB_CME_LOCAL_FAILURE) {
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
+ } else
event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
-
+
dapls_evd_post_connection_event_ext(
(DAPL_EVD *) ep_ptr->param.connect_evd_handle,
event,
@@ -802,6 +832,11 @@ ud_bail:
return;
bail:
+
+#ifdef DAT_EXTENSIONS
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
+ goto ud_bail;
+#endif
/* close socket, and post error event */
dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
closesocket(cm_ptr->socket);
@@ -839,7 +874,7 @@ dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
goto bail;
}
- addr.sin_port = htons(serviceID+1000);
+ addr.sin_port = htons(serviceID + 1000);
addr.sin_family = AF_INET;
addr.sin_addr = ((struct sockaddr_in *) &ia_ptr->hca_ptr->hca_address)->sin_addr;
@@ -847,7 +882,7 @@ dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
|| (listen(cm_ptr->socket, 128) < 0)) {
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" listen: ERROR %s on conn_qual 0x%x\n",
- strerror(errno), serviceID);
+ strerror(errno), serviceID + 1000);
if (dapl_socket_errno() == EADDRINUSE)
dat_status = DAT_CONN_QUAL_IN_USE;
else
@@ -857,6 +892,7 @@ dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
/* set cm_handle for this service point, save listen socket */
sp_ptr->cm_srvc_handle = cm_ptr;
+ dapl_os_memcpy(&cm_ptr->addr, &addr, sizeof(addr));
/* queue up listen socket to process inbound CR's */
cm_ptr->state = DCM_LISTEN;
@@ -864,12 +900,12 @@ dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" listen: qual 0x%x cr %p s_fd %d\n",
- ntohs(serviceID), cm_ptr, cm_ptr->socket);
+ ntohs(serviceID + 1000), cm_ptr, cm_ptr->socket);
return dat_status;
bail:
dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " listen: ERROR on conn_qual 0x%x\n", serviceID);
+ " listen: ERROR on conn_qual 0x%x\n", serviceID + 1000);
dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
return dat_status;
}
@@ -881,6 +917,7 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
{
dp_ib_cm_handle_t acm_ptr;
int ret, len, opt = 1;
+ socklen_t sl;
/*
* Accept all CR's on this port to avoid half-connection (SYN_RCV)
@@ -906,18 +943,23 @@ static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
dapls_ib_cm_free(acm_ptr, acm_ptr->ep);
return;
}
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s\n",
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s %x\n",
inet_ntoa(((struct sockaddr_in *)
- &acm_ptr->msg.daddr.so)->sin_addr));
+ &acm_ptr->msg.daddr.so)->sin_addr),
+ ntohs(((struct sockaddr_in *)
+ &acm_ptr->msg.daddr.so)->sin_port));
/* no delay for small packets */
ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
(char *)&opt, sizeof(opt));
if (ret)
- dapl_log(DAPL_DBG_TYPE_WARN,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" ACCEPT: NODELAY setsockopt: %s\n",
strerror(errno));
-
+
+ /* get local address information from socket */
+ sl = sizeof(acm_ptr->addr);
+ getsockname(acm_ptr->socket, (struct sockaddr *)&acm_ptr->addr, &sl);
acm_ptr->state = DCM_ACCEPTING;
dapli_cm_queue(acm_ptr);
@@ -948,7 +990,7 @@ static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
/* validate private data size before reading */
exp = ntohs(acm_ptr->msg.p_size);
if (exp > DCM_MAX_PDATA_SIZE) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
+ dapl_log(DAPL_DBG_TYPE_ERR,
" accept read: psize (%d) wrong\n",
acm_ptr->msg.p_size);
goto bail;
@@ -968,8 +1010,8 @@ static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
acm_ptr->state = DCM_ACCEPTING_DATA;
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " ACCEPT: DST %s %d lid=0x%x, qpn=0x%x, psz=%d\n",
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
+ " ACCEPT: DST %s %x lid=0x%x, qpn=0x%x, psz=%d\n",
inet_ntoa(((struct sockaddr_in *)
&acm_ptr->msg.daddr.so)->sin_addr),
ntohs(((struct sockaddr_in *)
@@ -1018,15 +1060,23 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
ib_cm_msg_t local;
struct iovec iov[2];
int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+ DAT_RETURN ret = DAT_INTERNAL_ERROR;
+ socklen_t sl;
- if (p_size > DCM_MAX_PDATA_SIZE)
+ if (p_size > DCM_MAX_PDATA_SIZE) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " accept_usr: psize(%d) too large\n", p_size);
return DAT_LENGTH_ERROR;
+ }
/* must have a accepted socket */
- if (cm_ptr->socket == DAPL_INVALID_SOCKET)
- return DAT_INTERNAL_ERROR;
+ if (cm_ptr->socket == DAPL_INVALID_SOCKET) {
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " accept_usr: cm socket invalid\n");
+ goto bail;
+ }
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,
" ACCEPT_USR: remote lid=0x%x"
" qpn=0x%x qp_type %d, psize=%d\n",
ntohs(cm_ptr->msg.saddr.ib.lid),
@@ -1037,10 +1087,11 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
#ifdef DAT_EXTENSIONS
if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&
ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
- dapl_dbg_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: ERR remote QP is UD,"
- ", but local QP is not\n");
- return (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
+ dapl_log(DAPL_DBG_TYPE_ERR,
+ " ACCEPT_USR: ERR remote QP is UD,"
+ ", but local QP is not\n");
+ ret = (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
+ goto bail;
}
#endif
@@ -1087,9 +1138,17 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
dapl_os_memcpy(&local.saddr.ib.gid[0],
&ia_ptr->hca_ptr->ib_trans.gid, 16);
- local.daddr.so = ia_ptr->hca_ptr->hca_address;
- ((struct sockaddr_in *)&local.daddr.so)->sin_port =
- htons((uint16_t)cm_ptr->sp->conn_qual);
+
+ /* Get local address information from socket */
+ sl = sizeof(local.daddr.so);
+ getsockname(cm_ptr->socket, (struct sockaddr *)&local.daddr.so, &sl);
+
+#ifdef DAPL_DBG
+ /* DBG: Active PID [0], PASSIVE PID [2] */
+ *(uint16_t*)&cm_ptr->msg.resv[2] = htons((uint16_t)dapl_os_getpid());
+ dapl_os_memcpy(local.resv, cm_ptr->msg.resv, 4);
+#endif
+
cm_ptr->ep = ep_ptr;
cm_ptr->hca = ia_ptr->hca_ptr;
cm_ptr->state = DCM_ACCEPTED;
@@ -1111,6 +1170,7 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
strerror(errno), len,
inet_ntoa(((struct sockaddr_in *)
&cm_ptr->msg.daddr.so)->sin_addr));
+ cm_ptr->ep = NULL;
goto bail;
}
@@ -1128,9 +1188,8 @@ dapli_socket_accept_usr(DAPL_EP * ep_ptr,
dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");
return DAT_SUCCESS;
bail:
- dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
- return DAT_INTERNAL_ERROR;
+ dapls_ib_cm_free(cm_ptr, NULL);
+ return ret;
}
/*
@@ -1160,39 +1219,42 @@ static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");
#ifdef DAT_EXTENSIONS
+ud_bail:
if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
DAT_IB_EXTENSION_EVENT_DATA xevent;
ib_pd_handle_t pd_handle =
((DAPL_PZ *)cm_ptr->ep->param.pz_handle)->pd_handle;
-
- cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
- cm_ptr->ep->qp_handle,
- cm_ptr->msg.saddr.ib.lid,
- NULL);
- if (!cm_ptr->ah) {
- event = IB_CME_LOCAL_FAILURE;
- goto bail;
- }
+
+ if (event == IB_CME_CONNECTED) {
+ cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
+ cm_ptr->ep->qp_handle,
+ cm_ptr->msg.saddr.ib.lid,
+ NULL);
+ if (cm_ptr->ah) {
+ /* post EVENT, modify_qp created ah */
+ xevent.status = 0;
+ xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
+ xevent.remote_ah.ah = cm_ptr->ah;
+ xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
+ dapl_os_memcpy(&xevent.remote_ah.ia_addr,
+ &cm_ptr->msg.daddr.so,
+ sizeof(union dcm_addr));
+ event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
+ } else
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
+ } else
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
dapl_log(DAPL_DBG_TYPE_CM,
" CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n",
cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),
ntohl(cm_ptr->msg.saddr.ib.qpn));
- /* post EVENT, modify_qp created ah */
- xevent.status = 0;
- xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
- xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
- dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &cm_ptr->msg.daddr.so,
- sizeof(union dcm_addr));
-
dapls_evd_post_connection_event_ext(
(DAPL_EVD *)
cm_ptr->ep->param.connect_evd_handle,
- DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED,
+ event,
(DAT_EP_HANDLE) cm_ptr->ep,
(DAT_COUNT) ntohs(cm_ptr->msg.p_size),
(DAT_PVOID *) cm_ptr->msg.p_data,
@@ -1211,6 +1273,10 @@ static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
return;
bail:
+#ifdef DAT_EXTENSIONS
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
+ goto ud_bail;
+#endif
dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0, 0, 0);
dapls_ib_cm_free(cm_ptr, cm_ptr->ep);
dapls_cr_callback(cm_ptr, event, NULL, cm_ptr->sp);
@@ -1716,8 +1782,11 @@ void cr_thread(void *arg)
dapl_dbg_log(DAPL_DBG_TYPE_CM,
" CR FREE: %p ep=%p st=%d sock=%d\n",
cr, cr->ep, cr->state, cr->socket);
- shutdown(cr->socket, SHUT_RDWR);
- closesocket(cr->socket);
+
+ if (cr->socket != DAPL_INVALID_SOCKET) {
+ shutdown(cr->socket, SHUT_RDWR);
+ closesocket(cr->socket);
+ }
dapl_os_free(cr, sizeof(*cr));
continue;
}
@@ -1751,7 +1820,9 @@ void cr_thread(void *arg)
ret, cr->state, cr->socket);
/* data on listen, qp exchange, and on disc req */
- if (ret == DAPL_FD_READ) {
+ if ((ret == DAPL_FD_READ) ||
+ (cr->state != DCM_CONN_PENDING &&
+ ret == DAPL_FD_ERROR)) {
if (cr->socket != DAPL_INVALID_SOCKET) {
switch (cr->state) {
case DCM_LISTEN:
@@ -1773,29 +1844,24 @@ void cr_thread(void *arg)
break;
}
}
- /* connect socket is writable, check status */
+ /* ASYNC connections, writable, readable, error; check status */
} else if (ret == DAPL_FD_WRITE ||
(cr->state == DCM_CONN_PENDING &&
ret == DAPL_FD_ERROR)) {
+
+ if (ret == DAPL_FD_ERROR)
+ dapl_log(DAPL_DBG_TYPE_ERR, " CONN_PENDING - FD_ERROR\n");
+
opt = 0;
opt_len = sizeof(opt);
ret = getsockopt(cr->socket, SOL_SOCKET,
SO_ERROR, (char *)&opt,
&opt_len);
- if (!ret)
+ if (!ret && !opt)
dapli_socket_connected(cr, opt);
else
- dapli_socket_connected(cr, dapl_socket_errno());
-
- /* POLLUP, ERR, NVAL, or poll error - DISC */
- } else if (ret < 0 || ret == DAPL_FD_ERROR) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " poll=%d cr->st=%s sk=%d ep %p, %d\n",
- ret, dapl_cm_state_str(cr->state),
- cr->socket, cr->ep,
- cr->ep ? cr->ep->param.ep_state : 0);
- dapli_socket_disconnect(cr);
- }
+ dapli_socket_connected(cr, opt ? opt : dapl_socket_errno());
+ }
dapl_os_lock(&hca_ptr->ib_trans.lock);
}
@@ -1854,19 +1920,29 @@ void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
&ia_ptr->hca_ptr->ib_trans.list,
(DAPL_LLIST_ENTRY*)&cr->entry);
- printf( " CONN[%d]: sp %p ep %p sock %d %s %s %s %s %d\n",
+ printf( " CONN[%d]: sp %p ep %p sock %d %s %s %s %s %s %s PORT L-%x R-%x PID L-%x R-%x\n",
i, cr->sp, cr->ep, cr->socket,
cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
- dapl_cm_state_str(cr->state),
+ dapl_cm_state_str(cr->state), dapl_cm_op_str(ntohs(cr->msg.op)),
+ ntohs(cr->msg.op) == DCM_REQ ? /* local address */
+ inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr) :
+ inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr),
cr->sp ? "<-" : "->",
- cr->state == DCM_LISTEN ?
- inet_ntoa(((struct sockaddr_in *)
- &ia_ptr->hca_ptr->hca_address)->sin_addr) :
- inet_ntoa(((struct sockaddr_in *)
- &cr->msg.daddr.so)->sin_addr),
- cr->sp ? (int)cr->sp->conn_qual :
- ntohs(((struct sockaddr_in *)
- &cr->msg.daddr.so)->sin_port));
+ ntohs(cr->msg.op) == DCM_REQ ? /* remote address */
+ inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr) :
+ inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr),
+
+ ntohs(cr->msg.op) == DCM_REQ ? /* local port */
+ ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port) :
+ ntohs(((struct sockaddr_in *)&cr->addr)->sin_port),
+
+ ntohs(cr->msg.op) == DCM_REQ ? /* remote port */
+ ntohs(((struct sockaddr_in *)&cr->addr)->sin_port) :
+ ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port),
+
+ cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[2]) : ntohs(*(uint16_t*)&cr->msg.resv[0]),
+ cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[0]) : ntohs(*(uint16_t*)&cr->msg.resv[2]));
+
i++;
}
printf("\n");
@@ -42,6 +42,7 @@ struct ib_cm_handle
struct dapl_ep *ep;
ib_cm_msg_t msg;
struct ibv_ah *ah;
+ DAT_SOCK_ADDR6 addr;
};
typedef struct ib_cm_handle *dp_ib_cm_handle_t;
@@ -268,7 +268,12 @@ DAT_RETURN dapls_ib_open_hca(IN IB_HCA_NAME hca_name, IN DAPL_HCA * hca_ptr)
if (dat_status != DAT_SUCCESS)
return dat_status;
- /* Get list of all IB devices, find match, open */
+#ifdef DAPL_DBG
+ /* DBG: unused port, set process id, lower 16 bits of pid */
+ ((struct sockaddr_in *)&hca_ptr->hca_address)->sin_port =
+ htons((uint16_t)dapl_os_getpid());
+#endif
+ /* Get list of all IB devices, find match, open */
dev_list = ibv_get_device_list(NULL);
if (!dev_list) {
dapl_dbg_log(DAPL_DBG_TYPE_ERR,