2
0
mirror of https://github.com/KDE/kdeconnect-android synced 2025-08-30 13:47:41 +00:00

Migrate BackgroundJob(Handler) to Kotlin

This commit is contained in:
TPJ Schikhof
2024-09-10 08:35:09 +00:00
committed by Philip Cohn-Cort
parent 283956c107
commit 45da75f331
7 changed files with 203 additions and 235 deletions

View File

@@ -88,7 +88,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
} }
private Device getDevice() { private Device getDevice() {
return requestInfo; return getRequestInfo();
} }
boolean isRunning() { return isRunning; } boolean isRunning() { return isRunning; }
@@ -131,7 +131,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
isRunning = true; isRunning = true;
while (!done && !canceled) { while (!done && !isCancelled()) {
synchronized (lock) { synchronized (lock) {
currentNetworkPacket = networkPacketList.get(0); currentNetworkPacket = networkPacketList.get(0);
} }
@@ -153,7 +153,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
if ( received != currentNetworkPacket.getPayloadSize()) { if ( received != currentNetworkPacket.getPayloadSize()) {
fileDocument.delete(); fileDocument.delete();
if (!canceled) { if (!isCancelled()) {
throw new RuntimeException("Failed to receive: " + currentFileName + " received:" + received + " bytes, expected: " + currentNetworkPacket.getPayloadSize() + " bytes"); throw new RuntimeException("Failed to receive: " + currentFileName + " received:" + received + " bytes, expected: " + currentNetworkPacket.getPayloadSize() + " bytes");
} }
} else { } else {
@@ -184,7 +184,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
listIsEmpty = networkPacketList.isEmpty(); listIsEmpty = networkPacketList.isEmpty();
} }
if (listIsEmpty && !canceled) { if (listIsEmpty && !isCancelled()) {
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException ignored) {} } catch (InterruptedException ignored) {}
@@ -203,7 +203,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
isRunning = false; isRunning = false;
if (canceled) { if (isCancelled()) {
receiveNotification.cancel(); receiveNotification.cancel();
return; return;
} }
@@ -290,7 +290,7 @@ public class CompositeReceiveFileJob extends BackgroundJob<Device, Void> {
int count; int count;
long received = 0; long received = 0;
while ((count = input.read(data)) >= 0 && !canceled) { while ((count = input.read(data)) >= 0 && !isCancelled()) {
received += count; received += count;
totalReceived += count; totalReceived += count;

View File

@@ -79,7 +79,7 @@ public class CompositeUploadFileJob extends BackgroundJob<Device, Void> {
sendPacketStatusCallback = new SendPacketStatusCallback(); sendPacketStatusCallback = new SendPacketStatusCallback();
} }
private Device getDevice() { return requestInfo; } private Device getDevice() { return getRequestInfo(); }
@Override @Override
public void run() { public void run() {
@@ -92,7 +92,7 @@ public class CompositeUploadFileJob extends BackgroundJob<Device, Void> {
} }
try { try {
while (!done && !canceled) { while (!done && !isCancelled()) {
synchronized (lock) { synchronized (lock) {
currentNetworkPacket = networkPacketList.remove(0); currentNetworkPacket = networkPacketList.remove(0);
} }
@@ -115,7 +115,7 @@ public class CompositeUploadFileJob extends BackgroundJob<Device, Void> {
} }
} }
if (canceled) { if (isCancelled()) {
uploadNotification.cancel(); uploadNotification.cancel();
} else { } else {
uploadNotification.setFinished(getDevice().getContext().getResources().getQuantityString(R.plurals.sent_files_title, currentFileNum, getDevice().getName(), currentFileNum)); uploadNotification.setFinished(getDevice().getContext().getResources().getQuantityString(R.plurals.sent_files_title, currentFileNum, getDevice().getName(), currentFileNum));

View File

@@ -81,7 +81,7 @@ public class SharePlugin extends Plugin {
private SharedPreferences mSharedPrefs; private SharedPreferences mSharedPrefs;
public SharePlugin() { public SharePlugin() {
backgroundJobHandler = BackgroundJobHandler.newFixedThreadPoolBackgroundJobHander(5); backgroundJobHandler = BackgroundJobHandler.newFixedThreadPoolBackgroundJobHandler(5);
handler = new Handler(Looper.getMainLooper()); handler = new Handler(Looper.getMainLooper());
receiveFileJobCallback = new Callback(); receiveFileJobCallback = new Callback();
} }

View File

@@ -1,62 +0,0 @@
/*
* SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect.async;
import androidx.annotation.NonNull;
import java.util.concurrent.atomic.AtomicLong;
public abstract class BackgroundJob<I, R> implements Runnable {
private static final AtomicLong atomicLong = new AtomicLong(0);
protected volatile boolean canceled;
private BackgroundJobHandler backgroundJobHandler;
private final long id;
protected final I requestInfo;
private final Callback<R> callback;
public BackgroundJob(I requestInfo, Callback<R> callback) {
this.id = atomicLong.incrementAndGet();
this.requestInfo = requestInfo;
this.callback = callback;
}
void setBackgroundJobHandler(BackgroundJobHandler handler) {
this.backgroundJobHandler = handler;
}
public long getId() { return id; }
public I getRequestInfo() { return requestInfo; }
public void cancel() {
canceled = true;
backgroundJobHandler.cancelJob(this);
}
public boolean isCancelled() {
return canceled;
}
public interface Callback<R> {
void onResult(@NonNull BackgroundJob job, R result);
void onError(@NonNull BackgroundJob job, @NonNull Throwable error);
}
protected void reportResult(R result) {
backgroundJobHandler.runOnUiThread(() -> {
callback.onResult(this, result);
backgroundJobHandler.onFinished(this);
});
}
protected void reportError(@NonNull Throwable error) {
backgroundJobHandler.runOnUiThread(() -> {
callback.onError(this, error);
backgroundJobHandler.onFinished(this);
});
}
}

View File

@@ -0,0 +1,60 @@
/*
* SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect.async
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.Volatile
abstract class BackgroundJob<I, R> : Runnable {
private val callback: Callback<R>
val requestInfo: I
val id: Long
constructor(requestInfo: I, callback: Callback<R>) {
this.callback = callback
this.requestInfo = requestInfo
this.id = idIncrementer.incrementAndGet()
}
@Volatile
var isCancelled: Boolean = false
protected set
private var backgroundJobHandler: BackgroundJobHandler? = null
/** Used by the job handler to register itself as the handler */
fun setBackgroundJobHandler(handler: BackgroundJobHandler) {
this.backgroundJobHandler = handler
}
open fun cancel() {
isCancelled = true
backgroundJobHandler!!.cancelJob(this)
}
interface Callback<R> {
fun onResult(job: BackgroundJob<*, *>, result: R)
fun onError(job: BackgroundJob<*, *>, error: Throwable)
}
protected fun reportResult(result: R) {
backgroundJobHandler!!.runOnUiThread {
callback.onResult(this, result)
backgroundJobHandler!!.onFinished(this)
}
}
fun reportError(error: Throwable) {
backgroundJobHandler!!.runOnUiThread {
callback.onError(this, error)
backgroundJobHandler!!.onFinished(this)
}
}
companion object {
private val idIncrementer = AtomicLong(0)
}
}

View File

@@ -1,163 +0,0 @@
/*
* SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect.async;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import androidx.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Scheduler for {@link BackgroundJob} objects.
* <p>
* We use an internal {@link ThreadPoolExecutor} to catch Exceptions and
* pass them along to {@link #handleUncaughtException(Future, Throwable)}.
* </p>
*/
public class BackgroundJobHandler {
private static final String TAG = BackgroundJobHandler.class.getSimpleName();
private final Map<BackgroundJob, Future<?>> jobMap = new HashMap<>();
private final Object jobMapLock = new Object();
private class MyThreadPoolExecutor extends ThreadPoolExecutor {
MyThreadPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (!(r instanceof Future)) {
return;
}
Future<?> future = (Future<?>) r;
if (t == null) {
try {
future.get();
} catch (CancellationException ce) {
Log.d(TAG,"afterExecute got a CancellationException");
} catch (ExecutionException ee) {
t = ee;
} catch (InterruptedException ie) {
Log.d(TAG, "afterExecute got an InterruptedException");
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
BackgroundJobHandler.this.handleUncaughtException(future, t);
}
}
}
private final ThreadPoolExecutor threadPoolExecutor;
private final Handler handler;
private BackgroundJobHandler(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this.handler = new Handler(Looper.getMainLooper());
this.threadPoolExecutor = new MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
}
public void runJob(BackgroundJob bgJob) {
Future<?> f;
bgJob.setBackgroundJobHandler(this);
try {
synchronized (jobMapLock) {
f = threadPoolExecutor.submit(bgJob);
jobMap.put(bgJob, f);
}
} catch (RejectedExecutionException e) {
Log.d(TAG,"threadPoolExecutor.submit rejected a background job: " + e.getMessage());
bgJob.reportError(e);
}
}
public boolean isRunning(long jobId) {
synchronized (jobMapLock) {
for (BackgroundJob job : jobMap.keySet()) {
if (job.getId() == jobId) {
return true;
}
}
}
return false;
}
@Nullable
public BackgroundJob getJob(long jobId) {
synchronized (jobMapLock) {
for (BackgroundJob job : jobMap.keySet()) {
if (job.getId() == jobId) {
return job;
}
}
}
return null;
}
void cancelJob(BackgroundJob job) {
synchronized (jobMapLock) {
if (jobMap.containsKey(job)) {
Future<?> f = jobMap.get(job);
if (f.cancel(true)) {
threadPoolExecutor.purge();
}
jobMap.remove(job);
}
}
}
private void handleUncaughtException(Future<?> ft, Throwable t) {
synchronized (jobMapLock) {
for (Map.Entry<BackgroundJob, Future<?>> pairs : jobMap.entrySet()) {
Future<?> future = pairs.getValue();
if (future == ft) {
pairs.getKey().reportError(t);
break;
}
}
}
}
void onFinished(BackgroundJob job) {
synchronized (jobMapLock) {
jobMap.remove(job);
}
}
void runOnUiThread(Runnable runnable) {
handler.post(runnable);
}
public static BackgroundJobHandler newFixedThreadPoolBackgroundJobHander(int numThreads) {
return new BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
}

View File

@@ -0,0 +1,133 @@
/*
* SPDX-FileCopyrightText: 2018 Erik Duisters <e.duisters1@gmail.com>
*
* SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
*/
package org.kde.kdeconnect.async
import android.os.Handler
import android.os.Looper
import android.util.Log
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CancellationException
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
/**
* Scheduler for [BackgroundJob] objects.
*
* We use an internal [ThreadPoolExecutor] to catch Exceptions and pass them along to [.handleUncaughtException].
*
* Might be able to be replaced with coroutines later
*/
class BackgroundJobHandler {
private constructor(corePoolSize: Int, maxPoolSize: Int, keepAliveTime: Long, unit: TimeUnit, workQueue: BlockingQueue<Runnable>) {
this.threadPoolExecutor = MyThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue)
this.handler = Handler(Looper.getMainLooper())
}
private val jobMap: MutableMap<BackgroundJob<*, *>, Future<*>> = HashMap()
private val jobMapLock: Any = Any()
private inner class MyThreadPoolExecutor(corePoolSize: Int, maxPoolSize: Int, keepAliveTime: Long, unit: TimeUnit, workQueue: BlockingQueue<Runnable>) : ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue) {
override fun afterExecute(runnable: Runnable, throwable: Throwable?) {
super.afterExecute(runnable, throwable)
if (runnable !is Future<*>) {
return
}
val future = runnable as Future<*>
var t: Throwable? = throwable
if (t == null) {
try {
future.get()
}
catch (ce: CancellationException) {
Log.d(LOG_TAG, "afterExecute got a CancellationException")
}
catch (ee: ExecutionException) {
t = ee
}
catch (ie: InterruptedException) {
Log.d(LOG_TAG, "afterExecute got an InterruptedException")
Thread.currentThread().interrupt() // ignore / reset
}
}
if (t != null) {
this@BackgroundJobHandler.handleUncaughtException(future, t)
}
}
}
private val threadPoolExecutor: ThreadPoolExecutor
private val handler: Handler
fun runJob(bgJob: BackgroundJob<*, *>) {
bgJob.setBackgroundJobHandler(this)
try {
synchronized(jobMapLock) {
val future: Future<*> = threadPoolExecutor.submit(bgJob)
jobMap.put(bgJob, future)
}
}
catch (e: RejectedExecutionException) {
Log.d(LOG_TAG, "threadPoolExecutor.submit rejected a background job: ${e.message}")
bgJob.reportError(e)
}
}
fun isRunning(jobId: Long): Boolean {
synchronized(jobMapLock) {
return jobMap.keys.any { it.id == jobId }
}
}
fun getJob(jobId: Long): BackgroundJob<*, *>? {
synchronized(jobMapLock) {
return jobMap.keys.find { it.id == jobId }
}
}
fun cancelJob(job: BackgroundJob<*, *>) {
synchronized(jobMapLock) {
if (jobMap.containsKey(job)) {
if (jobMap[job]!!.cancel(true)) {
threadPoolExecutor.purge()
}
jobMap.remove(job)
}
}
}
private fun handleUncaughtException(ft: Future<*>, t: Throwable) {
synchronized(jobMapLock) {
jobMap.entries.find { it.value == ft }?.key?.reportError(t)
}
}
fun onFinished(job: BackgroundJob<*, *>) {
synchronized(jobMapLock) {
jobMap.remove(job)
}
}
fun runOnUiThread(runnable: Runnable) {
handler.post(runnable)
}
companion object {
private val LOG_TAG: String = BackgroundJobHandler::class.java.simpleName
@JvmStatic
fun newFixedThreadPoolBackgroundJobHandler(numThreads: Int): BackgroundJobHandler {
return BackgroundJobHandler(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue())
}
}
}