From 4ae6e500201c2b7fef51b3d65fb38b1c9f0f4fd2 Mon Sep 17 00:00:00 2001 From: Albert Vaca Cintora Date: Tue, 14 May 2024 00:11:22 +0200 Subject: [PATCH] Remove DevicePacketQueue, use a IO coroutine to send packets --- src/org/kde/kdeconnect/Device.kt | 64 ++++---- src/org/kde/kdeconnect/DevicePacketQueue.java | 142 ------------------ src/org/kde/kdeconnect/NetworkPacket.kt | 3 - .../MousePadPlugin/MousePadPlugin.java | 13 +- .../PresenterPlugin/PresenterPlugin.java | 12 +- .../kde/kdeconnect/DevicePacketQueueTest.java | 50 ------ 6 files changed, 31 insertions(+), 253 deletions(-) delete mode 100644 src/org/kde/kdeconnect/DevicePacketQueue.java delete mode 100644 tests/org/kde/kdeconnect/DevicePacketQueueTest.java diff --git a/src/org/kde/kdeconnect/Device.kt b/src/org/kde/kdeconnect/Device.kt index 5c6272c0..2004fcc1 100644 --- a/src/org/kde/kdeconnect/Device.kt +++ b/src/org/kde/kdeconnect/Device.kt @@ -18,6 +18,12 @@ import androidx.annotation.VisibleForTesting import androidx.annotation.WorkerThread import androidx.core.app.NotificationCompat import androidx.core.content.ContextCompat +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch import org.apache.commons.collections4.MultiValuedMap import org.apache.commons.collections4.multimap.ArrayListValuedHashMap import org.kde.kdeconnect.Backends.BaseLink @@ -56,7 +62,6 @@ class Device : PacketReceiver { var pairingHandler: PairingHandler private val pairingCallbacks = CopyOnWriteArrayList() private val links = CopyOnWriteArrayList() - private var packetQueue: DevicePacketQueue? = null var supportedPlugins: List private set val loadedPlugins: ConcurrentMap = ConcurrentHashMap() @@ -65,6 +70,9 @@ class Device : PacketReceiver { private var pluginsByIncomingInterface: MultiValuedMap = ArrayListValuedHashMap() private val settings: SharedPreferences private val pluginsChangedListeners: MutableList = CopyOnWriteArrayList() + class NetworkPacketWithCallback(val np : NetworkPacket, val callback: SendPacketStatusCallback) + private val sendChannel = Channel(Channel.UNLIMITED) + private var sendCoroutine : Job? = null /** * Constructor for remembered, already-trusted devices. @@ -275,10 +283,9 @@ class Device : PacketReceiver { get() = links.isNotEmpty() fun addLink(link: BaseLink) { - if (links.isEmpty()) { - packetQueue = DevicePacketQueue(this) + if (sendCoroutine == null) { + launchSendCoroutine() } - // FilesHelper.LogOpenFileCount(); links.add(link) @@ -306,9 +313,15 @@ class Device : PacketReceiver { ) if (links.isEmpty()) { reloadPluginsFromSettings() - if (packetQueue != null) { - packetQueue!!.disconnected() - packetQueue = null + sendCoroutine?.cancel(CancellationException("Device disconnected")) + sendCoroutine = null + } + } + + private fun launchSendCoroutine() { + sendCoroutine = CoroutineScope(Dispatchers.IO).launch { + for (item in sendChannel) { + sendPacketBlocking(item.np, item.callback) } } } @@ -410,49 +423,26 @@ class Device : PacketReceiver { } } - @AnyThread - fun sendPacket(np: NetworkPacket) = sendPacket(np, -1, defaultCallback) - - @AnyThread - fun sendPacket(np: NetworkPacket, replaceID: Int) = sendPacket(np, replaceID, defaultCallback) - - @WorkerThread - fun sendPacketBlocking(np: NetworkPacket): Boolean = sendPacketBlocking(np, defaultCallback) - - @AnyThread - fun sendPacket(np: NetworkPacket, callback: SendPacketStatusCallback) = sendPacket(np, -1, callback) - /** * Send a packet to the device asynchronously * @param np The packet - * @param replaceID If positive, replaces all unsent packets with the same replaceID * @param callback A callback for success/failure */ @AnyThread - fun sendPacket(np: NetworkPacket, replaceID: Int, callback: SendPacketStatusCallback) { - val packetQueue = packetQueue ?: run { - callback.onFailure(Exception("Device disconnected!")) - return - } - // TODO: Migrate to coroutine version (addPacket) - packetQueue.addPacket(np, replaceID, callback) + fun sendPacket(np: NetworkPacket, callback: SendPacketStatusCallback) { + sendChannel.trySend(NetworkPacketWithCallback(np, callback)) } - /** - * Check if we still have an unsent packet in the queue with the given ID. - * If so, remove it from the queue and return it - * @param replaceID The replace ID (must be positive) - * @return The found packet, or null - */ - fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? { - // TODO: Migrate to coroutine version (getAndRemoveUnsentPacket) - return packetQueue?.getAndRemoveUnsentPacket(replaceID) - } + @AnyThread + fun sendPacket(np: NetworkPacket) = sendPacket(np, defaultCallback) @WorkerThread fun sendPacketBlocking(np: NetworkPacket, callback: SendPacketStatusCallback): Boolean = sendPacketBlocking(np, callback, false) + @WorkerThread + fun sendPacketBlocking(np: NetworkPacket): Boolean = sendPacketBlocking(np, defaultCallback, false) + /** * Send `np` over one of this device's connected [.links]. * diff --git a/src/org/kde/kdeconnect/DevicePacketQueue.java b/src/org/kde/kdeconnect/DevicePacketQueue.java deleted file mode 100644 index 2aead660..00000000 --- a/src/org/kde/kdeconnect/DevicePacketQueue.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2019 Matthijs Tijink - * - * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL -*/ - -package org.kde.kdeconnect; - -import android.util.Log; - -import org.kde.kdeconnect.Helpers.ThreadHelper; - -import java.util.ArrayDeque; -import java.util.Optional; - -/** - * Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads - */ -class DevicePacketQueue { - /** - * Holds the packet and related stuff to keep in the queue - */ - private static final class Item { - NetworkPacket packet; - /** - * Replacement ID: if positive, it can be replaced by later packets with the same ID - */ - final int replaceID; - Device.SendPacketStatusCallback callback; - - Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { - this.packet = packet; - this.callback = callback; - this.replaceID = replaceID; - } - } - - private final ArrayDeque items = new ArrayDeque<>(); - private final Device mDevice; - private final Object lock = new Object(); - private boolean exit = false; - - DevicePacketQueue(Device device) { - this(device, true); - } - - DevicePacketQueue(Device device, Boolean startThread) { - mDevice = device; - if (startThread) { - ThreadHelper.execute(new SendingRunnable()); - } - } - - /** - * Send a packet (at some point in the future) - * @param packet The packet - * @param replaceID If positive, it will replace all older packets still in the queue - * @param callback The callback after sending the packet - */ - void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { - synchronized (lock) { - if (exit) { - callback.onFailure(new Exception("Device disconnected!")); - } else { - boolean replaced = false; - - if (replaceID >= 0) { - for (Item item : items) { - if (item.replaceID == replaceID) { - item.packet = packet; - item.callback = callback; - replaced = true; - break; - } - } - } - - if (!replaced) { - items.addLast(new Item(packet, replaceID, callback)); - lock.notify(); - } - } - } - } - - /** - * Check if we still have an unsent packet in the queue with the given ID. - * If so, remove it from the queue and return it - * @param replaceID The replace ID (must be positive) - * @return The found packet, or null - */ - NetworkPacket getAndRemoveUnsentPacket(int replaceID) { - synchronized (lock) { - final Optional itemOptional = items.stream() - .filter(item -> item.replaceID == replaceID).findFirst(); - if (itemOptional.isPresent()) { - final Item item = itemOptional.get(); - items.remove(item); - return item.packet; - } - } - return null; - } - - void disconnected() { - synchronized (lock) { - exit = true; - lock.notifyAll(); - } - } - - private final class SendingRunnable implements Runnable { - @Override - public void run() { - while (true) { - Item item; - synchronized (lock) { - while (items.isEmpty() && !exit) { - try { - lock.wait(); - } catch (InterruptedException ignored) { - } - } - if (exit) { - Log.i("DevicePacketQueue", "Terminating sending loop"); - break; - } - - item = items.removeFirst(); - } - - mDevice.sendPacketBlocking(item.packet, item.callback); - } - - while (!items.isEmpty()) { - Item item = items.removeFirst(); - item.callback.onFailure(new Exception("Device disconnected!")); - } - } - } - -} diff --git a/src/org/kde/kdeconnect/NetworkPacket.kt b/src/org/kde/kdeconnect/NetworkPacket.kt index 6d00dc77..c9dcece2 100644 --- a/src/org/kde/kdeconnect/NetworkPacket.kt +++ b/src/org/kde/kdeconnect/NetworkPacket.kt @@ -281,9 +281,6 @@ class NetworkPacket private constructor( const val PACKET_TYPE_IDENTITY: String = "kdeconnect.identity" const val PACKET_TYPE_PAIR: String = "kdeconnect.pair" - const val PACKET_REPLACEID_MOUSEMOVE: Int = 0 - const val PACKET_REPLACEID_PRESENTERPOINTER: Int = 1 - @JvmStatic @Throws(JSONException::class) fun unserialize(s: String): NetworkPacket { diff --git a/src/org/kde/kdeconnect/Plugins/MousePadPlugin/MousePadPlugin.java b/src/org/kde/kdeconnect/Plugins/MousePadPlugin/MousePadPlugin.java index 43288391..c59b4141 100644 --- a/src/org/kde/kdeconnect/Plugins/MousePadPlugin/MousePadPlugin.java +++ b/src/org/kde/kdeconnect/Plugins/MousePadPlugin/MousePadPlugin.java @@ -91,19 +91,10 @@ public class MousePadPlugin extends Plugin { } public void sendMouseDelta(float dx, float dy) { - NetworkPacket np = getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_MOUSEMOVE); - if (np == null) { - np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST); - } else { - // TODO: In my tests we never get here. Decide if it's worth keeping the logic to replace unsent packets. - dx += np.getInt("dx"); - dy += np.getInt("dx"); - } - + NetworkPacket np = new NetworkPacket(PACKET_TYPE_MOUSEPAD_REQUEST); np.set("dx", dx); np.set("dy", dy); - - getDevice().sendPacket(np, NetworkPacket.PACKET_REPLACEID_MOUSEMOVE); + getDevice().sendPacket(np); } public void sendLeftClick() { diff --git a/src/org/kde/kdeconnect/Plugins/PresenterPlugin/PresenterPlugin.java b/src/org/kde/kdeconnect/Plugins/PresenterPlugin/PresenterPlugin.java index 7dac61a6..f84703dd 100644 --- a/src/org/kde/kdeconnect/Plugins/PresenterPlugin/PresenterPlugin.java +++ b/src/org/kde/kdeconnect/Plugins/PresenterPlugin/PresenterPlugin.java @@ -111,21 +111,13 @@ public class PresenterPlugin extends Plugin { } public void sendPointer(float xDelta, float yDelta) { - NetworkPacket np = getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER); - if (np == null) { - np = new NetworkPacket(PACKET_TYPE_PRESENTER); - } else { - // TODO: In my tests we never get here. Decide if it's worth keeping the logic to replace unsent packets. - xDelta += np.getInt("dx"); - yDelta += np.getInt("dy"); - } + NetworkPacket np = new NetworkPacket(PACKET_TYPE_PRESENTER); np.set("dx", xDelta); np.set("dy", yDelta); - getDevice().sendPacket(np, NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER); + getDevice().sendPacket(np); } public void stopPointer() { - getDevice().getAndRemoveUnsentPacket(NetworkPacket.PACKET_REPLACEID_PRESENTERPOINTER); NetworkPacket np = new NetworkPacket(PACKET_TYPE_PRESENTER); np.set("stop", true); getDevice().sendPacket(np); diff --git a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java b/tests/org/kde/kdeconnect/DevicePacketQueueTest.java deleted file mode 100644 index 63a0b2f4..00000000 --- a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.kde.kdeconnect; - -import static org.junit.Assert.*; - -import org.junit.Test; -import org.mockito.Mockito; - -public class DevicePacketQueueTest { - @Test - public void addPacketWithPositiveReplaceId() { - Device device = Mockito.mock(Device.class); - Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class); - - DevicePacketQueue queue = new DevicePacketQueue(device, false); - - queue.addPacket(new NetworkPacket("Test"), 0, callback); - queue.addPacket(new NetworkPacket("Test1"), 1, callback); - - assertNotNull(queue.getAndRemoveUnsentPacket(0)); - assertNotNull(queue.getAndRemoveUnsentPacket(1)); - } - - @Test - public void addPacketWithNegativeReplaceId() { - Device device = Mockito.mock(Device.class); - Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class); - - DevicePacketQueue queue = new DevicePacketQueue(device, false); - - queue.addPacket(new NetworkPacket("Test"), -1, callback); - queue.addPacket(new NetworkPacket("Test1"), -1, callback); - - assertNotNull(queue.getAndRemoveUnsentPacket(-1)); - assertNotNull(queue.getAndRemoveUnsentPacket(-1)); - } - - @Test - public void addPacketReplacesPacket() { - Device device = Mockito.mock(Device.class); - Device.SendPacketStatusCallback callback = Mockito.mock(Device.SendPacketStatusCallback.class); - - DevicePacketQueue queue = new DevicePacketQueue(device, false); - - queue.addPacket(new NetworkPacket("Test"), 1, callback); - queue.addPacket(new NetworkPacket("Test1"), 1, callback); - - assertNotNull(queue.getAndRemoveUnsentPacket(1)); - assertNull(queue.getAndRemoveUnsentPacket(1)); - } -} \ No newline at end of file