Compare commits

..

1 Commits

Author SHA1 Message Date
Pravdin Egor 16f781888e add kafka, zookeeper
try to use kafka
2023-09-09 12:37:38 +07:00
12 changed files with 5702 additions and 22 deletions

View File

@ -8,6 +8,12 @@ DB_USER=mam-kupi-admin
DB_PASSWORD=7TTLpNh4GtQcDAMY DB_PASSWORD=7TTLpNh4GtQcDAMY
DB_NAME=mam-kupi-db DB_NAME=mam-kupi-db
#JWT # jwt
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
JWT_EXPIRATION_TIME=3600 JWT_EXPIRATION_TIME=3600
# kafka
ZOOKEEPER_HOST=localhost
ZOOKEEPER_PORT=2181
KAFKA_BROKER_HOST=localhost
KAFKA_BROKER_PORT=9092

View File

@ -12,6 +12,37 @@ services:
ports: ports:
- "25432:5432" - "25432:5432"
zookeeper:
image: docker.io/bitnami/zookeeper:3.7.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
restart: on-failure
kafka:
image: docker.io/bitnami/kafka:3.4.1
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_TLS_CLIENT_AUTH=none
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
- KAFKA_CFG_COMPRESSION_TYPE=lz4
- KAFKA_CFG_BATCH_SIZE=131072
- KAFKA_CFG_LINGER_MS=10
- KAFKA_MESSAGE_MAX_BYTES=52428800
- KAFKA_ADVERTISED_HOST_NAME=localhost
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
restart: on-failure
volumes: volumes:
mam-kupi-postgres-data: mam-kupi-postgres-data:
driver: local driver: local

View File

@ -38,6 +38,7 @@
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.0", "class-validator": "^0.14.0",
"cookie-parser": "^1.4.6", "cookie-parser": "^1.4.6",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"passport": "^0.6.0", "passport": "^0.6.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",

View File

@ -22,6 +22,7 @@ import { AuthModule } from './auth/auth.module';
password: process.env.DB_PASSWORD, password: process.env.DB_PASSWORD,
database: process.env.DB_NAME, database: process.env.DB_NAME,
entities: ['dist/**/*.entity.js'], entities: ['dist/**/*.entity.js'],
// logging: ['query']
}), }),
TasksModule, TasksModule,
LinksModule, LinksModule,

View File

@ -16,8 +16,8 @@ export class Link {
@Column() @Column()
url: string; url: string;
@Column({ nullable: true }) @Column({ type: 'timestamp', nullable: true })
lastCheckDate: Date = new Date(); lastCheckDate: Date;
@OneToMany(() => Price, (price) => price.link) @OneToMany(() => Price, (price) => price.link)
prices: Price[]; prices: Price[];

View File

@ -25,7 +25,7 @@ export class LinksController {
@Get('/') @Get('/')
@UseGuards(JwtAuthGuard) @UseGuards(JwtAuthGuard)
async getLinks(@Req() request: RequestWithUser) { async getLinks(@Req() request: RequestWithUser) {
const links = await this.linksService.getLinks(request.user.id); const links = await this.linksService.getLinksByUser(request.user.id);
return { links }; return { links };
} }

View File

@ -8,5 +8,6 @@ import { LinksController } from './links.cotroller';
imports: [TypeOrmModule.forFeature([Link])], imports: [TypeOrmModule.forFeature([Link])],
controllers: [LinksController], controllers: [LinksController],
providers: [LinksService], providers: [LinksService],
exports: [LinksService],
}) })
export class LinksModule {} export class LinksModule {}

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Link } from './entities/links.entity'; import { Link } from './entities/links.entity';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm'; import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
@Injectable() @Injectable()
export class LinksService { export class LinksService {
@ -15,7 +15,16 @@ export class LinksService {
return this.linksRepository.save(link); return this.linksRepository.save(link);
} }
async getLinks(userId: number) { async getLinksByUser(userId: number) {
return this.linksRepository.findBy({ user: { id: userId } }); return this.linksRepository.findBy({ user: { id: userId } });
} }
async getLinksByDate(minDate: Date) {
return this.linksRepository.find({
where: [
{ lastCheckDate: IsNull() },
{ lastCheckDate: MoreThanOrEqual(minDate) },
],
});
}
} }

View File

@ -2,21 +2,25 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module'; import { AppModule } from './app.module';
import * as cookieParser from 'cookie-parser'; import * as cookieParser from 'cookie-parser';
import { ValidationPipe } from '@nestjs/common'; import { ValidationPipe } from '@nestjs/common';
// import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() { async function bootstrap() {
const app = await NestFactory.create(AppModule); const app = await NestFactory.create(AppModule);
app.useGlobalPipes(new ValidationPipe()); app.useGlobalPipes(new ValidationPipe());
app.use(cookieParser()); app.use(cookieParser());
await app.listen(process.env.PORT); // await app.listen(process.env.PORT);
// MICROSERVICE // MICROSERVICE
// app.connectMicroservice<MicroserviceOptions>({ // app.connectMicroservice<MicroserviceOptions>({
// transport: Transport.NATS, // transport: Transport.KAFKA,
// options: { retryAttempts: 5, retryDelay: 3000 }, // options: {
// client: {
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
// }
// }
// }); // });
// await app.startAllMicroservices(); // await app.startAllMicroservices();
// await app.listen(3001); await app.listen(process.env.PORT);
} }
bootstrap(); bootstrap();

View File

@ -1,9 +1,41 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { TasksService } from './tasks.service'; import { TasksService } from './tasks.service';
import { HttpModule } from '@nestjs/axios'; import { HttpModule } from '@nestjs/axios';
import { LinksModule } from 'src/links/links.module';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({ @Module({
imports: [HttpModule], imports: [
ClientsModule.registerAsync([
{
name: 'PARSER',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const brokerUrl = `${configService.get(
'KAFKA_BROKER_HOST',
)}:${configService.get('KAFKA_BROKER_PORT')}`;
console.log('brokerUrl: ', brokerUrl);
return {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'parsing',
brokers: [brokerUrl],
},
consumer: {
groupId: 'parsing-consumer',
},
},
};
},
},
]),
HttpModule,
LinksModule,
],
providers: [TasksService], providers: [TasksService],
}) })
export class TasksModule {} export class TasksModule {}

View File

@ -1,7 +1,9 @@
import { HttpService } from '@nestjs/axios'; import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { Cron, CronExpression } from '@nestjs/schedule'; import { Cron, CronExpression } from '@nestjs/schedule';
import { firstValueFrom } from 'rxjs'; import { firstValueFrom } from 'rxjs';
import { LinksService } from 'src/links/links.service';
// * * * * * * // * * * * * *
// | | | | | | // | | | | | |
@ -16,7 +18,11 @@ import { firstValueFrom } from 'rxjs';
export class TasksService { export class TasksService {
private readonly logger = new Logger(TasksService.name); private readonly logger = new Logger(TasksService.name);
constructor(private readonly httpService: HttpService) {} constructor(
private readonly httpService: HttpService,
private readonly linksService: LinksService,
@Inject('PARSER') private readonly parserClient: ClientKafka,
) {}
private async request(link: string) { private async request(link: string) {
try { try {
@ -29,16 +35,18 @@ export class TasksService {
} }
} }
@Cron('0 */20 * * * *', { @Cron('*/20 * * * * *', {
name: 'sendParse', name: 'sendParse',
timeZone: 'Europe/Moscow', timeZone: 'Europe/Moscow',
disabled: true, // disabled: true
}) })
async sendParse() { async sendParse() {
const link = this.logger.log('TASK RUN');
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/'; const links = await this.linksService.getLinksByDate(new Date());
const res = await this.request(link); this.logger.log('LINKS', links);
this.logger.debug(res); this.parserClient.emit(
this.logger.debug('Called when the current second is 10'); 'link_to_parse',
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
);
} }
} }

5587
yarn.lock Normal file

File diff suppressed because it is too large Load Diff