Initial commit: GSPro Remote MVP - Phase 1 complete

This commit is contained in:
Ryan Hill 2025-11-13 15:38:58 -06:00
commit 74ca4b38eb
50 changed files with 12818 additions and 0 deletions

View file

@ -0,0 +1,21 @@
"""
Core modules for GSPro Remote backend.
"""
from .config import AppConfig, get_config
from .input_ctrl import press_key, press_keys, key_down, key_up, focus_window, is_gspro_running
from .screen import capture_screen, get_screen_size, capture_region
# Public API of the core package: the names imported above from the config,
# input_ctrl and screen submodules, re-exported for `from ... import *`.
__all__ = [
"AppConfig",
"get_config",
"press_key",
"press_keys",
"key_down",
"key_up",
"focus_window",
"is_gspro_running",
"capture_screen",
"get_screen_size",
"capture_region",
]

193
backend/app/core/config.py Normal file
View file

@ -0,0 +1,193 @@
"""
Configuration management for GSPro Remote.
"""
import json
import logging
from pathlib import Path
from typing import Optional
from functools import lru_cache
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
logger = logging.getLogger(__name__)
class ServerConfig(BaseModel):
    """Network settings of the HTTP server: bind address, port and mDNS toggle."""

    host: str = Field(default="0.0.0.0", description="Server host address")
    port: int = Field(default=5005, description="Server port")
    mdns_enabled: bool = Field(default=True, description="Enable mDNS service discovery")
class CaptureConfig(BaseModel):
    """Settings for screen streaming and for the captured map region."""

    # Streaming parameters
    fps: int = Field(default=30, description="Frames per second for streaming")
    quality: int = Field(default=85, description="JPEG quality (0-100)")
    resolution: str = Field(default="720p", description="Stream resolution")

    # Rectangle of the map region
    region_x: int = Field(default=0, description="Map region X coordinate")
    region_y: int = Field(default=0, description="Map region Y coordinate")
    region_width: int = Field(default=640, description="Map region width")
    region_height: int = Field(default=480, description="Map region height")
class GSProConfig(BaseModel):
    """Settings describing how to find and drive the GSPro application window."""

    window_title: str = Field(default="GSPro", description="GSPro window title")
    auto_focus: bool = Field(default=True, description="Auto-focus GSPro window before sending keys")
    key_delay: float = Field(default=0.05, description="Default delay between key presses (seconds)")
class VisionConfig(BaseModel):
    """Settings for the computer-vision features (planned for V2)."""

    enabled: bool = Field(default=False, description="Enable vision features")
    ocr_engine: str = Field(default="easyocr", description="OCR engine to use (easyocr or tesseract)")
    confidence_threshold: float = Field(default=0.7, description="Minimum confidence for OCR detection")
class AppConfig(BaseSettings):
    """Main application configuration.

    The instance is first built by ``BaseSettings`` (field defaults plus
    ``GSPRO_REMOTE_``-prefixed environment variables, nested fields separated
    by ``__``). The constructor then reads the JSON file at ``config_path``;
    every section present in that file replaces the value built so far. If the
    file does not exist it is created from the current values.
    """

    server: ServerConfig = Field(default_factory=ServerConfig)
    capture: CaptureConfig = Field(default_factory=CaptureConfig)
    gspro: GSProConfig = Field(default_factory=GSProConfig)
    vision: VisionConfig = Field(default_factory=VisionConfig)
    config_path: Optional[Path] = None
    debug: bool = Field(False, description="Enable debug mode")

    class Config:
        env_prefix = "GSPRO_REMOTE_"
        env_nested_delimiter = "__"
        case_sensitive = False

    def __init__(self, **kwargs):
        """Build the settings, resolve the config file path and load the file."""
        super().__init__(**kwargs)
        if self.config_path is None:
            self.config_path = self._get_default_config_path()
        self.load()

    @staticmethod
    def _get_default_config_path() -> Path:
        """Get the default configuration file path.

        The containing ``GSPro Remote`` directory is created if missing.

        Returns:
            ``%LOCALAPPDATA%/GSPro Remote/config.json`` on Windows (falling
            back to ``~/AppData/Local`` when the variable is unset or empty),
            ``~/.config/GSPro Remote/config.json`` elsewhere.
        """
        import os

        if os.name == "nt":  # Windows
            # Test the raw string: Path("") is Path("."), and Path objects are
            # always truthy, so checking the Path would never hit the fallback.
            local_app_data = os.environ.get("LOCALAPPDATA", "")
            if local_app_data:
                base_path = Path(local_app_data)
            else:
                base_path = Path.home() / "AppData" / "Local"
        else:  # Unix-like
            base_path = Path.home() / ".config"

        config_dir = base_path / "GSPro Remote"
        config_dir.mkdir(parents=True, exist_ok=True)
        return config_dir / "config.json"

    def _persisted_data(self) -> dict:
        """Return the values that are written to the configuration file."""
        return {
            "server": self.server.model_dump(),
            "capture": self.capture.model_dump(),
            "gspro": self.gspro.model_dump(),
            "vision": self.vision.model_dump(),
            "debug": self.debug,
        }

    def load(self) -> None:
        """Load configuration from file.

        Sections missing from the file keep their current values. If the file
        cannot be read or validated, a warning is logged and the current
        values are written back to the file. If the file does not exist, it is
        created from the current values.
        """
        if not (self.config_path and self.config_path.exists()):
            # Create default configuration file
            self.save()
            logger.info(f"Created default configuration at {self.config_path}")
            return

        section_models = {
            "server": ServerConfig,
            "capture": CaptureConfig,
            "gspro": GSProConfig,
            "vision": VisionConfig,
        }
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            for section, model in section_models.items():
                if section in data:
                    setattr(self, section, model(**data[section]))
            if "debug" in data:
                self.debug = data["debug"]
            logger.info(f"Configuration loaded from {self.config_path}")
        except Exception as e:
            logger.warning(f"Failed to load configuration: {e}")
            self.save()  # Save default configuration

    def save(self) -> None:
        """Save configuration to file.

        Does nothing when no ``config_path`` is set. Failures are logged, not
        raised.
        """
        if not self.config_path:
            return
        try:
            self.config_path.parent.mkdir(parents=True, exist_ok=True)
            # Serialize before opening so a failure here cannot truncate the file.
            data = self._persisted_data()
            with open(self.config_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            logger.info(f"Configuration saved to {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to save configuration: {e}")

    def update(self, **kwargs) -> None:
        """Update configuration with new values and save the result.

        Args:
            **kwargs: Attribute names mapped to new values. Names that are not
                attributes of this object are ignored. A dict value is merged
                field by field into the matching nested model (unknown fields
                are ignored, and a dict given for a non-model attribute is
                ignored); any other value replaces the attribute.
        """
        for key, value in kwargs.items():
            if not hasattr(self, key):
                continue
            if not isinstance(value, dict):
                setattr(self, key, value)
                continue
            # Update nested configuration
            current = getattr(self, key)
            if isinstance(current, BaseModel):
                for sub_key, sub_value in value.items():
                    if hasattr(current, sub_key):
                        setattr(current, sub_key, sub_value)
        self.save()

    def reset(self) -> None:
        """Reset configuration to defaults and save it."""
        self.server = ServerConfig()
        self.capture = CaptureConfig()
        self.gspro = GSProConfig()
        self.vision = VisionConfig()
        self.debug = False
        self.save()

    def to_dict(self) -> dict:
        """Convert configuration to dictionary.

        Returns:
            The persisted sections plus ``config_path`` (as a string, or None).
        """
        return {
            **self._persisted_data(),
            "config_path": str(self.config_path) if self.config_path else None,
        }
# Lazily created, module-wide configuration singleton
_config: Optional[AppConfig] = None


@lru_cache(maxsize=1)
def get_config() -> AppConfig:
    """Return the shared AppConfig instance, creating it on first use."""
    global _config
    if _config is not None:
        return _config
    _config = AppConfig()
    return _config
def reset_config() -> None:
    """Drop the shared configuration so the next get_config() builds a new one."""
    global _config
    # Both the lru_cache entry and the module global hold the instance.
    get_config.cache_clear()
    _config = None

View file

@ -0,0 +1,350 @@
"""
Windows input control module for simulating keyboard inputs to GSPro.
"""
import logging
import time
from typing import Optional, List
try:
import pydirectinput
import win32gui
import win32con
import win32process
import psutil
except ImportError as e:
raise ImportError(f"Required Windows dependencies not installed: {e}")
logger = logging.getLogger(__name__)
# Configure pydirectinput: PAUSE is its module-level pause between actions
# (presumably in seconds — confirm against the pydirectinput documentation).
pydirectinput.PAUSE = 0.01 # Reduce default pause between actions
def is_gspro_running(window_title: str = "GSPro") -> bool:
    """
    Tell whether a GSPro window currently exists.

    Only visible, enabled top-level windows are considered; the title match is
    a case-insensitive substring test.

    Args:
        window_title: Text that must occur in the window title

    Returns:
        True if at least one matching window is found, False otherwise
    """
    needle = window_title.lower()
    matches = []

    def _collect(hwnd, found):
        usable = win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd)
        if usable and needle in win32gui.GetWindowText(hwnd).lower():
            found.append(hwnd)
        return True  # keep enumerating

    win32gui.EnumWindows(_collect, matches)
    return bool(matches)
def find_gspro_window(window_title: str = "GSPro") -> Optional[int]:
    """
    Find the GSPro window handle.

    The title match is a case-insensitive substring test. Visible, enabled
    windows are preferred, which is the same filter is_gspro_running() applies,
    so a hidden helper window with a matching title is not picked ahead of the
    real one. If no visible window matches, the first match of any kind is
    returned.

    Args:
        window_title: The window title to search for

    Returns:
        Window handle if found, None otherwise
    """
    needle = window_title.lower()
    usable_matches = []
    other_matches = []

    def _collect(hwnd, _unused):
        if needle in win32gui.GetWindowText(hwnd).lower():
            usable = win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd)
            (usable_matches if usable else other_matches).append(hwnd)
        return True  # keep enumerating

    win32gui.EnumWindows(_collect, None)

    candidates = usable_matches or other_matches
    return candidates[0] if candidates else None
def focus_window(window_title: str = "GSPro") -> bool:
    """
    Bring the GSPro window to the foreground so it receives keyboard input.

    A minimized window is restored first. Any error is logged and reported as
    a False result instead of being raised.

    Args:
        window_title: The window title to focus

    Returns:
        True if window was focused successfully, False otherwise
    """
    try:
        hwnd = find_gspro_window(window_title)
        if not hwnd:
            logger.warning(f"Window not found: {window_title}")
            return False

        if win32gui.IsIconic(hwnd):
            # Minimized: restore before activating
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
        win32gui.SetForegroundWindow(hwnd)
        # Give the window a moment to actually take focus
        time.sleep(0.1)
        logger.debug(f"Focused window: {window_title}")
        return True
    except Exception as e:
        logger.error(f"Failed to focus window: {e}")
        return False
def press_key(key: str, interval: float = 0.0) -> None:
    """
    Press and release one key.

    The key name is lower-cased and stripped, then translated through a small
    alias table before being handed to pydirectinput. Errors are logged and
    re-raised.

    Args:
        key: The key to press (e.g., 'a', 'space', 'f1', 'up')
        interval: Time to wait after pressing the key
    """
    # Only names that differ from the name passed to pydirectinput need an
    # entry; every other name is passed through unchanged.
    aliases = {
        "control": "ctrl",
        "return": "enter",
        "escape": "esc",
        "del": "delete",
        "ins": "insert",
        "plus": "+",
        "minus": "-",
        "apostrophe": "'",
        "quote": "'",
    }
    try:
        name = key.lower().strip()
        pydirectinput.press(aliases.get(name, name))
        if interval > 0:
            time.sleep(interval)
        logger.debug(f"Pressed key: {key}")
    except Exception as e:
        logger.error(f"Failed to press key '{key}': {e}")
        raise
def press_keys(keys: str, interval: float = 0.0) -> None:
    """
    Simulate a key combination or a single key.

    A string containing '+' is treated as a combination: every part before the
    last is a modifier, the last part is the main key. Parts are stripped, so
    'ctrl + m' equals 'ctrl+m'. An empty last part means the plus key itself
    ('ctrl++' is Ctrl and '+'; '+' alone is just the plus key). Modifiers are
    held with keyDown, the main key is sent through press_key() so it gets the
    same alias handling as single keys, and the modifiers are released in
    reverse order even if the press fails. keyDown/keyUp are used rather than
    pydirectinput.hotkey because the PyDirectInput README lists hotkey
    functions as not implemented.

    Unrecognized modifier names are ignored, with a warning in the log.
    Errors are logged and re-raised.

    Args:
        keys: Key combination string (e.g., 'ctrl+m', 'shift+tab')
        interval: Time to wait after pressing the keys
    """
    modifier_aliases = {
        "ctrl": "ctrl",
        "control": "ctrl",
        "alt": "alt",
        "shift": "shift",
        "win": "win",
        "windows": "win",
        "cmd": "win",
        "command": "win",
    }
    try:
        if "+" in keys:
            parts = [part.strip() for part in keys.lower().split("+")]
            # An empty last part comes from a trailing '+': the key is '+' itself.
            main_key = parts[-1] or "+"

            modifiers = []
            for part in parts[:-1]:
                if not part:
                    continue  # artifact of '++'
                modifier = modifier_aliases.get(part)
                if modifier is None:
                    logger.warning(f"Ignoring unknown modifier '{part}' in '{keys}'")
                elif modifier not in modifiers:
                    modifiers.append(modifier)

            held = []
            try:
                for modifier in modifiers:
                    # Record first so the key is released even if keyDown fails midway.
                    held.append(modifier)
                    pydirectinput.keyDown(modifier)
                press_key(main_key)
            finally:
                for modifier in reversed(held):
                    pydirectinput.keyUp(modifier)
        else:
            # Single key press
            press_key(keys)

        if interval > 0:
            time.sleep(interval)
        logger.debug(f"Pressed keys: {keys}")
    except Exception as e:
        logger.error(f"Failed to press keys '{keys}': {e}")
        raise
def key_down(key: str) -> None:
    """
    Press a key and keep it held.

    The name is stripped and lower-cased before being passed to pydirectinput.
    Errors are logged and re-raised.

    Args:
        key: The key to hold down
    """
    try:
        pydirectinput.keyDown(key.strip().lower())
    except Exception as e:
        logger.error(f"Failed to hold key down '{key}': {e}")
        raise
    else:
        logger.debug(f"Key down: {key}")
def key_up(key: str) -> None:
    """
    Release a key that is being held.

    The name is stripped and lower-cased before being passed to pydirectinput.
    Errors are logged and re-raised.

    Args:
        key: The key to release
    """
    try:
        pydirectinput.keyUp(key.strip().lower())
    except Exception as e:
        logger.error(f"Failed to release key '{key}': {e}")
        raise
    else:
        logger.debug(f"Key up: {key}")
def type_text(text: str, interval: float = 0.0) -> None:
    """
    Type text character by character.

    Only the first 20 characters are written to the debug log. Errors are
    logged and re-raised.

    Args:
        text: The text to type
        interval: Time between each character
    """
    try:
        pydirectinput.typewrite(text, interval=interval)
        preview = text[:20]
        logger.debug(f"Typed text: {preview}...")
    except Exception as e:
        logger.error(f"Failed to type text: {e}")
        raise
def mouse_click(x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> None:
    """
    Click a mouse button.

    The click happens at (x, y) only when both coordinates are given;
    otherwise it happens at the current cursor position. Errors are logged and
    re-raised.

    Args:
        x: X coordinate (None for current position)
        y: Y coordinate (None for current position)
        button: Mouse button ('left', 'right', 'middle')
    """
    try:
        if x is None or y is None:
            pydirectinput.click(button=button)
            logger.debug(f"Mouse click with {button} button")
        else:
            pydirectinput.click(x, y, button=button)
            logger.debug(f"Mouse click at ({x}, {y}) with {button} button")
    except Exception as e:
        logger.error(f"Failed to perform mouse click: {e}")
        raise
def mouse_move(x: int, y: int, duration: float = 0.0) -> None:
    """
    Move the mouse cursor to a position.

    The duration is passed to pydirectinput only when it is positive. Errors
    are logged and re-raised.

    Args:
        x: Target X coordinate
        y: Target Y coordinate
        duration: Time to take for the movement
    """
    try:
        extra = {"duration": duration} if duration > 0 else {}
        pydirectinput.moveTo(x, y, **extra)
        logger.debug(f"Mouse moved to ({x}, {y})")
    except Exception as e:
        logger.error(f"Failed to move mouse: {e}")
        raise
def get_gspro_process_info() -> Optional[dict]:
    """
    Get information about the GSPro process if it's running.

    The first process whose name contains "gspro" (case-insensitive) is
    reported. Processes whose name is unavailable (reported as None) are
    skipped instead of aborting the scan. Errors are logged, not raised.

    Returns:
        Dictionary with the keys "pid", "name", "cpu_percent" and "memory_mb",
        or None if no matching process is found. "cpu_percent" and
        "memory_mb" are 0 when the value is unavailable.
    """
    try:
        for proc in psutil.process_iter(["pid", "name", "cpu_percent", "memory_info"]):
            info = proc.info
            # The name may be None for processes that cannot be inspected.
            name = info.get("name") or ""
            if "gspro" not in name.lower():
                continue

            memory = info.get("memory_info")
            return {
                "pid": info["pid"],
                "name": name,
                "cpu_percent": info.get("cpu_percent") or 0.0,
                "memory_mb": memory.rss / 1024 / 1024 if memory else 0,
            }
    except Exception as e:
        logger.error(f"Failed to get GSPro process info: {e}")
    return None
# Manual smoke test for development
def test_input_control():
    """Check by hand that GSPro can be found, focused and inspected."""
    print("Testing input control...")

    # Nothing else can be checked without a running GSPro
    if not is_gspro_running():
        print("✗ GSPro is not running")
        print("Please start GSPro and try again")
        return
    print("✓ GSPro is running")

    # Try to bring the window to the foreground
    focused = focus_window()
    print("✓ GSPro window focused" if focused else "✗ Could not focus GSPro window")

    # Report process details when available
    details = get_gspro_process_info()
    if details:
        pid = details["pid"]
        cpu = details["cpu_percent"]
        memory = details["memory_mb"]
        print(f"✓ GSPro process found: PID={pid}, CPU={cpu:.1f}%, Memory={memory:.1f}MB")

    print("\nInput control test complete!")


if __name__ == "__main__":
    # Executed directly: run the smoke test
    test_input_control()

335
backend/app/core/mdns.py Normal file
View file

@ -0,0 +1,335 @@
"""
mDNS service registration for GSPro Remote.
Allows the application to be discovered on the local network.
"""
import logging
import socket
import threading
from typing import Optional, Dict, Any
try:
from zeroconf import ServiceInfo, Zeroconf, IPVersion
except ImportError as e:
raise ImportError(f"Zeroconf library not installed: {e}")
logger = logging.getLogger(__name__)
class MDNSService:
    """
    Manages mDNS/Bonjour service registration for network discovery.

    The service is announced as ``{name}.{service_type}`` and its address
    record is published under ``{name}.local.``, which is the host name that
    get_url() returns. Can be used as a context manager: entering starts the
    registration, leaving stops it.
    """

    def __init__(
        self,
        name: str = "gsproapp",
        port: int = 5005,
        service_type: str = "_http._tcp.local.",
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize mDNS service.

        Args:
            name: Service name (will be accessible as {name}.local)
            port: Port number the service is running on
            service_type: mDNS service type
            properties: Additional service properties
        """
        self.name = name
        self.port = port
        self.service_type = service_type
        self.properties = properties or {}
        self.zeroconf: Optional[Zeroconf] = None
        self.service_info: Optional[ServiceInfo] = None
        self.is_running = False
        # Guards start(), stop() and update_properties()
        self._lock = threading.Lock()

    def _get_local_ip(self) -> str:
        """
        Get the local IP address of the machine.

        A UDP socket is connected to a public address and the socket's own
        address is read back; the intent is to learn which local interface
        would be used.

        Returns:
            Local IP address as string, "127.0.0.1" if it cannot be determined
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except Exception:
            # Fallback to localhost if can't determine
            return "127.0.0.1"

    def _create_service_info(self) -> "ServiceInfo":
        """
        Create the ServiceInfo object for registration.

        Returns:
            Configured ServiceInfo object
        """
        local_ip = self._get_local_ip()

        # Defaults first, so caller-supplied properties win on conflicts
        all_properties = {
            "version": "0.1.0",
            "platform": "windows",
            "api": "rest",
            "ui": "web",
            **self.properties,
        }

        # Property values are passed to ServiceInfo as bytes
        properties_bytes = {}
        for key, value in all_properties.items():
            if isinstance(value, bytes):
                properties_bytes[key] = value
            else:
                properties_bytes[key] = str(value).encode("utf-8")

        return ServiceInfo(
            type_=self.service_type,
            name=f"{self.name}.{self.service_type}",
            addresses=[socket.inet_aton(local_ip)],
            port=self.port,
            properties=properties_bytes,
            # Publish the address under {name}.local so the URL from get_url()
            # resolves regardless of the machine's own host name.
            server=f"{self.name}.local.",
        )

    def start(self) -> bool:
        """
        Start the mDNS service registration.

        Returns:
            True if service started successfully (or was already running),
            False otherwise
        """
        with self._lock:
            if self.is_running:
                logger.warning("mDNS service is already running")
                return True
            try:
                self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only)
                self.service_info = self._create_service_info()
                self.zeroconf.register_service(self.service_info)
                self.is_running = True
                logger.info(f"mDNS service registered: {self.name}.local:{self.port} (type: {self.service_type})")
                return True
            except Exception as e:
                logger.error(f"Failed to start mDNS service: {e}")
                self.cleanup()
                return False

    def stop(self) -> None:
        """Stop the mDNS service registration. Does nothing if not running."""
        with self._lock:
            if not self.is_running:
                return
            self.cleanup()
            self.is_running = False
            logger.info("mDNS service stopped")

    def cleanup(self) -> None:
        """
        Clean up mDNS resources.

        The Zeroconf instance is always closed and the stored references are
        always cleared, even when unregistering the service fails. Errors are
        logged, not raised.
        """
        zeroconf, service_info = self.zeroconf, self.service_info
        self.zeroconf = None
        self.service_info = None
        if zeroconf is None:
            return
        try:
            if service_info is not None:
                zeroconf.unregister_service(service_info)
        except Exception as e:
            logger.error(f"Error during mDNS cleanup: {e}")
        finally:
            try:
                zeroconf.close()
            except Exception as e:
                logger.error(f"Error during mDNS cleanup: {e}")

    def update_properties(self, properties: Dict[str, Any]) -> bool:
        """
        Update service properties.

        The new values are merged into the stored properties, then the service
        is unregistered and registered again with the merged set.

        Args:
            properties: New properties to set

        Returns:
            True if properties updated successfully, False otherwise
        """
        with self._lock:
            if not self.is_running:
                logger.warning("Cannot update properties: service is not running")
                return False
            try:
                self.properties.update(properties)
                if self.zeroconf and self.service_info:
                    self.zeroconf.unregister_service(self.service_info)
                    self.service_info = self._create_service_info()
                    self.zeroconf.register_service(self.service_info)
                logger.info("mDNS service properties updated")
                return True
            except Exception as e:
                logger.error(f"Failed to update mDNS properties: {e}")
                return False

    def get_url(self) -> str:
        """
        Get the URL for accessing the service.

        Returns:
            Service URL
        """
        return f"http://{self.name}.local:{self.port}"

    def __enter__(self):
        """Context manager entry: start the service and return self."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: stop the service."""
        self.stop()
class MDNSBrowser:
    """
    Browse for mDNS services on the network.
    Useful for discovering other GSPro Remote instances.
    """

    def __init__(self, service_type: str = "_http._tcp.local."):
        """
        Initialize mDNS browser.

        Args:
            service_type: Type of services to browse for
        """
        self.service_type = service_type
        self.services: Dict[str, Dict[str, Any]] = {}
        self.zeroconf: Optional[Zeroconf] = None

    def browse(self, timeout: float = 5.0) -> Dict[str, Dict[str, Any]]:
        """
        Browse for services on the network.

        Blocks for ``timeout`` seconds while services are collected. Results
        of earlier calls are discarded first, so the returned dictionary
        reflects this call only. The Zeroconf instance is closed even when
        browsing fails.

        Args:
            timeout: Time to wait for services (seconds)

        Returns:
            Dictionary of discovered services keyed by service name, each with
            "name", "address", "port" and "properties"; an empty dictionary if
            browsing failed
        """
        try:
            from zeroconf import ServiceBrowser, ServiceListener
            import time

            class Listener(ServiceListener):
                def __init__(self, browser):
                    self.browser = browser

                def add_service(self, zeroconf, service_type, name):
                    info = zeroconf.get_service_info(service_type, name)
                    if info:
                        self.browser.services[name] = {
                            "name": name,
                            "address": socket.inet_ntoa(info.addresses[0]) if info.addresses else None,
                            "port": info.port,
                            "properties": info.properties,
                        }

                def remove_service(self, zeroconf, service_type, name):
                    self.browser.services.pop(name, None)

                def update_service(self, zeroconf, service_type, name):
                    pass

            # Start from a clean slate: services seen by a previous browse may
            # have disappeared since.
            self.services = {}
            self.zeroconf = Zeroconf(ip_version=IPVersion.V4Only)
            try:
                browser = ServiceBrowser(self.zeroconf, self.service_type, Listener(self))
                # Wait for services to be discovered
                time.sleep(timeout)
                browser.cancel()
            finally:
                self.zeroconf.close()
                self.zeroconf = None
            return self.services
        except Exception as e:
            logger.error(f"Failed to browse for services: {e}")
            return {}
# Manual smoke test for development
def test_mdns_service():
    """Register a test service for a short while, then browse the network."""
    import time

    print("Testing mDNS service registration...")

    # Part 1: registration
    test_properties = {"test": "true", "instance": "development"}
    service = MDNSService(name="gsproapp-test", port=5005, properties=test_properties)
    started = service.start()
    if not started:
        print("✗ Failed to start mDNS service")
    else:
        print(f"✓ mDNS service started: {service.get_url()}")
        print(" You should be able to access it at: http://gsproapp-test.local:5005")

        # Leave the service up long enough to be discovered
        print(" Service will run for 10 seconds...")
        time.sleep(10)

        # Exercise the property update path
        updated = service.update_properties({"status": "running", "uptime": "10s"})
        if updated:
            print("✓ Properties updated successfully")

        service.stop()
        print("✓ mDNS service stopped")

    # Part 2: browsing
    print("\nBrowsing for HTTP services on the network...")
    discovered = MDNSBrowser().browse(timeout=3.0)
    if not discovered:
        print("No services found")
    else:
        print(f"Found {len(discovered)} services:")
        for service_name, details in discovered.items():
            print(f" - {service_name}: {details['address']}:{details['port']}")

    print("\nmDNS test complete!")


if __name__ == "__main__":
    test_mdns_service()

370
backend/app/core/screen.py Normal file
View file

@ -0,0 +1,370 @@
"""
Screen capture utilities for GSPro Remote.
"""
import logging
from typing import Optional, Tuple, Dict, Any
from io import BytesIO
import base64
try:
import mss
import mss.tools
from PIL import Image
import cv2
import numpy as np
except ImportError as e:
raise ImportError(f"Required screen capture dependencies not installed: {e}")
logger = logging.getLogger(__name__)
class ScreenCapture:
    """Manages screen capture operations."""

    def __init__(self):
        """Initialize screen capture manager."""
        self.sct = mss.mss()
        self._monitor_info = None

    @staticmethod
    def _to_bgr(screenshot) -> np.ndarray:
        """Convert an mss screenshot (BGRA pixel data) to a 3-channel BGR array."""
        # mss delivers pixels in BGRA order, so only the alpha channel is
        # dropped; treating the data as RGBA would swap red and blue.
        return cv2.cvtColor(np.array(screenshot), cv2.COLOR_BGRA2BGR)

    def _primary_index(self) -> int:
        """Return the index this class treats as the primary monitor."""
        # Index 0 is the combined virtual screen; index 1 is typically the
        # primary monitor in mss.
        return 1 if len(self.sct.monitors) > 1 else 0

    def get_monitors(self) -> list[dict]:
        """
        Get information about all available monitors.

        Returns:
            List of dictionaries with "index", "left", "top", "width",
            "height" and "is_primary". Index 0 is the combined virtual screen;
            "is_primary" marks the same monitor get_primary_monitor() returns.
        """
        primary_index = self._primary_index()
        return [
            {
                "index": i,
                "left": monitor["left"],
                "top": monitor["top"],
                "width": monitor["width"],
                "height": monitor["height"],
                "is_primary": i == primary_index,
            }
            for i, monitor in enumerate(self.sct.monitors)
        ]

    def get_primary_monitor(self) -> dict:
        """
        Get the primary monitor information.

        Returns:
            Primary monitor information (index 1 when individual monitors are
            listed, otherwise index 0)
        """
        return self.sct.monitors[self._primary_index()]

    def capture_screen(self, monitor_index: int = 1) -> np.ndarray:
        """
        Capture the entire screen.

        Args:
            monitor_index: Index of the monitor to capture (0 for all, 1 for primary)

        Returns:
            Captured screen as numpy array (BGR format)

        Raises:
            Exception: Any capture error is logged and re-raised.
        """
        try:
            monitor = self.sct.monitors[monitor_index]
            return self._to_bgr(self.sct.grab(monitor))
        except Exception as e:
            logger.error(f"Failed to capture screen: {e}")
            raise

    def capture_region(self, x: int, y: int, width: int, height: int, monitor_index: int = 1) -> np.ndarray:
        """
        Capture a specific region of the screen.

        Args:
            x: X coordinate of the region (relative to monitor)
            y: Y coordinate of the region (relative to monitor)
            width: Width of the region
            height: Height of the region
            monitor_index: Index of the monitor to capture from

        Returns:
            Captured region as numpy array (BGR format)

        Raises:
            Exception: Any capture error is logged and re-raised.
        """
        try:
            monitor = self.sct.monitors[monitor_index]
            # Translate monitor-relative coordinates to absolute ones
            region = {
                "left": monitor["left"] + x,
                "top": monitor["top"] + y,
                "width": width,
                "height": height,
            }
            return self._to_bgr(self.sct.grab(region))
        except Exception as e:
            logger.error(f"Failed to capture region: {e}")
            raise

    def capture_window(self, window_title: str) -> Optional[np.ndarray]:
        """
        Capture a specific window by title.

        The first visible, enabled window whose title contains
        ``window_title`` (case-insensitive) is captured. Errors are logged and
        reported as None.

        Args:
            window_title: Title of the window to capture

        Returns:
            Captured window as numpy array (BGR format) or None if the window
            is not found, has an empty rectangle, or capturing fails
        """
        try:
            import win32gui

            needle = window_title.lower()
            windows = []

            def enum_window_callback(hwnd, found):
                if win32gui.IsWindowVisible(hwnd) and win32gui.IsWindowEnabled(hwnd):
                    if needle in win32gui.GetWindowText(hwnd).lower():
                        found.append(hwnd)
                return True

            win32gui.EnumWindows(enum_window_callback, windows)
            if not windows:
                logger.warning(f"Window not found: {window_title}")
                return None

            left, top, right, bottom = win32gui.GetWindowRect(windows[0])
            width = right - left
            height = bottom - top
            if width <= 0 or height <= 0:
                logger.warning(f"Window has no visible area: {window_title}")
                return None

            # GetWindowRect already yields absolute screen coordinates, so the
            # rectangle is grabbed directly; adding a monitor origin on top
            # would shift it whenever the virtual screen does not start at (0, 0).
            region = {"left": left, "top": top, "width": width, "height": height}
            return self._to_bgr(self.sct.grab(region))
        except Exception as e:
            logger.error(f"Failed to capture window: {e}")
            return None

    def image_to_base64(self, image: np.ndarray, quality: int = 85, format: str = "JPEG") -> str:
        """
        Convert an image array to base64 string.

        Args:
            image: Image as numpy array (BGR format)
            quality: JPEG quality (1-100); only used when format is JPEG
            format: Image format (JPEG, PNG)

        Returns:
            Base64 encoded image string

        Raises:
            Exception: Any conversion error is logged and re-raised.
        """
        try:
            # PIL expects RGB channel order
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(rgb_image)

            buffer = BytesIO()
            if format.upper() == "JPEG":
                pil_image.save(buffer, format=format, quality=quality, optimize=True)
            else:
                pil_image.save(buffer, format=format)

            return base64.b64encode(buffer.getvalue()).decode("utf-8")
        except Exception as e:
            logger.error(f"Failed to convert image to base64: {e}")
            raise

    def resize_image(self, image: np.ndarray, width: Optional[int] = None, height: Optional[int] = None) -> np.ndarray:
        """
        Resize an image.

        With only one of width/height given, the other is derived so the
        aspect ratio is kept. With both given, the image is resized to exactly
        that size. With neither given, the image is returned unchanged.

        Args:
            image: Image as numpy array
            width: Target width (None to calculate from height)
            height: Target height (None to calculate from width)

        Returns:
            Resized image as numpy array

        Raises:
            Exception: Any resize error is logged and re-raised.
        """
        try:
            if not width and not height:
                # No resize needed
                return image

            h, w = image.shape[:2]
            if width and not height:
                height = int(h * (width / w))
            elif height and not width:
                width = int(w * (height / h))
            return cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)
        except Exception as e:
            logger.error(f"Failed to resize image: {e}")
            raise

    def get_resolution_preset(self, preset: str) -> Tuple[int, int]:
        """
        Get width and height for a resolution preset.

        Args:
            preset: Resolution preset (e.g., '720p', '1080p', '480p'),
                matched case-insensitively

        Returns:
            Tuple of (width, height); (1280, 720) for an unknown preset
        """
        presets = {
            "480p": (854, 480),
            "540p": (960, 540),
            "720p": (1280, 720),
            "900p": (1600, 900),
            "1080p": (1920, 1080),
            "1440p": (2560, 1440),
            "2160p": (3840, 2160),
            "4k": (3840, 2160),
        }
        return presets.get(preset.lower(), (1280, 720))

    def close(self):
        """Close the screen capture resources."""
        if hasattr(self, "sct"):
            self.sct.close()
# Lazily created, module-wide ScreenCapture singleton
_screen_capture: Optional[ScreenCapture] = None


def get_screen_capture() -> ScreenCapture:
    """
    Return the shared ScreenCapture, creating it on first use.

    Returns:
        ScreenCapture instance
    """
    global _screen_capture
    if _screen_capture is not None:
        return _screen_capture
    _screen_capture = ScreenCapture()
    return _screen_capture
def capture_screen(monitor_index: int = 1) -> np.ndarray:
    """
    Capture a whole monitor using the shared ScreenCapture instance.

    Args:
        monitor_index: Index of the monitor to capture

    Returns:
        Captured screen as numpy array
    """
    shared = get_screen_capture()
    return shared.capture_screen(monitor_index)
def capture_region(x: int, y: int, width: int, height: int) -> np.ndarray:
    """
    Capture a rectangle using the shared ScreenCapture instance.

    The monitor is left at ScreenCapture.capture_region's default.

    Args:
        x: X coordinate of the region
        y: Y coordinate of the region
        width: Width of the region
        height: Height of the region

    Returns:
        Captured region as numpy array
    """
    shared = get_screen_capture()
    return shared.capture_region(x, y, width, height)
def get_screen_size() -> Tuple[int, int]:
    """
    Report the size of the primary monitor.

    Returns:
        Tuple of (width, height)
    """
    primary = get_screen_capture().get_primary_monitor()
    size = (primary["width"], primary["height"])
    return size
def capture_gspro_window(window_title: str = "GSPro") -> Optional[np.ndarray]:
    """
    Capture the GSPro window using the shared ScreenCapture instance.

    Args:
        window_title: GSPro window title

    Returns:
        Captured window as numpy array or None if not found
    """
    shared = get_screen_capture()
    return shared.capture_window(window_title)
# Manual smoke test for development
def test_screen_capture():
    """Run each capture feature once and print the outcome."""
    print("Testing screen capture...")
    capture = ScreenCapture()

    # List what mss reports
    monitors = capture.get_monitors()
    print(f"Found {len(monitors)} monitors:")
    for monitor in monitors:
        index = monitor["index"]
        size = f"{monitor['width']}x{monitor['height']}"
        origin = f"({monitor['left']}, {monitor['top']})"
        print(f" Monitor {index}: {size} at {origin}")

    # Full-screen capture
    try:
        screen = capture.capture_screen()
        print(f"✓ Captured primary screen: {screen.shape}")
    except Exception as e:
        print(f"✗ Failed to capture screen: {e}")

    # Region capture
    try:
        region = capture.capture_region(100, 100, 640, 480)
        print(f"✓ Captured region: {region.shape}")
    except Exception as e:
        print(f"✗ Failed to capture region: {e}")

    # Base64 encoding of the region captured above
    try:
        base64_str = capture.image_to_base64(region)
        print(f"✓ Converted to base64: {len(base64_str)} chars")
    except Exception as e:
        print(f"✗ Failed to convert to base64: {e}")

    # Resolution presets
    for preset in ("480p", "720p", "1080p"):
        width, height = capture.get_resolution_preset(preset)
        print(f" {preset}: {width}x{height}")

    capture.close()
    print("\nScreen capture test complete!")


if __name__ == "__main__":
    test_screen_capture()