Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions apps/bot/src/bot.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { TemporalModule } from 'nestjs-temporal';
import { temporalConfig } from '@app/common/config/temporal.config';
import { Connection } from '@temporalio/client';
import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
import { HandlersModule } from './handlers/handlers.module';

@Module({
imports: [
Expand All @@ -17,25 +18,19 @@ import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
isGlobal: true,
}),
RabbitMQModule,
HandlersModule,
TemporalModule.registerClientAsync({
inject: [ConfigService],
useFactory: async (config: ConfigService) => {
const uri = config.get('temporal.uri');
console.log('uri', uri);
let connection: Connection;
try {
connection = await Connection.connect({
address: uri,
});
} catch (error) {
console.error('Failed to connect to Temporal:', error);
throw error;
}
const connection: Connection = await Connection.connect({
address: uri,
});
return { connection };
},
}),
],
controllers: [BotController],
providers: [BotService],
})
export class BotModule {}
export class BotModule { }
249 changes: 76 additions & 173 deletions apps/bot/src/bot.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { IgnoreEvent, UpdateEvent } from '@app/common';
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WorkflowClient } from '@temporalio/client';
import { API_CONSTANTS, Bot, Context, RawApi } from 'grammy';
import { Other } from 'grammy/out/core/api';
import { Message, Update } from 'grammy/types';
import { InjectTemporalClient } from 'nestjs-temporal';

const HIVEMIND_BLOCKLIST = ['-1002141367711'];
import { Message } from 'grammy/types';
import { QuestionHandler } from './handlers/question.handler';
import { UpdateHandler } from './handlers/update.handler';
import { VerifyHandler } from './handlers/verify.handler';
import { SummaryHandler } from './handlers/summary.handler';

@Injectable()
export class BotService implements OnModuleInit {
Expand All @@ -16,14 +16,23 @@ export class BotService implements OnModuleInit {

constructor(
private readonly configService: ConfigService,
@InjectTemporalClient()
private readonly temporalClient: WorkflowClient,
private readonly questionHandler: QuestionHandler,
private readonly updateHandler: UpdateHandler,
private readonly verifyHandler: VerifyHandler,
private readonly summaryHandler: SummaryHandler,
) {
this.bot = new Bot(configService.get<string>('telegram.token'));
this.bot = new Bot(this.configService.get<string>('telegram.token'));

this.bot.command('start', this.start);
this.bot.command('verify', this.isAdmin, this.verify);
this.bot.command('summary', this.getSummary);
this.bot.command(
'verify',
this.isAdmin,
this.verifyHandler.handle.bind(this.verifyHandler),
);
this.bot.command(
'summary',
this.summaryHandler.handle.bind(this.summaryHandler),
);

Object.values(IgnoreEvent).map((event) => {
this.bot.on(event, () => {
Expand All @@ -34,20 +43,8 @@ export class BotService implements OnModuleInit {
Object.values(UpdateEvent).map((event) => {
this.bot.on(event, (ctx: Context) => {
this.logger.log(`Received ${event} from ${ctx.chat.id}`);
this.processEvent(event, ctx.update);
try {
if (
event !== UpdateEvent.CHAT_MEMBER &&
ctx.update.message?.from &&
!ctx.update.message.from.is_bot &&
ctx.update.message.text?.trim().length > 0
) {
this.processQuestion(ctx);
}
} catch (error) {
this.logger.error(error);
}

this.updateHandler.handle(ctx);
this.questionHandler.handle(ctx);
return;
});
});
Expand Down Expand Up @@ -123,153 +120,59 @@ export class BotService implements OnModuleInit {
await ctx.reply(text, { parse_mode: 'HTML' });
};

protected verify = async (ctx: Context) => {
const token = ctx.match;
const chat = ctx.chat;
const from = ctx.from;

if (!token || !chat || !from) {
await ctx.reply(
'Not enough data to complete this request. Reach out to our support team.',
);
return;
}

try {
const handle = await this.temporalClient.start('TelegramVerifyWorkflow', {
taskQueue: this.configService.get<string>('temporal.queue'),
args: [{ token, chat, from }],
workflowId: `telegram:verify:${token}`,
});

const result = await handle.result();
await ctx.reply(result);
} catch (error) {
console.error(error);
await ctx.reply(
'Something unexpected happened. Reach out to our support team.',
);
}
};

protected processEvent = async (event: string, update: Update) => {
try {
await this.temporalClient.start('TelegramEventWorkflow', {
taskQueue: this.configService.get<string>('temporal.queue'),
args: [{ event, update }],
workflowId: `telegram:event:${update.update_id}`,
});
} catch (error) {
console.error(error);
}
};

protected processQuestion = async (ctx: Context) => {
try {
if (HIVEMIND_BLOCKLIST.includes(ctx.chat.id.toString())) {
console.log('Skipping question from HIVEMIND for', ctx.chat.id);
return;
}

const community = await this.temporalClient.execute(
'TelegramGetCommunityWorkflow',
{
taskQueue: 'TEMPORAL_QUEUE_LIGHT',
args: [{ chatId: ctx.chat.id }],
workflowId: `telegram:getcommunity:${ctx.update.update_id}`,
},
);
if (!community) {
console.log('No community found for', ctx.chat.id);
return;
}

const reply = await this.temporalClient.execute(
'AgenticHivemindTemporalWorkflow',
{
taskQueue: 'HIVEMIND_AGENT_QUEUE',
args: [
{
community_id: community.id,
query: ctx.update.message.text,
enable_answer_skipping: true,
},
],
workflowId: `telegram:hivemind:${ctx.update.update_id}`,
},
);

if (!reply || reply.length === 0) {
console.log('No reply from hivemind.');
return;
}

const other = {
reply_parameters: {
message_id: ctx.update.message.message_id,
},
};

console.log('Reply from hivemind:', reply);
await ctx.reply(reply, other);
} catch (error) {
console.error(error);
}
};

protected getSummary = async (ctx: Context) => {
try {
const community = await this.temporalClient.execute(
'TelegramGetCommunityWorkflow',
{
taskQueue: 'TEMPORAL_QUEUE_LIGHT',
args: [{ chatId: ctx.chat.id }],
workflowId: `telegram:getcommunity:${ctx.update.update_id}`,
},
);
if (!community) {
console.log('No community found for', ctx.chat.id);
return;
}

const platform = await this.temporalClient.execute(
'TelegramGetPlatformWorkflow',
{
taskQueue: 'TEMPORAL_QUEUE_LIGHT',
args: [{ chatId: ctx.chat.id }],
workflowId: `telegram:getplatform:${ctx.update.update_id}`,
},
);
if (!platform) {
console.log('No platform found for', ctx.chat.id);
return;
}

const summary = await this.temporalClient.execute(
'PlatformSummariesWorkflow',
{
taskQueue: 'TEMPORAL_QUEUE_PYTHON_LIGHT',
args: [
{
platform_id: platform.id,
community_id: community.id,
start_date: null,
end_date: null,
extract_text_only: true,
},
],
workflowId: `telegram:summaries:${ctx.update.update_id}`,
},
);

if (!summary || summary.length === 0) {
console.log('No summary found for', ctx.chat.id);
return;
}

await ctx.reply(summary);
} catch (error) {
console.error(error);
}
};
// protected getSummary = async (ctx: Context) => {
// try {
// const community = await this.temporalClient.execute(
// 'TelegramGetCommunityWorkflow',
// {
// taskQueue: 'TEMPORAL_QUEUE_LIGHT',
// args: [{ chatId: ctx.chat.id }],
// workflowId: `telegram:getcommunity:${ctx.update.update_id}`,
// },
// );
// if (!community) {
// console.log('No community found for', ctx.chat.id);
// return;
// }

// const platform = await this.temporalClient.execute(
// 'TelegramGetPlatformWorkflow',
// {
// taskQueue: 'TEMPORAL_QUEUE_LIGHT',
// args: [{ chatId: ctx.chat.id }],
// workflowId: `telegram:getplatform:${ctx.update.update_id}`,
// },
// );
// if (!platform) {
// console.log('No platform found for', ctx.chat.id);
// return;
// }

// const summary = await this.temporalClient.execute(
// 'PlatformSummariesWorkflow',
// {
// taskQueue: 'TEMPORAL_QUEUE_PYTHON_LIGHT',
// args: [
// {
// platform_id: platform.id,
// community_id: community.id,
// start_date: null,
// end_date: null,
// extract_text_only: true,
// },
// ],
// workflowId: `telegram:summaries:${ctx.update.update_id}`,
// },
// );

// if (!summary || summary.length === 0) {
// console.log('No summary found for', ctx.chat.id);
// return;
// }

// await ctx.reply(summary);
// } catch (error) {
// console.error(error);
// }
// };
}
16 changes: 16 additions & 0 deletions apps/bot/src/handlers/base.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { WorkflowClient } from '@temporalio/client';
import { Context, FilterQuery } from 'grammy';
import { InjectTemporalClient } from 'nestjs-temporal';

export abstract class BaseHandler {
protected readonly logger = new Logger(this.constructor.name);

constructor(
@InjectTemporalClient()
protected readonly temporalClient: WorkflowClient,
protected readonly configService: ConfigService,
) { }
abstract handle(ctx: Context, event?: FilterQuery): Promise<void>;
}
14 changes: 14 additions & 0 deletions apps/bot/src/handlers/handlers.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TemporalModule } from 'nestjs-temporal';
import { QuestionHandler } from './question.handler';
import { UpdateHandler } from './update.handler';
import { VerifyHandler } from './verify.handler';
import { SummaryHandler } from './summary.handler';

@Module({
imports: [ConfigModule, TemporalModule],
providers: [QuestionHandler, UpdateHandler, VerifyHandler, SummaryHandler],
exports: [QuestionHandler, UpdateHandler, VerifyHandler, SummaryHandler],
})
export class HandlersModule { }
Loading
Loading