Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Pravdin Egor | 16f781888e |
|
@ -8,6 +8,12 @@ DB_USER=mam-kupi-admin
|
||||||
DB_PASSWORD=7TTLpNh4GtQcDAMY
|
DB_PASSWORD=7TTLpNh4GtQcDAMY
|
||||||
DB_NAME=mam-kupi-db
|
DB_NAME=mam-kupi-db
|
||||||
|
|
||||||
#JWT
|
# jwt
|
||||||
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
|
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
|
||||||
JWT_EXPIRATION_TIME=3600
|
JWT_EXPIRATION_TIME=3600
|
||||||
|
|
||||||
|
# kafka
|
||||||
|
ZOOKEEPER_HOST=localhost
|
||||||
|
ZOOKEEPER_PORT=2181
|
||||||
|
KAFKA_BROKER_HOST=localhost
|
||||||
|
KAFKA_BROKER_PORT=9092
|
|
@ -12,6 +12,37 @@ services:
|
||||||
ports:
|
ports:
|
||||||
- "25432:5432"
|
- "25432:5432"
|
||||||
|
|
||||||
|
zookeeper:
|
||||||
|
image: docker.io/bitnami/zookeeper:3.7.1
|
||||||
|
ports:
|
||||||
|
- "2181:2181"
|
||||||
|
environment:
|
||||||
|
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||||
|
restart: on-failure
|
||||||
|
kafka:
|
||||||
|
image: docker.io/bitnami/kafka:3.4.1
|
||||||
|
ports:
|
||||||
|
- "9092:9092"
|
||||||
|
environment:
|
||||||
|
- KAFKA_BROKER_ID=1
|
||||||
|
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
|
||||||
|
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
|
||||||
|
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||||
|
- KAFKA_TLS_CLIENT_AUTH=none
|
||||||
|
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||||
|
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
||||||
|
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
|
||||||
|
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
|
||||||
|
- KAFKA_CFG_COMPRESSION_TYPE=lz4
|
||||||
|
- KAFKA_CFG_BATCH_SIZE=131072
|
||||||
|
- KAFKA_CFG_LINGER_MS=10
|
||||||
|
- KAFKA_MESSAGE_MAX_BYTES=52428800
|
||||||
|
- KAFKA_ADVERTISED_HOST_NAME=localhost
|
||||||
|
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
|
||||||
|
depends_on:
|
||||||
|
- zookeeper
|
||||||
|
restart: on-failure
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
mam-kupi-postgres-data:
|
mam-kupi-postgres-data:
|
||||||
driver: local
|
driver: local
|
||||||
|
|
|
@ -38,6 +38,7 @@
|
||||||
"class-transformer": "^0.5.1",
|
"class-transformer": "^0.5.1",
|
||||||
"class-validator": "^0.14.0",
|
"class-validator": "^0.14.0",
|
||||||
"cookie-parser": "^1.4.6",
|
"cookie-parser": "^1.4.6",
|
||||||
|
"kafkajs": "^2.2.4",
|
||||||
"lodash": "^4.17.21",
|
"lodash": "^4.17.21",
|
||||||
"passport": "^0.6.0",
|
"passport": "^0.6.0",
|
||||||
"passport-jwt": "^4.0.1",
|
"passport-jwt": "^4.0.1",
|
||||||
|
|
|
@ -22,6 +22,7 @@ import { AuthModule } from './auth/auth.module';
|
||||||
password: process.env.DB_PASSWORD,
|
password: process.env.DB_PASSWORD,
|
||||||
database: process.env.DB_NAME,
|
database: process.env.DB_NAME,
|
||||||
entities: ['dist/**/*.entity.js'],
|
entities: ['dist/**/*.entity.js'],
|
||||||
|
// logging: ['query']
|
||||||
}),
|
}),
|
||||||
TasksModule,
|
TasksModule,
|
||||||
LinksModule,
|
LinksModule,
|
||||||
|
|
|
@ -16,8 +16,8 @@ export class Link {
|
||||||
@Column()
|
@Column()
|
||||||
url: string;
|
url: string;
|
||||||
|
|
||||||
@Column({ nullable: true })
|
@Column({ type: 'timestamp', nullable: true })
|
||||||
lastCheckDate: Date = new Date();
|
lastCheckDate: Date;
|
||||||
|
|
||||||
@OneToMany(() => Price, (price) => price.link)
|
@OneToMany(() => Price, (price) => price.link)
|
||||||
prices: Price[];
|
prices: Price[];
|
||||||
|
|
|
@ -25,7 +25,7 @@ export class LinksController {
|
||||||
@Get('/')
|
@Get('/')
|
||||||
@UseGuards(JwtAuthGuard)
|
@UseGuards(JwtAuthGuard)
|
||||||
async getLinks(@Req() request: RequestWithUser) {
|
async getLinks(@Req() request: RequestWithUser) {
|
||||||
const links = await this.linksService.getLinks(request.user.id);
|
const links = await this.linksService.getLinksByUser(request.user.id);
|
||||||
|
|
||||||
return { links };
|
return { links };
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,5 +8,6 @@ import { LinksController } from './links.cotroller';
|
||||||
imports: [TypeOrmModule.forFeature([Link])],
|
imports: [TypeOrmModule.forFeature([Link])],
|
||||||
controllers: [LinksController],
|
controllers: [LinksController],
|
||||||
providers: [LinksService],
|
providers: [LinksService],
|
||||||
|
exports: [LinksService],
|
||||||
})
|
})
|
||||||
export class LinksModule {}
|
export class LinksModule {}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { Link } from './entities/links.entity';
|
import { Link } from './entities/links.entity';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
import { Repository } from 'typeorm';
|
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class LinksService {
|
export class LinksService {
|
||||||
|
@ -15,7 +15,16 @@ export class LinksService {
|
||||||
return this.linksRepository.save(link);
|
return this.linksRepository.save(link);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLinks(userId: number) {
|
async getLinksByUser(userId: number) {
|
||||||
return this.linksRepository.findBy({ user: { id: userId } });
|
return this.linksRepository.findBy({ user: { id: userId } });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getLinksByDate(minDate: Date) {
|
||||||
|
return this.linksRepository.find({
|
||||||
|
where: [
|
||||||
|
{ lastCheckDate: IsNull() },
|
||||||
|
{ lastCheckDate: MoreThanOrEqual(minDate) },
|
||||||
|
],
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
14
src/main.ts
14
src/main.ts
|
@ -2,21 +2,25 @@ import { NestFactory } from '@nestjs/core';
|
||||||
import { AppModule } from './app.module';
|
import { AppModule } from './app.module';
|
||||||
import * as cookieParser from 'cookie-parser';
|
import * as cookieParser from 'cookie-parser';
|
||||||
import { ValidationPipe } from '@nestjs/common';
|
import { ValidationPipe } from '@nestjs/common';
|
||||||
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||||
|
|
||||||
async function bootstrap() {
|
async function bootstrap() {
|
||||||
const app = await NestFactory.create(AppModule);
|
const app = await NestFactory.create(AppModule);
|
||||||
app.useGlobalPipes(new ValidationPipe());
|
app.useGlobalPipes(new ValidationPipe());
|
||||||
app.use(cookieParser());
|
app.use(cookieParser());
|
||||||
await app.listen(process.env.PORT);
|
// await app.listen(process.env.PORT);
|
||||||
|
|
||||||
// MICROSERVICE
|
// MICROSERVICE
|
||||||
// app.connectMicroservice<MicroserviceOptions>({
|
// app.connectMicroservice<MicroserviceOptions>({
|
||||||
// transport: Transport.NATS,
|
// transport: Transport.KAFKA,
|
||||||
// options: { retryAttempts: 5, retryDelay: 3000 },
|
// options: {
|
||||||
|
// client: {
|
||||||
|
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
|
||||||
|
// }
|
||||||
|
// }
|
||||||
// });
|
// });
|
||||||
|
|
||||||
// await app.startAllMicroservices();
|
// await app.startAllMicroservices();
|
||||||
// await app.listen(3001);
|
await app.listen(process.env.PORT);
|
||||||
}
|
}
|
||||||
bootstrap();
|
bootstrap();
|
||||||
|
|
|
@ -1,9 +1,41 @@
|
||||||
import { Module } from '@nestjs/common';
|
import { Module } from '@nestjs/common';
|
||||||
import { TasksService } from './tasks.service';
|
import { TasksService } from './tasks.service';
|
||||||
import { HttpModule } from '@nestjs/axios';
|
import { HttpModule } from '@nestjs/axios';
|
||||||
|
import { LinksModule } from 'src/links/links.module';
|
||||||
|
import { ClientsModule, Transport } from '@nestjs/microservices';
|
||||||
|
import { ConfigModule, ConfigService } from '@nestjs/config';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [HttpModule],
|
imports: [
|
||||||
|
ClientsModule.registerAsync([
|
||||||
|
{
|
||||||
|
name: 'PARSER',
|
||||||
|
imports: [ConfigModule],
|
||||||
|
inject: [ConfigService],
|
||||||
|
useFactory: async (configService: ConfigService) => {
|
||||||
|
const brokerUrl = `${configService.get(
|
||||||
|
'KAFKA_BROKER_HOST',
|
||||||
|
)}:${configService.get('KAFKA_BROKER_PORT')}`;
|
||||||
|
console.log('brokerUrl: ', brokerUrl);
|
||||||
|
|
||||||
|
return {
|
||||||
|
transport: Transport.KAFKA,
|
||||||
|
options: {
|
||||||
|
client: {
|
||||||
|
clientId: 'parsing',
|
||||||
|
brokers: [brokerUrl],
|
||||||
|
},
|
||||||
|
consumer: {
|
||||||
|
groupId: 'parsing-consumer',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
HttpModule,
|
||||||
|
LinksModule,
|
||||||
|
],
|
||||||
providers: [TasksService],
|
providers: [TasksService],
|
||||||
})
|
})
|
||||||
export class TasksModule {}
|
export class TasksModule {}
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import { HttpService } from '@nestjs/axios';
|
import { HttpService } from '@nestjs/axios';
|
||||||
import { Injectable, Logger } from '@nestjs/common';
|
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||||
|
import { ClientKafka } from '@nestjs/microservices';
|
||||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||||
import { firstValueFrom } from 'rxjs';
|
import { firstValueFrom } from 'rxjs';
|
||||||
|
import { LinksService } from 'src/links/links.service';
|
||||||
|
|
||||||
// * * * * * *
|
// * * * * * *
|
||||||
// | | | | | |
|
// | | | | | |
|
||||||
|
@ -16,7 +18,11 @@ import { firstValueFrom } from 'rxjs';
|
||||||
export class TasksService {
|
export class TasksService {
|
||||||
private readonly logger = new Logger(TasksService.name);
|
private readonly logger = new Logger(TasksService.name);
|
||||||
|
|
||||||
constructor(private readonly httpService: HttpService) {}
|
constructor(
|
||||||
|
private readonly httpService: HttpService,
|
||||||
|
private readonly linksService: LinksService,
|
||||||
|
@Inject('PARSER') private readonly parserClient: ClientKafka,
|
||||||
|
) {}
|
||||||
|
|
||||||
private async request(link: string) {
|
private async request(link: string) {
|
||||||
try {
|
try {
|
||||||
|
@ -29,16 +35,18 @@ export class TasksService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cron('0 */20 * * * *', {
|
@Cron('*/20 * * * * *', {
|
||||||
name: 'sendParse',
|
name: 'sendParse',
|
||||||
timeZone: 'Europe/Moscow',
|
timeZone: 'Europe/Moscow',
|
||||||
disabled: true,
|
// disabled: true
|
||||||
})
|
})
|
||||||
async sendParse() {
|
async sendParse() {
|
||||||
const link =
|
this.logger.log('TASK RUN');
|
||||||
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
|
const links = await this.linksService.getLinksByDate(new Date());
|
||||||
const res = await this.request(link);
|
this.logger.log('LINKS', links);
|
||||||
this.logger.debug(res);
|
this.parserClient.emit(
|
||||||
this.logger.debug('Called when the current second is 10');
|
'link_to_parse',
|
||||||
|
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue