Write At Command Station V104 High Quality [Top 100 Legit]
#!/usr/bin/env python3
"""
at_command_station.py - Schedule and execute commands at specified times
Version: 1.0.4
A robust task scheduler similar to Unix 'at' command with persistent storage,
job queuing, and reliable execution.
"""
import argparse
import json
import logging
import os
import re
import signal
import sqlite3
import subprocess
import sys
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any
# ============================================================================
# Configuration
# ============================================================================
# All persistent state (job database and log file) lives in one per-user folder.
_STATION_HOME = Path.home() / ".at_station"
DEFAULT_DB_PATH = _STATION_HOME / "jobs.db"
DEFAULT_LOG_PATH = _STATION_HOME / "at_station.log"
# Seconds the worker sleeps between checks for due jobs.
POLL_INTERVAL_SECONDS = 1
# Number of times a failing job is re-queued before it is marked failed.
MAX_RETRIES = 3
# ============================================================================
# Data Models
# ============================================================================
@dataclass
class AtJob:
    """A scheduled shell command together with its execution state.

    Both timestamps are ``datetime`` objects in memory and are converted to
    ISO-8601 strings by :meth:`to_dict` (and back by :meth:`from_dict`).
    """

    job_id: int
    command: str
    execute_at: datetime  # moment the command becomes due
    created_at: datetime  # moment the job was scheduled
    status: str  # pending, running, completed, failed, cancelled
    retry_count: int = 0
    output: Optional[str] = None
    error: Optional[str] = None

    def to_dict(self) -> Dict:
        """Return the job as a plain dict with ISO-8601 timestamp strings."""
        data = asdict(self)
        data['execute_at'] = self.execute_at.isoformat()
        data['created_at'] = self.created_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'AtJob':
        """Build a job from a dict produced by :meth:`to_dict`.

        The input mapping is copied first, so the caller's dict keeps its
        original string timestamps.

        Raises:
            KeyError: If a timestamp key is missing.
            ValueError: If a timestamp is not valid ISO-8601.
        """
        fields = dict(data)
        fields['execute_at'] = datetime.fromisoformat(fields['execute_at'])
        fields['created_at'] = datetime.fromisoformat(fields['created_at'])
        return cls(**fields)
# ============================================================================
# Database Manager
# ============================================================================
class DatabaseManager:
    """Persist scheduled jobs in a SQLite database file.

    Every public method opens a short-lived connection, runs inside a
    transaction, and closes the connection again before returning.
    """

    def __init__(self, db_path: Path):
        """Remember the database location and create the schema if needed."""
        self.db_path = db_path
        self._init_database()

    @contextmanager
    def _connection(self):
        """Yield a connection that is committed (or rolled back) and closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction
        and leaves the connection open, so the close is done explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_database(self):
        """Create the parent directory, the jobs table and its indexes."""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        with self._connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id INTEGER PRIMARY KEY AUTOINCREMENT,
                    command TEXT NOT NULL,
                    execute_at TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    status TEXT NOT NULL,
                    retry_count INTEGER DEFAULT 0,
                    output TEXT,
                    error TEXT
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_execute_at ON jobs(execute_at)
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)
            """)

    def add_job(self, job: "AtJob") -> int:
        """Insert a job and return the row id assigned by the database.

        ``job.job_id`` is ignored; the database allocates the id.
        """
        with self._connection() as conn:
            cursor = conn.execute("""
                INSERT INTO jobs (command, execute_at, created_at, status, retry_count, output, error)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                job.command,
                job.execute_at.isoformat(),
                job.created_at.isoformat(),
                job.status,
                job.retry_count,
                job.output,
                job.error,
            ))
            return cursor.lastrowid

    def get_pending_jobs(self) -> List["AtJob"]:
        """Return pending jobs that are due now, earliest first.

        Timestamps are stored as ISO-8601 text, so the comparison with the
        current time is a string comparison.
        """
        now = datetime.now().isoformat()
        with self._connection() as conn:
            rows = conn.execute("""
                SELECT * FROM jobs
                WHERE status = 'pending' AND execute_at <= ?
                ORDER BY execute_at ASC
            """, (now,)).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_future_jobs(self) -> List["AtJob"]:
        """Return every pending job regardless of due time, earliest first."""
        with self._connection() as conn:
            rows = conn.execute("""
                SELECT * FROM jobs
                WHERE status = 'pending'
                ORDER BY execute_at ASC
            """).fetchall()
        return [self._row_to_job(row) for row in rows]

    def get_job(self, job_id: int) -> Optional["AtJob"]:
        """Return the job with the given id, or ``None`` if it does not exist."""
        with self._connection() as conn:
            row = conn.execute(
                "SELECT * FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        return self._row_to_job(row) if row else None

    def update_job_status(self, job_id: int, status: str,
                          output: Optional[str] = None,
                          error: Optional[str] = None):
        """Set the status of a job and overwrite its stored output and error.

        Omitted ``output``/``error`` values are written as NULL, replacing
        whatever was stored before.
        """
        with self._connection() as conn:
            conn.execute("""
                UPDATE jobs
                SET status = ?, output = ?, error = ?
                WHERE job_id = ?
            """, (status, output, error, job_id))

    def increment_retry(self, job_id: int):
        """Add one to the retry counter and put the job back to pending."""
        with self._connection() as conn:
            conn.execute("""
                UPDATE jobs
                SET retry_count = retry_count + 1, status = 'pending'
                WHERE job_id = ?
            """, (job_id,))

    def delete_job(self, job_id: int):
        """Remove the job row; deleting an unknown id is a no-op."""
        with self._connection() as conn:
            conn.execute("DELETE FROM jobs WHERE job_id = ?", (job_id,))

    def list_jobs(self, status_filter: Optional[str] = None) -> List["AtJob"]:
        """Return all jobs ordered by due time, optionally only one status."""
        with self._connection() as conn:
            if status_filter:
                cursor = conn.execute(
                    "SELECT * FROM jobs WHERE status = ? ORDER BY execute_at ASC",
                    (status_filter,),
                )
            else:
                cursor = conn.execute("SELECT * FROM jobs ORDER BY execute_at ASC")
            rows = cursor.fetchall()
        return [self._row_to_job(row) for row in rows]

    @staticmethod
    def _row_to_job(row) -> "AtJob":
        """Convert a database row (accessed by column name) into an AtJob."""
        return AtJob(
            job_id=row['job_id'],
            command=row['command'],
            execute_at=datetime.fromisoformat(row['execute_at']),
            created_at=datetime.fromisoformat(row['created_at']),
            status=row['status'],
            retry_count=row['retry_count'],
            output=row['output'],
            error=row['error'],
        )
# ============================================================================
# Time Parser
# ============================================================================
class TimeParser:
    """Parse human-readable time expressions into ``datetime`` objects.

    Supported forms:
      * relative: ``now + 5 minutes``, ``in 2 hours``
      * absolute date and time: ``2030-01-02 03:04``, ``03:04:05 2030-01-02``
      * time of day only: ``14:30`` (next occurrence, today or tomorrow)
      * keywords: ``midnight``, ``noon``, ``teatime``, ``tomorrow``
    """

    # Patterns for relative time; the unit name is a ``timedelta`` keyword.
    RELATIVE_PATTERNS = [
        (r'now\s*\+\s*(\d+)\s*seconds?', 'seconds'),
        (r'now\s*\+\s*(\d+)\s*minutes?', 'minutes'),
        (r'now\s*\+\s*(\d+)\s*hours?', 'hours'),
        (r'now\s*\+\s*(\d+)\s*days?', 'days'),
        (r'in\s+(\d+)\s*seconds?', 'seconds'),
        (r'in\s+(\d+)\s*minutes?', 'minutes'),
        (r'in\s+(\d+)\s*hours?', 'hours'),
        (r'in\s+(\d+)\s*days?', 'days'),
    ]
    # Absolute formats that carry both a date and a time.
    ABSOLUTE_FORMATS = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%H:%M:%S %Y-%m-%d",
        "%H:%M %Y-%m-%d",
    ]
    # Formats with a time of day only; the date is filled in from "now".
    TIME_ONLY_FORMATS = [
        "%H:%M:%S",
        "%H:%M",
    ]
    # Keyword -> hour of day, resolved to the next occurrence of that hour.
    NAMED_HOURS = {
        "midnight": 0,
        "noon": 12,
        "teatime": 16,
    }

    @classmethod
    def parse(cls, time_str: str, now: Optional[datetime] = None) -> Optional[datetime]:
        """Parse a time string and return the moment it refers to.

        Args:
            time_str: Expression to parse; case and surrounding whitespace
                are ignored.
            now: Reference moment for relative and time-only expressions.
                Defaults to ``datetime.now()``.

        Returns:
            The parsed ``datetime``, or ``None`` when the expression is not
            recognised or the result would be out of range.
        """
        text = time_str.strip().lower()
        if now is None:
            now = datetime.now()

        # Relative offsets. fullmatch rejects trailing text such as
        # "in 5 minutes and a bit", which a prefix match would accept.
        for pattern, unit in cls.RELATIVE_PATTERNS:
            match = re.fullmatch(pattern, text)
            if match:
                try:
                    return now + timedelta(**{unit: int(match.group(1))})
                except OverflowError:
                    # Offset too large to represent as a datetime.
                    return None

        # Full date + time: returned as written, even if it lies in the past.
        for fmt in cls.ABSOLUTE_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue

        # Time of day only: pick the next occurrence of that clock time.
        for fmt in cls.TIME_ONLY_FORMATS:
            try:
                clock = datetime.strptime(text, fmt)
            except ValueError:
                continue
            return cls._next_occurrence(now, clock.hour, clock.minute, clock.second)

        # Common natural-language keywords.
        if text in cls.NAMED_HOURS:
            return cls._next_occurrence(now, cls.NAMED_HOURS[text])
        if text == "tomorrow":
            return now + timedelta(days=1)
        return None

    @staticmethod
    def _next_occurrence(now: datetime, hour: int, minute: int = 0, second: int = 0) -> datetime:
        """Return the first moment after ``now`` with the given clock time."""
        candidate = now.replace(hour=hour, minute=minute, second=second, microsecond=0)
        if candidate <= now:
            candidate += timedelta(days=1)
        return candidate
# ============================================================================
# Command Executor
# ============================================================================
class CommandExecutor:
    """Execute shell commands with a timeout and captured output."""

    def __init__(self, timeout_seconds: int = 3600, shell_path: str = '/bin/bash'):
        """
        Args:
            timeout_seconds: Maximum run time allowed for one command.
            shell_path: Shell binary used to interpret the command string.
        """
        self.timeout = timeout_seconds
        self.shell_path = shell_path

    def execute(self, command: str) -> tuple:
        """
        Execute a command and return (output, error, return_code).

        The command string is handed to the shell unmodified, so it must come
        from a trusted source (here: the user scheduling their own job).

        Args:
            command: Shell command to execute
        Returns:
            Tuple of (stdout, stderr, return_code). Both streams are stripped
            of surrounding whitespace. On timeout, or when the command cannot
            be started, return_code is -1 and stderr describes the problem.
        """
        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                executable=self.shell_path,
                # Own session/process group, so a timeout can terminate the
                # shell together with everything it spawned.
                start_new_session=True,
            )
            try:
                stdout, stderr = process.communicate(timeout=self.timeout)
            except subprocess.TimeoutExpired:
                # Killing only the shell would leave its children holding the
                # pipes, and the communicate() below would then block until
                # they exit on their own.
                self._kill_process_group(process)
                stdout, stderr = process.communicate()
                message = "Command timed out after {} seconds".format(self.timeout)
                captured = stderr.strip()
                error = f"{captured}\n{message}" if captured else message
                return stdout.strip(), error, -1
            return stdout.strip(), stderr.strip(), process.returncode
        except Exception as e:
            # Best effort: report launch/IO problems through the normal
            # (output, error, return_code) channel instead of raising.
            return "", str(e), -1

    @staticmethod
    def _kill_process_group(process) -> None:
        """Send SIGKILL to the command's process group, falling back to the shell."""
        try:
            os.killpg(process.pid, signal.SIGKILL)
        except (ProcessLookupError, PermissionError):
            process.kill()
# ============================================================================
# At Station Service
# ============================================================================
class AtStation:
    """Main service for scheduling and executing commands.

    Scheduling, listing and cancelling only touch the database. Jobs are
    executed by the background worker started with :meth:`start`.
    """

    def __init__(self, db_path: Path = DEFAULT_DB_PATH, log_path: Path = DEFAULT_LOG_PATH):
        """Open the job database and configure file + console logging."""
        self.db = DatabaseManager(db_path)
        self.executor = CommandExecutor()
        self.running = False
        self.worker_thread = None
        # Setup logging. The "AtStation" logger is shared process-wide, so
        # handlers are attached only once; otherwise every additional
        # instance would duplicate each log line.
        self.logger = logging.getLogger("AtStation")
        self.logger.setLevel(logging.INFO)
        if not self.logger.handlers:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler = logging.FileHandler(log_path)
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            # Also log to console
            console = logging.StreamHandler()
            console.setFormatter(formatter)
            self.logger.addHandler(console)

    def start(self):
        """Start the scheduling service in a background thread."""
        if self.running:
            return
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.worker_thread.start()
        self.logger.info("At Station service started")

    def stop(self):
        """Stop the scheduling service, waiting up to 5 seconds for the worker."""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
        self.logger.info("At Station service stopped")

    def _worker_loop(self):
        """Main worker loop that checks and executes pending jobs."""
        while self.running:
            try:
                pending_jobs = self.db.get_pending_jobs()
                for job in pending_jobs:
                    self._execute_job(job)
                time.sleep(POLL_INTERVAL_SECONDS)
            except Exception as e:
                # Keep the service alive; back off before polling again.
                self.logger.error(f"Worker loop error: {e}")
                time.sleep(5)

    def _execute_job(self, job: AtJob):
        """Execute a single job with retry logic.

        A failing job is put back to pending until it has been retried
        MAX_RETRIES times; after that it is marked failed.
        """
        self.logger.info(f"Executing job {job.job_id}: {job.command}")
        # Update status to running
        self.db.update_job_status(job.job_id, "running")
        # Execute command
        stdout, stderr, returncode = self.executor.execute(job.command)
        if returncode == 0:
            # Success
            self.db.update_job_status(job.job_id, "completed", stdout, stderr)
            self.logger.info(f"Job {job.job_id} completed successfully")
        elif job.retry_count < MAX_RETRIES:
            # Failure - re-queue for another attempt
            new_retry_count = job.retry_count + 1
            self.db.increment_retry(job.job_id)
            self.logger.warning(
                f"Job {job.job_id} failed (retry {new_retry_count}/{MAX_RETRIES}): {stderr}"
            )
        else:
            self.db.update_job_status(job.job_id, "failed", stdout, stderr)
            self.logger.error(f"Job {job.job_id} failed permanently: {stderr}")

    def schedule(self, command: str, time_str: str) -> int:
        """
        Schedule a command for execution.

        Args:
            command: Command to execute
            time_str: Time specification (e.g., "now + 5 minutes", "14:30")
        Returns:
            Job ID of the scheduled task
        Raises:
            ValueError: If time string cannot be parsed or lies in the past
        """
        execute_at = TimeParser.parse(time_str)
        if not execute_at:
            raise ValueError(f"Unable to parse time: {time_str}")
        if execute_at < datetime.now():
            raise ValueError(f"Scheduled time is in the past: {execute_at}")
        job = AtJob(
            job_id=0,  # Will be set by database
            command=command,
            execute_at=execute_at,
            created_at=datetime.now(),
            status="pending",
        )
        job_id = self.db.add_job(job)
        self.logger.info(f"Scheduled job {job_id}: {command} at {execute_at}")
        return job_id

    def list_jobs(self, status: str = None) -> List[AtJob]:
        """List all scheduled jobs, optionally filtered by status."""
        return self.db.list_jobs(status)

    def cancel(self, job_id: int) -> bool:
        """Cancel a scheduled job.

        Only pending jobs can be cancelled; cancelling removes the job from
        the database.

        Returns:
            True if the job was removed, False if it does not exist or is no
            longer pending.
        """
        job = self.db.get_job(job_id)
        if not job:
            return False
        if job.status == "pending":
            self.db.delete_job(job_id)
            self.logger.info(f"Cancelled job {job_id}")
            return True
        if job.status == "running":
            self.logger.warning(f"Cannot cancel running job {job_id}")
            return False
        self.logger.warning(f"Job {job_id} already {job.status}")
        return False

    def show_job(self, job_id: int) -> Optional[AtJob]:
        """Return the job with the given id, or None if it does not exist."""
        return self.db.get_job(job_id)
# ============================================================================
# Command Line Interface
# ============================================================================
def create_parser() -> argparse.ArgumentParser:
    """Create argument parser for CLI.

    The chosen subcommand is stored in ``args.action``. It must not share a
    name with the schedule subcommand's ``command`` positional: argparse
    would let the positional overwrite the subcommand name, and scheduling
    could then never be dispatched.
    """
    parser = argparse.ArgumentParser(
        prog="at",
        description="Schedule commands for future execution",
        # Keep the line breaks of the examples below in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "  at schedule \"now + 5 minutes\" \"echo Hello\"\n"
            "  at schedule 14:30 \"backup.sh\"\n"
            "  at schedule midnight \"shutdown -h now\"\n"
            "  at list\n"
            "  at cancel 42\n"
            "  at show 42\n"
            "  at daemon\n"
        ),
    )
    subparsers = parser.add_subparsers(dest="action", help="Subcommands")
    # Schedule command
    schedule_parser = subparsers.add_parser("schedule", aliases=["add", "run"], help="Schedule a command")
    schedule_parser.add_argument("time", help="When to run (e.g., 'now + 5 minutes', '14:30')")
    schedule_parser.add_argument("command", help="Command to execute")
    # List jobs
    list_parser = subparsers.add_parser("list", aliases=["ls"], help="List scheduled jobs")
    list_parser.add_argument("--status", choices=["pending", "running", "completed", "failed", "cancelled"],
                             help="Filter by status")
    # Cancel job
    cancel_parser = subparsers.add_parser("cancel", aliases=["rm"], help="Cancel a scheduled job")
    cancel_parser.add_argument("job_id", type=int, help="Job ID to cancel")
    # Show job details
    show_parser = subparsers.add_parser("show", help="Show job details")
    show_parser.add_argument("job_id", type=int, help="Job ID to show")
    # Start daemon
    subparsers.add_parser("daemon", help="Run as daemon (background service)")
    return parser


def format_job_table(jobs: List["AtJob"]) -> str:
    """Format jobs as an ASCII table.

    Commands longer than 50 characters are cut and suffixed with "...".
    Returns "No jobs found." for an empty list.
    """
    if not jobs:
        return "No jobs found."
    headers = ["ID", "Status", "Execute At", "Command", "Retries"]
    rows = [
        [
            str(job.job_id),
            job.status,
            job.execute_at.strftime("%Y-%m-%d %H:%M:%S"),
            job.command[:50] + ("..." if len(job.command) > 50 else ""),
            str(job.retry_count),
        ]
        for job in jobs
    ]
    # Each column is as wide as its widest cell (header included).
    col_widths = [max(len(cell) for cell in column) for column in zip(headers, *rows)]

    def render(cells):
        padded = (cell.ljust(width) for cell, width in zip(cells, col_widths))
        return "| " + " | ".join(padded) + " |"

    separator = "+" + "+".join("-" * (w + 2) for w in col_widths) + "+"
    lines = [separator, render(headers), separator]
    lines.extend(render(row) for row in rows)
    lines.append(separator)
    return "\n".join(lines)


def main():
    """Main entry point for CLI.

    Exits with status 1 on a scheduling error or an unknown or uncancellable
    job, and with 130 when interrupted.
    """
    parser = create_parser()
    args = parser.parse_args()
    if not args.action:
        # No subcommand given: nothing to do, so do not touch the database.
        parser.print_help()
        return
    # Initialize the at station
    station = AtStation()
    # Handle daemon mode
    if args.action == "daemon":
        print("Starting At Station daemon... (Press Ctrl+C to stop)")
        station.start()
        try:
            signal.pause()  # Wait for signals
        except KeyboardInterrupt:
            print("\nShutting down...")
        station.stop()
        return
    # One-off commands only read or write the job database; scheduled jobs
    # are executed by a separately running daemon.
    try:
        if args.action in ("schedule", "add", "run"):
            job_id = station.schedule(args.command, args.time)
            print(f"Job scheduled: ID {job_id}")
            print(f"Will execute at: {station.show_job(job_id).execute_at}")
        elif args.action in ("list", "ls"):
            print(format_job_table(station.list_jobs(args.status)))
        elif args.action in ("cancel", "rm"):
            if station.cancel(args.job_id):
                print(f"Job {args.job_id} cancelled successfully")
            else:
                print(f"Failed to cancel job {args.job_id}", file=sys.stderr)
                sys.exit(1)
        elif args.action == "show":
            job = station.show_job(args.job_id)
            if job:
                print(f"Job ID: {job.job_id}")
                print(f"Command: {job.command}")
                print(f"Status: {job.status}")
                print(f"Execute At: {job.execute_at}")
                print(f"Created At: {job.created_at}")
                print(f"Retry Count: {job.retry_count}")
                if job.output:
                    print(f"\nOutput:\n{job.output}")
                if job.error:
                    print(f"\nError:\n{job.error}")
            else:
                print(f"Job {args.job_id} not found", file=sys.stderr)
                sys.exit(1)
        else:
            parser.print_help()
    except ValueError as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\nInterrupted")
        sys.exit(130)


if __name__ == "__main__":
    main()
This At Command Station v1.0.4 provides:
Job management:
at cancel 42
at show 42
Step 6 – Audit Trail
For every high-quality write, record:
- Timestamp (station local + UTC offset)
- Operator ID (if human-initiated)
- Command station IP and port
- Outstation common address + IOA
- Previous value (if known) and new value
- Command status (pending, confirmed, failed)
- Protocol frame trace (optional but recommended)
This log is invaluable for post-incident analysis and regulatory compliance.
Part 5: Troubleshooting Low-Quality Output
If you experience any of the symptoms below, your output is not high quality. Use the following fix matrix.
| Symptom | Low Quality Cause | High Quality Fix |
| :--- | :--- | :--- |
| Ghost key presses | Missing CLEAR_BUFFER() on release | Add ON_RELEASE: RESET() to every macro |
| Intermittent lag | Using WHILE loops without yield | Replace with TIMED_SEQUENCE blocks |
| Command stutter | Delay values are random (5ms, 10ms, 5ms) | Standardize all delays to multiples of 16ms |
| Profile corruption | Writing to EEPROM more than 50x/hour | Write to RAM first, then commit to EEPROM only once |
