Added command intercepter tool, used it for //calc #49

Closed
Dico200 wants to merge 8 commits from command-intercepter into dev
4 changed files with 176 additions and 115 deletions

129
imbusy.py
View File

@ -209,110 +209,43 @@ def reply(sender):
reply_targets[target.getName()] = (sender_name, True) reply_targets[target.getName()] = (sender_name, True)
return True return True
try:
class CommandWrapper(Command): def msg_interception(sender, args):
return not is_player(sender) or len(args) <= 1 or whisper(sender, args[0])
def __init__(self, wrapped, checker): def reply_interception(sender, args):
Command.__init__(self, wrapped.getName()) return not is_player(sender) or len(args) == 0 or reply(sender)
self.setDescription(wrapped.getDescription())
self.setPermission(wrapped.getPermission())
self.setUsage(wrapped.getUsage())
self.setAliases(wrapped.getAliases())
self.wrapped = wrapped
self.checker = checker
def execute(self, sender, label, args): def tpa_interception(sender, args):
try: if len(args) == 0 or not is_player(sender):
if not is_player(sender) or self.checker(sender, args): return True
return self.wrapped.execute(sender, label, args) target = server.getPlayer(args[0])
except: if target is not None and not can_send(sender, target):
error(trace()) msg(sender, "&c[&fBUSY&c] %s&r is busy!" % target.getDisplayName())
return False
return True return True
def tabComplete(self, sender, alias, args): def tpahere_interception(sender, args):
return self.wrapped.tabComplete(sender, alias, args) return tpa_command_checker(sender, args)
def mail_interception(sender, args):
def msg_command_checker(sender, args): if len(args) < 3 or args[0].lower() != "send" or not is_player(sender):
return len(args) <= 1 or whisper(sender, args[0]) return True
target = server.getPlayer(args[1])
def reply_command_checker(sender, args): if target is not None and not can_send(sender, target):
return len(args) == 0 or reply(sender) msg(sender, "&c[&fBUSY&c] %s&r is busy!" % target.getDisplayName())
return False
def tpa_command_checker(sender, args):
if len(args) == 0:
return True return True
target = server.getPlayer(args[0])
if target is not None and not can_send(sender, target):
msg(sender, "&c[&fBUSY&c] %s&r is busy!" % target.getDisplayName())
return False
return True
def tpahere_command_checker(sender, args): register = shared["modules"]["misc"].CommandInterceptions.register
return tpa_command_checker(sender, args) register("essentials", "msg", msg_interception)
register("minecraft", "tell", msg_interception)
register("essentials", "reply", reply_interception)
register("essentials", "tpa", tpa_interception)
register("essentials", "tpahere", tpahere_interception)
register("essentials", "mail", mail_interception)
def mail_command_checker(sender, args): except:
if len(args) < 3 or args[0].lower() != "send": error("[Imbusy] Failed to intercept commands:")
return True error(trace())
target = server.getPlayer(args[1])
if target is not None and not can_send(sender, target):
msg(sender, "&c[&fBUSY&c] %s&r is busy!" % target.getDisplayName())
return False
return True
@hook.event("player.PlayerCommandPreprocessEvent", "monitor")
def on_player_command_preprocess(event):
message = event.getMessage().split(" ")
if len(message) > 1 and message[0].lower() in ("/tell", "/minecraft:tell") and not whisper(event.getPlayer(), message[1]):
event.setCancelled(True)
def replace_ess_commands():
try:
map_field = server.getPluginManager().getClass().getDeclaredField("commandMap")
map_field.setAccessible(True)
command_map = map_field.get(server.getPluginManager())
commands_field = command_map.getClass().getDeclaredField("knownCommands")
commands_field.setAccessible(True)
map = commands_field.get(command_map)
ess_msg_cmd = map.get("essentials:msg")
ess_reply_cmd = map.get("essentials:reply")
ess_tpa_cmd = map.get("essentials:tpa")
ess_tpahere_cmd = map.get("essentials:tpahere")
ess_mail_cmd = map.get("essentials:mail")
msg_cmd_wrapper = CommandWrapper(ess_msg_cmd, msg_command_checker)
reply_cmd_wrapper = CommandWrapper(ess_reply_cmd, reply_command_checker)
tpa_cmd_wrapper = CommandWrapper(ess_tpa_cmd, tpa_command_checker)
tpahere_cmd_wrapper = CommandWrapper(ess_tpahere_cmd, tpahere_command_checker)
mail_cmd_wrapper = CommandWrapper(ess_mail_cmd, mail_command_checker)
iterator = map.entrySet().iterator()
wrapped_commands = []
while iterator.hasNext():
entry = iterator.next()
value = entry.getValue()
changed = True
if value is ess_msg_cmd:
entry.setValue(msg_cmd_wrapper)
elif value is ess_reply_cmd:
entry.setValue(reply_cmd_wrapper)
elif value is ess_tpa_cmd:
entry.setValue(tpa_cmd_wrapper)
elif value is ess_tpahere_cmd:
entry.setValue(tpahere_cmd_wrapper)
elif value is ess_mail_cmd:
entry.setValue(mail_cmd_wrapper)
else:
changed = False
if changed:
wrapped_commands.append(entry.getKey())
info("[imbusy] wrapped commands: /" + ", /".join(wrapped_commands))
except:
error("[Imbusy] Failed to wrap essentials commands")
error(trace())

View File

@ -21,10 +21,8 @@ except:
def on_enable(): def on_enable():
if "blockplacemods" in shared["modules"]: if "blockplacemods" in shared["modules"]:
shared["modules"]["blockplacemods"].schedule_torch_breaker() shared["modules"]["blockplacemods"].schedule_torch_breaker()
if "imbusy" in shared["modules"]:
shared["modules"]["imbusy"].replace_ess_commands()
if "serversigns" in shared["modules"]: if "serversigns" in shared["modules"]:
shared["modules"]["serversigns"].check_all_signs_and_force_commands() shared["modules"]["serversigns"].check_all_signs_and_intercept_command()
info("RedstonerUtils enabled!") info("RedstonerUtils enabled!")

123
misc.py
View File

@ -8,8 +8,131 @@ import org.bukkit.inventory.ItemStack as ItemStack
import org.bukkit.Bukkit as Bukkit import org.bukkit.Bukkit as Bukkit
from basecommands import simplecommand, Validate from basecommands import simplecommand, Validate
import java.util.Arrays as Arrays import java.util.Arrays as Arrays
import org.bukkit.command.Command as Command
import java.util.HashMap as HashMap
"""
This section is a little tool to intercept commands.
It can intercept any aliases of the command, which it does by default.
An intercepter is a function that is run before the command is executed,
which takes the parameters: sender, args
It should return a boolean value to specify whether it should be executed.
The optional tab completer is a function that takes the following parameters:
original_completions (List<String>), sender, alias, args
and it should return a List<String>. By default it returns the original completions.
"""
class CommandInterceptions:
def __init__(self):
raise Exception("Instances of 'CommandInterceptions' are not meant to exist")
registrations = {} # cmd : (intercepter, tab_completer)
interceptions = {} # original_obj : replacement_obj
cmd_map = None # CustomHashMap
@staticmethod
def register(plugin_name, command, intercepter, tab_completer = None):
key = (plugin_name + ":" + command if plugin_name else command).lower()
CommandInterceptions.registrations[key] = (intercepter, tab_completer)
if CommandInterceptions.cmd_map.containsKey(key):
CommandInterceptions.add_interception(key, CommandInterceptions.cmd_map.get(key))
class Intercepter(Command):
def __init__(self, wrapped, intercepter, tab_completer):
try:
Command.__init__(self, wrapped.getName())
self.setDescription(wrapped.getDescription())
self.setPermission(wrapped.getPermission())
self.setUsage(wrapped.getUsage())
# I had to dig deep in spigot code to find out what's happening
# But without this snippet, the server shuts itself down because
# commands can't be aliases for themselves (shrug) :)
aliases = wrapped.getAliases()
try:
aliases.remove(wrapped.getLabel())
except:
pass
self.setAliases(aliases)
self.wrapped = wrapped
self.intercepter = intercepter
self.tab_completer = tab_completer
except:
error(trace())
def execute(self, sender, label, args):
if self.intercepter(sender, args):
return self.wrapped.execute(sender, label, args)
return True
def tabComplete(self, sender, alias, args):
return self.tab_completer(self.wrapped.tabComplete(sender, alias, args), sender, alias, args)
@staticmethod
def add_interception(key, intercepted):
registration = CommandInterceptions.registrations[key]
tab_completer = registration[1]
if tab_completer is None:
tab_completer = lambda original_completions, sender, alias, args: original_completions
cmd_intercepter = CommandInterceptions.Intercepter(intercepted, registration[0], tab_completer)
CommandInterceptions.interceptions[intercepted] = cmd_intercepter
for entry in CommandInterceptions.cmd_map.entrySet():
if entry.getValue() is intercepted:
entry.setValue(cmd_intercepter)
@staticmethod
def init_interceptions():
# The map object in the command map used by spigot is previously already a hashmap, we replace its put() here
class CustomHashMap(HashMap):
def __init__(self, replaced):
HashMap.__init__(self)
for entry in replaced.entrySet():
self.put(entry.getKey(), entry.getValue())
def java_put(self, key, value):
return HashMap.put(self, key, value)
def put(self, key, value):
for intercepted in CommandInterceptions.interceptions:
if value is intercepted:
return self.java_put(key, CommandInterceptions.interceptions[intercepted])
ret = self.java_put(key, value)
if key in CommandInterceptions.registrations:
CommandInterceptions.add_interception(key, value)
return ret
try:
map_field = server.getPluginManager().getClass().getDeclaredField("commandMap")
map_field.setAccessible(True)
command_map = map_field.get(server.getPluginManager())
commands_field = command_map.getClass().getDeclaredField("knownCommands")
commands_field.setAccessible(True)
CommandInterceptions.cmd_map = CustomHashMap(commands_field.get(command_map))
commands_field.set(command_map, CommandInterceptions.cmd_map)
except:
error("[Interceptions] Failed to wrap the command map:")
error(trace())
CommandInterceptions.init_interceptions()
def worldedit_calc_intercepter(sender, args):
if not sender.hasPermission("worldedit.calc"):
noperm(sender)
return False
return True
CommandInterceptions.register("worldedit", "/calc", worldedit_calc_intercepter)
@hook.event("player.PlayerJoinEvent", "monitor") @hook.event("player.PlayerJoinEvent", "monitor")
def on_join(event): def on_join(event):

View File

@ -392,25 +392,32 @@ def can_build2(player, block):
return not event.isCancelled() return not event.isCancelled()
def check_all_signs_and_force_commands(): def check_all_signs_and_intercept_command():
for loc in signs:
if server.getWorld(loc[0]).getBlockAt(loc[1], loc[2], loc[3]).getType() not in (Material.WALL_SIGN, Material.SIGN_POST):
del signs[loc]
try: try:
map_field = server.getPluginManager().getClass().getDeclaredField("commandMap") #CommandInterceptions = shared["modules"]["misc"].CommandInterceptions
map_field.setAccessible(True) #rsutils_cmd = CommandInterceptions.cmd_map.get("redstonerutils:serversigns")
command_map = map_field.get(server.getPluginManager()) #label = rsutils_cmd.getLabel()
commands_field = command_map.getClass().getDeclaredField("knownCommands") def interception(sender, args):
commands_field.setAccessible(True) svs_command(sender, None, None, args)
map = commands_field.get(command_map) return False
rsutils_cmd = map.get("redstonerutils:serversigns") def tab_completion(original, sender, alias, args):
map.put("svs", rsutils_cmd) return None
map.put("serversigns", rsutils_cmd)
map.put("signsmsg", rsutils_cmd)
shared["modules"]["misc"].CommandInterceptions.register("serversigns", "serversigns", interception, tab_completion)
except: except:
error("[Serversigns] failed to force commands") error("[Serversigns] failed to force commands")
error(trace()) error(trace())
"""
Check if all registered signs have an associated sign block in the world.
WorldEdit commands could remove them without notification.
Pistons might also be able to achieve the same thing.
A sign missing from the world won't affect the world so it only checks on start.
"""
for loc in signs:
if server.getWorld(loc[0]).getBlockAt(loc[1], loc[2], loc[3]).getType() not in (Material.WALL_SIGN, Material.SIGN_POST):
del signs[loc]