From 295dc7b42aadd2836079a70326b013f08cbce152 Mon Sep 17 00:00:00 2001 From: ShellWen Chen Date: Sun, 14 Apr 2024 23:04:55 +0000 Subject: [PATCH] refactor: migrate `DevicePacketQueue` to Kotlin Coroutine --- src/org/kde/kdeconnect/Device.java | 6 +- src/org/kde/kdeconnect/DevicePacketQueue.java | 142 ------------------ src/org/kde/kdeconnect/DevicePacketQueue.kt | 131 ++++++++++++++++ .../kde/kdeconnect/DevicePacketQueueTest.java | 24 +-- 4 files changed, 147 insertions(+), 156 deletions(-) delete mode 100644 src/org/kde/kdeconnect/DevicePacketQueue.java create mode 100644 src/org/kde/kdeconnect/DevicePacketQueue.kt diff --git a/src/org/kde/kdeconnect/Device.java b/src/org/kde/kdeconnect/Device.java index 8aade2aa..d612b278 100644 --- a/src/org/kde/kdeconnect/Device.java +++ b/src/org/kde/kdeconnect/Device.java @@ -474,7 +474,8 @@ public class Device implements BaseLink.PacketReceiver { if (packetQueue == null) { callback.onFailure(new Exception("Device disconnected!")); } else { - packetQueue.addPacket(np, replaceID, callback); + // TODO: Migrate to coroutine version (addPacket) + packetQueue.addPacketSync(np, replaceID, callback); } } @@ -488,7 +489,8 @@ public class Device implements BaseLink.PacketReceiver { if (packetQueue == null) { return null; } else { - return packetQueue.getAndRemoveUnsentPacket(replaceID); + // TODO: Migrate to coroutine version (getAndRemoveUnsentPacket) + return packetQueue.getAndRemoveUnsentPacketSync(replaceID); } } diff --git a/src/org/kde/kdeconnect/DevicePacketQueue.java b/src/org/kde/kdeconnect/DevicePacketQueue.java deleted file mode 100644 index 2aead660..00000000 --- a/src/org/kde/kdeconnect/DevicePacketQueue.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2019 Matthijs Tijink - * - * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL -*/ - -package org.kde.kdeconnect; - -import android.util.Log; - -import org.kde.kdeconnect.Helpers.ThreadHelper; - -import java.util.ArrayDeque; -import java.util.Optional; - -/** - * Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads - */ -class DevicePacketQueue { - /** - * Holds the packet and related stuff to keep in the queue - */ - private static final class Item { - NetworkPacket packet; - /** - * Replacement ID: if positive, it can be replaced by later packets with the same ID - */ - final int replaceID; - Device.SendPacketStatusCallback callback; - - Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { - this.packet = packet; - this.callback = callback; - this.replaceID = replaceID; - } - } - - private final ArrayDeque items = new ArrayDeque<>(); - private final Device mDevice; - private final Object lock = new Object(); - private boolean exit = false; - - DevicePacketQueue(Device device) { - this(device, true); - } - - DevicePacketQueue(Device device, Boolean startThread) { - mDevice = device; - if (startThread) { - ThreadHelper.execute(new SendingRunnable()); - } - } - - /** - * Send a packet (at some point in the future) - * @param packet The packet - * @param replaceID If positive, it will replace all older packets still in the queue - * @param callback The callback after sending the packet - */ - void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { - synchronized (lock) { - if (exit) { - callback.onFailure(new Exception("Device disconnected!")); - } else { - boolean replaced = false; - - if (replaceID >= 0) { - for (Item item : items) { - if (item.replaceID == replaceID) { - item.packet = packet; - item.callback = callback; - replaced = true; - break; - } - } - } - - if (!replaced) { - items.addLast(new Item(packet, replaceID, callback)); - lock.notify(); - } - } - } - } - - /** - * Check if we still have an unsent packet in the queue with the given ID. - * If so, remove it from the queue and return it - * @param replaceID The replace ID (must be positive) - * @return The found packet, or null - */ - NetworkPacket getAndRemoveUnsentPacket(int replaceID) { - synchronized (lock) { - final Optional itemOptional = items.stream() - .filter(item -> item.replaceID == replaceID).findFirst(); - if (itemOptional.isPresent()) { - final Item item = itemOptional.get(); - items.remove(item); - return item.packet; - } - } - return null; - } - - void disconnected() { - synchronized (lock) { - exit = true; - lock.notifyAll(); - } - } - - private final class SendingRunnable implements Runnable { - @Override - public void run() { - while (true) { - Item item; - synchronized (lock) { - while (items.isEmpty() && !exit) { - try { - lock.wait(); - } catch (InterruptedException ignored) { - } - } - if (exit) { - Log.i("DevicePacketQueue", "Terminating sending loop"); - break; - } - - item = items.removeFirst(); - } - - mDevice.sendPacketBlocking(item.packet, item.callback); - } - - while (!items.isEmpty()) { - Item item = items.removeFirst(); - item.callback.onFailure(new Exception("Device disconnected!")); - } - } - } - -} diff --git a/src/org/kde/kdeconnect/DevicePacketQueue.kt b/src/org/kde/kdeconnect/DevicePacketQueue.kt new file mode 100644 index 00000000..4365734e --- /dev/null +++ b/src/org/kde/kdeconnect/DevicePacketQueue.kt @@ -0,0 +1,131 @@ +/* + * SPDX-FileCopyrightText: 2019 Matthijs Tijink + * + * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL +*/ +package org.kde.kdeconnect + +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.collections.ArrayDeque + +/** + * Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads + */ +class DevicePacketQueue @JvmOverloads constructor(private val device: Device, startRunning: Boolean = true) { + /** + * Holds the packet and related stuff to keep in the queue + */ + private class Item( + var packet: NetworkPacket, + /** + * Replacement ID: if positive, it can be replaced by later packets with the same ID + */ + val replaceID: Int, + var callback: Device.SendPacketStatusCallback + ) + + private val scope = CoroutineScope(Dispatchers.IO) + private val sendingJob = SupervisorJob() + private val loopJob = Job() + + private val items = ArrayDeque() + private val mutex = Mutex() + + init { + if (startRunning) { + scope.launch(loopJob) { + sending() + } + } + } + + fun addPacketSync(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) { + runBlocking { + addPacket(packet, replaceID, callback) + } + } + + /** + * Send a packet (at some point in the future) + * @param packet The packet + * @param replaceID If positive, it will replace all older packets still in the queue + * @param callback The callback after sending the packet + */ + suspend fun addPacket(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) { + if (sendingJob.isCancelled) { + callback.onFailure(Exception("Device disconnected!")) + } else { + mutex.withLock { + var replaced = false + if (replaceID >= 0) { + items.forEach { + if (it.replaceID == replaceID) { + it.packet = packet + it.callback = callback + replaced = true + } + } + } + + if (!replaced) { + items.addLast(Item(packet, replaceID, callback)) + } + } + } + } + + fun getAndRemoveUnsentPacketSync(replaceID: Int): NetworkPacket? { + return runBlocking { + getAndRemoveUnsentPacket(replaceID) + } + } + /** + * Check if we still have an unsent packet in the queue with the given ID. + * If so, remove it from the queue and return it + * @param replaceID The replace ID (must be positive) + * @return The found packet, or null + */ + suspend fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? { + mutex.withLock { + val itemOptional = items.stream() + .filter { item: Item -> item.replaceID == replaceID }.findFirst() + if (itemOptional.isPresent) { + val item = itemOptional.get() + items.remove(item) + return item.packet + } + } + return null + } + + fun disconnected() { + sendingJob.cancel() + } + + private suspend fun sending() { + while (true) { + val item = mutex.withLock { + if (items.isEmpty()) { + null + } else { + items.removeFirst() + } + } + + if (item == null) { + yield() + continue + } + + if (sendingJob.isCancelled) { + item.callback.onFailure(Exception("Device disconnected!")) + } else { + scope.launch(sendingJob) { + device.sendPacketBlocking(item.packet, item.callback) + } + } + } + } +} diff --git a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java b/tests/org/kde/kdeconnect/DevicePacketQueueTest.java index 63a0b2f4..df4eaba7 100644 --- a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java +++ b/tests/org/kde/kdeconnect/DevicePacketQueueTest.java @@ -13,11 +13,11 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacket(new NetworkPacket("Test"), 0, callback); - queue.addPacket(new NetworkPacket("Test1"), 1, callback); + queue.addPacketSync(new NetworkPacket("Test"), 0, callback); + queue.addPacketSync(new NetworkPacket("Test1"), 1, callback); - assertNotNull(queue.getAndRemoveUnsentPacket(0)); - assertNotNull(queue.getAndRemoveUnsentPacket(1)); + assertNotNull(queue.getAndRemoveUnsentPacketSync(0)); + assertNotNull(queue.getAndRemoveUnsentPacketSync(1)); } @Test @@ -27,11 +27,11 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacket(new NetworkPacket("Test"), -1, callback); - queue.addPacket(new NetworkPacket("Test1"), -1, callback); + queue.addPacketSync(new NetworkPacket("Test"), -1, callback); + queue.addPacketSync(new NetworkPacket("Test1"), -1, callback); - assertNotNull(queue.getAndRemoveUnsentPacket(-1)); - assertNotNull(queue.getAndRemoveUnsentPacket(-1)); + assertNotNull(queue.getAndRemoveUnsentPacketSync(-1)); + assertNotNull(queue.getAndRemoveUnsentPacketSync(-1)); } @Test @@ -41,10 +41,10 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacket(new NetworkPacket("Test"), 1, callback); - queue.addPacket(new NetworkPacket("Test1"), 1, callback); + queue.addPacketSync(new NetworkPacket("Test"), 1, callback); + queue.addPacketSync(new NetworkPacket("Test1"), 1, callback); - assertNotNull(queue.getAndRemoveUnsentPacket(1)); - assertNull(queue.getAndRemoveUnsentPacket(1)); + assertNotNull(queue.getAndRemoveUnsentPacketSync(1)); + assertNull(queue.getAndRemoveUnsentPacketSync(1)); } } \ No newline at end of file