Commit 7e8a27e1 authored by mntmn's avatar mntmn
Browse files

replace redis by in-memory object

parent 6ad97ac5
'use strict'; 'use strict';
const RedisConnection = require('ioredis'); var notRedis = {
const websockets = require('./websockets'); state: {},
topics: {},
publish: function(topic, msg, cb) {
//console.log("[notredis] publish",topic,msg);
if (!this.topics[topic]) {
this.topics[topic] = {
subscribers: []
};
}
var t=this.topics[topic];
for (var i=0; i<t.subscribers.length; i++) {
var s=t.subscribers[i];
if (s.handler) {
s.handler(topic, msg);
}
}
if (cb) cb(null);
},
subscribe: function(topics, cb) {
var handle = {
handler: null,
on: function(evt, cb) {
if (evt == "message") {
this.handler = cb;
}
}
};
for (var i=0; i<topics.length; i++) {
var topic = topics[i];
if (!this.topics[topic]) {
this.topics[topic] = {
subscribers: []
};
}
var t=this.topics[topic];
t.subscribers.push(handle);
}
cb(null, handle, topics.length);
return handle;
},
get: function(key, cb) {
cb(null, this.state[key]);
return this.state[key];
},
set: function(key, val, cb) {
this.state[key] = val;
cb();
},
del: function(key, cb) {
delete this.state[key];
cb(null);
},
sadd: function(key, skey, cb) {
if (!this.state[key]) this.state[key] = {};
this.state[key][skey] = true;
cb(null);
},
srem: function(key, skey, cb) {
if (this.state[key]) {
delete this.state[key][skey];
}
cb(null);
},
smembers: function(key, cb) {
cb(null, Object.keys(this.state[key]));
},
incr: function(key, cb) {
if (!this.state[key]) this.state[key] = 0;
this.state[key]++;
cb();
},
expire: function() {
},
}
module.exports = { module.exports = {
connectRedis(){ connectRedis: function() {
const redisHost = process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'; this.connection = notRedis;
this.connection = new RedisConnection(6379, redisHost);
}, },
sendMessage(action, model, attributes, channelId) { getConnection: function() {
this.connectRedis();
return this.connection;
},
sendMessage: function(action, model, attributes, channelId) {
const data = JSON.stringify({ const data = JSON.stringify({
channel_id: channelId, channel_id: channelId,
action: action, action: action,
...@@ -17,12 +106,12 @@ module.exports = { ...@@ -17,12 +106,12 @@ module.exports = {
}); });
this.connection.publish('updates', data); this.connection.publish('updates', data);
}, },
logIp(ip, cb) { logIp: function(ip, cb) {
this.connection.incr("ip_"+ ip, (err, socketCounter) => { this.connection.incr("ip_"+ ip, (err, socketCounter) => {
cb(); cb();
}); });
}, },
rateLimit(namespace, ip, cb) { rateLimit: function(namespace, ip, cb) {
const key = "limit_"+ namespace + "_"+ ip; const key = "limit_"+ namespace + "_"+ ip;
const redis = this.connection; const redis = this.connection;
...@@ -47,7 +136,7 @@ module.exports = { ...@@ -47,7 +136,7 @@ module.exports = {
} }
}); });
}, },
isOnlineInSpace(user, space, cb) { isOnlineInSpace: function(user, space, cb) {
this.connection.smembers("space_" + space._id.toString(), function(err, list) { this.connection.smembers("space_" + space._id.toString(), function(err, list) {
if (err) cb(err); if (err) cb(err);
else { else {
...@@ -59,3 +148,6 @@ module.exports = { ...@@ -59,3 +148,6 @@ module.exports = {
}); });
} }
}; };
return module.exports;
...@@ -3,18 +3,19 @@ require('../models/schema'); ...@@ -3,18 +3,19 @@ require('../models/schema');
const WebSocketServer = require('ws').Server; const WebSocketServer = require('ws').Server;
const Redis = require('ioredis');
const async = require('async'); const async = require('async');
const _ = require("underscore"); const _ = require("underscore");
const mongoose = require("mongoose"); const mongoose = require("mongoose");
const crypto = require('crypto'); const crypto = require('crypto');
var redis = require("./redis.js");
module.exports = { module.exports = {
startWebsockets: function(server){ startWebsockets: function(server) {
this.setupSubscription(); this.setupSubscription();
this.state = new Redis(6379, process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'); this.state = redis.getConnection();
if(!this.current_websockets){ if(!this.current_websockets) {
this.current_websockets = []; this.current_websockets = [];
} }
...@@ -117,8 +118,7 @@ module.exports = { ...@@ -117,8 +118,7 @@ module.exports = {
}, },
setupSubscription: function() { setupSubscription: function() {
this.cursorSubscriber = new Redis(6379, process.env.REDIS_PORT_6379_TCP_ADDR || 'localhost'); this.cursorSubscriber = redis.getConnection().subscribe(['cursors', 'users', 'updates'], function (err, count) {
this.cursorSubscriber.subscribe(['cursors', 'users', 'updates'], function (err, count) {
console.log("[redis] websockets to " + count + " topics." ); console.log("[redis] websockets to " + count + " topics." );
}); });
this.cursorSubscriber.on('message', function (channel, rawMessage) { this.cursorSubscriber.on('message', function (channel, rawMessage) {
...@@ -206,7 +206,7 @@ module.exports = { ...@@ -206,7 +206,7 @@ module.exports = {
console.log("websocket not found to remove"); console.log("websocket not found to remove");
} }
this.state.del(ws.id, function(err, res) { this.state.del(ws.id+"", function(err, res) {
if (err) console.error(err, res); if (err) console.error(err, res);
else { else {
this.removeUserInSpace(ws.space_id, ws, (err) => { this.removeUserInSpace(ws.space_id, ws, (err) => {
...@@ -221,7 +221,8 @@ module.exports = { ...@@ -221,7 +221,8 @@ module.exports = {
addUserInSpace: function(username, space, ws, cb) { addUserInSpace: function(username, space, ws, cb) {
console.log("[websockets] user "+username+" in "+space.access_mode +" space " + space._id + " with socket " + ws.id); console.log("[websockets] user "+username+" in "+space.access_mode +" space " + space._id + " with socket " + ws.id);
this.state.set(ws.id, username, function(err, res) {
this.state.set(ws.id+"", username+"", function(err, res) {
if(err) console.error(err, res); if(err) console.error(err, res);
else { else {
this.state.sadd("space_" + space._id, ws.id, function(err, res) { this.state.sadd("space_" + space._id, ws.id, function(err, res) {
...@@ -238,7 +239,7 @@ module.exports = { ...@@ -238,7 +239,7 @@ module.exports = {
}.bind(this)); }.bind(this));
}, },
removeUserInSpace: function(spaceId, ws, cb) { removeUserInSpace: function(spaceId, ws, cb) {
this.state.srem("space_" + spaceId, ws.id, function(err, res) { this.state.srem("space_" + spaceId, ws.id+"", function(err, res) {
if (err) cb(err); if (err) cb(err);
else { else {
console.log("[websockets] socket "+ ws.id + " went offline in space " + spaceId); console.log("[websockets] socket "+ ws.id + " went offline in space " + spaceId);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment