|  | /* | 
|  | *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 
|  | * | 
|  | *  Use of this source code is governed by a BSD-style license | 
|  | *  that can be found in the LICENSE file in the root of the source | 
|  | *  tree. An additional intellectual property rights grant can be found | 
|  | *  in the file PATENTS.  All contributing project authors may | 
|  | *  be found in the AUTHORS file in the root of the source tree. | 
|  | */ | 
|  |  | 
|  | #include <iomanip> | 
|  |  | 
|  | #include "webrtc/base/asyncsocket.h" | 
|  | #include "webrtc/base/logging.h" | 
|  | #include "webrtc/base/socketfactory.h" | 
|  | #include "webrtc/base/socketpool.h" | 
|  | #include "webrtc/base/socketstream.h" | 
|  | #include "webrtc/base/thread.h" | 
|  |  | 
|  | namespace rtc { | 
|  |  | 
|  | /////////////////////////////////////////////////////////////////////////////// | 
|  | // StreamCache - Caches a set of open streams, defers creation to a separate | 
|  | //  StreamPool. | 
|  | /////////////////////////////////////////////////////////////////////////////// | 
|  |  | 
|  | StreamCache::StreamCache(StreamPool* pool) : pool_(pool) { | 
|  | } | 
|  |  | 
|  | StreamCache::~StreamCache() { | 
|  | for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | 
|  | ++it) { | 
|  | delete it->second; | 
|  | } | 
|  | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | 
|  | ++it) { | 
|  | delete it->second; | 
|  | } | 
|  | } | 
|  |  | 
|  | StreamInterface* StreamCache::RequestConnectedStream( | 
|  | const SocketAddress& remote, int* err) { | 
|  | LOG_F(LS_VERBOSE) << "(" << remote << ")"; | 
|  | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | 
|  | ++it) { | 
|  | if (remote == it->first) { | 
|  | it->second->SignalEvent.disconnect(this); | 
|  | // Move from cached_ to active_ | 
|  | active_.push_front(*it); | 
|  | cached_.erase(it); | 
|  | if (err) | 
|  | *err = 0; | 
|  | LOG_F(LS_VERBOSE) << "Providing cached stream"; | 
|  | return active_.front().second; | 
|  | } | 
|  | } | 
|  | if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | 
|  | // We track active streams so that we can remember their address | 
|  | active_.push_front(ConnectedStream(remote, stream)); | 
|  | LOG_F(LS_VERBOSE) << "Providing new stream"; | 
|  | return active_.front().second; | 
|  | } | 
|  | return NULL; | 
|  | } | 
|  |  | 
|  | void StreamCache::ReturnConnectedStream(StreamInterface* stream) { | 
|  | for (ConnectedList::iterator it = active_.begin(); it != active_.end(); | 
|  | ++it) { | 
|  | if (stream == it->second) { | 
|  | LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | 
|  | if (stream->GetState() == SS_CLOSED) { | 
|  | // Return closed streams | 
|  | LOG_F(LS_VERBOSE) << "Returning closed stream"; | 
|  | pool_->ReturnConnectedStream(it->second); | 
|  | } else { | 
|  | // Monitor open streams | 
|  | stream->SignalEvent.connect(this, &StreamCache::OnStreamEvent); | 
|  | LOG_F(LS_VERBOSE) << "Caching stream"; | 
|  | cached_.push_front(*it); | 
|  | } | 
|  | active_.erase(it); | 
|  | return; | 
|  | } | 
|  | } | 
|  | ASSERT(false); | 
|  | } | 
|  |  | 
|  | void StreamCache::OnStreamEvent(StreamInterface* stream, int events, int err) { | 
|  | if ((events & SE_CLOSE) == 0) { | 
|  | LOG_F(LS_WARNING) << "(" << events << ", " << err | 
|  | << ") received non-close event"; | 
|  | return; | 
|  | } | 
|  | for (ConnectedList::iterator it = cached_.begin(); it != cached_.end(); | 
|  | ++it) { | 
|  | if (stream == it->second) { | 
|  | LOG_F(LS_VERBOSE) << "(" << it->first << ")"; | 
|  | // We don't cache closed streams, so return it. | 
|  | it->second->SignalEvent.disconnect(this); | 
|  | LOG_F(LS_VERBOSE) << "Returning closed stream"; | 
|  | pool_->ReturnConnectedStream(it->second); | 
|  | cached_.erase(it); | 
|  | return; | 
|  | } | 
|  | } | 
|  | ASSERT(false); | 
|  | } | 
|  |  | 
|  | ////////////////////////////////////////////////////////////////////// | 
|  | // NewSocketPool | 
|  | ////////////////////////////////////////////////////////////////////// | 
|  |  | 
|  | NewSocketPool::NewSocketPool(SocketFactory* factory) : factory_(factory) { | 
|  | } | 
|  |  | 
|  | NewSocketPool::~NewSocketPool() { | 
|  | } | 
|  |  | 
|  | StreamInterface* | 
|  | NewSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | 
|  | AsyncSocket* socket = | 
|  | factory_->CreateAsyncSocket(remote.family(), SOCK_STREAM); | 
|  | if (!socket) { | 
|  | if (err) | 
|  | *err = -1; | 
|  | return NULL; | 
|  | } | 
|  | if ((socket->Connect(remote) != 0) && !socket->IsBlocking()) { | 
|  | if (err) | 
|  | *err = socket->GetError(); | 
|  | delete socket; | 
|  | return NULL; | 
|  | } | 
|  | if (err) | 
|  | *err = 0; | 
|  | return new SocketStream(socket); | 
|  | } | 
|  |  | 
|  | void | 
|  | NewSocketPool::ReturnConnectedStream(StreamInterface* stream) { | 
|  | Thread::Current()->Dispose(stream); | 
|  | } | 
|  |  | 
|  | ////////////////////////////////////////////////////////////////////// | 
|  | // ReuseSocketPool | 
|  | ////////////////////////////////////////////////////////////////////// | 
|  |  | 
|  | ReuseSocketPool::ReuseSocketPool(SocketFactory* factory) | 
|  | : factory_(factory), stream_(NULL), checked_out_(false) { | 
|  | } | 
|  |  | 
|  | ReuseSocketPool::~ReuseSocketPool() { | 
|  | ASSERT(!checked_out_); | 
|  | delete stream_; | 
|  | } | 
|  |  | 
|  | StreamInterface* | 
|  | ReuseSocketPool::RequestConnectedStream(const SocketAddress& remote, int* err) { | 
|  | // Only one socket can be used from this "pool" at a time | 
|  | ASSERT(!checked_out_); | 
|  | if (!stream_) { | 
|  | LOG_F(LS_VERBOSE) << "Creating new socket"; | 
|  | int family = remote.family(); | 
|  | // TODO: Deal with this when we/I clean up DNS resolution. | 
|  | if (remote.IsUnresolvedIP()) { | 
|  | family = AF_INET; | 
|  | } | 
|  | AsyncSocket* socket = | 
|  | factory_->CreateAsyncSocket(family, SOCK_STREAM); | 
|  | if (!socket) { | 
|  | if (err) | 
|  | *err = -1; | 
|  | return NULL; | 
|  | } | 
|  | stream_ = new SocketStream(socket); | 
|  | } | 
|  | if ((stream_->GetState() == SS_OPEN) && (remote == remote_)) { | 
|  | LOG_F(LS_VERBOSE) << "Reusing connection to: " << remote_; | 
|  | } else { | 
|  | remote_ = remote; | 
|  | stream_->Close(); | 
|  | if ((stream_->GetSocket()->Connect(remote_) != 0) | 
|  | && !stream_->GetSocket()->IsBlocking()) { | 
|  | if (err) | 
|  | *err = stream_->GetSocket()->GetError(); | 
|  | return NULL; | 
|  | } else { | 
|  | LOG_F(LS_VERBOSE) << "Opening connection to: " << remote_; | 
|  | } | 
|  | } | 
|  | stream_->SignalEvent.disconnect(this); | 
|  | checked_out_ = true; | 
|  | if (err) | 
|  | *err = 0; | 
|  | return stream_; | 
|  | } | 
|  |  | 
|  | void | 
|  | ReuseSocketPool::ReturnConnectedStream(StreamInterface* stream) { | 
|  | ASSERT(stream == stream_); | 
|  | ASSERT(checked_out_); | 
|  | checked_out_ = false; | 
|  | // Until the socket is reused, monitor it to determine if it closes. | 
|  | stream_->SignalEvent.connect(this, &ReuseSocketPool::OnStreamEvent); | 
|  | } | 
|  |  | 
|  | void | 
|  | ReuseSocketPool::OnStreamEvent(StreamInterface* stream, int events, int err) { | 
|  | ASSERT(stream == stream_); | 
|  | ASSERT(!checked_out_); | 
|  |  | 
|  | // If the stream was written to and then immediately returned to us then | 
|  | // we may get a writable notification for it, which we should ignore. | 
|  | if (events == SE_WRITE) { | 
|  | LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly writable: ignoring"; | 
|  | return; | 
|  | } | 
|  |  | 
|  | // If the peer sent data, we can't process it, so drop the connection. | 
|  | // If the socket has closed, clean it up. | 
|  | // In either case, we'll reconnect it the next time it is used. | 
|  | ASSERT(0 != (events & (SE_READ|SE_CLOSE))); | 
|  | if (0 != (events & SE_CLOSE)) { | 
|  | LOG_F(LS_VERBOSE) << "Connection closed with error: " << err; | 
|  | } else { | 
|  | LOG_F(LS_VERBOSE) << "Pooled Socket unexpectedly readable: closing"; | 
|  | } | 
|  | stream_->Close(); | 
|  | } | 
|  |  | 
|  | /////////////////////////////////////////////////////////////////////////////// | 
|  | // LoggingPoolAdapter - Adapts a StreamPool to supply streams with attached | 
|  | // LoggingAdapters. | 
|  | /////////////////////////////////////////////////////////////////////////////// | 
|  |  | 
|  | LoggingPoolAdapter::LoggingPoolAdapter( | 
|  | StreamPool* pool, LoggingSeverity level, const std::string& label, | 
|  | bool binary_mode) | 
|  | : pool_(pool), level_(level), label_(label), binary_mode_(binary_mode) { | 
|  | } | 
|  |  | 
|  | LoggingPoolAdapter::~LoggingPoolAdapter() { | 
|  | for (StreamList::iterator it = recycle_bin_.begin(); | 
|  | it != recycle_bin_.end(); ++it) { | 
|  | delete *it; | 
|  | } | 
|  | } | 
|  |  | 
|  | StreamInterface* LoggingPoolAdapter::RequestConnectedStream( | 
|  | const SocketAddress& remote, int* err) { | 
|  | if (StreamInterface* stream = pool_->RequestConnectedStream(remote, err)) { | 
|  | ASSERT(SS_CLOSED != stream->GetState()); | 
|  | std::stringstream ss; | 
|  | ss << label_ << "(0x" << std::setfill('0') << std::hex << std::setw(8) | 
|  | << stream << ")"; | 
|  | LOG_V(level_) << ss.str() | 
|  | << ((SS_OPEN == stream->GetState()) ? " Connected" | 
|  | : " Connecting") | 
|  | << " to " << remote; | 
|  | if (recycle_bin_.empty()) { | 
|  | return new LoggingAdapter(stream, level_, ss.str(), binary_mode_); | 
|  | } | 
|  | LoggingAdapter* logging = recycle_bin_.front(); | 
|  | recycle_bin_.pop_front(); | 
|  | logging->set_label(ss.str()); | 
|  | logging->Attach(stream); | 
|  | return logging; | 
|  | } | 
|  | return NULL; | 
|  | } | 
|  |  | 
|  | void LoggingPoolAdapter::ReturnConnectedStream(StreamInterface* stream) { | 
|  | LoggingAdapter* logging = static_cast<LoggingAdapter*>(stream); | 
|  | pool_->ReturnConnectedStream(logging->Detach()); | 
|  | recycle_bin_.push_back(logging); | 
|  | } | 
|  |  | 
|  | /////////////////////////////////////////////////////////////////////////////// | 
|  |  | 
|  | } // namespace rtc |