import { BehaviorSubject, Observable, ReplaySubject } from 'rxjs';
import {
  debounceTime,
  distinctUntilChanged,
  takeUntil,
  tap,
} from 'rxjs/operators';

import { StreamManager } from 'src/app/minimal/services/StreamManager';
import { IMgStreamFilter, IMgStreamItem } from 'src/app/util/stream';

/**
 * A datasource used to query feeds in a paged manner,
 * rather than using a grpc stream.
 *
 * Subclasses supply `streamName` and `_fetch()`. Fetches are requested through
 * `triggerFetchSubj` (debounced by 100ms) and are run by the subscription that
 * `connect()` sets up; `disconnect()` tears every subscription down.
 * `_fetch()` implementations presumably publish each page via `addNewItems()`
 * (not enforced here - verify against the subclasses).
 */
export abstract class PagedScrollerDatasource<T> {
  /** Emits once on `disconnect()`; used with `takeUntil` to end subscriptions. */
  private readonly _destroyed$ = new ReplaySubject<void>(1);
  /** Name of the stream, to be used to reset it */
  protected abstract streamName: string;
  /** How many items to return for each fetch */
  protected limit = 10;
  /** The current offset in the result list */
  protected offset = 0;
  /** How far from the bottom should we trigger grabbing new items */
  protected threshold = 3;
  /** Have we reached the end of the results */
  protected reachedEnd = false;
  /**
   * Filter to pass to the stream. Starts as `null` and is replaced by every
   * distinct value emitted by the observable handed to `connect()`.
   */
  protected filter: IMgStreamFilter | null = null;
  /**
   * Trigger Fetch
   *
   * Seeded with an initial value, so the subscription made in `connect()`
   * receives an emission (after the debounce) without any explicit trigger.
   * The emitted boolean itself is ignored by that subscription.
   */
  protected triggerFetchSubj = new BehaviorSubject<boolean>(false);
  /** Fetch requests, debounced so that bursts collapse into a single fetch */
  protected fetchTriggered$ = this.triggerFetchSubj
    .asObservable()
    .pipe(debounceTime(100));
  /** Data Stream: the accumulated list of items fetched so far */
  protected readonly _dataStream = new BehaviorSubject<IMgStreamItem<T>[]>([]);
  /** Loading: `true` while a `_fetch()` call is in flight */
  private readonly _loading$ = new BehaviorSubject<boolean>(false);
  readonly loading$: Observable<boolean> = this._loading$.asObservable();
  /**
   * Error: the last value thrown by `_fetch()`, `undefined` until one occurs.
   * It is not cleared by a later successful fetch.
   */
  private readonly _error$: BehaviorSubject<any> = new BehaviorSubject<any>(
    undefined,
  );
  readonly error$: Observable<any> = this._error$.asObservable();

  /** Whether a fetch has been started since the last `restartStream()` */
  private _hasFetched = false;

  /**
   * Class Constructor
   *
   * @param streamManager source of stream-restart events; a restart whose
   *   name matches `streamName` resets this datasource.
   */
  constructor(streamManager: StreamManager) {
    // listen to stream manager for restarts so that we can have new content
    // show up.
    streamManager.onStreamRestart
      .pipe(takeUntil(this._destroyed$))
      .subscribe(event => {
        if (event.name === this.streamName) {
          this.restartStream();
        }
      });
  }

  /**
   * Fetch
   *
   * To be implemented by the subclass. The result is awaited, so it may be
   * asynchronous; anything it throws (or rejects with) is published on
   * `error$`.
   */
  protected abstract _fetch();

  /**
   * Start listening for filter changes and fetch requests.
   *
   * @param filterObs emits the filter to apply; each distinct value is stored
   *   in `filter`, and restarts the stream if a fetch has already been started.
   * @returns the stream of accumulated items.
   */
  connect(
    filterObs: Observable<IMgStreamFilter | null>,
  ): Observable<IMgStreamItem<T>[]> {
    filterObs
      .pipe(
        distinctUntilChanged(),
        tap(filter => (this.filter = filter)),
        // kept last so nothing downstream can outlive `disconnect()`
        takeUntil(this._destroyed$),
      )
      .subscribe(() => {
        // only restart the stream if the initial fetch has already been
        // triggered
        if (this._hasFetched) {
          this.restartStream();
        }
      });
    this.fetchTriggered$
      .pipe(takeUntil(this._destroyed$))
      .subscribe(async () => {
        // a trigger that arrives while a fetch is in flight is dropped
        if (this._loading$.value) {
          return;
        }
        this._loading$.next(true);
        try {
          this._hasFetched = true;
          await this._fetch();
        } catch (e) {
          this._error$.next(e);
        } finally {
          // always release the loading flag, even if an `error$` subscriber
          // throws, otherwise every later fetch would be dropped
          this._loading$.next(false);
        }
      });
    return this._dataStream;
  }

  /** Stop all subscriptions created by the constructor and by `connect()`. */
  disconnect(): void {
    this._destroyed$.next();
    this._destroyed$.complete();
  }

  /**
   * Request the next page once the viewed index gets within `threshold`
   * items of the current `offset`.
   *
   * @param viewIndex index currently being viewed; values <= 0 are ignored.
   */
  viewedIndexChange(viewIndex: number): void {
    if (viewIndex <= 0 || this.reachedEnd) {
      return;
    }
    const nearEnd = viewIndex > this.offset - this.threshold;
    // make sure we dont trigger another fetch if it's still loading
    if (nearEnd && !this._loading$.getValue()) {
      this.triggerFetchSubj.next(true);
    }
  }

  /**
   * Append a fetched page to the data stream.
   *
   * A non-empty page advances `offset` by `limit` (not by the page length).
   *
   * @param newItems the page that was fetched; may be empty.
   */
  addNewItems(newItems: IMgStreamItem<T>[]): void {
    if (newItems.length > 0) {
      this.offset += this.limit;
      this._dataStream.next(this._dataStream.getValue().concat(newItems));
    }
    // if we received fewer items than we asked for, we've reached the end
    if (newItems.length < this.limit) {
      this.reachedEnd = true;
    }
  }

  /**
   * Reset paging state, clear the published items and request a fresh fetch.
   *
   * NOTE(review): if this runs while a fetch is still in flight when the
   * debounced trigger fires, the loading guard in `connect()` drops the
   * re-fetch - confirm whether that is intended.
   */
  restartStream(): void {
    this.offset = 0;
    this.reachedEnd = false;
    this._hasFetched = false;
    this._dataStream.next([]);
    this.triggerFetchSubj.next(true);
  }
}
