import { BehaviorSubject, MonoTypeOperatorFunction, Observable } from "rxjs";
import { delay, map, multicast, refCount, retryWhen, scan, tap } from "rxjs/operators";

// 基于参数列表创建唯一ID
const generateParamsUUID = (ps: any[]) => {
    return ps.length > 0
        ? ps
              .map((p) => {
                  try {
                      switch (typeof p) {
                          case "function":
                          case "symbol":
                              return (Date.now() + Math.random() * 1000).toString();
                          case "object":
                              return JSON.stringify(p);
                          case "undefined":
                              return "0";
                          default:
                              return p ? p.toString() : "0";
                      }
                  } catch (err) {}
                  return "*";
              })
              .join("-")
        : "--empty--";
};

/**
 * 包装一个聚合订阅的多播订阅调用函数。【主要用于解决创建多播订阅的场景】
 * @param {Observable<T>} targetObservableCall
 * @param {T extends any} initValue 默认数据
 */
export function multicastObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(
    targetObservableCall: O,
): (...p: P) => Observable<undefined | T>;
export function multicastObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(wrapTarget: O, initValue: T): (...p: P) => Observable<T>;
export function multicastObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(wrapTarget: O, initValue?: T) {
    // 创建缓存Map，以规避不同的传参，复用同一个订阅调用
    const cacheOfParams = new Map<string, BehaviorSubject<T>>();
    return (...params: P) => {
        const pid = generateParamsUUID(params);
        let middleSubject = cacheOfParams.get(pid);
        if (!middleSubject) {
            cacheOfParams.set(pid, (middleSubject = new BehaviorSubject(initValue as T)));
        }
        return wrapTarget(...params)
            .pipe(multicast(middleSubject))
            .pipe(refCount()) as Observable<T>;
    };
}

/**
 * 包装一个有生命周期的聚合订阅函数。【主要用于阻止接口在同一时间段内，无效的重复调用】
 * - 在第一次订阅未完成的情况下，所有的重复调用将被聚合，直到第一次调用进入完成态之前，执行所有重复订阅队列。
 * - 上一次订阅进入完成态后，再次调用，将自动重启原始调用。
 * @param targetObservableCall
 * @param {T extends any} initValue 默认数据
 */
export function aggregateObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(
    targetObservableCall: O,
): (...p: P) => Observable<undefined | T>;
export function aggregateObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(
    targetObservableCall: O,
    initValue: T,
): (...p: P) => Observable<T>;
export function aggregateObservable<T extends any, P extends any[] = any, O extends (...p: P) => Observable<T> = any>(targetObservableCall: O, initValue?: T): O {
    // 创建缓存Map，以规避不同的传参，复用同一个订阅调用
    const cacheOfParams = new Map<string, Observable<T>>();
    return ((...params) => {
        const pid = generateParamsUUID(params);
        // 获取基于传参的订阅缓存
        let proxyObservableCache = cacheOfParams.get(pid);
        if (!proxyObservableCache) {
            // 外层定义
            const multiObservable = multicastObservable(() => targetObservableCall(...params), initValue)();
            proxyObservableCache = new Observable<T>((obs) => {
                // 闭包调用，以确保多次订阅能作用到同一个原始可订阅对象上
                return multiObservable.subscribe({
                    next: (v) => {
                        return obs.next(v);
                    },
                    complete: () => {
                        // 本次生命周期完成后，清除缓存，确保下个生命周期的重新执行。
                        cacheOfParams.delete(pid);
                        return obs.complete();
                    },
                    error: (e) => {
                        cacheOfParams.delete(pid);
                        return obs.error(e);
                    },
                });
            });
            cacheOfParams.set(pid, proxyObservableCache);
        }
        return proxyObservableCache;
    }) as O;
}

export namespace RxTools {
    export const RETRY_WITH_DELAY_EXCEED = "Exceed count limit";

    /**
     * 再某种条件下重试请求的自定义操作符
     * @param condition 返回ture则重试
     * @param delayMill 重试的事件间隔
     * @param count 最多重试次数
     */
    export function retryWithDelay<T>(condition: (value: T) => boolean, { delayMill, count = 1 }: { delayMill: number; count: number }): MonoTypeOperatorFunction<T> {
        return (input) =>
            input.pipe(
                map((v) => {
                    if (condition(v)) {
                        throw v;
                    }
                    return v;
                }),
                retryWhen((errors) =>
                    errors.pipe(
                        scan((acc, error) => ({ count: acc.count + 1, error }), {
                            count: 1,
                            error: undefined as any,
                        }),
                        tap((current) => {
                            if (current.count > count) {
                                throw new Error(RETRY_WITH_DELAY_EXCEED);
                            }
                        }),
                        delay(delayMill),
                    ),
                ),
            );
    }
}
