diff --git a/src/org/kde/kdeconnect/Device.java b/src/org/kde/kdeconnect/Device.java index b01089bd..7a796675 100644 --- a/src/org/kde/kdeconnect/Device.java +++ b/src/org/kde/kdeconnect/Device.java @@ -480,8 +480,7 @@ public class Device implements BaseLink.PacketReceiver { if (packetQueue == null) { callback.onFailure(new Exception("Device disconnected!")); } else { - // TODO: Migrate to coroutine version (addPacket) - packetQueue.addPacketSync(np, replaceID, callback); + packetQueue.addPacket(np, replaceID, callback); } } @@ -495,8 +494,7 @@ public class Device implements BaseLink.PacketReceiver { if (packetQueue == null) { return null; } else { - // TODO: Migrate to coroutine version (getAndRemoveUnsentPacket) - return packetQueue.getAndRemoveUnsentPacketSync(replaceID); + return packetQueue.getAndRemoveUnsentPacket(replaceID); } } diff --git a/src/org/kde/kdeconnect/DevicePacketQueue.java b/src/org/kde/kdeconnect/DevicePacketQueue.java new file mode 100644 index 00000000..2aead660 --- /dev/null +++ b/src/org/kde/kdeconnect/DevicePacketQueue.java @@ -0,0 +1,142 @@ +/* + * SPDX-FileCopyrightText: 2019 Matthijs Tijink + * + * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL +*/ + +package org.kde.kdeconnect; + +import android.util.Log; + +import org.kde.kdeconnect.Helpers.ThreadHelper; + +import java.util.ArrayDeque; +import java.util.Optional; + +/** + * Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads + */ +class DevicePacketQueue { + /** + * Holds the packet and related stuff to keep in the queue + */ + private static final class Item { + NetworkPacket packet; + /** + * Replacement ID: if positive, it can be replaced by later packets with the same ID + */ + final int replaceID; + Device.SendPacketStatusCallback callback; + + Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { + this.packet = packet; + this.callback = callback; + this.replaceID = replaceID; + } + } + + private final ArrayDeque items = new ArrayDeque<>(); + private final Device mDevice; + private final Object lock = new Object(); + private boolean exit = false; + + DevicePacketQueue(Device device) { + this(device, true); + } + + DevicePacketQueue(Device device, Boolean startThread) { + mDevice = device; + if (startThread) { + ThreadHelper.execute(new SendingRunnable()); + } + } + + /** + * Send a packet (at some point in the future) + * @param packet The packet + * @param replaceID If positive, it will replace all older packets still in the queue + * @param callback The callback after sending the packet + */ + void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) { + synchronized (lock) { + if (exit) { + callback.onFailure(new Exception("Device disconnected!")); + } else { + boolean replaced = false; + + if (replaceID >= 0) { + for (Item item : items) { + if (item.replaceID == replaceID) { + item.packet = packet; + item.callback = callback; + replaced = true; + break; + } + } + } + + if (!replaced) { + items.addLast(new Item(packet, replaceID, callback)); + lock.notify(); + } + } + } + } + + /** + * Check if we still have an unsent packet in the queue with the given ID. + * If so, remove it from the queue and return it + * @param replaceID The replace ID (must be positive) + * @return The found packet, or null + */ + NetworkPacket getAndRemoveUnsentPacket(int replaceID) { + synchronized (lock) { + final Optional itemOptional = items.stream() + .filter(item -> item.replaceID == replaceID).findFirst(); + if (itemOptional.isPresent()) { + final Item item = itemOptional.get(); + items.remove(item); + return item.packet; + } + } + return null; + } + + void disconnected() { + synchronized (lock) { + exit = true; + lock.notifyAll(); + } + } + + private final class SendingRunnable implements Runnable { + @Override + public void run() { + while (true) { + Item item; + synchronized (lock) { + while (items.isEmpty() && !exit) { + try { + lock.wait(); + } catch (InterruptedException ignored) { + } + } + if (exit) { + Log.i("DevicePacketQueue", "Terminating sending loop"); + break; + } + + item = items.removeFirst(); + } + + mDevice.sendPacketBlocking(item.packet, item.callback); + } + + while (!items.isEmpty()) { + Item item = items.removeFirst(); + item.callback.onFailure(new Exception("Device disconnected!")); + } + } + } + +} diff --git a/src/org/kde/kdeconnect/DevicePacketQueue.kt b/src/org/kde/kdeconnect/DevicePacketQueue.kt deleted file mode 100644 index 4365734e..00000000 --- a/src/org/kde/kdeconnect/DevicePacketQueue.kt +++ /dev/null @@ -1,131 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2019 Matthijs Tijink - * - * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL -*/ -package org.kde.kdeconnect - -import kotlinx.coroutines.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock -import kotlin.collections.ArrayDeque - -/** - * Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads - */ -class DevicePacketQueue @JvmOverloads constructor(private val device: Device, startRunning: Boolean = true) { - /** - * Holds the packet and related stuff to keep in the queue - */ - private class Item( - var packet: NetworkPacket, - /** - * Replacement ID: if positive, it can be replaced by later packets with the same ID - */ - val replaceID: Int, - var callback: Device.SendPacketStatusCallback - ) - - private val scope = CoroutineScope(Dispatchers.IO) - private val sendingJob = SupervisorJob() - private val loopJob = Job() - - private val items = ArrayDeque() - private val mutex = Mutex() - - init { - if (startRunning) { - scope.launch(loopJob) { - sending() - } - } - } - - fun addPacketSync(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) { - runBlocking { - addPacket(packet, replaceID, callback) - } - } - - /** - * Send a packet (at some point in the future) - * @param packet The packet - * @param replaceID If positive, it will replace all older packets still in the queue - * @param callback The callback after sending the packet - */ - suspend fun addPacket(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) { - if (sendingJob.isCancelled) { - callback.onFailure(Exception("Device disconnected!")) - } else { - mutex.withLock { - var replaced = false - if (replaceID >= 0) { - items.forEach { - if (it.replaceID == replaceID) { - it.packet = packet - it.callback = callback - replaced = true - } - } - } - - if (!replaced) { - items.addLast(Item(packet, replaceID, callback)) - } - } - } - } - - fun getAndRemoveUnsentPacketSync(replaceID: Int): NetworkPacket? { - return runBlocking { - getAndRemoveUnsentPacket(replaceID) - } - } - /** - * Check if we still have an unsent packet in the queue with the given ID. - * If so, remove it from the queue and return it - * @param replaceID The replace ID (must be positive) - * @return The found packet, or null - */ - suspend fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? { - mutex.withLock { - val itemOptional = items.stream() - .filter { item: Item -> item.replaceID == replaceID }.findFirst() - if (itemOptional.isPresent) { - val item = itemOptional.get() - items.remove(item) - return item.packet - } - } - return null - } - - fun disconnected() { - sendingJob.cancel() - } - - private suspend fun sending() { - while (true) { - val item = mutex.withLock { - if (items.isEmpty()) { - null - } else { - items.removeFirst() - } - } - - if (item == null) { - yield() - continue - } - - if (sendingJob.isCancelled) { - item.callback.onFailure(Exception("Device disconnected!")) - } else { - scope.launch(sendingJob) { - device.sendPacketBlocking(item.packet, item.callback) - } - } - } - } -} diff --git a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java b/tests/org/kde/kdeconnect/DevicePacketQueueTest.java index df4eaba7..63a0b2f4 100644 --- a/tests/org/kde/kdeconnect/DevicePacketQueueTest.java +++ b/tests/org/kde/kdeconnect/DevicePacketQueueTest.java @@ -13,11 +13,11 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacketSync(new NetworkPacket("Test"), 0, callback); - queue.addPacketSync(new NetworkPacket("Test1"), 1, callback); + queue.addPacket(new NetworkPacket("Test"), 0, callback); + queue.addPacket(new NetworkPacket("Test1"), 1, callback); - assertNotNull(queue.getAndRemoveUnsentPacketSync(0)); - assertNotNull(queue.getAndRemoveUnsentPacketSync(1)); + assertNotNull(queue.getAndRemoveUnsentPacket(0)); + assertNotNull(queue.getAndRemoveUnsentPacket(1)); } @Test @@ -27,11 +27,11 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacketSync(new NetworkPacket("Test"), -1, callback); - queue.addPacketSync(new NetworkPacket("Test1"), -1, callback); + queue.addPacket(new NetworkPacket("Test"), -1, callback); + queue.addPacket(new NetworkPacket("Test1"), -1, callback); - assertNotNull(queue.getAndRemoveUnsentPacketSync(-1)); - assertNotNull(queue.getAndRemoveUnsentPacketSync(-1)); + assertNotNull(queue.getAndRemoveUnsentPacket(-1)); + assertNotNull(queue.getAndRemoveUnsentPacket(-1)); } @Test @@ -41,10 +41,10 @@ public class DevicePacketQueueTest { DevicePacketQueue queue = new DevicePacketQueue(device, false); - queue.addPacketSync(new NetworkPacket("Test"), 1, callback); - queue.addPacketSync(new NetworkPacket("Test1"), 1, callback); + queue.addPacket(new NetworkPacket("Test"), 1, callback); + queue.addPacket(new NetworkPacket("Test1"), 1, callback); - assertNotNull(queue.getAndRemoveUnsentPacketSync(1)); - assertNull(queue.getAndRemoveUnsentPacketSync(1)); + assertNotNull(queue.getAndRemoveUnsentPacket(1)); + assertNull(queue.getAndRemoveUnsentPacket(1)); } } \ No newline at end of file