import { Injectable } from "@angular/core";
import { HttpClient, HttpDownloadProgressEvent, HttpEventType } from "@angular/common/http";
import { Observable } from "rxjs";
import { filter, map, mergeMap, pairwise, startWith } from "rxjs/operators";

@Injectable({
  providedIn: "root",
})
export class HttpEventStreamService {
  constructor(private readonly httpClient: HttpClient) {}

  post<TRequestBody extends object | null>(url: string, body: TRequestBody): Observable<string> {
    return this.httpClient
      .post(url, JSON.stringify(body), {
        observe: "events",
        responseType: "text",
        reportProgress: true,
        headers: { "Content-Type": "application/json" },
      })
      .pipe(
        // Retain only download progress events
        filter((event): event is HttpDownloadProgressEvent => event.type === HttpEventType.DownloadProgress),
        // Extract cumulative response text
        map((event) => event.partialText || ""),
        // Initialize with an empty string for 'pairwise'
        startWith(""),
        // Emit pairs of consecutive cumulative values: [previous, current]
        pairwise(),
        // Extract new text by removing previously processed content
        map(([previousChunk, currentChunk]) => currentChunk.substring(previousChunk.length)),
        // Split new text into individual SSE messages
        mergeMap((chunk: string) => chunk.split("\n")),
        // Trim whitespace from each message
        // map((message) => message.trim()),
        // Retain only messages starting with 'data: '
        filter((message) => message.startsWith("data: ")),
        // Remove the prefix and leading whitespace
        map((message) => message.substring(6))
      );
  }
}
