// tcp_iface.cc revision 11622
1/* 2 * Copyright (c) 2015 ARM Limited 3 * All rights reserved 4 * 5 * The license below extends only to copyright in the software and shall 6 * not be construed as granting a license to any other intellectual 7 * property including but not limited to intellectual property relating 8 * to a hardware implementation of the functionality of the software 9 * licensed hereunder. You may use the software subject to the license 10 * terms below provided that you ensure that this notice is replicated 11 * unmodified and in its entirety in all distributions of the software, 12 * modified or unmodified, in source code or in binary form. 13 * 14 * Redistribution and use in source and binary forms, with or without 15 * modification, are permitted provided that the following conditions are 16 * met: redistributions of source code must retain the above copyright 17 * notice, this list of conditions and the following disclaimer; 18 * redistributions in binary form must reproduce the above copyright 19 * notice, this list of conditions and the following disclaimer in the 20 * documentation and/or other materials provided with the distribution; 21 * neither the name of the copyright holders nor the names of its 22 * contributors may be used to endorse or promote products derived from 23 * this software without specific prior written permission. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 28 * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT 29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 36 * 37 * Authors: Gabor Dozsa 38 * Mohammad Alian 39 */ 40 41/* @file 42 * TCP stream socket based interface class implementation for dist-gem5 runs. 43 */ 44 45#include "dev/net/tcp_iface.hh" 46 47#include <arpa/inet.h> 48#include <netdb.h> 49#include <netinet/tcp.h> 50#include <sys/socket.h> 51#include <sys/types.h> 52#include <unistd.h> 53 54#include <cerrno> 55#include <cstring> 56#include <vector> 57 58#include "base/types.hh" 59#include "debug/DistEthernet.hh" 60#include "debug/DistEthernetCmd.hh" 61#include "sim/sim_exit.hh" 62 63#if defined(__FreeBSD__) 64#include <netinet/in.h> 65 66#endif 67 68// MSG_NOSIGNAL does not exists on OS X 69#if defined(__APPLE__) || defined(__MACH__) 70#ifndef MSG_NOSIGNAL 71#define MSG_NOSIGNAL SO_NOSIGPIPE 72#endif 73#endif 74 75using namespace std; 76 77std::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes; 78vector<int> TCPIface::sockRegistry; 79int TCPIface::fdStatic = -1; 80bool TCPIface::anyListening = false; 81 82TCPIface::TCPIface(string server_name, unsigned server_port, 83 unsigned dist_rank, unsigned dist_size, 84 Tick sync_start, Tick sync_repeat, 85 EventManager *em, bool is_switch, int num_nodes) : 86 DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, 87 is_switch, num_nodes), serverName(server_name), 88 serverPort(server_port), isSwitch(is_switch), listening(false) 89{ 90 if (is_switch && isMaster) { 91 while (!listen(serverPort)) { 92 
DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n", 93 serverPort); 94 serverPort++; 95 } 96 inform("tcp_iface listening on port %d", serverPort); 97 // Now accept the first connection requests from each compute node and 98 // store the node info. The compute nodes will then wait for ack 99 // messages. Ack messages will be sent by initTransport() in the 100 // appropriate order to make sure that every compute node is always 101 // connected to the same switch port. 102 NodeInfo ni; 103 for (int i = 0; i < size; i++) { 104 accept(); 105 DPRINTF(DistEthernet, "First connection, waiting for link info\n"); 106 if (!recvTCP(sock, &ni, sizeof(ni))) 107 panic("Failed to receive link info"); 108 nodes.push_back(make_pair(ni, sock)); 109 } 110 } 111} 112 113bool 114TCPIface::listen(int port) 115{ 116 if (listening) 117 panic("Socket already listening!"); 118 119 struct sockaddr_in sockaddr; 120 int ret; 121 122 fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); 123 panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno)); 124 125 sockaddr.sin_family = PF_INET; 126 sockaddr.sin_addr.s_addr = INADDR_ANY; 127 sockaddr.sin_port = htons(port); 128 // finally clear sin_zero 129 memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero)); 130 ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr)); 131 132 if (ret != 0) { 133 if (ret == -1 && errno != EADDRINUSE) 134 panic("ListenSocket(listen): bind() failed!"); 135 return false; 136 } 137 138 if (::listen(fdStatic, 24) == -1) { 139 if (errno != EADDRINUSE) 140 panic("ListenSocket(listen): listen() failed!"); 141 142 return false; 143 } 144 145 listening = true; 146 anyListening = true; 147 return true; 148} 149 150void 151TCPIface::establishConnection() 152{ 153 static unsigned cur_rank = 0; 154 static unsigned cur_id = 0; 155 NodeInfo ni; 156 157 if (isSwitch) { 158 if (cur_id == 0) { // first connection accepted in the ctor already 159 auto const &iface0 = 160 find_if(nodes.begin(), 
nodes.end(), 161 [](const pair<NodeInfo, int> &cn) -> bool { 162 return cn.first.rank == cur_rank; 163 }); 164 assert(iface0 != nodes.end()); 165 assert(iface0->first.distIfaceId == 0); 166 sock = iface0->second; 167 ni = iface0->first; 168 } else { // additional connections from the same compute node 169 accept(); 170 DPRINTF(DistEthernet, "Next connection, waiting for link info\n"); 171 if (!recvTCP(sock, &ni, sizeof(ni))) 172 panic("Failed to receive link info"); 173 assert(ni.rank == cur_rank); 174 assert(ni.distIfaceId == cur_id); 175 } 176 inform("Link okay (iface:%d -> (node:%d, iface:%d))", 177 distIfaceId, ni.rank, ni.distIfaceId); 178 if (ni.distIfaceId < ni.distIfaceNum - 1) { 179 cur_id++; 180 } else { 181 cur_rank++; 182 cur_id = 0; 183 } 184 // send ack 185 ni.distIfaceId = distIfaceId; 186 ni.distIfaceNum = distIfaceNum; 187 sendTCP(sock, &ni, sizeof(ni)); 188 } else { // this is not a switch 189 connect(); 190 // send link info 191 ni.rank = rank; 192 ni.distIfaceId = distIfaceId; 193 ni.distIfaceNum = distIfaceNum; 194 sendTCP(sock, &ni, sizeof(ni)); 195 DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n", 196 distIfaceId); 197 if (!recvTCP(sock, &ni, sizeof(ni))) 198 panic("Failed to receive ack"); 199 assert(ni.rank == rank); 200 inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId, 201 ni.distIfaceId); 202 } 203 sockRegistry.push_back(sock); 204} 205 206void 207TCPIface::accept() 208{ 209 struct sockaddr_in sockaddr; 210 socklen_t slen = sizeof (sockaddr); 211 sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen); 212 if (sock != -1) { 213 int i = 1; 214 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, 215 sizeof(i)) < 0) 216 warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!"); 217 } 218} 219 220void 221TCPIface::connect() 222{ 223 struct addrinfo addr_hint, *addr_results; 224 int ret; 225 226 string port_str = to_string(serverPort); 227 228 sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); 
229 panic_if(sock < 0, "socket() failed: %s", strerror(errno)); 230 231 int fl = 1; 232 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0) 233 warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!"); 234 235 bzero(&addr_hint, sizeof(addr_hint)); 236 addr_hint.ai_family = AF_INET; 237 addr_hint.ai_socktype = SOCK_STREAM; 238 addr_hint.ai_protocol = IPPROTO_TCP; 239 240 ret = getaddrinfo(serverName.c_str(), port_str.c_str(), 241 &addr_hint, &addr_results); 242 panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); 243 244 DPRINTF(DistEthernet, "Connecting to %s:%s\n", 245 serverName.c_str(), port_str.c_str()); 246 247 ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), 248 addr_results->ai_addrlen); 249 panic_if(ret < 0, "connect() failed: %s", strerror(errno)); 250 251 freeaddrinfo(addr_results); 252} 253 254TCPIface::~TCPIface() 255{ 256 int M5_VAR_USED ret; 257 258 ret = close(sock); 259 assert(ret == 0); 260} 261 262void 263TCPIface::sendTCP(int sock, const void *buf, unsigned length) 264{ 265 ssize_t ret; 266 267 ret = ::send(sock, buf, length, MSG_NOSIGNAL); 268 if (ret < 0) { 269 if (errno == ECONNRESET || errno == EPIPE) { 270 exitSimLoop("Message server closed connection, simulation " 271 "is exiting"); 272 } else { 273 panic("send() failed: %s", strerror(errno)); 274 } 275 } 276 panic_if(ret != length, "send() failed"); 277} 278 279bool 280TCPIface::recvTCP(int sock, void *buf, unsigned length) 281{ 282 ssize_t ret; 283 284 ret = ::recv(sock, buf, length, MSG_WAITALL ); 285 if (ret < 0) { 286 if (errno == ECONNRESET || errno == EPIPE) 287 inform("recv(): %s", strerror(errno)); 288 else if (ret < 0) 289 panic("recv() failed: %s", strerror(errno)); 290 } else if (ret == 0) { 291 inform("recv(): Connection closed"); 292 } else if (ret != length) 293 panic("recv() failed"); 294 295 return (ret == length); 296} 297 298void 299TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet) 300{ 301 
sendTCP(sock, &header, sizeof(header)); 302 sendTCP(sock, packet->data, packet->length); 303} 304 305void 306TCPIface::sendCmd(const Header &header) 307{ 308 DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n", 309 static_cast<int>(header.msgType)); 310 // Global commands (i.e. sync request) are always sent by the master 311 // DistIface. The transfer method is simply implemented as point-to-point 312 // messages for now 313 for (auto s: sockRegistry) 314 sendTCP(s, (void*)&header, sizeof(header)); 315} 316 317bool 318TCPIface::recvHeader(Header &header) 319{ 320 bool ret = recvTCP(sock, &header, sizeof(header)); 321 DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n", 322 static_cast<int>(header.msgType), ret); 323 return ret; 324} 325 326void 327TCPIface::recvPacket(const Header &header, EthPacketPtr &packet) 328{ 329 packet = make_shared<EthPacketData>(header.dataPacketLength); 330 bool ret = recvTCP(sock, packet->data, header.dataPacketLength); 331 panic_if(!ret, "Error while reading socket"); 332 packet->length = header.dataPacketLength; 333} 334 335void 336TCPIface::initTransport() 337{ 338 // We cannot setup the conections in the constructor because the number 339 // of dist interfaces (per process) is unknown until the (simobject) init 340 // phase. That information is necessary for global connection ordering. 341 establishConnection(); 342} 343