renamed ThreadID; changed to new ThreadPool interface (#87110#)

This commit is contained in:
Kay Ramme
2001-05-17 11:55:06 +00:00
parent 18d2636e5f
commit ee19c132d5
7 changed files with 1095 additions and 1144 deletions

View File

@@ -2,9 +2,9 @@
*
* $RCSfile: JavaThreadPool.java,v $
*
* $Revision: 1.7 $
* $Revision: 1.8 $
*
* last change: $Author: kr $ $Date: 2001-05-04 11:56:03 $
* last change: $Author: kr $ $Date: 2001-05-17 12:55:05 $
*
* The Contents of this file are made available subject to the terms of
* either of the following licenses
@@ -62,7 +62,6 @@
package com.sun.star.lib.uno.environments.remote;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.Hashtable;
@@ -72,7 +71,7 @@ import com.sun.star.uno.UnoRuntime;
/**
* This class implements a java thread pool.
* <p>
* @version $Revision: 1.7 $ $ $Date: 2001-05-04 11:56:03 $
* @version $Revision: 1.8 $ $ $Date: 2001-05-17 12:55:05 $
* @author Kay Ramme
* @see com.sun.star.uno.UnoRuntime
* @see com.sun.star.lib.uno.environments.remote.ThreadPool
@@ -85,299 +84,84 @@ public class JavaThreadPool implements IThreadPool {
/**
* When set to true, enables various debugging output.
*/
public static final boolean DEBUG = false;
private static final boolean DEBUG = false;
JavaThreadPoolFactory _javaThreadPoolFactory;
protected Hashtable _jobQueues = new Hashtable();
protected Hashtable _disposeIds = new Hashtable();
protected boolean _disposed = false;
// public JavaThreadPool() {
// new Thread() {
// public void run() {
// try {
// while(true) {
// list();
// Thread.sleep(5000);
// }
// }
// catch(InterruptedException interruptedException) {
// System.err.println("lister interrupted:" + interruptedException);
// }
// }
// }.start();
// }
/**
* For debugging, lists the jobqueues
*/
synchronized void list() {
Enumeration elements = _jobQueues.elements();
System.err.println("##### ThreadPool.list:");
while(elements.hasMoreElements()) {
System.err.println(" - " + elements.nextElement());
}
JavaThreadPool(JavaThreadPoolFactory javaThreadPoolFactory) {
_javaThreadPoolFactory = javaThreadPoolFactory;
}
/**
* Gets the <code>ThreadID</code> of the given thread.
* <p>
* @return the thread id
* @param thread the thread
* @see com.sun.star.lib.uno.environments.remote.ThreadID
*/
static public ThreadID getThreadId(Thread thread) {
ThreadID threadId = null;
public void attach() {
ThreadId threadId = _javaThreadPoolFactory.getThreadId();
if(thread instanceof JobQueue.JobDispatcher)
threadId = ((JobQueue.JobDispatcher)thread).getThreadId();
else {
try {
threadId = new ThreadID(UnoRuntime.generateOid(thread).getBytes("UTF8"));
}
catch(UnsupportedEncodingException unsupportedEncodingException) {
throw new com.sun.star.uno.RuntimeException("JavaThreadPool.getThreadId - unexpected: " + unsupportedEncodingException.toString());
}
}
if(DEBUG) System.err.println("##### " + getClass().getName() + ".attach - id:" + threadId);
if(DEBUG) System.err.println("##### ThreadPool.getThreadId:" + threadId);
// we don't have to synchronize here
// cause the thread can attach itself
// not concurrently
JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
if(jobQueue == null)
jobQueue = new JobQueue(_javaThreadPoolFactory, threadId, false);
return threadId;
}
/**
* Gets the <code>ThreadID</code> of this thread.
* Implements the method of <code>IThreadPool</code>
* <p>
* @return the thread id
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#getThreadId
*/
public ThreadID getThreadId() {
if(_disposed) throw new RuntimeException("ThreadPool.getThreadId - is disposed");
return getThreadId(Thread.currentThread());
}
public void removeJobQueue(ThreadID threadId) {
_jobQueues.remove(threadId);
_disposeIds.remove(threadId);
}
public void addJobQueue(ThreadID threadId, JobQueue jobQueue/*, Object disposeId*/) {
if(_disposed) throw new RuntimeException("ThreadPool.addThread(" + threadId + ") - is disposed");
if(DEBUG) System.err.println("##### ThreadPool.addThread:" + threadId);
_jobQueues.put(threadId, jobQueue);
// _disposeIds.put(threadId, disposeId);
}
/**
* Adds a <code>JobQueue</code> for the given thread under the given <code>ThreadID</code>
* with the given disposeId.
* <p>
* @param createWorkerThread create a JobQueue with or without worker thread
* @param threadId the thread id to use
* @param disposeId the dispose id
*/
public JobQueue addThread(boolean createWorkerThread, ThreadID threadId, Object disposeId, JobQueue syncQueue) {
if(_disposed) throw new RuntimeException("ThreadPool.addThread(" + threadId + ") - is disposed");
if(DEBUG) System.err.println("##### ThreadPool.addThread:" + threadId);
JobQueue jobQueue = null;
synchronized(this) {
jobQueue = (JobQueue)_jobQueues.get(threadId);
if(jobQueue == null) {
if(syncQueue != null)
jobQueue = new JobQueue(this, threadId, syncQueue);
else
jobQueue = new JobQueue(this, threadId, createWorkerThread);
if(disposeId != null)
_disposeIds.put(threadId, disposeId);
}
}
// acquiring the jobQueue registers it at the ThreadPoolFactory
jobQueue.acquire();
return jobQueue;
}
/**
* Adds a jobQueue for the current thread to the threadpool.
* Requests are now put into this queue.
* Implements the method of <code>IThreadPool</code>
* <p>
* @param disposeId the dipose id with which the thread can be interrupted while staying in the queue
* @see #enter
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#addThread
*/
public void addThread(Object disposeId) {
if(_disposed) throw new RuntimeException("ThreadPool.addThread - is disposed");
public void detach() {
ThreadId threadId = _javaThreadPoolFactory.getThreadId();
addThread(false, getThreadId(Thread.currentThread()), disposeId, null);
}
/**
* Gives the <code>JobQueue</code> for the given threadId.
* <p>
* @return the job queue
* @param threadId the thread id
* @see com.sun.star.lib.uno.environments.remote.ThreadID
* @see com.sun.star.lib.uno.environments.remote.JobQueue
*/
public JobQueue getJobQueue(ThreadID threadId) {
return (JobQueue)_jobQueues.get(threadId);
}
/**
* Removes the <code>JobQueue</code> for the given threadId.
* <p>
* @param threadId the thread id
* @see com.sun.star.lib.uno.environments.remote.ThreadID
* @see com.sun.star.lib.uno.environments.remote.JobQueue
*/
public void removeThread(ThreadID threadId) {
if(_disposed) throw new RuntimeException("ThreadPool.removeThread - is disposed");
if(DEBUG) System.err.println("##### ThreadPool.removeThread:" + threadId);
JobQueue jobQueue = (JobQueue)_jobQueues.get(threadId);
if(jobQueue != null)
jobQueue.release();
}
/**
* Removes the jobQueue for the current thread.
* Implements the method of <code>IThreadPool</code>
* <p>
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#removeThread
*/
public void removeThread() {
if(_disposed) throw new RuntimeException("ThreadPool.removeQueue - is disposed");
removeThread(getThreadId());
}
/**
* Queues a job into the jobQueue of the thread belonging to the jobs threadId.
* Implements the method of <code>IThreadPool</code>
* <p>
* @param job the job
* @param disposeId the dispose id
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#putJob
*/
private Object _syncPutJob = new Object();
public void putJob(Job job, Object disposeId) {
if(_disposed) throw new RuntimeException("ThreadPool.putJob - is disposed");
JobQueue jobQueue = null;
ThreadID threadId = job.getThreadId();
if(DEBUG) System.err.println("#### ThreadPool.putJob:" + threadId + " " + job + " " + _jobQueues);
synchronized(_syncPutJob) {
jobQueue = (JobQueue)_jobQueues.get(threadId);
if(jobQueue == null) {
if(job.getOperation() == null) // a reply? and no thread for it?
throw new RuntimeException(getClass().getName() + ".putJob - no thread for reply " + threadId);
jobQueue = new JobQueue(this, threadId, true);
}
jobQueue.putJob(job, disposeId);
}
}
/**
* Enters the <code>ThreadPool</code> under the given thread id.
* Waits for a reply job or an exception.
* <p>
* @result the result of final reply
* @param threadId the thread id to use
*/
public Object enter(int waitTime, ThreadID threadId) throws Throwable {
if(_disposed) throw new RuntimeException("ThreadPool.enter - is disposed");
JobQueue jobQueue = (JobQueue)_jobQueues.get(threadId);
Object object = null;
try {
object = jobQueue.enter(waitTime, _disposeIds.get(threadId));
}
finally {
removeThread(threadId);
}
return object;
}
/**
* Lets the current thread enter the ThreadPool.
* The thread then dispatches all jobs and leaves
* the ThreadPool when it gets a reply job.
* Implements the method of <code>IThreadPool</code>
* <p>
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#enter
*/
public Object enter(int waitTime) throws Throwable {
return enter(waitTime, getThreadId());
JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
// releasing the jobQueue deregisters it from the ThreadPoolFactory
jobQueue.release();
}
public Object enter() throws Throwable {
return enter(0);
ThreadId threadId = _javaThreadPoolFactory.getThreadId();
JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(threadId);
return jobQueue.enter(this);
}
/**
* Interrupts all threads which have associated the dispose id.
* Implements the method of <code>IThreadPool</code>
* <p>
* @param disposeId the dispose id
* @see com.sun.star.lib.uno.environments.remote.IThreadPool#dispose
*/
public void dispose(Object disposeId) {
if(DEBUG) System.err.println("##### " + getClass().getName() + ".dispose:" + disposeId);
// clear all jobqueues
/*synchronized(_jobQueues)*/ {
Enumeration elements = _jobQueues.elements();
while(elements.hasMoreElements()) {
JobQueue jobQueue = (JobQueue)elements.nextElement();
jobQueue.interrupt(disposeId);
public void putJob(Job job) {
if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob:" + job.isSynchron() + " " + job.getThreadId());
if(job.isSynchron() || job.getOperation() == null) { // note: replys must be synchron
JobQueue jobQueue = _javaThreadPoolFactory.getJobQueue(job.getThreadId());
// this has not be synchronized, cause
// sync jobs can only come over one bridge
// (cause the thread blocks on other side)
if(jobQueue == null)
jobQueue = new JobQueue(_javaThreadPoolFactory, job.getThreadId(), true);
// put job acquires the queue and registers it at the ThreadPoolFactory
jobQueue.putJob(job, this);
}
else {
// this has to be synchronized, cause
// async jobs of the same thread can come
// over different bridges
synchronized(_javaThreadPoolFactory) {
JobQueue async_jobQueue = _javaThreadPoolFactory.getAsyncJobQueue(job.getThreadId());
// ensure there is jobQueue
if(async_jobQueue == null) // so, there is really no async queue
async_jobQueue = new JobQueue(_javaThreadPoolFactory, job.getThreadId());
// put job acquires the queue and registers it at the ThreadPoolFactory
async_jobQueue.putJob(job, this);
}
}
}
/**
* Stops interrupting all jobs queued by the given bridge.
* Implements the method of <IThreadPool>.
*/
public synchronized void stopDispose(Object disposeId) {
public void dispose(Throwable throwable) {
if(DEBUG) System.err.println("##### " + getClass().getName() + ".dispose:" + throwable);
_javaThreadPoolFactory.dispose(this, throwable);
}
synchronized void dispose() {
if(_disposed) throw new RuntimeException("ThreadPool.dispose - is disposed");
_disposed = true;
if(_jobQueues.size() > 0)
System.err.println("Warning! ThreadPool.dipose - there are active JobQueus:" + _jobQueues.size());
// clear all jobqueues
Enumeration elements = _jobQueues.elements();
while(elements.hasMoreElements())
((JobQueue)elements.nextElement()).clear();
_jobQueues.clear();
_jobQueues.notifyAll();
_jobQueues = null;
public void destroy() {
}
}