src — automation
src — automation
The src/automation module provides foundational capabilities for an intelligent agent to interact with its environment through scheduled tasks, event-driven triggers, and continuous monitoring. It encapsulates three core automation patterns:
- Authentication Monitoring (
AuthMonitor): Ensures the agent's access to external services remains valid and alerts to impending expirations or issues. - Gmail Trigger (
GmailTrigger): Enables the agent to react in real-time to new emails, acting as an event source for agent workflows. - Polling (
PollManager): Allows the agent to periodically fetch and detect changes in data from various external sources (URLs, files, commands).
This module is critical for maintaining the agent's operational readiness, responsiveness, and awareness of external state changes.
1. Authentication Monitoring (AuthMonitor)
The AuthMonitor class is responsible for tracking the authentication status of various external services and channels the agent interacts with. It helps prevent operational disruptions by proactively identifying expired or invalid credentials.
Purpose
To provide a centralized, continuous monitoring system for authentication tokens and API keys, emitting events when their status changes (e.g., expiring, expired, invalid).
Key Concepts
AuthTarget: An interface defining a single item to be monitored. It includes:id: Unique identifier.name: Human-readable name.type: Categorization ('provider','channel','service').envVar?: Optional environment variable name to check for credential presence.expiresAt?: OptionalDateindicating when the credential expires.state: CurrentAuthState('valid','expiring','expired','invalid','unknown').AuthState: A union type representing the possible states of an authentication target.AuthMonitorConfig: Configuration for the monitor, includingcheckIntervalMs(how often to check) andexpiryWarningMs(how far in advance to warn about expiration).AuthEvent: An event object emitted when anAuthTarget's state changes, detailing thepreviousState,newState, and amessage.
How it Works
The AuthMonitor operates as a singleton, ensuring a single point of truth for authentication status.
- Initialization:
AuthMonitor.getInstance(config?): Retrieves or creates the singleton instance.- The constructor sets up default configuration values for
checkIntervalMs(5 minutes) andexpiryWarningMs(24 hours).
- Target Management:
addTarget(target: AuthTarget): Registers a new credential for monitoring.removeTarget(id: string): Stops monitoring a specific credential.listTargets(): Retrieves all currently monitored targets.
- Monitoring Lifecycle:
start():- Calls
registerDefaultTargets()to add common API keys (e.g.,GROK_API_KEY,OPENAI_API_KEY,DISCORD_BOT_TOKEN) if not already present. - Immediately performs an initial check via
checkAll(). - Sets up a
setIntervalto callcheckAll()repeatedly at the configuredcheckIntervalMs. checkAll(): Iterates through all registeredAuthTargets and callscheckTarget()for each. IfcheckTarget()returns anAuthEvent, it's added to the internalhistoryand emitted.checkTarget(target: AuthTarget)(private):- Determines the
newStatebased on two primary checks: - Environment Variable Presence: If
target.envVaris specified, it checksprocess.env[target.envVar]for existence and a minimum length. - Expiration Date: If
target.expiresAtis provided, it compares it against the current time and theexpiryWarningMsthreshold. - If the
newStatediffers from thetarget.state, anAuthEventis generated and returned. stop(): Clears thesetIntervaltimer, halting all monitoring.
- Reporting:
getHistory(limit?: number): Retrieves a chronological list of pastAuthEvents.getSummary(): Provides a count of targets in eachAuthState.
Events
The AuthMonitor extends EventEmitter and emits the following event:
'auth:changed'(payload:AuthEvent): Fired when theAuthStateof any monitored target changes.
Example Usage
import { AuthMonitor } from 39;./auth-monitoring.js39;;
const monitor = AuthMonitor.getInstance({
checkIntervalMs: 60_000, class="hl-cmt">// Check every minute
expiryWarningMs: 2 * 24 * 60 * 60 * 1000, class="hl-cmt">// Warn 2 days before expiry
});
monitor.addTarget({
id: 39;my-custom-api39;,
name: 39;My Custom API39;,
type: 39;provider39;,
envVar: 39;MY_CUSTOM_API_KEY39;,
expiresAt: new Date(Date.now() + 3 * 24 * 60 * 60 * 1000), class="hl-cmt">// Expires in 3 days
state: 39;unknown39;,
});
monitor.on(39;auth:changed39;, (event) => {
console.log(`Auth Event: [${event.target.name}] changed from ${event.previousState} to ${event.newState}. Message: ${event.message}`);
if (event.newState === 39;expired39; || event.newState === 39;invalid39;) {
console.error(`Action required: ${event.message}`);
class="hl-cmt">// Trigger an alert or automated refresh
}
});
monitor.start();
2. Gmail Trigger (GmailTrigger)
The GmailTrigger class provides an event-driven mechanism for the agent to react to new emails in a Gmail inbox. It leverages Google Cloud Pub/Sub and the GmailWebhookAdapter to receive real-time notifications.
Purpose
To act as a trigger source, waking or notifying the agent when new emails arrive that match configured filters, and generating a relevant prompt for the agent.
Key Concepts
GmailTriggerConfig: Configuration for the trigger, including Google Cloud project details (projectId,topicName),labelFilter(e.g.,['INBOX', 'UNREAD']), and apromptTemplatefor the agent.GmailTriggerEvent: The event object emitted when a new email is detected, containingmessageId,subject,from, and the generatedprompt.
How it Works
The GmailTrigger wraps the GmailWebhookAdapter (from src/channels/niche-channels.js) to manage the underlying Gmail API interactions and Pub/Sub subscription.
- Initialization:
constructor(config: GmailTriggerConfig): Creates an instance ofGmailWebhookAdapterusing the provided configuration.getGmailTrigger(config?: GmailTriggerConfig): A singleton-like factory function to ensure only oneGmailTriggerinstance exists.
- Starting the Trigger:
start():- Calls
this.adapter.start()to begin listening for Pub/Sub messages. - Calls
this.adapter.setupWatch(this.config.labelFilter)to establish a Gmail watch, which tells Gmail to send notifications to Pub/Sub for matching emails. Gmail watches expire after 7 days. - Registers an
onNewMessagelistener with the adapter, which callshandleNewMessage()when the adapter processes a new email. - Sets up a
setIntervalto periodically callthis.adapter.setupWatch()again, renewing the Gmail watch before its 7-day expiry (default renewal every 6 days).
- Webhook Handling:
handleWebhook(body: { message?: { data?: string; messageId?: string } }): This method is designed to be exposed as an HTTP endpoint. When Google Pub/Sub sends a notification to the configured webhook URL, this method decodes the Pub/Sub message and passes it tothis.adapter.handlePubSubNotification().
- Message Processing:
handleNewMessage(msg: { id: string; subject: string; from: string })(private):- Deduplicates messages using an internal
processedIdsset. - Generates an agent prompt using the
promptTemplatefrom the configuration, replacing placeholders like{from}and{subject}. - Creates a
GmailTriggerEventand emits it via the'trigger'event. - If
config.autoMarkReadis true, it callsthis.adapter.markRead(msg.id)to mark the email as read in Gmail.
- Stopping the Trigger:
stop(): Clears the watch renewal timer and callsthis.adapter.stop()to stop the underlying Pub/Sub listener.resetGmailTrigger(): Stops and clears the singleton instance.
- Status:
getStatus(): Provides current operational status, including whether the trigger is running, watch activity, expiry, and processed counts.
Events
The GmailTrigger extends EventEmitter and emits the following event:
'trigger'(payload:GmailTriggerEvent): Fired when a new email matching the filters is detected and processed.
Architecture Diagram
graph TD
A[External Webhook Endpoint] --> B{handleWebhook(body)}
B --> C{GmailWebhookAdapter.handlePubSubNotification(data)}
C -- New Message --> D{GmailWebhookAdapter.onNewMessage(msg)}
D --> E{handleNewMessage(msg)}
E -- Deduplicate & Prompt --> F[Emit 'trigger' event]
F --> G[Agent Workflow]
H[GmailTrigger.start()] --> I{GmailWebhookAdapter.start()}
H --> J{GmailWebhookAdapter.setupWatch()}
J -- Periodically --> J
Integration Points
src/channels/niche-channels.js: TheGmailTriggeris tightly coupled withGmailWebhookAdapter, which handles the low-level Google API and Pub/Sub interactions.- External Webhook: The
handleWebhookmethod is designed to be called by an external HTTP endpoint (e.g., an Express route) that receives notifications from Google Cloud Pub/Sub. This is demonstrated in test files liketests/channels/google-chat.test.ts.
3. Polling (PollManager)
The PollManager class provides a flexible and robust system for periodically fetching data from various external sources and detecting changes.
Purpose
To enable the agent to monitor external data sources (URLs, files, command outputs) at configurable intervals, triggering events when data changes or is retrieved.
Key Concepts
PollType: A union type defining the supported polling mechanisms:'url','file','command','custom'.PollConfig: An interface defining a single poll, including:id: Unique identifier.name: Human-readable name.type: ThePollType.target: The specific resource to poll (URL, file path, command string).intervalMs: How often to poll.enabled: Whether the poll should start automatically.onChangeOnly?: If true,'poll:result'is only emitted when data changes.maxRetries?: Maximum retries before stopping a failing poll.PollResult: An event object containing thedataretrieved,previousData, achangedflag,timestamp,durationMs, and anyerror.
How it Works
The PollManager operates as a singleton, managing multiple independent polls.
- Initialization:
PollManager.getInstance(): Retrieves or creates the singleton instance.
- Poll Management:
addPoll(config: PollConfig): Registers a new poll. If a poll with the sameidexists, it's replaced. Ifconfig.enabledis true,startPoll()is called.removePoll(id: string): Stops and removes a poll.listPolls(): Retrieves all configured polls.
- Polling Lifecycle:
startPoll(id: string):- Immediately calls
executePoll(id). - Sets up a
setIntervalto callexecutePoll(id)repeatedly at the configuredintervalMs. executePoll(id: string)(private):- Retrieves the
PollConfigfor the givenid. - Based on
config.type, it calls one of the specialized private polling methods: pollUrl(config: PollConfig): Usesfetchto retrieve data from a URL. Handles JSON and text responses.pollFile(config: PollConfig): Usesfs/promises.readFileto read file content andfs/promises.statfor metadata.pollCommand(config: PollConfig): Useschild_process.execSyncto execute a shell command and capture its output.'custom'type is currently a placeholder.- Error Handling & Retries: If a poll fails, it increments a retry counter. If
maxRetriesis exceeded, the poll is stopped, and a'poll:failed'event is emitted. - Change Detection: Compares the newly fetched
datawith thepreviousData(stored inlastResults) to determine ifchangedis true. - Event Emission: Creates a
PollResultobject. - Emits
'poll:result'ifconfig.onChangeOnlyis false, or ifchangedis true. - Emits
'poll:changed'ifchangedis true. stopPoll(id: string): Clears thesetIntervaltimer for a specific poll.stopAll(): Stops all active polls.
- Results:
getLastResult(id: string): Retrieves the last successfully polled data for a given poll.
Events
The PollManager extends EventEmitter and emits the following events:
'poll:result'(payload:PollResult): Fired after every successful poll execution, or only when data changes ifonChangeOnlyis true.'poll:changed'(payload:PollResult): Fired specifically when the polled data has changed since the last check.'poll:failed'(payload:{ pollId: string; error: string; retries: number }): Fired when a poll consistently fails and is stopped due to exceedingmaxRetries.
Example Usage
import { PollManager } from 39;./polls.js39;;
const pollManager = PollManager.getInstance();
pollManager.addPoll({
id: 39;github-status39;,
name: 39;GitHub Status API39;,
type: 39;url39;,
target: 39;https:class="hl-cmt">//www.githubstatus.com/api/v2/status.json39;,
intervalMs: 30_000, class="hl-cmt">// Every 30 seconds
enabled: true,
onChangeOnly: true,
});
pollManager.addPoll({
id: 39;local-log-file39;,
name: 39;Local Log File39;,
type: 39;file39;,
target: 39;/var/log/agent.log39;,
intervalMs: 5_000, class="hl-cmt">// Every 5 seconds
enabled: true,
onChangeOnly: true,
});
pollManager.on(39;poll:changed39;, (result) => {
console.log(`Poll [${result.pollId}] changed! New data:`, result.data);
class="hl-cmt">// Agent can analyze the change and take action
});
pollManager.on(39;poll:failed39;, ({ pollId, error, retries }) => {
console.error(`Poll [${pollId}] failed after ${retries} attempts: ${error}. Stopping poll.`);
});
class="hl-cmt">// To stop a specific poll:
class="hl-cmt">// pollManager.stopPoll(39;github-status39;);
class="hl-cmt">// To stop all polls:
class="hl-cmt">// pollManager.stopAll();
Integration Points
fs/promises: Dynamically imported bypollFilefor file system operations.child_process: Dynamically imported bypollCommandfor executing shell commands.fetch: Used bypollUrlfor HTTP requests.scripts/tests/cat-automation-sdk.ts: This module'sPollManageris used in test scripts to verify its functionality.
Conclusion
The src/automation module provides essential infrastructure for building reactive and robust agent systems. By offering capabilities for authentication monitoring, event-driven email processing, and flexible data polling, it empowers the agent to maintain operational integrity, respond to critical external events, and stay informed about its environment. Developers contributing to the agent's core functionality or extending its interaction capabilities will frequently interact with these components.