src — channels

Module: src-channels Cohesion: 0.80 Members: 0

The src/channels module provides a unified interface for integrating with messaging platforms. It abstracts the platform-specific APIs (Telegram, Discord, Slack, Feishu, Google Chat, etc.) behind a consistent model for inbound and outbound messages.

This module is central to how the system interacts with external users and services.

Architecture Overview

The channels module is designed around a core abstraction (BaseChannel) and a central orchestrator (ChannelManager). It integrates with several specialized sub-modules to provide advanced features like session management, identity linking, and policy enforcement.

Here's a high-level view of how an inbound message flows through the system:

graph TD
    subgraph External Channels
        A[Discord]
        B[Feishu]
        C[Google Chat]
        D[CLI/Webchat/Other]
    end

    subgraph Channels Module
        E[Platform-Specific Channel Client] --> F{BaseChannel}
        F --> G[ChannelManager]
        G -- "InboundMessage Event" --> H[Message Handlers]
    end

    subgraph Supporting Modules
        H -- "getSessionKey()" --> I[SessionIsolator]
        H -- "getCanonicalIdentity()" --> J[IdentityLinker]
        H -- "checkDMPairing()" --> K[DMPairingManager]
        H -- "resolveRoute()" --> L[PeerRouter]
        H -- "evaluate()" --> M[DMPolicyEngine]
        H -- "enqueueMessage()" --> N[LaneQueue]
    end

    A --> E
    B --> E
    C --> E
    D --> E
    N --> P[Agent/Workflow Execution]

  1. Inbound Message Reception: A platform-specific channel client (e.g., DiscordChannel) receives a message from its respective platform.
  2. Standardization: The client converts the platform-specific message into a unified InboundMessage format.
  3. BaseChannel Processing: The BaseChannel handles common tasks like command parsing and basic access checks (isUserAllowed, isChannelAllowed).
  4. ChannelManager Event: The BaseChannel emits a message event, which is caught by the ChannelManager. The ChannelManager then re-emits this event and passes it to its registered messageHandlers.
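  5. Pre-processing & Policy: Within the message handlers, the InboundMessage is enriched and evaluated through the supporting modules shown in the diagram: getSessionKey() (SessionIsolator), getCanonicalIdentity() (IdentityLinker), checkDMPairing() (DMPairingManager), resolveRoute() (PeerRouter), and evaluate() (DMPolicyEngine).
  6. Concurrency Control: enqueueMessage() uses a LaneQueue to ensure messages from the same session key are processed serially, while different sessions can run in parallel.
  7. Agent/Workflow Execution: The message is then passed to the appropriate agent or workflow for processing.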
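The concurrency-control step can be illustrated with a per-key promise chain. This is a minimal sketch, not the module's LaneQueue: SimpleLaneQueue, its enqueue method, and the demo helper are hypothetical names, and the real implementation may handle errors, limits, and cleanup differently.

```typescript
// Hypothetical sketch of per-session serialization; not the module's LaneQueue.
type Task<T> = () => Promise<T>;

class SimpleLaneQueue {
  // Tail of the promise chain for each session key ("lane").
  private lanes = new Map<string, Promise<unknown>>();

  enqueue<T>(sessionKey: string, task: Task<T>): Promise<T> {
    const tail = this.lanes.get(sessionKey) ?? Promise.resolve();
    // Start the task only after the previous task in this lane has settled.
    const run = tail.then(task);
    // Store a tail that never rejects, so one failure does not block the lane.
    const settled = run.catch(() => undefined);
    this.lanes.set(sessionKey, settled);
    // Drop the lane once it is idle so the map does not grow without bound.
    settled.then(() => {
      if (this.lanes.get(sessionKey) === settled) this.lanes.delete(sessionKey);
    });
    return run;
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Two jobs share lane "session-a"; a third runs in lane "session-b".
async function demo(): Promise<string[]> {
  const queue = new SimpleLaneQueue();
  const log: string[] = [];
  const job = (label: string, ms: number) => async () => {
    log.push(`start ${label}`);
    await sleep(ms);
    log.push(`end ${label}`);
  };
  await Promise.all([
    queue.enqueue("session-a", job("a1", 20)),
    queue.enqueue("session-a", job("a2", 5)),
    queue.enqueue("session-b", job("b1", 5)),
  ]);
  return log;
}
```

In this sketch, a2 cannot start until a1 has finished, while b1 starts without waiting for either of them.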

Core Components

1. Unified Messaging Model (src/channels/core.ts)

The core.ts file defines the fundamental data structures for multi-channel communication. These types ensure consistency across all channel implementations.
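As a rough illustration, the shapes below are inferred from how the examples in this document read and build messages. They are assumptions, not the definitions in core.ts: field optionality, the timestamp type, and the replyTo helper are all hypothetical.

```typescript
// Assumed shapes, inferred from usage in this document; core.ts is authoritative.
type ChannelType = string; // e.g. 'telegram', 'discord', 'cli'

interface InboundMessage {
  id: string;
  channel: { id: string; type: ChannelType };
  sender: { id: string; displayName?: string };
  content: string;
  timestamp: Date; // assumption: the real type may be a number or string
  sessionKey?: string;
  attachments?: unknown[];
}

interface OutboundMessage {
  channelId: string;
  content: string;
  contentType?: string;
}

// Hypothetical helper: address a reply to the conversation a message came from.
function replyTo(message: InboundMessage, content: string): OutboundMessage {
  return { channelId: message.channel.id, content, contentType: "text" };
}
```

Because every channel produces the same InboundMessage shape, a helper like replyTo can stay unaware of which platform the message arrived on.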

2. Channel Abstraction (BaseChannel)

The abstract class BaseChannel, which extends EventEmitter, provides a common interface and basic functionality for all channel implementations.
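The pattern can be sketched as follows. Only isUserAllowed, isChannelAllowed, and the message event come from this document; SketchBaseChannel, SketchConfig, handleInbound, and EchoChannel are hypothetical stand-ins, and the allowlist semantics (an empty list allows everyone) are an assumption.

```typescript
import { EventEmitter } from "node:events";

interface SketchMessage {
  senderId: string;
  channelId: string;
  content: string;
}

interface SketchConfig {
  allowedUsers?: string[];
  allowedChannels?: string[];
}

// Hypothetical stand-in for BaseChannel; the real class has a richer interface.
abstract class SketchBaseChannel extends EventEmitter {
  constructor(protected readonly config: SketchConfig) {
    super();
  }

  abstract send(channelId: string, content: string): Promise<void>;

  // Assumption: a missing or empty allowlist means "allow everyone".
  protected isUserAllowed(userId: string): boolean {
    const list = this.config.allowedUsers;
    return !list || list.length === 0 || list.includes(userId);
  }

  protected isChannelAllowed(channelId: string): boolean {
    const list = this.config.allowedChannels;
    return !list || list.length === 0 || list.includes(channelId);
  }

  // Subclasses call this after converting a platform payload.
  protected handleInbound(message: SketchMessage): boolean {
    if (!this.isUserAllowed(message.senderId)) return false;
    if (!this.isChannelAllowed(message.channelId)) return false;
    this.emit("message", message);
    return true;
  }
}

// Minimal concrete channel that records what it was asked to send.
class EchoChannel extends SketchBaseChannel {
  sent: string[] = [];

  async send(_channelId: string, content: string): Promise<void> {
    this.sent.push(content);
  }

  receive(message: SketchMessage): boolean {
    return this.handleInbound(message);
  }
}
```

Keeping the access checks in the base class means each platform client only has to translate payloads and call one method.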

3. Channel Management (ChannelManager)

The ChannelManager class is responsible for orchestrating multiple BaseChannel instances. It acts as a central hub for managing the lifecycle of all integrated channels and routing messages.

The ChannelManager is exposed as a singleton via getChannelManager() and resetChannelManager().
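A get/reset accessor pair of this kind is commonly written as a lazily created module-level instance. The sketch below shows that pattern with hypothetical names (SketchChannelManager and its accessors); it assumes the reset function simply discards the current instance, which the real resetChannelManager() may or may not do.

```typescript
// Hypothetical stand-in for ChannelManager, reduced to a channel registry.
class SketchChannelManager {
  private channels = new Map<string, { type: string }>();

  registerChannel(channel: { type: string }): void {
    this.channels.set(channel.type, channel);
  }

  get channelCount(): number {
    return this.channels.size;
  }
}

let instance: SketchChannelManager | null = null;

// Create the manager on first use and return the same instance afterwards.
function getSketchChannelManager(): SketchChannelManager {
  if (!instance) instance = new SketchChannelManager();
  return instance;
}

// Assumption: resetting drops the instance so the next call builds a fresh one.
function resetSketchChannelManager(): void {
  instance = null;
}
```

A reset function like this is mostly useful in tests, where each case should start from an empty registry.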

4. Concurrency & Session Isolation (LaneQueue & SessionIsolator)

5. Identity Resolution (IdentityLinker)

6. DM Access Control (DMPairingManager)

7. Intelligent Routing & Policy (DMPolicyEngine & PeerRouter)

Channel Implementations (Examples)

DiscordChannel (src/channels/discord/client.ts)

This class extends BaseChannel to integrate with Discord.

FeishuChannel (src/channels/feishu/index.ts)

This class extends BaseChannel to integrate with Feishu (Lark). It uses an internal FeishuAdapter to manage API interactions.

GoogleChatChannel (src/channels/google-chat/index.ts)

This class extends BaseChannel for Google Chat integration.

Integration Points & Developer Guide

Adding a New Channel

  1. Create a new file: src/channels//index.ts (and client.ts, types.ts if complex).
  2. Define ChannelConfig: Create an interface extending ChannelConfig (e.g., MyChannelConfig) for platform-specific settings.
  3. Implement BaseChannel: Create class MyChannel extends BaseChannel.

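  4. Handle Inbound Messages: Convert each platform-specific message into the unified InboundMessage format and emit it as a message event, as described in the Architecture Overview.

  5. Register with ChannelManager: In your application's bootstrap, get the singleton ChannelManager and call manager.registerChannel(new MyChannel(myConfig)).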
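The conversion from a platform payload to the unified format is usually the bulk of a new channel. The sketch below uses an invented payload (MyPlatformEvent) and locally declared stand-in types; in the real module, MyChannelConfig would extend ChannelConfig and the result would be the InboundMessage from core.ts, whose exact fields may differ.

```typescript
// Hypothetical payload from an imaginary platform SDK.
interface MyPlatformEvent {
  msg_id: string;
  room: string;
  user: { uid: string; nick?: string };
  text: string;
  ts_ms: number;
}

// Stand-in config; in the real module this would extend ChannelConfig.
interface MyChannelConfig {
  type: string;
  token: string;
}

// Stand-in for the unified message; fields are assumptions based on this document.
interface SketchInboundMessage {
  id: string;
  channel: { id: string; type: string };
  sender: { id: string; displayName: string };
  content: string;
  timestamp: Date;
}

// Map platform field names onto the unified shape in one place.
function toInboundMessage(
  event: MyPlatformEvent,
  config: MyChannelConfig,
): SketchInboundMessage {
  return {
    id: event.msg_id,
    channel: { id: event.room, type: config.type },
    // Fall back to the user ID when the platform supplies no display name.
    sender: { id: event.user.uid, displayName: event.user.nick ?? event.user.uid },
    content: event.text,
    timestamp: new Date(event.ts_ms),
  };
}
```

A channel class would call a function like this from its platform event callback and then emit the result as a message event.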

Handling Incoming Messages

Global message processing happens via ChannelManager.onMessage().

import { getChannelManager, InboundMessage, BaseChannel, enqueueMessage, getCanonicalIdentity, resolveRoute, getRouteAgentConfig } from './channels/core.js';
import { getDMPolicyEngine } from './channels/dm-policy/engine.js';
import { logger } from './utils/logger.js';

const manager = getChannelManager();
const policyEngine = getDMPolicyEngine();

manager.onMessage(async (message: InboundMessage, channel: BaseChannel) => {
  // Ensure session isolation for this message's processing
  if (!message.sessionKey) {
    logger.warn('Message received without sessionKey, skipping enqueue.', { messageId: message.id });
    return;
  }

  await enqueueMessage(message.sessionKey, async () => {
    logger.info(`Processing message from ${message.sender.displayName} on ${message.channel.type}: ${message.content}`);

    // Resolve canonical identity
    const canonicalIdentity = getCanonicalIdentity(message);
    if (canonicalIdentity) {
      logger.debug(`Resolved canonical identity for sender: ${canonicalIdentity.id}`);
    }

    // Evaluate DM policy
    const policyDecision = policyEngine.evaluate({
      messageId: message.id,
      senderId: message.sender.id,
      channelType: message.channel.type,
      content: message.content,
      hasAttachments: !!message.attachments?.length,
      isFirstContact: !policyEngine.getReputation(message.sender.id).firstSeen, // Simplified
      timestamp: message.timestamp,
    });

    if (policyDecision.action === 'deny') {
      logger.warn(`Message denied by policy: ${policyDecision.reason}`);
      await channel.send({ channelId: message.channel.id, content: `Sorry, your message was denied: ${policyDecision.reason}` });
      return;
    }
    // Handle other policy actions (challenge, queue, forward, rate_limit)

    // Resolve routing to an agent
    const route = resolveRoute(message);
    if (route) {
      logger.debug(`Message routed to agent: ${route.agentId}`);
      // TODO: Pass message to the resolved agent for processing
      await channel.send({ channelId: message.channel.id, content: `Message received and routed to agent ${route.agentId}.` });
    } else {
      logger.warn('No route found for message.');
      await channel.send({ channelId: message.channel.id, content: 'Sorry, I could not find a handler for your message.' });
    }

    // Update sender reputation based on interaction outcome
    policyEngine.updateReputation({
      senderId: message.sender.id,
      type: 'positive', // Or 'negative'/'neutral' based on agent's response
      scoreAdjustment: 1,
      reason: 'Message processed successfully',
    });

  }).catch(error => {
    logger.error(`Error processing message in lane ${message.sessionKey}:`, error);
    channel.send({ channelId: message.channel.id, content: 'An internal error occurred while processing your message.' }).catch(sendErr => logger.error('Failed to send error message:', sendErr));
  });
});

Sending Outgoing Messages

You can send messages directly to a specific channel or broadcast them:

import { getChannelManager } from './channels/core.js';

const manager = getChannelManager();

// Send to a specific channel
await manager.send('discord', {
  channelId: '1234567890',
  content: 'Hello from the system!',
  contentType: 'text',
  buttons: [{ text: 'Click Me', type: 'callback', data: 'clicked' }],
});

// Broadcast to all connected channels
await manager.broadcast({
  content: 'This is a broadcast message to all connected channels.',
  contentType: 'text',
});

// Send a proactive message with priority
await manager.sendToUser('telegram', 'user_id_123', 'Your task is complete!', 'high');

Configuring DM Pairing and Policy

DM Pairing and Policy Engine are configured via their respective singleton instances:

import { getDMPairing } from './channels/dm-pairing.js';
import { getDMPolicyEngine } from './channels/dm-policy/engine.js';

// Configure DM Pairing
const dmPairing = getDMPairing({
  enabled: true,
  pairingChannels: ['telegram', 'discord'], // Only these channels require pairing
  codeLength: 8,
  codeExpiryMs: 30 * 60 * 1000, // 30 minutes
  pairingMessage: 'Please pair with me! Your code is: {code}. Owner, approve with: /pairing approve {channel} {code}',
});
await dmPairing.loadAllowlist(); // Load previously approved senders

// Configure DM Policy Engine
const policyEngine = getDMPolicyEngine({
  defaultAction: 'challenge', // Default to challenge if no rule matches
  initialReputationScore: 60,
});

// Add a custom rule (example)
policyEngine.addRule({
  id: 'block-bad-words',
  name: 'Block Bad Words',
  description: 'Blocks messages containing specific offensive terms.',
  priority: 150,
  enabled: true,
  conditions: [
    { type: 'keyword', operator: 'eq', value: ['badword1', 'badword2'] },
  ],
  action: 'deny',
});

Testing with MockChannel

The MockChannel class is invaluable for testing channel-agnostic logic without needing actual platform connections.

import { MockChannel, ChannelManager, InboundMessage } from './channels/core.js';
import { expect } from 'chai';

describe('ChannelManager with MockChannel', () => {
  let manager: ChannelManager;
  let mockChannel: MockChannel;

  beforeEach(() => {
    manager = new ChannelManager();
    mockChannel = new MockChannel({ type: 'cli' }); // Or any type
    manager.registerChannel(mockChannel);
  });

  afterEach(async () => {
    await manager.shutdown();
  });

  it('should receive and process a simulated message', async () => {
    let receivedMessage: InboundMessage | null = null;
    manager.onMessage(async (msg) => {
      receivedMessage = msg;
    });

    const simulated = mockChannel.simulateMessage('Hello, world!');

    expect(receivedMessage).to.not.be.null;
    expect(receivedMessage?.content).to.equal('Hello, world!');
    expect(mockChannel.getMessages()).to.include(simulated);
  });

  it('should send messages via the manager', async () => {
    await manager.send('cli', { channelId: 'mock-channel', content: 'Test outbound' });

    const sentMessages = mockChannel.getSentMessages();
    expect(sentMessages).to.have.lengthOf(1);
    expect(sentMessages[0].content).to.equal('Test outbound');
  });
});