From 572ab9244c71ed0d1301b7aa25d1b80be77ce29d Mon Sep 17 00:00:00 2001 From: Jake Read <jake.read@cba.mit.edu> Date: Mon, 7 Oct 2019 18:19:56 -0400 Subject: [PATCH] complete pipe types - pt --- cf.js | 2 +- hunks/pipes/{vfpt.js => vfp.js} | 6 +- hunks/pipes/vfptc.js | 141 ++++++++++++++++++++++++++++++++ pipes/vfpts.js | 53 ++++++++++++ 4 files changed, 199 insertions(+), 3 deletions(-) rename hunks/pipes/{vfpt.js => vfp.js} (98%) create mode 100644 hunks/pipes/vfptc.js create mode 100644 pipes/vfpts.js diff --git a/cf.js b/cf.js index a6935eb..aecd313 100644 --- a/cf.js +++ b/cf.js @@ -88,7 +88,7 @@ app.get('/pipeHookup/:file', (req, res) => { // to restart, // that way, remote clients can attempt a reset by issuing the same request // and report server-down if nothing happens - const piper = new Worker(`${__dirname}/pipes/${req.params.file}`) + const piper = new Worker(`${__dirname}/pipes/${req.params.file}`, {workerData: req.query}) piper.on('message', (msg) => { if(msg.startup){ console.log('worker for /pipeHookup/' + req.params.file + ' resolves') diff --git a/hunks/pipes/vfpt.js b/hunks/pipes/vfp.js similarity index 98% rename from hunks/pipes/vfpt.js rename to hunks/pipes/vfp.js index 5627185..fd62592 100644 --- a/hunks/pipes/vfpt.js +++ b/hunks/pipes/vfp.js @@ -11,7 +11,9 @@ import { State } from '../hunks.js' -function VFPT() { +// DEPRICATED: RMing for pipe version, + +function VFP() { Hunkify(this) let debug = false @@ -161,4 +163,4 @@ function VFPT() { } } -export default VFPT +export default VFP diff --git a/hunks/pipes/vfptc.js b/hunks/pipes/vfptc.js new file mode 100644 index 0000000..71f34fb --- /dev/null +++ b/hunks/pipes/vfptc.js @@ -0,0 +1,141 @@ +/* + +very fast ~~picket ship~~ pipe transport (client) + +*/ + +import { + Hunkify, + Input, + Output, + State +} from '../hunks.js' + +const STATUS_OPENING = 'opening...' +const STATUS_OPEN = 'open' +const STATUS_CLOSED = 'closed' +const STATUS_ERROR = 'error' + +export default function VFPTC() { + Hunkify(this) + let debug = false + + let dtin = new Input('byteArray', 'data', this) + this.inputs.push(dtin) + + let dtout = new Output('byteArray', 'data', this) + this.outputs.push(dtout) + + let statusMessage = new State('string', 'status', STATUS_CLOSED) + let portSelect = new State('number', 'port', 2042) + let retryButton = new State('boolean', 'retry', false) + let echoButton = new State('boolean', 'echo', false) + let bufferButton = new State('boolean', 'buffer', false) + this.states.push(statusMessage, portSelect, retryButton, echoButton, bufferButton) + retryButton.onChange = (value) => { + startWsConnection() + } + echoButton.onChange = (value) => { + send(JSON.stringify({ type: 'echo' })) + } + bufferButton.onChange = (value) => { + send(Uint8Array.from([0, 1, 12]).buffer) + } + + // coming merge of init and onload, however: + this.init = () => { + // force closed at startup; else program state can make us confused, + statusMessage.set(STATUS_CLOSED) + startWsConnection() + } + + let ws = {} + let outbuffer = [] + let startWsConnection = () => { + // only attempt reconnect if we're not already opening, or opened + if (statusMessage.value === STATUS_OPEN || statusMessage.value === STATUS_OPENING) return + // ask the server to instantiate the reciprocal process, + statusMessage.set(STATUS_OPENING) + jQuery.get(`pipeHookup/vfpts.js?port=${portSelect.value}`, (data) => { + if (data.startup) { + console.log('serverside launched, starting client') + console.log(data) + // have data.ip and data.port + ws = new WebSocket(`ws://${data.ip}:${data.port}`) + ws.onopen = (evt) => { + if (debug) console.log(this.name, 'opens') + statusMessage.set(STATUS_OPEN) + } + ws.onerror = (err) => { + if (debug) console.log(this.name, 'error', err) + statusMessage.set(STATUS_ERROR) + } + ws.onclose = (evt) => { + if (debug) console.log(this.name, 'closes') + statusMessage.set(STATUS_CLOSED) + } + ws.onmessage = (msg) => { + if (debug) console.log(this.name, 'recvs', msg) + recv(msg) + } + } else { + console.log('pipe received non-startup response from server') + statusMessage.set(STATUS_ERROR) + } + }) + } + + // send wrapper + let send = (msg) => { + if (ws && ws.readyState === 1) { + ws.send(msg) + return true + } else { + console.error('attempt to send on a closed ws') + return false + } + } + + // write ur handler, + let recv = (msg) => { + if(typeof msg.data === 'string'){ + let data = JSON.parse(msg.data) + if(data.type === 'echo'){ + console.log('echo returns') + } else { + console.log('how to handle:', data) + } + } else { + if(msg.data instanceof Blob){ + msg.data.arrayBuffer().then((res) => { + outbuffer.push(Array.from(new Uint8Array(res))) + }).catch((err) => { + console.log("err converting recv'd blob into buffer", err) + }) + } else { + // bad error state + console.error('bad data type out of VFPTS') + } + } + } + + this.loop = () => { + // if open downstream, take off of data input, push downstream + if (ws && ws.readyState === 1) { + if (dtin.io()) { + ws.send(Uint8Array.from(dtin.get()).buffer) + } + } + // if have recv'd msgs previously, and output clear, put 'em + if (outbuffer.length > 0 && !dtout.io()) { + let pusher = outbuffer.shift() + console.log('puts', pusher) + dtout.put(pusher)//outbuffer.shift()) + } + } + + // rm this + this.onDelete = () => { + if (ws) ws.close() + } +} diff --git a/pipes/vfpts.js b/pipes/vfpts.js new file mode 100644 index 0000000..34e4f81 --- /dev/null +++ b/pipes/vfpts.js @@ -0,0 +1,53 @@ +/* + +very fast ~~picket ship~~ pipe transport (server) + +*/ + +const { + parentPort, + workerData +} = require('worker_threads') + +const WebSocketServer = require('ws').Server + +let port + +if(workerData.port){ + port = workerData.port +} else { + port = 2042 +} + +const WSS = new WebSocketServer({port: port}, () => { + parentPort.postMessage({ + startup: true, + port: port + }) +}) + +WSS.on('connection', (ws) => { + console.log('ws connects') + ws.onmessage = (msg) => { + if(typeof msg.data === 'string'){ + // probably a control object, do + let data = JSON.parse(msg.data) + if(data.type === 'echo'){ + ws.send(msg.data) + } else { + console.log("how to handle:", data) + } + } else { + if(Buffer.isBuffer(msg.data)){ + // echo also for now, + ws.send(msg.data) + // assume it's to-be-put-to-port + } + } + } + ws.onclose = (evt) => { + // shutdown, + console.log('ws closes, pipe exiting') + process.exit() + } +}) -- GitLab