add kafka, zookeeper
try to use kafka
This commit is contained in:
		
							parent
							
								
									02e9f874da
								
							
						
					
					
						commit
						16f781888e
					
				
							
								
								
									
										10
									
								
								.env.example
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								.env.example
									
									
									
									
									
								
							@ -8,6 +8,12 @@ DB_USER=mam-kupi-admin
 | 
			
		||||
DB_PASSWORD=7TTLpNh4GtQcDAMY
 | 
			
		||||
DB_NAME=mam-kupi-db
 | 
			
		||||
 | 
			
		||||
#JWT
 | 
			
		||||
# jwt
 | 
			
		||||
JWT_SECRET=3XUR3uRX6KHH5LI7nsWUh7RyhpJ8ST9t
 | 
			
		||||
JWT_EXPIRATION_TIME=3600
 | 
			
		||||
JWT_EXPIRATION_TIME=3600
 | 
			
		||||
 | 
			
		||||
# kafka
 | 
			
		||||
ZOOKEEPER_HOST=localhost
 | 
			
		||||
ZOOKEEPER_PORT=2181
 | 
			
		||||
KAFKA_BROKER_HOST=localhost
 | 
			
		||||
KAFKA_BROKER_PORT=9092
 | 
			
		||||
@ -12,6 +12,37 @@ services:
 | 
			
		||||
    ports:
 | 
			
		||||
      - "25432:5432"
 | 
			
		||||
 | 
			
		||||
  zookeeper:
 | 
			
		||||
    image: docker.io/bitnami/zookeeper:3.7.1
 | 
			
		||||
    ports:
 | 
			
		||||
      - "2181:2181"
 | 
			
		||||
    environment:
 | 
			
		||||
      - ALLOW_ANONYMOUS_LOGIN=yes
 | 
			
		||||
    restart: on-failure
 | 
			
		||||
  kafka:
 | 
			
		||||
    image: docker.io/bitnami/kafka:3.4.1
 | 
			
		||||
    ports:
 | 
			
		||||
      - "9092:9092"
 | 
			
		||||
    environment:
 | 
			
		||||
      - KAFKA_BROKER_ID=1
 | 
			
		||||
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
 | 
			
		||||
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
 | 
			
		||||
      - ALLOW_PLAINTEXT_LISTENER=yes
 | 
			
		||||
      - KAFKA_TLS_CLIENT_AUTH=none
 | 
			
		||||
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
 | 
			
		||||
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
 | 
			
		||||
      - KAFKA_CFG_DELETE_TOPIC_ENABLE=true
 | 
			
		||||
      - KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS=60000
 | 
			
		||||
      - KAFKA_CFG_COMPRESSION_TYPE=lz4
 | 
			
		||||
      - KAFKA_CFG_BATCH_SIZE=131072
 | 
			
		||||
      - KAFKA_CFG_LINGER_MS=10
 | 
			
		||||
      - KAFKA_MESSAGE_MAX_BYTES=52428800
 | 
			
		||||
      - KAFKA_ADVERTISED_HOST_NAME=localhost
 | 
			
		||||
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
 | 
			
		||||
    depends_on:
 | 
			
		||||
      - zookeeper
 | 
			
		||||
    restart: on-failure
 | 
			
		||||
 | 
			
		||||
volumes:
 | 
			
		||||
  mam-kupi-postgres-data:
 | 
			
		||||
    driver: local
 | 
			
		||||
 | 
			
		||||
@ -38,6 +38,7 @@
 | 
			
		||||
    "class-transformer": "^0.5.1",
 | 
			
		||||
    "class-validator": "^0.14.0",
 | 
			
		||||
    "cookie-parser": "^1.4.6",
 | 
			
		||||
    "kafkajs": "^2.2.4",
 | 
			
		||||
    "lodash": "^4.17.21",
 | 
			
		||||
    "passport": "^0.6.0",
 | 
			
		||||
    "passport-jwt": "^4.0.1",
 | 
			
		||||
 | 
			
		||||
@ -22,6 +22,7 @@ import { AuthModule } from './auth/auth.module';
 | 
			
		||||
      password: process.env.DB_PASSWORD,
 | 
			
		||||
      database: process.env.DB_NAME,
 | 
			
		||||
      entities: ['dist/**/*.entity.js'],
 | 
			
		||||
      // logging: ['query']
 | 
			
		||||
    }),
 | 
			
		||||
    TasksModule,
 | 
			
		||||
    LinksModule,
 | 
			
		||||
 | 
			
		||||
@ -16,8 +16,8 @@ export class Link {
 | 
			
		||||
  @Column()
 | 
			
		||||
  url: string;
 | 
			
		||||
 | 
			
		||||
  @Column({ nullable: true })
 | 
			
		||||
  lastCheckDate: Date = new Date();
 | 
			
		||||
  @Column({ type: 'timestamp', nullable: true })
 | 
			
		||||
  lastCheckDate: Date;
 | 
			
		||||
 | 
			
		||||
  @OneToMany(() => Price, (price) => price.link)
 | 
			
		||||
  prices: Price[];
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ export class LinksController {
 | 
			
		||||
  @Get('/')
 | 
			
		||||
  @UseGuards(JwtAuthGuard)
 | 
			
		||||
  async getLinks(@Req() request: RequestWithUser) {
 | 
			
		||||
    const links = await this.linksService.getLinks(request.user.id);
 | 
			
		||||
    const links = await this.linksService.getLinksByUser(request.user.id);
 | 
			
		||||
 | 
			
		||||
    return { links };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -8,5 +8,6 @@ import { LinksController } from './links.cotroller';
 | 
			
		||||
  imports: [TypeOrmModule.forFeature([Link])],
 | 
			
		||||
  controllers: [LinksController],
 | 
			
		||||
  providers: [LinksService],
 | 
			
		||||
  exports: [LinksService],
 | 
			
		||||
})
 | 
			
		||||
export class LinksModule {}
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,7 @@
 | 
			
		||||
import { Injectable } from '@nestjs/common';
 | 
			
		||||
import { Link } from './entities/links.entity';
 | 
			
		||||
import { InjectRepository } from '@nestjs/typeorm';
 | 
			
		||||
import { Repository } from 'typeorm';
 | 
			
		||||
import { MoreThanOrEqual, Repository, IsNull } from 'typeorm';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class LinksService {
 | 
			
		||||
@ -15,7 +15,16 @@ export class LinksService {
 | 
			
		||||
    return this.linksRepository.save(link);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async getLinks(userId: number) {
 | 
			
		||||
  async getLinksByUser(userId: number) {
 | 
			
		||||
    return this.linksRepository.findBy({ user: { id: userId } });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async getLinksByDate(minDate: Date) {
 | 
			
		||||
    return this.linksRepository.find({
 | 
			
		||||
      where: [
 | 
			
		||||
        { lastCheckDate: IsNull() },
 | 
			
		||||
        { lastCheckDate: MoreThanOrEqual(minDate) },
 | 
			
		||||
      ],
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										14
									
								
								src/main.ts
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								src/main.ts
									
									
									
									
									
								
							@ -2,21 +2,25 @@ import { NestFactory } from '@nestjs/core';
 | 
			
		||||
import { AppModule } from './app.module';
 | 
			
		||||
import * as cookieParser from 'cookie-parser';
 | 
			
		||||
import { ValidationPipe } from '@nestjs/common';
 | 
			
		||||
// import { MicroserviceOptions, Transport } from '@nestjs/microservices';
 | 
			
		||||
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
 | 
			
		||||
 | 
			
		||||
async function bootstrap() {
 | 
			
		||||
  const app = await NestFactory.create(AppModule);
 | 
			
		||||
  app.useGlobalPipes(new ValidationPipe());
 | 
			
		||||
  app.use(cookieParser());
 | 
			
		||||
  await app.listen(process.env.PORT);
 | 
			
		||||
  // await app.listen(process.env.PORT);
 | 
			
		||||
 | 
			
		||||
  // MICROSERVICE
 | 
			
		||||
  // app.connectMicroservice<MicroserviceOptions>({
 | 
			
		||||
  //   transport: Transport.NATS,
 | 
			
		||||
  //   options: { retryAttempts: 5, retryDelay: 3000 },
 | 
			
		||||
  //   transport: Transport.KAFKA,
 | 
			
		||||
  //   options: {
 | 
			
		||||
  //     client: {
 | 
			
		||||
  //       brokers: [`${process.env.KAFKA_BROKER_HOST}:${process.env.KAFKA_BROKER_PORT}`],
 | 
			
		||||
  //     }
 | 
			
		||||
  //   }
 | 
			
		||||
  // });
 | 
			
		||||
 | 
			
		||||
  // await app.startAllMicroservices();
 | 
			
		||||
  // await app.listen(3001);
 | 
			
		||||
  await app.listen(process.env.PORT);
 | 
			
		||||
}
 | 
			
		||||
bootstrap();
 | 
			
		||||
 | 
			
		||||
@ -1,9 +1,41 @@
 | 
			
		||||
import { Module } from '@nestjs/common';
 | 
			
		||||
import { TasksService } from './tasks.service';
 | 
			
		||||
import { HttpModule } from '@nestjs/axios';
 | 
			
		||||
import { LinksModule } from 'src/links/links.module';
 | 
			
		||||
import { ClientsModule, Transport } from '@nestjs/microservices';
 | 
			
		||||
import { ConfigModule, ConfigService } from '@nestjs/config';
 | 
			
		||||
 | 
			
		||||
@Module({
 | 
			
		||||
  imports: [HttpModule],
 | 
			
		||||
  imports: [
 | 
			
		||||
    ClientsModule.registerAsync([
 | 
			
		||||
      {
 | 
			
		||||
        name: 'PARSER',
 | 
			
		||||
        imports: [ConfigModule],
 | 
			
		||||
        inject: [ConfigService],
 | 
			
		||||
        useFactory: async (configService: ConfigService) => {
 | 
			
		||||
          const brokerUrl = `${configService.get(
 | 
			
		||||
            'KAFKA_BROKER_HOST',
 | 
			
		||||
          )}:${configService.get('KAFKA_BROKER_PORT')}`;
 | 
			
		||||
          console.log('brokerUrl: ', brokerUrl);
 | 
			
		||||
 | 
			
		||||
          return {
 | 
			
		||||
            transport: Transport.KAFKA,
 | 
			
		||||
            options: {
 | 
			
		||||
              client: {
 | 
			
		||||
                clientId: 'parsing',
 | 
			
		||||
                brokers: [brokerUrl],
 | 
			
		||||
              },
 | 
			
		||||
              consumer: {
 | 
			
		||||
                groupId: 'parsing-consumer',
 | 
			
		||||
              },
 | 
			
		||||
            },
 | 
			
		||||
          };
 | 
			
		||||
        },
 | 
			
		||||
      },
 | 
			
		||||
    ]),
 | 
			
		||||
    HttpModule,
 | 
			
		||||
    LinksModule,
 | 
			
		||||
  ],
 | 
			
		||||
  providers: [TasksService],
 | 
			
		||||
})
 | 
			
		||||
export class TasksModule {}
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,9 @@
 | 
			
		||||
import { HttpService } from '@nestjs/axios';
 | 
			
		||||
import { Injectable, Logger } from '@nestjs/common';
 | 
			
		||||
import { Inject, Injectable, Logger } from '@nestjs/common';
 | 
			
		||||
import { ClientKafka } from '@nestjs/microservices';
 | 
			
		||||
import { Cron, CronExpression } from '@nestjs/schedule';
 | 
			
		||||
import { firstValueFrom } from 'rxjs';
 | 
			
		||||
import { LinksService } from 'src/links/links.service';
 | 
			
		||||
 | 
			
		||||
// * * * * * *
 | 
			
		||||
// | | | | | |
 | 
			
		||||
@ -16,7 +18,11 @@ import { firstValueFrom } from 'rxjs';
 | 
			
		||||
export class TasksService {
 | 
			
		||||
  private readonly logger = new Logger(TasksService.name);
 | 
			
		||||
 | 
			
		||||
  constructor(private readonly httpService: HttpService) {}
 | 
			
		||||
  constructor(
 | 
			
		||||
    private readonly httpService: HttpService,
 | 
			
		||||
    private readonly linksService: LinksService,
 | 
			
		||||
    @Inject('PARSER') private readonly parserClient: ClientKafka,
 | 
			
		||||
  ) {}
 | 
			
		||||
 | 
			
		||||
  private async request(link: string) {
 | 
			
		||||
    try {
 | 
			
		||||
@ -29,16 +35,18 @@ export class TasksService {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Cron('0 */20 * * * *', {
 | 
			
		||||
  @Cron('*/20 * * * * *', {
 | 
			
		||||
    name: 'sendParse',
 | 
			
		||||
    timeZone: 'Europe/Moscow',
 | 
			
		||||
    disabled: true,
 | 
			
		||||
    // disabled: true
 | 
			
		||||
  })
 | 
			
		||||
  async sendParse() {
 | 
			
		||||
    const link =
 | 
			
		||||
      'https://novex.ru/catalog/product/nabor-polot-c-smart-35-75-70-140-brusn/';
 | 
			
		||||
    const res = await this.request(link);
 | 
			
		||||
    this.logger.debug(res);
 | 
			
		||||
    this.logger.debug('Called when the current second is 10');
 | 
			
		||||
    this.logger.log('TASK RUN');
 | 
			
		||||
    const links = await this.linksService.getLinksByDate(new Date());
 | 
			
		||||
    this.logger.log('LINKS', links);
 | 
			
		||||
    this.parserClient.emit(
 | 
			
		||||
      'link_to_parse',
 | 
			
		||||
      JSON.stringify(links.map((link) => ({ url: link.url, id: link.id }))),
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user