Compare commits

..

No commits in common. "add-kafka" and "master" have entirely different histories.

12 changed files with 22 additions and 5702 deletions

View File

@ -8,12 +8,6 @@ DB_USER=mam-kupi-admin
DB_PASSWORD=7TTLpNh4GtQcDAMY DB_PASSWORD=7TTLpNh4GtQcDAMY
DB_NAME=mam-kupi-db DB_NAME=mam-kupi-db
# jwt #JWT
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
JWT_EXPIRATION_TIME=3600 JWT_EXPIRATION_TIME=3600
# kafka
ZOOKEEPER_HOST=localhost
ZOOKEEPER_PORT=2181
KAFKA_BROKER_HOST=localhost
KAFKA_BROKER_PORT=9092

View File

@ -12,37 +12,6 @@ services:
ports: ports:
- "25432:5432" - "25432:5432"
zookeeper:
image: docker.io/bitnami/zookeeper:3.7.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
restart: on-failure
kafka:
image: docker.io/bitnami/kafka:3.4.1
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_TLS_CLIENT_AUTH=none
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
- KAFKA_CFG_COMPRESSION_TYPE=lz4
- KAFKA_CFG_BATCH_SIZE=131072
- KAFKA_CFG_LINGER_MS=10
- KAFKA_MESSAGE_MAX_BYTES=52428800
- KAFKA_ADVERTISED_HOST_NAME=localhost
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
restart: on-failure
volumes: volumes:
mam-kupi-postgres-data: mam-kupi-postgres-data:
driver: local driver: local

View File

@ -38,7 +38,6 @@
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.0", "class-validator": "^0.14.0",
"cookie-parser": "^1.4.6", "cookie-parser": "^1.4.6",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21", "lodash": "^4.17.21",
"passport": "^0.6.0", "passport": "^0.6.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",

View File

@ -22,7 +22,6 @@ import { AuthModule } from './auth/auth.module';
password: process.env.DB_PASSWORD, password: process.env.DB_PASSWORD,
database: process.env.DB_NAME, database: process.env.DB_NAME,
entities: ['dist/**/*.entity.js'], entities: ['dist/**/*.entity.js'],
// logging: ['query']
}), }),
TasksModule, TasksModule,
LinksModule, LinksModule,

View File

@ -16,8 +16,8 @@ export class Link {
@Column() @Column()
url: string; url: string;
@Column({ type: 'timestamp', nullable: true }) @Column({ nullable: true })
lastCheckDate: Date; lastCheckDate: Date = new Date();
@OneToMany(() => Price, (price) => price.link) @OneToMany(() => Price, (price) => price.link)
prices: Price[]; prices: Price[];

View File

@ -25,7 +25,7 @@ export class LinksController {
@Get('/') @Get('/')
@UseGuards(JwtAuthGuard) @UseGuards(JwtAuthGuard)
async getLinks(@Req() request: RequestWithUser) { async getLinks(@Req() request: RequestWithUser) {
const links = await this.linksService.getLinksByUser(request.user.id); const links = await this.linksService.getLinks(request.user.id);
return { links }; return { links };
} }

View File

@ -8,6 +8,5 @@ import { LinksController } from './links.cotroller';
imports: [TypeOrmModule.forFeature([Link])], imports: [TypeOrmModule.forFeature([Link])],
controllers: [LinksController], controllers: [LinksController],
providers: [LinksService], providers: [LinksService],
exports: [LinksService],
}) })
export class LinksModule {} export class LinksModule {}

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Link } from './entities/links.entity'; import { Link } from './entities/links.entity';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm'; import { Repository } from 'typeorm';
@Injectable() @Injectable()
export class LinksService { export class LinksService {
@ -15,16 +15,7 @@ export class LinksService {
return this.linksRepository.save(link); return this.linksRepository.save(link);
} }
async getLinksByUser(userId: number) { async getLinks(userId: number) {
return this.linksRepository.findBy({ user: { id: userId } }); return this.linksRepository.findBy({ user: { id: userId } });
} }
async getLinksByDate(minDate: Date) {
return this.linksRepository.find({
where: [
{ lastCheckDate: IsNull() },
{ lastCheckDate: MoreThanOrEqual(minDate) },
],
});
}
} }

View File

@ -2,25 +2,21 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module'; import { AppModule } from './app.module';
import * as cookieParser from 'cookie-parser'; import * as cookieParser from 'cookie-parser';
import { ValidationPipe } from '@nestjs/common'; import { ValidationPipe } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices'; // import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() { async function bootstrap() {
const app = await NestFactory.create(AppModule); const app = await NestFactory.create(AppModule);
app.useGlobalPipes(new ValidationPipe()); app.useGlobalPipes(new ValidationPipe());
app.use(cookieParser()); app.use(cookieParser());
// await app.listen(process.env.PORT); await app.listen(process.env.PORT);
// MICROSERVICE // MICROSERVICE
// app.connectMicroservice<MicroserviceOptions>({ // app.connectMicroservice<MicroserviceOptions>({
// transport: Transport.KAFKA, // transport: Transport.NATS,
// options: { // options: { retryAttempts: 5, retryDelay: 3000 },
// client: {
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
// }
// }
// }); // });
// await app.startAllMicroservices(); // await app.startAllMicroservices();
await app.listen(process.env.PORT); // await app.listen(3001);
} }
bootstrap(); bootstrap();

View File

@ -1,41 +1,9 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { TasksService } from './tasks.service'; import { TasksService } from './tasks.service';
import { HttpModule } from '@nestjs/axios'; import { HttpModule } from '@nestjs/axios';
import { LinksModule } from 'src/links/links.module';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({ @Module({
imports: [ imports: [HttpModule],
ClientsModule.registerAsync([
{
name: 'PARSER',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const brokerUrl = `${configService.get(
'KAFKA_BROKER_HOST',
)}:${configService.get('KAFKA_BROKER_PORT')}`;
console.log('brokerUrl: ', brokerUrl);
return {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'parsing',
brokers: [brokerUrl],
},
consumer: {
groupId: 'parsing-consumer',
},
},
};
},
},
]),
HttpModule,
LinksModule,
],
providers: [TasksService], providers: [TasksService],
}) })
export class TasksModule {} export class TasksModule {}

View File

@ -1,9 +1,7 @@
import { HttpService } from '@nestjs/axios'; import { HttpService } from '@nestjs/axios';
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { Cron, CronExpression } from '@nestjs/schedule'; import { Cron, CronExpression } from '@nestjs/schedule';
import { firstValueFrom } from 'rxjs'; import { firstValueFrom } from 'rxjs';
import { LinksService } from 'src/links/links.service';
// * * * * * * // * * * * * *
// | | | | | | // | | | | | |
@ -18,11 +16,7 @@ import { LinksService } from 'src/links/links.service';
export class TasksService { export class TasksService {
private readonly logger = new Logger(TasksService.name); private readonly logger = new Logger(TasksService.name);
constructor( constructor(private readonly httpService: HttpService) {}
private readonly httpService: HttpService,
private readonly linksService: LinksService,
@Inject('PARSER') private readonly parserClient: ClientKafka,
) {}
private async request(link: string) { private async request(link: string) {
try { try {
@ -35,18 +29,16 @@ export class TasksService {
} }
} }
@Cron('*/20 * * * * *', { @Cron('0 */20 * * * *', {
name: 'sendParse', name: 'sendParse',
timeZone: 'Europe/Moscow', timeZone: 'Europe/Moscow',
// disabled: true disabled: true,
}) })
async sendParse() { async sendParse() {
this.logger.log('TASK RUN'); const link =
const links = await this.linksService.getLinksByDate(new Date()); 'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
this.logger.log('LINKS', links); const res = await this.request(link);
this.parserClient.emit( this.logger.debug(res);
'link_to_parse', this.logger.debug('Called when the current second is 10');
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
);
} }
} }

5587
yarn.lock

File diff suppressed because it is too large Load Diff