diff --git a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt index e61788c..724eba7 100644 --- a/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt +++ b/src/main/kotlin/io/dico/parcels2/WorldGenerator.kt @@ -1,6 +1,6 @@ package io.dico.parcels2 -import io.dico.parcels2.blockvisitor.JobData +import io.dico.parcels2.blockvisitor.Worker import io.dico.parcels2.blockvisitor.RegionTraversal import io.dico.parcels2.util.* import org.bukkit.* @@ -50,7 +50,7 @@ abstract class ParcelGenerator : ChunkGenerator(), ParcelProvider { abstract fun getBlocks(parcel: Parcel, yRange: IntRange = 0..255): Iterator - abstract fun clearParcel(parcel: Parcel): JobData + abstract fun clearParcel(parcel: Parcel): Worker } diff --git a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt index 08521fc..0eca6c9 100644 --- a/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt +++ b/src/main/kotlin/io/dico/parcels2/blockvisitor/WorktimeLimiter.kt @@ -1,26 +1,75 @@ package io.dico.parcels2.blockvisitor -import io.dico.parcels2.Options -import kotlinx.coroutines.experimental.CoroutineStart -import kotlinx.coroutines.experimental.Job -import kotlinx.coroutines.experimental.asCoroutineDispatcher -import kotlinx.coroutines.experimental.launch +import kotlinx.coroutines.experimental.* import org.bukkit.plugin.Plugin import org.bukkit.scheduler.BukkitTask +import java.lang.System.currentTimeMillis import java.util.* import java.util.concurrent.Executor import kotlin.coroutines.experimental.Continuation -import kotlin.coroutines.experimental.ContinuationInterceptor import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn -interface WorktimeLimiter { +typealias TimeLimitedTask = suspend WorkerScope.() -> Unit +typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit + +data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) + +sealed class WorktimeLimiter { /** - * Submit a task that should be run synchronously, but limited such that it does not stall the server + * Submit a [task] that should be run synchronously, but limited such that it does not stall the server * a bunch */ - fun submit(job: TimeLimitedTask): JobData + abstract fun submit(task: TimeLimitedTask): Worker + /** + * Get a list of all workers + */ + abstract val workers: List +} + +interface Timed { + /** + * The time that elapsed since this worker was dispatched, in milliseconds + */ + val elapsedTime: Long +} + +interface Worker : Timed { + /** + * The coroutine associated with this worker, if any + */ + val job: Job? + + /** + * true if this worker has completed + */ + val isComplete: Boolean + + /** + * A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0 + * with no guarantees to its accuracy. May be null. + */ + val progress: Double? + + /** + * Calls the given [block] whenever the progress of this worker is updated, + * if [minInterval] milliseconds expired since the last call. + * The first call occurs after at least [minDelay] milliseconds in a likewise manner. + * Repeated invocations of this method result in an [IllegalStateException] + * + * if [asCompletionListener] is true, [onCompleted] is called with the same [block] + */ + fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: WorkerUpdateLister): Worker + + /** + * Calls the given [block] when this worker completes, with the progress value 1.0. + * Repeated invocations of this method result in an [IllegalStateException] + */ + fun onCompleted(block: WorkerUpdateLister): Worker +} + +interface WorkerScope : Timed { /** * A task should call this frequently during its execution, such that the timer can suspend it when necessary. */ @@ -32,198 +81,155 @@ interface WorktimeLimiter { fun setProgress(progress: Double) } -typealias TimeLimitedTask = suspend WorktimeLimiter.() -> Unit - -interface JobData { +private interface WorkerContinuation : Worker, WorkerScope { /** - * The coroutine associated with this task, if any + * Start or resume the execution of this worker + * returns true if the worker completed */ - val job: Job? - - /** - * The time that elapsed since this task was dispatched, in milliseconds - */ - val elapsedTime: Long - - /** - * true if this task has completed - */ - val isComplete: Boolean - - /** - * A value indicating the progress of this task, in the range 0.0 <= progress <= 1.0 - * with no guarantees to its accuracy. May be null. - */ - val progress: Double? - - /** - * Calls the given [block] whenever the progress is updated, - * if [minInterval] milliseconds expired since the last call. - * The first call occurs after at least [minDelay] milliseconds in a likewise manner. - * Repeated invocations of this method result in an [IllegalStateException] - * - * if [asCompletionListener] is true, [onCompleted] is called with the same [block] - */ - fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateListener): JobData - - /** - * Calls the given [block] when this job completes, with the progress value 1.0. - * Repeated invocations of this method result in an [IllegalStateException] - */ - fun onCompleted(block: JobUpdateListener): JobData + fun resume(worktime: Long): Boolean } -typealias JobUpdateListener = JobData.(Double, Long) -> Unit +/** + * An object that controls one or more jobs, ensuring that they don't stall the server too much. + * There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick + * This object attempts to split that maximum amount of milliseconds equally between all jobs + */ +class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() { + // Coroutine dispatcher for jobs + private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher() + // The currently registered bukkit scheduler task + private var bukkitTask: BukkitTask? = null + // The workers. + private var _workers = LinkedList() + override val workers: List = _workers -class JobDataImpl(val task: TimeLimitedTask) : JobData { + override fun submit(task: TimeLimitedTask): Worker { + val worker: WorkerContinuation = WorkerImpl(dispatcher, task) + _workers.addFirst(worker) + if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong()) + return worker + } - override var job: Job? = null - set(value) { - field?.let { throw IllegalStateException() } - field = value!! - startTimeOrElapsedTime = System.currentTimeMillis() - value.invokeOnCompletion { - startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime - onCompletedBlock?.invoke(this, 1.0, elapsedTime) + private fun tickJobs() { + val workers = _workers + if (workers.isEmpty()) return + val tickStartTime = System.currentTimeMillis() + + val iterator = workers.listIterator(index = 0) + while (iterator.hasNext()) { + val time = System.currentTimeMillis() + val timeElapsed = time - tickStartTime + val timeLeft = options.workTime - timeElapsed + if (timeLeft <= 0) return + + val count = iterator.nextIndex() + val timePerJob = (timeLeft + count - 1) / count + val worker = iterator.next() + val completed = worker.resume(timePerJob) + if (completed) { + iterator.remove() } } - // when running: startTime, else: total elapsed time - private var startTimeOrElapsedTime: Long = 0L - override val elapsedTime get() = job?.let { if (it.isCompleted) startTimeOrElapsedTime else System.currentTimeMillis() - startTimeOrElapsedTime } ?: 0L - - var next: Continuation? = null - - override var progress: Double? = null - set(value) { - field = value - doProgressUpdate() - } - - private fun doProgressUpdate() { - val progressUpdate = progressUpdateBlock ?: return - val time = System.currentTimeMillis() - if (time > lastUpdateTime + progressUpdateInterval) { - progressUpdate(progress!!, elapsedTime) - lastUpdateTime = time + if (workers.isEmpty()) { + bukkitTask?.cancel() + bukkitTask = null } } - private var progressUpdateBlock: JobUpdateListener? = null +} + +private class WorkerImpl(val dispatcher: CoroutineDispatcher, + val task: TimeLimitedTask) : WorkerContinuation { + override var job: Job? = null; private set + + override val elapsedTime + get() = job?.let { + if (it.isCompleted) startTimeOrElapsedTime + else currentTimeMillis() - startTimeOrElapsedTime + } ?: 0L + + override val isComplete get() = job?.isCompleted == true + + override var progress: Double? = null; private set + + private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise + private var onProgressUpdate: WorkerUpdateLister? = null private var progressUpdateInterval: Int = 0 private var lastUpdateTime: Long = 0L - override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateListener): JobDataImpl { - progressUpdateBlock?.let { throw IllegalStateException() } - progressUpdateBlock = block + private var onCompleted: WorkerUpdateLister? = null + private var continuation: Continuation? = null + private var nextSuspensionTime: Long = 0L + + private fun initJob(job: Job) { + this.job?.let { throw IllegalStateException() } + this.job = job + startTimeOrElapsedTime = System.currentTimeMillis() + job.invokeOnCompletion { + // convert to elapsed time here + startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime + onCompleted?.let { it(1.0, elapsedTime) } + } + } + + override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker { + onProgressUpdate?.let { throw IllegalStateException() } + onProgressUpdate = block progressUpdateInterval = minInterval lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval if (asCompletionListener) onCompleted(block) return this } - override val isComplete get() = job?.isCompleted == true - private var onCompletedBlock: JobUpdateListener? = null - override fun onCompleted(block: JobUpdateListener): JobDataImpl { - onCompletedBlock?.let { throw IllegalStateException() } - onCompletedBlock = block + override fun onCompleted(block: WorkerUpdateLister): Worker { + onCompleted?.let { throw IllegalStateException() } + onCompleted = block return this } -} - -/** - * An object that controls one or more jobs, ensuring that they don't stall the server too much. - * The amount of milliseconds that can accumulate each server tick is configurable - */ -class TickWorktimeLimiter(private val plugin: Plugin, private val optionsRoot: Options) : WorktimeLimiter { - // Coroutine dispatcher for jobs - private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher() - // union of Continuation and suspend WorktimeLimited.() -> Unit - private var jobs = LinkedList() - // The currently registered bukkit scheduler task - private var task: BukkitTask? = null - // The data associated with the task that is currently being executed - private var curJobData: JobDataImpl? = null - // Used to keep track of when the current task should end - private var curJobEndTime = 0L - // Tick work time options - private inline val options get() = optionsRoot.tickWorktime - - override fun submit(job: TimeLimitedTask): JobData { - val jobData = JobDataImpl(job) - jobs.addFirst(jobData) - if (task == null) task = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong()) - return jobData - } - override suspend fun markSuspensionPoint() { - if (System.currentTimeMillis() >= curJobEndTime) - suspendCoroutineUninterceptedOrReturn(::scheduleContinuation) + if (System.currentTimeMillis() >= nextSuspensionTime) + suspendCoroutineUninterceptedOrReturn { cont: Continuation -> + continuation = cont + COROUTINE_SUSPENDED + } } override fun setProgress(progress: Double) { - curJobData!!.progress = progress - } - - private fun tickJobs() { - if (jobs.isEmpty()) return - val tickStartTime = System.currentTimeMillis() - val jobs = this.jobs; this.jobs = LinkedList() - - var count = jobs.size - - while (!jobs.isEmpty()) { - val job = jobs.poll() - val time = System.currentTimeMillis() - val timeElapsed = time - tickStartTime - val timeLeft = options.workTime - timeElapsed - - if (timeLeft <= 0) { - this.jobs.addAll(0, jobs) - return - } - - val timePerJob = (timeLeft + count - 1) / count - tickJob(job, time + timePerJob) - count-- - } - - if (jobs.isEmpty() && this.jobs.isEmpty()) { - task?.cancel() - task = null + this.progress = progress + val onProgressUpdate = onProgressUpdate ?: return + val time = System.currentTimeMillis() + if (time > lastUpdateTime + progressUpdateInterval) { + onProgressUpdate(progress, elapsedTime) + lastUpdateTime = time } } - @Suppress("UNCHECKED_CAST") - private fun tickJob(job: JobDataImpl, endTime: Long) { - curJobData = job - curJobEndTime = endTime - try { - val next = job.next - if (next == null) startJob(job) - else next.resume(Unit) - } - finally { - curJobData = null - curJobEndTime = 0L - } - } + override fun resume(worktime: Long): Boolean { + nextSuspensionTime = currentTimeMillis() + worktime - private fun startJob(job: JobDataImpl) { - job.job = launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { job.task(this@TickWorktimeLimiter) } - } + continuation?.let { + continuation = null + it.resume(Unit) + return continuation == null + } - private fun scheduleContinuation(continuation: Continuation): Any? { - curJobData!!.next = continuation - jobs.addLast(curJobData) - return COROUTINE_SUSPENDED + job?.let { + nextSuspensionTime = 0L + throw IllegalStateException() + } + + launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { + initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!) + task() + } + + return continuation == null } } -data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) - - +/* /** * While the implementation of [kotlin.coroutines.experimental.intrinsics.intercepted] is intrinsic, it should look something like this * We don't care for intercepting the coroutine as we want it to resume immediately when we call resume(). @@ -231,4 +237,5 @@ data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) */ private fun Continuation.interceptedImpl(): Continuation { return context[ContinuationInterceptor]?.interceptContinuation(this) ?: this -} \ No newline at end of file +} + */ diff --git a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt index 2de847c..c3f8836 100644 --- a/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt +++ b/src/main/kotlin/io/dico/parcels2/command/CommandsGeneral.kt @@ -7,13 +7,11 @@ import io.dico.dicore.command.annotation.Desc import io.dico.dicore.command.annotation.RequireParameters import io.dico.parcels2.ParcelOwner import io.dico.parcels2.ParcelsPlugin -import io.dico.parcels2.blockvisitor.JobUpdateListener import io.dico.parcels2.command.NamedParcelDefaultValue.FIRST_OWNED import io.dico.parcels2.storage.getParcelBySerializedValue import io.dico.parcels2.util.hasAdminManage import io.dico.parcels2.util.hasParcelHomeOthers import io.dico.parcels2.util.uuid -import kotlinx.coroutines.experimental.Job import org.bukkit.entity.Player //@Suppress("unused") @@ -84,11 +82,8 @@ class CommandsGeneral(plugin: ParcelsPlugin) : AbstractParcelCommands(plugin) { @Cmd("clear") @ParcelRequire(owner = true) fun ParcelScope.cmdClear(player: Player, context: ExecutionContext) { - val onProgressUpdate: JobUpdateListener = { progress, elapsedTime -> - context.sendMessage("[Clearing] Progress: %.06f%%".format(progress * 100)) - } world.generator.clearParcel(parcel) - .onProgressUpdate(5, 5) { progress, elapsedTime -> + .onProgressUpdate(1000, 1000) { progress, elapsedTime -> context.sendMessage(EMessageType.INFORMATIVE, "Clear progress: %.06f%%, %.2fs elapsed" .format(progress * 100, elapsedTime / 1000.0)) }