2
0
mirror of https://github.com/KDE/kdeconnect-android synced 2025-08-28 12:47:43 +00:00

Remove DevicePacketQueue, use a IO coroutine to send packets

This commit is contained in:
Albert Vaca Cintora 2024-05-14 00:11:22 +02:00
parent 66ea01ad29
commit 4ae6e50020
No known key found for this signature in database
6 changed files with 31 additions and 253 deletions

View File

@ -18,6 +18,12 @@ import androidx.annotation.VisibleForTesting
import androidx.annotation.WorkerThread import androidx.annotation.WorkerThread
import androidx.core.app.NotificationCompat import androidx.core.app.NotificationCompat
import androidx.core.content.ContextCompat import androidx.core.content.ContextCompat
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import org.apache.commons.collections4.MultiValuedMap import org.apache.commons.collections4.MultiValuedMap
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap import org.apache.commons.collections4.multimap.ArrayListValuedHashMap
import org.kde.kdeconnect.Backends.BaseLink import org.kde.kdeconnect.Backends.BaseLink
@ -56,7 +62,6 @@ class Device : PacketReceiver {
var pairingHandler: PairingHandler var pairingHandler: PairingHandler
private val pairingCallbacks = CopyOnWriteArrayList<PairingCallback>() private val pairingCallbacks = CopyOnWriteArrayList<PairingCallback>()
private val links = CopyOnWriteArrayList<BaseLink>() private val links = CopyOnWriteArrayList<BaseLink>()
private var packetQueue: DevicePacketQueue? = null
var supportedPlugins: List<String> var supportedPlugins: List<String>
private set private set
val loadedPlugins: ConcurrentMap<String, Plugin> = ConcurrentHashMap() val loadedPlugins: ConcurrentMap<String, Plugin> = ConcurrentHashMap()
@ -65,6 +70,9 @@ class Device : PacketReceiver {
private var pluginsByIncomingInterface: MultiValuedMap<String, String> = ArrayListValuedHashMap() private var pluginsByIncomingInterface: MultiValuedMap<String, String> = ArrayListValuedHashMap()
private val settings: SharedPreferences private val settings: SharedPreferences
private val pluginsChangedListeners: MutableList<PluginsChangedListener> = CopyOnWriteArrayList() private val pluginsChangedListeners: MutableList<PluginsChangedListener> = CopyOnWriteArrayList()
class NetworkPacketWithCallback(val np : NetworkPacket, val callback: SendPacketStatusCallback)
private val sendChannel = Channel<NetworkPacketWithCallback>(Channel.UNLIMITED)
private var sendCoroutine : Job? = null
/** /**
* Constructor for remembered, already-trusted devices. * Constructor for remembered, already-trusted devices.
@ -275,10 +283,9 @@ class Device : PacketReceiver {
get() = links.isNotEmpty() get() = links.isNotEmpty()
fun addLink(link: BaseLink) { fun addLink(link: BaseLink) {
if (links.isEmpty()) { if (sendCoroutine == null) {
packetQueue = DevicePacketQueue(this) launchSendCoroutine()
} }
// FilesHelper.LogOpenFileCount(); // FilesHelper.LogOpenFileCount();
links.add(link) links.add(link)
@ -306,9 +313,15 @@ class Device : PacketReceiver {
) )
if (links.isEmpty()) { if (links.isEmpty()) {
reloadPluginsFromSettings() reloadPluginsFromSettings()
if (packetQueue != null) { sendCoroutine?.cancel(CancellationException("Device disconnected"))
packetQueue!!.disconnected() sendCoroutine = null
packetQueue = null }
}
private fun launchSendCoroutine() {
sendCoroutine = CoroutineScope(Dispatchers.IO).launch {
for (item in sendChannel) {
sendPacketBlocking(item.np, item.callback)
} }
} }
} }
@ -410,49 +423,26 @@ class Device : PacketReceiver {
} }
} }
@AnyThread
fun sendPacket(np: NetworkPacket) = sendPacket(np, -1, defaultCallback)
@AnyThread
fun sendPacket(np: NetworkPacket, replaceID: Int) = sendPacket(np, replaceID, defaultCallback)
@WorkerThread
fun sendPacketBlocking(np: NetworkPacket): Boolean = sendPacketBlocking(np, defaultCallback)
@AnyThread
fun sendPacket(np: NetworkPacket, callback: SendPacketStatusCallback) = sendPacket(np, -1, callback)
/** /**
* Send a packet to the device asynchronously * Send a packet to the device asynchronously
* @param np The packet * @param np The packet
* @param replaceID If positive, replaces all unsent packets with the same replaceID
* @param callback A callback for success/failure * @param callback A callback for success/failure
*/ */
@AnyThread @AnyThread
fun sendPacket(np: NetworkPacket, replaceID: Int, callback: SendPacketStatusCallback) { fun sendPacket(np: NetworkPacket, callback: SendPacketStatusCallback) {
val packetQueue = packetQueue ?: run { sendChannel.trySend(NetworkPacketWithCallback(np, callback))
callback.onFailure(Exception("Device disconnected!"))
return
}
// TODO: Migrate to coroutine version (addPacket)
packetQueue.addPacket(np, replaceID, callback)
} }
/** @AnyThread
* Check if we still have an unsent packet in the queue with the given ID. fun sendPacket(np: NetworkPacket) = sendPacket(np, defaultCallback)
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? {
// TODO: Migrate to coroutine version (getAndRemoveUnsentPacket)
return packetQueue?.getAndRemoveUnsentPacket(replaceID)
}
@WorkerThread @WorkerThread
fun sendPacketBlocking(np: NetworkPacket, callback: SendPacketStatusCallback): Boolean = fun sendPacketBlocking(np: NetworkPacket, callback: SendPacketStatusCallback): Boolean =
sendPacketBlocking(np, callback, false) sendPacketBlocking(np, callback, false)
@WorkerThread
fun sendPacketBlocking(np: NetworkPacket): Boolean = sendPacketBlocking(np, defaultCallback, false)
/** /**
* Send `np` over one of this device's connected [.links]. * Send `np` over one of this device's connected [.links].
* *

View File

@ -1,142 +0,0 @@
/*
* SPDX-FileCopyrightText: 2019 Matthijs Tijink <matthijstijink@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect;
import android.util.Log;
import org.kde.kdeconnect.Helpers.ThreadHelper;
import java.util.ArrayDeque;
import java.util.Optional;
/**
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
*/
class DevicePacketQueue {
/**
* Holds the packet and related stuff to keep in the queue
*/
private static final class Item {
NetworkPacket packet;
/**
* Replacement ID: if positive, it can be replaced by later packets with the same ID
*/
final int replaceID;
Device.SendPacketStatusCallback callback;
Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
this.packet = packet;
this.callback = callback;
this.replaceID = replaceID;
}
}
private final ArrayDeque<Item> items = new ArrayDeque<>();
private final Device mDevice;
private final Object lock = new Object();
private boolean exit = false;
DevicePacketQueue(Device device) {
this(device, true);
}
DevicePacketQueue(Device device, Boolean startThread) {
mDevice = device;
if (startThread) {
ThreadHelper.execute(new SendingRunnable());
}
}
/**
* Send a packet (at some point in the future)
* @param packet The packet
* @param replaceID If positive, it will replace all older packets still in the queue
* @param callback The callback after sending the packet
*/
void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
synchronized (lock) {
if (exit) {
callback.onFailure(new Exception("Device disconnected!"));
} else {
boolean replaced = false;
if (replaceID >= 0) {
for (Item item : items) {
if (item.replaceID == replaceID) {
item.packet = packet;
item.callback = callback;
replaced = true;
break;
}
}
}
if (!replaced) {
items.addLast(new Item(packet, replaceID, callback));
lock.notify();
}
}
}
}
/**
* Check if we still have an unsent packet in the queue with the given ID.
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
NetworkPacket getAndRemoveUnsentPacket(int replaceID) {
synchronized (lock) {
final Optional<Item> itemOptional = items.stream()
.filter(item -> item.replaceID == replaceID).findFirst();
if (itemOptional.isPresent()) {
final Item item = itemOptional.get();
items.remove(item);
return item.packet;
}
}
return null;
}
void disconnected() {
synchronized (lock) {
exit = true;
lock.notifyAll();
}
}
private final class SendingRunnable implements Runnable {
@Override
public void run() {
while (true) {
Item item;
synchronized (lock) {
while (items.isEmpty() && !exit) {
try {
lock.wait();
} catch (InterruptedException ignored) {
}
}
if (exit) {
Log.i("DevicePacketQueue", "Terminating sending loop");
break;
}
item = items.removeFirst();
}
mDevice.sendPacketBlocking(item.packet, item.callback);
}
while (!items.isEmpty()) {
Item item = items.removeFirst();
item.callback.onFailure(new Exception("Device disconnected!"));
}
}
}
}

View File

@ -281,9 +281,6 @@ class NetworkPacket private constructor(
const val PACKET_TYPE_IDENTITY: String = "kdeconnect.identity" const val PACKET_TYPE_IDENTITY: String = "kdeconnect.identity"
const val PACKET_TYPE_PAIR: String = "kdeconnect.pair" const val PACKET_TYPE_PAIR: String = "kdeconnect.pair"
const val PACKET_REPLACEID_MOUSEMOVE: Int = 0
const val PACKET_REPLACEID_PRESENTERPOINTER: Int = 1
@JvmStatic @JvmStatic
@Throws(JSONException::class) @Throws(JSONException::class)
fun unserialize(s: String): NetworkPacket { fun unserialize(s: String): NetworkPacket {

View File

@ -91,19 +91,10 @@ public class MousePadPlugin extends Plugin {
} }
public void sendMouseDelta(float dx, float dy) { public void sendMouseDelta(float dx, float dy) {
NetworkPacket np = getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_MOUSEMOVE); NetworkPacket np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST);
if (np == null) {
np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST);
} else {
// TODO: In my tests we never get here. Decide if it's worth keeping the logic to replace unsent packets.
dx += np.getInt("dx");
dy += np.getInt("dx");
}
np.set("dx", dx); np.set("dx", dx);
np.set("dy", dy); np.set("dy", dy);
getDevice().sendPacket(np);
getDevice().sendPacket(np, NetworkPacket.PACKET_REPLACEID_MOUSEMOVE);
} }
public void sendLeftClick() { public void sendLeftClick() {

View File

@ -111,21 +111,13 @@ public class PresenterPlugin extends Plugin {
} }
public void sendPointer(float xDelta, float yDelta) { public void sendPointer(float xDelta, float yDelta) {
NetworkPacket np = getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER); NetworkPacket np = new NetworkPacket(PACKET_TYPE_PRESENTER);
if (np == null) {
np = new NetworkPacket(PACKET_TYPE_PRESENTER);
} else {
// TODO: In my tests we never get here. Decide if it's worth keeping the logic to replace unsent packets.
xDelta += np.getInt("dx");
yDelta += np.getInt("dy");
}
np.set("dx", xDelta); np.set("dx", xDelta);
np.set("dy", yDelta); np.set("dy", yDelta);
getDevice().sendPacket(np, NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER); getDevice().sendPacket(np);
} }
public void stopPointer() { public void stopPointer() {
getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER);
NetworkPacket np = new NetworkPacket(PACKET_TYPE_PRESENTER); NetworkPacket np = new NetworkPacket(PACKET_TYPE_PRESENTER);
np.set("stop", true); np.set("stop", true);
getDevice().sendPacket(np); getDevice().sendPacket(np);

View File

@ -1,50 +0,0 @@
package org.kde.kdeconnect;
import static org.junit.Assert.*;
import org.junit.Test;
import org.mockito.Mockito;
public class DevicePacketQueueTest {
@Test
public void addPacketWithPositiveReplaceId() {
Device device = Mockito.mock(Device.class);
Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class);
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), 0, callback);
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(0));
assertNotNull(queue.getAndRemoveUnsentPacket(1));
}
@Test
public void addPacketWithNegativeReplaceId() {
Device device = Mockito.mock(Device.class);
Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class);
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), -1, callback);
queue.addPacket(new NetworkPacket("Test1"), -1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
}
@Test
public void addPacketReplacesPacket() {
Device device = Mockito.mock(Device.class);
Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class);
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), 1, callback);
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(1));
assertNull(queue.getAndRemoveUnsentPacket(1));
}
}