|  | /* | 
|  | *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. | 
|  | * | 
|  | *  Use of this source code is governed by a BSD-style license | 
|  | *  that can be found in the LICENSE file in the root of the source | 
|  | *  tree. An additional intellectual property rights grant can be found | 
|  | *  in the file PATENTS.  All contributing project authors may | 
|  | *  be found in the AUTHORS file in the root of the source tree. | 
|  | * | 
|  | *  Data Channel Benchmarking tool. | 
|  | * | 
|  | *  Create a server using: ./data_channel_benchmark --server --port 12345 | 
|  | *  Start the flow of data from the server to a client using: | 
|  | *  ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196 | 
|  | *  The throughput is reported on the server console. | 
|  | * | 
|  | *  The negotiation does not require a 3rd party server and is done over a gRPC | 
|  | *  transport. No TURN server is configured, so both peers need to be reachable | 
|  | *  using STUN only. | 
|  | */ | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <charconv> | 
|  | #include <cstddef> | 
|  | #include <cstdint> | 
|  | #include <cstdio> | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <utility> | 
|  |  | 
|  | #include "absl/cleanup/cleanup.h" | 
|  | #include "absl/flags/flag.h" | 
|  | #include "absl/flags/parse.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "api/data_channel_interface.h" | 
|  | #include "api/field_trials.h" | 
|  | #include "api/peer_connection_interface.h" | 
|  | #include "api/rtc_error.h" | 
|  | #include "api/scoped_refptr.h" | 
|  | #include "rtc_base/checks.h" | 
|  | #include "rtc_base/copy_on_write_buffer.h" | 
|  | #include "rtc_base/event.h" | 
|  | #include "rtc_base/logging.h" | 
|  | #include "rtc_base/ssl_adapter.h" | 
|  | #include "rtc_base/string_encode.h" | 
|  | #include "rtc_base/strings/string_builder.h" | 
|  | #include "rtc_base/thread.h" | 
|  | #include "rtc_tools/data_channel_benchmark/grpc_signaling.h" | 
|  | #include "rtc_tools/data_channel_benchmark/peer_connection_client.h" | 
|  | #include "rtc_tools/data_channel_benchmark/signaling_interface.h" | 
|  | #include "system_wrappers/include/clock.h" | 
|  |  | 
|  | ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)"); | 
|  | ABSL_FLAG(bool, server, false, "Server mode"); | 
|  | ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client"); | 
|  | ABSL_FLAG(std::string, address, "localhost", "Connect to server address"); | 
|  | ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)"); | 
|  | ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)"); | 
|  | ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size"); | 
|  | ABSL_FLAG(std::string, | 
|  | force_fieldtrials, | 
|  | "", | 
|  | "Field trials control experimental feature code which can be forced. " | 
|  | "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/" | 
|  | " will assign the group Enable to field trial WebRTC-FooFeature."); | 
|  |  | 
|  | struct SetupMessage { | 
|  | size_t packet_size; | 
|  | size_t transfer_size; | 
|  |  | 
|  | std::string ToString() { | 
|  | char buffer[64]; | 
|  | webrtc::SimpleStringBuilder sb(buffer); | 
|  | sb << packet_size << "," << transfer_size; | 
|  |  | 
|  | return sb.str(); | 
|  | } | 
|  |  | 
|  | static SetupMessage FromString(absl::string_view sv) { | 
|  | SetupMessage result; | 
|  | auto parameters = webrtc::split(sv, ','); | 
|  | std::from_chars(parameters[0].data(), | 
|  | parameters[0].data() + parameters[0].size(), | 
|  | result.packet_size, 10); | 
|  | std::from_chars(parameters[1].data(), | 
|  | parameters[1].data() + parameters[1].size(), | 
|  | result.transfer_size, 10); | 
|  | return result; | 
|  | } | 
|  | }; | 
|  |  | 
|  | class DataChannelServerObserverImpl : public webrtc::DataChannelObserver { | 
|  | public: | 
|  | explicit DataChannelServerObserverImpl(webrtc::DataChannelInterface* dc, | 
|  | webrtc::Thread* signaling_thread) | 
|  | : dc_(dc), signaling_thread_(signaling_thread) {} | 
|  |  | 
|  | void OnStateChange() override { | 
|  | RTC_LOG(LS_INFO) << "Server state changed to " << dc_->state(); | 
|  | switch (dc_->state()) { | 
|  | case webrtc::DataChannelInterface::DataState::kOpen: | 
|  | break; | 
|  | case webrtc::DataChannelInterface::DataState::kClosed: | 
|  | closed_event_.Set(); | 
|  | break; | 
|  | default: | 
|  | break; | 
|  | } | 
|  | } | 
|  |  | 
|  | void OnMessage(const webrtc::DataBuffer& buffer) override { | 
|  | if (!buffer.binary) { | 
|  | std::string setup_message(buffer.data.cdata<char>(), buffer.data.size()); | 
|  | setup_ = SetupMessage::FromString(setup_message); | 
|  | remaining_data_ = setup_.transfer_size; | 
|  | setup_message_event_.Set(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void OnBufferedAmountChange(uint64_t sent_data_size) override { | 
|  | remaining_data_ -= sent_data_size; | 
|  | // Allow the transport buffer to be drained before starting again. | 
|  | if (buffer_ && dc_->buffered_amount() <= ok_to_resume_sending_threshold_) { | 
|  | total_queued_up_ += buffer_->size(); | 
|  | dc_->SendAsync(*buffer_, [this, buffer = buffer_](webrtc::RTCError err) { | 
|  | OnSendAsyncComplete(err, buffer); | 
|  | }); | 
|  | buffer_ = nullptr; | 
|  | } | 
|  | } | 
|  |  | 
|  | bool IsOkToCallOnTheNetworkThread() override { return true; } | 
|  |  | 
|  | bool WaitForClosedState() { | 
|  | return closed_event_.Wait(webrtc::Event::kForever); | 
|  | } | 
|  |  | 
|  | bool WaitForSetupMessage() { | 
|  | return setup_message_event_.Wait(webrtc::Event::kForever); | 
|  | } | 
|  |  | 
|  | void StartSending() { | 
|  | RTC_CHECK(remaining_data_) << "Error: no data to send"; | 
|  | std::string data(std::min(setup_.packet_size, remaining_data_), '0'); | 
|  | webrtc::DataBuffer* data_buffer = | 
|  | new webrtc::DataBuffer(webrtc::CopyOnWriteBuffer(data), true); | 
|  | total_queued_up_ = data_buffer->size(); | 
|  | dc_->SendAsync(*data_buffer, | 
|  | [this, data_buffer = data_buffer](webrtc::RTCError err) { | 
|  | OnSendAsyncComplete(err, data_buffer); | 
|  | }); | 
|  | } | 
|  |  | 
|  | const struct SetupMessage& parameters() const { return setup_; } | 
|  |  | 
|  | private: | 
|  | void OnSendAsyncComplete(webrtc::RTCError error, webrtc::DataBuffer* buffer) { | 
|  | total_queued_up_ -= buffer->size(); | 
|  | if (!error.ok()) { | 
|  | RTC_CHECK_EQ(error.type(), webrtc::RTCErrorType::RESOURCE_EXHAUSTED); | 
|  | RTC_CHECK(!buffer_); | 
|  | // Buffer saturated. Retry when OnBufferedAmountChange() detects we can. | 
|  | buffer_ = buffer; | 
|  | return; | 
|  | } | 
|  | signaling_thread_->PostTask([this, buffer = buffer, | 
|  | remaining_data = remaining_data_]() { | 
|  | fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n", | 
|  | (setup_.transfer_size - remaining_data), setup_.transfer_size, | 
|  | (100 - remaining_data * 100 / setup_.transfer_size)); | 
|  |  | 
|  | if (!remaining_data) { | 
|  | RTC_CHECK(!total_queued_up_); | 
|  | // We're done. | 
|  | delete buffer; | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (remaining_data < buffer->data.size()) { | 
|  | buffer->data.SetSize(remaining_data); | 
|  | } | 
|  |  | 
|  | total_queued_up_ += buffer->size(); | 
|  | dc_->SendAsync(*buffer, [this, buffer = buffer](webrtc::RTCError err) { | 
|  | OnSendAsyncComplete(err, buffer); | 
|  | }); | 
|  | }); | 
|  | } | 
|  |  | 
|  | webrtc::DataChannelInterface* const dc_; | 
|  | webrtc::Thread* const signaling_thread_; | 
|  | webrtc::Event closed_event_; | 
|  | webrtc::Event setup_message_event_; | 
|  | size_t remaining_data_ = 0u; | 
|  | size_t total_queued_up_ = 0u; | 
|  | struct SetupMessage setup_; | 
|  | webrtc::DataBuffer* buffer_ = nullptr; | 
|  | const uint64_t ok_to_resume_sending_threshold_ = | 
|  | webrtc::DataChannelInterface::MaxSendQueueSize() / 2; | 
|  | }; | 
|  |  | 
|  | class DataChannelClientObserverImpl : public webrtc::DataChannelObserver { | 
|  | public: | 
|  | explicit DataChannelClientObserverImpl(webrtc::DataChannelInterface* dc, | 
|  | uint64_t bytes_received_threshold) | 
|  | : dc_(dc), bytes_received_threshold_(bytes_received_threshold) {} | 
|  |  | 
|  | void OnStateChange() override { | 
|  | RTC_LOG(LS_INFO) << "Client state changed to " << dc_->state(); | 
|  | switch (dc_->state()) { | 
|  | case webrtc::DataChannelInterface::DataState::kOpen: | 
|  | open_event_.Set(); | 
|  | break; | 
|  | default: | 
|  | break; | 
|  | } | 
|  | } | 
|  |  | 
|  | void OnMessage(const webrtc::DataBuffer& buffer) override { | 
|  | bytes_received_ += buffer.data.size(); | 
|  | if (bytes_received_ >= bytes_received_threshold_) { | 
|  | bytes_received_event_.Set(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void OnBufferedAmountChange(uint64_t sent_data_size) override {} | 
|  | bool IsOkToCallOnTheNetworkThread() override { return true; } | 
|  |  | 
|  | bool WaitForOpenState() { return open_event_.Wait(webrtc::Event::kForever); } | 
|  |  | 
|  | // Wait until the received byte count reaches the desired value. | 
|  | bool WaitForBytesReceivedThreshold() { | 
|  | return bytes_received_event_.Wait(webrtc::Event::kForever); | 
|  | } | 
|  |  | 
|  | private: | 
|  | webrtc::DataChannelInterface* const dc_; | 
|  | webrtc::Event open_event_; | 
|  | webrtc::Event bytes_received_event_; | 
|  | const uint64_t bytes_received_threshold_; | 
|  | uint64_t bytes_received_ = 0u; | 
|  | }; | 
|  |  | 
|  | int RunServer() { | 
|  | bool oneshot = absl::GetFlag(FLAGS_oneshot); | 
|  | uint16_t port = absl::GetFlag(FLAGS_port); | 
|  |  | 
|  | auto signaling_thread = webrtc::Thread::Create(); | 
|  | signaling_thread->Start(); | 
|  | { | 
|  | auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( | 
|  | signaling_thread.get(), std::make_unique<webrtc::FieldTrials>( | 
|  | absl::GetFlag(FLAGS_force_fieldtrials))); | 
|  |  | 
|  | auto grpc_server = webrtc::GrpcSignalingServerInterface::Create( | 
|  | [factory = | 
|  | webrtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>( | 
|  | factory), | 
|  | signaling_thread = | 
|  | signaling_thread.get()](webrtc::SignalingInterface* signaling) { | 
|  | webrtc::PeerConnectionClient client(factory.get(), signaling); | 
|  | client.StartPeerConnection(); | 
|  | auto peer_connection = client.peerConnection(); | 
|  |  | 
|  | // Set up the data channel | 
|  | auto dc_or_error = | 
|  | peer_connection->CreateDataChannelOrError("benchmark", nullptr); | 
|  | RTC_CHECK(dc_or_error.ok()); | 
|  | auto data_channel = dc_or_error.MoveValue(); | 
|  | auto data_channel_observer = | 
|  | std::make_unique<DataChannelServerObserverImpl>( | 
|  | data_channel.get(), signaling_thread); | 
|  | data_channel->RegisterObserver(data_channel_observer.get()); | 
|  | absl::Cleanup unregister_observer( | 
|  | [data_channel] { data_channel->UnregisterObserver(); }); | 
|  |  | 
|  | // Wait for a first message from the remote peer. | 
|  | // It configures how much data should be sent and how big the packets | 
|  | // should be. | 
|  | // First message is "packet_size,transfer_size". | 
|  | data_channel_observer->WaitForSetupMessage(); | 
|  |  | 
|  | // Wait for the sender and receiver peers to stabilize (send all ACKs) | 
|  | // This makes it easier to isolate the sending part when profiling. | 
|  | webrtc::Thread::SleepMs(/*millis=*/1'000); | 
|  |  | 
|  | auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); | 
|  |  | 
|  | data_channel_observer->StartSending(); | 
|  |  | 
|  | // Receiver signals the data channel close event when it has received | 
|  | // all the data it requested. | 
|  | data_channel_observer->WaitForClosedState(); | 
|  |  | 
|  | auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime(); | 
|  | auto duration_ms = (end_time - begin_time).ms<size_t>(); | 
|  | double throughput = | 
|  | (data_channel_observer->parameters().transfer_size / 1024. / | 
|  | 1024.) / | 
|  | (duration_ms / 1000.); | 
|  | printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput); | 
|  | }, | 
|  | port, oneshot); | 
|  | grpc_server->Start(); | 
|  |  | 
|  | printf("Server listening on port %d\n", grpc_server->SelectedPort()); | 
|  | grpc_server->Wait(); | 
|  | } | 
|  |  | 
|  | signaling_thread->Stop(); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int RunClient() { | 
|  | uint16_t port = absl::GetFlag(FLAGS_port); | 
|  | std::string server_address = absl::GetFlag(FLAGS_address); | 
|  | size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024; | 
|  | size_t packet_size = absl::GetFlag(FLAGS_packet_size); | 
|  |  | 
|  | auto signaling_thread = webrtc::Thread::Create(); | 
|  | signaling_thread->Start(); | 
|  | { | 
|  | auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory( | 
|  | signaling_thread.get(), std::make_unique<webrtc::FieldTrials>( | 
|  | absl::GetFlag(FLAGS_force_fieldtrials))); | 
|  | auto grpc_client = webrtc::GrpcSignalingClientInterface::Create( | 
|  | server_address + ":" + std::to_string(port)); | 
|  | webrtc::PeerConnectionClient client(factory.get(), | 
|  | grpc_client->signaling_client()); | 
|  |  | 
|  | std::unique_ptr<DataChannelClientObserverImpl> observer; | 
|  |  | 
|  | // Set up the callback to receive the data channel from the sender. | 
|  | webrtc::scoped_refptr<webrtc::DataChannelInterface> data_channel; | 
|  | webrtc::Event got_data_channel; | 
|  | client.SetOnDataChannel( | 
|  | [&](webrtc::scoped_refptr<webrtc::DataChannelInterface> channel) { | 
|  | data_channel = std::move(channel); | 
|  | // DataChannel needs an observer to drain the read queue. | 
|  | observer = std::make_unique<DataChannelClientObserverImpl>( | 
|  | data_channel.get(), transfer_size); | 
|  | data_channel->RegisterObserver(observer.get()); | 
|  | got_data_channel.Set(); | 
|  | }); | 
|  |  | 
|  | // Connect to the server. | 
|  | if (!grpc_client->Start()) { | 
|  | fprintf(stderr, "Failed to connect to server\n"); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | // Wait for the data channel to be received | 
|  | got_data_channel.Wait(webrtc::Event::kForever); | 
|  |  | 
|  | absl::Cleanup unregister_observer( | 
|  | [data_channel] { data_channel->UnregisterObserver(); }); | 
|  |  | 
|  | // Send a configuration string to the server to tell it to send | 
|  | // 'packet_size' bytes packets and send a total of 'transfer_size' MB. | 
|  | observer->WaitForOpenState(); | 
|  | SetupMessage setup_message = { | 
|  | .packet_size = packet_size, | 
|  | .transfer_size = transfer_size, | 
|  | }; | 
|  | if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) { | 
|  | fprintf(stderr, "Failed to send parameter string\n"); | 
|  | return 1; | 
|  | } | 
|  |  | 
|  | // Wait until we have received all the data | 
|  | observer->WaitForBytesReceivedThreshold(); | 
|  |  | 
|  | // Close the data channel, signaling to the server we have received | 
|  | // all the requested data. | 
|  | data_channel->Close(); | 
|  | } | 
|  |  | 
|  | signaling_thread->Stop(); | 
|  |  | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int main(int argc, char** argv) { | 
|  | webrtc::InitializeSSL(); | 
|  | absl::ParseCommandLine(argc, argv); | 
|  |  | 
|  | // Make sure that higher severity number means more logs by reversing the | 
|  | // webrtc::LoggingSeverity values. | 
|  | auto logging_severity = | 
|  | std::max(0, webrtc::LS_NONE - absl::GetFlag(FLAGS_verbose)); | 
|  | webrtc::LogMessage::LogToDebug( | 
|  | static_cast<webrtc::LoggingSeverity>(logging_severity)); | 
|  |  | 
|  | bool is_server = absl::GetFlag(FLAGS_server); | 
|  |  | 
|  | return is_server ? RunServer() : RunClient(); | 
|  | } |