diff --git a/hunks/pipes/filepipe.js b/hunks/pipes/filepipe.js new file mode 100644 index 0000000000000000000000000000000000000000..d675998da8a2ffe53c6f389113d93ba153d723af --- /dev/null +++ b/hunks/pipes/filepipe.js @@ -0,0 +1,160 @@ +/* + +pipe transport for an fs.write, appropriate for linux-2-machines + +*/ + +// + +import { + Hunkify, + Input, + Output, + State +} from '../hunks.js' + +const STATUS_UNKNOWN = 'unknown...' +const STATUS_OPENING = 'opening...' +const STATUS_OPEN = 'open' +const STATUS_CLOSED = 'closed' +const STATUS_ERROR = 'error' + +export default function VFPTC() { + Hunkify(this) + let debug = false + + let dtin = new Input('byteArray', 'data', this) + this.inputs.push(dtin) + + let dtout = new Output('byteArray', 'data', this) + this.outputs.push(dtout) + + // one pipe (websocket) + let pipeStatusMessage = new State('string', 'pipe status', STATUS_CLOSED) + let portSelect = new State('string', 'websocket port', '2042') + let pipeRetryButton = new State('boolean', 'pipe reset', false) + // and serialport, + let serialStatusMessage = new State('string', 'serialport status', 'unknown') + let usbPidSelect = new State('string', 'usb product id', '8022') + + let remoteProcessId = '' + let shutdownRemote = () => { + return new Promise((resolve, reject) => { + jQuery.get(`/killProcess?pid=${remoteProcessId}`, (res) => { + resolve() + }) + }) + } + + this.states.push(pipeStatusMessage, portSelect, usbPidSelect, pipeRetryButton, serialStatusMessage) + // for simplicity, joy, just reset everything ? + pipeRetryButton.onChange = (value) => { + shutdownRemote().then((res) => { + startWsConnection() + }) + } + + // coming merge of init and onload, however: + this.init = () => { + // force closed at startup; else program state can make us confused, + pipeStatusMessage.set(STATUS_CLOSED) + serialStatusMessage.set(STATUS_UNKNOWN) + startWsConnection() + } + + let ws = {} + let outbuffer = [] + let startWsConnection = () => { + // only attempt reconnect if we're not already opening, or opened + if (pipeStatusMessage.value === STATUS_OPEN || pipeStatusMessage.value === STATUS_OPENING) return + // ask the server to instantiate the reciprocal process, + pipeStatusMessage.set(STATUS_OPENING) + jQuery.get(`spawnProcess/filepipe.js?args=${portSelect.value},${usbPidSelect.value}`, (data) => { + if (data.startup) { + console.log(`serverside launched with pid ${data.pid}, starting client`) + remoteProcessId = data.pid + console.log(data) + // have data.ip and data.port + ws = new WebSocket(`ws://${data.ip}:${data.port}`) + ws.onopen = (evt) => { + if (debug) console.log(this.name, 'opens') + pipeStatusMessage.set(STATUS_OPEN) + } + ws.onerror = (err) => { + if (debug) console.log(this.name, 'error', err) + pipeStatusMessage.set(STATUS_ERROR) + } + ws.onclose = (evt) => { + if (debug) console.log(this.name, 'closes') + pipeStatusMessage.set(STATUS_CLOSED) + } + ws.onmessage = (msg) => { + if (debug) console.log(this.name, 'recvs', msg) + recv(msg) + } + } else { + console.log('pipe received non-startup response from server') + pipeStatusMessage.set(STATUS_ERROR) + } + }) + } + + // send wrapper + let send = (msg) => { + if (ws && ws.readyState === 1) { + ws.send(msg) + return true + } else { + console.error('attempt to send on a closed ws') + return false + } + } + + // write ur handler, + let recv = (msg) => { + if(typeof msg.data === 'string'){ + let data = JSON.parse(msg.data) + if(data.type === 'echo'){ + console.log('echo returns') + } else if (data.type === 'serial status'){ + serialStatusMessage.set(data.status) + } else { + console.error('vfptc unhandled:', data) + } + } else { + if(msg.data instanceof Blob){ + msg.data.arrayBuffer().then((res) => { + outbuffer.push(Array.from(new Uint8Array(res))) + }).catch((err) => { + console.log("err converting recv'd blob into buffer", err) + }) + } else { + // bad error state + console.error('bad data type out of VFPTS') + } + } + } + + this.loop = () => { + // if open downstream, take off of data input, push downstream + if (ws && ws.readyState === 1) { + if (dtin.io()) { + ws.send(Uint8Array.from(dtin.get()).buffer) + } + } + // if have recv'd msgs previously, and output clear, put 'em + if (outbuffer.length > 0 && !dtout.io()) { + let pusher = outbuffer.shift() + if(debug) console.log('puts', pusher) + dtout.put(pusher)//outbuffer.shift()) + } + } + + // rm this + this.onDelete = () => { + // important to also shutdown remote process, + // do: have reply come with pid, + // on delete, send req to kill that pid ... + if (ws && ws.readyState === 1) ws.close() + } +} diff --git a/hunks/statemachines/srm20.js b/hunks/statemachines/srm20.js new file mode 100644 index 0000000000000000000000000000000000000000..34de2e93bd438efcc17bcad1d53bacf64eab9ee5 --- /dev/null +++ b/hunks/statemachines/srm20.js @@ -0,0 +1,9 @@ +/* + +SRM20 interface, over a device server + +*/ + +// OK: +// take some path input, write it as a file, send to modela... +// also, will have to take those paths [seg][i][x-y-z] diff --git a/processes/filepipe.js b/processes/filepipe.js new file mode 100644 index 0000000000000000000000000000000000000000..51f84d9aed74a7983c4532bee4a4d40298ffe36b --- /dev/null +++ b/processes/filepipe.js @@ -0,0 +1,204 @@ +/* + +very fast ~~picket ship~~ pipe transport (server) + +*/ + +const WebSocketServer = require('ws').Server +const SerialPort = require('serialport') +const Delimiter = require('@serialport/parser-delimiter') + +const STATUS_UNKNOWN = 'unknown...' +const STATUS_OPENING = 'opening...' +const STATUS_OPEN = 'open' +const STATUS_CLOSED = 'closed' +const STATUS_ERROR = 'error' + +// port to issue on, +let port = '2042' +let pid = '8031' + +const WSS = new WebSocketServer({port: port}, () => { + process.send({ + startup: true, + port: port + }) +}) + +let WS = null + +WSS.on('connection', (ws) => { + console.log('vfpts websocket connects') + // handles, + WS = ws + // send current status + sendSerialStatus() + // handlers, + ws.onmessage = (msg) => { + if(typeof msg.data === 'string'){ + // probably a control object, do + let data = JSON.parse(msg.data) + if(data.type === 'echo'){ + sendToBrowser(msg.data) + } else { + console.log("how to handle:", data) + } + } else { + if(Buffer.isBuffer(msg.data)){ + if(serport){ + serport.write(encode(msg.data, true), 'utf8') + } + } + } + } + ws.onclose = (evt) => { + // shutdown, + console.log('ws closes, pipe exiting') + process.exit() + } +}) + +// send wrapper +let sendToBrowser = (msg) => { + if (WS) { + WS.send(msg) + return true + } else { + return false + } +} + +// now the usb, +let serport = null +let comname = '' + +let sendSerialStatus = () => { + if(serport){ + if(serport.opening){ + sendToBrowser(JSON.stringify({ + type: 'serial status', + status: STATUS_OPENING + })) + } else if (serport.readable){ + sendToBrowser(JSON.stringify({ + type: 'serial status', + status: STATUS_OPEN + })) + } else { + sendToBrowser(JSON.stringify({ + type: 'serial status', + status: STATUS_CLOSED + })) + } + } else { + sendToBrowser(JSON.stringify({ + type: 'serial status', + status: STATUS_CLOSED + })) + } +} + +// COBS https://github.com/tcr/node-cobs + +function encode (buf, zeroBack) { + var dest = [0]; + // vfpt starts @ 1, + var code_ptr = 0; + var code = 0x01; + + function finish (incllast) { + dest[code_ptr] = code; + code_ptr = dest.length; + incllast !== false && dest.push(0x00); + code = 0x01; + } + + for (var i = 0; i < buf.length; i++) { + if (buf[i] == 0) { + finish(); + } else { + dest.push(buf[i]); + code += 1; + if (code == 0xFF) { + finish(); + } + } + } + finish(false); + + if (zeroBack) { + dest.push(0x00); + } + + return new Buffer.from(dest); +} + + +function decode (buf) +{ + var dest = []; + for (var i = 0; i < buf.length; ) { + var code = buf[i++]; + for (var j = 1; j < code; j++) { + dest.push(buf[i++]); + } + if (code < 0xFF && i < buf.length) { + dest.push(0); + } + } + return new Buffer.from(dest) +} + +let findSerialPort = () => { + let found = false + SerialPort.list((err, ports) => { + ports.forEach((serialport) => { + console.log('port:', serialport.comName, serialport.productId) + console.log(serialport) + if (serialport.productId === pid) { + comname = serialport.comName + console.log(`found port at ${comname}, opening`) + openPort() + } + }) + }) +} + +let openPort = () => { + serport = new SerialPort(comname, { + baudRate: 3000000 + }) + serport.on('open', () => { + sendSerialStatus() + serport.on('error', (err) => { + sendSerialStatus() + console.log('port error', err) + }) + const parser = serport.pipe(new Delimiter({delimiter: [0]})) + parser.on('data', (buf) => { + // serialport doesn't guarantee packet sized events + //console.log('serport receives: ', buf) + let op = decode(buf) + if(op[0] === 252){ + // NEXT: write this as a JSON, ship -> vfpt, splash it uuup + console.log('LLM: ', buf.toString('utf8')) + } else { + //console.log('<- de-cobs: ', op.length) + if(WS){ + WS.send(op) + } + } + }) + }) +} + +findSerialPort() + +// this causes node to req. time from the OS more often (as often as possible) +// meaning that our events are handled more often, and we drop ring times by some ms +// does burn cycles though, + +let reminders = () => { + setImmediate(reminders) +} +reminders() diff --git a/processes/pipetemplate.js b/processes/pipetemplate.js deleted file mode 100644 index 634cba854dd681ace31cf7b0948d8c12c82ca447..0000000000000000000000000000000000000000 --- a/processes/pipetemplate.js +++ /dev/null @@ -1,34 +0,0 @@ -const { - parentPort -} = require('worker_threads') - -const WebSocketServer = require('ws').Server - -let port = 2042 -const WSS = new WebSocketServer({port: port}, () => { - parentPort.postMessage({ - startup: true, - port: port - }) -}) - -WSS.on('connection', (ws) => { - console.log('ws connects') - ws.onmessage = (msg) => { - // always .data is what we sent, - let data = JSON.parse(msg.data) - // the template-writer will want to update these ... - if(data.type === 'echo'){ - console.log('msg echo', msg.data) - ws.send(msg.data) - } else { - console.log('msg non echo') - console.log(msg.data) - } - } - ws.onclose = (evt) => { - // shutdown, - console.log('ws closes, pipe exiting') - process.exit() - } -})