Skip to content

Commit a44fbf6

Browse files
committed
EventFlow prototype
1 parent 8e4787b commit a44fbf6

File tree

19 files changed

+590
-5
lines changed

19 files changed

+590
-5
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.lambda.mixin;
2+
3+
import com.lambda.event.EventFlow;
4+
import com.lambda.event.events.PacketEvent;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import net.minecraft.network.ClientConnection;
7+
import net.minecraft.network.NetworkSide;
8+
import net.minecraft.network.packet.Packet;
9+
import org.spongepowered.asm.mixin.Final;
10+
import org.spongepowered.asm.mixin.Mixin;
11+
import org.spongepowered.asm.mixin.Shadow;
12+
import org.spongepowered.asm.mixin.injection.At;
13+
import org.spongepowered.asm.mixin.injection.Inject;
14+
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
15+
16+
@Mixin(ClientConnection.class)
17+
public class ClientConnectionMixin {
18+
@Shadow
19+
@Final
20+
private NetworkSide side;
21+
22+
@Inject(method = "send(Lnet/minecraft/network/packet/Packet;)V", at = @At("HEAD"), cancellable = true)
23+
private void sendingPacket(Packet<?> packet, final CallbackInfo callbackInfo) {
24+
PacketEvent.Send.Pre event = new PacketEvent.Send.Pre(packet);
25+
EventFlow.post(event);
26+
if (event.isCanceled()) callbackInfo.cancel();
27+
}
28+
29+
@Inject(method = "send(Lnet/minecraft/network/packet/Packet;)V", at = @At("RETURN"))
30+
private void sendingPacketPost(Packet<?> packet, final CallbackInfo callbackInfo) {
31+
EventFlow.post(new PacketEvent.Send.Post(packet));
32+
}
33+
34+
@Inject(method = "channelRead0(Lio/netty/channel/ChannelHandlerContext;Lnet/minecraft/network/packet/Packet;)V", at = @At("HEAD"), cancellable = true, require = 1)
35+
private void receivingPacket(
36+
ChannelHandlerContext channelHandlerContext,
37+
Packet<?> packet,
38+
CallbackInfo callbackInfo
39+
) {
40+
if (side != NetworkSide.CLIENTBOUND) return;
41+
42+
PacketEvent.Receive.Pre event = new PacketEvent.Receive.Pre(packet);
43+
EventFlow.post(event);
44+
if (event.isCanceled()) callbackInfo.cancel();
45+
}
46+
47+
@Inject(method = "channelRead0(Lio/netty/channel/ChannelHandlerContext;Lnet/minecraft/network/packet/Packet;)V", at = @At(value = "INVOKE", target = "Lnet/minecraft/network/ClientConnection;handlePacket(Lnet/minecraft/network/packet/Packet;Lnet/minecraft/network/listener/PacketListener;)V", shift = At.Shift.AFTER))
48+
private void receivingPacketPost(
49+
ChannelHandlerContext channelHandlerContext,
50+
Packet<?> packet,
51+
CallbackInfo callbackInfo
52+
) {
53+
if (side != NetworkSide.CLIENTBOUND) return;
54+
55+
EventFlow.post(new PacketEvent.Receive.Post(packet));
56+
}
57+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.lambda.mixin;
2+
3+
import com.lambda.event.EventFlow;
4+
import com.lambda.event.events.KeyPressEvent;
5+
import net.minecraft.client.Keyboard;
6+
import org.spongepowered.asm.mixin.Mixin;
7+
import org.spongepowered.asm.mixin.injection.At;
8+
import org.spongepowered.asm.mixin.injection.Inject;
9+
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
10+
11+
@Mixin(Keyboard.class)
12+
public class KeyboardMixin {
13+
@Inject(method = "onKey", at = @At("HEAD"))
14+
void onKey(long window, int key, int scancode, int action, int modifiers, CallbackInfo ci) {
15+
if (key <= 0) return;
16+
if (action != 1) return;
17+
EventFlow.post(new KeyPressEvent(key));
18+
}
19+
}

common/src/main/java/com/lambda/mixin/MixinMinecraftClient.java renamed to common/src/main/java/com/lambda/mixin/MinecraftClientMixin.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
package com.lambda.mixin;
22

3+
import com.lambda.event.EventFlow;
4+
import com.lambda.event.events.TickEvent;
35
import net.minecraft.client.MinecraftClient;
46
import org.spongepowered.asm.mixin.Mixin;
57
import org.spongepowered.asm.mixin.injection.At;
68
import org.spongepowered.asm.mixin.injection.Inject;
79
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
810

911
@Mixin(MinecraftClient.class)
10-
public class MixinMinecraftClient {
12+
public class MinecraftClientMixin {
1113
@Inject(method = "tick", at = @At("HEAD"))
1214
void onTickPre(CallbackInfo ci) {
13-
// EventBus.post(new TickEvent.Pre());
15+
EventFlow.post(new TickEvent.Pre());
1416
}
1517

1618
@Inject(method = "tick", at = @At("RETURN"))
1719
void onTickPost(CallbackInfo ci) {
18-
// EventBus.post(new TickEvent.Post());
20+
EventFlow.post(new TickEvent.Post());
1921
}
2022
}

common/src/main/kotlin/com/lambda/Lambda.kt

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package com.lambda
22

3+
import com.lambda.event.listener.SafeListener.Companion.listener
4+
import com.lambda.event.events.PacketEvent
5+
import com.lambda.event.listener.Listener.Companion.unsubscribe
6+
import com.lambda.event.listener.SafeListener.Companion.concurrentListener
7+
import com.lambda.event.listener.UnsafeListener.Companion.unsafeConcurrentListener
8+
import com.lambda.event.listener.UnsafeListener.Companion.unsafeListener
39
import net.minecraft.client.MinecraftClient
10+
import net.minecraft.network.packet.c2s.play.PlayerMoveC2SPacket.LookAndOnGround
411
import org.apache.logging.log4j.LogManager
512
import org.apache.logging.log4j.Logger
13+
import java.lang.Thread.sleep
614

715
object Lambda {
816
private const val MOD_NAME = "Lambda"
@@ -11,9 +19,29 @@ object Lambda {
1119
private val VERSION: String = LoaderInfo.getVersion()
1220

1321
val LOG: Logger = LogManager.getLogger()
14-
1522
val mc: MinecraftClient = MinecraftClient.getInstance()
1623

24+
init {
25+
listener<PacketEvent.Send.Pre> {
26+
if (it.packet is LookAndOnGround) {
27+
it.cancel()
28+
LOG.info("SAFE: Canceled: ${it.packet::class.simpleName}")
29+
}
30+
}
31+
32+
concurrentListener<PacketEvent.Send.Pre> {
33+
if (it.packet is LookAndOnGround) {
34+
LOG.info("CONCURRENT: ${it.packet::class.simpleName}")
35+
}
36+
}
37+
38+
runConcurrent {
39+
sleep(60000)
40+
LOG.info("Unsubscribing")
41+
unsubscribe<PacketEvent.Send.Pre>()
42+
}
43+
}
44+
1745
fun initialize() {
1846
LOG.info("Initializing $MOD_NAME $VERSION")
1947
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.lambda
2+
3+
import com.lambda.context.ClientContext
4+
import com.lambda.event.EventFlow
5+
import com.lambda.context.SafeContext
6+
import kotlinx.coroutines.launch
7+
8+
/**
9+
* Executes a block of code only if the context is safe. A context is considered safe when all the following properties are not null:
10+
* - [SafeContext.world]
11+
* - [SafeContext.player]
12+
* - [SafeContext.interaction]
13+
* - [SafeContext.connection]
14+
*
15+
* If the context is not safe, the function will return null and the block of code will not be executed.
16+
*
17+
* @param block The block of code to be executed within the safe context.
18+
* @return The result of the block execution if the context is safe, null otherwise.
19+
*/
20+
inline fun <T> runSafe(block: SafeContext.() -> T): T? {
21+
return ClientContext().toSafe()?.let { block(it) }
22+
}
23+
24+
/**
25+
* This function is used to execute a block of code on a new thread running asynchronously to the game thread.
26+
* It should only be used when you need to perform read actions on the game data.
27+
*
28+
* Caution: Using this function to write to the game data can lead to race conditions. Therefore, it is recommended
29+
* to use this function only for read operations to avoid potential concurrency issues.
30+
*
31+
* @param block The block of code to be executed concurrently.
32+
*/
33+
inline fun runConcurrent(crossinline block: suspend () -> Unit) =
34+
EventFlow.lambdaScope.launch {
35+
block()
36+
}
37+
38+
/**
39+
* This function is used to execute a block of code within a safe context on a new thread running asynchronously to the game thread.
40+
* A context is considered safe when all the following properties are not null:
41+
* - [SafeContext.world]
42+
* - [SafeContext.player]
43+
* - [SafeContext.interaction]
44+
* - [SafeContext.connection]
45+
*
46+
* If the context is not safe, the function will not execute the block of code.
47+
*
48+
* @param block The block of code to be executed within the safe context.
49+
*/
50+
inline fun runSafeConcurrent(crossinline block: SafeContext.() -> Unit) {
51+
EventFlow.lambdaScope.launch {
52+
runSafe { block() }
53+
}
54+
}
55+
56+
/**
57+
* Executes a given task on the game's main thread.
58+
*
59+
* This function is used when a task needs to be performed on the game's main thread,
60+
* as certain operations are not safe to perform on other threads.
61+
* It uses the Minecraft client's `execute` method to schedule the task.
62+
*
63+
* Note: This function is non-blocking as the task is scheduled to be executed
64+
* on the game's main thread, but does not provide any feedback.
65+
*
66+
* @param block The task to be executed on the game's main thread.
67+
*/
68+
inline fun runOnGameThread(crossinline block: () -> Unit) {
69+
Lambda.mc.executeSync { block() }
70+
}
71+
72+
/**
73+
* Executes a given task on the game's main thread within a safe context.
74+
* A context is considered safe when all the following properties are not null:
75+
* - [SafeContext.world]
76+
* - [SafeContext.player]
77+
* - [SafeContext.interaction]
78+
* - [SafeContext.connection]
79+
*
80+
* This function is used when a task needs to be performed on the game's main thread,
81+
* as certain operations are not safe to perform on other threads.
82+
* It uses the Minecraft client's `execute` method to schedule the task.
83+
*
84+
* Note: This function is non-blocking as the task is scheduled to be executed
85+
* on the game's main thread, but does not provide any feedback.
86+
*
87+
* @param block The task to be executed on the game's main thread within a safe context.
88+
*/
89+
inline fun runSafeOnGameThread(crossinline block: SafeContext.() -> Unit) {
90+
runOnGameThread { runSafe { block() } }
91+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.lambda.context
2+
3+
import net.minecraft.client.MinecraftClient
4+
import net.minecraft.client.network.ClientPlayNetworkHandler
5+
import net.minecraft.client.network.ClientPlayerEntity
6+
import net.minecraft.client.network.ClientPlayerInteractionManager
7+
import net.minecraft.client.world.ClientWorld
8+
9+
abstract class AbstractContext {
10+
val mc: MinecraftClient = MinecraftClient.getInstance()
11+
abstract val world: ClientWorld?
12+
abstract val player: ClientPlayerEntity?
13+
abstract val interaction: ClientPlayerInteractionManager?
14+
abstract val connection: ClientPlayNetworkHandler?
15+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.lambda.context
2+
3+
import net.minecraft.client.network.ClientPlayNetworkHandler
4+
import net.minecraft.client.network.ClientPlayerEntity
5+
import net.minecraft.client.network.ClientPlayerInteractionManager
6+
import net.minecraft.client.world.ClientWorld
7+
8+
open class ClientContext : AbstractContext() {
9+
final override val world: ClientWorld? = mc.world
10+
final override val player: ClientPlayerEntity? = mc.player
11+
final override val interaction: ClientPlayerInteractionManager? = mc.interactionManager
12+
final override val connection: ClientPlayNetworkHandler? = mc.networkHandler
13+
14+
fun toSafe(): SafeContext? {
15+
if (world == null || player == null || interaction == null || connection == null) {
16+
return null
17+
}
18+
return SafeContext(world, player, interaction, connection)
19+
}
20+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.lambda.context
2+
3+
import net.minecraft.client.network.ClientPlayNetworkHandler
4+
import net.minecraft.client.network.ClientPlayerEntity
5+
import net.minecraft.client.network.ClientPlayerInteractionManager
6+
import net.minecraft.client.world.ClientWorld
7+
8+
open class SafeContext internal constructor(
9+
override val world: ClientWorld,
10+
override val player: ClientPlayerEntity,
11+
override val interaction: ClientPlayerInteractionManager,
12+
override val connection: ClientPlayNetworkHandler
13+
) : AbstractContext()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package com.lambda.event
2+
3+
interface Event
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.lambda.event
2+
3+
import com.lambda.Lambda.LOG
4+
import com.lambda.Lambda.mc
5+
import com.lambda.event.listener.Listener
6+
import com.lambda.runConcurrent
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.BufferOverflow
9+
import kotlinx.coroutines.flow.MutableSharedFlow
10+
import kotlinx.coroutines.flow.filter
11+
import java.util.concurrent.ConcurrentHashMap
12+
import java.util.concurrent.ConcurrentSkipListSet
13+
import kotlin.reflect.KClass
14+
15+
object EventFlow {
16+
/**
17+
* [lambdaScope] is a [CoroutineScope] which is used to launch coroutines.
18+
*
19+
* It is defined with [Dispatchers.Default] which is optimized for CPU-intensive work
20+
* such as sorting large lists, doing complex calculations and similar.
21+
*
22+
* The [SupervisorJob] is used so that failure or cancellation of one child does not
23+
* lead to the failure or cancellation of the parent or its other children, which is
24+
* useful when you have multiple independent jobs running in parallel.
25+
*/
26+
val lambdaScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
27+
private val concurrentFlow = MutableSharedFlow<Event>(
28+
extraBufferCapacity = 1000,
29+
onBufferOverflow = BufferOverflow.DROP_OLDEST
30+
)
31+
32+
val syncListeners = ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>()
33+
val concurrentListeners = ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>()
34+
35+
init {
36+
// parallel event execution on dedicated threads
37+
runConcurrent {
38+
concurrentFlow
39+
// early filter to avoid an unnecessary collection
40+
.filter { concurrentListeners.containsKey(it::class) }
41+
.collect { event ->
42+
event.executeListenerConcurrently()
43+
}
44+
}
45+
}
46+
47+
/**
48+
* Posts an event to the event flow.
49+
*
50+
* This function first notifies all asynchronous listeners by emitting the event to the concurrent flow.
51+
* Each asynchronous listener will execute its listener function on a new thread.
52+
*
53+
* After notifying asynchronous listeners, it executes the listener functions of all synchronous listeners.
54+
* An instant callback can only be achieved by synchronous listening objects as the concurrently listener will be executed "later".
55+
*
56+
* @param event The event to be posted to the event flow.
57+
*/
58+
@JvmStatic
59+
fun post(event: Event) {
60+
concurrentFlow.tryEmit(event)
61+
event.executeListenerSynchronous()
62+
}
63+
64+
private fun Event.executeListenerSynchronous() {
65+
syncListeners[this::class]?.forEach { listener ->
66+
listener.execute(this@executeListenerSynchronous)
67+
}
68+
}
69+
70+
private fun Event.executeListenerConcurrently() {
71+
concurrentListeners[this::class]?.forEach { listener ->
72+
runConcurrent {
73+
listener.execute(this@executeListenerConcurrently)
74+
}
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)