From 45da75f331a1137a59fed1504e3f490ce32e5902 Mon Sep 17 00:00:00 2001 From: TPJ Schikhof Date: Tue, 10 Sep 2024 08:35:09 +0000 Subject: [PATCH] Migrate BackgroundJob(Handler) to Kotlin --- .../SharePlugin/CompositeReceiveFileJob.java | 12 +- .../SharePlugin/CompositeUploadFileJob.java | 6 +- .../Plugins/SharePlugin/SharePlugin.java | 2 +- .../kde/kdeconnect/async/BackgroundJob.java | 62 ------- src/org/kde/kdeconnect/async/BackgroundJob.kt | 60 +++++++ .../async/BackgroundJobHandler.java | 163 ------------------ .../kdeconnect/async/BackgroundJobHandler.kt | 133 ++++++++++++++ 7 files changed, 203 insertions(+), 235 deletions(-) delete mode 100644 src/org/kde/kdeconnect/async/BackgroundJob.java create mode 100644 src/org/kde/kdeconnect/async/BackgroundJob.kt delete mode 100644 src/org/kde/kdeconnect/async/BackgroundJobHandler.java create mode 100644 src/org/kde/kdeconnect/async/BackgroundJobHandler.kt diff --git a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeReceiveFileJob.java b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeReceiveFileJob.java index 974723f1..a314b0ee 100644 --- a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeReceiveFileJob.java +++ b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeReceiveFileJob.java @@ -88,7 +88,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { } private Device getDevice() { - return requestInfo; + return getRequestInfo(); } boolean isRunning() { return isRunning; } @@ -131,7 +131,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { isRunning = true; - while (!done && !canceled) { + while (!done && !isCancelled()) { synchronized (lock) { currentNetworkPacket = networkPacketList.get(0); } @@ -153,7 +153,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { if ( received != currentNetworkPacket.getPayloadSize()) { fileDocument.delete(); - if (!canceled) { + if (!isCancelled()) { throw new RuntimeException("Failed to receive: " + currentFileName + " received:" + received + " bytes, expected: " + currentNetworkPacket.getPayloadSize() + " bytes"); } } else { @@ -184,7 +184,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { listIsEmpty = networkPacketList.isEmpty(); } - if (listIsEmpty && !canceled) { + if (listIsEmpty && !isCancelled()) { try { Thread.sleep(1000); } catch (InterruptedException ignored) {} @@ -203,7 +203,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { isRunning = false; - if (canceled) { + if (isCancelled()) { receiveNotification.cancel(); return; } @@ -290,7 +290,7 @@ public class CompositeReceiveFileJob extends BackgroundJob { int count; long received = 0; - while ((count = input.read(data)) >= 0 && !canceled) { + while ((count = input.read(data)) >= 0 && !isCancelled()) { received += count; totalReceived += count; diff --git a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java index 15572a9f..87ad74de 100644 --- a/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java +++ b/src/org/kde/kdeconnect/Plugins/SharePlugin/CompositeUploadFileJob.java @@ -79,7 +79,7 @@ public class CompositeUploadFileJob extends BackgroundJob { sendPacketStatusCallback = new SendPacketStatusCallback(); } - private Device getDevice() { return requestInfo; } + private Device getDevice() { return getRequestInfo(); } @Override public void run() { @@ -92,7 +92,7 @@ public class CompositeUploadFileJob extends BackgroundJob { } try { - while (!done && !canceled) { + while (!done && !isCancelled()) { synchronized (lock) { currentNetworkPacket = networkPacketList.remove(0); } @@ -115,7 +115,7 @@ public class CompositeUploadFileJob extends BackgroundJob { } } - if (canceled) { + if (isCancelled()) { uploadNotification.cancel(); } else { uploadNotification.setFinished(getDevice().getContext().getResources().getQuantityString(R.plurals.sent_files_title, currentFileNum, getDevice().getName(), currentFileNum)); diff --git a/src/org/kde/kdeconnect/Plugins/SharePlugin/SharePlugin.java b/src/org/kde/kdeconnect/Plugins/SharePlugin/SharePlugin.java index a064979e..47de0773 100644 --- a/src/org/kde/kdeconnect/Plugins/SharePlugin/SharePlugin.java +++ b/src/org/kde/kdeconnect/Plugins/SharePlugin/SharePlugin.java @@ -81,7 +81,7 @@ public class SharePlugin extends Plugin { private SharedPreferences mSharedPrefs; public SharePlugin() { - backgroundJobHandler = BackgroundJobHandler.newFixedThreadPoolBackgroundJobHander(5); + backgroundJobHandler = BackgroundJobHandler.newFixedThreadPoolBackgroundJobHandler(5); handler = new Handler(Looper.getMainLooper()); receiveFileJobCallback = new Callback(); } diff --git a/src/org/kde/kdeconnect/async/BackgroundJob.java b/src/org/kde/kdeconnect/async/BackgroundJob.java deleted file mode 100644 index c5482f99..00000000 --- a/src/org/kde/kdeconnect/async/BackgroundJob.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2018 Erik Duisters - * - * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL - */ - -package org.kde.kdeconnect.async; - -import androidx.annotation.NonNull; - -import java.util.concurrent.atomic.AtomicLong; - -public abstract class BackgroundJob implements Runnable { - private static final AtomicLong atomicLong = new AtomicLong(0); - protected volatile boolean canceled; - private BackgroundJobHandler backgroundJobHandler; - private final long id; - - protected final I requestInfo; - private final Callback callback; - - public BackgroundJob(I requestInfo, Callback callback) { - this.id = atomicLong.incrementAndGet(); - this.requestInfo = requestInfo; - this.callback = callback; - } - - void setBackgroundJobHandler(BackgroundJobHandler handler) { - this.backgroundJobHandler = handler; - } - - public long getId() { return id; } - public I getRequestInfo() { return requestInfo; } - - public void cancel() { - canceled = true; - backgroundJobHandler.cancelJob(this); - } - - public boolean isCancelled() { - return canceled; - } - - public interface Callback { - void onResult(@NonNull BackgroundJob job, R result); - void onError(@NonNull BackgroundJob job, @NonNull Throwable error); - } - - protected void reportResult(R result) { - backgroundJobHandler.runOnUiThread(() -> { - callback.onResult(this, result); - backgroundJobHandler.onFinished(this); - }); - } - - protected void reportError(@NonNull Throwable error) { - backgroundJobHandler.runOnUiThread(() -> { - callback.onError(this, error); - backgroundJobHandler.onFinished(this); - }); - } -} diff --git a/src/org/kde/kdeconnect/async/BackgroundJob.kt b/src/org/kde/kdeconnect/async/BackgroundJob.kt new file mode 100644 index 00000000..87db409c --- /dev/null +++ b/src/org/kde/kdeconnect/async/BackgroundJob.kt @@ -0,0 +1,60 @@ +/* + * SPDX-FileCopyrightText: 2018 Erik Duisters + * + * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL + */ +package org.kde.kdeconnect.async + +import java.util.concurrent.atomic.AtomicLong +import kotlin.concurrent.Volatile + +abstract class BackgroundJob : Runnable { + private val callback: Callback + val requestInfo: I + val id: Long + + constructor(requestInfo: I, callback: Callback) { + this.callback = callback + this.requestInfo = requestInfo + this.id = idIncrementer.incrementAndGet() + } + + @Volatile + var isCancelled: Boolean = false + protected set + + private var backgroundJobHandler: BackgroundJobHandler? = null + + /** Used by the job handler to register itself as the handler */ + fun setBackgroundJobHandler(handler: BackgroundJobHandler) { + this.backgroundJobHandler = handler + } + + open fun cancel() { + isCancelled = true + backgroundJobHandler!!.cancelJob(this) + } + + interface Callback { + fun onResult(job: BackgroundJob<*, *>, result: R) + fun onError(job: BackgroundJob<*, *>, error: Throwable) + } + + protected fun reportResult(result: R) { + backgroundJobHandler!!.runOnUiThread { + callback.onResult(this, result) + backgroundJobHandler!!.onFinished(this) + } + } + + fun reportError(error: Throwable) { + backgroundJobHandler!!.runOnUiThread { + callback.onError(this, error) + backgroundJobHandler!!.onFinished(this) + } + } + + companion object { + private val idIncrementer = AtomicLong(0) + } +} diff --git a/src/org/kde/kdeconnect/async/BackgroundJobHandler.java b/src/org/kde/kdeconnect/async/BackgroundJobHandler.java deleted file mode 100644 index 8f0c3191..00000000 --- a/src/org/kde/kdeconnect/async/BackgroundJobHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2018 Erik Duisters - * - * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL - */ - -package org.kde.kdeconnect.async; - -import android.os.Handler; -import android.os.Looper; -import android.util.Log; - -import androidx.annotation.Nullable; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Scheduler for {@link BackgroundJob} objects. - *

- * We use an internal {@link ThreadPoolExecutor} to catch Exceptions and - * pass them along to {@link #handleUncaughtException(Future, Throwable)}. - *

- */ -public class BackgroundJobHandler { - private static final String TAG = BackgroundJobHandler.class.getSimpleName(); - - private final Map> jobMap = new HashMap<>(); - private final Object jobMapLock = new Object(); - - private class MyThreadPoolExecutor extends ThreadPoolExecutor { - MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { - super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue); - } - - @Override - protected void afterExecute(Runnable r, Throwable t) { - super.afterExecute(r, t); - - if (!(r instanceof Future)) { - return; - } - - Future future = (Future) r; - - if (t == null) { - try { - future.get(); - } catch (CancellationException ce) { - Log.d(TAG,"afterExecute got a CancellationException"); - } catch (ExecutionException ee) { - t = ee; - } catch (InterruptedException ie) { - Log.d(TAG, "afterExecute got an InterruptedException"); - Thread.currentThread().interrupt(); // ignore/reset - } - } - - if (t != null) { - BackgroundJobHandler.this.handleUncaughtException(future, t); - } - } - } - - private final ThreadPoolExecutor threadPoolExecutor; - private final Handler handler; - - private BackgroundJobHandler(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { - this.handler = new Handler(Looper.getMainLooper()); - this.threadPoolExecutor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue); - } - - public void runJob(BackgroundJob bgJob) { - Future f; - - bgJob.setBackgroundJobHandler(this); - - try { - synchronized (jobMapLock) { - f = threadPoolExecutor.submit(bgJob); - jobMap.put(bgJob, f); - } - } catch (RejectedExecutionException e) { - Log.d(TAG,"threadPoolExecutor.submit rejected a background job: " + e.getMessage()); - - bgJob.reportError(e); - } - } - - public boolean isRunning(long jobId) { - synchronized (jobMapLock) { - for (BackgroundJob job : jobMap.keySet()) { - if (job.getId() == jobId) { - return true; - } - } - } - - return false; - } - - @Nullable - public BackgroundJob getJob(long jobId) { - synchronized (jobMapLock) { - for (BackgroundJob job : jobMap.keySet()) { - if (job.getId() == jobId) { - return job; - } - } - } - - return null; - } - - void cancelJob(BackgroundJob job) { - synchronized (jobMapLock) { - if (jobMap.containsKey(job)) { - Future f = jobMap.get(job); - - if (f.cancel(true)) { - threadPoolExecutor.purge(); - } - - jobMap.remove(job); - } - } - } - - private void handleUncaughtException(Future ft, Throwable t) { - synchronized (jobMapLock) { - for (Map.Entry> pairs : jobMap.entrySet()) { - Future future = pairs.getValue(); - - if (future == ft) { - pairs.getKey().reportError(t); - break; - } - } - } - } - - void onFinished(BackgroundJob job) { - synchronized (jobMapLock) { - jobMap.remove(job); - } - } - - void runOnUiThread(Runnable runnable) { - handler.post(runnable); - } - - public static BackgroundJobHandler newFixedThreadPoolBackgroundJobHander(int numThreads) { - return new BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); - } -} diff --git a/src/org/kde/kdeconnect/async/BackgroundJobHandler.kt b/src/org/kde/kdeconnect/async/BackgroundJobHandler.kt new file mode 100644 index 00000000..75e7b88c --- /dev/null +++ b/src/org/kde/kdeconnect/async/BackgroundJobHandler.kt @@ -0,0 +1,133 @@ +/* + * SPDX-FileCopyrightText: 2018 Erik Duisters + * + * SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL + */ +package org.kde.kdeconnect.async + +import android.os.Handler +import android.os.Looper +import android.util.Log +import java.util.concurrent.BlockingQueue +import java.util.concurrent.CancellationException +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +/** + * Scheduler for [BackgroundJob] objects. + * + * We use an internal [ThreadPoolExecutor] to catch Exceptions and pass them along to [.handleUncaughtException]. + * + * Might be able to be replaced with coroutines later + */ +class BackgroundJobHandler { + private constructor(corePoolSize: Int, maxPoolSize: Int, keepAliveTime: Long, unit: TimeUnit, workQueue: BlockingQueue) { + this.threadPoolExecutor = MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue) + this.handler = Handler(Looper.getMainLooper()) + } + + private val jobMap: MutableMap, Future<*>> = HashMap() + private val jobMapLock: Any = Any() + + private inner class MyThreadPoolExecutor(corePoolSize: Int, maxPoolSize: Int, keepAliveTime: Long, unit: TimeUnit, workQueue: BlockingQueue) : ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue) { + override fun afterExecute(runnable: Runnable, throwable: Throwable?) { + super.afterExecute(runnable, throwable) + + if (runnable !is Future<*>) { + return + } + + val future = runnable as Future<*> + + var t: Throwable? = throwable + if (t == null) { + try { + future.get() + } + catch (ce: CancellationException) { + Log.d(LOG_TAG, "afterExecute got a CancellationException") + } + catch (ee: ExecutionException) { + t = ee + } + catch (ie: InterruptedException) { + Log.d(LOG_TAG, "afterExecute got an InterruptedException") + Thread.currentThread().interrupt() // ignore / reset + } + } + + if (t != null) { + this@BackgroundJobHandler.handleUncaughtException(future, t) + } + } + } + + private val threadPoolExecutor: ThreadPoolExecutor + private val handler: Handler + + fun runJob(bgJob: BackgroundJob<*, *>) { + bgJob.setBackgroundJobHandler(this) + try { + synchronized(jobMapLock) { + val future: Future<*> = threadPoolExecutor.submit(bgJob) + jobMap.put(bgJob, future) + } + } + catch (e: RejectedExecutionException) { + Log.d(LOG_TAG, "threadPoolExecutor.submit rejected a background job: ${e.message}") + bgJob.reportError(e) + } + } + + fun isRunning(jobId: Long): Boolean { + synchronized(jobMapLock) { + return jobMap.keys.any { it.id == jobId } + } + } + + fun getJob(jobId: Long): BackgroundJob<*, *>? { + synchronized(jobMapLock) { + return jobMap.keys.find { it.id == jobId } + } + } + + fun cancelJob(job: BackgroundJob<*, *>) { + synchronized(jobMapLock) { + if (jobMap.containsKey(job)) { + if (jobMap[job]!!.cancel(true)) { + threadPoolExecutor.purge() + } + jobMap.remove(job) + } + } + } + + private fun handleUncaughtException(ft: Future<*>, t: Throwable) { + synchronized(jobMapLock) { + jobMap.entries.find { it.value == ft }?.key?.reportError(t) + } + } + + fun onFinished(job: BackgroundJob<*, *>) { + synchronized(jobMapLock) { + jobMap.remove(job) + } + } + + fun runOnUiThread(runnable: Runnable) { + handler.post(runnable) + } + + companion object { + private val LOG_TAG: String = BackgroundJobHandler::class.java.simpleName + + @JvmStatic + fun newFixedThreadPoolBackgroundJobHandler(numThreads: Int): BackgroundJobHandler { + return BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue()) + } + } +}