MessageBuffer.cc revision 11796:315e133f45df
1/* 2 * Copyright (c) 1999-2008 Mark D. Hill and David A. Wood 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are 7 * met: redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer; 9 * redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution; 12 * neither the name of the copyright holders nor the names of its 13 * contributors may be used to endorse or promote products derived from 14 * this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29#include "mem/ruby/network/MessageBuffer.hh" 30 31#include <cassert> 32 33#include "base/cprintf.hh" 34#include "base/misc.hh" 35#include "base/random.hh" 36#include "base/stl_helpers.hh" 37#include "debug/RubyQueue.hh" 38#include "mem/ruby/system/RubySystem.hh" 39 40using namespace std; 41using m5::stl_helpers::operator<<; 42 43MessageBuffer::MessageBuffer(const Params *p) 44 : SimObject(p), m_stall_map_size(0), 45 m_max_size(p->buffer_size), m_time_last_time_size_checked(0), 46 m_time_last_time_enqueue(0), m_time_last_time_pop(0), 47 m_last_arrival_time(0), m_strict_fifo(p->ordered), 48 m_randomization(p->randomization) 49{ 50 m_msg_counter = 0; 51 m_consumer = NULL; 52 m_size_last_time_size_checked = 0; 53 m_size_at_cycle_start = 0; 54 m_msgs_this_cycle = 0; 55 m_priority_rank = 0; 56 57 m_stall_msg_map.clear(); 58 m_input_link_id = 0; 59 m_vnet_id = 0; 60 61 m_buf_msgs = 0; 62 m_stall_time = 0; 63} 64 65unsigned int 66MessageBuffer::getSize(Tick curTime) 67{ 68 if (m_time_last_time_size_checked != curTime) { 69 m_time_last_time_size_checked = curTime; 70 m_size_last_time_size_checked = m_prio_heap.size(); 71 } 72 73 return m_size_last_time_size_checked; 74} 75 76bool 77MessageBuffer::areNSlotsAvailable(unsigned int n, Tick current_time) 78{ 79 80 // fast path when message buffers have infinite size 81 if (m_max_size == 0) { 82 return true; 83 } 84 85 // determine the correct size for the current cycle 86 // pop operations shouldn't effect the network's visible size 87 // until schd cycle, but enqueue operations effect the visible 88 // size immediately 89 unsigned int current_size = 0; 90 91 if (m_time_last_time_pop < current_time) { 92 // no pops this cycle - heap size is correct 93 current_size = m_prio_heap.size(); 94 } else { 95 if (m_time_last_time_enqueue < current_time) { 96 // no enqueues this cycle - m_size_at_cycle_start is correct 97 current_size = m_size_at_cycle_start; 98 } else { 99 // both pops and enqueues occured this cycle - add new 100 // enqueued msgs to m_size_at_cycle_start 101 current_size = m_size_at_cycle_start + m_msgs_this_cycle; 102 } 103 } 104 105 // now compare the new size with our max size 106 if (current_size + m_stall_map_size + n <= m_max_size) { 107 return true; 108 } else { 109 DPRINTF(RubyQueue, "n: %d, current_size: %d, heap size: %d, " 110 "m_max_size: %d\n", 111 n, current_size, m_prio_heap.size(), m_max_size); 112 m_not_avail_count++; 113 return false; 114 } 115} 116 117const Message* 118MessageBuffer::peek() const 119{ 120 DPRINTF(RubyQueue, "Peeking at head of queue.\n"); 121 const Message* msg_ptr = m_prio_heap.front().get(); 122 assert(msg_ptr); 123 124 DPRINTF(RubyQueue, "Message: %s\n", (*msg_ptr)); 125 return msg_ptr; 126} 127 128// FIXME - move me somewhere else 129Tick 130random_time() 131{ 132 Tick time = 1; 133 time += random_mt.random(0, 3); // [0...3] 134 if (random_mt.random(0, 7) == 0) { // 1 in 8 chance 135 time += 100 + random_mt.random(1, 15); // 100 + [1...15] 136 } 137 return time; 138} 139 140void 141MessageBuffer::enqueue(MsgPtr message, Tick current_time, Tick delta) 142{ 143 // record current time incase we have a pop that also adjusts my size 144 if (m_time_last_time_enqueue < current_time) { 145 m_msgs_this_cycle = 0; // first msg this cycle 146 m_time_last_time_enqueue = current_time; 147 } 148 149 m_msg_counter++; 150 m_msgs_this_cycle++; 151 152 // Calculate the arrival time of the message, that is, the first 153 // cycle the message can be dequeued. 154 assert(delta > 0); 155 Tick arrival_time = 0; 156 157 if (!RubySystem::getRandomization() || !m_randomization) { 158 // No randomization 159 arrival_time = current_time + delta; 160 } else { 161 // Randomization - ignore delta 162 if (m_strict_fifo) { 163 if (m_last_arrival_time < current_time) { 164 m_last_arrival_time = current_time; 165 } 166 arrival_time = m_last_arrival_time + random_time(); 167 } else { 168 arrival_time = current_time + random_time(); 169 } 170 } 171 172 // Check the arrival time 173 assert(arrival_time > current_time); 174 if (m_strict_fifo) { 175 if (arrival_time < m_last_arrival_time) { 176 panic("FIFO ordering violated: %s name: %s current time: %d " 177 "delta: %d arrival_time: %d last arrival_time: %d\n", 178 *this, name(), current_time, delta, arrival_time, 179 m_last_arrival_time); 180 } 181 } 182 183 // If running a cache trace, don't worry about the last arrival checks 184 if (!RubySystem::getWarmupEnabled()) { 185 m_last_arrival_time = arrival_time; 186 } 187 188 // compute the delay cycles and set enqueue time 189 Message* msg_ptr = message.get(); 190 assert(msg_ptr != NULL); 191 192 assert(current_time >= msg_ptr->getLastEnqueueTime() && 193 "ensure we aren't dequeued early"); 194 195 msg_ptr->updateDelayedTicks(current_time); 196 msg_ptr->setLastEnqueueTime(arrival_time); 197 msg_ptr->setMsgCounter(m_msg_counter); 198 199 // Insert the message into the priority heap 200 m_prio_heap.push_back(message); 201 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>()); 202 // Increment the number of messages statistic 203 m_buf_msgs++; 204 205 DPRINTF(RubyQueue, "Enqueue arrival_time: %lld, Message: %s\n", 206 arrival_time, *(message.get())); 207 208 // Schedule the wakeup 209 assert(m_consumer != NULL); 210 m_consumer->scheduleEventAbsolute(arrival_time); 211 m_consumer->storeEventInfo(m_vnet_id); 212} 213 214Tick 215MessageBuffer::dequeue(Tick current_time, bool decrement_messages) 216{ 217 DPRINTF(RubyQueue, "Popping\n"); 218 assert(isReady(current_time)); 219 220 // get MsgPtr of the message about to be dequeued 221 MsgPtr message = m_prio_heap.front(); 222 223 // get the delay cycles 224 message->updateDelayedTicks(current_time); 225 Tick delay = message->getDelayedTicks(); 226 227 m_stall_time = curTick() - message->getTime(); 228 229 // record previous size and time so the current buffer size isn't 230 // adjusted until schd cycle 231 if (m_time_last_time_pop < current_time) { 232 m_size_at_cycle_start = m_prio_heap.size(); 233 m_time_last_time_pop = current_time; 234 } 235 236 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>()); 237 m_prio_heap.pop_back(); 238 if (decrement_messages) { 239 // If the message will be removed from the queue, decrement the 240 // number of message in the queue. 241 m_buf_msgs--; 242 } 243 244 return delay; 245} 246 247void 248MessageBuffer::clear() 249{ 250 m_prio_heap.clear(); 251 252 m_msg_counter = 0; 253 m_time_last_time_enqueue = 0; 254 m_time_last_time_pop = 0; 255 m_size_at_cycle_start = 0; 256 m_msgs_this_cycle = 0; 257} 258 259void 260MessageBuffer::recycle(Tick current_time, Tick recycle_latency) 261{ 262 DPRINTF(RubyQueue, "Recycling.\n"); 263 assert(isReady(current_time)); 264 MsgPtr node = m_prio_heap.front(); 265 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>()); 266 267 Tick future_time = current_time + recycle_latency; 268 node->setLastEnqueueTime(future_time); 269 270 m_prio_heap.back() = node; 271 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>()); 272 m_consumer->scheduleEventAbsolute(future_time); 273} 274 275void 276MessageBuffer::reanalyzeList(list<MsgPtr> <, Tick schdTick) 277{ 278 while (!lt.empty()) { 279 m_msg_counter++; 280 MsgPtr m = lt.front(); 281 m->setLastEnqueueTime(schdTick); 282 m->setMsgCounter(m_msg_counter); 283 284 m_prio_heap.push_back(m); 285 push_heap(m_prio_heap.begin(), m_prio_heap.end(), 286 greater<MsgPtr>()); 287 288 m_consumer->scheduleEventAbsolute(schdTick); 289 lt.pop_front(); 290 } 291} 292 293void 294MessageBuffer::reanalyzeMessages(Addr addr, Tick current_time) 295{ 296 DPRINTF(RubyQueue, "ReanalyzeMessages %#x\n", addr); 297 assert(m_stall_msg_map.count(addr) > 0); 298 299 // 300 // Put all stalled messages associated with this address back on the 301 // prio heap. The reanalyzeList call will make sure the consumer is 302 // scheduled for the current cycle so that the previously stalled messages 303 // will be observed before any younger messages that may arrive this cycle 304 // 305 m_stall_map_size -= m_stall_msg_map[addr].size(); 306 assert(m_stall_map_size >= 0); 307 reanalyzeList(m_stall_msg_map[addr], current_time); 308 m_stall_msg_map.erase(addr); 309} 310 311void 312MessageBuffer::reanalyzeAllMessages(Tick current_time) 313{ 314 DPRINTF(RubyQueue, "ReanalyzeAllMessages\n"); 315 316 // 317 // Put all stalled messages associated with this address back on the 318 // prio heap. The reanalyzeList call will make sure the consumer is 319 // scheduled for the current cycle so that the previously stalled messages 320 // will be observed before any younger messages that may arrive this cycle. 321 // 322 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin(); 323 map_iter != m_stall_msg_map.end(); ++map_iter) { 324 m_stall_map_size -= map_iter->second.size(); 325 assert(m_stall_map_size >= 0); 326 reanalyzeList(map_iter->second, current_time); 327 } 328 m_stall_msg_map.clear(); 329} 330 331void 332MessageBuffer::stallMessage(Addr addr, Tick current_time) 333{ 334 DPRINTF(RubyQueue, "Stalling due to %#x\n", addr); 335 assert(isReady(current_time)); 336 assert(getOffset(addr) == 0); 337 MsgPtr message = m_prio_heap.front(); 338 339 // Since the message will just be moved to stall map, indicate that the 340 // buffer should not decrement the m_buf_msgs statistic 341 dequeue(current_time, false); 342 343 // 344 // Note: no event is scheduled to analyze the map at a later time. 345 // Instead the controller is responsible to call reanalyzeMessages when 346 // these addresses change state. 347 // 348 (m_stall_msg_map[addr]).push_back(message); 349 m_stall_map_size++; 350 m_stall_count++; 351} 352 353void 354MessageBuffer::print(ostream& out) const 355{ 356 ccprintf(out, "[MessageBuffer: "); 357 if (m_consumer != NULL) { 358 ccprintf(out, " consumer-yes "); 359 } 360 361 vector<MsgPtr> copy(m_prio_heap); 362 sort_heap(copy.begin(), copy.end(), greater<MsgPtr>()); 363 ccprintf(out, "%s] %s", copy, name()); 364} 365 366bool 367MessageBuffer::isReady(Tick current_time) const 368{ 369 return ((m_prio_heap.size() > 0) && 370 (m_prio_heap.front()->getLastEnqueueTime() <= current_time)); 371} 372 373void 374MessageBuffer::regStats() 375{ 376 m_not_avail_count 377 .name(name() + ".not_avail_count") 378 .desc("Number of times this buffer did not have N slots available") 379 .flags(Stats::nozero); 380 381 m_buf_msgs 382 .name(name() + ".avg_buf_msgs") 383 .desc("Average number of messages in buffer") 384 .flags(Stats::nozero); 385 386 m_stall_count 387 .name(name() + ".num_msg_stalls") 388 .desc("Number of times messages were stalled") 389 .flags(Stats::nozero); 390 391 m_occupancy 392 .name(name() + ".avg_buf_occ") 393 .desc("Average occupancy of buffer capacity") 394 .flags(Stats::nozero); 395 396 m_stall_time 397 .name(name() + ".avg_stall_time") 398 .desc("Average number of cycles messages are stalled in this MB") 399 .flags(Stats::nozero); 400 401 if (m_max_size > 0) { 402 m_occupancy = m_buf_msgs / m_max_size; 403 } else { 404 m_occupancy = 0; 405 } 406} 407 408uint32_t 409MessageBuffer::functionalWrite(Packet *pkt) 410{ 411 uint32_t num_functional_writes = 0; 412 413 // Check the priority heap and write any messages that may 414 // correspond to the address in the packet. 415 for (unsigned int i = 0; i < m_prio_heap.size(); ++i) { 416 Message *msg = m_prio_heap[i].get(); 417 if (msg->functionalWrite(pkt)) { 418 num_functional_writes++; 419 } 420 } 421 422 // Check the stall queue and write any messages that may 423 // correspond to the address in the packet. 424 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin(); 425 map_iter != m_stall_msg_map.end(); 426 ++map_iter) { 427 428 for (std::list<MsgPtr>::iterator it = (map_iter->second).begin(); 429 it != (map_iter->second).end(); ++it) { 430 431 Message *msg = (*it).get(); 432 if (msg->functionalWrite(pkt)) { 433 num_functional_writes++; 434 } 435 } 436 } 437 438 return num_functional_writes; 439} 440 441MessageBuffer * 442MessageBufferParams::create() 443{ 444 return new MessageBuffer(this); 445} 446