summaryrefslogtreecommitdiff
path: root/src/wasp-bus-binary.el
diff options
context:
space:
mode:
Diffstat (limited to 'src/wasp-bus-binary.el')
-rw-r--r--src/wasp-bus-binary.el144
1 files changed, 144 insertions, 0 deletions
diff --git a/src/wasp-bus-binary.el b/src/wasp-bus-binary.el
new file mode 100644
index 00000000..bb9fc6da
--- /dev/null
+++ b/src/wasp-bus-binary.el
@@ -0,0 +1,144 @@
+;;; wasp-bus-binary --- Pub/sub bus client -*- lexical-binding: t; -*-
+;;; Commentary:
+;;; Code:
+
+(require 'dash)
+(require 's)
+(require 'wasp-utils)
+
+(defcustom w/bus-binary-process "wasp-bus-binary"
+ "Name of process connected to binary message bus."
+ :type '(string)
+ :group 'wasp)
+
+(defcustom w/bus-binary-buffer " *wasp-bus-binary*"
+ "Name of buffer used to store intermediate binary message bus data."
+ :type '(string)
+ :group 'wasp)
+
+(defcustom w/bus-binary-host "shiro"
+ "Hostname of the binary message bus."
+ :type '(string)
+ :group 'wasp)
+
+(defcustom w/bus-binary-port 32051
+ "Port of the binary message bus."
+ :type '(integer)
+ :group 'wasp)
+
+(defvar w/bus-binary-event-handlers nil
+ "List of pairs of events and handler functions.")
+
+(defun w/bus-binary-read-bytes (len)
+ "Read LEN bytes from the current buffer.
+Advances point by LEN also."
+ (let ((end (+ (point) len)))
+ (when (<= end (point-max))
+ (let ((istr (buffer-substring (point) end)))
+ (forward-char len)
+ istr))))
+
+(defun w/bus-binary-read-int32le ()
+ "Read a 32-bit little endian integer from the current buffer."
+ (when-let* ((istr (w/bus-binary-read-bytes 4)))
+ (-let [(x0 x1 x2 x3) (seq-into istr 'list)]
+ (logior x0 (ash x1 8) (ash x2 16) (ash x3 24)))))
+
+(defun w/bus-binary-read-length-prefixed ()
+ "Read a length-prefixed string from the current buffer.
+Return nil if unable."
+ (let ((start (point)))
+ (if-let* ((len (w/bus-binary-read-int32le)))
+ (progn
+ (w/bus-binary-read-bytes len))
+ (goto-char start)
+ nil)))
+
+(defun w/bus-binary-read-message ()
+ "Parse a message from the current buffer.
+Return non-nil if a message was successfully parsed."
+ (if-let* ( (start (point))
+ (ev (w/bus-binary-read-length-prefixed))
+ (d (w/bus-binary-read-length-prefixed)))
+ (progn
+ (delete-region start (point))
+ (cons ev d))
+ (goto-char start)
+ nil))
+
+(defun w/bus-binary-handle-message ()
+ "Parse and handle a message from the current buffer.
+Return non-nil if a message was successfully parsed."
+ (when-let* ((msg (w/bus-binary-read-message)))
+ (-let [(ev . d) msg]
+ (print (cons ev d))
+ (when-let* ((handler (w/saget ev w/bus-binary-event-handlers)))
+ (funcall handler d))
+ t)))
+
+(defun w/bus-binary-process-filter (proc data)
+ "Process filter for binary message bus connection on PROC and DATA."
+ (with-current-buffer (get-buffer-create w/bus-binary-buffer)
+ (set-buffer-multibyte nil)
+ (when (not (marker-position (process-mark proc)))
+ (set-marker (process-mark proc) (point-max)))
+ (goto-char (process-mark proc))
+ (insert data)
+ (set-marker (process-mark proc) (point))
+ (goto-char (point-min))
+ (while (w/bus-binary-handle-message))))
+
+(defun w/bus-binary-build-int32le (x)
+ "Convert X into the bytes for a little endian 32-bit integer."
+ (unibyte-string
+ (logand x #xff)
+ (logand (lsh x -8) #xff)
+ (logand (lsh x -16) #xff)
+ (logand (lsh x -24) #xff)))
+
+(defun w/bus-binary-build-length-prefixed (s)
+ "Turn S into a length-prefixed unibyte string."
+ (s-concat
+ (w/bus-binary-build-int32le (string-bytes s))
+ s))
+
+(defun w/bus-binary-convert-event (ev)
+ "Convert the s-expression EV to a string event."
+ (s-chop-suffix ")" (s-chop-prefix "(" (format "%s" ev))))
+
+(defun w/binary-sub (ev)
+ "Subscribe to the event EV."
+ (process-send-string
+ w/bus-binary-process
+ (s-concat "s"
+ (w/bus-binary-build-length-prefixed (w/bus-binary-convert-event ev)))))
+
+(defun w/binary-pub (ev &optional d)
+ "Publish the data D to the event EV."
+ (process-send-string
+ w/bus-binary-process
+ (s-concat "p"
+ (w/bus-binary-build-length-prefixed (w/bus-binary-convert-event ev))
+ (w/bus-binary-build-length-prefixed (or d "")))))
+
+(defun w/bus-binary-disconnect ()
+ "Disconnect from Redis."
+ (when (process-live-p (get-process w/bus-binary-process))
+ (delete-process w/bus-binary-process)))
+
+(defun w/bus-binary-connect ()
+ "Connect to Redis."
+ (w/bus-binary-disconnect)
+ (with-current-buffer (get-buffer-create w/bus-binary-buffer)
+ (set-buffer-multibyte nil)
+ (erase-buffer))
+ (make-network-process
+ :coding 'no-conversion
+ :name w/bus-binary-process
+ :buffer nil
+ :host w/bus-binary-host
+ :service w/bus-binary-port
+ :filter #'w/bus-binary-process-filter))
+
+(provide 'wasp-bus-binary)
+;;; wasp-bus-binary.el ends here