2
0
mirror of https://github.com/KDE/kdeconnect-android synced 2025-08-29 05:07:40 +00:00

Make sending the payloads async

Fixes the bug described in !359. Since we made sending packets sequential
in !90 (including the payload part of the packet) we could be blocking the
queue for up to 10 seconds if the other end didn't fetch our payload.

This makes the payload part async by default but keeps the option to make
it sync, since we want that behavior in CompositeUploadFileJob.
This commit is contained in:
Albert Vaca Cintora 2023-05-24 19:22:17 +02:00
parent ea1675c76a
commit 0f3ad63ee3
7 changed files with 90 additions and 62 deletions

View File

@ -71,7 +71,7 @@ public abstract class BaseLink {
linkProvider.connectionLost(this);
}
//TO OVERRIDE, should be sync
//TO OVERRIDE, should be sync. If sendPayloadFromSameThread is false, it should only block to send the packet but start a separate thread to send the payload.
@WorkerThread
public abstract boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback) throws IOException;
public abstract boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) throws IOException;
}

View File

@ -137,7 +137,8 @@ public class BluetoothLink extends BaseLink {
@WorkerThread
@Override
public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback) {
public boolean sendPacket(NetworkPacket np, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) throws IOException {
// sendPayloadFromSameThread is ignored, we always send from the same thread!
/*if (!isConnected()) {
Log.e("BluetoothLink", "sendPacketEncrypted failed: not connected");
@ -168,7 +169,7 @@ public class BluetoothLink extends BaseLink {
progress += bytesRead;
payloadStream.write(buffer, 0, bytesRead);
if (np.getPayloadSize() > 0) {
callback.onProgressChanged((int) (100 * progress / np.getPayloadSize()));
callback.onPayloadProgressChanged((int) (100 * progress / np.getPayloadSize()));
}
}
payloadStream.flush();

View File

@ -378,7 +378,7 @@ public class BluetoothLinkProvider extends BaseLinkProvider {
public void onFailure(Throwable e) {
}
});
}, true);
} catch (Exception e) {
Log.e("BTLinkProvider/Client", "Connection lost/disconnected on " + device.getAddress(), e);
}

View File

@ -131,7 +131,7 @@ public class LanLink extends BaseLink {
//Blocking, do not call from main thread
@WorkerThread
@Override
public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback) {
public boolean sendPacket(NetworkPacket np, final Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) {
if (socket == null) {
Log.e("KDE/sendPacket", "Not yet connected");
callback.onFailure(new NotYetConnectedException());
@ -165,53 +165,17 @@ public class LanLink extends BaseLink {
//Send payload
if (server != null) {
Socket payloadSocket = null;
OutputStream outputStream = null;
InputStream inputStream;
try {
//Wait a maximum of 10 seconds for the other end to establish a connection with our socket, close it afterwards
server.setSoTimeout(10*1000);
payloadSocket = server.accept();
//Convert to SSL if needed
payloadSocket = SslHelper.convertToSslSocket(context, payloadSocket, getDeviceId(), true, false);
outputStream = payloadSocket.getOutputStream();
inputStream = np.getPayload().getInputStream();
Log.i("KDE/LanLink", "Beginning to send payload");
byte[] buffer = new byte[4096];
int bytesRead;
long size = np.getPayloadSize();
long progress = 0;
long timeSinceLastUpdate = -1;
while (!np.isCanceled() && (bytesRead = inputStream.read(buffer)) != -1) {
//Log.e("ok",""+bytesRead);
progress += bytesRead;
outputStream.write(buffer, 0, bytesRead);
if (size > 0) {
if (timeSinceLastUpdate + 500 < System.currentTimeMillis()) { //Report progress every half a second
long percent = ((100 * progress) / size);
callback.onProgressChanged((int) percent);
timeSinceLastUpdate = System.currentTimeMillis();
}
if (sendPayloadFromSameThread) {
sendPayload(np, callback, server);
} else {
ThreadHelper.execute(() -> {
try {
sendPayload(np, callback, server);
} catch (IOException e) {
e.printStackTrace();
Log.e("LanLink/sendPacket", "Async sendPayload failed for packet of type " + np.getType() + ". The Plugin was NOT notified.");
}
}
outputStream.flush();
Log.i("KDE/LanLink", "Finished sending payload ("+progress+" bytes written)");
} catch(SSLHandshakeException e) {
// The exception can be due to several causes. "Connection closed by peer" seems to be a common one.
// If we could distinguish different cases we could react differently for some of them, but I haven't found how.
Log.e("sendPacket","Payload SSLSocket failed");
e.printStackTrace();
} catch(SocketTimeoutException e) {
Log.e("LanLink", "Socket for payload in packet " + np.getType() + " timed out. The other end didn't fetch the payload.");
} finally {
try { server.close(); } catch (Exception ignored) { }
try { payloadSocket.close(); } catch (Exception ignored) { }
np.getPayload().close();
try { outputStream.close(); } catch (Exception ignored) { }
});
}
}
@ -232,6 +196,59 @@ public class LanLink extends BaseLink {
}
}
private void sendPayload(NetworkPacket np, Device.SendPacketStatusCallback callback, ServerSocket server) throws IOException {
Socket payloadSocket = null;
OutputStream outputStream = null;
InputStream inputStream;
try {
if (!np.isCanceled()) {
//Wait a maximum of 10 seconds for the other end to establish a connection with our socket, close it afterwards
server.setSoTimeout(10 * 1000);
payloadSocket = server.accept();
//Convert to SSL if needed
payloadSocket = SslHelper.convertToSslSocket(context, payloadSocket, getDeviceId(), true, false);
outputStream = payloadSocket.getOutputStream();
inputStream = np.getPayload().getInputStream();
Log.i("KDE/LanLink", "Beginning to send payload for " + np.getType());
byte[] buffer = new byte[4096];
int bytesRead;
long size = np.getPayloadSize();
long progress = 0;
long timeSinceLastUpdate = -1;
while (!np.isCanceled() && (bytesRead = inputStream.read(buffer)) != -1) {
//Log.e("ok",""+bytesRead);
progress += bytesRead;
outputStream.write(buffer, 0, bytesRead);
if (size > 0) {
if (timeSinceLastUpdate + 500 < System.currentTimeMillis()) { //Report progress every half a second
long percent = ((100 * progress) / size);
callback.onPayloadProgressChanged((int) percent);
timeSinceLastUpdate = System.currentTimeMillis();
}
}
}
outputStream.flush();
Log.i("KDE/LanLink", "Finished sending payload (" + progress + " bytes written)");
}
} catch(SocketTimeoutException e) {
Log.e("LanLink", "Socket for payload in packet " + np.getType() + " timed out. The other end didn't fetch the payload.");
} catch(SSLHandshakeException e) {
// The exception can be due to several causes. "Connection closed by peer" seems to be a common one.
// If we could distinguish different cases we could react differently for some of them, but I haven't found how.
Log.e("sendPacket","Payload SSLSocket failed");
e.printStackTrace();
} finally {
try { server.close(); } catch (Exception ignored) { }
try { payloadSocket.close(); } catch (Exception ignored) { }
np.getPayload().close();
try { outputStream.close(); } catch (Exception ignored) { }
}
}
private void receivedNetworkPacket(NetworkPacket np) {
if (np.hasPayloadTransferInfo()) {

View File

@ -34,12 +34,12 @@ public class LoopbackLink extends BaseLink {
@WorkerThread
@Override
public boolean sendPacket(NetworkPacket in, Device.SendPacketStatusCallback callback) {
public boolean sendPacket(NetworkPacket in, Device.SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) {
packetReceived(in);
if (in.hasPayload()) {
callback.onProgressChanged(0);
callback.onPayloadProgressChanged(0);
in.setPayload(in.getPayload());
callback.onProgressChanged(100);
callback.onPayloadProgressChanged(100);
}
callback.onSuccess();
return true;

View File

@ -610,7 +610,7 @@ public class Device implements BaseLink.PacketReceiver {
public abstract void onFailure(Throwable e);
public void onProgressChanged(int percent) {
public void onPayloadProgressChanged(int percent) {
}
}
@ -674,16 +674,24 @@ public class Device implements BaseLink.PacketReceiver {
}
}
@WorkerThread
public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback) {
return sendPacketBlocking(np, callback, false);
}
/**
* Send {@code np} over one of this device's connected {@link #links}.
*
* @param np the packet to send
* @param callback a callback that can receive realtime updates
* @param np the packet to send
* @param callback a callback that can receive realtime updates
* @param sendPayloadFromSameThread when set to true and np contains a Payload, this function
* won't return until the Payload has been received by the
* other end, or times out after 10 seconds
* @return true if the packet was sent ok, false otherwise
* @see BaseLink#sendPacket(NetworkPacket, SendPacketStatusCallback)
*/
@WorkerThread
public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback) {
public boolean sendPacketBlocking(final NetworkPacket np, final SendPacketStatusCallback callback, boolean sendPayloadFromSameThread) {
/*
if (!m_outgoingCapabilities.contains(np.getType()) && !NetworkPacket.protocolPacketTypes.contains(np.getType())) {
@ -696,7 +704,7 @@ public class Device implements BaseLink.PacketReceiver {
for (final BaseLink link : links) {
if (link == null) continue;
try {
success = link.sendPacket(np, callback);
success = link.sendPacket(np, callback, sendPayloadFromSameThread);
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -104,7 +104,9 @@ public class CompositeUploadFileJob extends BackgroundJob<Device, Void> {
addTotalsToNetworkPacket(currentNetworkPacket);
if (!getDevice().sendPacketBlocking(currentNetworkPacket, sendPacketStatusCallback)) {
// We set sendPayloadFromSameThread to true so this call blocks until the payload
// has been received by the other end, so payloads are sent one by one.
if (!getDevice().sendPacketBlocking(currentNetworkPacket, sendPacketStatusCallback, true)) {
throw new RuntimeException("Sending packet failed");
}
@ -202,7 +204,7 @@ public class CompositeUploadFileJob extends BackgroundJob<Device, Void> {
private class SendPacketStatusCallback extends Device.SendPacketStatusCallback {
@Override
public void onProgressChanged(int percent) {
public void onPayloadProgressChanged(int percent) {
float send = totalSend + (currentNetworkPacket.getPayloadSize() * ((float)percent / 100));
int progress = (int)((send * 100) / totalPayloadSize);