MessageBuffer.cc (13973:2f953d25716b) MessageBuffer.cc (14217:68c3d00f780a)
1/*
2 * Copyright (c) 1999-2008 Mark D. Hill and David A. Wood
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met: redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer;
9 * redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution;
12 * neither the name of the copyright holders nor the names of its
13 * contributors may be used to endorse or promote products derived from
14 * this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "mem/ruby/network/MessageBuffer.hh"
30
31#include <cassert>
32
33#include "base/cprintf.hh"
34#include "base/logging.hh"
35#include "base/random.hh"
36#include "base/stl_helpers.hh"
37#include "debug/RubyQueue.hh"
38#include "mem/ruby/system/RubySystem.hh"
39
40using namespace std;
41using m5::stl_helpers::operator<<;
42
43MessageBuffer::MessageBuffer(const Params *p)
44 : SimObject(p), m_stall_map_size(0),
45 m_max_size(p->buffer_size), m_time_last_time_size_checked(0),
46 m_time_last_time_enqueue(0), m_time_last_time_pop(0),
47 m_last_arrival_time(0), m_strict_fifo(p->ordered),
48 m_randomization(p->randomization)
49{
50 m_msg_counter = 0;
51 m_consumer = NULL;
52 m_size_last_time_size_checked = 0;
53 m_size_at_cycle_start = 0;
1/*
2 * Copyright (c) 1999-2008 Mark D. Hill and David A. Wood
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are
7 * met: redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer;
9 * redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution;
12 * neither the name of the copyright holders nor the names of its
13 * contributors may be used to endorse or promote products derived from
14 * this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#include "mem/ruby/network/MessageBuffer.hh"
30
31#include <cassert>
32
33#include "base/cprintf.hh"
34#include "base/logging.hh"
35#include "base/random.hh"
36#include "base/stl_helpers.hh"
37#include "debug/RubyQueue.hh"
38#include "mem/ruby/system/RubySystem.hh"
39
40using namespace std;
41using m5::stl_helpers::operator<<;
42
43MessageBuffer::MessageBuffer(const Params *p)
44 : SimObject(p), m_stall_map_size(0),
45 m_max_size(p->buffer_size), m_time_last_time_size_checked(0),
46 m_time_last_time_enqueue(0), m_time_last_time_pop(0),
47 m_last_arrival_time(0), m_strict_fifo(p->ordered),
48 m_randomization(p->randomization)
49{
50 m_msg_counter = 0;
51 m_consumer = NULL;
52 m_size_last_time_size_checked = 0;
53 m_size_at_cycle_start = 0;
54 m_stalled_at_cycle_start = 0;
54 m_msgs_this_cycle = 0;
55 m_priority_rank = 0;
56
57 m_stall_msg_map.clear();
58 m_input_link_id = 0;
59 m_vnet_id = 0;
60
61 m_buf_msgs = 0;
62 m_stall_time = 0;
63
64 m_dequeue_callback = nullptr;
65}
66
67unsigned int
68MessageBuffer::getSize(Tick curTime)
69{
70 if (m_time_last_time_size_checked != curTime) {
71 m_time_last_time_size_checked = curTime;
72 m_size_last_time_size_checked = m_prio_heap.size();
73 }
74
75 return m_size_last_time_size_checked;
76}
77
78bool
79MessageBuffer::areNSlotsAvailable(unsigned int n, Tick current_time)
80{
81
82 // fast path when message buffers have infinite size
83 if (m_max_size == 0) {
84 return true;
85 }
86
87 // determine the correct size for the current cycle
88 // pop operations shouldn't effect the network's visible size
89 // until schd cycle, but enqueue operations effect the visible
90 // size immediately
91 unsigned int current_size = 0;
55 m_msgs_this_cycle = 0;
56 m_priority_rank = 0;
57
58 m_stall_msg_map.clear();
59 m_input_link_id = 0;
60 m_vnet_id = 0;
61
62 m_buf_msgs = 0;
63 m_stall_time = 0;
64
65 m_dequeue_callback = nullptr;
66}
67
68unsigned int
69MessageBuffer::getSize(Tick curTime)
70{
71 if (m_time_last_time_size_checked != curTime) {
72 m_time_last_time_size_checked = curTime;
73 m_size_last_time_size_checked = m_prio_heap.size();
74 }
75
76 return m_size_last_time_size_checked;
77}
78
79bool
80MessageBuffer::areNSlotsAvailable(unsigned int n, Tick current_time)
81{
82
83 // fast path when message buffers have infinite size
84 if (m_max_size == 0) {
85 return true;
86 }
87
88 // determine the correct size for the current cycle
89 // pop operations shouldn't effect the network's visible size
90 // until schd cycle, but enqueue operations effect the visible
91 // size immediately
92 unsigned int current_size = 0;
93 unsigned int current_stall_size = 0;
92
93 if (m_time_last_time_pop < current_time) {
94
95 if (m_time_last_time_pop < current_time) {
94 // no pops this cycle - heap size is correct
96 // no pops this cycle - heap and stall queue size is correct
95 current_size = m_prio_heap.size();
97 current_size = m_prio_heap.size();
98 current_stall_size = m_stall_map_size;
96 } else {
97 if (m_time_last_time_enqueue < current_time) {
98 // no enqueues this cycle - m_size_at_cycle_start is correct
99 current_size = m_size_at_cycle_start;
100 } else {
101 // both pops and enqueues occured this cycle - add new
102 // enqueued msgs to m_size_at_cycle_start
103 current_size = m_size_at_cycle_start + m_msgs_this_cycle;
104 }
99 } else {
100 if (m_time_last_time_enqueue < current_time) {
101 // no enqueues this cycle - m_size_at_cycle_start is correct
102 current_size = m_size_at_cycle_start;
103 } else {
104 // both pops and enqueues occured this cycle - add new
105 // enqueued msgs to m_size_at_cycle_start
106 current_size = m_size_at_cycle_start + m_msgs_this_cycle;
107 }
108
109 // Stall queue size at start is considered
110 current_stall_size = m_stalled_at_cycle_start;
105 }
106
107 // now compare the new size with our max size
111 }
112
113 // now compare the new size with our max size
108 if (current_size + m_stall_map_size + n <= m_max_size) {
114 if (current_size + current_stall_size + n <= m_max_size) {
109 return true;
110 } else {
111 DPRINTF(RubyQueue, "n: %d, current_size: %d, heap size: %d, "
112 "m_max_size: %d\n",
115 return true;
116 } else {
117 DPRINTF(RubyQueue, "n: %d, current_size: %d, heap size: %d, "
118 "m_max_size: %d\n",
113 n, current_size, m_prio_heap.size(), m_max_size);
119 n, current_size + current_stall_size,
120 m_prio_heap.size(), m_max_size);
114 m_not_avail_count++;
115 return false;
116 }
117}
118
119const Message*
120MessageBuffer::peek() const
121{
122 DPRINTF(RubyQueue, "Peeking at head of queue.\n");
123 const Message* msg_ptr = m_prio_heap.front().get();
124 assert(msg_ptr);
125
126 DPRINTF(RubyQueue, "Message: %s\n", (*msg_ptr));
127 return msg_ptr;
128}
129
130// FIXME - move me somewhere else
131Tick
132random_time()
133{
134 Tick time = 1;
135 time += random_mt.random(0, 3); // [0...3]
136 if (random_mt.random(0, 7) == 0) { // 1 in 8 chance
137 time += 100 + random_mt.random(1, 15); // 100 + [1...15]
138 }
139 return time;
140}
141
142void
143MessageBuffer::enqueue(MsgPtr message, Tick current_time, Tick delta)
144{
145 // record current time incase we have a pop that also adjusts my size
146 if (m_time_last_time_enqueue < current_time) {
147 m_msgs_this_cycle = 0; // first msg this cycle
148 m_time_last_time_enqueue = current_time;
149 }
150
151 m_msg_counter++;
152 m_msgs_this_cycle++;
153
154 // Calculate the arrival time of the message, that is, the first
155 // cycle the message can be dequeued.
156 assert(delta > 0);
157 Tick arrival_time = 0;
158
159 // random delays are inserted if either RubySystem level randomization flag
160 // is turned on, or the buffer level randomization is set
161 if (!RubySystem::getRandomization() && !m_randomization) {
162 // No randomization
163 arrival_time = current_time + delta;
164 } else {
165 // Randomization - ignore delta
166 if (m_strict_fifo) {
167 if (m_last_arrival_time < current_time) {
168 m_last_arrival_time = current_time;
169 }
170 arrival_time = m_last_arrival_time + random_time();
171 } else {
172 arrival_time = current_time + random_time();
173 }
174 }
175
176 // Check the arrival time
177 assert(arrival_time > current_time);
178 if (m_strict_fifo) {
179 if (arrival_time < m_last_arrival_time) {
180 panic("FIFO ordering violated: %s name: %s current time: %d "
181 "delta: %d arrival_time: %d last arrival_time: %d\n",
182 *this, name(), current_time, delta, arrival_time,
183 m_last_arrival_time);
184 }
185 }
186
187 // If running a cache trace, don't worry about the last arrival checks
188 if (!RubySystem::getWarmupEnabled()) {
189 m_last_arrival_time = arrival_time;
190 }
191
192 // compute the delay cycles and set enqueue time
193 Message* msg_ptr = message.get();
194 assert(msg_ptr != NULL);
195
196 assert(current_time >= msg_ptr->getLastEnqueueTime() &&
197 "ensure we aren't dequeued early");
198
199 msg_ptr->updateDelayedTicks(current_time);
200 msg_ptr->setLastEnqueueTime(arrival_time);
201 msg_ptr->setMsgCounter(m_msg_counter);
202
203 // Insert the message into the priority heap
204 m_prio_heap.push_back(message);
205 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
206 // Increment the number of messages statistic
207 m_buf_msgs++;
208
209 DPRINTF(RubyQueue, "Enqueue arrival_time: %lld, Message: %s\n",
210 arrival_time, *(message.get()));
211
212 // Schedule the wakeup
213 assert(m_consumer != NULL);
214 m_consumer->scheduleEventAbsolute(arrival_time);
215 m_consumer->storeEventInfo(m_vnet_id);
216}
217
218Tick
219MessageBuffer::dequeue(Tick current_time, bool decrement_messages)
220{
221 DPRINTF(RubyQueue, "Popping\n");
222 assert(isReady(current_time));
223
224 // get MsgPtr of the message about to be dequeued
225 MsgPtr message = m_prio_heap.front();
226
227 // get the delay cycles
228 message->updateDelayedTicks(current_time);
229 Tick delay = message->getDelayedTicks();
230
231 m_stall_time = curTick() - message->getTime();
232
233 // record previous size and time so the current buffer size isn't
234 // adjusted until schd cycle
235 if (m_time_last_time_pop < current_time) {
236 m_size_at_cycle_start = m_prio_heap.size();
121 m_not_avail_count++;
122 return false;
123 }
124}
125
126const Message*
127MessageBuffer::peek() const
128{
129 DPRINTF(RubyQueue, "Peeking at head of queue.\n");
130 const Message* msg_ptr = m_prio_heap.front().get();
131 assert(msg_ptr);
132
133 DPRINTF(RubyQueue, "Message: %s\n", (*msg_ptr));
134 return msg_ptr;
135}
136
137// FIXME - move me somewhere else
138Tick
139random_time()
140{
141 Tick time = 1;
142 time += random_mt.random(0, 3); // [0...3]
143 if (random_mt.random(0, 7) == 0) { // 1 in 8 chance
144 time += 100 + random_mt.random(1, 15); // 100 + [1...15]
145 }
146 return time;
147}
148
149void
150MessageBuffer::enqueue(MsgPtr message, Tick current_time, Tick delta)
151{
152 // record current time incase we have a pop that also adjusts my size
153 if (m_time_last_time_enqueue < current_time) {
154 m_msgs_this_cycle = 0; // first msg this cycle
155 m_time_last_time_enqueue = current_time;
156 }
157
158 m_msg_counter++;
159 m_msgs_this_cycle++;
160
161 // Calculate the arrival time of the message, that is, the first
162 // cycle the message can be dequeued.
163 assert(delta > 0);
164 Tick arrival_time = 0;
165
166 // random delays are inserted if either RubySystem level randomization flag
167 // is turned on, or the buffer level randomization is set
168 if (!RubySystem::getRandomization() && !m_randomization) {
169 // No randomization
170 arrival_time = current_time + delta;
171 } else {
172 // Randomization - ignore delta
173 if (m_strict_fifo) {
174 if (m_last_arrival_time < current_time) {
175 m_last_arrival_time = current_time;
176 }
177 arrival_time = m_last_arrival_time + random_time();
178 } else {
179 arrival_time = current_time + random_time();
180 }
181 }
182
183 // Check the arrival time
184 assert(arrival_time > current_time);
185 if (m_strict_fifo) {
186 if (arrival_time < m_last_arrival_time) {
187 panic("FIFO ordering violated: %s name: %s current time: %d "
188 "delta: %d arrival_time: %d last arrival_time: %d\n",
189 *this, name(), current_time, delta, arrival_time,
190 m_last_arrival_time);
191 }
192 }
193
194 // If running a cache trace, don't worry about the last arrival checks
195 if (!RubySystem::getWarmupEnabled()) {
196 m_last_arrival_time = arrival_time;
197 }
198
199 // compute the delay cycles and set enqueue time
200 Message* msg_ptr = message.get();
201 assert(msg_ptr != NULL);
202
203 assert(current_time >= msg_ptr->getLastEnqueueTime() &&
204 "ensure we aren't dequeued early");
205
206 msg_ptr->updateDelayedTicks(current_time);
207 msg_ptr->setLastEnqueueTime(arrival_time);
208 msg_ptr->setMsgCounter(m_msg_counter);
209
210 // Insert the message into the priority heap
211 m_prio_heap.push_back(message);
212 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
213 // Increment the number of messages statistic
214 m_buf_msgs++;
215
216 DPRINTF(RubyQueue, "Enqueue arrival_time: %lld, Message: %s\n",
217 arrival_time, *(message.get()));
218
219 // Schedule the wakeup
220 assert(m_consumer != NULL);
221 m_consumer->scheduleEventAbsolute(arrival_time);
222 m_consumer->storeEventInfo(m_vnet_id);
223}
224
225Tick
226MessageBuffer::dequeue(Tick current_time, bool decrement_messages)
227{
228 DPRINTF(RubyQueue, "Popping\n");
229 assert(isReady(current_time));
230
231 // get MsgPtr of the message about to be dequeued
232 MsgPtr message = m_prio_heap.front();
233
234 // get the delay cycles
235 message->updateDelayedTicks(current_time);
236 Tick delay = message->getDelayedTicks();
237
238 m_stall_time = curTick() - message->getTime();
239
240 // record previous size and time so the current buffer size isn't
241 // adjusted until schd cycle
242 if (m_time_last_time_pop < current_time) {
243 m_size_at_cycle_start = m_prio_heap.size();
244 m_stalled_at_cycle_start = m_stall_map_size;
237 m_time_last_time_pop = current_time;
238 }
239
240 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
241 m_prio_heap.pop_back();
242 if (decrement_messages) {
243 // If the message will be removed from the queue, decrement the
244 // number of message in the queue.
245 m_buf_msgs--;
246 }
247
248 // if a dequeue callback was requested, call it now
249 if (m_dequeue_callback) {
250 m_dequeue_callback();
251 }
252
253 return delay;
254}
255
256void
257MessageBuffer::registerDequeueCallback(std::function<void()> callback)
258{
259 m_dequeue_callback = callback;
260}
261
262void
263MessageBuffer::unregisterDequeueCallback()
264{
265 m_dequeue_callback = nullptr;
266}
267
268void
269MessageBuffer::clear()
270{
271 m_prio_heap.clear();
272
273 m_msg_counter = 0;
274 m_time_last_time_enqueue = 0;
275 m_time_last_time_pop = 0;
276 m_size_at_cycle_start = 0;
245 m_time_last_time_pop = current_time;
246 }
247
248 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
249 m_prio_heap.pop_back();
250 if (decrement_messages) {
251 // If the message will be removed from the queue, decrement the
252 // number of message in the queue.
253 m_buf_msgs--;
254 }
255
256 // if a dequeue callback was requested, call it now
257 if (m_dequeue_callback) {
258 m_dequeue_callback();
259 }
260
261 return delay;
262}
263
264void
265MessageBuffer::registerDequeueCallback(std::function<void()> callback)
266{
267 m_dequeue_callback = callback;
268}
269
270void
271MessageBuffer::unregisterDequeueCallback()
272{
273 m_dequeue_callback = nullptr;
274}
275
276void
277MessageBuffer::clear()
278{
279 m_prio_heap.clear();
280
281 m_msg_counter = 0;
282 m_time_last_time_enqueue = 0;
283 m_time_last_time_pop = 0;
284 m_size_at_cycle_start = 0;
285 m_stalled_at_cycle_start = 0;
277 m_msgs_this_cycle = 0;
278}
279
280void
281MessageBuffer::recycle(Tick current_time, Tick recycle_latency)
282{
283 DPRINTF(RubyQueue, "Recycling.\n");
284 assert(isReady(current_time));
285 MsgPtr node = m_prio_heap.front();
286 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
287
288 Tick future_time = current_time + recycle_latency;
289 node->setLastEnqueueTime(future_time);
290
291 m_prio_heap.back() = node;
292 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
293 m_consumer->scheduleEventAbsolute(future_time);
294}
295
296void
297MessageBuffer::reanalyzeList(list<MsgPtr> &lt, Tick schdTick)
298{
299 while (!lt.empty()) {
300 MsgPtr m = lt.front();
301 assert(m->getLastEnqueueTime() <= schdTick);
302
303 m_prio_heap.push_back(m);
304 push_heap(m_prio_heap.begin(), m_prio_heap.end(),
305 greater<MsgPtr>());
306
307 m_consumer->scheduleEventAbsolute(schdTick);
308
309 DPRINTF(RubyQueue, "Requeue arrival_time: %lld, Message: %s\n",
310 schdTick, *(m.get()));
311
312 lt.pop_front();
313 }
314}
315
316void
317MessageBuffer::reanalyzeMessages(Addr addr, Tick current_time)
318{
319 DPRINTF(RubyQueue, "ReanalyzeMessages %#x\n", addr);
320 assert(m_stall_msg_map.count(addr) > 0);
321
322 //
323 // Put all stalled messages associated with this address back on the
324 // prio heap. The reanalyzeList call will make sure the consumer is
325 // scheduled for the current cycle so that the previously stalled messages
326 // will be observed before any younger messages that may arrive this cycle
327 //
328 m_stall_map_size -= m_stall_msg_map[addr].size();
329 assert(m_stall_map_size >= 0);
330 reanalyzeList(m_stall_msg_map[addr], current_time);
331 m_stall_msg_map.erase(addr);
332}
333
334void
335MessageBuffer::reanalyzeAllMessages(Tick current_time)
336{
337 DPRINTF(RubyQueue, "ReanalyzeAllMessages\n");
338
339 //
340 // Put all stalled messages associated with this address back on the
341 // prio heap. The reanalyzeList call will make sure the consumer is
342 // scheduled for the current cycle so that the previously stalled messages
343 // will be observed before any younger messages that may arrive this cycle.
344 //
345 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin();
346 map_iter != m_stall_msg_map.end(); ++map_iter) {
347 m_stall_map_size -= map_iter->second.size();
348 assert(m_stall_map_size >= 0);
349 reanalyzeList(map_iter->second, current_time);
350 }
351 m_stall_msg_map.clear();
352}
353
354void
355MessageBuffer::stallMessage(Addr addr, Tick current_time)
356{
357 DPRINTF(RubyQueue, "Stalling due to %#x\n", addr);
358 assert(isReady(current_time));
359 assert(getOffset(addr) == 0);
360 MsgPtr message = m_prio_heap.front();
361
362 // Since the message will just be moved to stall map, indicate that the
363 // buffer should not decrement the m_buf_msgs statistic
364 dequeue(current_time, false);
365
366 //
367 // Note: no event is scheduled to analyze the map at a later time.
368 // Instead the controller is responsible to call reanalyzeMessages when
369 // these addresses change state.
370 //
371 (m_stall_msg_map[addr]).push_back(message);
372 m_stall_map_size++;
373 m_stall_count++;
374}
375
376void
377MessageBuffer::print(ostream& out) const
378{
379 ccprintf(out, "[MessageBuffer: ");
380 if (m_consumer != NULL) {
381 ccprintf(out, " consumer-yes ");
382 }
383
384 vector<MsgPtr> copy(m_prio_heap);
385 sort_heap(copy.begin(), copy.end(), greater<MsgPtr>());
386 ccprintf(out, "%s] %s", copy, name());
387}
388
389bool
390MessageBuffer::isReady(Tick current_time) const
391{
392 return ((m_prio_heap.size() > 0) &&
393 (m_prio_heap.front()->getLastEnqueueTime() <= current_time));
394}
395
396void
397MessageBuffer::regStats()
398{
399 m_not_avail_count
400 .name(name() + ".not_avail_count")
401 .desc("Number of times this buffer did not have N slots available")
402 .flags(Stats::nozero);
403
404 m_buf_msgs
405 .name(name() + ".avg_buf_msgs")
406 .desc("Average number of messages in buffer")
407 .flags(Stats::nozero);
408
409 m_stall_count
410 .name(name() + ".num_msg_stalls")
411 .desc("Number of times messages were stalled")
412 .flags(Stats::nozero);
413
414 m_occupancy
415 .name(name() + ".avg_buf_occ")
416 .desc("Average occupancy of buffer capacity")
417 .flags(Stats::nozero);
418
419 m_stall_time
420 .name(name() + ".avg_stall_time")
421 .desc("Average number of cycles messages are stalled in this MB")
422 .flags(Stats::nozero);
423
424 if (m_max_size > 0) {
425 m_occupancy = m_buf_msgs / m_max_size;
426 } else {
427 m_occupancy = 0;
428 }
429}
430
431uint32_t
432MessageBuffer::functionalWrite(Packet *pkt)
433{
434 uint32_t num_functional_writes = 0;
435
436 // Check the priority heap and write any messages that may
437 // correspond to the address in the packet.
438 for (unsigned int i = 0; i < m_prio_heap.size(); ++i) {
439 Message *msg = m_prio_heap[i].get();
440 if (msg->functionalWrite(pkt)) {
441 num_functional_writes++;
442 }
443 }
444
445 // Check the stall queue and write any messages that may
446 // correspond to the address in the packet.
447 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin();
448 map_iter != m_stall_msg_map.end();
449 ++map_iter) {
450
451 for (std::list<MsgPtr>::iterator it = (map_iter->second).begin();
452 it != (map_iter->second).end(); ++it) {
453
454 Message *msg = (*it).get();
455 if (msg->functionalWrite(pkt)) {
456 num_functional_writes++;
457 }
458 }
459 }
460
461 return num_functional_writes;
462}
463
464MessageBuffer *
465MessageBufferParams::create()
466{
467 return new MessageBuffer(this);
468}
286 m_msgs_this_cycle = 0;
287}
288
289void
290MessageBuffer::recycle(Tick current_time, Tick recycle_latency)
291{
292 DPRINTF(RubyQueue, "Recycling.\n");
293 assert(isReady(current_time));
294 MsgPtr node = m_prio_heap.front();
295 pop_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
296
297 Tick future_time = current_time + recycle_latency;
298 node->setLastEnqueueTime(future_time);
299
300 m_prio_heap.back() = node;
301 push_heap(m_prio_heap.begin(), m_prio_heap.end(), greater<MsgPtr>());
302 m_consumer->scheduleEventAbsolute(future_time);
303}
304
305void
306MessageBuffer::reanalyzeList(list<MsgPtr> &lt, Tick schdTick)
307{
308 while (!lt.empty()) {
309 MsgPtr m = lt.front();
310 assert(m->getLastEnqueueTime() <= schdTick);
311
312 m_prio_heap.push_back(m);
313 push_heap(m_prio_heap.begin(), m_prio_heap.end(),
314 greater<MsgPtr>());
315
316 m_consumer->scheduleEventAbsolute(schdTick);
317
318 DPRINTF(RubyQueue, "Requeue arrival_time: %lld, Message: %s\n",
319 schdTick, *(m.get()));
320
321 lt.pop_front();
322 }
323}
324
325void
326MessageBuffer::reanalyzeMessages(Addr addr, Tick current_time)
327{
328 DPRINTF(RubyQueue, "ReanalyzeMessages %#x\n", addr);
329 assert(m_stall_msg_map.count(addr) > 0);
330
331 //
332 // Put all stalled messages associated with this address back on the
333 // prio heap. The reanalyzeList call will make sure the consumer is
334 // scheduled for the current cycle so that the previously stalled messages
335 // will be observed before any younger messages that may arrive this cycle
336 //
337 m_stall_map_size -= m_stall_msg_map[addr].size();
338 assert(m_stall_map_size >= 0);
339 reanalyzeList(m_stall_msg_map[addr], current_time);
340 m_stall_msg_map.erase(addr);
341}
342
343void
344MessageBuffer::reanalyzeAllMessages(Tick current_time)
345{
346 DPRINTF(RubyQueue, "ReanalyzeAllMessages\n");
347
348 //
349 // Put all stalled messages associated with this address back on the
350 // prio heap. The reanalyzeList call will make sure the consumer is
351 // scheduled for the current cycle so that the previously stalled messages
352 // will be observed before any younger messages that may arrive this cycle.
353 //
354 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin();
355 map_iter != m_stall_msg_map.end(); ++map_iter) {
356 m_stall_map_size -= map_iter->second.size();
357 assert(m_stall_map_size >= 0);
358 reanalyzeList(map_iter->second, current_time);
359 }
360 m_stall_msg_map.clear();
361}
362
363void
364MessageBuffer::stallMessage(Addr addr, Tick current_time)
365{
366 DPRINTF(RubyQueue, "Stalling due to %#x\n", addr);
367 assert(isReady(current_time));
368 assert(getOffset(addr) == 0);
369 MsgPtr message = m_prio_heap.front();
370
371 // Since the message will just be moved to stall map, indicate that the
372 // buffer should not decrement the m_buf_msgs statistic
373 dequeue(current_time, false);
374
375 //
376 // Note: no event is scheduled to analyze the map at a later time.
377 // Instead the controller is responsible to call reanalyzeMessages when
378 // these addresses change state.
379 //
380 (m_stall_msg_map[addr]).push_back(message);
381 m_stall_map_size++;
382 m_stall_count++;
383}
384
385void
386MessageBuffer::print(ostream& out) const
387{
388 ccprintf(out, "[MessageBuffer: ");
389 if (m_consumer != NULL) {
390 ccprintf(out, " consumer-yes ");
391 }
392
393 vector<MsgPtr> copy(m_prio_heap);
394 sort_heap(copy.begin(), copy.end(), greater<MsgPtr>());
395 ccprintf(out, "%s] %s", copy, name());
396}
397
398bool
399MessageBuffer::isReady(Tick current_time) const
400{
401 return ((m_prio_heap.size() > 0) &&
402 (m_prio_heap.front()->getLastEnqueueTime() <= current_time));
403}
404
405void
406MessageBuffer::regStats()
407{
408 m_not_avail_count
409 .name(name() + ".not_avail_count")
410 .desc("Number of times this buffer did not have N slots available")
411 .flags(Stats::nozero);
412
413 m_buf_msgs
414 .name(name() + ".avg_buf_msgs")
415 .desc("Average number of messages in buffer")
416 .flags(Stats::nozero);
417
418 m_stall_count
419 .name(name() + ".num_msg_stalls")
420 .desc("Number of times messages were stalled")
421 .flags(Stats::nozero);
422
423 m_occupancy
424 .name(name() + ".avg_buf_occ")
425 .desc("Average occupancy of buffer capacity")
426 .flags(Stats::nozero);
427
428 m_stall_time
429 .name(name() + ".avg_stall_time")
430 .desc("Average number of cycles messages are stalled in this MB")
431 .flags(Stats::nozero);
432
433 if (m_max_size > 0) {
434 m_occupancy = m_buf_msgs / m_max_size;
435 } else {
436 m_occupancy = 0;
437 }
438}
439
440uint32_t
441MessageBuffer::functionalWrite(Packet *pkt)
442{
443 uint32_t num_functional_writes = 0;
444
445 // Check the priority heap and write any messages that may
446 // correspond to the address in the packet.
447 for (unsigned int i = 0; i < m_prio_heap.size(); ++i) {
448 Message *msg = m_prio_heap[i].get();
449 if (msg->functionalWrite(pkt)) {
450 num_functional_writes++;
451 }
452 }
453
454 // Check the stall queue and write any messages that may
455 // correspond to the address in the packet.
456 for (StallMsgMapType::iterator map_iter = m_stall_msg_map.begin();
457 map_iter != m_stall_msg_map.end();
458 ++map_iter) {
459
460 for (std::list<MsgPtr>::iterator it = (map_iter->second).begin();
461 it != (map_iter->second).end(); ++it) {
462
463 Message *msg = (*it).get();
464 if (msg->functionalWrite(pkt)) {
465 num_functional_writes++;
466 }
467 }
468 }
469
470 return num_functional_writes;
471}
472
473MessageBuffer *
474MessageBufferParams::create()
475{
476 return new MessageBuffer(this);
477}