Skip to content

Commit 6c9973e

Browse files
committed
TaskFlow
1 parent 990d682 commit 6c9973e

File tree

10 files changed

+341
-69
lines changed

10 files changed

+341
-69
lines changed

common/src/main/kotlin/com/lambda/Lambda.kt

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,47 @@
11
package com.lambda
22

3-
import com.lambda.event.EventFlow
3+
import com.lambda.event.events.KeyPressEvent
44
import com.lambda.event.listener.SafeListener.Companion.listener
5-
import com.lambda.event.events.PacketEvent
6-
import com.lambda.event.listener.Listener.Companion.unsubscribe
7-
import com.lambda.event.listener.SafeListener.Companion.concurrentListener
8-
import com.lambda.event.listener.UnsafeListener.Companion.unsafeConcurrentListener
9-
import com.lambda.event.listener.UnsafeListener.Companion.unsafeListener
5+
import com.lambda.task.tasks.HelloWorldTask
6+
import com.lambda.threading.taskContext
107
import net.minecraft.client.MinecraftClient
11-
import net.minecraft.network.packet.c2s.play.PlayerMoveC2SPacket.LookAndOnGround
128
import org.apache.logging.log4j.LogManager
139
import org.apache.logging.log4j.Logger
14-
import java.lang.Thread.sleep
10+
import org.lwjgl.glfw.GLFW
1511

1612
object Lambda {
1713
private const val MOD_NAME = "Lambda"
1814
const val MOD_ID = "lambda"
19-
const val SYMBOL = "λ"
15+
private const val SYMBOL = "λ"
2016
private val VERSION: String = LoaderInfo.getVersion()
2117

22-
val LOG: Logger = LogManager.getLogger()
18+
val LOG: Logger = LogManager.getLogger(SYMBOL)
2319
val mc: MinecraftClient = MinecraftClient.getInstance()
2420

2521
init {
26-
listener<PacketEvent.Send.Pre> {
27-
if (it.packet is LookAndOnGround) {
28-
it.cancel()
29-
LOG.info("SAFE: Canceled: ${it.packet::class.simpleName}")
22+
listener<KeyPressEvent> {
23+
if (it.key != GLFW.GLFW_KEY_Z) {
24+
return@listener
3025
}
31-
}
3226

33-
runConcurrent {
34-
sleep(60000)
35-
LOG.info("Unsubscribing")
36-
unsubscribe<PacketEvent.Send.Pre>()
27+
taskContext {
28+
HelloWorldTask()
29+
.withDelay(500L)
30+
.withTimeout(5000L)
31+
.withRepeats(2)
32+
.onSuccess {
33+
LOG.info("Hello, World! Task completed")
34+
}.onRepeat { repeats ->
35+
LOG.info("Hello, World! Task $name repeated $repeats times")
36+
}.onTimeout {
37+
LOG.warn("Hello, World! Task $name timed out")
38+
HelloWorldTask().execute()
39+
}.onRetry {
40+
LOG.warn("Hello, World! Task $name retrying")
41+
}.onFailure { error ->
42+
LOG.error("Hello, World! Task failed", error)
43+
}.execute()
44+
}
3745
}
3846
}
3947

common/src/main/kotlin/com/lambda/event/EventFlow.kt

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package com.lambda.event
22

33
import com.lambda.event.listener.Listener
4-
import com.lambda.runConcurrent
4+
import com.lambda.event.listener.SafeListener
5+
import com.lambda.threading.runConcurrent
56
import kotlinx.coroutines.*
67
import kotlinx.coroutines.channels.BufferOverflow
78
import kotlinx.coroutines.flow.MutableSharedFlow
@@ -20,16 +21,16 @@ object EventFlow {
2021
*
2122
* The [SupervisorJob] is used so that failure or cancellation of one child does not
2223
* lead to the failure or cancellation of the parent or its other children, which is
23-
* useful when you have multiple independent jobs running in parallel.
24+
* useful when you have multiple independent [Job]s running in parallel.
2425
*/
2526
val lambdaScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
2627
private val concurrentFlow = MutableSharedFlow<Event>(
2728
extraBufferCapacity = 1000,
2829
onBufferOverflow = BufferOverflow.DROP_OLDEST
2930
)
3031

31-
val syncListeners = ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>()
32-
val concurrentListeners = ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>()
32+
val syncListeners = Subscriber()
33+
val concurrentListeners = Subscriber()
3334

3435
init {
3536
// parallel event execution on dedicated threads
@@ -45,23 +46,36 @@ object EventFlow {
4546
}
4647

4748
/**
48-
* Posts an event to the event flow.
49+
* Posts an [Event] to the event flow [concurrentFlow] and the synchronous [Listener]s.
4950
*
50-
* This function first notifies all asynchronous listeners by emitting the event to the concurrent flow.
51-
* Each asynchronous listener will execute its listener function on a new thread.
51+
* This function first notifies all asynchronous [Listener]s by emitting the event to the [concurrentFlow].
52+
* Each asynchronous [Listener] will execute its [Listener] function on a new coroutine.
5253
*
53-
* After notifying asynchronous listeners, it executes the listener functions of all synchronous listeners.
54-
* An instant callback can only be achieved by synchronous listening objects
54+
* After notifying asynchronous [Listener]s, it executes the [Listener] functions of all synchronous [Listener]s.
55+
* An instant callback ([CallbackEvent]) can only be achieved by synchronous listening objects
5556
* as the concurrent listener will be executed "later".
5657
*
57-
* @param event The event to be posted to the event flow.
58+
* @param event The [Event] to be posted to the event flow.
5859
*/
5960
@JvmStatic
6061
fun post(event: Event) {
6162
concurrentFlow.tryEmit(event)
6263
event.executeListenerSynchronous()
6364
}
6465

66+
/**
67+
* Unsubscribes from both synchronous and concurrent event flows for a specific [Event] type [T].
68+
*
69+
* This function removes the listeners associated with the specified event type from both synchronous and concurrent event flows.
70+
* After this function is called, the listeners of the specified event type will no longer be triggered when the event is dispatched.
71+
*
72+
* @param T The type of the event to unsubscribe from. This should be a subclass of Event.
73+
*/
74+
inline fun <reified T : Event> unsubscribe() {
75+
syncListeners.remove(T::class)
76+
concurrentListeners.remove(T::class)
77+
}
78+
6579
private fun Event.executeListenerSynchronous() {
6680
syncListeners[this::class]?.forEach { listener ->
6781
if (this is ICancellable && this.isCanceled()) return
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.lambda.event
2+
3+
import com.lambda.event.listener.Listener
4+
import java.util.concurrent.ConcurrentHashMap
5+
import java.util.concurrent.ConcurrentSkipListSet
6+
import kotlin.reflect.KClass
7+
8+
class Subscriber : ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>() {
9+
inline fun <reified T : Event> subscribe(listener: Listener) =
10+
getOrPut(T::class) {
11+
ConcurrentSkipListSet(Comparator.reverseOrder())
12+
}.add(listener)
13+
14+
fun unsubscribe(eventType: KClass<*>) = remove(eventType)
15+
16+
fun unsubscribe(listener: Listener) {
17+
values.forEach { listeners ->
18+
listeners.remove(listener)
19+
}
20+
}
21+
22+
infix fun subscribe(subscriber: Subscriber) {
23+
subscriber.forEach { (eventType, listeners) ->
24+
getOrPut(eventType) {
25+
ConcurrentSkipListSet(Comparator.reverseOrder())
26+
}.addAll(listeners)
27+
}
28+
}
29+
30+
infix fun unsubscribe(subscriber: Subscriber) {
31+
entries.removeAll { (eventType, listeners) ->
32+
subscriber[eventType]?.let { listeners.removeAll(it) }
33+
listeners.isEmpty()
34+
}
35+
}
36+
}

common/src/main/kotlin/com/lambda/event/listener/Listener.kt

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,15 @@ import com.lambda.event.EventFlow
55

66
abstract class Listener : Comparable<Listener> {
77
abstract val priority: Int
8+
abstract val owner: Any // ToDo: Evaluate if this is even needed
89

910
abstract fun execute(event: Event)
1011

11-
override fun compareTo(other: Listener): Int {
12-
return compareBy<Listener> { it.priority }.thenBy { it.hashCode() }.compare(this, other)
13-
}
14-
15-
companion object {
16-
/**
17-
* Unsubscribes the [SafeListener] from both synchronous and concurrent event flows for a specific [Event] type [T].
18-
*
19-
* This function removes the listeners associated with the specified event type from both synchronous and concurrent event flows.
20-
* After this function is called, the listeners of the specified event type will no longer be triggered when the event is dispatched.
21-
*
22-
* @param T The type of the event to unsubscribe from. This should be a subclass of Event.
23-
*/
24-
inline fun <reified T : Event> unsubscribe() {
25-
EventFlow.syncListeners.remove(T::class)
26-
EventFlow.concurrentListeners.remove(T::class)
27-
}
28-
}
12+
override fun compareTo(other: Listener) =
13+
compareBy<Listener> {
14+
it.priority
15+
}.thenBy {
16+
// Needed because ConcurrentSkipListSet handles insertion based on compareTo
17+
it.hashCode()
18+
}.compare(this, other)
2919
}

common/src/main/kotlin/com/lambda/event/listener/SafeListener.kt

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package com.lambda.event.listener
33
import com.lambda.context.SafeContext
44
import com.lambda.event.Event
55
import com.lambda.event.EventFlow
6-
import com.lambda.runSafe
6+
import com.lambda.task.Task
7+
import com.lambda.threading.runSafe
78
import java.util.concurrent.ConcurrentSkipListSet
89

910
class SafeListener(
1011
override val priority: Int = 0,
12+
override val owner: Any,
1113
val function: SafeContext.(Event) -> Unit
1214
) : Listener() {
1315
override fun execute(event: Event) {
@@ -50,13 +52,26 @@ class SafeListener(
5052
* @param T The type of the event to listen for. This should be a subclass of Event.
5153
* @param priority The priority of the listener. Listeners with higher priority will be executed first. Default value is 0.
5254
* @param function The function to be executed when the event is posted. This function should take a SafeContext and an event of type T as parameters.
55+
* @return The newly created and registered [SafeListener].
5356
*/
54-
inline fun <reified T : Event> listener(priority: Int = 0, noinline function: SafeContext.(T) -> Unit) {
55-
EventFlow.syncListeners.getOrPut(T::class) {
56-
ConcurrentSkipListSet(Comparator.reverseOrder())
57-
}.add(SafeListener(priority) { event ->
57+
inline fun <reified T : Event> Any.listener(priority: Int = 0, noinline function: SafeContext.(T) -> Unit): SafeListener {
58+
val listener = SafeListener(priority, this) { event ->
5859
function(event as T)
59-
})
60+
}
61+
62+
EventFlow.syncListeners.subscribe<T>(listener)
63+
64+
return listener
65+
}
66+
67+
inline fun <reified T : Event> Task<*>.listener(priority: Int = 0, noinline function: SafeContext.(T) -> Unit): SafeListener {
68+
val listener = SafeListener(priority, this) { event ->
69+
function(event as T)
70+
}
71+
72+
syncListeners.subscribe<T>(listener)
73+
74+
return listener
6075
}
6176

6277
/**
@@ -81,13 +96,16 @@ class SafeListener(
8196
* @param T The type of the event to listen for. This should be a subclass of Event.
8297
* @param priority The priority of the listener. Listeners with higher priority will be executed first. Default value is 0.
8398
* @param function The function to be executed when the event is posted. This function should take a SafeContext and an event of type T as parameters.
99+
* @return The newly created and registered [SafeListener].
84100
*/
85-
inline fun <reified T : Event> concurrentListener(priority: Int = 0, noinline function: SafeContext.(T) -> Unit) {
86-
EventFlow.concurrentListeners.getOrPut(T::class) {
87-
ConcurrentSkipListSet(Comparator.reverseOrder())
88-
}.add(SafeListener(priority) { event ->
101+
inline fun <reified T : Event> Any.concurrentListener(priority: Int = 0, noinline function: SafeContext.(T) -> Unit): SafeListener {
102+
val listener = SafeListener(priority, this) { event ->
89103
function(event as T)
90-
})
104+
}
105+
106+
EventFlow.concurrentListeners.subscribe<T>(listener)
107+
108+
return listener
91109
}
92110
}
93111
}

common/src/main/kotlin/com/lambda/event/listener/UnsafeListener.kt

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package com.lambda.event.listener
33
import com.lambda.event.Event
44
import com.lambda.event.EventFlow
55
import java.util.concurrent.ConcurrentSkipListSet
6+
import kotlin.reflect.KClass
67

78
class UnsafeListener(
89
override val priority: Int,
10+
override val owner: Any,
911
val function: (Event) -> Unit
1012
) : Listener() {
1113
override fun execute(event: Event) {
@@ -42,13 +44,16 @@ class UnsafeListener(
4244
* @param T The type of the event to listen for. This should be a subclass of Event.
4345
* @param priority The priority of the listener. Listeners with higher priority will be executed first. Default value is 0.
4446
* @param function The function to be executed when the event is posted. This function should take an event of type T as a parameter.
47+
* @return The newly created and registered [UnsafeListener].
4548
*/
46-
inline fun <reified T : Event> unsafeListener(priority: Int = 0, noinline function: (T) -> Unit) {
47-
EventFlow.syncListeners.getOrPut(T::class) {
48-
ConcurrentSkipListSet(Comparator.reverseOrder())
49-
}.add(UnsafeListener(priority) { event ->
49+
inline fun <reified T : Event> Any.unsafeListener(priority: Int = 0, noinline function: (T) -> Unit): UnsafeListener {
50+
val listener = UnsafeListener(priority, this) { event ->
5051
function(event as T)
51-
})
52+
}
53+
54+
EventFlow.syncListeners.subscribe<T>(listener)
55+
56+
return listener
5257
}
5358

5459
/**
@@ -74,13 +79,16 @@ class UnsafeListener(
7479
* @param T The type of the event to listen for. This should be a subclass of Event.
7580
* @param priority The priority of the listener. Listeners with higher priority will be executed first. Default value is 0.
7681
* @param function The function to be executed when the event is posted. This function should take a SafeContext and an event of type T as parameters.
82+
* @return The newly created and registered [UnsafeListener].
7783
*/
78-
inline fun <reified T : Event> unsafeConcurrentListener(priority: Int = 0, noinline function: (T) -> Unit) {
79-
EventFlow.concurrentListeners.getOrPut(T::class) {
80-
ConcurrentSkipListSet(Comparator.reverseOrder())
81-
}.add(UnsafeListener(priority) { event ->
84+
inline fun <reified T : Event> Any.unsafeConcurrentListener(priority: Int = 0, noinline function: (T) -> Unit): UnsafeListener {
85+
val listener = UnsafeListener(priority, this) { event ->
8286
function(event as T)
83-
})
87+
}
88+
89+
EventFlow.concurrentListeners.subscribe<T>(listener)
90+
91+
return listener
8492
}
8593
}
8694
}

0 commit comments

Comments
 (0)