import { Subject, AnonymousSubject } from '../../Subject';
|
import { Subscriber } from '../../Subscriber';
|
import { Observable } from '../../Observable';
|
import { Subscription } from '../../Subscription';
|
import { Operator } from '../../Operator';
|
import { ReplaySubject } from '../../ReplaySubject';
|
import { Observer, NextObserver } from '../../types';
|
|
/**
|
* WebSocketSubjectConfig is a plain Object that allows us to make our
|
* webSocket configurable.
|
*
|
* <span class="informal">Provides flexibility to {@link webSocket}</span>
|
*
|
* It defines a set of properties to provide custom behavior in specific
|
* moments of the socket's lifecycle. When the connection opens we can
|
* use `openObserver`, when the connection is closed `closeObserver`, if we
|
* are interested in listening for data comming from server: `deserializer`,
|
* which allows us to customize the deserialization strategy of data before passing it
|
* to the socket client. By default `deserializer` is going to apply `JSON.parse` to each message comming
|
* from the Server.
|
*
|
* ## Example
|
* **deserializer**, the default for this property is `JSON.parse` but since there are just two options
|
* for incomming data, either be text or binarydata. We can apply a custom deserialization strategy
|
* or just simply skip the default behaviour.
|
* ```ts
|
* import { webSocket } from 'rxjs/webSocket';
|
*
|
* const wsSubject = webSocket({
|
* url: 'ws://localhost:8081',
|
* //Apply any transformation of your choice.
|
* deserializer: ({data}) => data
|
* });
|
*
|
* wsSubject.subscribe(console.log);
|
*
|
* // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
|
* //output
|
* //
|
* // This is a msg from the server
|
* ```
|
*
|
* **serializer** allows us tom apply custom serialization strategy but for the outgoing messages
|
* ```ts
|
* import { webSocket } from 'rxjs/webSocket';
|
*
|
* const wsSubject = webSocket({
|
* url: 'ws://localhost:8081',
|
* //Apply any transformation of your choice.
|
* serializer: msg => JSON.stringify({channel: "webDevelopment", msg: msg})
|
* });
|
*
|
* wsSubject.subscribe(() => subject.next("msg to the server"));
|
*
|
* // Let's suppose we have this on the Server: ws.send("This is a msg from the server")
|
* //output
|
* //
|
* // {"channel":"webDevelopment","msg":"msg to the server"}
|
* ```
|
*
|
* **closeObserver** allows us to set a custom error when an error raise up.
|
* ```ts
|
* import { webSocket } from 'rxjs/webSocket';
|
*
|
* const wsSubject = webSocket({
|
* url: 'ws://localhost:8081',
|
* closeObserver: {
|
next(closeEvent) {
|
const customError = { code: 6666, reason: "Custom evil reason" }
|
console.log(`code: ${customError.code}, reason: ${customError.reason}`);
|
}
|
}
|
* });
|
*
|
* //output
|
* // code: 6666, reason: Custom evil reason
|
* ```
|
*
|
* **openObserver**, Let's say we need to make some kind of init task before sending/receiving msgs to the
|
* webSocket or sending notification that the connection was successful, this is when
|
* openObserver is usefull for.
|
* ```ts
|
* import { webSocket } from 'rxjs/webSocket';
|
*
|
* const wsSubject = webSocket({
|
* url: 'ws://localhost:8081',
|
* openObserver: {
|
* next: () => {
|
* console.log('connetion ok');
|
* }
|
* },
|
* });
|
*
|
* //output
|
* // connetion ok`
|
* ```
|
* */
|
|
export interface WebSocketSubjectConfig<T> {
|
/** The url of the socket server to connect to */
|
url: string;
|
/** The protocol to use to connect */
|
protocol?: string | Array<string>;
|
/** @deprecated use {@link deserializer} */
|
resultSelector?: (e: MessageEvent) => T;
|
/**
|
* A serializer used to create messages from passed values before the
|
* messages are sent to the server. Defaults to JSON.stringify.
|
*/
|
serializer?: (value: T) => WebSocketMessage;
|
/**
|
* A deserializer used for messages arriving on the socket from the
|
* server. Defaults to JSON.parse.
|
*/
|
deserializer?: (e: MessageEvent) => T;
|
/**
|
* An Observer that watches when open events occur on the underlying web socket.
|
*/
|
openObserver?: NextObserver<Event>;
|
/**
|
* An Observer than watches when close events occur on the underlying webSocket
|
*/
|
closeObserver?: NextObserver<CloseEvent>;
|
/**
|
* An Observer that watches when a close is about to occur due to
|
* unsubscription.
|
*/
|
closingObserver?: NextObserver<void>;
|
/**
|
* A WebSocket constructor to use. This is useful for situations like using a
|
* WebSocket impl in Node (WebSocket is a DOM API), or for mocking a WebSocket
|
* for testing purposes
|
*/
|
WebSocketCtor?: { new(url: string, protocols?: string|string[]): WebSocket };
|
/** Sets the `binaryType` property of the underlying WebSocket. */
|
binaryType?: 'blob' | 'arraybuffer';
|
}
|
|
const DEFAULT_WEBSOCKET_CONFIG: WebSocketSubjectConfig<any> = {
|
url: '',
|
deserializer: (e: MessageEvent) => JSON.parse(e.data),
|
serializer: (value: any) => JSON.stringify(value),
|
};
|
|
const WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT =
|
'WebSocketSubject.error must be called with an object with an error code, and an optional reason: { code: number, reason: string }';
|
|
export type WebSocketMessage = string | ArrayBuffer | Blob | ArrayBufferView;
|
|
export class WebSocketSubject<T> extends AnonymousSubject<T> {
|
|
private _config: WebSocketSubjectConfig<T>;
|
|
/** @deprecated This is an internal implementation detail, do not use. */
|
_output: Subject<T>;
|
|
private _socket: WebSocket;
|
|
constructor(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) {
|
super();
|
if (urlConfigOrSource instanceof Observable) {
|
this.destination = destination;
|
this.source = urlConfigOrSource as Observable<T>;
|
} else {
|
const config = this._config = { ...DEFAULT_WEBSOCKET_CONFIG };
|
this._output = new Subject<T>();
|
if (typeof urlConfigOrSource === 'string') {
|
config.url = urlConfigOrSource;
|
} else {
|
for (let key in urlConfigOrSource) {
|
if (urlConfigOrSource.hasOwnProperty(key)) {
|
config[key] = urlConfigOrSource[key];
|
}
|
}
|
}
|
|
if (!config.WebSocketCtor && WebSocket) {
|
config.WebSocketCtor = WebSocket;
|
} else if (!config.WebSocketCtor) {
|
throw new Error('no WebSocket constructor can be found');
|
}
|
this.destination = new ReplaySubject();
|
}
|
}
|
|
lift<R>(operator: Operator<T, R>): WebSocketSubject<R> {
|
const sock = new WebSocketSubject<R>(this._config as WebSocketSubjectConfig<any>, <any> this.destination);
|
sock.operator = operator;
|
sock.source = this;
|
return sock;
|
}
|
|
private _resetState() {
|
this._socket = null;
|
if (!this.source) {
|
this.destination = new ReplaySubject();
|
}
|
this._output = new Subject<T>();
|
}
|
|
/**
|
* Creates an {@link Observable}, that when subscribed to, sends a message,
|
* defined by the `subMsg` function, to the server over the socket to begin a
|
* subscription to data over that socket. Once data arrives, the
|
* `messageFilter` argument will be used to select the appropriate data for
|
* the resulting Observable. When teardown occurs, either due to
|
* unsubscription, completion or error, a message defined by the `unsubMsg`
|
* argument will be send to the server over the WebSocketSubject.
|
*
|
* @param subMsg A function to generate the subscription message to be sent to
|
* the server. This will still be processed by the serializer in the
|
* WebSocketSubject's config. (Which defaults to JSON serialization)
|
* @param unsubMsg A function to generate the unsubscription message to be
|
* sent to the server at teardown. This will still be processed by the
|
* serializer in the WebSocketSubject's config.
|
* @param messageFilter A predicate for selecting the appropriate messages
|
* from the server for the output stream.
|
*/
|
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
|
const self = this;
|
return new Observable((observer: Observer<any>) => {
|
try {
|
self.next(subMsg());
|
} catch (err) {
|
observer.error(err);
|
}
|
|
const subscription = self.subscribe(x => {
|
try {
|
if (messageFilter(x)) {
|
observer.next(x);
|
}
|
} catch (err) {
|
observer.error(err);
|
}
|
},
|
err => observer.error(err),
|
() => observer.complete());
|
|
return () => {
|
try {
|
self.next(unsubMsg());
|
} catch (err) {
|
observer.error(err);
|
}
|
subscription.unsubscribe();
|
};
|
});
|
}
|
|
private _connectSocket() {
|
const { WebSocketCtor, protocol, url, binaryType } = this._config;
|
const observer = this._output;
|
|
let socket: WebSocket = null;
|
try {
|
socket = protocol ?
|
new WebSocketCtor(url, protocol) :
|
new WebSocketCtor(url);
|
this._socket = socket;
|
if (binaryType) {
|
this._socket.binaryType = binaryType;
|
}
|
} catch (e) {
|
observer.error(e);
|
return;
|
}
|
|
const subscription = new Subscription(() => {
|
this._socket = null;
|
if (socket && socket.readyState === 1) {
|
socket.close();
|
}
|
});
|
|
socket.onopen = (e: Event) => {
|
const { _socket } = this;
|
if (!_socket) {
|
socket.close();
|
this._resetState();
|
return;
|
}
|
const { openObserver } = this._config;
|
if (openObserver) {
|
openObserver.next(e);
|
}
|
|
const queue = this.destination;
|
|
this.destination = Subscriber.create<T>(
|
(x) => {
|
if (socket.readyState === 1) {
|
try {
|
const { serializer } = this._config;
|
socket.send(serializer(x));
|
} catch (e) {
|
this.destination.error(e);
|
}
|
}
|
},
|
(e) => {
|
const { closingObserver } = this._config;
|
if (closingObserver) {
|
closingObserver.next(undefined);
|
}
|
if (e && e.code) {
|
socket.close(e.code, e.reason);
|
} else {
|
observer.error(new TypeError(WEBSOCKETSUBJECT_INVALID_ERROR_OBJECT));
|
}
|
this._resetState();
|
},
|
() => {
|
const { closingObserver } = this._config;
|
if (closingObserver) {
|
closingObserver.next(undefined);
|
}
|
socket.close();
|
this._resetState();
|
}
|
) as Subscriber<any>;
|
|
if (queue && queue instanceof ReplaySubject) {
|
subscription.add((<ReplaySubject<T>>queue).subscribe(this.destination));
|
}
|
};
|
|
socket.onerror = (e: Event) => {
|
this._resetState();
|
observer.error(e);
|
};
|
|
socket.onclose = (e: CloseEvent) => {
|
this._resetState();
|
const { closeObserver } = this._config;
|
if (closeObserver) {
|
closeObserver.next(e);
|
}
|
if (e.wasClean) {
|
observer.complete();
|
} else {
|
observer.error(e);
|
}
|
};
|
|
socket.onmessage = (e: MessageEvent) => {
|
try {
|
const { deserializer } = this._config;
|
observer.next(deserializer(e));
|
} catch (err) {
|
observer.error(err);
|
}
|
};
|
}
|
|
/** @deprecated This is an internal implementation detail, do not use. */
|
_subscribe(subscriber: Subscriber<T>): Subscription {
|
const { source } = this;
|
if (source) {
|
return source.subscribe(subscriber);
|
}
|
if (!this._socket) {
|
this._connectSocket();
|
}
|
this._output.subscribe(subscriber);
|
subscriber.add(() => {
|
const { _socket } = this;
|
if (this._output.observers.length === 0) {
|
if (_socket && _socket.readyState === 1) {
|
_socket.close();
|
}
|
this._resetState();
|
}
|
});
|
return subscriber;
|
}
|
|
unsubscribe() {
|
const { _socket } = this;
|
if (_socket && _socket.readyState === 1) {
|
_socket.close();
|
}
|
this._resetState();
|
super.unsubscribe();
|
}
|
}
|