2
0
mirror of https://github.com/KDE/kdeconnect-android synced 2025-08-22 01:51:47 +00:00

refactor: migrate DevicePacketQueue to Kotlin Coroutine

This commit is contained in:
ShellWen Chen 2024-04-14 23:04:55 +00:00 committed by Philip Cohn-Cort
parent 0de545773d
commit 295dc7b42a
4 changed files with 147 additions and 156 deletions

View File

@ -474,7 +474,8 @@ public class Device implements BaseLink.PacketReceiver {
if (packetQueue == null) {
callback.onFailure(new Exception("Device disconnected!"));
} else {
packetQueue.addPacket(np, replaceID, callback);
// TODO: Migrate to coroutine version (addPacket)
packetQueue.addPacketSync(np, replaceID, callback);
}
}
@ -488,7 +489,8 @@ public class Device implements BaseLink.PacketReceiver {
if (packetQueue == null) {
return null;
} else {
return packetQueue.getAndRemoveUnsentPacket(replaceID);
// TODO: Migrate to coroutine version (getAndRemoveUnsentPacket)
return packetQueue.getAndRemoveUnsentPacketSync(replaceID);
}
}

View File

@ -1,142 +0,0 @@
/*
* SPDX-FileCopyrightText: 2019 Matthijs Tijink <matthijstijink@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect;
import android.util.Log;
import org.kde.kdeconnect.Helpers.ThreadHelper;
import java.util.ArrayDeque;
import java.util.Optional;
/**
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
*/
class DevicePacketQueue {
/**
* Holds the packet and related stuff to keep in the queue
*/
private static final class Item {
NetworkPacket packet;
/**
* Replacement ID: if positive, it can be replaced by later packets with the same ID
*/
final int replaceID;
Device.SendPacketStatusCallback callback;
Item(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
this.packet = packet;
this.callback = callback;
this.replaceID = replaceID;
}
}
private final ArrayDeque<Item> items = new ArrayDeque<>();
private final Device mDevice;
private final Object lock = new Object();
private boolean exit = false;
DevicePacketQueue(Device device) {
this(device, true);
}
DevicePacketQueue(Device device, Boolean startThread) {
mDevice = device;
if (startThread) {
ThreadHelper.execute(new SendingRunnable());
}
}
/**
* Send a packet (at some point in the future)
* @param packet The packet
* @param replaceID If positive, it will replace all older packets still in the queue
* @param callback The callback after sending the packet
*/
void addPacket(NetworkPacket packet, int replaceID, Device.SendPacketStatusCallback callback) {
synchronized (lock) {
if (exit) {
callback.onFailure(new Exception("Device disconnected!"));
} else {
boolean replaced = false;
if (replaceID >= 0) {
for (Item item : items) {
if (item.replaceID == replaceID) {
item.packet = packet;
item.callback = callback;
replaced = true;
break;
}
}
}
if (!replaced) {
items.addLast(new Item(packet, replaceID, callback));
lock.notify();
}
}
}
}
/**
* Check if we still have an unsent packet in the queue with the given ID.
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
NetworkPacket getAndRemoveUnsentPacket(int replaceID) {
synchronized (lock) {
final Optional<Item> itemOptional = items.stream()
.filter(item -> item.replaceID == replaceID).findFirst();
if (itemOptional.isPresent()) {
final Item item = itemOptional.get();
items.remove(item);
return item.packet;
}
}
return null;
}
void disconnected() {
synchronized (lock) {
exit = true;
lock.notifyAll();
}
}
private final class SendingRunnable implements Runnable {
@Override
public void run() {
while (true) {
Item item;
synchronized (lock) {
while (items.isEmpty() && !exit) {
try {
lock.wait();
} catch (InterruptedException ignored) {
}
}
if (exit) {
Log.i("DevicePacketQueue", "Terminating sending loop");
break;
}
item = items.removeFirst();
}
mDevice.sendPacketBlocking(item.packet, item.callback);
}
while (!items.isEmpty()) {
Item item = items.removeFirst();
item.callback.onFailure(new Exception("Device disconnected!"));
}
}
}
}

View File

@ -0,0 +1,131 @@
/*
* SPDX-FileCopyrightText: 2019 Matthijs Tijink <matthijstijink@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.collections.ArrayDeque
/**
* Keeps a queue of packets to send to a device, to prevent either blocking or using lots of threads
*/
class DevicePacketQueue @JvmOverloads constructor(private val device: Device, startRunning: Boolean = true) {
/**
* Holds the packet and related stuff to keep in the queue
*/
private class Item(
var packet: NetworkPacket,
/**
* Replacement ID: if positive, it can be replaced by later packets with the same ID
*/
val replaceID: Int,
var callback: Device.SendPacketStatusCallback
)
private val scope = CoroutineScope(Dispatchers.IO)
private val sendingJob = SupervisorJob()
private val loopJob = Job()
private val items = ArrayDeque<Item>()
private val mutex = Mutex()
init {
if (startRunning) {
scope.launch(loopJob) {
sending()
}
}
}
fun addPacketSync(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) {
runBlocking {
addPacket(packet, replaceID, callback)
}
}
/**
* Send a packet (at some point in the future)
* @param packet The packet
* @param replaceID If positive, it will replace all older packets still in the queue
* @param callback The callback after sending the packet
*/
suspend fun addPacket(packet: NetworkPacket, replaceID: Int, callback: Device.SendPacketStatusCallback) {
if (sendingJob.isCancelled) {
callback.onFailure(Exception("Device disconnected!"))
} else {
mutex.withLock {
var replaced = false
if (replaceID >= 0) {
items.forEach {
if (it.replaceID == replaceID) {
it.packet = packet
it.callback = callback
replaced = true
}
}
}
if (!replaced) {
items.addLast(Item(packet, replaceID, callback))
}
}
}
}
fun getAndRemoveUnsentPacketSync(replaceID: Int): NetworkPacket? {
return runBlocking {
getAndRemoveUnsentPacket(replaceID)
}
}
/**
* Check if we still have an unsent packet in the queue with the given ID.
* If so, remove it from the queue and return it
* @param replaceID The replace ID (must be positive)
* @return The found packet, or null
*/
suspend fun getAndRemoveUnsentPacket(replaceID: Int): NetworkPacket? {
mutex.withLock {
val itemOptional = items.stream()
.filter { item: Item -> item.replaceID == replaceID }.findFirst()
if (itemOptional.isPresent) {
val item = itemOptional.get()
items.remove(item)
return item.packet
}
}
return null
}
fun disconnected() {
sendingJob.cancel()
}
private suspend fun sending() {
while (true) {
val item = mutex.withLock {
if (items.isEmpty()) {
null
} else {
items.removeFirst()
}
}
if (item == null) {
yield()
continue
}
if (sendingJob.isCancelled) {
item.callback.onFailure(Exception("Device disconnected!"))
} else {
scope.launch(sendingJob) {
device.sendPacketBlocking(item.packet, item.callback)
}
}
}
}
}

View File

@ -13,11 +13,11 @@ public class DevicePacketQueueTest {
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), 0, callback);
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
queue.addPacketSync(new NetworkPacket("Test"), 0, callback);
queue.addPacketSync(new NetworkPacket("Test1"), 1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(0));
assertNotNull(queue.getAndRemoveUnsentPacket(1));
assertNotNull(queue.getAndRemoveUnsentPacketSync(0));
assertNotNull(queue.getAndRemoveUnsentPacketSync(1));
}
@Test
@ -27,11 +27,11 @@ public class DevicePacketQueueTest {
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), -1, callback);
queue.addPacket(new NetworkPacket("Test1"), -1, callback);
queue.addPacketSync(new NetworkPacket("Test"), -1, callback);
queue.addPacketSync(new NetworkPacket("Test1"), -1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
assertNotNull(queue.getAndRemoveUnsentPacket(-1));
assertNotNull(queue.getAndRemoveUnsentPacketSync(-1));
assertNotNull(queue.getAndRemoveUnsentPacketSync(-1));
}
@Test
@ -41,10 +41,10 @@ public class DevicePacketQueueTest {
DevicePacketQueue queue = new DevicePacketQueue(device, false);
queue.addPacket(new NetworkPacket("Test"), 1, callback);
queue.addPacket(new NetworkPacket("Test1"), 1, callback);
queue.addPacketSync(new NetworkPacket("Test"), 1, callback);
queue.addPacketSync(new NetworkPacket("Test1"), 1, callback);
assertNotNull(queue.getAndRemoveUnsentPacket(1));
assertNull(queue.getAndRemoveUnsentPacket(1));
assertNotNull(queue.getAndRemoveUnsentPacketSync(1));
assertNull(queue.getAndRemoveUnsentPacketSync(1));
}
}