Compare commits

..

No commits in common. "add-kafka" and "master" have entirely different histories.

12 changed files with 22 additions and 5702 deletions

View File

@ -8,12 +8,6 @@ DB_USER=mam-kupi-admin
DB_PASSWORD=7TTLpNh4GtQcDAMY
DB_NAME=mam-kupi-db
# jwt
#JWT
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
JWT_EXPIRATION_TIME=3600
# kafka
ZOOKEEPER_HOST=localhost
ZOOKEEPER_PORT=2181
KAFKA_BROKER_HOST=localhost
KAFKA_BROKER_PORT=9092
JWT_EXPIRATION_TIME=3600

View File

@ -12,37 +12,6 @@ services:
ports:
- "25432:5432"
zookeeper:
image: docker.io/bitnami/zookeeper:3.7.1
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
restart: on-failure
kafka:
image: docker.io/bitnami/kafka:3.4.1
ports:
- "9092:9092"
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_TLS_CLIENT_AUTH=none
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
- KAFKA_CFG_COMPRESSION_TYPE=lz4
- KAFKA_CFG_BATCH_SIZE=131072
- KAFKA_CFG_LINGER_MS=10
- KAFKA_MESSAGE_MAX_BYTES=52428800
- KAFKA_ADVERTISED_HOST_NAME=localhost
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
restart: on-failure
volumes:
mam-kupi-postgres-data:
driver: local

View File

@ -38,7 +38,6 @@
"class-transformer": "^0.5.1",
"class-validator": "^0.14.0",
"cookie-parser": "^1.4.6",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",

View File

@ -22,7 +22,6 @@ import { AuthModule } from './auth/auth.module';
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
entities: ['dist/**/*.entity.js'],
// logging: ['query']
}),
TasksModule,
LinksModule,

View File

@ -16,8 +16,8 @@ export class Link {
@Column()
url: string;
@Column({ type: 'timestamp', nullable: true })
lastCheckDate: Date;
@Column({ nullable: true })
lastCheckDate: Date = new Date();
@OneToMany(() => Price, (price) => price.link)
prices: Price[];

View File

@ -25,7 +25,7 @@ export class LinksController {
@Get('/')
@UseGuards(JwtAuthGuard)
async getLinks(@Req() request: RequestWithUser) {
const links = await this.linksService.getLinksByUser(request.user.id);
const links = await this.linksService.getLinks(request.user.id);
return { links };
}

View File

@ -8,6 +8,5 @@ import { LinksController } from './links.cotroller';
imports: [TypeOrmModule.forFeature([Link])],
controllers: [LinksController],
providers: [LinksService],
exports: [LinksService],
})
export class LinksModule {}

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { Link } from './entities/links.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
import { Repository } from 'typeorm';
@Injectable()
export class LinksService {
@ -15,16 +15,7 @@ export class LinksService {
return this.linksRepository.save(link);
}
async getLinksByUser(userId: number) {
async getLinks(userId: number) {
return this.linksRepository.findBy({ user: { id: userId } });
}
async getLinksByDate(minDate: Date) {
return this.linksRepository.find({
where: [
{ lastCheckDate: IsNull() },
{ lastCheckDate: MoreThanOrEqual(minDate) },
],
});
}
}

View File

@ -2,25 +2,21 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import * as cookieParser from 'cookie-parser';
import { ValidationPipe } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
app.useGlobalPipes(new ValidationPipe());
app.use(cookieParser());
// await app.listen(process.env.PORT);
await app.listen(process.env.PORT);
// MICROSERVICE
// app.connectMicroservice<MicroserviceOptions>({
// transport: Transport.KAFKA,
// options: {
// client: {
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
// }
// }
// transport: Transport.NATS,
// options: { retryAttempts: 5, retryDelay: 3000 },
// });
// await app.startAllMicroservices();
await app.listen(process.env.PORT);
// await app.listen(3001);
}
bootstrap();

View File

@ -1,41 +1,9 @@
import { Module } from '@nestjs/common';
import { TasksService } from './tasks.service';
import { HttpModule } from '@nestjs/axios';
import { LinksModule } from 'src/links/links.module';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ClientsModule.registerAsync([
{
name: 'PARSER',
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService) => {
const brokerUrl = `${configService.get(
'KAFKA_BROKER_HOST',
)}:${configService.get('KAFKA_BROKER_PORT')}`;
console.log('brokerUrl: ', brokerUrl);
return {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'parsing',
brokers: [brokerUrl],
},
consumer: {
groupId: 'parsing-consumer',
},
},
};
},
},
]),
HttpModule,
LinksModule,
],
imports: [HttpModule],
providers: [TasksService],
})
export class TasksModule {}

View File

@ -1,9 +1,7 @@
import { HttpService } from '@nestjs/axios';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { firstValueFrom } from 'rxjs';
import { LinksService } from 'src/links/links.service';
// * * * * * *
// | | | | | |
@ -18,11 +16,7 @@ import { LinksService } from 'src/links/links.service';
export class TasksService {
private readonly logger = new Logger(TasksService.name);
constructor(
private readonly httpService: HttpService,
private readonly linksService: LinksService,
@Inject('PARSER') private readonly parserClient: ClientKafka,
) {}
constructor(private readonly httpService: HttpService) {}
private async request(link: string) {
try {
@ -35,18 +29,16 @@ export class TasksService {
}
}
@Cron('*/20 * * * * *', {
@Cron('0 */20 * * * *', {
name: 'sendParse',
timeZone: 'Europe/Moscow',
// disabled: true
disabled: true,
})
async sendParse() {
this.logger.log('TASK RUN');
const links = await this.linksService.getLinksByDate(new Date());
this.logger.log('LINKS', links);
this.parserClient.emit(
'link_to_parse',
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
);
const link =
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
const res = await this.request(link);
this.logger.debug(res);
this.logger.debug('Called when the current second is 10');
}
}

5587
yarn.lock

File diff suppressed because it is too large Load Diff