diff --git a/package-lock.json b/package-lock.json index e6e1e5a..c5e1872 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "satyr", - "version": "0.9.4", + "version": "0.10.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -950,6 +950,11 @@ "integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==", "optional": true }, + "is-port-available": { + "version": "0.1.5", + "resolved": "https://registry.npmjs.org/is-port-available/-/is-port-available-0.1.5.tgz", + "integrity": "sha512-/r7UZAQtfgDFdhxzM71jG0mkC4oSRA513cImMILdRe/+UOIe0Se/D/Z7XCua4AFg5k4Zt3ALMGaC1W3FzlrR2w==" + }, "isarray": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", @@ -963,11 +968,6 @@ "asn1.js": "^5.2.0" } }, - "jwt-decode": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-3.0.0.tgz", - "integrity": "sha512-RBQv2MTm3FNKQkdzhEyQwh5MbdNgMa+FyIJIK5RMWEn6hRgRHr7j55cRxGhRe6vGJDElyi6f6u/yfkP7AoXddA==" - }, "lodash": { "version": "4.17.20", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", diff --git a/package.json b/package.json index e921a3e..bee4cbf 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "express": "^4.17.1", "flags": "^0.1.3", "irc": "^0.5.2", + "is-port-available": "^0.1.5", "jose": "^1.15.1", "mysql": "^2.17.1", "node-media-server": "^2.2.4", diff --git a/src/cluster.ts b/src/cluster.ts index 72f4fa2..4a5181e 100644 --- a/src/cluster.ts +++ b/src/cluster.ts @@ -8,6 +8,7 @@ import * as strf from "strftime"; import * as ctx from '../node_modules/node-media-server/node_core_ctx'; import * as db from "./database"; import {config} from "./config"; +import * as isPortAvailable from "is-port-available"; const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); const { exec, execFile } = require('child_process'); @@ -64,11 +65,14 @@ if (cluster.isMaster) { rtmp: Object.assign({port: 1936}, config['rtmp']) }; + //find a unique port to listen on + getPort().then((wPort) => { + // creating the rtmp server var serv = net.createServer((socket) => { let session = new NodeRtmpSession(rtmpcfg, socket); session.run(); - }).listen(1936); + }).listen(wPort); logger.setLogType(0); // RTMP Server Logic @@ -99,7 +103,7 @@ if (cluster.isMaster) { mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, ()=>{;}); while(true){ if(session.audioCodec !== 0 && session.videoCodec !== 0){ - transCommand(results[0].username, key).then((r) => { + transCommand(results[0].username, key, wPort).then((r) => { execFile(config['media']['ffmpeg'], r, {maxBuffer: Infinity}, (err, stdout, stderr) => { /*console.log(err); console.log(stdout); @@ -114,7 +118,7 @@ if (cluster.isMaster) { console.log(`[RTMP Cluster WORKER ${process.pid}] Initiating recording for stream: ${id}`); mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, (err) => { if (err) throw err; - execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], { + execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], { detached : true, stdio : 'inherit', maxBuffer: Infinity @@ -130,7 +134,7 @@ if (cluster.isMaster) { db.query('SELECT twitch_key,enabled from twitch_mirror where username='+db.raw.escape(results[0].username)+' limit 1').then(async (tm) => { if(!tm[0]['enabled'] || !config['twitch_mirror']['enabled'] || !config['twitch_mirror']['ingest']) return; console.log('[NodeMediaServer] Mirroring to twitch for stream:',id) - execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-f', 'flv', config['twitch_mirror']['ingest']+tm[0]['twitch_key']], { + execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-f', 'flv', config['twitch_mirror']['ingest']+tm[0]['twitch_key']], { detached: true, stdio : 'inherit', maxBuffer: Infinity @@ -206,6 +210,8 @@ if (cluster.isMaster) { } }); console.log(`[RTMP Cluster WORKER ${process.pid}] Worker Ready.`); + + }); } function newRTMPListener(eventName, listener) { @@ -216,10 +222,23 @@ function getRTMPSession(id) { return ctx.sessions.get(id); } -async function transCommand(user: string, key: string): Promise{ +async function getPort(): Promise{ + let port = 1936+process.pid; + while(true){ + let i=0; + if(await isPortAvailable(port+i)){ + port += i; + break; + } + i++; + } + return port; +} + +async function transCommand(user: string, key: string, wPort): Promise{ let args: string[] = ['-loglevel', 'fatal', '-y']; if(config['transcode']['inputflags'] !== null && config['transcode']['inputflags'] !== "") args = args.concat(config['transcode']['inputflags'].split(" ")); - args = args.concat(['-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']); + args = args.concat(['-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']); if(config['transcode']['adaptive']===true && config['transcode']['variants'] > 1) { for(let i=0;i