diff --git a/helpers/redis.js b/helpers/redis.js index 90b440a94702b4c1084740ca2746804c2aa74688..610742caec8af70aafa7086fa0171c40c3a0097f 100644 --- a/helpers/redis.js +++ b/helpers/redis.js @@ -1,14 +1,103 @@ 'use strict'; -const RedisConnection = require('ioredis'); -const websockets = require('./websockets'); +var notRedis = { + state: {}, + topics: {}, + + publish: function(topic, msg, cb) { + //console.log("[notredis] publish",topic,msg); + if (!this.topics[topic]) { + this.topics[topic] = { + subscribers: [] + }; + } + var t=this.topics[topic]; + for (var i=0; i<t.subscribers.length; i++) { + var s=t.subscribers[i]; + if (s.handler) { + s.handler(topic, msg); + } + } + if (cb) cb(null); + }, + + subscribe: function(topics, cb) { + var handle = { + handler: null, + on: function(evt, cb) { + if (evt == "message") { + this.handler = cb; + } + } + }; + + for (var i=0; i<topics.length; i++) { + var topic = topics[i]; + if (!this.topics[topic]) { + this.topics[topic] = { + subscribers: [] + }; + } + + var t=this.topics[topic]; + t.subscribers.push(handle); + } + + cb(null, handle, topics.length); + return handle; + }, + + get: function(key, cb) { + cb(null, this.state[key]); + return this.state[key]; + }, + + set: function(key, val, cb) { + this.state[key] = val; + cb(); + }, + + del: function(key, cb) { + delete this.state[key]; + cb(null); + }, + + sadd: function(key, skey, cb) { + if (!this.state[key]) this.state[key] = {}; + this.state[key][skey] = true; + cb(null); + }, + + srem: function(key, skey, cb) { + if (this.state[key]) { + delete this.state[key][skey]; + } + cb(null); + }, + + smembers: function(key, cb) { + cb(null, Object.keys(this.state[key])); + }, + + incr: function(key, cb) { + if (!this.state[key]) this.state[key] = 0; + this.state[key]++; + cb(); + }, + + expire: function() { + }, +} module.exports = { - connectRedis(){ - const redisHost = process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'; - this.connection = new RedisConnection(6379, redisHost); + connectRedis: function() { + this.connection = notRedis; }, - sendMessage(action, model, attributes, channelId) { + getConnection: function() { + this.connectRedis(); + return this.connection; + }, + sendMessage: function(action, model, attributes, channelId) { const data = JSON.stringify({ channel_id: channelId, action: action, @@ -17,12 +106,12 @@ module.exports = { }); this.connection.publish('updates', data); }, - logIp(ip, cb) { + logIp: function(ip, cb) { this.connection.incr("ip_"+ ip, (err, socketCounter) => { cb(); }); }, - rateLimit(namespace, ip, cb) { + rateLimit: function(namespace, ip, cb) { const key = "limit_"+ namespace + "_"+ ip; const redis = this.connection; @@ -47,7 +136,7 @@ module.exports = { } }); }, - isOnlineInSpace(user, space, cb) { + isOnlineInSpace: function(user, space, cb) { this.connection.smembers("space_" + space._id.toString(), function(err, list) { if (err) cb(err); else { @@ -59,3 +148,6 @@ module.exports = { }); } }; + +return module.exports; + diff --git a/helpers/websockets.js b/helpers/websockets.js index 26fcd1c73db9de5e1a380b4994168c1a0e6deeb8..9394913c946a1ee8ec65fbdd7ff8d4e079da713a 100644 --- a/helpers/websockets.js +++ b/helpers/websockets.js @@ -3,18 +3,19 @@ require('../models/schema'); const WebSocketServer = require('ws').Server; -const Redis = require('ioredis'); const async = require('async'); const _ = require("underscore"); const mongoose = require("mongoose"); const crypto = require('crypto'); +var redis = require("./redis.js"); + module.exports = { - startWebsockets: function(server){ + startWebsockets: function(server) { this.setupSubscription(); - this.state = new Redis(6379, process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'); - - if(!this.current_websockets){ + this.state = redis.getConnection(); + + if(!this.current_websockets) { this.current_websockets = []; } @@ -117,8 +118,7 @@ module.exports = { }, setupSubscription: function() { - this.cursorSubscriber = new Redis(6379, process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'); - this.cursorSubscriber.subscribe(['cursors', 'users', 'updates'], function (err, count) { + this.cursorSubscriber = redis.getConnection().subscribe(['cursors', 'users', 'updates'], function (err, count) { console.log("[redis] websockets to " + count + " topics." ); }); this.cursorSubscriber.on('message', function (channel, rawMessage) { @@ -206,7 +206,7 @@ module.exports = { console.log("websocket not found to remove"); } - this.state.del(ws.id, function(err, res) { + this.state.del(ws.id+"", function(err, res) { if (err) console.error(err, res); else { this.removeUserInSpace(ws.space_id, ws, (err) => { @@ -221,7 +221,8 @@ module.exports = { addUserInSpace: function(username, space, ws, cb) { console.log("[websockets] user "+username+" in "+space.access_mode +" space " + space._id + " with socket " + ws.id); - this.state.set(ws.id, username, function(err, res) { + + this.state.set(ws.id+"", username+"", function(err, res) { if(err) console.error(err, res); else { this.state.sadd("space_" + space._id, ws.id, function(err, res) { @@ -238,7 +239,7 @@ module.exports = { }.bind(this)); }, removeUserInSpace: function(spaceId, ws, cb) { - this.state.srem("space_" + spaceId, ws.id, function(err, res) { + this.state.srem("space_" + spaceId, ws.id+"", function(err, res) { if (err) cb(err); else { console.log("[websockets] socket "+ ws.id + " went offline in space " + spaceId);