Compare commits
No commits in common. "add-kafka" and "master" have entirely different histories.
|
@ -8,12 +8,6 @@ DB_USER=mam-kupi-admin
|
|||
DB_PASSWORD=7TTLpNh4GtQcDAMY
|
||||
DB_NAME=mam-kupi-db
|
||||
|
||||
# jwt
|
||||
#JWT
|
||||
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
|
||||
JWT_EXPIRATION_TIME=3600
|
||||
|
||||
# kafka
|
||||
ZOOKEEPER_HOST=localhost
|
||||
ZOOKEEPER_PORT=2181
|
||||
KAFKA_BROKER_HOST=localhost
|
||||
KAFKA_BROKER_PORT=9092
|
|
@ -12,37 +12,6 @@ services:
|
|||
ports:
|
||||
- "25432:5432"
|
||||
|
||||
zookeeper:
|
||||
image: docker.io/bitnami/zookeeper:3.7.1
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
- ALLOW_ANONYMOUS_LOGIN=yes
|
||||
restart: on-failure
|
||||
kafka:
|
||||
image: docker.io/bitnami/kafka:3.4.1
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
- KAFKA_BROKER_ID=1
|
||||
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
|
||||
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
|
||||
- ALLOW_PLAINTEXT_LISTENER=yes
|
||||
- KAFKA_TLS_CLIENT_AUTH=none
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
|
||||
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
|
||||
- KAFKA_CFG_DELETE_TOPIC_ENABLE=true
|
||||
- KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
|
||||
- KAFKA_CFG_COMPRESSION_TYPE=lz4
|
||||
- KAFKA_CFG_BATCH_SIZE=131072
|
||||
- KAFKA_CFG_LINGER_MS=10
|
||||
- KAFKA_MESSAGE_MAX_BYTES=52428800
|
||||
- KAFKA_ADVERTISED_HOST_NAME=localhost
|
||||
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
|
||||
depends_on:
|
||||
- zookeeper
|
||||
restart: on-failure
|
||||
|
||||
volumes:
|
||||
mam-kupi-postgres-data:
|
||||
driver: local
|
||||
|
|
|
@ -38,7 +38,6 @@
|
|||
"class-transformer": "^0.5.1",
|
||||
"class-validator": "^0.14.0",
|
||||
"cookie-parser": "^1.4.6",
|
||||
"kafkajs": "^2.2.4",
|
||||
"lodash": "^4.17.21",
|
||||
"passport": "^0.6.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
|
|
|
@ -22,7 +22,6 @@ import { AuthModule } from './auth/auth.module';
|
|||
password: process.env.DB_PASSWORD,
|
||||
database: process.env.DB_NAME,
|
||||
entities: ['dist/**/*.entity.js'],
|
||||
// logging: ['query']
|
||||
}),
|
||||
TasksModule,
|
||||
LinksModule,
|
||||
|
|
|
@ -16,8 +16,8 @@ export class Link {
|
|||
@Column()
|
||||
url: string;
|
||||
|
||||
@Column({ type: 'timestamp', nullable: true })
|
||||
lastCheckDate: Date;
|
||||
@Column({ nullable: true })
|
||||
lastCheckDate: Date = new Date();
|
||||
|
||||
@OneToMany(() => Price, (price) => price.link)
|
||||
prices: Price[];
|
||||
|
|
|
@ -25,7 +25,7 @@ export class LinksController {
|
|||
@Get('/')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
async getLinks(@Req() request: RequestWithUser) {
|
||||
const links = await this.linksService.getLinksByUser(request.user.id);
|
||||
const links = await this.linksService.getLinks(request.user.id);
|
||||
|
||||
return { links };
|
||||
}
|
||||
|
|
|
@ -8,6 +8,5 @@ import { LinksController } from './links.cotroller';
|
|||
imports: [TypeOrmModule.forFeature([Link])],
|
||||
controllers: [LinksController],
|
||||
providers: [LinksService],
|
||||
exports: [LinksService],
|
||||
})
|
||||
export class LinksModule {}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { Link } from './entities/links.entity';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
@Injectable()
|
||||
export class LinksService {
|
||||
|
@ -15,16 +15,7 @@ export class LinksService {
|
|||
return this.linksRepository.save(link);
|
||||
}
|
||||
|
||||
async getLinksByUser(userId: number) {
|
||||
async getLinks(userId: number) {
|
||||
return this.linksRepository.findBy({ user: { id: userId } });
|
||||
}
|
||||
|
||||
async getLinksByDate(minDate: Date) {
|
||||
return this.linksRepository.find({
|
||||
where: [
|
||||
{ lastCheckDate: IsNull() },
|
||||
{ lastCheckDate: MoreThanOrEqual(minDate) },
|
||||
],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
14
src/main.ts
14
src/main.ts
|
@ -2,25 +2,21 @@ import { NestFactory } from '@nestjs/core';
|
|||
import { AppModule } from './app.module';
|
||||
import * as cookieParser from 'cookie-parser';
|
||||
import { ValidationPipe } from '@nestjs/common';
|
||||
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||
|
||||
async function bootstrap() {
|
||||
const app = await NestFactory.create(AppModule);
|
||||
app.useGlobalPipes(new ValidationPipe());
|
||||
app.use(cookieParser());
|
||||
// await app.listen(process.env.PORT);
|
||||
await app.listen(process.env.PORT);
|
||||
|
||||
// MICROSERVICE
|
||||
// app.connectMicroservice<MicroserviceOptions>({
|
||||
// transport: Transport.KAFKA,
|
||||
// options: {
|
||||
// client: {
|
||||
// brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
|
||||
// }
|
||||
// }
|
||||
// transport: Transport.NATS,
|
||||
// options: { retryAttempts: 5, retryDelay: 3000 },
|
||||
// });
|
||||
|
||||
// await app.startAllMicroservices();
|
||||
await app.listen(process.env.PORT);
|
||||
// await app.listen(3001);
|
||||
}
|
||||
bootstrap();
|
||||
|
|
|
@ -1,41 +1,9 @@
|
|||
import { Module } from '@nestjs/common';
|
||||
import { TasksService } from './tasks.service';
|
||||
import { HttpModule } from '@nestjs/axios';
|
||||
import { LinksModule } from 'src/links/links.module';
|
||||
import { ClientsModule, Transport } from '@nestjs/microservices';
|
||||
import { ConfigModule, ConfigService } from '@nestjs/config';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
ClientsModule.registerAsync([
|
||||
{
|
||||
name: 'PARSER',
|
||||
imports: [ConfigModule],
|
||||
inject: [ConfigService],
|
||||
useFactory: async (configService: ConfigService) => {
|
||||
const brokerUrl = `${configService.get(
|
||||
'KAFKA_BROKER_HOST',
|
||||
)}:${configService.get('KAFKA_BROKER_PORT')}`;
|
||||
console.log('brokerUrl: ', brokerUrl);
|
||||
|
||||
return {
|
||||
transport: Transport.KAFKA,
|
||||
options: {
|
||||
client: {
|
||||
clientId: 'parsing',
|
||||
brokers: [brokerUrl],
|
||||
},
|
||||
consumer: {
|
||||
groupId: 'parsing-consumer',
|
||||
},
|
||||
},
|
||||
};
|
||||
},
|
||||
},
|
||||
]),
|
||||
HttpModule,
|
||||
LinksModule,
|
||||
],
|
||||
imports: [HttpModule],
|
||||
providers: [TasksService],
|
||||
})
|
||||
export class TasksModule {}
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
import { HttpService } from '@nestjs/axios';
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { ClientKafka } from '@nestjs/microservices';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Cron, CronExpression } from '@nestjs/schedule';
|
||||
import { firstValueFrom } from 'rxjs';
|
||||
import { LinksService } from 'src/links/links.service';
|
||||
|
||||
// * * * * * *
|
||||
// | | | | | |
|
||||
|
@ -18,11 +16,7 @@ import { LinksService } from 'src/links/links.service';
|
|||
export class TasksService {
|
||||
private readonly logger = new Logger(TasksService.name);
|
||||
|
||||
constructor(
|
||||
private readonly httpService: HttpService,
|
||||
private readonly linksService: LinksService,
|
||||
@Inject('PARSER') private readonly parserClient: ClientKafka,
|
||||
) {}
|
||||
constructor(private readonly httpService: HttpService) {}
|
||||
|
||||
private async request(link: string) {
|
||||
try {
|
||||
|
@ -35,18 +29,16 @@ export class TasksService {
|
|||
}
|
||||
}
|
||||
|
||||
@Cron('*/20 * * * * *', {
|
||||
@Cron('0 */20 * * * *', {
|
||||
name: 'sendParse',
|
||||
timeZone: 'Europe/Moscow',
|
||||
// disabled: true
|
||||
disabled: true,
|
||||
})
|
||||
async sendParse() {
|
||||
this.logger.log('TASK RUN');
|
||||
const links = await this.linksService.getLinksByDate(new Date());
|
||||
this.logger.log('LINKS', links);
|
||||
this.parserClient.emit(
|
||||
'link_to_parse',
|
||||
JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
|
||||
);
|
||||
const link =
|
||||
'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
|
||||
const res = await this.request(link);
|
||||
this.logger.debug(res);
|
||||
this.logger.debug('Called when the current second is 10');
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue