Comment
Author: Admin | 2025-04-28
class BTMiner(StockFirmware):
    """Base handler for BTMiner based miners."""

    _rpc_cls = BTMinerRPCAPI
    rpc: BTMinerRPCAPI

    data_locations = BTMINER_DATA_LOC

    supports_shutdown = True
    supports_power_modes = True

    @staticmethod
    def _rpc_code_ok(data) -> bool:
        """Return True when an RPC reply carries the accepted status code.

        Args:
            data: The decoded RPC reply (a mapping), or a falsy value when
                the call produced nothing.

        Returns:
            True only if ``data`` is non-empty and its ``"Code"`` entry is
            131, the single value this handler treats as success.
        """
        return bool(data) and data.get("Code") == 131

    @staticmethod
    def _rpc_msg_ok(data) -> bool:
        """Return True when an RPC reply carries the accepted status message.

        Args:
            data: The decoded RPC reply (a mapping), or a falsy value when
                the call produced nothing.

        Returns:
            True only if ``data`` is non-empty and its ``"Msg"`` entry is
            exactly ``"API command OK"``.
        """
        # The truthiness guard keeps a ``None`` reply from raising
        # AttributeError, matching how the "Code" based checks behave.
        return bool(data) and data.get("Msg") == "API command OK"

    async def _reset_rpc_pwd_to_admin(self, pwd: str) -> bool:
        """Change the RPC password from ``pwd`` to ``"admin"``.

        Args:
            pwd: Passed as the first argument to ``rpc.update_pwd``.

        Returns:
            True if the reply code indicates success, False on ``APIError``
            or any other reply.
        """
        try:
            data = await self.rpc.update_pwd(pwd, "admin")
        except APIError:
            return False
        return self._rpc_code_ok(data)

    async def fault_light_off(self) -> bool:
        """Turn the fault light off by handing the LED back to auto mode.

        Returns:
            True (and ``self.light`` set to False) if the reply code
            indicates success, otherwise False.
        """
        try:
            data = await self.rpc.set_led(auto=True)
        except APIError:
            return False
        if self._rpc_code_ok(data):
            self.light = False
            return True
        return False

    async def fault_light_on(self) -> bool:
        """Turn the fault light on.

        Sends ``set_led(auto=False)`` followed by a second ``set_led`` call
        that configures a green LED pattern. Only the reply to the first
        call decides the result.

        Returns:
            True (and ``self.light`` set to True) if the first reply code
            indicates success, otherwise False.
        """
        try:
            data = await self.rpc.set_led(auto=False)
            await self.rpc.set_led(
                auto=False, color="green", start=0, period=1, duration=0
            )
        except APIError:
            return False
        if self._rpc_code_ok(data):
            self.light = True
            return True
        return False

    async def reboot(self) -> bool:
        """Issue the RPC ``reboot`` command.

        Returns:
            True if the reply message is ``"API command OK"``, False on
            ``APIError`` or any other (including empty) reply.
        """
        try:
            data = await self.rpc.reboot()
        except APIError:
            return False
        return self._rpc_msg_ok(data)

    async def restart_backend(self) -> bool:
        """Issue the RPC ``restart`` command.

        Returns:
            True if the reply message is ``"API command OK"``, False on
            ``APIError`` or any other (including empty) reply.
        """
        try:
            data = await self.rpc.restart()
        except APIError:
            return False
        return self._rpc_msg_ok(data)

    async def stop_mining(self) -> bool:
        """Stop mining by issuing RPC ``power_off(respbefore=True)``.

        Returns:
            True if the reply message is ``"API command OK"``, False on
            ``APIError`` or any other (including empty) reply.
        """
        try:
            data = await self.rpc.power_off(respbefore=True)
        except APIError:
            return False
        return self._rpc_msg_ok(data)

    async def resume_mining(self) -> bool:
        """Resume mining by issuing the RPC ``power_on`` command.

        Returns:
            True if the reply message is ``"API command OK"``, False on
            ``APIError`` or any other (including empty) reply.
        """
        try:
            data = await self.rpc.power_on()
        except APIError:
            return False
        return self._rpc_msg_ok(data)

    async def send_config(
        self, config: MinerConfig, user_suffix: Optional[str] = None
    ) -> None:
        """Store ``config`` on the handler and push it to the miner.

        The config is converted with ``config.as_wm``; its ``"pools"`` entry
        is sent via ``rpc.update_pools`` and its ``"mode"`` entry selects
        which power command is issued afterwards. Unrecognised modes send no
        power command.

        Args:
            config: The configuration to apply.
            user_suffix: Forwarded to ``config.as_wm``.
        """
        self.config = config

        conf = config.as_wm(user_suffix=user_suffix)
        pools_conf = conf["pools"]

        try:
            await self.rpc.update_pools(**pools_conf)

            mode = conf["mode"]
            if mode == "normal":
                await self.rpc.set_normal_power()
            elif mode == "high":
                await self.rpc.set_high_power()
            elif mode == "low":
                await self.rpc.set_low_power()
            elif mode == "power_tuning":
                await self.rpc.adjust_power_limit(conf["power_tuning"]["wattage"])
        except APIError:
            # Deliberate best-effort: cannot update, no API access usually.
            pass

    async def get_config(self) -> MinerConfig:
        """Build a ``MinerConfig`` from the miner's pools/summary/status.

        Returns:
            The reconstructed configuration. Its mining mode is ``sleep``
            when ``_is_mining`` reports the miner is not mining; otherwise it
            follows the summary's ``"Power Mode"`` (``High``/``Low``), falls
            back to ``normal`` when no ``"Power Limit"`` is present, and to
            ``power_tuning`` with that limit otherwise.
        """
        pools = None
        summary = None
        status = None
        try:
            data = await self.rpc.multicommand("pools", "summary", "status")
            pools = data["pools"][0]
            summary = data["summary"][0]
            status = data["status"][0]
        except APIError as e:
            logging.warning(e)
        except LookupError:
            # A missing section leaves the remaining values as None.
            pass

        cfg = MinerConfig.from_api(pools) if pools is not None else MinerConfig()

        # NOTE(review): the early returns below hand back ``cfg`` without
        # caching it on ``self.config``; only the final path caches. Kept
        # as-is -- confirm whether that asymmetry is intended.
        if not await self._is_mining(status):
            cfg.mining_mode = MiningModeConfig.sleep()
            return cfg

        if summary is not None:
            try:
                mining_mode = summary["SUMMARY"][0]["Power Mode"]
            except LookupError:
                mining_mode = None

            if mining_mode == "High":
                cfg.mining_mode = MiningModeConfig.high()
                return cfg
            if mining_mode == "Low":
                cfg.mining_mode = MiningModeConfig.low()
                return cfg

            try:
                power_lim = summary["SUMMARY"][0]["Power Limit"]
            except LookupError:
                power_lim = None

            if power_lim is None:
                cfg.mining_mode = MiningModeConfig.normal()
                return cfg

            cfg.mining_mode = MiningModeConfig.power_tuning(power=power_lim)

        self.config = cfg
        return self.config

    async def set_power_limit(self, wattage: int) -> bool:
        """Ask the miner to adjust its power limit.

        Args:
            wattage: Passed to ``rpc.adjust_power_limit``.

        Returns:
            True if the call completed without raising, False (after logging
            a warning) if any exception occurred.
        """
        try:
            await self.rpc.adjust_power_limit(wattage)
        except Exception as e:
            logging.warning(f"{self} set_power_limit: {e}")
            return False
        else:
            return True

    ##################################################
    ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
    ##################################################

    # NOTE(review): the source is truncated at this point -- only the
    # signature of `_get_mac` is present, with no body. It is preserved
    # verbatim below rather than reconstructed:
    #
    # async def _get_mac(
    #     self, rpc_summary: dict = None, rpc_get_miner_info: dict = None
    # ) -> Optional[str]:
Add Comment