Compare commits
No commits in common. "add-kafka" and "master" have entirely different histories.
|
@ -8,12 +8,6 @@ DB_USER=mam-kupi-admin
|
||||||
DB_PASSWORD=7TTLpNh4GtQcDAMY
|
DB_PASSWORD=7TTLpNh4GtQcDAMY
|
||||||
DB_NAME=mam-kupi-db
|
DB_NAME=mam-kupi-db
|
||||||
|
|
||||||
# jwt
|
#JWT
|
||||||
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
|
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
|
||||||
JWT_EXPIRATION_TIME=3600
|
JWT_EXPIRATION_TIME=3600
|
||||||
|
|
||||||
# kafka
|
|
||||||
ZOOKEEPER_HOST=localhost
|
|
||||||
ZOOKEEPER_PORT=2181
|
|
||||||
KAFKA_BROKER_HOST=localhost
|
|
||||||
KAFKA_BROKER_PORT=9092
|
|
|
@ -12,37 +12,6 @@ services:
|
||||||
ports:
|
ports:
|
||||||
- "25432:5432"
|
- "25432:5432"
|
||||||
|
|
||||||
zookeeper:
|
|
||||||
image: docker.io/bitnami/zookeeper:3.7.1
|
|
||||||
ports:
|
|
||||||
- "2181:2181"
|
|
||||||
environment:
|
|
||||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
|
||||||
restart: on-failure
|
|
||||||
kafka:
|
|
||||||
image: docker.io/bitnami/kafka:3.4.1
|
|
||||||
ports:
|
|
||||||
- "9092:9092"
|
|
||||||
environment:
|
|
||||||
- KAFKA_BROKER_ID=1
|
|
||||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
|
|
||||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
|
|
||||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
|
||||||
- KAFKA_TLS_CLIENT_AUTH=none
|
|
||||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
|
||||||
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
|
||||||
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
|
|
||||||
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
|
|
||||||
- KAFKA_CFG_COMPRESSION_TYPE=lz4
|
|
||||||
- KAFKA_CFG_BATCH_SIZE=131072
|
|
||||||
- KAFKA_CFG_LINGER_MS=10
|
|
||||||
- KAFKA_MESSAGE_MAX_BYTES=52428800
|
|
||||||
- KAFKA_ADVERTISED_HOST_NAME=localhost
|
|
||||||
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
|
|
||||||
depends_on:
|
|
||||||
- zookeeper
|
|
||||||
restart: on-failure
|
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
mam-kupi-postgres-data:
|
mam-kupi-postgres-data:
|
||||||
driver: local
|
driver: local
|
||||||
|
|
|
@ -38,7 +38,6 @@
|
||||||
"class-transformer": "^0.5.1",
|
"class-transformer": "^0.5.1",
|
||||||
"class-validator": "^0.14.0",
|
"class-validator": "^0.14.0",
|
||||||
"cookie-parser": "^1.4.6",
|
"cookie-parser": "^1.4.6",
|
||||||
"kafkajs": "^2.2.4",
|
|
||||||
"lodash": "^4.17.21",
|
"lodash": "^4.17.21",
|
||||||
"passport": "^0.6.0",
|
"passport": "^0.6.0",
|
||||||
"passport-jwt": "^4.0.1",
|
"passport-jwt": "^4.0.1",
|
||||||
|
|
|
@ -22,7 +22,6 @@ import { AuthModule } from './auth/auth.module';
|
||||||
password: process.env.DB_PASSWORD,
|
password: process.env.DB_PASSWORD,
|
||||||
database: process.env.DB_NAME,
|
database: process.env.DB_NAME,
|
||||||
entities: ['dist/**/*.entity.js'],
|
entities: ['dist/**/*.entity.js'],
|
||||||
// logging: ['query']
|
|
||||||
}),
|
}),
|
||||||
TasksModule,
|
TasksModule,
|
||||||
LinksModule,
|
LinksModule,
|
||||||
|
|
|
@ -16,8 +16,8 @@ export class Link {
|
||||||
@Column()
|
@Column()
|
||||||
url: string;
|
url: string;
|
||||||
|
|
||||||
@Column({ type: 'timestamp', nullable: true })
|
@Column({ nullable: true })
|
||||||
lastCheckDate: Date;
|
lastCheckDate: Date = new Date();
|
||||||
|
|
||||||
@OneToMany(() => Price, (price) => price.link)
|
@OneToMany(() => Price, (price) => price.link)
|
||||||
prices: Price[];
|
prices: Price[];
|
||||||
|
|
|
@ -25,7 +25,7 @@ export class LinksController {
|
||||||
@Get('/')
|
@Get('/')
|
||||||
@UseGuards(JwtAuthGuard)
|
@UseGuards(JwtAuthGuard)
|
||||||
async getLinks(@Req() request: RequestWithUser) {
|
async getLinks(@Req() request: RequestWithUser) {
|
||||||
const links = await this.linksService.getLinksByUser(request.user.id);
|
const links = await this.linksService.getLinks(request.user.id);
|
||||||
|
|
||||||
return { links };
|
return { links };
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,5 @@ import { LinksController } from './links.cotroller';
|
||||||
imports: [TypeOrmModule.forFeature([Link])],
|
imports: [TypeOrmModule.forFeature([Link])],
|
||||||
controllers: [LinksController],
|
controllers: [LinksController],
|
||||||
providers: [LinksService],
|
providers: [LinksService],
|
||||||
exports: [LinksService],
|
|
||||||
})
|
})
|
||||||
export class LinksModule {}
|
export class LinksModule {}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { Link } from './entities/links.entity';
|
import { Link } from './entities/links.entity';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
|
import { Repository } from 'typeorm';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class LinksService {
|
export class LinksService {
|
||||||
|
@ -15,16 +15,7 @@ export class LinksService {
|
||||||
return this.linksRepository.save(link);
|
return this.linksRepository.save(link);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLinksByUser(userId: number) {
|
async getLinks(userId: number) {
|
||||||
return this.linksRepository.findBy({ user: { id: userId } });
|
return this.linksRepository.findBy({ user: { id: userId } });
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLinksByDate(minDate: Date) {
|
|
||||||
return this.linksRepository.find({
|
|
||||||
where: [
|
|
||||||
{ lastCheckDate: IsNull() },
|
|
||||||
{ lastCheckDate: MoreThanOrEqual(minDate) },
|
|
||||||
],
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
14
src/main.ts
14
src/main.ts
|
@ -2,25 +2,21 @@ import { NestFactory } from '@nestjs/core';
|
||||||
import { AppModule } from './app.module';
|
import { AppModule } from './app.module';
|
||||||
import * as cookieParser from 'cookie-parser';
|
import * as cookieParser from 'cookie-parser';
|
||||||
import { ValidationPipe } from '@nestjs/common';
|
import { ValidationPipe } from '@nestjs/common';
|
||||||
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||||
|
|
||||||
async function bootstrap() {
|
async function bootstrap() {
|
||||||
const app = await NestFactory.create(AppModule);
|
const app = await NestFactory.create(AppModule);
|
||||||
app.useGlobalPipes(new ValidationPipe());
|
app.useGlobalPipes(new ValidationPipe());
|
||||||
app.use(cookieParser());
|
app.use(cookieParser());
|
||||||
// await app.listen(process.env.PORT);
|
await app.listen(process.env.PORT);
|
||||||
|
|
||||||
// MICROSERVICE
|
// MICROSERVICE
|
||||||
// app.connectMicroservice<MicroserviceOptions>({
|
// app.connectMicroservice<MicroserviceOptions>({
|
||||||
// transport: Transport.KAFKA,
|
// transport: Transport.NATS,
|
||||||
// options: {
|
// options: { retryAttempts: 5, retryDelay: 3000 },
|
||||||
// client: {
|
|
||||||
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// });
|
// });
|
||||||
|
|
||||||
// await app.startAllMicroservices();
|
// await app.startAllMicroservices();
|
||||||
await app.listen(process.env.PORT);
|
// await app.listen(3001);
|
||||||
}
|
}
|
||||||
bootstrap();
|
bootstrap();
|
||||||
|
|
|
@ -1,41 +1,9 @@
|
||||||
import { Module } from '@nestjs/common';
|
import { Module } from '@nestjs/common';
|
||||||
import { TasksService } from './tasks.service';
|
import { TasksService } from './tasks.service';
|
||||||
import { HttpModule } from '@nestjs/axios';
|
import { HttpModule } from '@nestjs/axios';
|
||||||
import { LinksModule } from 'src/links/links.module';
|
|
||||||
import { ClientsModule, Transport } from '@nestjs/microservices';
|
|
||||||
import { ConfigModule, ConfigService } from '@nestjs/config';
|
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [HttpModule],
|
||||||
ClientsModule.registerAsync([
|
|
||||||
{
|
|
||||||
name: 'PARSER',
|
|
||||||
imports: [ConfigModule],
|
|
||||||
inject: [ConfigService],
|
|
||||||
useFactory: async (configService: ConfigService) => {
|
|
||||||
const brokerUrl = `${configService.get(
|
|
||||||
'KAFKA_BROKER_HOST',
|
|
||||||
)}:${configService.get('KAFKA_BROKER_PORT')}`;
|
|
||||||
console.log('brokerUrl: ', brokerUrl);
|
|
||||||
|
|
||||||
return {
|
|
||||||
transport: Transport.KAFKA,
|
|
||||||
options: {
|
|
||||||
client: {
|
|
||||||
clientId: 'parsing',
|
|
||||||
brokers: [brokerUrl],
|
|
||||||
},
|
|
||||||
consumer: {
|
|
||||||
groupId: 'parsing-consumer',
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
},
|
|
||||||
},
|
|
||||||
]),
|
|
||||||
HttpModule,
|
|
||||||
LinksModule,
|
|
||||||
],
|
|
||||||
providers: [TasksService],
|
providers: [TasksService],
|
||||||
})
|
})
|
||||||
export class TasksModule {}
|
export class TasksModule {}
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
import { HttpService } from '@nestjs/axios';
|
import { HttpService } from '@nestjs/axios';
|
||||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
import { ClientKafka } from '@nestjs/microservices';
|
|
||||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||||
import { firstValueFrom } from 'rxjs';
|
import { firstValueFrom } from 'rxjs';
|
||||||
import { LinksService } from 'src/links/links.service';
|
|
||||||
|
|
||||||
// * * * * * *
|
// * * * * * *
|
||||||
// | | | | | |
|
// | | | | | |
|
||||||
|
@ -18,11 +16,7 @@ import { LinksService } from 'src/links/links.service';
|
||||||
export class TasksService {
|
export class TasksService {
|
||||||
private readonly logger = new Logger(TasksService.name);
|
private readonly logger = new Logger(TasksService.name);
|
||||||
|
|
||||||
constructor(
|
constructor(private readonly httpService: HttpService) {}
|
||||||
private readonly httpService: HttpService,
|
|
||||||
private readonly linksService: LinksService,
|
|
||||||
@Inject('PARSER') private readonly parserClient: ClientKafka,
|
|
||||||
) {}
|
|
||||||
|
|
||||||
private async request(link: string) {
|
private async request(link: string) {
|
||||||
try {
|
try {
|
||||||
|
@ -35,18 +29,16 @@ export class TasksService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Cron('*/20 * * * * *', {
|
@Cron('0 */20 * * * *', {
|
||||||
name: 'sendParse',
|
name: 'sendParse',
|
||||||
timeZone: 'Europe/Moscow',
|
timeZone: 'Europe/Moscow',
|
||||||
// disabled: true
|
disabled: true,
|
||||||
})
|
})
|
||||||
async sendParse() {
|
async sendParse() {
|
||||||
this.logger.log('TASK RUN');
|
const link =
|
||||||
const links = await this.linksService.getLinksByDate(new Date());
|
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
|
||||||
this.logger.log('LINKS', links);
|
const res = await this.request(link);
|
||||||
this.parserClient.emit(
|
this.logger.debug(res);
|
||||||
'link_to_parse',
|
this.logger.debug('Called when the current second is 10');
|
||||||
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue