37a38
> * Mohammad Alian
41c42
< * TCP stream socket based interface class implementation for multi gem5 runs.
---
> * TCP stream socket based interface class implementation for dist-gem5 runs.
47a49
> #include <netinet/tcp.h>
53a56
> #include <vector>
56c59,61
< #include "debug/MultiEthernet.hh"
---
> #include "debug/DistEthernet.hh"
> #include "debug/DistEthernetCmd.hh"
> #include "sim/sim_exit.hh"
57a63,67
> #if defined(__FreeBSD__)
> #include <netinet/in.h>
>
> #endif
>
66a77
> std::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes;
67a79,80
> int TCPIface::fdStatic = -1;
> bool TCPIface::anyListening = false;
70,72c83,88
< unsigned multi_rank, Tick sync_start, Tick sync_repeat,
< EventManager *em) :
< MultiIface(multi_rank, sync_start, sync_repeat, em)
---
> unsigned dist_rank, unsigned dist_size,
> Tick sync_start, Tick sync_repeat,
> EventManager *em, bool is_switch, int num_nodes) :
> DistIface(dist_rank, dist_size, sync_start, sync_repeat, em,
> is_switch, num_nodes), serverName(server_name),
> serverPort(server_port), isSwitch(is_switch), listening(false)
74c90,119
< struct addrinfo addr_hint, *addr_results;
---
> if (is_switch && isMaster) {
> while (!listen(serverPort)) {
> DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n",
> serverPort);
> serverPort++;
> }
> inform("tcp_iface listening on port %d", serverPort);
> // Now accept the first connection requests from each compute node and
> // store the node info. The compute nodes will then wait for ack
> // messages. Ack messages will be sent by initTransport() in the
> // appropriate order to make sure that every compute node is always
> // connected to the same switch port.
> NodeInfo ni;
> for (int i = 0; i < size; i++) {
> accept();
> DPRINTF(DistEthernet, "First connection, waiting for link info\n");
> if (!recvTCP(sock, &ni, sizeof(ni)))
> panic("Failed to receive link info");
> nodes.push_back(make_pair(ni, sock));
> }
> }
> }
>
> bool
> TCPIface::listen(int port)
> {
> if (listening)
> panic("Socket already listening!");
>
> struct sockaddr_in sockaddr;
77c122,123
< string port_str = to_string(server_port);
---
> fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
> panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno));
79,80c125,130
< sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
< panic_if(sock < 0, "socket() failed: %s", strerror(errno));
---
> sockaddr.sin_family = PF_INET;
> sockaddr.sin_addr.s_addr = INADDR_ANY;
> sockaddr.sin_port = htons(port);
> // finally clear sin_zero
> memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero));
> ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr));
82,85c132,136
< bzero(&addr_hint, sizeof(addr_hint));
< addr_hint.ai_family = AF_INET;
< addr_hint.ai_socktype = SOCK_STREAM;
< addr_hint.ai_protocol = IPPROTO_TCP;
---
> if (ret != 0) {
> if (ret == -1 && errno != EADDRINUSE)
> panic("ListenSocket(listen): bind() failed!");
> return false;
> }
87,89c138,140
< ret = getaddrinfo(server_name.c_str(), port_str.c_str(),
< &addr_hint, &addr_results);
< panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno));
---
> if (::listen(fdStatic, 24) == -1) {
> if (errno != EADDRINUSE)
> panic("ListenSocket(listen): listen() failed!");
91,92c142,143
< DPRINTF(MultiEthernet, "Connecting to %s:%u\n",
< server_name.c_str(), port_str.c_str());
---
> return false;
> }
94,96c145,148
< ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
< addr_results->ai_addrlen);
< panic_if(ret < 0, "connect() failed: %s", strerror(errno));
---
> listening = true;
> anyListening = true;
> return true;
> }
98,99c150,202
< freeaddrinfo(addr_results);
< // add our socket to the static registry
---
> void
> TCPIface::establishConnection()
> {
> static unsigned cur_rank = 0;
> static unsigned cur_id = 0;
> NodeInfo ni;
>
> if (isSwitch) {
> if (cur_id == 0) { // first connection accepted in the ctor already
> auto const &iface0 =
> find_if(nodes.begin(), nodes.end(),
> [](const pair<NodeInfo, int> &cn) -> bool {
> return cn.first.rank == cur_rank;
> });
> assert(iface0 != nodes.end());
> assert(iface0->first.distIfaceId == 0);
> sock = iface0->second;
> ni = iface0->first;
> } else { // additional connections from the same compute node
> accept();
> DPRINTF(DistEthernet, "Next connection, waiting for link info\n");
> if (!recvTCP(sock, &ni, sizeof(ni)))
> panic("Failed to receive link info");
> assert(ni.rank == cur_rank);
> assert(ni.distIfaceId == cur_id);
> }
> inform("Link okay (iface:%d -> (node:%d, iface:%d))",
> distIfaceId, ni.rank, ni.distIfaceId);
> if (ni.distIfaceId < ni.distIfaceNum - 1) {
> cur_id++;
> } else {
> cur_rank++;
> cur_id = 0;
> }
> // send ack
> ni.distIfaceId = distIfaceId;
> ni.distIfaceNum = distIfaceNum;
> sendTCP(sock, &ni, sizeof(ni));
> } else { // this is not a switch
> connect();
> // send link info
> ni.rank = rank;
> ni.distIfaceId = distIfaceId;
> ni.distIfaceNum = distIfaceNum;
> sendTCP(sock, &ni, sizeof(ni));
> DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n",
> distIfaceId);
> if (!recvTCP(sock, &ni, sizeof(ni)))
> panic("Failed to receive ack");
> assert(ni.rank == rank);
> inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId,
> ni.distIfaceId);
> }
101,102d203
< // let the server know who we are
< sendTCP(sock, &multi_rank, sizeof(multi_rank));
104a206,253
> void
> TCPIface::accept()
> {
> struct sockaddr_in sockaddr;
> socklen_t slen = sizeof (sockaddr);
> sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen);
> if (sock != -1) {
> int i = 1;
> if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i,
> sizeof(i)) < 0)
> warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
> }
> }
>
> void
> TCPIface::connect()
> {
> struct addrinfo addr_hint, *addr_results;
> int ret;
>
> string port_str = to_string(serverPort);
>
> sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
> panic_if(sock < 0, "socket() failed: %s", strerror(errno));
>
> int fl = 1;
> if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0)
> warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");
>
> bzero(&addr_hint, sizeof(addr_hint));
> addr_hint.ai_family = AF_INET;
> addr_hint.ai_socktype = SOCK_STREAM;
> addr_hint.ai_protocol = IPPROTO_TCP;
>
> ret = getaddrinfo(serverName.c_str(), port_str.c_str(),
> &addr_hint, &addr_results);
> panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno));
>
> DPRINTF(DistEthernet, "Connecting to %s:%s\n",
> serverName.c_str(), port_str.c_str());
>
> ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
> addr_results->ai_addrlen);
> panic_if(ret < 0, "connect() failed: %s", strerror(errno));
>
> freeaddrinfo(addr_results);
> }
>
114c263
< TCPIface::sendTCP(int sock, void *buf, unsigned length)
---
> TCPIface::sendTCP(int sock, const void *buf, unsigned length)
119c268,276
< panic_if(ret < 0, "send() failed: %s", strerror(errno));
---
> if (ret < 0) {
> if (errno == ECONNRESET || errno == EPIPE) {
> inform("send(): %s", strerror(errno));
> exit_message("info", 0, "Message server closed connection, "
> "simulation is exiting");
> } else {
> panic("send() failed: %s", strerror(errno));
> }
> }
143c300
< TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick)
---
> TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet)
145,153c302,304
< /*
< * Barrier is simply implemented by point-to-point messages to the server
< * for now. This method is called by only one TCPIface object.
< * The server will send back an 'ack' message when it gets the
< * sync request from all clients.
< */
< MultiHeaderPkt::Header header_pkt;
< header_pkt.msgType = sync_req;
< header_pkt.sendTick = sync_tick;
---
> sendTCP(sock, &header, sizeof(header));
> sendTCP(sock, packet->data, packet->length);
> }
155,156c306,315
< for (auto s : sockRegistry)
< sendTCP(s, (void *)&header_pkt, sizeof(header_pkt));
---
> void
> TCPIface::sendCmd(const Header &header)
> {
> DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n",
> static_cast<int>(header.msgType));
> // Global commands (i.e. sync request) are always sent by the master
> // DistIface. The transfer method is simply implemented as point-to-point
> // messages for now
> for (auto s: sockRegistry)
> sendTCP(s, (void*)&header, sizeof(header));
158a318,343
> bool
> TCPIface::recvHeader(Header &header)
> {
> bool ret = recvTCP(sock, &header, sizeof(header));
> DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n",
> static_cast<int>(header.msgType), ret);
> return ret;
> }
>
> void
> TCPIface::recvPacket(const Header &header, EthPacketPtr &packet)
> {
> packet = make_shared<EthPacketData>(header.dataPacketLength);
> bool ret = recvTCP(sock, packet->data, header.dataPacketLength);
> panic_if(!ret, "Error while reading socket");
> packet->length = header.dataPacketLength;
> }
>
> void
> TCPIface::initTransport()
> {
> // We cannot set up the connections in the constructor because the number
> // of dist interfaces (per process) is unknown until the (simobject) init
> // phase. That information is necessary for global connection ordering.
> establishConnection();
> }