Source code for tt.plugins.plugin_manager

import asyncio
import importlib
import pkgutil
from datetime import datetime

from asyncz.triggers import CronTrigger, IntervalTrigger

from tt.config import logger, scheduler, settings


[docs] class PluginManager: """ 🔌 Plugins are the core of Talky Trader, they are loaded at startup, to interact with the trading platform. Plugin Manager is used to load, start and dispatch message to the plugins Args: plugin_directory (str): Directory of plugins Returns: None """
[docs] def __init__(self, plugin_directory=None): self.plugin_directory = plugin_directory or settings.plugin_directory self.plugins = []
[docs] def load_plugins(self, plugin_names=None): """ 🔌Load plugins from directory Args: plugin_names (list): List of plugin names to load if None, load all plugins from self.plugin_directory You can use this to minimize the load time, memory and CPU. Returns: None Raises: Exception: If there was an error loading a plugin """ package = importlib.import_module(self.plugin_directory) logger.debug("Loading plugins from: {}", package) if not plugin_names: plugin_names = [ name for _, name, _ in pkgutil.iter_modules(package.__path__) ] for plugin_name in plugin_names: try: module = importlib.import_module( f"{self.plugin_directory}.{plugin_name}" ) self.load_plugin(module, plugin_name) except Exception as e: logger.warning("Error loading plugin {}: {}", plugin_name, e)
[docs] def load_plugin(self, module, plugin_name): """ Load a plugin from a module Args: module (Module): Module plugin_name (str): Plugin name Returns: None """ for name, obj in module.__dict__.items(): if ( isinstance(obj, type) and issubclass(obj, BasePlugin) and obj is not BasePlugin ): plugin_instance = obj() self.plugins.append(plugin_instance) logger.debug("Plugin loaded: {}", name)
[docs] async def start_all_plugins(self): """ Start all plugins Start the scheduler Returns: None """ for plugin in self.plugins: await self.start_plugin(plugin) scheduler.start()
[docs] async def start_plugin(self, plugin): """ Start a plugin Args: plugin (Plugin): Plugin Returns: None """ await plugin.start()
[docs] async def process_message(self, message): """ Send message to plugins Args: message (str): Message Returns: None """ logger.debug("Processing: {}", message) if not message: return tasks = [] for plugin in self.plugins: try: if plugin.should_handle(message): task = asyncio.create_task(plugin.handle_message(message)) tasks.append(task) except Exception as error: logger.error("process {}: {}", plugin, error) await asyncio.gather(*tasks)
[docs] class BasePlugin: """ ⚡ Base Plugin Class use to be inherited by Talky Plugins especially the scheduling, notification and message handling. Scheduling is manage via asyncz lib More info: https://github.com/tarsil/asyncz Args: None Returns: None """
[docs] def __init__(self): self.enabled = False self.scheduler = scheduler
[docs] async def start(self): pass
[docs] async def stop(self): pass
[docs] async def send_notification(self, message): pass
[docs] def should_handle(self, message): """ Returns True if the plugin should handle the message Args: message (str): Message Returns: bool """ if self.enabled: return not message.startswith(settings.bot_ignore)
[docs] async def plugin_notify_schedule_task( self, user_name=None, frequency=8, function=None ): """ Handles task notification every X hours. Defaulted to 8 hours Args: user_name (str): User name frequency (int): Frequency function (function): Function Returns: None """ if function: self.scheduler.add_task( name=user_name, fn=self.send_notification, args=[f"{await function()}"], trigger=IntervalTrigger(hours=frequency), is_enabled=True, )
[docs] async def plugin_notify_cron_task( self, user_name=None, user_day_of_week=None, user_hours=None, user_timezone=None, function=None, ): """ Handles task cron scheduling for notification default set to Tuesday to Thursday at 6AM, 12PM and 6PM UTC via settings Args: user_name (str): User name user_day_of_week (str): Day of week user_hours (str): Hours user_timezone (str): Timezone function (function): Function Returns: None """ if not user_day_of_week: user_day_of_week = settings.user_day_of_week if not user_hours: user_hours = settings.user_hours if not user_timezone: user_timezone = settings.user_timezone if function: self.scheduler.add_task( name=user_name, fn=self.send_notification, args=[f"{await function()}"], trigger=CronTrigger( day_of_week=user_day_of_week, hour=user_hours, timezone=user_timezone, ), is_enabled=True, )
[docs] async def handle_message(self, msg): pass
# if not self.should_handle(msg): # return # command, *args = msg.split(" ") # command = command[1:] # command_mapping = self.get_command_mapping() # if command in command_mapping: # function = command_mapping[command] # await self.send_notification(f"{await function()}")
[docs] def should_handle_timeframe(self): """ Returns True if the current day and time are within the configured trading window. Use to control trading hours for plugins Returns: bool """ if settings.trading_control: logger.debug("Trading control enabled") current_time = datetime.now().time() current_day = datetime.now().strftime("%a").lower() start_time = datetime.strptime(settings.trading_hours_start, "%H:%M").time() end_time = datetime.strptime(settings.trading_hours_end, "%H:%M").time() logger.debug( "Current time: {}, Current day: {}, Start time: {}, End time: {}", current_time, current_day, start_time, end_time, ) return ( current_day in settings.trading_days_allowed and start_time <= current_time <= end_time ) return True