Skip to content

Commit 25f6246

Browse files
committed
Proper cancellation handling and minor fixes
1 parent a44fbf6 commit 25f6246

File tree

8 files changed

+42
-13
lines changed

8 files changed

+42
-13
lines changed

common/src/main/kotlin/com/lambda/Lambda.kt

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.lambda
22

3+
import com.lambda.event.EventFlow
34
import com.lambda.event.listener.SafeListener.Companion.listener
45
import com.lambda.event.events.PacketEvent
56
import com.lambda.event.listener.Listener.Companion.unsubscribe
@@ -24,17 +25,40 @@ object Lambda {
2425
init {
2526
listener<PacketEvent.Send.Pre> {
2627
if (it.packet is LookAndOnGround) {
27-
it.cancel()
28+
// it.cancel()
2829
LOG.info("SAFE: Canceled: ${it.packet::class.simpleName}")
2930
}
3031
}
3132

33+
listener<PacketEvent.Send.Pre> {
34+
if (it.packet is LookAndOnGround) {
35+
// it.cancel()
36+
LOG.info("2 SAFE: Canceled: ${it.packet::class.simpleName}")
37+
}
38+
}
39+
40+
unsafeListener<PacketEvent.Send.Pre> {
41+
if (it.packet is LookAndOnGround) {
42+
// it.cancel()
43+
LOG.info("UNSAFE: Canceled: ${it.packet::class.simpleName}")
44+
}
45+
}
46+
3247
concurrentListener<PacketEvent.Send.Pre> {
3348
if (it.packet is LookAndOnGround) {
3449
LOG.info("CONCURRENT: ${it.packet::class.simpleName}")
3550
}
3651
}
3752

53+
unsafeConcurrentListener<PacketEvent.Send.Pre> {
54+
if (it.packet is LookAndOnGround) {
55+
LOG.info("UNSAFE CONCURRENT: ${it.packet::class.simpleName}")
56+
}
57+
}
58+
59+
LOG.info("Registered sync listeners: ${EventFlow.syncListeners}")
60+
LOG.info("Registered concurrent listeners: ${EventFlow.concurrentListeners}")
61+
3862
runConcurrent {
3963
sleep(60000)
4064
LOG.info("Unsubscribing")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.lambda.event
2+
3+
interface CallbackEvent : Event

common/src/main/kotlin/com/lambda/event/EventFlow.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package com.lambda.event
22

3-
import com.lambda.Lambda.LOG
4-
import com.lambda.Lambda.mc
53
import com.lambda.event.listener.Listener
64
import com.lambda.runConcurrent
75
import kotlinx.coroutines.*
86
import kotlinx.coroutines.channels.BufferOverflow
97
import kotlinx.coroutines.flow.MutableSharedFlow
108
import kotlinx.coroutines.flow.filter
9+
import kotlinx.coroutines.flow.filterNot
1110
import java.util.concurrent.ConcurrentHashMap
1211
import java.util.concurrent.ConcurrentSkipListSet
1312
import kotlin.reflect.KClass
@@ -38,6 +37,7 @@ object EventFlow {
3837
concurrentFlow
3938
// early filter to avoid an unnecessary collection
4039
.filter { concurrentListeners.containsKey(it::class) }
40+
.filterNot { it is ICancellable && it.isCanceled() }
4141
.collect { event ->
4242
event.executeListenerConcurrently()
4343
}
@@ -51,7 +51,8 @@ object EventFlow {
5151
* Each asynchronous listener will execute its listener function on a new thread.
5252
*
5353
* After notifying asynchronous listeners, it executes the listener functions of all synchronous listeners.
54-
* An instant callback can only be achieved by synchronous listening objects as the concurrently listener will be executed "later".
54+
* An instant callback can only be achieved by synchronous listening objects
55+
* as the concurrent listener will be executed "later".
5556
*
5657
* @param event The event to be posted to the event flow.
5758
*/
@@ -63,12 +64,14 @@ object EventFlow {
6364

6465
private fun Event.executeListenerSynchronous() {
6566
syncListeners[this::class]?.forEach { listener ->
67+
if (this is ICancellable && this.isCanceled()) return
6668
listener.execute(this@executeListenerSynchronous)
6769
}
6870
}
6971

7072
private fun Event.executeListenerConcurrently() {
7173
concurrentListeners[this::class]?.forEach { listener ->
74+
if (this is ICancellable && this.isCanceled()) return
7275
runConcurrent {
7376
listener.execute(this@executeListenerConcurrently)
7477
}

common/src/main/kotlin/com/lambda/event/ICancellable.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.lambda.event
22

33
import java.util.concurrent.atomic.AtomicBoolean
44

5-
interface ICancellable : MutableEvent {
5+
interface ICancellable : CallbackEvent {
66
val cancelSignal: AtomicBoolean
77

88
fun cancel() {

common/src/main/kotlin/com/lambda/event/MutableEvent.kt

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.lambda.event.events
22

3-
import com.lambda.event.MutableEvent
3+
import com.lambda.event.Cancellable
4+
import com.lambda.event.ICancellable
45

5-
class KeyPressEvent(val key: Int): MutableEvent
6+
class KeyPressEvent(val key: Int): ICancellable by Cancellable()

common/src/main/kotlin/com/lambda/event/listener/SafeListener.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import com.lambda.event.EventFlow
66
import com.lambda.runSafe
77
import java.util.concurrent.ConcurrentSkipListSet
88

9-
data class SafeListener(
9+
class SafeListener(
1010
override val priority: Int = 0,
1111
val function: SafeContext.(Event) -> Unit
1212
) : Listener() {

common/src/main/kotlin/com/lambda/event/listener/UnsafeListener.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.lambda.event.Event
44
import com.lambda.event.EventFlow
55
import java.util.concurrent.ConcurrentSkipListSet
66

7-
data class UnsafeListener(
7+
class UnsafeListener(
88
override val priority: Int,
99
val function: (Event) -> Unit
1010
) : Listener() {
@@ -25,7 +25,7 @@ data class UnsafeListener(
2525
* The [function] is executed on the same thread where the [Event] was dispatched.
2626
* The execution of the [function] is independent of the safety conditions of the context.
2727
* Use this function when you need to listen to an [Event] in a context that is not in-game.
28-
* For only in-game related contexts, use the [listener] function instead.
28+
* For only in-game related contexts, use the [SafeListener.listener] function instead.
2929
*
3030
* Usage:
3131
* ```kotlin
@@ -55,6 +55,7 @@ data class UnsafeListener(
5555
* Registers a new [UnsafeListener] for a generic [Event] type [T].
5656
* The [function] is executed on a new thread running asynchronously to the game thread.
5757
* This function should only be used when the [function] performs read actions on the game data.
58+
* For only in-game related contexts, use the [SafeListener.concurrentListener] function instead.
5859
*
5960
* Caution: Using this function to write to the game data can lead to race conditions. Therefore, it is recommended
6061
* to use this function only for read operations to avoid potential concurrency issues.

0 commit comments

Comments
 (0)