import { HttpClient } from '@angular/common/http';
import { Inject, Injectable } from '@angular/core';
import { Environment } from 'src/environments/environment.interface';
import { ENVIRONMENT } from 'src/environments/environment.token';
import { AmqpService } from './amqp.abstract';
import { AuthService } from '../shared/auth/auth.abstract';
import { MessageQueueCountModel } from './models/message-queue-count.model';
import { BehaviorSubject, forkJoin, Observable, of, Subject } from 'rxjs';
import { take, tap } from 'rxjs/operators';
import { MessageDetailModel } from './models/message-detail.model';

@Injectable({
    providedIn: 'root',
})
export class AmqpApiService extends AmqpService {
    public messageQueueCountSubject: BehaviorSubject<MessageQueueCountModel[]> = new BehaviorSubject<MessageQueueCountModel[]>(undefined);
    public messageDetailSubject: BehaviorSubject<MessageDetailModel[]> = new BehaviorSubject<MessageDetailModel[]>(undefined);
    public shovelMessagesSubject: Subject<any> = new Subject<any>();

    constructor(http: HttpClient, @Inject(ENVIRONMENT) environment: Environment, authService: AuthService) {
        super(environment, http, authService);
    }

    public getQueueCounts(): void {
        let array: Observable<MessageQueueCountModel>[] = [];
        if (this.environment.queues) {
            this.environment.queues.forEach((queue) => {
                let resp = this.getQueue(queue).pipe(
                    tap((r) => {
                        r.queueName = queue;
                    })
                );
                array.push(resp);
            });
            forkJoin(array).subscribe((values) => {
                this.messageQueueCountSubject.next(values);
            });
        }
    }

    private getQueue(queue: string): Observable<MessageQueueCountModel> {
        let response = this.endpointGet<MessageQueueCountModel>(`${this.environment.apiBaseUrl}/amqpservice/${queue}`);
        return response;
    }

    public getDlqMessages(queue: string): void {
        this.getQueue(queue).subscribe((res) => {
            this.getQueueMessages(queue, 'deadletter', res.deadLetterMessageCount);
        });
    }

    public getActiveMessages(queue: string): void {
        this.getQueue(queue).subscribe((res) => {
            this.getQueueMessages(queue, 'active', res.activeMessageCount);
        });
    }

    private getQueueMessages(queue: string, queueType: string, count: number): void {
        this.endpointGet<MessageDetailModel[]>(`${this.environment.apiBaseUrl}/amqpservice/${queue}/${queueType}/${count}`).subscribe({
            next: (messageDetailResponse) => {
                messageDetailResponse.map((d) => {
                    d.body = JSON.stringify(d.body, null, 4);
                    if (d.body.includes('<') && d.body.includes('>')) {
                        d.body = d.body.split('&').join('&amp;');
                        d.body = d.body.split('<').join('&lt;');
                        d.body = d.body.split('>').join('&gt;');
                    }
                    let partitionKeyArr = d.partitionKey.split('.');
                    d.partitionKey = partitionKeyArr[partitionKeyArr.length - 1];
                });
                this.messageDetailSubject.next(messageDetailResponse);
            },
        });
    }

    public shovelMessages(queue: string): void {
        this.endpointPost(`${this.environment.apiBaseUrl}/amqpservice/${queue}`).subscribe(() => {
            this.shovelMessagesSubject.next();
        });
    }

    public deleteMessage(queue: string, queueType: string, messageId: string): Observable<null> {
        return this.endpointPost<null>(`${this.environment.apiBaseUrl}/amqpservice/${queue}/${queueType}/${messageId}`);
    }
}
