import { ofType, combineEpics } from "redux-observable";
import { mergeMap, map, catchError, takeUntil, withLatestFrom, filter } from 'rxjs/operators';
import { of } from 'rxjs';
import { api, cx, ods, rx } from "../../../../api";
import { actions } from "../actions";
import { actions as announcerActions } from '../../announcer';


/**
 * Epic that executes commute requests.
 *
 * For every action of type `actions.commute.request.type` it invokes
 * `rx(api.assets.commute[mode], parameters)` and translates the outcome into
 * a follow-up action:
 *  - each emitted operation becomes `actions.commute.success` carrying the
 *    request's `uid` and `operation.response()` as `intervals`;
 *  - an error becomes `actions.commute.fail` carrying the `uid` and
 *    `error.userMessage`, falling back to `error.message`.
 *
 * Requests run concurrently (`mergeMap`). Any action of type
 * `actions.commute.cancel.type` stops every in-flight request; the notifier
 * is not filtered by `uid`.
 *
 * @param {Observable} action$ - stream of dispatched actions.
 * @returns {Observable} stream of success / fail actions.
 */
const commuteEpic = (action$) => {
	// Builds the result stream for one request action.
	const toOutcome$ = ({ uid, mode, parameters }) => {
		const onSuccess = (operation) => actions.commute.success({ uid, intervals: operation.response() });
		const onFailure = (error) => of(actions.commute.fail({ uid, errorMessage: error.userMessage || error.message }));

		return rx(api.assets.commute[mode], parameters).pipe(
			map(onSuccess),
			// Errors are converted into an action here so they do not propagate to the outer stream.
			catchError(onFailure),
			takeUntil(action$.pipe(ofType(actions.commute.cancel.type)))
		);
	};

	return action$.pipe(
		ofType(actions.commute.request.type),
		mergeMap((requestAction) => toOutcome$(requestAction))
	);
};

/**
 * Epic that re-issues commute requests when announced changes touch them.
 *
 * Reacts to `announcerActions.announced` actions whose `announcements`
 * contain at least one `ods.processors.DeviceAssetPresenceChangeAnnouncement`.
 * For each entry of `state.assets.commute` that has stored `parameters` and
 * whose `parameters.uris` include the `uri` of any announcement in the batch,
 * it emits a single `actions.commute.request` with the same `uid` and `mode`
 * and a clone of the stored `parameters`.
 *
 * At most one request is emitted per commute item per announced action, even
 * when several of the item's URIs were announced together.
 *
 * @param {Observable} action$ - stream of dispatched actions.
 * @param {Observable} state$ - stream of store states.
 * @returns {Observable} stream of `actions.commute.request` actions.
 */
const watchLastEventEpic = (action$, state$) => {
	return action$.pipe(
		ofType(announcerActions.announced.type),
		filter(action => action.announcements.some(
			announcement => cx.o.typeOf(announcement, ods.processors.DeviceAssetPresenceChangeAnnouncement)
		)),
		withLatestFrom(state$.pipe(map(state => state.assets.commute))),
		mergeMap(([action, commuteState]) => {
			// NOTE(review): matching considers every announcement in the batch, not only
			// the presence-change ones that let the action through the filter above — confirm intent.
			// Loose equality is kept deliberately: the URI representation is not visible from this file.
			const isAnnounced = uri => action.announcements.some(announcement => announcement.uri == uri);

			const requests = Object.keys(commuteState)
				.filter(uid => {
					const { parameters } = commuteState[uid];
					// Items without stored parameters are skipped.
					return Boolean(parameters) && parameters.uris.some(uri => isAnnounced(uri));
				})
				.map(uid => {
					const { parameters, mode } = commuteState[uid];
					// Clone so the emitted action does not share the parameters object held in state.
					return actions.commute.request({ uid, parameters: cx.meta.clone(parameters), mode });
				});

			// `of()` with no arguments yields an empty stream.
			return of(...requests);
		})
	);
};

/**
 * Root epic of this module: `commuteEpic` and `watchLastEventEpic` combined
 * into a single epic via redux-observable's `combineEpics`.
 */
export const epic = combineEpics(commuteEpic, watchLastEventEpic);
