top of page

Write At Command Station V104 High Quality [Top 100 Legit]

#!/usr/bin/env python3
"""
at_command_station.py - Schedule and execute commands at specified times
Version: 1.0.4
A robust task scheduler similar to Unix 'at' command with persistent storage,
job queuing, and reliable execution.
"""
import argparse
import json
import os
import sys
import time
import signal
import threading
import logging
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
import subprocess
import re
# ============================================================================
# Configuration
# ============================================================================
# SQLite file holding the job table; used as AtStation's default db_path.
DEFAULT_DB_PATH = Path.home() / ".at_station" / "jobs.db"
# Log file written by AtStation's file handler; used as its default log_path.
DEFAULT_LOG_PATH = Path.home() / ".at_station" / "at_station.log"
# Seconds the worker loop sleeps between polls for due jobs.
POLL_INTERVAL_SECONDS = 1
# A failing job is put back to 'pending' while its retry_count is below this.
MAX_RETRIES = 3
# ============================================================================
# Data Models
# ============================================================================
@dataclass
class AtJob:
    """A single scheduled command together with its execution state.

    Both timestamps are ``datetime`` objects in memory and become ISO-8601
    strings (``datetime.isoformat()``) when the job is serialized.
    """
    job_id: int
    command: str
    execute_at: datetime  # when the command should run
    created_at: datetime  # when the job record was created
    status: str  # pending, running, completed, failed, cancelled
    retry_count: int = 0
    output: Optional[str] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the job as a plain dict with ISO-8601 timestamp strings."""
        data = asdict(self)
        data['execute_at'] = self.execute_at.isoformat()
        data['created_at'] = self.created_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'AtJob':
        """Build a job from a dict shaped like the output of :meth:`to_dict`.

        Args:
            data: Mapping of field names to values; ``execute_at`` and
                ``created_at`` must be ISO-8601 strings.

        Returns:
            A new ``AtJob`` with both timestamps parsed into ``datetime``.

        Raises:
            ValueError: If a timestamp string is not valid ISO-8601.
        """
        # Work on a copy so the caller's dict keeps its string timestamps.
        fields = dict(data)
        fields['execute_at'] = datetime.fromisoformat(fields['execute_at'])
        fields['created_at'] = datetime.fromisoformat(fields['created_at'])
        return cls(**fields)
# ============================================================================
# Database Manager
# ============================================================================
class DatabaseManager:
    """Handles persistent storage for scheduled jobs in a SQLite file.

    Each operation opens its own connection, runs one statement inside a
    transaction and closes the connection again. ``sqlite3``'s connection
    context manager only commits or rolls back, so the close is explicit.
    """

    def __init__(self, db_path: Path):
        """Remember *db_path* and create the schema if it does not exist yet."""
        self.db_path = db_path
        self._init_database()

    def _execute(self, sql: str, params: tuple = ()) -> tuple:
        """Run one statement in its own transaction.

        Returns:
            ``(rows, lastrowid)`` where ``rows`` is a list of ``sqlite3.Row``
            (empty for statements that return no rows).
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            with conn:  # commit on success, roll back on exception
                cursor = conn.execute(sql, params)
                return cursor.fetchall(), cursor.lastrowid
        finally:
            conn.close()

    def _select_jobs(self, sql: str, params: tuple = ()) -> List['AtJob']:
        """Run a SELECT over the jobs table and convert every row to an AtJob."""
        rows, _ = self._execute(sql, params)
        return [self._row_to_job(row) for row in rows]

    def _init_database(self):
        """Initialize SQLite database with proper schema."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                job_id INTEGER PRIMARY KEY AUTOINCREMENT,
                command TEXT NOT NULL,
                execute_at TEXT NOT NULL,
                created_at TEXT NOT NULL,
                status TEXT NOT NULL,
                retry_count INTEGER DEFAULT 0,
                output TEXT,
                error TEXT
            )
        """)
        self._execute("""
            CREATE INDEX IF NOT EXISTS idx_execute_at ON jobs(execute_at)
        """)
        self._execute("""
            CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)
        """)

    def add_job(self, job: 'AtJob') -> int:
        """Insert *job* and return the row id SQLite assigned to it.

        ``job.job_id`` is ignored; the database generates the id.
        """
        _, job_id = self._execute("""
            INSERT INTO jobs (command, execute_at, created_at, status, retry_count, output, error)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            job.command,
            job.execute_at.isoformat(),
            job.created_at.isoformat(),
            job.status,
            job.retry_count,
            job.output,
            job.error,
        ))
        return job_id

    def get_pending_jobs(self) -> List['AtJob']:
        """Return pending jobs that are due (``execute_at`` at or before now).

        Timestamps are compared as ISO-8601 text, oldest first.
        """
        now = datetime.now().isoformat()
        return self._select_jobs("""
            SELECT * FROM jobs
            WHERE status = 'pending' AND execute_at <= ?
            ORDER BY execute_at ASC
        """, (now,))

    def get_future_jobs(self) -> List['AtJob']:
        """Return every pending job ordered by execution time.

        No time filter is applied, so pending jobs that are already due are
        included as well.
        """
        return self._select_jobs("""
            SELECT * FROM jobs
            WHERE status = 'pending'
            ORDER BY execute_at ASC
        """)

    def get_job(self, job_id: int) -> Optional['AtJob']:
        """Get a specific job by ID, or ``None`` when no such row exists."""
        jobs = self._select_jobs("SELECT * FROM jobs WHERE job_id = ?", (job_id,))
        return jobs[0] if jobs else None

    def update_job_status(self, job_id: int, status: str,
                          output: Optional[str] = None, error: Optional[str] = None):
        """Set a job's status and overwrite its stored output and error.

        Omitted *output* / *error* are written as NULL, replacing any
        previously stored values.
        """
        self._execute("""
            UPDATE jobs
            SET status = ?, output = ?, error = ?
            WHERE job_id = ?
        """, (status, output, error, job_id))

    def increment_retry(self, job_id: int):
        """Increment a job's retry count and put it back to 'pending'."""
        self._execute("""
            UPDATE jobs
            SET retry_count = retry_count + 1, status = 'pending'
            WHERE job_id = ?
        """, (job_id,))

    def delete_job(self, job_id: int):
        """Delete a job from the database."""
        self._execute("DELETE FROM jobs WHERE job_id = ?", (job_id,))

    def list_jobs(self, status_filter: Optional[str] = None) -> List['AtJob']:
        """List all jobs ordered by execution time, optionally filtered by status."""
        if status_filter:
            return self._select_jobs(
                "SELECT * FROM jobs WHERE status = ? ORDER BY execute_at ASC",
                (status_filter,),
            )
        return self._select_jobs("SELECT * FROM jobs ORDER BY execute_at ASC")

    @staticmethod
    def _row_to_job(row) -> 'AtJob':
        """Convert a database row (indexable by column name) to an AtJob."""
        return AtJob(
            job_id=row['job_id'],
            command=row['command'],
            execute_at=datetime.fromisoformat(row['execute_at']),
            created_at=datetime.fromisoformat(row['created_at']),
            status=row['status'],
            retry_count=row['retry_count'],
            output=row['output'],
            error=row['error'],
        )
# ============================================================================
# Time Parser
# ============================================================================
class TimeParser:
    """Parse human-readable time expressions into ``datetime`` objects."""

    # Patterns for relative time: (regex with one numeric group, timedelta keyword)
    RELATIVE_PATTERNS = [
        (r'now\s*\+\s*(\d+)\s*seconds?', 'seconds'),
        (r'now\s*\+\s*(\d+)\s*minutes?', 'minutes'),
        (r'now\s*\+\s*(\d+)\s*hours?', 'hours'),
        (r'now\s*\+\s*(\d+)\s*days?', 'days'),
        (r'in\s+(\d+)\s*seconds?', 'seconds'),
        (r'in\s+(\d+)\s*minutes?', 'minutes'),
        (r'in\s+(\d+)\s*hours?', 'hours'),
        (r'in\s+(\d+)\s*days?', 'days'),
    ]

    # Absolute time formats
    ABSOLUTE_FORMATS = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%H:%M:%S %Y-%m-%d",
        "%H:%M %Y-%m-%d",
        # Time-only forms; parse() fills in the date (today, or tomorrow if
        # that time has already passed).
        "%H:%M:%S",
        "%H:%M",
    ]

    # Named times of day as (hour, minute); resolved to their next occurrence.
    NAMED_TIMES = {
        "noon": (12, 0),
        "teatime": (16, 0),
    }

    @classmethod
    def parse(cls, time_str: str) -> Optional[datetime]:
        """Parse a time string and return a datetime object.

        Supported forms, tried in this order:

        * relative: ``now + N seconds|minutes|hours|days`` or ``in N ...``
        * absolute: any entry of ``ABSOLUTE_FORMATS``; a time without a date
          means today, or tomorrow when that time is already in the past
        * named: ``midnight`` (start of the next day), ``noon``, ``teatime``
          (next occurrence) and ``tomorrow`` (24 hours from now)

        Args:
            time_str: The expression to parse; case and surrounding
                whitespace are ignored.

        Returns:
            The resolved ``datetime``, or ``None`` if nothing matched.
        """
        time_str = time_str.strip().lower()
        now = datetime.now()

        # Try relative patterns
        for pattern, unit in cls.RELATIVE_PATTERNS:
            match = re.match(pattern, time_str)
            if match:
                return now + timedelta(**{unit: int(match.group(1))})

        # Try absolute patterns
        for fmt in cls.ABSOLUTE_FORMATS:
            try:
                dt = datetime.strptime(time_str, fmt)
            except ValueError:
                continue
            # A single token means a time without a date: assume today, and
            # roll over to tomorrow if that moment has already passed.
            if len(time_str.split()) == 1:
                dt = dt.replace(year=now.year, month=now.month, day=now.day)
                if dt < now:
                    dt += timedelta(days=1)
            return dt

        # Try common natural language
        if time_str == "midnight":
            return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        if time_str == "tomorrow":
            return now + timedelta(days=1)
        if time_str in cls.NAMED_TIMES:
            hour, minute = cls.NAMED_TIMES[time_str]
            dt = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            # Same rule as time-only input: never return a moment in the past.
            if dt <= now:
                dt += timedelta(days=1)
            return dt

        return None
# ============================================================================
# Command Executor
# ============================================================================
class CommandExecutor:
    """Execute shell commands with timeout and output capture."""

    def __init__(self, timeout_seconds: int = 3600):
        """Store the per-command timeout in seconds."""
        self.timeout = timeout_seconds

    def execute(self, command: str) -> tuple:
        """
        Execute a command and return (output, error, return_code).

        The command is run through ``/bin/bash`` with stdout and stderr
        captured as text. This method does not raise: any failure is
        reported through the returned tuple.

        Args:
            command: Shell command to execute

        Returns:
            Tuple of (stdout, stderr, return_code). On timeout the process is
            killed and the result is (partial stdout, timeout message, -1).
            If the process cannot be started or another error occurs, the
            result is ("", error text, -1).
        """
        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                executable='/bin/bash'
            )
            try:
                stdout, stderr = process.communicate(timeout=self.timeout)
                return stdout.strip(), stderr.strip(), process.returncode
            except subprocess.TimeoutExpired:
                process.kill()
                # Reap the killed process and collect whatever it had written.
                stdout, stderr = process.communicate()
                return stdout.strip(), "Command timed out after {} seconds".format(self.timeout), -1
        except Exception as e:
            return "", str(e), -1
# ============================================================================
# At Station Service
# ============================================================================
class AtStation:
    """Main service for scheduling and executing commands.

    Combines a ``DatabaseManager`` (job storage), a ``CommandExecutor``
    (running commands) and an optional background thread that polls for
    due jobs.
    """

    def __init__(self, db_path: Path = DEFAULT_DB_PATH, log_path: Path = DEFAULT_LOG_PATH):
        """Open the job database and set up file and console logging.

        Args:
            db_path: SQLite file used for job storage.
            log_path: Log file; its parent directory is created if missing.
        """
        self.db = DatabaseManager(db_path)
        self.executor = CommandExecutor()
        self.running = False
        self.worker_thread = None

        # Setup logging
        self.logger = logging.getLogger("AtStation")
        self.logger.setLevel(logging.INFO)

        log_path.parent.mkdir(parents=True, exist_ok=True)
        handler = logging.FileHandler(log_path)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        # Also log to console
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        self.logger.addHandler(console)

    def start(self):
        """Start the scheduling service in a background thread.

        Does nothing if the service is already running.
        """
        if self.running:
            return

        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        self.logger.info("At Station service started")

    def stop(self):
        """Stop the scheduling service, waiting up to 5 seconds for the worker."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
        self.logger.info("At Station service stopped")

    def _worker_loop(self):
        """Main worker loop that checks and executes pending jobs."""
        while self.running:
            try:
                pending_jobs = self.db.get_pending_jobs()

                for job in pending_jobs:
                    self._execute_job(job)

                time.sleep(POLL_INTERVAL_SECONDS)
            except Exception as e:
                # Keep the loop alive; back off before polling again.
                self.logger.error(f"Worker loop error: {e}")
                time.sleep(5)

    def _execute_job(self, job: AtJob):
        """Execute a single job with retry logic.

        A non-zero return code puts the job back to 'pending' (with its
        retry count incremented) while ``job.retry_count < MAX_RETRIES``;
        after that the job is marked 'failed'.
        """
        self.logger.info(f"Executing job {job.job_id}: {job.command}")

        # Update status to running
        self.db.update_job_status(job.job_id, "running")

        # Execute command
        stdout, stderr, returncode = self.executor.execute(job.command)

        if returncode == 0:
            # Success
            self.db.update_job_status(job.job_id, "completed", stdout, stderr)
            self.logger.info(f"Job {job.job_id} completed successfully")
        else:
            # Failure - handle retry
            if job.retry_count < MAX_RETRIES:
                new_retry_count = job.retry_count + 1
                self.db.increment_retry(job.job_id)
                self.logger.warning(
                    f"Job {job.job_id} failed (retry {new_retry_count}/{MAX_RETRIES}): {stderr}"
                )
            else:
                self.db.update_job_status(job.job_id, "failed", stdout, stderr)
                self.logger.error(f"Job {job.job_id} failed permanently: {stderr}")

    def schedule(self, command: str, time_str: str) -> int:
        """
        Schedule a command for execution.

        Args:
            command: Command to execute
            time_str: Time specification (e.g., "now + 5 minutes", "14:30")

        Returns:
            Job ID of the scheduled task

        Raises:
            ValueError: If time string cannot be parsed or resolves to a
                moment in the past
        """
        execute_at = TimeParser.parse(time_str)
        if execute_at is None:
            raise ValueError(f"Unable to parse time: {time_str}")

        if execute_at < datetime.now():
            raise ValueError(f"Scheduled time is in the past: {execute_at}")

        job = AtJob(
            job_id=0,  # Will be set by database
            command=command,
            execute_at=execute_at,
            created_at=datetime.now(),
            status="pending"
        )

        job_id = self.db.add_job(job)
        self.logger.info(f"Scheduled job {job_id}: {command} at {execute_at}")

        return job_id

    def list_jobs(self, status: Optional[str] = None) -> List[AtJob]:
        """List all scheduled jobs, optionally restricted to one status."""
        return self.db.list_jobs(status)

    def cancel(self, job_id: int) -> bool:
        """Cancel a scheduled job.

        Only pending jobs can be cancelled; cancelling deletes the job row.

        Returns:
            True if the job was pending and has been removed, False if it
            does not exist, is running, or has already finished.
        """
        job = self.db.get_job(job_id)
        if not job:
            return False

        if job.status == "pending":
            self.db.delete_job(job_id)
            self.logger.info(f"Cancelled job {job_id}")
            return True
        elif job.status == "running":
            self.logger.warning(f"Cannot cancel running job {job_id}")
            return False
        else:
            self.logger.warning(f"Job {job_id} already {job.status}")
            return False

    def show_job(self, job_id: int) -> Optional[AtJob]:
        """Return the job with the given ID, or None if it does not exist."""
        return self.db.get_job(job_id)
# ============================================================================
# Command Line Interface
# ============================================================================
def create_parser() -> argparse.ArgumentParser:
    """Create argument parser for CLI.

    The subcommand name is stored in ``args.command``. NOTE: the schedule
    subparser's positional is also named ``command``, so for schedule
    invocations ``args.command`` ends up holding the shell command string;
    ``args.time`` is only present for schedule invocations.
    """
    parser = argparse.ArgumentParser(
        prog="at",
        description="Schedule commands for future execution",
        # Keep the example block's line breaks instead of re-wrapping it.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  at schedule "now + 5 minutes" "echo Hello"
  at schedule 14:30 "backup.sh"
  at schedule midnight "shutdown -h now"
  at list
  at cancel 42
  at show 42
  at daemon
        """
    )

    subparsers = parser.add_subparsers(dest="command", help="Subcommands")

    # Schedule command
    schedule_parser = subparsers.add_parser("schedule", aliases=["add", "run"], help="Schedule a command")
    schedule_parser.add_argument("time", help="When to run (e.g., 'now + 5 minutes', '14:30')")
    schedule_parser.add_argument("command", help="Command to execute")

    # List jobs
    list_parser = subparsers.add_parser("list", aliases=["ls"], help="List scheduled jobs")
    list_parser.add_argument("--status", choices=["pending", "running", "completed", "failed", "cancelled"],
                             help="Filter by status")

    # Cancel job
    cancel_parser = subparsers.add_parser("cancel", aliases=["rm"], help="Cancel a scheduled job")
    cancel_parser.add_argument("job_id", type=int, help="Job ID to cancel")

    # Show job details
    show_parser = subparsers.add_parser("show", help="Show job details")
    show_parser.add_argument("job_id", type=int, help="Job ID to show")

    # Start daemon
    subparsers.add_parser("daemon", help="Run as daemon (background service)")

    return parser
def format_job_table(jobs: List['AtJob']) -> str:
    """Format jobs as an ASCII table.

    Args:
        jobs: Jobs to render; each needs ``job_id``, ``status``,
            ``execute_at``, ``command`` and ``retry_count``.

    Returns:
        The table as a newline-joined string, or ``"No jobs found."`` when
        *jobs* is empty. Commands longer than 50 characters are cut to 50
        and suffixed with ``...``.
    """
    if not jobs:
        return "No jobs found."

    headers = ["ID", "Status", "Execute At", "Command", "Retries"]
    rows = [
        [
            str(job.job_id),
            job.status,
            job.execute_at.strftime("%Y-%m-%d %H:%M:%S"),
            job.command[:50] + ("..." if len(job.command) > 50 else ""),
            str(job.retry_count),
        ]
        for job in jobs
    ]

    # Each column is as wide as its widest cell, header included.
    col_widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            col_widths[i] = max(col_widths[i], len(cell))

    def render(cells):
        """Left-justify every cell to its column width and add the borders."""
        return "| " + " | ".join(c.ljust(col_widths[i]) for i, c in enumerate(cells)) + " |"

    # The +2 accounts for the single space of padding on each side of a cell.
    separator = "+" + "+".join("-" * (w + 2) for w in col_widths) + "+"

    lines = [separator, render(headers), separator]
    lines.extend(render(row) for row in rows)
    lines.append(separator)

    return "\n".join(lines)
def main():
    """Main entry point for CLI.

    Parses the command line and dispatches to schedule / list / cancel /
    show / daemon. Exits with status 1 on errors and 130 when interrupted.
    """
    parser = create_parser()
    args = parser.parse_args()

    # The schedule subparser's `command` positional shares its dest with the
    # subcommand selector, so for schedule invocations args.command holds the
    # shell command rather than the subcommand name. `time` is defined only
    # by the schedule subparser, which makes it a reliable discriminator.
    is_schedule = hasattr(args, "time")
    subcommand = None if is_schedule else args.command

    if not is_schedule and subcommand is None:
        # No subcommand given: show usage without touching the database.
        parser.print_help()
        return

    # Initialize the at station
    station = AtStation()

    # Handle daemon mode
    if subcommand == "daemon":
        print("Starting At Station daemon... (Press Ctrl+C to stop)")
        station.start()
        try:
            signal.pause()  # Wait for signals
        except KeyboardInterrupt:
            print("\nShutting down...")
        finally:
            # Stop the worker however the wait ended.
            station.stop()
        return

    # One-off commands work directly against the database; the daemon does
    # not need to be running for them.
    try:
        if is_schedule:
            job_id = station.schedule(args.command, args.time)
            print(f"Job scheduled: ID {job_id}")
            print(f"Will execute at: {station.show_job(job_id).execute_at}")

        elif subcommand in ("list", "ls"):
            jobs = station.list_jobs(getattr(args, "status", None))
            print(format_job_table(jobs))

        elif subcommand in ("cancel", "rm"):
            if station.cancel(args.job_id):
                print(f"Job {args.job_id} cancelled successfully")
            else:
                print(f"Failed to cancel job {args.job_id}", file=sys.stderr)
                sys.exit(1)

        elif subcommand == "show":
            job = station.show_job(args.job_id)
            if job:
                print(f"Job ID: {job.job_id}")
                print(f"Command: {job.command}")
                print(f"Status: {job.status}")
                print(f"Execute At: {job.execute_at}")
                print(f"Created At: {job.created_at}")
                print(f"Retry Count: {job.retry_count}")
                if job.output:
                    print(f"\nOutput:\n{job.output}")
                if job.error:
                    print(f"\nError:\n{job.error}")
            else:
                print(f"Job {args.job_id} not found", file=sys.stderr)
                sys.exit(1)

        else:
            parser.print_help()

    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nInterrupted")
        sys.exit(130)


if __name__ == "__main__":
    main()

This at command station v1.0.4 provides:

Manage jobs

at cancel 42
at show 42

Step 6 – Audit Trail

For every high-quality write, record:

  • Timestamp (station local + UTC offset)
  • Operator ID (if human-initiated)
  • Command station IP and port
  • Outstation common address + IOA
  • Previous value (if known) and new value
  • Command status (pending, confirmed, failed)
  • Protocol frame trace (optional but recommended)

This log is invaluable for post-incident analysis and regulatory compliance.


Part 5: Troubleshooting Low-Quality Output

You know your output is not high quality if you experience any of these symptoms. Here is the fix matrix.

| Symptom | Low Quality Cause | High Quality Fix |
| :--- | :--- | :--- |
| Ghost key presses | Missing CLEAR_BUFFER() on release | Add ON_RELEASE: RESET() to every macro |
| Intermittent lag | Using WHILE loops without yield | Replace with TIMED_SEQUENCE blocks |
| Command stutter | Delay values are random (5ms, 10ms, 5ms) | Standardize all delays to multiples of 16ms |
| Profile corruption | Writing to EEPROM more than 50x/hour | Write to RAM first, then commit to EEPROM only once |

© 2026 The Atlas. All rights reserved.

bottom of page