10 Star 47 Fork 10

10km / webredis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
webredis.js 5.90 KB
一键复制 编辑 原始数据 按行查看 历史
10km 提交于 2019-08-06 10:15 . commit
const config = require('./config.js')
const package = require('./package.json')
// 输出端口地址
console.log("webredis start on:%d", config.PORT)
// 输出redis 地址
console.log("redis localtion:%s", config.REDIS_URL)
var redis = require('redis');
var http = require('http');
var socketio = require('socket.io');
var server = http.createServer(function (req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain'
});
// 显示项目名称及版本号
res.end(package.name + ' ' + package.version + ' ' + package.repository.url);
}).listen(config.PORT);
// 每一个websocket代表一个浏览器连接,使用socket.id为key保存频道订阅数据和socket实例
// 保存所有连接的socket
// socket.id=>socket
const WEB_SOCKETS = new Map()
// websocket订阅的频道集合
// socket.id=>Set<String>
const SUBSCRIBES = new Map()
var clent_error;
// 公用redis client实例
var client = redis.createClient(config.REDIS_URL)
client.on('error', function (error) {
clent_error = error;
console.log(error);
});
var sub_error;
// 只用于消息订阅subscribe的redis客户端,需要独占链接,全局变量
var sub = redis.createClient(config.REDIS_URL)
.on("subscribe", function (channel, count) {
/** 命令输出频道订阅日志 */
console.log(count + ' subscribe: ' + channel);
})
.on('message', function (channel, message) {
/**
* 分发订阅频道的收到的消息到订阅频道的连接
* 遍历每个socket订阅的频道,检查当前消息的频道是在该socket订阅的频道名集合中,
* 如果是就向该socket推送消息
*/
for (var [socketid, channels] of SUBSCRIBES) {
// 如果socket订阅了消息频道则推送消息
if (channels.has(channel)) {
WEB_SOCKETS.get(socketid).emit(
'message',
{
channel: channel,
data: message
})
}
}
})
sub.on('error', function (error) {
sub_error = error;
console.log(error);
});
var io = socketio(server)
io.on('connection', function (socket) {
console.log('%s connect', socket.id)
// 将socket对象保存到全局映射表中
WEB_SOCKETS.set(socket.id, socket)
// 初始化订阅频道表
SUBSCRIBES.set(socket.id, new Set())
/** 频道订阅 */
socket.on('subscribe', function (channels) {
var ack = {
success: false,
request: 'subscribe',
reply: null,
respone: 'OK'
}
try {
if (typeof channels === 'string') {
channels = [channels]
}
if (!(channels instanceof Array)) {
ack.respone = 'channels is NOT ARRAY'
socket.emit('ack', ack)
return
} else if (sub.ready === false || sub.stream.writable === false) {
ack.respone = sub_error.message
socket.emit('ack', ack)
} else if (channels.length > 0) {
sub.subscribe(channels, (err, res) => {
if (err) {
ack.respone = err.message
} else {
ack.success = true
ack.respone = '订阅成功'
ack.reply = res
saveChannels(socket, channels);
socket.emit('ack', ack)
}
})
} else {
ack.respone = '频道列表为空'
socket.emit('ack', ack)
}
} catch (error) {
ack.respone = err.message
socket.emit('ack', ack)
}
})
/** 取消订阅 */
socket.on('unsubscribe', function (channels) {
var ack = {
success: false,
request: 'unsubscribe',
reply: null,
respone: 'OK'
}
try {
if (typeof channels === 'string') {
channels = [channels]
}
if (!(channels instanceof Array)) {
ack.respone = 'channels is NOT ARRAY'
socket.emit('ack', ack)
} else if (sub.ready === false || sub.stream.writable === false) {
ack.respone = sub_error.message
socket.emit('ack', ack)
} else if (channels.length > 0) {
client.unsubscribe(channels, (err, res) => {
if (err) {
ack.respone = error.message
} else {
ack.success = true
ack.respone = '取消订阅成功'
ack.reply = res
removeChannels(socket, channels);
}
socket.emit('ack', ack)
})
} else {
ack.respone = '频道列表为空'
socket.emit('ack', ack)
}
} catch (error) {
ack.respone = error.message
socket.emit('ack', ack)
}
})
/** 发布消息 */
socket.on('publish', (channel, message) => {
var ack = {
success: false,
request: 'publish',
reply: null,
respone: 'OK'
}
try {
if(!channel || !message){
ack.respone = 'NULL INPUT (channel or message)'
socket.emit('ack', ack)
}else if (sub.ready === false || sub.stream.writable === false) {
ack.respone = sub_error.message
socket.emit('ack', ack)
} else {
client.publish(channel, message, (err, res) => {
if (err) {
ack.respone = error.message
} else {
ack.success = true
ack.respone = channel + '发布成功'
ack.reply = res
}
socket.emit('ack', ack)
});
}
} catch (error) {
ack.respone = error.message
socket.emit('ack', ack)
}
});
/** 关闭websocket连接 */
socket.on('disconnect', function (reason) {
console.log('%s disconnect %s', socket.id, reason)
WEB_SOCKETS.delete(socket.id)
SUBSCRIBES.delete(socket.id)
})
});
// 删除socket订阅的频道的记录
function removeChannels(socket, channels) {
var set = SUBSCRIBES.get(socket.id);
channels.forEach(element => {
set.delete(element);
});
}
// 记录每个socket订阅的频道
function saveChannels(socket, channels) {
var set = SUBSCRIBES.get(socket.id);
if(set){
channels.forEach(element => {
set.add(element);
});
}else{
console.log("ERROR:INVALID socket!!!")
}
}
JavaScript
1
https://gitee.com/l0km/webredis.git
git@gitee.com:l0km/webredis.git
l0km
webredis
webredis
master

搜索帮助