import { filter, mergeMap, map as rxmap, catchError, takeUntil } from 'rxjs/operators';
import { of } from 'rxjs';
import { combineEpics, ofType } from 'redux-observable';
import { cx, api, rx } from '../../../api';
import { actions as announcerActions } from '../announcer';
import { actions as processorActions } from './actions';

// Initial value of this slice: an empty map that the reducer fills in,
// keyed by processor id.
const defaultState = {
	// processorId => substate
};

// Template for the per-processor entry stored in the slice.
const defaultSubstate = {
	// changed by the subscribe (+1) and unsubscribe (-1) actions
	subscribers: 0,
	// taken from `action.matches` of the matches.success action
	matches: null,
	// set to true by matches.request, back to false by matches.success / matches.fail
	pending: false,
	// taken from `action.errorMessage` of matches.fail; reset to null by matches.request
	error: null
};

// -------------------------------------------------------------

// Action creators for this slice, taken from the shared processor actions
// module; re-exported at the bottom of this file.
const actions = processorActions.devicePresenceMatches;

// -------------------------------------------------------------

/**
 * Returns a new slice state in which the entry for `processorId` is its
 * current value (seeded from `defaultSubstate` so the shape is always
 * complete) overlaid with `patch`. Neither `state` nor the existing entry
 * is modified.
 *
 * @param {Object} state - current slice state (processorId => substate)
 * @param {*} processorId - key of the entry to update
 * @param {Object} patch - fields to overwrite in the entry
 * @returns {Object} the new slice state
 */
const patchSubstate = (state, processorId, patch) => ({
	...state,
	[processorId]: { ...defaultSubstate, ...state[processorId], ...patch }
});

/**
 * Reducer for the device presence matches slice (processorId => substate).
 *
 * - subscribe:       creates the entry if needed and increments `subscribers`
 * - matches.request: marks the entry pending and clears `error`
 * - matches.success: stores `action.matches` and clears `pending`
 * - matches.fail:    stores `action.errorMessage` and clears `pending`
 * - unsubscribe:     decrements `subscribers`; removes the entry when no
 *                    subscribers remain
 *
 * Every branch returns fresh objects: previous state, previous substates and
 * the shared `defaultSubstate` template are never mutated.
 *
 * @param {Object} [state=defaultState] - current slice state
 * @param {Object} action - dispatched action; slice actions carry `processorId`
 * @returns {Object} the next slice state (same reference when nothing changed)
 */
const reducer = (state = defaultState, action) => {
	switch (action.type) {
		case actions.subscribe.type: {
			const existing = state[action.processorId];
			const count = existing == null ? defaultSubstate.subscribers : existing.subscribers;
			return patchSubstate(state, action.processorId, { subscribers: count + 1 });
		}
		case actions.matches.request.type:
			return patchSubstate(state, action.processorId, { pending: true, error: null });
		case actions.matches.success.type:
			return patchSubstate(state, action.processorId, { pending: false, matches: action.matches });
		case actions.matches.fail.type:
			return patchSubstate(state, action.processorId, { pending: false, error: action.errorMessage });
		case actions.unsubscribe.type: {
			const existing = state[action.processorId];
			if (existing == null) {
				// nothing is tracked for this processor: keep the same reference
				return state;
			}
			const remaining = existing.subscribers - 1;
			// `!(remaining > 0)` also catches negative and NaN counts, so an
			// unbalanced unsubscribe cannot leave a stale entry behind.
			if (!(remaining > 0)) {
				const next = { ...state };
				delete next[action.processorId];
				return next;
			}
			return patchSubstate(state, action.processorId, { subscribers: remaining });
		}
		default:
			return state;
	}
};

// -------------------------------------------------------------

/**
 * Epic that performs the "matches" call for every matches.request action.
 *
 * Each request action triggers `rx(api.processors.devicePresenceDetector.matches, ...)`
 * with the action's `processorId` and its `filter` (or a fresh
 * `DevicePresenceFilter` when the action carries none). The outcome is mapped
 * to matches.success, or to matches.fail with `error.userMessage` falling
 * back to `error.message`.
 *
 * @param {Observable} action$ - stream of dispatched actions
 * @returns {Observable} stream of matches.success / matches.fail actions
 */
const loadEpic = (action$) => {
	const fetchMatches = (request) => {
		const presenceFilter = request.filter
			? request.filter
			: new cx.ods.processors.DevicePresenceFilter();

		const toSuccess = (operation) => actions.matches.success({
			processorId: request.processorId,
			matches: operation.response()
		});

		const toFailure = (error) => of(actions.matches.fail({
			processorId: request.processorId,
			errorMessage: error.userMessage || error.message
		}));

		return rx(api.processors.devicePresenceDetector.matches, request.processorId, presenceFilter).pipe(
			rxmap(toSuccess),
			catchError(toFailure),
			// the cancel stream is not narrowed by processorId: any
			// matches.cancel action ends this in-flight request
			takeUntil(action$.pipe(ofType(actions.matches.cancel.type))),
		);
	};

	return action$.pipe(
		ofType(actions.matches.request.type),
		mergeMap(fetchMatches)
	);
};

/**
 * Keeps only the announcements that `cx.o.typeOf` recognises as
 * `cx.ods.processors.MessageProcessorStateChangeAnnouncement`.
 *
 * @param {Array} announcements - announcements to examine
 * @returns {Array} new array with the matching announcements, in input order
 */
const sift = (announcements) => {
	const isStateChange = (candidate) => cx.o.typeOf(
		candidate,
		cx.ods.processors.MessageProcessorStateChangeAnnouncement
	);
	return announcements.filter(isStateChange);
};

/**
 * Epic that re-requests matches when the announcer reports on a processor
 * this slice currently holds an entry for.
 *
 * For every announced action, the announcements are sifted once, narrowed to
 * those whose `processorId` has an entry in
 * `state$.value.processors.devicePresenceMatches`, and turned into one
 * matches.request action each. When nothing qualifies, `of()` is called with
 * no arguments and therefore emits nothing, so no separate filter stages are
 * needed.
 *
 * @param {Observable} action$ - stream of dispatched actions
 * @param {Object} state$ - state observable; only `state$.value` is read
 * @returns {Observable} stream of matches.request actions
 */
const watchAnnouncerEpic = (action$, state$) => {
	// an entry exists in this slice for the announced processor
	const isTracked = (anmt) =>
		state$.value.processors.devicePresenceMatches[anmt.processorId] != null;

	return action$.pipe(
		ofType(announcerActions.announced.type),
		mergeMap(action => {
			const requests = sift(action.announcements)
				.filter(isTracked)
				.map(anmt => actions.matches.request({ processorId: anmt.processorId }));
			return of(...requests);
		})
	);
};

// Root epic of this slice: the loader and the announcer watcher combined.
const epic = combineEpics(loadEpic, watchAnnouncerEpic);

// Public surface of the module: action creators, reducer and combined epic.
export { actions, reducer, epic };
