Compare commits

...

1 Commits

Author SHA1 Message Date
Pravdin Egor 16f781888e add kafka, zookeeper
try to use kafka
2023-09-09 12:37:38 +07:00
12 changed files with 5702 additions and 22 deletions

View File

@ -8,6 +8,12 @@ DB_USER=mam-kupi-admin
DB_PASSWORD=7TTLpNh4GtQcDAMY
DB_NAME=mam-kupi-db
#JWT
# jwt
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
JWT_EXPIRATION_TIME=3600
# kafka
ZOOKEEPER_HOST=localhost
ZOOKEEPER_PORT=2181
KAFKA_BROKER_HOST=localhost
KAFKA_BROKER_PORT=9092

View File

@ -12,6 +12,37 @@ services:
ports:
- "25432:5432"
zookeeper:
image: docker.io/bitnami/zookeeper:3.7.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
restart: on-failure
kafka:
image: docker.io/bitnami/kafka:3.4.1
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_TLS_CLIENT_AUTH=none
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
- KAFKA_CFG_COMPRESSION_TYPE=lz4
- KAFKA_CFG_BATCH_SIZE=131072
- KAFKA_CFG_LINGER_MS=10
- KAFKA_MESSAGE_MAX_BYTES=52428800
- KAFKA_ADVERTISED_HOST_NAME=localhost
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
restart: on-failure
volumes:
mam-kupi-postgres-data:
driver: local

View File

@ -38,6 +38,7 @@
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"cookie-parser": "^1.4.6",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",

View File

@ -22,6 +22,7 @@ import { AuthModule } from './auth/auth.module';
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
entities: ['dist/**/*.entity.js'],
// logging: ['query']
}),
TasksModule,
LinksModule,

View File

@ -16,8 +16,8 @@ export class Link {
@Column()
url: string;
@Column({ nullable: true })
lastCheckDate: Date = new Date();
@Column({ type: 'timestamp', nullable: true })
lastCheckDate: Date;
@OneToMany(() => Price, (price) => price.link)
prices: Price[];

View File

@ -25,7 +25,7 @@ export class LinksController {
@Get('/')
@UseGuards(JwtAuthGuard)
async getLinks(@Req() request: RequestWithUser) {
const links = await this.linksService.getLinks(request.user.id);
const links = await this.linksService.getLinksByUser(request.user.id);
return { links };
}

View File

@ -8,5 +8,6 @@ import { LinksController } from './links.cotroller';
imports: [TypeOrmModule.forFeature([Link])],
controllers: [LinksController],
providers: [LinksService],
exports: [LinksService],
})
export class LinksModule {}

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { Link } from './entities/links.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
@Injectable()
export class LinksService {
@ -15,7 +15,16 @@ export class LinksService {
return this.linksRepository.save(link);
}
async getLinks(userId: number) {
async getLinksByUser(userId: number) {
return this.linksRepository.findBy({ user: { id: userId } });
}
async getLinksByDate(minDate: Date) {
return this.linksRepository.find({
where: [
{ lastCheckDate: IsNull() },
{ lastCheckDate: MoreThanOrEqual(minDate) },
],
});
}
}

View File

@ -2,21 +2,25 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import * as cookieParser from 'cookie-parser';
import { ValidationPipe } from '@nestjs/common';
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.useGlobalPipes(new ValidationPipe());
app.use(cookieParser());
await app.listen(process.env.PORT);
// await app.listen(process.env.PORT);
// MICROSERVICE
// app.connectMicroservice<MicroserviceOptions>({
// transport: Transport.NATS,
// options: { retryAttempts: 5, retryDelay: 3000 },
// transport: Transport.KAFKA,
// options: {
// client: {
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
// }
// }
// });
// await app.startAllMicroservices();
// await app.listen(3001);
await app.listen(process.env.PORT);
}
bootstrap();

View File

@ -1,9 +1,41 @@
import { Module } from '@nestjs/common';
import { TasksService } from './tasks.service';
import { HttpModule } from '@nestjs/axios';
import { LinksModule } from 'src/links/links.module';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [HttpModule],
imports: [
ClientsModule.registerAsync([
{
name: 'PARSER',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const brokerUrl = `${configService.get(
'KAFKA_BROKER_HOST',
)}:${configService.get('KAFKA_BROKER_PORT')}`;
console.log('brokerUrl: ', brokerUrl);
return {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'parsing',
brokers: [brokerUrl],
},
consumer: {
groupId: 'parsing-consumer',
},
},
};
},
},
]),
HttpModule,
LinksModule,
],
providers: [TasksService],
})
export class TasksModule {}

View File

@ -1,7 +1,9 @@
import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { Cron, CronExpression } from '@nestjs/schedule';
import { firstValueFrom } from 'rxjs';
import { LinksService } from 'src/links/links.service';
// * * * * * *
// | | | | | |
@ -16,7 +18,11 @@ import { firstValueFrom } from 'rxjs';
export class TasksService {
private readonly logger = new Logger(TasksService.name);
constructor(private readonly httpService: HttpService) {}
constructor(
private readonly httpService: HttpService,
private readonly linksService: LinksService,
@Inject('PARSER') private readonly parserClient: ClientKafka,
) {}
private async request(link: string) {
try {
@ -29,16 +35,18 @@ export class TasksService {
}
}
@Cron('0 */20 * * * *', {
@Cron('*/20 * * * * *', {
name: 'sendParse',
timeZone: 'Europe/Moscow',
disabled: true,
// disabled: true
})
async sendParse() {
const link =
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
const res = await this.request(link);
this.logger.debug(res);
this.logger.debug('Called when the current second is 10');
this.logger.log('TASK RUN');
const links = await this.linksService.getLinksByDate(new Date());
this.logger.log('LINKS', links);
this.parserClient.emit(
'link_to_parse',
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
);
}
}

5587
yarn.lock Normal file

File diff suppressed because it is too large Load Diff