import { Observable } from '../../Observable';
|
import { Subscription } from '../../Subscription';
|
import { from } from '../../observable/from';
|
import { ObservableInput } from '../../types';
|
|
export function fromFetch<T>(
|
input: string | Request,
|
init: RequestInit & {
|
selector: (response: Response) => ObservableInput<T>
|
}
|
): Observable<T>;
|
|
export function fromFetch(
|
input: string | Request,
|
init?: RequestInit
|
): Observable<Response>;
|
|
/**
|
* Uses [the Fetch API](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API) to
|
* make an HTTP request.
|
*
|
* **WARNING** Parts of the fetch API are still experimental. `AbortController` is
|
* required for this implementation to work and use cancellation appropriately.
|
*
|
* Will automatically set up an internal [AbortController](https://developer.mozilla.org/en-US/docs/Web/API/AbortController)
|
* in order to teardown the internal `fetch` when the subscription tears down.
|
*
|
* If a `signal` is provided via the `init` argument, it will behave like it usually does with
|
* `fetch`. If the provided `signal` aborts, the error that `fetch` normally rejects with
|
* in that scenario will be emitted as an error from the observable.
|
*
|
* ### Basic Use
|
*
|
* ```ts
|
* import { of } from 'rxjs';
|
* import { fromFetch } from 'rxjs/fetch';
|
* import { switchMap, catchError } from 'rxjs/operators';
|
*
|
* const data$ = fromFetch('https://api.github.com/users?per_page=5').pipe(
|
* switchMap(response => {
|
* if (response.ok) {
|
* // OK return data
|
* return response.json();
|
* } else {
|
* // Server is returning a status requiring the client to try something else.
|
* return of({ error: true, message: `Error ${response.status}` });
|
* }
|
* }),
|
* catchError(err => {
|
* // Network or other error, handle appropriately
|
* console.error(err);
|
* return of({ error: true, message: err.message })
|
* })
|
* );
|
*
|
* data$.subscribe({
|
* next: result => console.log(result),
|
* complete: () => console.log('done')
|
* });
|
* ```
|
*
|
* ### Use with Chunked Transfer Encoding
|
*
|
* With HTTP responses that use [chunked transfer encoding](https://tools.ietf.org/html/rfc7230#section-3.3.1),
|
* the promise returned by `fetch` will resolve as soon as the response's headers are
|
* received.
|
*
|
* That means the `fromFetch` observable will emit a `Response` - and will
|
* then complete - before the body is received. When one of the methods on the
|
* `Response` - like `text()` or `json()` - is called, the returned promise will not
|
* resolve until the entire body has been received. Unsubscribing from any observable
|
* that uses the promise as an observable input will not abort the request.
|
*
|
* To facilitate aborting the retrieval of responses that use chunked transfer encoding,
|
* a `selector` can be specified via the `init` parameter:
|
*
|
* ```ts
|
* import { of } from 'rxjs';
|
* import { fromFetch } from 'rxjs/fetch';
|
*
|
* const data$ = fromFetch('https://api.github.com/users?per_page=5', {
|
* selector: response => response.json()
|
* });
|
*
|
* data$.subscribe({
|
* next: result => console.log(result),
|
* complete: () => console.log('done')
|
* });
|
* ```
|
*
|
* @param input The resource you would like to fetch. Can be a url or a request object.
|
* @param init A configuration object for the fetch.
|
* [See MDN for more details](https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch#Parameters)
|
* @returns An Observable, that when subscribed to performs an HTTP request using the native `fetch`
|
* function. The {@link Subscription} is tied to an `AbortController` for the the fetch.
|
*/
|
export function fromFetch<T>(
|
input: string | Request,
|
initWithSelector: RequestInit & {
|
selector?: (response: Response) => ObservableInput<T>
|
} = {}
|
): Observable<Response | T> {
|
const { selector, ...init } = initWithSelector;
|
return new Observable<Response | T>(subscriber => {
|
const controller = new AbortController();
|
const signal = controller.signal;
|
let abortable = true;
|
let unsubscribed = false;
|
|
const subscription = new Subscription();
|
subscription.add(() => {
|
unsubscribed = true;
|
if (abortable) {
|
controller.abort();
|
}
|
});
|
|
let perSubscriberInit: RequestInit;
|
if (init) {
|
// If a signal is provided, just have it teardown. It's a cancellation token, basically.
|
if (init.signal) {
|
if (init.signal.aborted) {
|
controller.abort();
|
} else {
|
const outerSignal = init.signal;
|
const outerSignalHandler = () => {
|
if (!signal.aborted) {
|
controller.abort();
|
}
|
};
|
outerSignal.addEventListener('abort', outerSignalHandler);
|
subscription.add(() => outerSignal.removeEventListener('abort', outerSignalHandler));
|
}
|
}
|
// init cannot be mutated or reassigned as it's closed over by the
|
// subscriber callback and is shared between subscribers.
|
perSubscriberInit = { ...init, signal };
|
} else {
|
perSubscriberInit = { signal };
|
}
|
|
fetch(input, perSubscriberInit).then(response => {
|
if (selector) {
|
subscription.add(from(selector(response)).subscribe(
|
value => subscriber.next(value),
|
err => {
|
abortable = false;
|
if (!unsubscribed) {
|
// Only forward the error if it wasn't an abort.
|
subscriber.error(err);
|
}
|
},
|
() => {
|
abortable = false;
|
subscriber.complete();
|
}
|
));
|
} else {
|
abortable = false;
|
subscriber.next(response);
|
subscriber.complete();
|
}
|
}).catch(err => {
|
abortable = false;
|
if (!unsubscribed) {
|
// Only forward the error if it wasn't an abort.
|
subscriber.error(err);
|
}
|
});
|
|
return subscription;
|
});
|
}
|