Add unique ports per worker so that ffmpeg can reliably record.

merge-requests/28/head
knotteye 2020-10-17 01:53:33 -05:00
parent 1ae7128b9d
commit 80cf01ef30
3 changed files with 32 additions and 12 deletions

12
package-lock.json generated
View File

@ -1,6 +1,6 @@
{ {
"name": "satyr", "name": "satyr",
"version": "0.9.4", "version": "0.10.0",
"lockfileVersion": 1, "lockfileVersion": 1,
"requires": true, "requires": true,
"dependencies": { "dependencies": {
@ -950,6 +950,11 @@
"integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==", "integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==",
"optional": true "optional": true
}, },
"is-port-available": {
"version": "0.1.5",
"resolved": "https://registry.npmjs.org/is-port-available/-/is-port-available-0.1.5.tgz",
"integrity": "sha512-/r7UZAQtfgDFdhxzM71jG0mkC4oSRA513cImMILdRe/+UOIe0Se/D/Z7XCua4AFg5k4Zt3ALMGaC1W3FzlrR2w=="
},
"isarray": { "isarray": {
"version": "1.0.0", "version": "1.0.0",
"resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz",
@ -963,11 +968,6 @@
"asn1.js": "^5.2.0" "asn1.js": "^5.2.0"
} }
}, },
"jwt-decode": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/jwt-decode/-/jwt-decode-3.0.0.tgz",
"integrity": "sha512-RBQv2MTm3FNKQkdzhEyQwh5MbdNgMa+FyIJIK5RMWEn6hRgRHr7j55cRxGhRe6vGJDElyi6f6u/yfkP7AoXddA=="
},
"lodash": { "lodash": {
"version": "4.17.20", "version": "4.17.20",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.20.tgz",

View File

@ -26,6 +26,7 @@
"express": "^4.17.1", "express": "^4.17.1",
"flags": "^0.1.3", "flags": "^0.1.3",
"irc": "^0.5.2", "irc": "^0.5.2",
"is-port-available": "^0.1.5",
"jose": "^1.15.1", "jose": "^1.15.1",
"mysql": "^2.17.1", "mysql": "^2.17.1",
"node-media-server": "^2.2.4", "node-media-server": "^2.2.4",

View File

@ -8,6 +8,7 @@ import * as strf from "strftime";
import * as ctx from '../node_modules/node-media-server/node_core_ctx'; import * as ctx from '../node_modules/node-media-server/node_core_ctx';
import * as db from "./database"; import * as db from "./database";
import {config} from "./config"; import {config} from "./config";
import * as isPortAvailable from "is-port-available";
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const { exec, execFile } = require('child_process'); const { exec, execFile } = require('child_process');
@ -64,11 +65,14 @@ if (cluster.isMaster) {
rtmp: Object.assign({port: 1936}, config['rtmp']) rtmp: Object.assign({port: 1936}, config['rtmp'])
}; };
//find a unique port to listen on
getPort().then((wPort) => {
// creating the rtmp server // creating the rtmp server
var serv = net.createServer((socket) => { var serv = net.createServer((socket) => {
let session = new NodeRtmpSession(rtmpcfg, socket); let session = new NodeRtmpSession(rtmpcfg, socket);
session.run(); session.run();
}).listen(1936); }).listen(wPort);
logger.setLogType(0); logger.setLogType(0);
// RTMP Server Logic // RTMP Server Logic
@ -99,7 +103,7 @@ if (cluster.isMaster) {
mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, ()=>{;}); mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, ()=>{;});
while(true){ while(true){
if(session.audioCodec !== 0 && session.videoCodec !== 0){ if(session.audioCodec !== 0 && session.videoCodec !== 0){
transCommand(results[0].username, key).then((r) => { transCommand(results[0].username, key, wPort).then((r) => {
execFile(config['media']['ffmpeg'], r, {maxBuffer: Infinity}, (err, stdout, stderr) => { execFile(config['media']['ffmpeg'], r, {maxBuffer: Infinity}, (err, stdout, stderr) => {
/*console.log(err); /*console.log(err);
console.log(stdout); console.log(stdout);
@ -114,7 +118,7 @@ if (cluster.isMaster) {
console.log(`[RTMP Cluster WORKER ${process.pid}] Initiating recording for stream: ${id}`); console.log(`[RTMP Cluster WORKER ${process.pid}] Initiating recording for stream: ${id}`);
mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, (err) => { mkdir(config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username, { recursive : true }, (err) => {
if (err) throw err; if (err) throw err;
execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], { execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', config['http']['directory']+'/'+config['media']['publicEndpoint']+'/'+results[0].username+'/'+strf('%d%b%Y-%H%M')+'.mp4'], {
detached : true, detached : true,
stdio : 'inherit', stdio : 'inherit',
maxBuffer: Infinity maxBuffer: Infinity
@ -130,7 +134,7 @@ if (cluster.isMaster) {
db.query('SELECT twitch_key,enabled from twitch_mirror where username='+db.raw.escape(results[0].username)+' limit 1').then(async (tm) => { db.query('SELECT twitch_key,enabled from twitch_mirror where username='+db.raw.escape(results[0].username)+' limit 1').then(async (tm) => {
if(!tm[0]['enabled'] || !config['twitch_mirror']['enabled'] || !config['twitch_mirror']['ingest']) return; if(!tm[0]['enabled'] || !config['twitch_mirror']['enabled'] || !config['twitch_mirror']['ingest']) return;
console.log('[NodeMediaServer] Mirroring to twitch for stream:',id) console.log('[NodeMediaServer] Mirroring to twitch for stream:',id)
execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-f', 'flv', config['twitch_mirror']['ingest']+tm[0]['twitch_key']], { execFile(config['media']['ffmpeg'], ['-loglevel', 'fatal', '-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-vcodec', 'copy', '-acodec', 'copy', '-f', 'flv', config['twitch_mirror']['ingest']+tm[0]['twitch_key']], {
detached: true, detached: true,
stdio : 'inherit', stdio : 'inherit',
maxBuffer: Infinity maxBuffer: Infinity
@ -206,6 +210,8 @@ if (cluster.isMaster) {
} }
}); });
console.log(`[RTMP Cluster WORKER ${process.pid}] Worker Ready.`); console.log(`[RTMP Cluster WORKER ${process.pid}] Worker Ready.`);
});
} }
function newRTMPListener(eventName, listener) { function newRTMPListener(eventName, listener) {
@ -216,10 +222,23 @@ function getRTMPSession(id) {
return ctx.sessions.get(id); return ctx.sessions.get(id);
} }
async function transCommand(user: string, key: string): Promise<string[]>{ async function getPort(): Promise<number>{
let port = 1936+process.pid;
while(true){
let i=0;
if(await isPortAvailable(port+i)){
port += i;
break;
}
i++;
}
return port;
}
async function transCommand(user: string, key: string, wPort): Promise<string[]>{
let args: string[] = ['-loglevel', 'fatal', '-y']; let args: string[] = ['-loglevel', 'fatal', '-y'];
if(config['transcode']['inputflags'] !== null && config['transcode']['inputflags'] !== "") args = args.concat(config['transcode']['inputflags'].split(" ")); if(config['transcode']['inputflags'] !== null && config['transcode']['inputflags'] !== "") args = args.concat(config['transcode']['inputflags'].split(" "));
args = args.concat(['-i', 'rtmp://127.0.0.1:'+config['rtmp']['port']+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']); args = args.concat(['-i', 'rtmp://127.0.0.1:'+wPort+'/'+config['media']['privateEndpoint']+'/'+key, '-movflags', '+faststart']);
if(config['transcode']['adaptive']===true && config['transcode']['variants'] > 1) { if(config['transcode']['adaptive']===true && config['transcode']['variants'] > 1) {
for(let i=0;i<config['transcode']['variants'];i++){ for(let i=0;i<config['transcode']['variants'];i++){
args = args.concat(['-map', '0:2']); args = args.concat(['-map', '0:2']);