Enable Nack pacing.
Review URL: https://webrtc-codereview.appspot.com/1357004
git-svn-id: http://webrtc.googlecode.com/svn/trunk/webrtc@3912 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/modules/pacing/include/mock/mock_paced_sender.h b/modules/pacing/include/mock/mock_paced_sender.h
new file mode 100644
index 0000000..13b414d
--- /dev/null
+++ b/modules/pacing/include/mock/mock_paced_sender.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2013 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
+#define WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include "webrtc/modules/pacing/include/paced_sender.h"
+
+namespace webrtc {
+
+class MockPacedSender : public PacedSender {
+ public:
+ MockPacedSender() : PacedSender(NULL, 0) {}
+ MOCK_METHOD5(SendPacket, bool(Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ int bytes));
+ MOCK_CONST_METHOD0(QueueInMs, int());
+ MOCK_CONST_METHOD0(QueueInPackets, int());
+};
+
+} // namespace webrtc
+
+#endif // WEBRTC_MODULES_PACING_INCLUDE_MOCK_MOCK_PACED_SENDER_H_
diff --git a/modules/pacing/include/paced_sender.h b/modules/pacing/include/paced_sender.h
index 4a0ade8..bd4880d 100644
--- a/modules/pacing/include/paced_sender.h
+++ b/modules/pacing/include/paced_sender.h
@@ -62,11 +62,14 @@
// Returns true if we send the packet now, else it will add the packet
// information to the queue and call TimeToSendPacket when it's time to send.
- bool SendPacket(Priority priority, uint32_t ssrc, uint16_t sequence_number,
- int64_t capture_time_ms, int bytes);
+ virtual bool SendPacket(Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ int bytes);
// Returns the time since the oldest queued packet was captured.
- int QueueInMs() const;
+ virtual int QueueInMs() const;
// Returns the number of milliseconds until the module want a worker thread
// to call Process.
diff --git a/modules/rtp_rtcp/source/rtp_sender.cc b/modules/rtp_rtcp/source/rtp_sender.cc
index 54c242d..e74465f 100644
--- a/modules/rtp_rtcp/source/rtp_sender.cc
+++ b/modules/rtp_rtcp/source/rtp_sender.cc
@@ -448,19 +448,18 @@
packet_history_->SetStorePacketsStatus(enable, number_to_store);
}
-bool RTPSender::StorePackets() const { return packet_history_->StorePackets(); }
+bool RTPSender::StorePackets() const {
+ return packet_history_->StorePackets();
+}
int32_t RTPSender::ReSendPacket(uint16_t packet_id, uint32_t min_resend_time) {
uint16_t length = IP_PACKET_SIZE;
uint8_t data_buffer[IP_PACKET_SIZE];
uint8_t *buffer_to_send_ptr = data_buffer;
-
- int64_t stored_time_in_ms;
+ int64_t capture_time_ms;
StorageType type;
- bool found = packet_history_->GetRTPPacket(packet_id, min_resend_time,
- data_buffer, &length,
- &stored_time_in_ms, &type);
- if (!found) {
+ if (!packet_history_->GetRTPPacket(packet_id, min_resend_time, data_buffer,
+ &length, &capture_time_ms, &type)) {
// Packet not found.
return 0;
}
@@ -469,44 +468,63 @@
// packet should not be retransmitted.
return 0;
}
+
uint8_t data_buffer_rtx[IP_PACKET_SIZE];
if (rtx_ != kRtxOff) {
BuildRtxPacket(data_buffer, &length, data_buffer_rtx);
buffer_to_send_ptr = data_buffer_rtx;
}
- int32_t bytes_sent = ReSendToNetwork(buffer_to_send_ptr, length);
ModuleRTPUtility::RTPHeaderParser rtp_parser(data_buffer, length);
WebRtcRTPHeader rtp_header;
rtp_parser.Parse(rtp_header);
+
+ // Store the time when the packet was last sent or added to pacer.
+ packet_history_->UpdateResendTime(packet_id);
+
+ {
+ // Update send statistics prior to pacer.
+ CriticalSectionScoped cs(send_critsect_);
+ Bitrate::Update(length);
+ packets_sent_++;
+ // We on purpose don't add to payload_bytes_sent_ since this is a
+ // re-transmit and not new payload data.
+ }
+
+ if (paced_sender_) {
+ if (!paced_sender_->SendPacket(PacedSender::kHighPriority,
+ rtp_header.header.ssrc,
+ rtp_header.header.sequenceNumber,
+ capture_time_ms,
+ length)) {
+ // We can't send the packet right now.
+ // We will be called when it is time.
+ return 0;
+ }
+ }
+
TRACE_EVENT_INSTANT2("webrtc_rtp", "RTPSender::ReSendPacket",
"timestamp", rtp_header.header.timestamp,
"seqnum", rtp_header.header.sequenceNumber);
- if (bytes_sent <= 0) {
- WEBRTC_TRACE(kTraceWarning, kTraceRtpRtcp, id_,
- "Transport failed to resend packet_id %u", packet_id);
- return -1;
+
+ if (SendPacketToNetwork(buffer_to_send_ptr, length)) {
+ return 0;
}
- // Store the time when the packet was last resent.
- packet_history_->UpdateResendTime(packet_id);
- return bytes_sent;
+ return -1;
}
-int32_t RTPSender::ReSendToNetwork(const uint8_t *packet, const uint32_t size) {
- int32_t bytes_sent = -1;
+bool RTPSender::SendPacketToNetwork(const uint8_t *packet, uint32_t size) {
+ int bytes_sent = -1;
if (transport_) {
bytes_sent = transport_->SendPacket(id_, packet, size);
}
+ // TODO(pwesin): Add a separate bitrate for sent bitrate after pacer.
if (bytes_sent <= 0) {
- return -1;
+ WEBRTC_TRACE(kTraceWarning, kTraceRtpRtcp, id_,
+ "Transport failed to send packet");
+ return false;
}
- // Update send statistics.
- CriticalSectionScoped cs(send_critsect_);
- Bitrate::Update(bytes_sent);
- packets_sent_++;
- // We on purpose don't add to payload_bytes_sent_ since this is a
- // re-transmit and not new payload data.
- return bytes_sent;
+ return true;
}
int RTPSender::SelectiveRetransmissions() const {
@@ -625,12 +643,13 @@
}
}
+// Called from pacer when we can send the packet.
void RTPSender::TimeToSendPacket(uint16_t sequence_number,
int64_t capture_time_ms) {
StorageType type;
uint16_t length = IP_PACKET_SIZE;
uint8_t data_buffer[IP_PACKET_SIZE];
- int64_t stored_time_ms; // TODO(pwestin) can we deprecate this?
+ int64_t stored_time_ms;
if (packet_history_ == NULL) {
return;
@@ -655,20 +674,7 @@
rtp_header.header.sequenceNumber,
rtp_header.header.headerLength);
}
- int bytes_sent = -1;
- if (transport_) {
- bytes_sent = transport_->SendPacket(id_, data_buffer, length);
- }
- if (bytes_sent <= 0) {
- return;
- }
- // Update send statistics.
- CriticalSectionScoped cs(send_critsect_);
- Bitrate::Update(bytes_sent);
- packets_sent_++;
- if (bytes_sent > rtp_header.header.headerLength) {
- payload_bytes_sent_ += bytes_sent - rtp_header.header.headerLength;
- }
+ SendPacketToNetwork(data_buffer, length);
}
// TODO(pwestin): send in the RTPHeaderParser to avoid parsing it again.
@@ -695,17 +701,26 @@
return -1;
}
- int32_t bytes_sent = -1;
// Create and send RTX Packet.
+ // TODO(pwesin): This should be moved to its own code path triggered by pacer.
+ bool rtx_sent = false;
if (rtx_ == kRtxAll && storage == kAllowRetransmission) {
uint16_t length_rtx = payload_length + rtp_header_length;
uint8_t data_buffer_rtx[IP_PACKET_SIZE];
BuildRtxPacket(buffer, &length_rtx, data_buffer_rtx);
- if (transport_) {
- bytes_sent += transport_->SendPacket(id_, data_buffer_rtx, length_rtx);
- if (bytes_sent <= 0) {
- return -1;
- }
+ if (!SendPacketToNetwork(data_buffer_rtx, length_rtx)) return -1;
+ rtx_sent = true;
+ }
+ {
+ // Update send statistics prior to pacer.
+ CriticalSectionScoped cs(send_critsect_);
+ Bitrate::Update(payload_length + rtp_header_length);
+ ++packets_sent_;
+ payload_bytes_sent_ += payload_length;
+ if (rtx_sent) {
+ // The RTX packet.
+ ++packets_sent_;
+ payload_bytes_sent_ += payload_length;
}
}
@@ -716,26 +731,13 @@
payload_length + rtp_header_length)) {
// We can't send the packet right now.
// We will be called when it is time.
- return payload_length + rtp_header_length;
+ return 0;
}
}
- // Send data packet.
- bytes_sent = -1;
- if (transport_) {
- bytes_sent = transport_->SendPacket(id_, buffer,
- payload_length + rtp_header_length);
+ if (SendPacketToNetwork(buffer, payload_length + rtp_header_length)) {
+ return 0;
}
- if (bytes_sent <= 0) {
- return -1;
- }
- // Update send statistics.
- CriticalSectionScoped cs(send_critsect_);
- Bitrate::Update(bytes_sent);
- packets_sent_++;
- if (bytes_sent > rtp_header_length) {
- payload_bytes_sent_ += bytes_sent - rtp_header_length;
- }
- return 0;
+ return -1;
}
void RTPSender::ProcessBitrate() {
diff --git a/modules/rtp_rtcp/source/rtp_sender.h b/modules/rtp_rtcp/source/rtp_sender.h
index 0f41632..2e207b5 100644
--- a/modules/rtp_rtcp/source/rtp_sender.h
+++ b/modules/rtp_rtcp/source/rtp_sender.h
@@ -169,8 +169,6 @@
int32_t ReSendPacket(uint16_t packet_id, uint32_t min_resend_time = 0);
- int32_t ReSendToNetwork(const uint8_t *packet, const uint32_t size);
-
bool ProcessNACKBitRate(const uint32_t now);
// RTX.
@@ -263,6 +261,8 @@
void BuildRtxPacket(uint8_t* buffer, uint16_t* length,
uint8_t* buffer_rtx);
+ bool SendPacketToNetwork(const uint8_t *packet, uint32_t size);
+
int32_t id_;
const bool audio_configured_;
RTPSenderAudio *audio_;
diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
index 2c22201..7b7f6ac 100644
--- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
@@ -14,6 +14,7 @@
#include <gtest/gtest.h>
+#include "webrtc/modules/pacing/include/mock/mock_paced_sender.h"
#include "webrtc/modules/rtp_rtcp/interface/rtp_rtcp_defines.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_format_video_generic.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h"
@@ -34,6 +35,8 @@
const int kMaxPacketLength = 1500;
} // namespace
+using testing::_;
+
class LoopbackTransportTest : public webrtc::Transport {
public:
LoopbackTransportTest()
@@ -58,13 +61,17 @@
protected:
RtpSenderTest()
: fake_clock_(123456),
+ mock_paced_sender_(),
rtp_sender_(new RTPSender(0, false, &fake_clock_, &transport_, NULL,
- NULL)),
+ &mock_paced_sender_)),
kMarkerBit(true),
kType(kRtpExtensionTransmissionTimeOffset) {
rtp_sender_->SetSequenceNumber(kSeqNum);
+ EXPECT_CALL(mock_paced_sender_,
+ SendPacket(_, _, _, _, _)).WillRepeatedly(testing::Return(true));
}
SimulatedClock fake_clock_;
+ MockPacedSender mock_paced_sender_;
scoped_ptr<RTPSender> rtp_sender_;
LoopbackTransportTest transport_;
const bool kMarkerBit;
@@ -173,24 +180,11 @@
EXPECT_EQ(kNegTimeOffset, rtp_header.extension.transmissionTimeOffset);
}
-TEST_F(RtpSenderTest, NoTrafficSmoothing) {
- int32_t rtp_length = rtp_sender_->BuildRTPheader(packet_,
- kPayload,
- kMarkerBit,
- kTimestamp);
+TEST_F(RtpSenderTest, TrafficSmoothingWithTimeOffset) {
+ EXPECT_CALL(mock_paced_sender_,
+ SendPacket(PacedSender::kNormalPriority, _, kSeqNum, _, _)).
+ WillOnce(testing::Return(false));
- // Packet should be sent immediately.
- EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_,
- 0,
- rtp_length,
- kTimestamp / 90,
- kAllowRetransmission));
- EXPECT_EQ(1, transport_.packets_sent_);
- EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_);
-}
-
-TEST_F(RtpSenderTest, DISABLED_TrafficSmoothing) {
- // TODO(pwestin) we need to send in a pacer object.
rtp_sender_->SetStorePacketsStatus(true, 10);
EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension(kType, kId));
rtp_sender_->SetTargetSendBitrate(300000);
@@ -198,15 +192,23 @@
kPayload,
kMarkerBit,
kTimestamp);
+
+ int64_t capture_time_ms = fake_clock_.TimeInMilliseconds();
+
// Packet should be stored in a send bucket.
EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_,
0,
rtp_length,
- fake_clock_.TimeInMilliseconds(),
+ capture_time_ms,
kAllowRetransmission));
+
EXPECT_EQ(0, transport_.packets_sent_);
+
const int kStoredTimeInMs = 100;
fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs);
+
+ rtp_sender_->TimeToSendPacket(kSeqNum, capture_time_ms);
+
// Process send bucket. Packet should now be sent.
EXPECT_EQ(1, transport_.packets_sent_);
EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_);
@@ -218,6 +220,60 @@
map.Register(kType, kId);
const bool valid_rtp_header = rtp_parser.Parse(rtp_header, &map);
ASSERT_TRUE(valid_rtp_header);
+
+ // Verify transmission time offset.
+ EXPECT_EQ(kStoredTimeInMs * 90, rtp_header.extension.transmissionTimeOffset);
+}
+
+TEST_F(RtpSenderTest, TrafficSmoothingRetransmits) {
+ EXPECT_CALL(mock_paced_sender_,
+ SendPacket(PacedSender::kNormalPriority, _, kSeqNum, _, _)).
+ WillOnce(testing::Return(false));
+
+ rtp_sender_->SetStorePacketsStatus(true, 10);
+ EXPECT_EQ(0, rtp_sender_->RegisterRtpHeaderExtension(kType, kId));
+ rtp_sender_->SetTargetSendBitrate(300000);
+ int32_t rtp_length = rtp_sender_->BuildRTPheader(packet_,
+ kPayload,
+ kMarkerBit,
+ kTimestamp);
+
+ int64_t capture_time_ms = fake_clock_.TimeInMilliseconds();
+
+ // Packet should be stored in a send bucket.
+ EXPECT_EQ(0, rtp_sender_->SendToNetwork(packet_,
+ 0,
+ rtp_length,
+ capture_time_ms,
+ kAllowRetransmission));
+
+ EXPECT_EQ(0, transport_.packets_sent_);
+
+ EXPECT_CALL(mock_paced_sender_,
+ SendPacket(PacedSender::kHighPriority, _, kSeqNum, _, _)).
+ WillOnce(testing::Return(false));
+
+ const int kStoredTimeInMs = 100;
+ fake_clock_.AdvanceTimeMilliseconds(kStoredTimeInMs);
+
+ EXPECT_EQ(0, rtp_sender_->ReSendPacket(kSeqNum));
+ EXPECT_EQ(0, transport_.packets_sent_);
+
+ rtp_sender_->TimeToSendPacket(kSeqNum, capture_time_ms);
+
+ // Process send bucket. Packet should now be sent.
+ EXPECT_EQ(1, transport_.packets_sent_);
+ EXPECT_EQ(rtp_length, transport_.last_sent_packet_len_);
+
+ // Parse sent packet.
+ webrtc::ModuleRTPUtility::RTPHeaderParser rtp_parser(
+ transport_.last_sent_packet_, rtp_length);
+ webrtc::WebRtcRTPHeader rtp_header;
+ RtpHeaderExtensionMap map;
+ map.Register(kType, kId);
+ const bool valid_rtp_header = rtp_parser.Parse(rtp_header, &map);
+ ASSERT_TRUE(valid_rtp_header);
+
// Verify transmission time offset.
EXPECT_EQ(kStoredTimeInMs * 90, rtp_header.extension.transmissionTimeOffset);
}