tcp_iface.cc revision 11290
110298Salexandru.dutu@amd.com/* 210298Salexandru.dutu@amd.com * Copyright (c) 2015 ARM Limited 310298Salexandru.dutu@amd.com * All rights reserved 410298Salexandru.dutu@amd.com * 510298Salexandru.dutu@amd.com * The license below extends only to copyright in the software and shall 610298Salexandru.dutu@amd.com * not be construed as granting a license to any other intellectual 710298Salexandru.dutu@amd.com * property including but not limited to intellectual property relating 810298Salexandru.dutu@amd.com * to a hardware implementation of the functionality of the software 910298Salexandru.dutu@amd.com * licensed hereunder. You may use the software subject to the license 1010298Salexandru.dutu@amd.com * terms below provided that you ensure that this notice is replicated 1110298Salexandru.dutu@amd.com * unmodified and in its entirety in all distributions of the software, 1210298Salexandru.dutu@amd.com * modified or unmodified, in source code or in binary form. 1310298Salexandru.dutu@amd.com * 1410298Salexandru.dutu@amd.com * Redistribution and use in source and binary forms, with or without 1510298Salexandru.dutu@amd.com * modification, are permitted provided that the following conditions are 1610298Salexandru.dutu@amd.com * met: redistributions of source code must retain the above copyright 1710298Salexandru.dutu@amd.com * notice, this list of conditions and the following disclaimer; 1810298Salexandru.dutu@amd.com * redistributions in binary form must reproduce the above copyright 1910298Salexandru.dutu@amd.com * notice, this list of conditions and the following disclaimer in the 2010298Salexandru.dutu@amd.com * documentation and/or other materials provided with the distribution; 2110298Salexandru.dutu@amd.com * neither the name of the copyright holders nor the names of its 2210298Salexandru.dutu@amd.com * contributors may be used to endorse or promote products derived from 2310298Salexandru.dutu@amd.com * this software without specific prior written permission. 2410298Salexandru.dutu@amd.com * 2510298Salexandru.dutu@amd.com * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 2610298Salexandru.dutu@amd.com * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 2710298Salexandru.dutu@amd.com * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 2810298Salexandru.dutu@amd.com * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 2910298Salexandru.dutu@amd.com * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 3010298Salexandru.dutu@amd.com * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 3110298Salexandru.dutu@amd.com * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 3210298Salexandru.dutu@amd.com * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 3310298Salexandru.dutu@amd.com * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 3410298Salexandru.dutu@amd.com * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 3510298Salexandru.dutu@amd.com * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 3610298Salexandru.dutu@amd.com * 3710298Salexandru.dutu@amd.com * Authors: Gabor Dozsa 3810298Salexandru.dutu@amd.com * Mohammad Alian 3910298Salexandru.dutu@amd.com */ 4010298Salexandru.dutu@amd.com 4110298Salexandru.dutu@amd.com/* @file 4210298Salexandru.dutu@amd.com * TCP stream socket based interface class implementation for dist-gem5 runs. 4310298Salexandru.dutu@amd.com */ 4410298Salexandru.dutu@amd.com 4511800Sbrandon.potter@amd.com#include "dev/net/tcp_iface.hh" 4611800Sbrandon.potter@amd.com 4710298Salexandru.dutu@amd.com#include <arpa/inet.h> 4810298Salexandru.dutu@amd.com#include <netdb.h> 4910298Salexandru.dutu@amd.com#include <netinet/tcp.h> 5010298Salexandru.dutu@amd.com#include <sys/socket.h> 5110298Salexandru.dutu@amd.com#include <sys/types.h> 5210298Salexandru.dutu@amd.com#include <unistd.h> 5310298Salexandru.dutu@amd.com 5410298Salexandru.dutu@amd.com#include <cerrno> 5510298Salexandru.dutu@amd.com#include <cstring> 5610298Salexandru.dutu@amd.com#include <vector> 5710298Salexandru.dutu@amd.com 5810298Salexandru.dutu@amd.com#include "base/types.hh" 5910298Salexandru.dutu@amd.com#include "debug/DistEthernet.hh" 6010298Salexandru.dutu@amd.com#include "debug/DistEthernetCmd.hh" 6110298Salexandru.dutu@amd.com#include "sim/sim_exit.hh" 6210298Salexandru.dutu@amd.com 6310298Salexandru.dutu@amd.com#if defined(__FreeBSD__) 6410298Salexandru.dutu@amd.com#include <netinet/in.h> 6510298Salexandru.dutu@amd.com 6610298Salexandru.dutu@amd.com#endif 6710298Salexandru.dutu@amd.com 6810298Salexandru.dutu@amd.com// MSG_NOSIGNAL does not exists on OS X 6910298Salexandru.dutu@amd.com#if defined(__APPLE__) || defined(__MACH__) 7010298Salexandru.dutu@amd.com#ifndef MSG_NOSIGNAL 7110298Salexandru.dutu@amd.com#define MSG_NOSIGNAL SO_NOSIGPIPE 7210298Salexandru.dutu@amd.com#endif 7310298Salexandru.dutu@amd.com#endif 7410298Salexandru.dutu@amd.com 7510298Salexandru.dutu@amd.comusing namespace std; 7610298Salexandru.dutu@amd.com 7710298Salexandru.dutu@amd.comstd::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes; 7810298Salexandru.dutu@amd.comvector<int> TCPIface::sockRegistry; 7910298Salexandru.dutu@amd.comint TCPIface::fdStatic = -1; 8010298Salexandru.dutu@amd.combool TCPIface::anyListening = false; 8110298Salexandru.dutu@amd.com 8210298Salexandru.dutu@amd.comTCPIface::TCPIface(string server_name, unsigned server_port, 8310298Salexandru.dutu@amd.com unsigned dist_rank, unsigned dist_size, 8410298Salexandru.dutu@amd.com Tick sync_start, Tick sync_repeat, 8510298Salexandru.dutu@amd.com EventManager *em, bool is_switch, int num_nodes) : 8610298Salexandru.dutu@amd.com DistIface(dist_rank, dist_size, sync_start, sync_repeat, em, 8710298Salexandru.dutu@amd.com is_switch, num_nodes), serverName(server_name), 8810298Salexandru.dutu@amd.com serverPort(server_port), isSwitch(is_switch), listening(false) 8910298Salexandru.dutu@amd.com{ 9010298Salexandru.dutu@amd.com if (is_switch && isMaster) { 9110298Salexandru.dutu@amd.com while (!listen(serverPort)) { 9210298Salexandru.dutu@amd.com DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n", 9310298Salexandru.dutu@amd.com serverPort); 9410298Salexandru.dutu@amd.com serverPort++; 9510298Salexandru.dutu@amd.com } 9610298Salexandru.dutu@amd.com inform("tcp_iface listening on port %d", serverPort); 9710298Salexandru.dutu@amd.com // Now accept the first connection requests from each compute node and 9810298Salexandru.dutu@amd.com // store the node info. The compute nodes will then wait for ack 9910298Salexandru.dutu@amd.com // messages. Ack messages will be sent by initTransport() in the 10010298Salexandru.dutu@amd.com // appropriate order to make sure that every compute node is always 10110298Salexandru.dutu@amd.com // connected to the same switch port. 10210298Salexandru.dutu@amd.com NodeInfo ni; 10310298Salexandru.dutu@amd.com for (int i = 0; i < size; i++) { 10410298Salexandru.dutu@amd.com accept(); 10510298Salexandru.dutu@amd.com DPRINTF(DistEthernet, "First connection, waiting for link info\n"); 10610298Salexandru.dutu@amd.com if (!recvTCP(sock, &ni, sizeof(ni))) 10710298Salexandru.dutu@amd.com panic("Failed to receive link info"); 10810298Salexandru.dutu@amd.com nodes.push_back(make_pair(ni, sock)); 10910298Salexandru.dutu@amd.com } 11010298Salexandru.dutu@amd.com } 11110298Salexandru.dutu@amd.com} 11210298Salexandru.dutu@amd.com 11310298Salexandru.dutu@amd.combool 11410298Salexandru.dutu@amd.comTCPIface::listen(int port) 11510298Salexandru.dutu@amd.com{ 11610298Salexandru.dutu@amd.com if (listening) 11710298Salexandru.dutu@amd.com panic("Socket already listening!"); 11810298Salexandru.dutu@amd.com 11910298Salexandru.dutu@amd.com struct sockaddr_in sockaddr; 12010298Salexandru.dutu@amd.com int ret; 12110298Salexandru.dutu@amd.com 12210298Salexandru.dutu@amd.com fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); 12310298Salexandru.dutu@amd.com panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno)); 12410298Salexandru.dutu@amd.com 12510298Salexandru.dutu@amd.com sockaddr.sin_family = PF_INET; 12610298Salexandru.dutu@amd.com sockaddr.sin_addr.s_addr = INADDR_ANY; 12710298Salexandru.dutu@amd.com sockaddr.sin_port = htons(port); 12810298Salexandru.dutu@amd.com // finally clear sin_zero 12910298Salexandru.dutu@amd.com memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero)); 13010298Salexandru.dutu@amd.com ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr)); 13110298Salexandru.dutu@amd.com 13210298Salexandru.dutu@amd.com if (ret != 0) { 13310298Salexandru.dutu@amd.com if (ret == -1 && errno != EADDRINUSE) 13410298Salexandru.dutu@amd.com panic("ListenSocket(listen): bind() failed!"); 13510298Salexandru.dutu@amd.com return false; 13610298Salexandru.dutu@amd.com } 13710298Salexandru.dutu@amd.com 13810298Salexandru.dutu@amd.com if (::listen(fdStatic, 24) == -1) { 13910298Salexandru.dutu@amd.com if (errno != EADDRINUSE) 14010298Salexandru.dutu@amd.com panic("ListenSocket(listen): listen() failed!"); 14110298Salexandru.dutu@amd.com 14210298Salexandru.dutu@amd.com return false; 14310556Salexandru.dutu@amd.com } 14410556Salexandru.dutu@amd.com 14510298Salexandru.dutu@amd.com listening = true; 14610298Salexandru.dutu@amd.com anyListening = true; 14711175Sandreas.hansson@arm.com return true; 14810298Salexandru.dutu@amd.com} 14910558Salexandru.dutu@amd.com 15011175Sandreas.hansson@arm.comvoid 15111175Sandreas.hansson@arm.comTCPIface::establishConnection() 15211175Sandreas.hansson@arm.com{ 15311175Sandreas.hansson@arm.com static unsigned cur_rank = 0; 15411175Sandreas.hansson@arm.com static unsigned cur_id = 0; 15511168Sandreas.hansson@arm.com NodeInfo ni; 15611168Sandreas.hansson@arm.com 15710298Salexandru.dutu@amd.com if (isSwitch) { 15810298Salexandru.dutu@amd.com if (cur_id == 0) { // first connection accepted in the ctor already 159 auto const &iface0 = 160 find_if(nodes.begin(), nodes.end(), 161 [](const pair<NodeInfo, int> &cn) -> bool { 162 return cn.first.rank == cur_rank; 163 }); 164 assert(iface0 != nodes.end()); 165 assert(iface0->first.distIfaceId == 0); 166 sock = iface0->second; 167 ni = iface0->first; 168 } else { // additional connections from the same compute node 169 accept(); 170 DPRINTF(DistEthernet, "Next connection, waiting for link info\n"); 171 if (!recvTCP(sock, &ni, sizeof(ni))) 172 panic("Failed to receive link info"); 173 assert(ni.rank == cur_rank); 174 assert(ni.distIfaceId == cur_id); 175 } 176 inform("Link okay (iface:%d -> (node:%d, iface:%d))", 177 distIfaceId, ni.rank, ni.distIfaceId); 178 if (ni.distIfaceId < ni.distIfaceNum - 1) { 179 cur_id++; 180 } else { 181 cur_rank++; 182 cur_id = 0; 183 } 184 // send ack 185 ni.distIfaceId = distIfaceId; 186 ni.distIfaceNum = distIfaceNum; 187 sendTCP(sock, &ni, sizeof(ni)); 188 } else { // this is not a switch 189 connect(); 190 // send link info 191 ni.rank = rank; 192 ni.distIfaceId = distIfaceId; 193 ni.distIfaceNum = distIfaceNum; 194 sendTCP(sock, &ni, sizeof(ni)); 195 DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n", 196 distIfaceId); 197 if (!recvTCP(sock, &ni, sizeof(ni))) 198 panic("Failed to receive ack"); 199 assert(ni.rank == rank); 200 inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId, 201 ni.distIfaceId); 202 } 203 sockRegistry.push_back(sock); 204} 205 206void 207TCPIface::accept() 208{ 209 struct sockaddr_in sockaddr; 210 socklen_t slen = sizeof (sockaddr); 211 sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen); 212 if (sock != -1) { 213 int i = 1; 214 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i, 215 sizeof(i)) < 0) 216 warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!"); 217 } 218} 219 220void 221TCPIface::connect() 222{ 223 struct addrinfo addr_hint, *addr_results; 224 int ret; 225 226 string port_str = to_string(serverPort); 227 228 sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); 229 panic_if(sock < 0, "socket() failed: %s", strerror(errno)); 230 231 int fl = 1; 232 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0) 233 warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!"); 234 235 bzero(&addr_hint, sizeof(addr_hint)); 236 addr_hint.ai_family = AF_INET; 237 addr_hint.ai_socktype = SOCK_STREAM; 238 addr_hint.ai_protocol = IPPROTO_TCP; 239 240 ret = getaddrinfo(serverName.c_str(), port_str.c_str(), 241 &addr_hint, &addr_results); 242 panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); 243 244 DPRINTF(DistEthernet, "Connecting to %s:%s\n", 245 serverName.c_str(), port_str.c_str()); 246 247 ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), 248 addr_results->ai_addrlen); 249 panic_if(ret < 0, "connect() failed: %s", strerror(errno)); 250 251 freeaddrinfo(addr_results); 252} 253 254TCPIface::~TCPIface() 255{ 256 int M5_VAR_USED ret; 257 258 ret = close(sock); 259 assert(ret == 0); 260} 261 262void 263TCPIface::sendTCP(int sock, const void *buf, unsigned length) 264{ 265 ssize_t ret; 266 267 ret = ::send(sock, buf, length, MSG_NOSIGNAL); 268 if (ret < 0) { 269 if (errno == ECONNRESET || errno == EPIPE) { 270 inform("send(): %s", strerror(errno)); 271 exit_message("info", 0, "Message server closed connection, " 272 "simulation is exiting"); 273 } else { 274 panic("send() failed: %s", strerror(errno)); 275 } 276 } 277 panic_if(ret != length, "send() failed"); 278} 279 280bool 281TCPIface::recvTCP(int sock, void *buf, unsigned length) 282{ 283 ssize_t ret; 284 285 ret = ::recv(sock, buf, length, MSG_WAITALL ); 286 if (ret < 0) { 287 if (errno == ECONNRESET || errno == EPIPE) 288 inform("recv(): %s", strerror(errno)); 289 else if (ret < 0) 290 panic("recv() failed: %s", strerror(errno)); 291 } else if (ret == 0) { 292 inform("recv(): Connection closed"); 293 } else if (ret != length) 294 panic("recv() failed"); 295 296 return (ret == length); 297} 298 299void 300TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet) 301{ 302 sendTCP(sock, &header, sizeof(header)); 303 sendTCP(sock, packet->data, packet->length); 304} 305 306void 307TCPIface::sendCmd(const Header &header) 308{ 309 DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n", 310 static_cast<int>(header.msgType)); 311 // Global commands (i.e. sync request) are always sent by the master 312 // DistIface. The transfer method is simply implemented as point-to-point 313 // messages for now 314 for (auto s: sockRegistry) 315 sendTCP(s, (void*)&header, sizeof(header)); 316} 317 318bool 319TCPIface::recvHeader(Header &header) 320{ 321 bool ret = recvTCP(sock, &header, sizeof(header)); 322 DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n", 323 static_cast<int>(header.msgType), ret); 324 return ret; 325} 326 327void 328TCPIface::recvPacket(const Header &header, EthPacketPtr &packet) 329{ 330 packet = make_shared<EthPacketData>(header.dataPacketLength); 331 bool ret = recvTCP(sock, packet->data, header.dataPacketLength); 332 panic_if(!ret, "Error while reading socket"); 333 packet->length = header.dataPacketLength; 334} 335 336void 337TCPIface::initTransport() 338{ 339 // We cannot setup the conections in the constructor because the number 340 // of dist interfaces (per process) is unknown until the (simobject) init 341 // phase. That information is necessary for global connection ordering. 342 establishConnection(); 343} 344