// twitchPubSub.js

const eventemitter = require('eventemitter3');
const WebSocket = require('ws');
const util = require('util');

const _ = require('./helpers');
const logger = require('./logger').getLogger();

const WEBSOCKET_ADDRESS = 'wss://pubsub-edge.twitch.tv';
const WEBSOCKET_TIMEOUT = 4 * 60 * 1000;


//TODO Add support for the following config params:
//@param {number}   config.maxReconnectAttempts Max number of reconnection attempts (Default: Infinity)
//@param {number}   config.maxReconnectInterval Max number of ms to delay a reconnection (Default: 30000)
//@param {number}   config.reconnectDecay The rate of increase of the reconnect delay (Default: 1.5)
//@param {number}   config.reconnectInterval Number of ms before attempting to reconnect (Default: 1000)
//@param {boolean}  config.secure Use secure connection (SSL / HTTPS) (Overrides port to 443)
//@param {number}   config.timeout Number of ms to disconnect if no responses from server (Default: 9999)

/**
 * @class TwitchPubSub
 * The Twitch PubSub implementation, as described in https://dev.twitch.tv/docs/pubsub/ .
 * @param {object}   [config] The config object. The same object is kept on
 *                            `this.config` (and normalised in place).
 * @param {object}   [config.logger] The logger instance. If empty, the default logger will be used.
 * @param {string}   [config.authToken] The Twitch OAuth token.
 * @param {boolean}  [config.reconnect] Reconnect to Twitch PubSub when disconnected from server.
 *                                      Default: true. Pass `false` to disable.
 */
function TwitchPubSub(config) {
    eventemitter.call(this);

    // Tolerate `new TwitchPubSub()`; when a config is supplied, keep the
    // caller's object so existing code that inspects it keeps working.
    this.config = config || {};
    this.logger = this.config.logger || logger;
    this.waitingResponseMap = new Map();
    this.subscriptions = [];

    // `reconnect || true` could never evaluate to false, so an explicit
    // `reconnect: false` was silently ignored. Only fall back to the default
    // when the option is absent.
    this.config.reconnect =
        this.config.reconnect == null ? true : Boolean(this.config.reconnect);
}

/**
 * Connects to the Twitch PubSub.
 *
 * Opens a new WebSocket to WEBSOCKET_ADDRESS, wires the socket event
 * handlers to this instance and starts the periodic keep-alive (`_refresh`
 * every WEBSOCKET_TIMEOUT ms).
 *
 * @returns {Promise} Promise for the connection. It is resolved by the
 *                    socket's `open` handler.
 */
TwitchPubSub.prototype.connect = function() {
    this.logger.debug('Connecting Twitch PubSub');

    // disconnect() may have dropped the map; make sure subscribe() and the
    // response handler always have one to work with.
    if (!this.waitingResponseMap) {
        this.waitingResponseMap = new Map();
    }

    const socket = new WebSocket(WEBSOCKET_ADDRESS);
    socket.onopen = _onOpen.bind(this);
    socket.onclose = _onClose.bind(this);
    socket.onmessage = _onMessage.bind(this);
    socket.onerror = _onError.bind(this);

    socket.on('ping', _onPing.bind(this));
    socket.on('pong', _onPong.bind(this));
    this.ws = socket;

    // Never keep two keep-alive timers running: a second connect() would
    // otherwise orphan the previous interval (clearInterval(undefined) is a
    // no-op on the first call).
    clearInterval(this.refreshIntervalId);
    this.refreshIntervalId = setInterval(
        _refresh.bind(this),
        WEBSOCKET_TIMEOUT
    );

    return new Promise((resolve, reject) => {
        this.connectionPromise = { resolve, reject };
    });
};

/**
 * Disconnects from the Twitch PubSub.
 *
 * Stops the keep-alive timer, forgets the current socket and resets the map
 * of requests awaiting a response.
 *
 * @returns {Promise} Promise for the disconnection. It is resolved by the
 *                    socket's `close` handler, or immediately when there is
 *                    no socket to close.
 */
TwitchPubSub.prototype.disconnect = function() {
    this.logger.debug('Disconnecting Twitch PubSub');
    clearInterval(this.refreshIntervalId);
    this.isConnected = false;

    const socket = this.ws;
    delete this.ws;
    // Reset rather than delete: a later connect()/subscribe() or a late
    // RESPONSE would otherwise dereference an undefined map.
    this.waitingResponseMap = new Map();

    // Nothing to close (never connected, or already disconnected).
    if (!socket) {
        return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
        // Register the pending disconnection BEFORE closing so the close
        // handler always sees it and does not mistake this for a dropped
        // connection that needs a reconnect.
        this.disconnectionPromise = { resolve, reject };
        socket.close();
    });
};

/**
 * Reconnect to PubSub and resubscribe to the already added topics.
 *
 * The returned promise settles once the new connection is open and the
 * resubscription requests have been sent; it does not wait for Twitch to
 * acknowledge them.
 *
 * @returns {Promise}
 */
TwitchPubSub.prototype.reconnect = async function() {
    //clear the previous interval
    clearInterval(this.refreshIntervalId);
    await this.connect();

    // Iterate over a snapshot: subscribe() may append to this.subscriptions,
    // and we must not walk entries added while resubscribing.
    const remembered = this.subscriptions.slice();
    for (const subscription of remembered) {
        // Entries are stored as { types, id, authToken } - the previous
        // `subscription.dataId` was always undefined.
        this.subscribe(
            subscription.types,
            subscription.id,
            subscription.authToken,
            true
        ).catch(err => {
            // Do not leave the rejection unhandled; a failed resubscription
            // must not take the process down.
            this.logger.warn(
                'Failed to resubscribe after reconnect: ' +
                    ((err && err.message) || err)
            );
        });
    }
};

/**
 * Subscribe to a topic based on the type.
 * @param {String[]} types The types array. Valid options: 'bits', 'subscription', 'commerce', 'whisper'
 * @param {String} id The channel/user ID.
 * @param {String} authToken The oauth token with the necessary scopes.
 * @param {boolean} [isReconnect] Set by reconnect() when replaying a remembered
 *                  subscription, so it is not remembered a second time.
 * @returns {Promise} The subscription promise that will be resolved when it receives the response.
 *                    Rejects with a string for a missing parameter and with an
 *                    Error for an unknown type.
 */
TwitchPubSub.prototype.subscribe = function(
    types,
    id,
    authToken,
    isReconnect
) {
    return new Promise((resolve, reject) => {
        // Stop right after rejecting: without the early return an invalid
        // request was still built and sent to Twitch.
        // (Rejection reasons stay plain strings for backward compatibility.)
        if (!types) return reject('Missing types parameter');
        if (!id) return reject('Missing id parameter');
        if (!authToken) return reject('Missing authToken parameter');

        const topics = [];
        for (const type of types) {
            let topic;
            switch (type) {
                case 'bits':
                    topic = 'channel-bits-events-v1';
                    break;
                case 'subscription':
                    topic = 'channel-subscribe-events-v1';
                    break;
                case 'commerce':
                    topic = 'channel-commerce-events-v1';
                    break;
                case 'whisper':
                    topic = 'whispers';
                    break;
                default:
                    // Thrown inside the executor, so it surfaces as a rejection.
                    throw new Error('Unknown type.');
            }
            topics.push(topic + '.' + id);
        }

        // The nonce lets the response handler match Twitch's RESPONSE to
        // this request.
        const key = _.uuidv4();
        const data = {
            type: 'LISTEN',
            nonce: key,
            data: {
                topics: topics,
                auth_token: authToken
            }
        };
        this.ws.send(JSON.stringify(data));
        this.logger.debug('Sending topic subscription for ' + topics);

        this.waitingResponseMap.set(key, {
            types,
            id,
            authToken,
            data: {
                resolve,
                reject
            }
        });

        // Remember first-time subscriptions so reconnect() can replay them.
        // The check used to be inverted, which meant nothing was ever
        // remembered and replays duplicated themselves.
        if (!isReconnect) {
            this.subscriptions.push({
                types,
                id,
                authToken
            });
        }
    });
};

/**
 * Socket `open` handler (bound to the TwitchPubSub instance).
 * Flags the instance as connected and settles the promise handed out by
 * connect(), then forgets it.
 */
function _onOpen() {
    const { logger, connectionPromise: pendingConnect } = this;

    logger.debug('Twitch PubSub connected.');
    this.isConnected = true;

    pendingConnect.resolve();
    delete this.connectionPromise;
}

/**
 * Socket `close` handler (bound to the TwitchPubSub instance).
 *
 * - If disconnect() is waiting for this close, its promise is resolved.
 * - Otherwise the connection was dropped; a reconnect is started only when
 *   `config.reconnect` is enabled.
 *
 * @returns {Promise|undefined} The reconnect() promise when a reconnect was
 *                              started, otherwise undefined.
 */
function _onClose() {
    this.logger.debug('Twitch PubSub connection closed.');
    // The socket is gone in every branch, not only on a requested disconnect.
    this.isConnected = false;

    const pendingDisconnect = this.disconnectionPromise;
    if (pendingDisconnect) {
        pendingDisconnect.resolve();
        delete this.disconnectionPromise;
        return undefined;
    }

    // Honour the documented `reconnect` option instead of reconnecting
    // unconditionally.
    if (this.config && this.config.reconnect) {
        return this.reconnect();
    }
    return undefined;
}

/**
 * Socket `ping` handler (bound to the TwitchPubSub instance).
 * Only records the event in the debug log.
 */
function _onPing() {
    const { logger } = this;
    logger.debug('PING received.');
}

/**
 * Socket `pong` handler (bound to the TwitchPubSub instance).
 * Logs the event and stores the arrival time (epoch milliseconds) in
 * `lastPong`, which the keep-alive check compares against its ping time.
 */
function _onPong() {
    const { logger } = this;
    logger.debug('PONG received.');
    this.lastPong = Date.now();
}

/**
 * Socket `message` handler (bound to the TwitchPubSub instance).
 *
 * Parses the JSON payload in `event.data` and dispatches on its `type`
 * (compared case-insensitively):
 * - RECONNECT: schedules a reconnect in 30 seconds.
 * - RESPONSE:  forwarded to _handleResponse.
 * - MESSAGE:   forwarded to _handleMessage.
 * Any other type is ignored.
 *
 * @param {object} event The message event; `event.data` holds the JSON text.
 * @throws {SyntaxError} If `event.data` is not valid JSON.
 * @throws {Error} If the parsed message is empty or has no `type`.
 */
function _onMessage(event) {
    this.logger.debug('New message received: ' + event.data);
    const message = JSON.parse(event.data);
    if (!message || !message.type) {
        throw new Error('Invalid message format.');
    }

    // Normalise once so every branch matches the same way (previously only
    // RECONNECT was case-insensitive).
    const type = message.type.toUpperCase();
    if (type === 'RECONNECT') {
        this.logger.debug('Reconnecting in 30 seconds.');
        // Passing `this.reconnect` directly would detach it from the
        // instance, so the arrow function keeps `this` intact. The rejection
        // handler keeps a failed reconnect from becoming an unhandled
        // rejection.
        setTimeout(() => {
            Promise.resolve(this.reconnect()).catch(err => {
                this.logger.warn(
                    'Reconnect failed: ' + ((err && err.message) || err)
                );
            });
        }, 30000);
    } else if (type === 'RESPONSE') {
        _handleResponse.call(this, message);
    } else if (type === 'MESSAGE') {
        _handleMessage.call(this, message);
    }
}

/**
 * Socket `error` handler (bound to the TwitchPubSub instance).
 * Reports the error through the instance logger, so a logger supplied via
 * `config.logger` is honoured like in every other handler.
 *
 * @param {object} errorEvent The error event; its `message` is logged.
 */
function _onError(errorEvent) {
    this.logger.warn('Error received: ' + errorEvent.message);
}

/**
 * Settles the subscribe() promise that matches a RESPONSE message.
 *
 * The request is looked up in `waitingResponseMap` by the message nonce.
 * - Empty/blank `error`: the promise is resolved with the nonce and the
 *   entry is kept in the map, marked with `state = 'connected'`.
 * - Non-blank `error`: the promise is rejected with an Error carrying that
 *   text and the entry is removed from the map.
 *
 * @param {object} message The parsed RESPONSE message (`type`, `nonce`, `error`).
 */
function _handleResponse(message) {
    if (!this.waitingResponseMap.has(message.nonce)) {
        this.logger.warn(
            'Connection for RESPONSE with nouce ' +
                message.nonce +
                ' not found.'
        );
        return;
    }
    if (message.type.toUpperCase() !== 'RESPONSE') {
        return;
    }

    const subscription = this.waitingResponseMap.get(message.nonce);
    const pending = subscription.data;
    if (!pending) {
        // The promise callbacks are dropped once a request is acknowledged;
        // a repeated RESPONSE for the same nonce used to crash here.
        this.logger.warn(
            'Ignoring duplicate RESPONSE for nonce ' + message.nonce + '.'
        );
        return;
    }

    const hasError = Boolean(message.error) && message.error.trim().length > 0;
    if (hasError) {
        pending.reject(new Error(message.error));
        this.waitingResponseMap.delete(message.nonce);
        return;
    }

    pending.resolve(message.nonce);
    // Was `subscription.state === 'connected'`: a comparison whose result
    // was thrown away, so the state was never recorded.
    subscription.state = 'connected';
    delete subscription.data;
}

/**
 * Translates a MESSAGE payload into an event on this emitter.
 *
 * The topic is split on '.'; its first segment selects the event name and
 * its last segment is passed along as the id. The event is emitted as
 * `(type, id, message.data.message)`. Topics with an unrecognised first
 * segment are logged as a warning and dropped.
 *
 * @param {object} message The parsed MESSAGE message (`data.topic`, `data.message`).
 */
function _handleMessage(message) {
    const segments = message.data.topic.split('.');
    const topicName = segments[0];
    const id = segments[segments.length - 1];

    // A Map (rather than a plain object) so only the listed keys can match.
    const eventByTopic = new Map([
        ['channel-bits-events-v1', 'bits'],
        ['channel-subscribe-events-v1', 'subscription'],
        ['channel-commerce-events-v1', 'commerce'],
        ['whispers', 'whisper']
    ]);

    if (!eventByTopic.has(topicName)) {
        this.logger.warn('Unknown topic ' + topicName);
        return;
    }

    this.emit(eventByTopic.get(topicName), id, message.data.message);
}

/**
 * Keep-alive tick (bound to the TwitchPubSub instance and run on an interval).
 *
 * Sends a WebSocket ping and, 10 seconds later, checks whether a pong has
 * been recorded in `lastPong` since the ping. If not, and `config.reconnect`
 * is enabled, a reconnect is started.
 */
function _refresh() {
    this.logger.debug('Refreshing the PubSub with a PING command.');

    const socket = this.ws;
    if (!socket) {
        // Nothing to ping (e.g. disconnected between two ticks).
        return;
    }
    socket.ping();
    const pingTime = Date.now();

    //check if a PONG will be received in the next 10 seconds, otherwise issue a reconnect
    setTimeout(() => {
        // The socket was closed or replaced in the meantime; this check
        // belongs to a connection that no longer exists.
        if (this.ws !== socket) {
            return;
        }
        // `undefined < pingTime` is false, so a connection that never
        // answered any ping used to be treated as healthy.
        const pongMissing =
            this.lastPong === undefined || this.lastPong < pingTime;
        if (this.config.reconnect && pongMissing) {
            Promise.resolve(this.reconnect()).catch(err => {
                this.logger.warn(
                    'Reconnect failed: ' + ((err && err.message) || err)
                );
            });
        }
    }, 10000);
}

/* test code */
// When NODE_ENV is 'test', expose the module-private handlers on the
// prototype so they can be invoked on an instance.
if (process.env.NODE_ENV === 'test') {
    Object.assign(TwitchPubSub.prototype, {
        _onOpen,
        _onMessage,
        _handleResponse,
        _refresh
    });
}
/* end-test code */

// Make TwitchPubSub inherit from the event emitter (the constructor above
// already calls eventemitter.call(this)), then export the constructor as the
// module's public API.
util.inherits(TwitchPubSub, eventemitter);
module.exports = TwitchPubSub;