Archived
0

Improve WorktimeLimiter api

This commit is contained in:
Dico Karssiens
2018-07-30 13:35:45 +01:00
parent 72c82371b1
commit ee287253d6
3 changed files with 176 additions and 174 deletions

View File

@@ -1,6 +1,6 @@
package io.dico.parcels2 package io.dico.parcels2
import io.dico.parcels2.blockvisitor.JobData import io.dico.parcels2.blockvisitor.Worker
import io.dico.parcels2.blockvisitor.RegionTraversal import io.dico.parcels2.blockvisitor.RegionTraversal
import io.dico.parcels2.util.* import io.dico.parcels2.util.*
import org.bukkit.* import org.bukkit.*
@@ -50,7 +50,7 @@ abstract class ParcelGenerator : ChunkGenerator(), ParcelProvider {
abstract fun getBlocks(parcel: Parcel, yRange: IntRange = 0..255): Iterator<Block> abstract fun getBlocks(parcel: Parcel, yRange: IntRange = 0..255): Iterator<Block>
abstract fun clearParcel(parcel: Parcel): JobData abstract fun clearParcel(parcel: Parcel): Worker
} }

View File

@@ -1,26 +1,75 @@
package io.dico.parcels2.blockvisitor package io.dico.parcels2.blockvisitor
import io.dico.parcels2.Options import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.CoroutineStart
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.asCoroutineDispatcher
import kotlinx.coroutines.experimental.launch
import org.bukkit.plugin.Plugin import org.bukkit.plugin.Plugin
import org.bukkit.scheduler.BukkitTask import org.bukkit.scheduler.BukkitTask
import java.lang.System.currentTimeMillis
import java.util.* import java.util.*
import java.util.concurrent.Executor import java.util.concurrent.Executor
import kotlin.coroutines.experimental.Continuation import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.ContinuationInterceptor
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn import kotlin.coroutines.experimental.intrinsics.suspendCoroutineUninterceptedOrReturn
interface WorktimeLimiter { typealias TimeLimitedTask = suspend WorkerScope.() -> Unit
typealias WorkerUpdateLister = Worker.(Double, Long) -> Unit
data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
sealed class WorktimeLimiter {
/** /**
* Submit a task that should be run synchronously, but limited such that it does not stall the server * Submit a [task] that should be run synchronously, but limited such that it does not stall the server
* a bunch * a bunch
*/ */
fun submit(job: TimeLimitedTask): JobData abstract fun submit(task: TimeLimitedTask): Worker
/**
* Get a list of all workers
*/
abstract val workers: List<Worker>
}
interface Timed {
/**
* The time that elapsed since this worker was dispatched, in milliseconds
*/
val elapsedTime: Long
}
interface Worker : Timed {
/**
* The coroutine associated with this worker, if any
*/
val job: Job?
/**
* true if this worker has completed
*/
val isComplete: Boolean
/**
* A value indicating the progress of this worker, in the range 0.0 <= progress <= 1.0
* with no guarantees to its accuracy. May be null.
*/
val progress: Double?
/**
* Calls the given [block] whenever the progress of this worker is updated,
* if [minInterval] milliseconds expired since the last call.
* The first call occurs after at least [minDelay] milliseconds in a likewise manner.
* Repeated invocations of this method result in an [IllegalStateException]
*
* if [asCompletionListener] is true, [onCompleted] is called with the same [block]
*/
fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: WorkerUpdateLister): Worker
/**
* Calls the given [block] when this worker completes, with the progress value 1.0.
* Repeated invocations of this method result in an [IllegalStateException]
*/
fun onCompleted(block: WorkerUpdateLister): Worker
}
interface WorkerScope : Timed {
/** /**
* A task should call this frequently during its execution, such that the timer can suspend it when necessary. * A task should call this frequently during its execution, such that the timer can suspend it when necessary.
*/ */
@@ -32,198 +81,155 @@ interface WorktimeLimiter {
fun setProgress(progress: Double) fun setProgress(progress: Double)
} }
typealias TimeLimitedTask = suspend WorktimeLimiter.() -> Unit private interface WorkerContinuation : Worker, WorkerScope {
interface JobData {
/** /**
* The coroutine associated with this task, if any * Start or resume the execution of this worker
* returns true if the worker completed
*/ */
val job: Job? fun resume(worktime: Long): Boolean
/**
* The time that elapsed since this task was dispatched, in milliseconds
*/
val elapsedTime: Long
/**
* true if this task has completed
*/
val isComplete: Boolean
/**
* A value indicating the progress of this task, in the range 0.0 <= progress <= 1.0
* with no guarantees to its accuracy. May be null.
*/
val progress: Double?
/**
* Calls the given [block] whenever the progress is updated,
* if [minInterval] milliseconds expired since the last call.
* The first call occurs after at least [minDelay] milliseconds in a likewise manner.
* Repeated invocations of this method result in an [IllegalStateException]
*
* if [asCompletionListener] is true, [onCompleted] is called with the same [block]
*/
fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean = true, block: JobUpdateListener): JobData
/**
* Calls the given [block] when this job completes, with the progress value 1.0.
* Repeated invocations of this method result in an [IllegalStateException]
*/
fun onCompleted(block: JobUpdateListener): JobData
} }
typealias JobUpdateListener = JobData.(Double, Long) -> Unit /**
* An object that controls one or more jobs, ensuring that they don't stall the server too much.
* There is a configurable maxiumum amount of milliseconds that can be allocated to all workers together in each server tick
* This object attempts to split that maximum amount of milliseconds equally between all jobs
*/
class TickWorktimeLimiter(private val plugin: Plugin, var options: TickWorktimeOptions) : WorktimeLimiter() {
// Coroutine dispatcher for jobs
private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher()
// The currently registered bukkit scheduler task
private var bukkitTask: BukkitTask? = null
// The workers.
private var _workers = LinkedList<WorkerContinuation>()
override val workers: List<Worker> = _workers
class JobDataImpl(val task: TimeLimitedTask) : JobData { override fun submit(task: TimeLimitedTask): Worker {
val worker: WorkerContinuation = WorkerImpl(dispatcher, task)
_workers.addFirst(worker)
if (bukkitTask == null) bukkitTask = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong())
return worker
}
override var job: Job? = null private fun tickJobs() {
set(value) { val workers = _workers
field?.let { throw IllegalStateException() } if (workers.isEmpty()) return
field = value!! val tickStartTime = System.currentTimeMillis()
startTimeOrElapsedTime = System.currentTimeMillis()
value.invokeOnCompletion { val iterator = workers.listIterator(index = 0)
startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime while (iterator.hasNext()) {
onCompletedBlock?.invoke(this, 1.0, elapsedTime) val time = System.currentTimeMillis()
val timeElapsed = time - tickStartTime
val timeLeft = options.workTime - timeElapsed
if (timeLeft <= 0) return
val count = iterator.nextIndex()
val timePerJob = (timeLeft + count - 1) / count
val worker = iterator.next()
val completed = worker.resume(timePerJob)
if (completed) {
iterator.remove()
} }
} }
// when running: startTime, else: total elapsed time if (workers.isEmpty()) {
private var startTimeOrElapsedTime: Long = 0L bukkitTask?.cancel()
override val elapsedTime get() = job?.let { if (it.isCompleted) startTimeOrElapsedTime else System.currentTimeMillis() - startTimeOrElapsedTime } ?: 0L bukkitTask = null
var next: Continuation<Unit>? = null
override var progress: Double? = null
set(value) {
field = value
doProgressUpdate()
}
private fun doProgressUpdate() {
val progressUpdate = progressUpdateBlock ?: return
val time = System.currentTimeMillis()
if (time > lastUpdateTime + progressUpdateInterval) {
progressUpdate(progress!!, elapsedTime)
lastUpdateTime = time
} }
} }
private var progressUpdateBlock: JobUpdateListener? = null }
private class WorkerImpl(val dispatcher: CoroutineDispatcher,
val task: TimeLimitedTask) : WorkerContinuation {
override var job: Job? = null; private set
override val elapsedTime
get() = job?.let {
if (it.isCompleted) startTimeOrElapsedTime
else currentTimeMillis() - startTimeOrElapsedTime
} ?: 0L
override val isComplete get() = job?.isCompleted == true
override var progress: Double? = null; private set
private var startTimeOrElapsedTime: Long = 0L // startTime before completed, elapsed time otherwise
private var onProgressUpdate: WorkerUpdateLister? = null
private var progressUpdateInterval: Int = 0 private var progressUpdateInterval: Int = 0
private var lastUpdateTime: Long = 0L private var lastUpdateTime: Long = 0L
override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: JobUpdateListener): JobDataImpl { private var onCompleted: WorkerUpdateLister? = null
progressUpdateBlock?.let { throw IllegalStateException() } private var continuation: Continuation<Unit>? = null
progressUpdateBlock = block private var nextSuspensionTime: Long = 0L
private fun initJob(job: Job) {
this.job?.let { throw IllegalStateException() }
this.job = job
startTimeOrElapsedTime = System.currentTimeMillis()
job.invokeOnCompletion {
// convert to elapsed time here
startTimeOrElapsedTime = System.currentTimeMillis() - startTimeOrElapsedTime
onCompleted?.let { it(1.0, elapsedTime) }
}
}
override fun onProgressUpdate(minDelay: Int, minInterval: Int, asCompletionListener: Boolean, block: WorkerUpdateLister): Worker {
onProgressUpdate?.let { throw IllegalStateException() }
onProgressUpdate = block
progressUpdateInterval = minInterval progressUpdateInterval = minInterval
lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval lastUpdateTime = System.currentTimeMillis() + minDelay - minInterval
if (asCompletionListener) onCompleted(block) if (asCompletionListener) onCompleted(block)
return this return this
} }
override val isComplete get() = job?.isCompleted == true override fun onCompleted(block: WorkerUpdateLister): Worker {
private var onCompletedBlock: JobUpdateListener? = null onCompleted?.let { throw IllegalStateException() }
override fun onCompleted(block: JobUpdateListener): JobDataImpl { onCompleted = block
onCompletedBlock?.let { throw IllegalStateException() }
onCompletedBlock = block
return this return this
} }
}
/**
* An object that controls one or more jobs, ensuring that they don't stall the server too much.
* The amount of milliseconds that can accumulate each server tick is configurable
*/
class TickWorktimeLimiter(private val plugin: Plugin, private val optionsRoot: Options) : WorktimeLimiter {
// Coroutine dispatcher for jobs
private val dispatcher = Executor(Runnable::run).asCoroutineDispatcher()
// union of Continuation<Unit> and suspend WorktimeLimited.() -> Unit
private var jobs = LinkedList<JobDataImpl>()
// The currently registered bukkit scheduler task
private var task: BukkitTask? = null
// The data associated with the task that is currently being executed
private var curJobData: JobDataImpl? = null
// Used to keep track of when the current task should end
private var curJobEndTime = 0L
// Tick work time options
private inline val options get() = optionsRoot.tickWorktime
override fun submit(job: TimeLimitedTask): JobData {
val jobData = JobDataImpl(job)
jobs.addFirst(jobData)
if (task == null) task = plugin.server.scheduler.runTaskTimer(plugin, ::tickJobs, 0, options.tickInterval.toLong())
return jobData
}
override suspend fun markSuspensionPoint() { override suspend fun markSuspensionPoint() {
if (System.currentTimeMillis() >= curJobEndTime) if (System.currentTimeMillis() >= nextSuspensionTime)
suspendCoroutineUninterceptedOrReturn(::scheduleContinuation) suspendCoroutineUninterceptedOrReturn { cont: Continuation<Unit> ->
continuation = cont
COROUTINE_SUSPENDED
}
} }
override fun setProgress(progress: Double) { override fun setProgress(progress: Double) {
curJobData!!.progress = progress this.progress = progress
} val onProgressUpdate = onProgressUpdate ?: return
val time = System.currentTimeMillis()
private fun tickJobs() { if (time > lastUpdateTime + progressUpdateInterval) {
if (jobs.isEmpty()) return onProgressUpdate(progress, elapsedTime)
val tickStartTime = System.currentTimeMillis() lastUpdateTime = time
val jobs = this.jobs; this.jobs = LinkedList()
var count = jobs.size
while (!jobs.isEmpty()) {
val job = jobs.poll()
val time = System.currentTimeMillis()
val timeElapsed = time - tickStartTime
val timeLeft = options.workTime - timeElapsed
if (timeLeft <= 0) {
this.jobs.addAll(0, jobs)
return
}
val timePerJob = (timeLeft + count - 1) / count
tickJob(job, time + timePerJob)
count--
}
if (jobs.isEmpty() && this.jobs.isEmpty()) {
task?.cancel()
task = null
} }
} }
@Suppress("UNCHECKED_CAST") override fun resume(worktime: Long): Boolean {
private fun tickJob(job: JobDataImpl, endTime: Long) { nextSuspensionTime = currentTimeMillis() + worktime
curJobData = job
curJobEndTime = endTime
try {
val next = job.next
if (next == null) startJob(job)
else next.resume(Unit)
}
finally {
curJobData = null
curJobEndTime = 0L
}
}
private fun startJob(job: JobDataImpl) { continuation?.let {
job.job = launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) { job.task(this@TickWorktimeLimiter) } continuation = null
} it.resume(Unit)
return continuation == null
}
private fun scheduleContinuation(continuation: Continuation<Unit>): Any? { job?.let {
curJobData!!.next = continuation nextSuspensionTime = 0L
jobs.addLast(curJobData) throw IllegalStateException()
return COROUTINE_SUSPENDED }
launch(context = dispatcher, start = CoroutineStart.UNDISPATCHED) {
initJob(job = kotlin.coroutines.experimental.coroutineContext[Job]!!)
task()
}
return continuation == null
} }
} }
data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int) /*
/** /**
* While the implementation of [kotlin.coroutines.experimental.intrinsics.intercepted] is intrinsic, it should look something like this * While the implementation of [kotlin.coroutines.experimental.intrinsics.intercepted] is intrinsic, it should look something like this
* We don't care for intercepting the coroutine as we want it to resume immediately when we call resume(). * We don't care for intercepting the coroutine as we want it to resume immediately when we call resume().
@@ -231,4 +237,5 @@ data class TickWorktimeOptions(var workTime: Int, var tickInterval: Int)
*/ */
private fun <T> Continuation<T>.interceptedImpl(): Continuation<T> { private fun <T> Continuation<T>.interceptedImpl(): Continuation<T> {
return context[ContinuationInterceptor]?.interceptContinuation(this) ?: this return context[ContinuationInterceptor]?.interceptContinuation(this) ?: this
} }
*/

View File

@@ -7,13 +7,11 @@ import io.dico.dicore.command.annotation.Desc
import io.dico.dicore.command.annotation.RequireParameters import io.dico.dicore.command.annotation.RequireParameters
import io.dico.parcels2.ParcelOwner import io.dico.parcels2.ParcelOwner
import io.dico.parcels2.ParcelsPlugin import io.dico.parcels2.ParcelsPlugin
import io.dico.parcels2.blockvisitor.JobUpdateListener
import io.dico.parcels2.command.NamedParcelDefaultValue.FIRST_OWNED import io.dico.parcels2.command.NamedParcelDefaultValue.FIRST_OWNED
import io.dico.parcels2.storage.getParcelBySerializedValue import io.dico.parcels2.storage.getParcelBySerializedValue
import io.dico.parcels2.util.hasAdminManage import io.dico.parcels2.util.hasAdminManage
import io.dico.parcels2.util.hasParcelHomeOthers import io.dico.parcels2.util.hasParcelHomeOthers
import io.dico.parcels2.util.uuid import io.dico.parcels2.util.uuid
import kotlinx.coroutines.experimental.Job
import org.bukkit.entity.Player import org.bukkit.entity.Player
//@Suppress("unused") //@Suppress("unused")
@@ -84,11 +82,8 @@ class CommandsGeneral(plugin: ParcelsPlugin) : AbstractParcelCommands(plugin) {
@Cmd("clear") @Cmd("clear")
@ParcelRequire(owner = true) @ParcelRequire(owner = true)
fun ParcelScope.cmdClear(player: Player, context: ExecutionContext) { fun ParcelScope.cmdClear(player: Player, context: ExecutionContext) {
val onProgressUpdate: JobUpdateListener = { progress, elapsedTime ->
context.sendMessage("[Clearing] Progress: %.06f%%".format(progress * 100))
}
world.generator.clearParcel(parcel) world.generator.clearParcel(parcel)
.onProgressUpdate(5, 5) { progress, elapsedTime -> .onProgressUpdate(1000, 1000) { progress, elapsedTime ->
context.sendMessage(EMessageType.INFORMATIVE, "Clear progress: %.06f%%, %.2fs elapsed" context.sendMessage(EMessageType.INFORMATIVE, "Clear progress: %.06f%%, %.2fs elapsed"
.format(progress * 100, elapsedTime / 1000.0)) .format(progress * 100, elapsedTime / 1000.0))
} }