Direct IP connect functionality for AppRTC Android demo.
This allows connecting between clients without using external servers, which is useful to OEMs if they are working in a network without internet connection. Implementation uses custom AppRTCClient that replaces WebSocketRTCClient if roomId looks like an IP. Instead of a web socket, this class uses direct TCP connection between peers as a signaling channel.
Review-Url: https://codereview.webrtc.org/1963053002
Cr-Original-Commit-Position: refs/heads/master@{#12789}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: 299ccdee0c08b520961025ef7945ca204e378b49
diff --git a/examples/androidapp/src/org/appspot/apprtc/CallActivity.java b/examples/androidapp/src/org/appspot/apprtc/CallActivity.java
index ae653da..ed7e60f 100644
--- a/examples/androidapp/src/org/appspot/apprtc/CallActivity.java
+++ b/examples/androidapp/src/org/appspot/apprtc/CallActivity.java
@@ -139,7 +139,6 @@
private HudFragment hudFragment;
private CpuMonitor cpuMonitor;
-
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
@@ -240,8 +239,15 @@
commandLineRun = intent.getBooleanExtra(EXTRA_CMDLINE, false);
runTimeMs = intent.getIntExtra(EXTRA_RUNTIME, 0);
- // Create connection client and connection parameters.
- appRtcClient = new WebSocketRTCClient(this, new LooperExecutor());
+ // Create connection client. Use DirectRTCClient if room name is an IP otherwise use the
+ // standard WebSocketRTCClient.
+ if (loopback || !DirectRTCClient.IP_PATTERN.matcher(roomId).matches()) {
+ appRtcClient = new WebSocketRTCClient(this, new LooperExecutor());
+ } else {
+ Log.i(TAG, "Using DirectRTCClient because room name looks like an IP.");
+ appRtcClient = new DirectRTCClient(this);
+ }
+ // Create connection parameters.
roomConnectionParameters = new RoomConnectionParameters(
roomUri.toString(), roomId, loopback);
diff --git a/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java b/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java
new file mode 100644
index 0000000..8db38ae
--- /dev/null
+++ b/examples/androidapp/src/org/appspot/apprtc/DirectRTCClient.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+package org.appspot.apprtc;
+
+import android.util.Log;
+
+import org.appspot.apprtc.util.LooperExecutor;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.webrtc.IceCandidate;
+import org.webrtc.PeerConnection;
+import org.webrtc.SessionDescription;
+
+import java.util.LinkedList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Implementation of AppRTCClient that uses direct TCP connection as the signaling channel.
+ * This eliminates the need for an external server. This class does not support loopback
+ * connections.
+ */
+public class DirectRTCClient implements AppRTCClient, TCPChannelClient.TCPChannelEvents {
+ private static final String TAG = "DirectRTCClient";
+ private static final int DEFAULT_PORT = 8888;
+
+ // Regex pattern used for checking if room id looks like an IP.
+ static final Pattern IP_PATTERN = Pattern.compile(
+ "("
+ // IPv4
+ + "((\\d+\\.){3}\\d+)|"
+ // IPv6
+ + "\\[((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::"
+ + "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)\\]|"
+ + "\\[(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})\\]|"
+ // IPv6 without []
+ + "((([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?::(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})?)|"
+ + "(([0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{1,4})|"
+ // Literals
+ + "localhost"
+ + ")"
+ // Optional port number
+ + "(:(\\d+))?"
+ );
+
+ private final LooperExecutor executor;
+ private final SignalingEvents events;
+ private TCPChannelClient tcpClient;
+ private RoomConnectionParameters connectionParameters;
+
+ private enum ConnectionState {
+ NEW, CONNECTED, CLOSED, ERROR
+ };
+
+ // All alterations of the room state should be done from inside the looper thread.
+ private ConnectionState roomState;
+
+ public DirectRTCClient(SignalingEvents events) {
+ this.events = events;
+ executor = new LooperExecutor();
+
+ executor.requestStart();
+ roomState = ConnectionState.NEW;
+ }
+
+ /**
+ * Connects to the room, roomId in connectionsParameters is required. roomId must be a valid
+ * IP address matching IP_PATTERN.
+ */
+ @Override
+ public void connectToRoom(RoomConnectionParameters connectionParameters) {
+ this.connectionParameters = connectionParameters;
+
+ if (connectionParameters.loopback) {
+ reportError("Loopback connections aren't supported by DirectRTCClient.");
+ }
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ connectToRoomInternal();
+ }
+ });
+ }
+
+ @Override
+ public void disconnectFromRoom() {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ disconnectFromRoomInternal();
+ }
+ });
+ executor.requestStop();
+ }
+
+ /**
+ * Connects to the room.
+ *
+ * Runs on the looper thread.
+ */
+ private void connectToRoomInternal() {
+ this.roomState = ConnectionState.NEW;
+
+ String endpoint = connectionParameters.roomId;
+
+ Matcher matcher = IP_PATTERN.matcher(endpoint);
+ if (!matcher.matches()) {
+ reportError("roomId must match IP_PATTERN for DirectRTCClient.");
+ return;
+ }
+
+ String ip = matcher.group(1);
+ String portStr = matcher.group(matcher.groupCount());
+ int port;
+
+ if (portStr != null) {
+ try {
+ port = Integer.parseInt(portStr);
+ } catch (NumberFormatException e) {
+ reportError("Invalid port number: " + portStr);
+ return;
+ }
+ } else {
+ port = DEFAULT_PORT;
+ }
+
+ tcpClient = new TCPChannelClient(executor, this, ip, port);
+ }
+
+ /**
+ * Disconnects from the room.
+ *
+ * Runs on the looper thread.
+ */
+ private void disconnectFromRoomInternal() {
+ roomState = ConnectionState.CLOSED;
+
+ if (tcpClient != null) {
+ tcpClient.disconnect();
+ tcpClient = null;
+ }
+ }
+
+ @Override
+ public void sendOfferSdp(final SessionDescription sdp) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (roomState != ConnectionState.CONNECTED) {
+ reportError("Sending offer SDP in non connected state.");
+ return;
+ }
+ JSONObject json = new JSONObject();
+ jsonPut(json, "sdp", sdp.description);
+ jsonPut(json, "type", "offer");
+ sendMessage(json.toString());
+ }
+ });
+ }
+
+ @Override
+ public void sendAnswerSdp(final SessionDescription sdp) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ JSONObject json = new JSONObject();
+ jsonPut(json, "sdp", sdp.description);
+ jsonPut(json, "type", "answer");
+ sendMessage(json.toString());
+ }
+ });
+ }
+
+ @Override
+ public void sendLocalIceCandidate(final IceCandidate candidate) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ JSONObject json = new JSONObject();
+ jsonPut(json, "type", "candidate");
+ jsonPut(json, "label", candidate.sdpMLineIndex);
+ jsonPut(json, "id", candidate.sdpMid);
+ jsonPut(json, "candidate", candidate.sdp);
+
+ if (roomState != ConnectionState.CONNECTED) {
+ reportError("Sending ICE candidate in non connected state.");
+ return;
+ }
+ sendMessage(json.toString());
+ }
+ });
+ }
+
+ /** Send removed Ice candidates to the other participant. */
+ @Override
+ public void sendLocalIceCandidateRemovals(final IceCandidate[] candidates) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ JSONObject json = new JSONObject();
+ jsonPut(json, "type", "remove-candidates");
+ JSONArray jsonArray = new JSONArray();
+ for (final IceCandidate candidate : candidates) {
+ jsonArray.put(toJsonCandidate(candidate));
+ }
+ jsonPut(json, "candidates", jsonArray);
+
+ if (roomState != ConnectionState.CONNECTED) {
+ reportError("Sending ICE candidate removals in non connected state.");
+ return;
+ }
+ sendMessage(json.toString());
+ }
+ });
+ }
+
+ // -------------------------------------------------------------------
+ // TCPChannelClient event handlers
+
+ /**
+ * If the client is the server side, this will trigger onConnectedToRoom.
+ */
+ @Override
+ public void onTCPConnected(boolean isServer) {
+ if (isServer) {
+ roomState = ConnectionState.CONNECTED;
+
+ SignalingParameters parameters = new SignalingParameters(
+ // Ice servers are not needed for direct connections.
+ new LinkedList<PeerConnection.IceServer>(),
+ isServer, // Server side acts as the initiator on direct connections.
+ null, // clientId
+ null, // wssUrl
+ null, // wwsPostUrl
+ null, // offerSdp
+ null // iceCandidates
+ );
+ events.onConnectedToRoom(parameters);
+ }
+ }
+
+ @Override
+ public void onTCPMessage(String msg) {
+ try {
+ JSONObject json = new JSONObject(msg);
+ String type = json.optString("type");
+ if (type.equals("candidate")) {
+ events.onRemoteIceCandidate(toJavaCandidate(json));
+ } else if (type.equals("remove-candidates")) {
+ JSONArray candidateArray = json.getJSONArray("candidates");
+ IceCandidate[] candidates = new IceCandidate[candidateArray.length()];
+ for (int i = 0; i < candidateArray.length(); ++i) {
+ candidates[i] = toJavaCandidate(candidateArray.getJSONObject(i));
+ }
+ events.onRemoteIceCandidatesRemoved(candidates);
+ } else if (type.equals("answer")) {
+ SessionDescription sdp = new SessionDescription(
+ SessionDescription.Type.fromCanonicalForm(type),
+ json.getString("sdp"));
+ events.onRemoteDescription(sdp);
+ } else if (type.equals("offer")) {
+ SessionDescription sdp = new SessionDescription(
+ SessionDescription.Type.fromCanonicalForm(type),
+ json.getString("sdp"));
+
+ SignalingParameters parameters = new SignalingParameters(
+ // Ice servers are not needed for direct connections.
+ new LinkedList<PeerConnection.IceServer>(),
+ false, // This code will only be run on the client side. So, we are not the initiator.
+ null, // clientId
+ null, // wssUrl
+ null, // wssPostUrl
+ sdp, // offerSdp
+ null // iceCandidates
+ );
+ roomState = ConnectionState.CONNECTED;
+ events.onConnectedToRoom(parameters);
+ } else {
+ reportError("Unexpected TCP message: " + msg);
+ }
+ } catch (JSONException e) {
+ reportError("TCP message JSON parsing error: " + e.toString());
+ }
+ }
+
+ @Override
+ public void onTCPError(String description) {
+ reportError("TCP connection error: " + description);
+ }
+
+ @Override
+ public void onTCPClose() {
+ events.onChannelClose();
+ }
+
+ // --------------------------------------------------------------------
+ // Helper functions.
+ private void reportError(final String errorMessage) {
+ Log.e(TAG, errorMessage);
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (roomState != ConnectionState.ERROR) {
+ roomState = ConnectionState.ERROR;
+ events.onChannelError(errorMessage);
+ }
+ }
+ });
+ }
+
+ private void sendMessage(final String message) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ tcpClient.send(message);
+ }
+ });
+ }
+
+ // Put a |key|->|value| mapping in |json|.
+ private static void jsonPut(JSONObject json, String key, Object value) {
+ try {
+ json.put(key, value);
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Converts a Java candidate to a JSONObject.
+ private static JSONObject toJsonCandidate(final IceCandidate candidate) {
+ JSONObject json = new JSONObject();
+ jsonPut(json, "label", candidate.sdpMLineIndex);
+ jsonPut(json, "id", candidate.sdpMid);
+ jsonPut(json, "candidate", candidate.sdp);
+ return json;
+ }
+
+ // Converts a JSON candidate to a Java object.
+ private static IceCandidate toJavaCandidate(JSONObject json) throws JSONException {
+ return new IceCandidate(json.getString("id"),
+ json.getInt("label"),
+ json.getString("candidate"));
+ }
+}
diff --git a/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java b/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java
new file mode 100644
index 0000000..996483e
--- /dev/null
+++ b/examples/androidapp/src/org/appspot/apprtc/TCPChannelClient.java
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+package org.appspot.apprtc;
+
+import android.util.Log;
+
+import org.appspot.apprtc.util.LooperExecutor;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+/**
+ * Replacement for WebSocketChannelClient for direct communication between two IP addresses. Handles
+ * the signaling between the two clients using a TCP connection.
+ *
+ * <p>All public methods should be called from a looper executor thread
+ * passed in a constructor, otherwise exception will be thrown.
+ * All events are dispatched on the same thread.
+ */
+public class TCPChannelClient {
+ private static final String TAG = "TCPChannelClient";
+
+ private final LooperExecutor executor;
+ private final TCPChannelEvents eventListener;
+ private TCPSocket socket;
+
+ /**
+ * Callback interface for messages delivered on TCP Connection. All callbacks are invoked from the
+ * looper executor thread.
+ */
+ public interface TCPChannelEvents {
+ void onTCPConnected(boolean server);
+ void onTCPMessage(String message);
+ void onTCPError(String description);
+ void onTCPClose();
+ }
+
+ /**
+ * Initializes the TCPChannelClient. If IP is a local IP address, starts a listening server on
+ * that IP. If not, instead connects to the IP.
+ *
+ * @param eventListener Listener that will receive events from the client.
+ * @param ip IP address to listen on or connect to.
+ * @param port Port to listen on or connect to.
+ */
+ public TCPChannelClient(
+ LooperExecutor executor, TCPChannelEvents eventListener, String ip, int port) {
+ this.executor = executor;
+ this.eventListener = eventListener;
+
+ InetAddress address;
+ try {
+ address = InetAddress.getByName(ip);
+ } catch (UnknownHostException e) {
+ reportError("Invalid IP address.");
+ return;
+ }
+
+ if (address.isAnyLocalAddress()) {
+ socket = new TCPSocketServer(address, port);
+ } else {
+ socket = new TCPSocketClient(address, port);
+ }
+
+ socket.start();
+ }
+
+ /**
+ * Disconnects the client if not already disconnected. This will fire the onTCPClose event.
+ */
+ public void disconnect() {
+ checkIfCalledOnValidThread();
+
+ socket.disconnect();
+ }
+
+ /**
+ * Sends a message on the socket.
+ *
+ * @param message Message to be sent.
+ */
+ public void send(String message) {
+ checkIfCalledOnValidThread();
+
+ socket.send(message);
+ }
+
+ /**
+ * Helper method for firing onTCPError events. Calls onTCPError on the executor thread.
+ */
+ private void reportError(final String message) {
+ Log.e(TAG, "TCP Error: " + message);
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ eventListener.onTCPError(message);
+ }
+ });
+ }
+
+ /**
+ * Helper method for debugging purposes.
+ * Ensures that TCPChannelClient method is called on a looper thread.
+ */
+ private void checkIfCalledOnValidThread() {
+ if (!executor.checkOnLooperThread()) {
+ throw new IllegalStateException(
+ "TCPChannelClient method is not called on valid thread");
+ }
+ }
+
+
+ /**
+ * Base class for server and client sockets. Contains a listening thread that will call
+ * eventListener.onTCPMessage on new messages.
+ */
+ private abstract class TCPSocket extends Thread {
+ // Lock for editing out and rawSocket
+ protected final Object rawSocketLock;
+ private PrintWriter out;
+ private Socket rawSocket;
+
+ /**
+ * Connect to the peer, potentially a slow operation.
+ *
+ * @return Socket connection, null if connection failed.
+ */
+ public abstract Socket connect();
+ /** Returns true if sockets is a server rawSocket. */
+ public abstract boolean isServer();
+
+ TCPSocket() {
+ rawSocketLock = new Object();
+ }
+
+ /**
+ * The listening thread.
+ */
+ @Override
+ public void run() {
+ Log.d(TAG, "Listening thread started...");
+
+ // Receive connection to temporary variable first, so we don't block.
+ Socket tempSocket = connect();
+ BufferedReader in;
+
+ Log.d(TAG, "TCP connection established.");
+
+ synchronized (rawSocketLock) {
+ if (rawSocket != null) {
+ Log.e(TAG, "Socket already existed and will be replaced.");
+ }
+
+ rawSocket = tempSocket;
+
+ // Connecting failed, error has already been reported, just exit.
+ if (rawSocket == null) {
+ return;
+ }
+
+ try {
+ out = new PrintWriter(rawSocket.getOutputStream(), true);
+ in = new BufferedReader(new InputStreamReader(rawSocket.getInputStream()));
+ } catch (IOException e) {
+ reportError("Failed to open IO on rawSocket: " + e.getMessage());
+ return;
+ }
+ }
+
+ Log.v(TAG, "Execute onTCPConnected");
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ Log.v(TAG, "Run onTCPConnected");
+ eventListener.onTCPConnected(isServer());
+ }
+ });
+
+ while (true) {
+ final String message;
+ try {
+ message = in.readLine();
+ } catch (IOException e) {
+ synchronized (rawSocketLock) {
+ // If socket was closed, this is expected.
+ if (rawSocket == null) {
+ break;
+ }
+ }
+
+ reportError("Failed to read from rawSocket: " + e.getMessage());
+ break;
+ }
+
+ // No data received, rawSocket probably closed.
+ if (message == null) {
+ break;
+ }
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ Log.v(TAG, "Receive: " + message);
+ eventListener.onTCPMessage(message);
+ }
+ });
+ }
+
+ Log.d(TAG, "Receiving thread exiting...");
+
+ // Close the rawSocket if it is still open.
+ disconnect();
+ }
+
+ /**
+ * Closes the rawSocket if it is still open. Also fires the onTCPClose event.
+ */
+ public void disconnect() {
+ try {
+ synchronized (rawSocketLock) {
+ if (rawSocket != null) {
+ rawSocket.close();
+ rawSocket = null;
+ out = null;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ eventListener.onTCPClose();
+ }
+ });
+ }
+ }
+ } catch (IOException e) {
+ reportError("Failed to close rawSocket: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Sends a message on the socket. Should only be called on the executor thread.
+ */
+ public void send(String message) {
+ Log.v(TAG, "Send: " + message);
+
+ synchronized (rawSocketLock) {
+ if (out == null) {
+ reportError("Sending data on closed socket.");
+ return;
+ }
+
+ out.write(message + "\n");
+ out.flush();
+ }
+ }
+ }
+
+ private class TCPSocketServer extends TCPSocket {
+ // Server socket is also guarded by rawSocketLock.
+ private ServerSocket serverSocket;
+
+ final private InetAddress address;
+ final private int port;
+
+ public TCPSocketServer(InetAddress address, int port) {
+ this.address = address;
+ this.port = port;
+ }
+
+ /** Opens a listening socket and waits for a connection. */
+ @Override
+ public Socket connect() {
+ Log.d(TAG, "Listening on [" + address.getHostAddress() + "]:" + Integer.toString(port));
+
+ final ServerSocket tempSocket;
+ try {
+ tempSocket = new ServerSocket(port, 0, address);
+ } catch (IOException e) {
+ reportError("Failed to create server socket: " + e.getMessage());
+ return null;
+ }
+
+ synchronized (rawSocketLock) {
+ if (serverSocket != null) {
+ Log.e(TAG, "Server rawSocket was already listening and new will be opened.");
+ }
+
+ serverSocket = tempSocket;
+ }
+
+ try {
+ return tempSocket.accept();
+ } catch (IOException e) {
+ reportError("Failed to receive connection: " + e.getMessage());
+ return null;
+ }
+ }
+
+ /** Closes the listening socket and calls super. */
+ @Override
+ public void disconnect() {
+ try {
+ synchronized (rawSocketLock) {
+ if (serverSocket != null) {
+ serverSocket.close();
+ serverSocket = null;
+ }
+ }
+ } catch (IOException e) {
+ reportError("Failed to close server socket: " + e.getMessage());
+ }
+
+ super.disconnect();
+ }
+
+ @Override
+ public boolean isServer() {
+ return true;
+ }
+ }
+
+ private class TCPSocketClient extends TCPSocket {
+ final private InetAddress address;
+ final private int port;
+
+ public TCPSocketClient(InetAddress address, int port) {
+ this.address = address;
+ this.port = port;
+ }
+
+ /** Connects to the peer. */
+ @Override
+ public Socket connect() {
+ Log.d(TAG, "Connecting to [" + address.getHostAddress() + "]:" + Integer.toString(port));
+
+ try {
+ return new Socket(address, port);
+ } catch (IOException e) {
+ reportError("Failed to connect: " + e.getMessage());
+ return null;
+ }
+ }
+
+ @Override
+ public boolean isServer() {
+ return false;
+ }
+ }
+}
diff --git a/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java b/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java
new file mode 100644
index 0000000..4c60299
--- /dev/null
+++ b/examples/androidjunit/src/org/appspot/apprtc/TCPChannelClientTest.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+package org.appspot.apprtc;
+
+import org.appspot.apprtc.util.LooperExecutor;
+import org.appspot.apprtc.util.RobolectricLooperExecutor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.robolectric.RobolectricTestRunner;
+import org.robolectric.annotation.Config;
+import org.robolectric.shadows.ShadowLog;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(RobolectricTestRunner.class)
+@Config(manifest = Config.NONE)
+public class TCPChannelClientTest {
+ private static final int PORT = 8888;
+ /**
+ * How long we wait before trying to connect to the server. Chosen quite arbitrarily and
+ * could be made smaller if need be.
+ */
+ private static final int SERVER_WAIT = 10;
+ private static final int CONNECT_TIMEOUT = 100;
+ private static final int SEND_TIMEOUT = 100;
+ private static final int DISCONNECT_TIMEOUT = 100;
+ private static final String TEST_MESSAGE_SERVER = "Hello, Server!";
+ private static final String TEST_MESSAGE_CLIENT = "Hello, Client!";
+
+ @Mock TCPChannelClient.TCPChannelEvents serverEvents;
+ @Mock TCPChannelClient.TCPChannelEvents clientEvents;
+
+ private RobolectricLooperExecutor executor;
+ private TCPChannelClient server;
+ private TCPChannelClient client;
+
+
+ @Before
+ public void setUp() {
+ ShadowLog.stream = System.out;
+
+ MockitoAnnotations.initMocks(this);
+
+ executor = new RobolectricLooperExecutor();
+ executor.requestStart();
+ }
+
+ @After
+ public void tearDown() {
+ verifyNoMoreEvents();
+
+ executor.executeAndWait(new Runnable() {
+ @Override
+ public void run() {
+ client.disconnect();
+ server.disconnect();
+ }
+ });
+
+ // Stop the executor thread
+ executor.requestStop();
+ try {
+ executor.join();
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testConnectIPv4() {
+ setUpIPv4Server();
+ try {
+ Thread.sleep(SERVER_WAIT);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ setUpIPv4Client();
+
+ verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true);
+ verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false);
+ }
+
+ @Test
+ public void testConnectIPv6() {
+ setUpIPv6Server();
+ try {
+ Thread.sleep(SERVER_WAIT);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ setUpIPv6Client();
+
+ verify(serverEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(true);
+ verify(clientEvents, timeout(CONNECT_TIMEOUT)).onTCPConnected(false);
+ }
+
+ @Test
+ public void testSendData() {
+ testConnectIPv4();
+
+ executor.executeAndWait(new Runnable() {
+ @Override
+ public void run() {
+ client.send(TEST_MESSAGE_SERVER);
+ server.send(TEST_MESSAGE_CLIENT);
+ }
+ });
+
+ verify(serverEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_SERVER);
+ verify(clientEvents, timeout(SEND_TIMEOUT)).onTCPMessage(TEST_MESSAGE_CLIENT);
+ }
+
+ @Test
+ public void testDisconnectServer() {
+ testConnectIPv4();
+ executor.executeAndWait(new Runnable() {
+ @Override
+ public void run() {
+ server.disconnect();
+ }
+ });
+
+ verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
+ verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
+ }
+
+ @Test
+ public void testDisconnectClient() {
+ testConnectIPv4();
+ executor.executeAndWait(new Runnable() {
+ @Override
+ public void run() {
+ client.disconnect();
+ }
+ });
+
+ verify(serverEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
+ verify(clientEvents, timeout(DISCONNECT_TIMEOUT)).onTCPClose();
+ }
+
+ private void setUpIPv4Server() {
+ setUpServer("0.0.0.0", PORT);
+ }
+
+ private void setUpIPv4Client() {
+ setUpClient("127.0.0.1", PORT);
+ }
+
+ private void setUpIPv6Server() {
+ setUpServer("::", PORT);
+ }
+
+ private void setUpIPv6Client() {
+ setUpClient("::1", PORT);
+ }
+
+ private void setUpServer(String ip, int port) {
+ server = new TCPChannelClient(executor, serverEvents, ip, port);
+ }
+
+ private void setUpClient(String ip, int port) {
+ client = new TCPChannelClient(executor, clientEvents, ip, port);
+ }
+
+ /**
+ * Verifies no more server or client events have been issued
+ */
+ private void verifyNoMoreEvents() {
+ verifyNoMoreInteractions(serverEvents);
+ verifyNoMoreInteractions(clientEvents);
+ }
+}
diff --git a/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java b/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java
new file mode 100644
index 0000000..19d595d
--- /dev/null
+++ b/examples/androidjunit/src/org/appspot/apprtc/util/RobolectricLooperExecutor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+package org.appspot.apprtc.util;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.fail;
+
+/**
+ * LooperExecutor that doesn't use Looper because its implementation in Robolectric is not suited
+ * for our needs. Also implements executeAndWait that can be used to wait until the runnable has
+ * been executed.
+ */
+public class RobolectricLooperExecutor extends LooperExecutor {
+ private volatile boolean running = false;
+ private static final int RUNNABLE_QUEUE_CAPACITY = 256;
+ private final BlockingQueue<Runnable> runnableQueue
+ = new ArrayBlockingQueue<>(RUNNABLE_QUEUE_CAPACITY);
+ private long threadId;
+
+ /**
+ * Executes the runnable passed to the constructor and sets isDone flag afterwards.
+ */
+ private static class ExecuteAndWaitRunnable implements Runnable {
+ public boolean isDone = false;
+ private final Runnable runnable;
+
+ ExecuteAndWaitRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ @Override
+ public void run() {
+ runnable.run();
+
+ synchronized (this) {
+ isDone = true;
+ notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ threadId = Thread.currentThread().getId();
+
+ while (running) {
+ final Runnable runnable;
+
+ try {
+ runnable = runnableQueue.take();
+ } catch (InterruptedException e) {
+ if (running) {
+ fail(e.getMessage());
+ }
+ return;
+ }
+
+ runnable.run();
+ }
+ }
+
+ @Override
+ public synchronized void requestStart() {
+ if (running) {
+ return;
+ }
+ running = true;
+ start();
+ }
+
+ @Override
+ public synchronized void requestStop() {
+ running = false;
+ interrupt();
+ }
+
+ @Override
+ public synchronized void execute(Runnable runnable) {
+ try {
+ runnableQueue.put(runnable);
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Queues runnable to be run and waits for it to be executed by the executor thread
+ */
+ public void executeAndWait(Runnable runnable) {
+ ExecuteAndWaitRunnable executeAndWaitRunnable = new ExecuteAndWaitRunnable(runnable);
+ execute(executeAndWaitRunnable);
+
+ synchronized (executeAndWaitRunnable) {
+ while (!executeAndWaitRunnable.isDone) {
+ try {
+ executeAndWaitRunnable.wait();
+ } catch (InterruptedException e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean checkOnLooperThread() {
+ return (Thread.currentThread().getId() == threadId);
+ }
+}