diff --git a/src/org/kde/kdeconnect/Backends/BaseLink.java b/src/org/kde/kdeconnect/Backends/BaseLink.java index 09723066..622ed411 100644 --- a/src/org/kde/kdeconnect/Backends/BaseLink.java +++ b/src/org/kde/kdeconnect/Backends/BaseLink.java @@ -71,7 +71,7 @@ public abstract class BaseLink { linkProvider.connectionLost(this); } - //TO OVERRIDE, should be sync + //TO OVERRIDE, should be sync. If sendPayloadFromSameThread is false, it should only block to send the packet but start a separate thread to send the payload. @WorkerThread - public abstract boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback) throws IOException; + public abstract boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) throws IOException; } diff --git a/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLink.java b/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLink.java index 5cf3fc0c..2ae8f5f2 100644 --- a/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLink.java +++ b/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLink.java @@ -137,7 +137,8 @@ public class BluetoothLink extends BaseLink { @WorkerThread @Override - public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback) { + public boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) throws IOException { + // sendPayloadFromSameThread is ignored, we always send from the same thread! /*if (!isConnected()) { Log.e("BluetoothLink", "sendPacketEncrypted failed: not connected"); @@ -168,7 +169,7 @@ public class BluetoothLink extends BaseLink { progress += bytesRead; payloadStream.write(buffer, 0, bytesRead); if (np.getPayloadSize() > 0) { - callback.onProgressChanged((int) (100 * progress / np.getPayloadSize())); + callback.onPayloadProgressChanged((int) (100 * progress / np.getPayloadSize())); } } payloadStream.flush(); diff --git a/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLinkProvider.java b/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLinkProvider.java index a623fcee..a182b8b7 100644 --- a/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLinkProvider.java +++ b/src/org/kde/kdeconnect/Backends/BluetoothBackend/BluetoothLinkProvider.java @@ -378,7 +378,7 @@ public class BluetoothLinkProvider extends BaseLinkProvider { public void onFailure(Throwable e) { } - }); + }, true); } catch (Exception e) { Log.e("BTLinkProvider/Client", "Connection lost/disconnected on " + device.getAddress(), e); } diff --git a/src/org/kde/kdeconnect/Backends/LanBackend/LanLink.java b/src/org/kde/kdeconnect/Backends/LanBackend/LanLink.java index 7b9f5f41..e3a67f91 100644 --- a/src/org/kde/kdeconnect/Backends/LanBackend/LanLink.java +++ b/src/org/kde/kdeconnect/Backends/LanBackend/LanLink.java @@ -131,7 +131,7 @@ public class LanLink extends BaseLink { //Blocking, do not call from main thread @WorkerThread @Override - public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback) { + public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) { if (socket == null) { Log.e("KDE/sendPacket", "Not yet connected"); callback.onFailure(new NotYetConnectedException()); @@ -165,53 +165,17 @@ public class LanLink extends BaseLink { //Send payload if (server != null) { - Socket payloadSocket = null; - OutputStream outputStream = null; - InputStream inputStream; - try { - //Wait a maximum of 10 seconds for the other end to establish a connection with our socket, close it afterwards - server.setSoTimeout(10*1000); - - payloadSocket = server.accept(); - - //Convert to SSL if needed - payloadSocket = SslHelper.convertToSslSocket(context, payloadSocket, getDeviceId(), true, false); - - outputStream = payloadSocket.getOutputStream(); - inputStream = np.getPayload().getInputStream(); - - Log.i("KDE/LanLink", "Beginning to send payload"); - byte[] buffer = new byte[4096]; - int bytesRead; - long size = np.getPayloadSize(); - long progress = 0; - long timeSinceLastUpdate = -1; - while (!np.isCanceled() && (bytesRead = inputStream.read(buffer)) != -1) { - //Log.e("ok",""+bytesRead); - progress += bytesRead; - outputStream.write(buffer, 0, bytesRead); - if (size > 0) { - if (timeSinceLastUpdate + 500 < System.currentTimeMillis()) { //Report progress every half a second - long percent = ((100 * progress) / size); - callback.onProgressChanged((int) percent); - timeSinceLastUpdate = System.currentTimeMillis(); - } + if (sendPayloadFromSameThread) { + sendPayload(np, callback, server); + } else { + ThreadHelper.execute(() -> { + try { + sendPayload(np, callback, server); + } catch (IOException e) { + e.printStackTrace(); + Log.e("LanLink/sendPacket", "Async sendPayload failed for packet of type " + np.getType() + ". The Plugin was NOT notified."); } - } - outputStream.flush(); - Log.i("KDE/LanLink", "Finished sending payload ("+progress+" bytes written)"); - } catch(SSLHandshakeException e) { - // The exception can be due to several causes. "Connection closed by peer" seems to be a common one. - // If we could distinguish different cases we could react differently for some of them, but I haven't found how. - Log.e("sendPacket","Payload SSLSocket failed"); - e.printStackTrace(); - } catch(SocketTimeoutException e) { - Log.e("LanLink", "Socket for payload in packet " + np.getType() + " timed out. The other end didn't fetch the payload."); - } finally { - try { server.close(); } catch (Exception ignored) { } - try { payloadSocket.close(); } catch (Exception ignored) { } - np.getPayload().close(); - try { outputStream.close(); } catch (Exception ignored) { } + }); } } @@ -232,6 +196,59 @@ public class LanLink extends BaseLink { } } + private void sendPayload(NetworkPacket np, Device.SendPacketStatusCallback callback, ServerSocket server) throws IOException { + Socket payloadSocket = null; + OutputStream outputStream = null; + InputStream inputStream; + try { + if (!np.isCanceled()) { + //Wait a maximum of 10 seconds for the other end to establish a connection with our socket, close it afterwards + server.setSoTimeout(10 * 1000); + + payloadSocket = server.accept(); + + //Convert to SSL if needed + payloadSocket = SslHelper.convertToSslSocket(context, payloadSocket, getDeviceId(), true, false); + + outputStream = payloadSocket.getOutputStream(); + inputStream = np.getPayload().getInputStream(); + + Log.i("KDE/LanLink", "Beginning to send payload for " + np.getType()); + byte[] buffer = new byte[4096]; + int bytesRead; + long size = np.getPayloadSize(); + long progress = 0; + long timeSinceLastUpdate = -1; + while (!np.isCanceled() && (bytesRead = inputStream.read(buffer)) != -1) { + //Log.e("ok",""+bytesRead); + progress += bytesRead; + outputStream.write(buffer, 0, bytesRead); + if (size > 0) { + if (timeSinceLastUpdate + 500 < System.currentTimeMillis()) { //Report progress every half a second + long percent = ((100 * progress) / size); + callback.onPayloadProgressChanged((int) percent); + timeSinceLastUpdate = System.currentTimeMillis(); + } + } + } + outputStream.flush(); + Log.i("KDE/LanLink", "Finished sending payload (" + progress + " bytes written)"); + } + } catch(SocketTimeoutException e) { + Log.e("LanLink", "Socket for payload in packet " + np.getType() + " timed out. The other end didn't fetch the payload."); + } catch(SSLHandshakeException e) { + // The exception can be due to several causes. "Connection closed by peer" seems to be a common one. + // If we could distinguish different cases we could react differently for some of them, but I haven't found how. + Log.e("sendPacket","Payload SSLSocket failed"); + e.printStackTrace(); + } finally { + try { server.close(); } catch (Exception ignored) { } + try { payloadSocket.close(); } catch (Exception ignored) { } + np.getPayload().close(); + try { outputStream.close(); } catch (Exception ignored) { } + } + } + private void receivedNetworkPacket(NetworkPacket np) { if (np.hasPayloadTransferInfo()) { diff --git a/src/org/kde/kdeconnect/Backends/LoopbackBackend/LoopbackLink.java b/src/org/kde/kdeconnect/Backends/LoopbackBackend/LoopbackLink.java index c7e728e8..691e127c 100644 --- a/src/org/kde/kdeconnect/Backends/LoopbackBackend/LoopbackLink.java +++ b/src/org/kde/kdeconnect/Backends/LoopbackBackend/LoopbackLink.java @@ -34,12 +34,12 @@ public class LoopbackLink extends BaseLink { @WorkerThread @Override - public boolean sendPacket(NetworkPacket in, Device.SendPacketStatusCallback callback) { + public boolean sendPacket(NetworkPacket in, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) { packetReceived(in); if (in.hasPayload()) { - callback.onProgressChanged(0); + callback.onPayloadProgressChanged(0); in.setPayload(in.getPayload()); - callback.onProgressChanged(100); + callback.onPayloadProgressChanged(100); } callback.onSuccess(); return true; diff --git a/src/org/kde/kdeconnect/Device.java b/src/org/kde/kdeconnect/Device.java index 5a198f25..834481e1 100644 --- a/src/org/kde/kdeconnect/Device.java +++ b/src/org/kde/kdeconnect/Device.java @@ -610,7 +610,7 @@ public class Device implements BaseLink.PacketReceiver { public abstract void onFailure(Throwable e); - public void onProgressChanged(int percent) { + public void onPayloadProgressChanged(int percent) { } } @@ -674,16 +674,24 @@ public class Device implements BaseLink.PacketReceiver { } } + @WorkerThread + public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback) { + return sendPacketBlocking(np, callback, false); + } + /** * Send {@code np} over one of this device's connected {@link #links}. * - * @param np the packet to send - * @param callback a callback that can receive realtime updates + * @param np the packet to send + * @param callback a callback that can receive realtime updates + * @param sendPayloadFromSameThread when set to true and np contains a Payload, this function + * won't return until the Payload has been received by the + * other end, or times out after 10 seconds * @return true if the packet was sent ok, false otherwise * @see BaseLink#sendPacket(NetworkPacket, SendPacketStatusCallback) */ @WorkerThread - public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback) { + public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) { /* if (!m_outgoingCapabilities.contains(np.getType()) && !NetworkPacket.protocolPacketTypes.contains(np.getType())) { @@ -696,7 +704,7 @@ public class Device implements BaseLink.PacketReceiver { for (final BaseLink link : links) { if (link == null) continue; try { - success = link.sendPacket(np, callback); + success = link.sendPacket(np, callback, sendPayloadFromSameThread); } catch (IOException e) { e.printStackTrace(); } diff --git a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java index 3b739119..ef4a9389 100644 --- a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java +++ b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java @@ -104,7 +104,9 @@ public class CompositeUploadFileJob extends BackgroundJob { addTotalsToNetworkPacket(currentNetworkPacket); - if (!getDevice().sendPacketBlocking(currentNetworkPacket, sendPacketStatusCallback)) { + // We set sendPayloadFromSameThread to true so this call blocks until the payload + // has been received by the other end, so payloads are sent one by one. + if (!getDevice().sendPacketBlocking(currentNetworkPacket, sendPacketStatusCallback, true)) { throw new RuntimeException("Sending packet failed"); } @@ -202,7 +204,7 @@ public class CompositeUploadFileJob extends BackgroundJob { private class SendPacketStatusCallback extends Device.SendPacketStatusCallback { @Override - public void onProgressChanged(int percent) { + public void onPayloadProgressChanged(int percent) { float send = totalSend + (currentNetworkPacket.getPayloadSize() * ((float)percent / 100)); int progress = (int)((send * 100) / totalPayloadSize);