import {
  asyncScheduler, combineLatest, concatMap, defer, EMPTY, isObservable, merge, Observable, of, OperatorFunction, SchedulerLike, switchMap, take, timer
} from 'rxjs';
import { catchError, delayWhen, expand, filter, last, map, retry, sample, share, switchMapTo, tap } from 'rxjs/operators';

import { AsArray } from '../../shared';

/**
 * Merges a mixed list of observables and plain values into one observable which emits a single flat array.
 * Every entry may be an observable (emitting a value or an array of values) or a synchronous value / array.
 *
 * flattenObservableArray([of([1,2]), of(3), [4,5], 6]) -> Observable<[1,2,3,4,5,6]>
 *
 * Plain entries are wrapped with `of` and all entries are joined via `combineLatest`. Arrays delivered by an
 * entry are unpacked by one level, so a single entry may contribute several items to the result.
 *
 * @param observableArray entries to combine; a missing or empty list results in an observable of an empty array
 * @return observable emitting the flattened array built from the latest value of every entry
 */
export function flattenObservableArray<T>(observableArray: (Observable<T | T[]> | T | T[])[]): Observable<AsArray<T>> {
  const hasEntries = !!observableArray && observableArray.length !== 0;
  if (!hasEntries) {
    return of([] as AsArray<T>);
  }

  // unpack arrays by exactly one level, keep everything else as a single item
  const flattenOneLevel = (latestValues: (T | T[])[]): T[] => {
    const flattened: T[] = [];
    for (const latest of latestValues) {
      if (Array.isArray(latest)) {
        for (const item of latest) {
          flattened.push(item);
        }
      } else {
        flattened.push(latest);
      }
    }
    return flattened;
  };

  // synchronous entries become single-emission observables so that combineLatest can treat all entries alike
  const sources = observableArray.map(entry => isObservable(entry) ? entry : of(entry));
  return combineLatest(sources).pipe(map(flattenOneLevel)) as Observable<AsArray<T>>;
}

/**
 * Custom rxjs operator to sample the source observable according to the sampling strategy.
 * Despite sampling, the last value of the source is always emitted once the source completes, unless it is
 * identical (`===`) to the value which was emitted by the most recent sample.
 * A source which completes without emitting any value simply completes (no `EmptyError` is raised).
 *
 * @param strategy object with runtime (in ms) as key and sample time (in ms) as value.
 * The runtime is measured via `scheduler.now()`, starting when the sampler gets subscribed.
 * The value of the first key that is bigger than the source runtime will be used as next sample time.
 * Keys are evaluated in the order returned by `Object.keys`.
 * If there is no bigger value than the current runtime, it will use the sample time of the last element.
 * If no strategy is given, the source observable will not be sampled
 * Example - Sample with 200ms in the first 10s, then with 300ms the next 10s and with 1000ms after that.
 *   e.g. {
 *     10000: 200,
 *     20000: 300,
 *     [Number.MAX_VALUE]: 1000
 *   };
 * would mean that for the first 10 seconds, it will sample in 200ms intervals, between 10s to 20s in 300ms intervals and after that in 1000ms intervals
 *
 * The returned function takes the source observable and, optionally, the scheduler which is used to read the
 * current time (defaults to `asyncScheduler`).
 */
export const sampleWithStrategy = <T>(strategy: { [key: number]: number }) => (source: Observable<T>,
                                                                               scheduler: SchedulerLike = asyncScheduler): Observable<T> => {
  if (!strategy) {
    return source;
  }

  // the runtime thresholds are loop-invariant, so they are read once instead of on every sampling tick
  const runtimeThresholds = Object.keys(strategy);
  const fallbackThreshold = runtimeThresholds[runtimeThresholds.length - 1];

  // emits the start time immediately and then again after each sample interval; expand keeps rescheduling itself
  const sampler$ = defer(() => of(scheduler.now())).pipe(
    expand(start => {
      const sampleCategory = runtimeThresholds.find(key => scheduler.now() - start < +key) ?? fallbackThreshold;
      return timer(strategy[+sampleCategory]).pipe(map(() => start));
    })
  );

  // default value for last(): without it an empty source would fail with an EmptyError instead of completing
  const NO_VALUE = Symbol('sampleWithStrategy.noValue');

  let lastValue: T | undefined;
  const resetLastValue = (): void => {
    lastValue = undefined;
  };

  const shared$ = source.pipe(share());
  const sampled$ = shared$.pipe(
    sample(sampler$),
    tap(value => {
      lastValue = value;
    })
  );
  // emit the final source value unless the last sample already delivered the very same value
  const last$ = shared$.pipe(
    last(null, NO_VALUE),
    filter((value): value is T => value !== NO_VALUE && value !== lastValue)
  );

  // the handlers have to be functions: an inline assignment would run once at creation time and register nothing
  return merge(sampled$, last$)
    .pipe(tap({
                complete: resetLastValue,
                error: resetLastValue
              }));
};

/**
 * Operator to periodically poll a target by subscribing to the source over and over again.
 *
 * The very first poll is started without waiting time. Every further poll waits `waitForMs(tries)` ms first.
 * The try counter lives in the created operator function, so it is shared by all subscriptions of its result.
 *
 * @param pollUntil condition which has to be met in order to stop polling. The result which fulfils the condition is still
 * emitted. Falsy results are never handed to the condition and always lead to another poll.
 * @param waitForMs function which determines the waiting time between polls depending on the total number of tries so far
 * @param retries number of consecutive errors which should be retried before end polling. A retry runs through the
 * waiting step again and therefore counts as a further try.
 * @param canceledFn different from pollUntil as it does not emit if the condition is met. It is evaluated after the
 * waiting time and right before the source would be subscribed.
 */
export const pollPeriodically = <T>(pollUntil: (result: T) => boolean, waitForMs: (tries: number) => number, retries = 0,
                                    canceledFn?: () => boolean) => (source$: Observable<T>): Observable<T> => {
  let tries = 0;

  const isCanceled = () => canceledFn && canceledFn();

  // only the initial poll (first try of a non-delayed run) may skip the waiting time
  const waitingTime = (skipInitialDelay: boolean): number => {
    if (skipInitialDelay && tries === 1) {
      return 0;
    }
    return waitForMs(tries);
  };

  const pollOnce = (shouldDelay = true) =>
    of(1).pipe(
      tap(() => {
        tries += 1;
      }),
      map(() => waitingTime(!shouldDelay)),
      delayWhen(waitMs => timer(waitMs)),
      filter(() => !isCanceled()), // do not run again if it was canceled in the meantime
      switchMapTo(source$),
      retry(retries)
    );

  return pollOnce(false).pipe(
    expand(result => {
      if (result && pollUntil(result)) {
        return EMPTY;
      }
      return pollOnce();
    })
  );
};

/**
 * Operator which runs the given {@param tapFn} as a side effect for exactly one emission of the stream: the one whose
 * zero-based emit index is equal to the {@param tapIndex}. All values are passed on unchanged.
 *
 * Examples: execute a backend call when the previous have finished
 * stream$.pipe(
 *    tapOnce(() => callAfterFirstEmit()), // optionally a 0 could be passed as second argument
 *    tapOnce(() => callAfterSecondEmit(), 1)
 * );
 *
 * @param tapFn function which will be executed with the value of the matching emission
 * @param tapIndex zero-based emit index on which the function should be executed, defaults to 0
 * @throws Error if the tapIndex is null or negative
 */
export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
  // loose comparison on purpose, it rejects null as well as undefined
  // eslint-disable-next-line eqeqeq
  const isInvalidIndex = tapIndex == null || tapIndex < 0;
  if (isInvalidIndex) {
    throw new Error(`tapIndex must be a number being equal or greater than 0`);
  }

  // concatMap is used because its projection receives the emit index
  const forwardAndTap = (value: T, emitIndex: number): Observable<T> => {
    if (emitIndex === tapIndex) {
      tapFn?.(value);
    }
    return of(value);
  };

  return source$ => source$.pipe(concatMap(forwardAndTap));
}

/**
 * Options which can be passed optionally to {@see waitForOtherStream}.
 * If both options are set, {@see waitForOtherStream} only evaluates `property` and ignores `verifyData`.
 * If neither of them is set, any emission of the other stream continues the input stream.
 */
interface WaitForOtherStreamOptions<D> {
  /** If set, the input stream will only be continued once the given property of the data emitted by the otherStream is truthy. */
  property?: keyof D;
  /** If set, the input stream will only be continued once the given function, which gets the otherStream data, evaluates to a truthy value. */
  verifyData?: (data: D) => boolean;
}

/**
 * Operator which pauses the input stream until the given {@see otherStream$} emitted.
 * This can be used if you need to listen for side-effects but don't want to interrupt your stream.
 *
 * Example with operator
 * stream$.pipe(waitForOtherStream(otherStream$))
 *
 * Example without operator
 * stream$.pipe(switchMap(data => otherStream$.pipe(take(1), map(() => data))))
 *
 * As the operator is based on switchMap, an input value which is still waiting is dropped as soon as the next
 * input value arrives.
 *
 * The operator allows to pass in an option object to verify the other stream in order to wait for specific conditions to apply.
 * If `property` is set it takes precedence over `verifyData`. A null or undefined emission of the other stream never
 * satisfies `property`, so the operator keeps waiting for the next emission.
 * For more details see {@see WaitForOtherStreamOptions}
 *
 * {@param otherStream$} stream which will halt the input stream until it emits
 * {@param options} optional options to change the behaviour of the operator
 */
export function waitForOtherStream<T, D>(otherStream$: Observable<D>, options?: WaitForOtherStreamOptions<D>): OperatorFunction<T, T> {
  // decides whether an emission of the other stream releases the waiting input value
  const releasesInput = (other: D): boolean => {
    if (options?.property) {
      // optional access: nullish data has no truthy property and must not raise a TypeError
      return Boolean(other?.[options.property]);
    }
    if (options?.verifyData) {
      return options.verifyData(other);
    }
    return true;
  };

  return (input$: Observable<T>) =>
    input$.pipe(
      switchMap(input =>
        otherStream$.pipe(
          filter(releasesInput),
          take(1),
          map(() => input)
        )
      )
    );
}

/**
 * Inspired by {@link https://ngrx.io/api/component-store/tapResponse tapResponse}. Instead of continuing with a different observable in case of
 * an error, this variant hands the error to the error callback and then subscribes to the caught source observable again,
 * similar to the retry() operator. There is no limit for the number of re-subscriptions.
 *
 * ATTENTION: the observable has to be hot for tapAllResponses to work properly! cold observables would start all over again.
 *
 * ```typescript
 *  public load = this.effect<void>(load$ => {
 *     return load$.pipe(
 *       switchMap(() => this.loadLibraries()),
 *       switchMap(() => this.loadPortals()),
 *       switchMap(() => this.loadAssets()),
 *       tapAllResponses(
 *         ({ paging }) => this.patchState({ paging }),
 *         error => console.error(error)
 *       )
 *     );
 *   });
 * ```
 *
 * @param nextFn side effect which is executed for every emitted value
 * @param errorFn side effect which is executed for every error before the source gets subscribed again
 * @param completeFn optional side effect which is executed when the stream completes
 * @return operator function which passes all values on unchanged
 */
export function tapAllResponses<T, E = unknown>(nextFn: (next: T) => void, errorFn: (error: E) => void,
                                                completeFn?: () => void): (source: Observable<T>) => Observable<T> {
  // report the error, then continue with the very same (caught) stream
  const reportAndResubscribe = (error: E, caught: Observable<T>): Observable<T> => {
    errorFn(error);
    return caught;
  };

  return source => {
    const sideEffects = { next: nextFn, complete: completeFn };
    return source.pipe(tap(sideEffects), catchError(reportAndResubscribe));
  };
}
