2
0
mirror of https://github.com/KDE/kdeconnect-android synced 2025-08-30 21:55:10 +00:00

Initial netty implementation

This commit is contained in:
Vineet Garg
2015-06-19 04:00:27 +05:30
parent 79df72b80b
commit 2f16656aa0
3 changed files with 166 additions and 148 deletions

View File

@@ -11,7 +11,7 @@ apply plugin: 'com.android.application'
android {
compileSdkVersion 22
buildToolsVersion '22.0.1'
buildToolsVersion '22.0.0'
defaultConfig {
minSdkVersion 9
targetSdkVersion 22
@@ -54,8 +54,7 @@ dependencies {
compile 'org.apache.mina:mina-core:2.0.9'
compile 'org.apache.sshd:sshd-core:0.8.0'
compile 'org.bouncycastle:bcprov-jdk16:1.46'
compile 'io.netty:netty-all:4.0.23.Final'
androidTestCompile 'org.mockito:mockito-core:1.10.19'
// Because mockito has some problems with dex environment

View File

@@ -42,19 +42,23 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
public class LanLink extends BaseLink {
private IoSession session = null;
// private IoSession session = null;
private Channel session = null;
public void disconnect() {
if (session == null) {
Log.e("KDE/LanLink", "Not yet connected");
return;
}
session.close(true);
session.close();
}
public LanLink(IoSession session, String deviceId, BaseLinkProvider linkProvider) {
public LanLink(Channel session, String deviceId, BaseLinkProvider linkProvider) {
super(deviceId, linkProvider);
this.session = session;
}
@@ -86,14 +90,14 @@ public class LanLink extends BaseLink {
}
//Send body of the network package
WriteFuture future = session.write(np.serialize());
future.awaitUninterruptibly();
if (!future.isWritten()) {
ChannelFuture future = session.writeAndFlush(np.serialize()).sync();
if (!future.isSuccess()) {
Log.e("KDE/sendPackage", "!future.isWritten()");
callback.sendFailure(future.getException());
callback.sendFailure(future.cause());
return;
}
//Send payload
if (server != null) {
OutputStream socket = null;
@@ -173,7 +177,7 @@ public class LanLink extends BaseLink {
try {
socket = new Socket();
int tcpPort = np.getPayloadTransferInfo().getInt("port");
InetSocketAddress address = (InetSocketAddress)session.getRemoteAddress();
InetSocketAddress address = (InetSocketAddress)session.remoteAddress();
socket.connect(new InetSocketAddress(address.getAddress(), tcpPort));
np.setPayload(socket.getInputStream(), np.getPayloadSize());
} catch (Exception e) {

View File

@@ -25,32 +25,41 @@ import android.preference.PreferenceManager;
import android.support.v4.util.LongSparseArray;
import android.util.Log;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.kde.kdeconnect.Backends.BaseLinkProvider;
import org.kde.kdeconnect.Device;
import org.kde.kdeconnect.NetworkPackage;
import org.kde.kdeconnect.UserInterface.CustomDevicesActivity;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
public class LanLinkProvider extends BaseLinkProvider {
@@ -59,26 +68,29 @@ public class LanLinkProvider extends BaseLinkProvider {
private final Context context;
private final HashMap<String, LanLink> visibleComputers = new HashMap<String, LanLink>();
private final LongSparseArray<LanLink> nioSessions = new LongSparseArray<LanLink>();
private final LongSparseArray<NioSocketConnector> nioConnectors = new LongSparseArray<NioSocketConnector>();
private final LongSparseArray<LanLink> nioLinks = new LongSparseArray<LanLink>();
private final LongSparseArray<Channel> nioChannels = new LongSparseArray<Channel>();
private ServerBootstrap tcpBootstrap = null;
private Bootstrap udpBootstrap = null;
private Channel udpChannel = null;
private NioSocketAcceptor tcpAcceptor = null;
private NioDatagramAcceptor udpAcceptor = null;
private final IoHandler tcpHandler = new IoHandlerAdapter() {
private class TcpHandler extends ChannelInboundHandlerAdapter{
@Override
public void sessionClosed(IoSession session) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
try {
long id = session.getId();
final LanLink brokenLink = nioSessions.get(id);
NioSocketConnector connector = nioConnectors.get(id);
if (connector != null) {
connector.dispose();
nioConnectors.remove(id);
long id = ctx.channel().hashCode();
final LanLink brokenLink = nioLinks.get(id);
Channel channel = nioChannels.get(id);
if (channel != null) {
channel.close();
nioChannels.remove(id);
}
if (brokenLink != null) {
nioSessions.remove(id);
//Log.i("KDE/LanLinkProvider", "nioSessions.size(): " + nioSessions.size() + " (-)");
nioLinks.remove(id);
//Log.i("KDE/LanLinkProvider", "nioLinks.size(): " + nioLinks.size() + " (-)");
try {
brokenLink.disconnect();
} catch (Exception e) {
@@ -110,12 +122,13 @@ public class LanLinkProvider extends BaseLinkProvider {
}
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
super.messageReceived(session, message);
//Log.e("LanLinkProvider","Incoming package, address: "+session.getRemoteAddress()).toString());
//Log.e("LanLinkProvider","Received:"+message);
@Override
public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
super.channelRead(ctx, message);
// Log.e("LanLinkProvider","Incoming package, address: " + ctx.channel().remoteAddress());
// Log.e("LanLinkProvider","Received:"+message);
String theMessage = (String) message;
if (theMessage.isEmpty()) {
@@ -134,12 +147,12 @@ public class LanLinkProvider extends BaseLinkProvider {
//Log.i("KDE/LanLinkProvider", "Identity package received from " + np.getString("deviceName"));
LanLink link = new LanLink(session, np.getString("deviceId"), LanLinkProvider.this);
nioSessions.put(session.getId(),link);
//Log.e("KDE/LanLinkProvider","nioSessions.size(): " + nioSessions.size());
LanLink link = new LanLink(ctx.channel(), np.getString("deviceId"), LanLinkProvider.this);
nioLinks.put(ctx.channel().hashCode(), link);
//Log.i("KDE/LanLinkProvider","nioLinks.size(): " + nioLinks.size());
addLink(np, link);
} else {
LanLink prevLink = nioSessions.get(session.getId());
LanLink prevLink = nioLinks.get(ctx.channel().hashCode());
if (prevLink == null) {
Log.e("KDE/LanLinkProvider","Expecting an identity package (A)");
} else {
@@ -148,18 +161,15 @@ public class LanLinkProvider extends BaseLinkProvider {
}
}
};
}
private class UdpHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private final IoHandler udpHandler = new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession udpSession, Object message) throws Exception {
super.messageReceived(udpSession, message);
//Log.e("LanLinkProvider", "Udp message received (" + message.getClass() + ") " + message.toString());
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
try {
//We should receive a string thanks to the TextLineCodecFactory filter
String theMessage = (String) message;
String theMessage = packet.content().toString(CharsetUtil.UTF_8);
final NetworkPackage identityPackage = NetworkPackage.unserialize(theMessage);
if (!identityPackage.getType().equals(NetworkPackage.PACKAGE_TYPE_IDENTITY)) {
@@ -172,30 +182,35 @@ public class LanLinkProvider extends BaseLinkProvider {
}
}
//Log.i("KDE/LanLinkProvider", "Identity package received, creating link");
Log.i("KDE/LanLinkProvider", "Identity package received, creating link");
final InetSocketAddress address = (InetSocketAddress) udpSession.getRemoteAddress();
final NioSocketConnector connector = new NioSocketConnector();
connector.setHandler(tcpHandler);
connector.getSessionConfig().setKeepAlive(true);
//TextLineCodecFactory will buffer incoming data and emit a message very time it finds a \n
TextLineCodecFactory textLineFactory = new TextLineCodecFactory(Charset.defaultCharset(), LineDelimiter.UNIX, LineDelimiter.UNIX);
textLineFactory.setDecoderMaxLineLength(512*1024); //Allow to receive up to 512kb of data
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(textLineFactory));
EventLoopGroup clientGroup = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(clientGroup);
b.channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new TcpHandler());
}
});
int tcpPort = identityPackage.getInt("tcpPort", port);
final ChannelFuture channelFuture = b.connect(packet.sender().getAddress(), tcpPort).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelFuture.removeListener(this);
final Channel channel = channelFuture.channel();
int tcpPort = identityPackage.getInt("tcpPort", port);
final ConnectFuture future = connector.connect(new InetSocketAddress(address.getAddress(), tcpPort));
future.addListener(new IoFutureListener<IoFuture>() {
Log.i("KDE/LanLinkProvider", "Connection successful: " + channel.isActive());
@Override
public void operationComplete(IoFuture ioFuture) {
try {
future.removeListener(this);
final IoSession session = ioFuture.getSession();
Log.i("KDE/LanLinkProvider", "Connection successful: " + session.isConnected());
final LanLink link = new LanLink(session, identityPackage.getString("deviceId"), LanLinkProvider.this);
final LanLink link = new LanLink(channel, identityPackage.getString("deviceId"), LanLinkProvider.this);
new Thread(new Runnable() {
@Override
public void run() {
@@ -203,9 +218,9 @@ public class LanLinkProvider extends BaseLinkProvider {
link.sendPackage(np2,new Device.SendPackageStatusCallback() {
@Override
protected void onSuccess() {
nioSessions.put(session.getId(), link);
nioConnectors.put(session.getId(), connector);
//Log.e("KDE/LanLinkProvider","nioSessions.size(): " + nioSessions.size());
nioLinks.put(channel.hashCode(), link);
nioChannels.put(channel.hashCode(), channel);
Log.i("KDE/LanLinkProvider","nioLinks.size(): " + nioLinks.size());
addLink(identityPackage, link);
}
@@ -217,20 +232,19 @@ public class LanLinkProvider extends BaseLinkProvider {
}
}).start();
} catch (Exception e) { //If we don't catch it here, Mina will swallow it :/
e.printStackTrace();
Log.e("KDE/LanLinkProvider", "sessionClosed exception");
}
}
});
});
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
Log.e("KDE/LanLinkProvider","Exception receiving udp package!!");
e.printStackTrace();
}
}
};
}
private void addLink(NetworkPackage identityPackage, LanLink link) {
String deviceId = identityPackage.getString("deviceId");
@@ -253,64 +267,59 @@ public class LanLinkProvider extends BaseLinkProvider {
this.context = context;
//This handles the case when I'm the new device in the network and somebody answers my introduction package
tcpAcceptor = new NioSocketAcceptor();
tcpAcceptor.setHandler(tcpHandler);
tcpAcceptor.getSessionConfig().setKeepAlive(true);
tcpAcceptor.getSessionConfig().setReuseAddress(true);
//TextLineCodecFactory will buffer incoming data and emit a message very time it finds a \n
TextLineCodecFactory textLineFactory = new TextLineCodecFactory(Charset.defaultCharset(), LineDelimiter.UNIX, LineDelimiter.UNIX);
textLineFactory.setDecoderMaxLineLength(512*1024); //Allow to receive up to 512kb of data
tcpAcceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(textLineFactory));
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
tcpBootstrap = new ServerBootstrap();
tcpBootstrap.group(bossGroup, workerGroup);
tcpBootstrap.channel(NioServerSocketChannel.class);
tcpBootstrap.option(ChannelOption.SO_BACKLOG, 100);
tcpBootstrap.handler(new LoggingHandler(LogLevel.INFO));
tcpBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
tcpBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
tcpBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new TcpHandler());
}
});
tcpBootstrap.bind(new InetSocketAddress(port)).sync();
}catch (Exception e) {
e.printStackTrace();
}
udpAcceptor = new NioDatagramAcceptor();
udpAcceptor.getSessionConfig().setReuseAddress(true); //Share port if existing
//TextLineCodecFactory will buffer incoming data and emit a message very time it finds a \n
//This one will have the default MaxLineLength of 1KB
udpAcceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(
new TextLineCodecFactory(Charset.defaultCharset(), LineDelimiter.UNIX, LineDelimiter.UNIX)
)
);
EventLoopGroup udpEventLoopGroup = new NioEventLoopGroup();
try {
udpBootstrap = new Bootstrap();
udpBootstrap.group(udpEventLoopGroup);
udpBootstrap.channel(NioDatagramChannel.class);
udpBootstrap.option(ChannelOption.SO_BROADCAST, true);
udpBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new UdpHandler());
}
});
udpChannel = udpBootstrap.bind(new InetSocketAddress(port)).sync().channel();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void onStart() {
//This handles the case when I'm the existing device in the network and receive a "hello" UDP package
Log.e("KDE/LanLinkProvider", "onStart");
Set<SocketAddress> addresses = udpAcceptor.getLocalAddresses();
for (SocketAddress address : addresses) {
Log.i("KDE/LanLinkProvider", "UDP unbind old address");
udpAcceptor.unbind(address);
}
//Log.i("KDE/LanLinkProvider", "UDP Bind.");
udpAcceptor.setHandler(udpHandler);
try {
udpAcceptor.bind(new InetSocketAddress(port));
} catch(Exception e) {
Log.e("KDE/LanLinkProvider", "Error: Could not bind udp socket");
e.printStackTrace();
}
boolean success = false;
int tcpPort = port;
while(!success) {
try {
tcpAcceptor.bind(new InetSocketAddress(tcpPort));
success = true;
} catch(Exception e) {
tcpPort++;
}
}
Log.i("KDE/LanLinkProvider","Using tcpPort "+tcpPort);
//I'm on a new network, let's be polite and introduce myself
final int finalTcpPort = tcpPort;
new Thread(new Runnable() {
@Override
public void run() {
@@ -324,7 +333,7 @@ public class LanLinkProvider extends BaseLinkProvider {
iplist.add("255.255.255.255"); //Default: broadcast.
NetworkPackage identity = NetworkPackage.createIdentityPackage(context);
identity.set("tcpPort", finalTcpPort);
identity.set("tcpPort", port);
DatagramSocket socket = null;
byte[] bytes = null;
try {
@@ -342,7 +351,7 @@ public class LanLinkProvider extends BaseLinkProvider {
for (String ipstr : iplist) {
try {
InetAddress client = InetAddress.getByName(ipstr);
DatagramPacket packet = new DatagramPacket(bytes, bytes.length, client, port);
java.net.DatagramPacket packet = new java.net.DatagramPacket(bytes, bytes.length, client, port);
socket.send(packet);
//Log.i("KDE/LanLinkProvider","Udp identity package sent to address "+packet.getAddress());
} catch (Exception e) {
@@ -360,14 +369,14 @@ public class LanLinkProvider extends BaseLinkProvider {
@Override
public void onNetworkChange() {
//Log.e("KDE/LanLinkProvider","onNetworkChange");
Log.e("KDE/LanLinkProvider","onNetworkChange");
//FilesHelper.LogOpenFileCount();
//Keep existing connections open while unbinding the socket
tcpAcceptor.setCloseOnDeactivation(false);
onStop();
tcpAcceptor.setCloseOnDeactivation(true);
// tcpAcceptor.setCloseOnDeactivation(false);
// onStop();
// tcpAcceptor.setCloseOnDeactivation(true);
//FilesHelper.LogOpenFileCount();
@@ -378,8 +387,14 @@ public class LanLinkProvider extends BaseLinkProvider {
@Override
public void onStop() {
udpAcceptor.unbind();
tcpAcceptor.unbind();
Log.e("KDE/LanLinkProvider", "onStop");
try {
udpBootstrap.group().shutdownGracefully().sync();
tcpBootstrap.group().shutdownGracefully().sync();
tcpBootstrap.childGroup().shutdownGracefully().sync();
}catch (Exception e){
e.printStackTrace();
}
}
@Override