;;;; -*- mode:lisp;coding:utf-8 -*-
;;;;**************************************************************************
;;;;FILE:               pipe.lisp
;;;;LANGUAGE:           Common-Lisp
;;;;SYSTEM:             Common-Lisp
;;;;USER-INTERFACE:     NONE
;;;;DESCRIPTION
;;;;
;;;;    Implements a pipe using Gray streams and bordeaux-threads.
;;;;
;;;;    The data written to the pipe-output-stream is queued (if a maximum
;;;;    queue-size is specified for the stream, then the writing
;;;;    thread may block if the buffer is full).
;;;;
;;;;    The data queued can be read from the pipe-input-stream.  If the
;;;;    queue is empty, then the reading stream may block (unless it
;;;;    used listen, read-char-no-hang, etc).
;;;;
;;;;    When the stream is closed, one can still read from it until
;;;;    the EOF is reached.
;;;;
;;;;AUTHORS
;;;;    <PJB> Pascal J. Bourguignon <pjb@informatimago.com>
;;;;MODIFICATIONS
;;;;    2015-09-12 <PJB> Created.
;;;;BUGS
;;;;LEGAL
;;;;    AGPL3
;;;;
;;;;    Copyright Pascal J. Bourguignon 2015 - 2015
;;;;
;;;;    This program is free software: you can redistribute it and/or modify
;;;;    it under the terms of the GNU Affero General Public License as published by
;;;;    the Free Software Foundation, either version 3 of the License, or
;;;;    (at your option) any later version.
;;;;
;;;;    This program is distributed in the hope that it will be useful,
;;;;    but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;;    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;;;;    GNU Affero General Public License for more details.
;;;;
;;;;    You should have received a copy of the GNU Affero General Public License
;;;;    along with this program.  If not, see <http://www.gnu.org/licenses/>.
;;;;**************************************************************************

(defpackage "COM.INFORMATIMAGO.CLEXT.PIPE"
  (:use "COMMON-LISP"
        ;; "CL-STEPPER"
        "TRIVIAL-GRAY-STREAMS"
        "BORDEAUX-THREADS"
        "COM.INFORMATIMAGO.CLEXT.GATE")
  (:export "MAKE-PIPE" "PIPE" "PIPE-INPUT-STREAM" "PIPE-OUTPUT-STREAM" "PIPE-ELEMENT-TYPE"
           "REOPEN-PIPE"
           "PIPE-CHARACTER-INPUT-STREAM"
           "PIPE-CHARACTER-OUTPUT-STREAM"
           "PIPE-BINARY-INPUT-STREAM"
           "PIPE-BINARY-OUTPUT-STREAM")
  (:documentation "

This package exports a pipe abstraction, that is, a pair of streams.

One thread writes to the pipe-output-stream, another thread reads from
the pipe-input-stream.


LEGAL

License AGPL3

Copyright Pascal J. Bourguignon 2015 - 2015

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

"))
(in-package "COM.INFORMATIMAGO.CLEXT.PIPE")

(declaim (declaration stepper))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; Public interface:
;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defgeneric pipe-element-type (pipe)
  (:documentation "RETURN: the pipe ELEMENT-TYPE."))
(defgeneric pipe-input-stream (pipe)
  (:documentation "RETURN: the pipe input stream."))
(defgeneric pipe-output-stream (pipe)
  (:documentation "RETURN: the pipe output stream."))

(defclass pipe ()
  ((element-type  :reader   pipe-element-type  :initarg :element-type)
   (input-stream  :reader   pipe-input-stream)
   (output-stream :reader   pipe-output-stream))
  ;; Data is written to the pipe-output-stream and then enqueued into the pipe.
  ;; Data is dequeued from the pipe, and then read from the pipe-input-stream.
  (:documentation "A PIPE is a synchronized queue accessed
from an input stream and an output stream."))

(defmethod initialize-instance ((pipe pipe) &key &allow-other-keys)
  (if (eql (class-of pipe) (find-class 'pipe))
      (error "~S is an abstract class, use ~S to create pipes."
             'pipe 'make-pipe)
      (call-next-method)))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; Private implementation:
;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defclass generic-pipe (pipe)
  ((name          :reader   pipe-name :initform "")
   (lock          :reader   lock)
   (not-empty     :reader   not-empty)
   (head          :accessor head      :initarg :head)
   (tail          :accessor tail      :initarg :tail)))

(defmethod initialize-instance :before ((pipe generic-pipe) &key &allow-other-keys)
  (when (eql (class-of pipe) (find-class 'generic-pipe))
      (error "~S is an abstract class, use ~S to create pipes."
             'generic-pipe 'make-pipe)))

(defmethod initialize-instance :after ((pipe generic-pipe) &key &allow-other-keys)
  (setf (slot-value pipe 'lock)      (bt:make-lock (format nil "~A/pipe/lock" (pipe-name pipe)))
        (slot-value pipe 'not-empty) (make-gate :name (format nil "~A/gate/not-empty" (pipe-name pipe)))))



(defclass buffered-pipe (generic-pipe)
  ((not-full      :reader   not-full)
   (buffer        :reader   buffer    :initarg :buffer))

  ;; Otherwise buffer is a vector of element-type, and head
  ;; and tail are indices in that vector.  In that case, not-full
  ;; is a gate.

  ;; New elements are enqueued on tail; old elements are dequeued
  ;; from head.

  (:documentation "A pipe with a fixed-size circular buffer."))


(defmethod print-object ((pipe buffered-pipe) stream)
  (declare (stepper disable))
  (print-unreadable-object (pipe stream :type t :identity t)
    (let ((bufsize (length (buffer pipe))))
      (format stream ":element-type ~S :input ~:[:closed~;:open~] :output ~:[:closed~;:open~] ~:[~;:full ~]~:[~;:empty ~]:buffer ([~A ~A] |~A|/~A) :name ~S"
              (pipe-element-type pipe)
              (open-stream-p (pipe-input-stream  pipe))
              (open-stream-p (pipe-output-stream pipe))
              (%pipe-fullp pipe)
              (%pipe-emptyp pipe)
              (head pipe) (tail pipe)
              (mod (- (tail pipe) (head pipe)) bufsize)
              bufsize
              (pipe-name pipe))))
  pipe)

(defmethod initialize-instance :after ((pipe buffered-pipe) &key &allow-other-keys)
  (setf (slot-value pipe 'not-full) (make-gate :name (format nil "~A/pipe/not-full" (pipe-name pipe)))))


(defclass queued-pipe (generic-pipe)
  ()

  ;; When buffer is NIL then head is a list of cells, with
  ;; tail refering the last cons in that list.  In this case,
  ;; not-full is NIL.

  ;; New elements are enqueued on tail; old elements are dequeued
  ;; from head.

  (:documentation "A pipe with a variable length queue of blocks."))

(defmethod print-object ((pipe queued-pipe) stream)
  (declare (stepper disable))
  (print-unreadable-object (pipe stream :type t :identity t)
    (let ((queue-size (length (head pipe))))
      (format stream ":element-type ~S :input ~:[:closed~;:open~] :output ~:[:closed~;:open~] ~:[~;:full ~]~:[~;:empty ~]:queued ~A :name ~S"
              (pipe-element-type pipe)
              (open-stream-p (pipe-input-stream  pipe))
              (open-stream-p (pipe-output-stream pipe))
              (%pipe-fullp pipe)
              (%pipe-emptyp pipe)
              queue-size
              (pipe-name pipe))))
  pipe)




(defclass pipe-stream ()
  ((pipe         :reader pipe-stream-pipe         :initarg :pipe )
   (element-type :reader pipe-stream-element-type :initarg :element-type)
   (open         :reader open-stream-p            :initform t)))

(defmethod print-object ((stream pipe-stream) output)
  (declare (stepper disable))
  (print-unreadable-object (stream output :type t :identity t)
    (format output "~:[:closed~;:open~] :element-type ~S"
            (open-stream-p stream)
            (pipe-stream-element-type stream)))
  stream)

(defclass pipe-character-input-stream (pipe-stream fundamental-character-input-stream)
  ((column :initform 0 :accessor column)))


(defclass pipe-character-output-stream (pipe-stream fundamental-character-output-stream)
  ((column :initform 0 :accessor column)))


(defclass pipe-binary-input-stream (pipe-stream fundamental-binary-input-stream)
  ())


(defclass pipe-binary-output-stream (pipe-stream fundamental-binary-output-stream)
  ())


(defun make-pipe (&key (element-type 'character) buffer-size (name ""))
  "

ELEMENT-TYPE:   The element-type of the pipe and the pipe streams.
                Since the pipe streams are streams, this should be a
                valid stream element type.

BUFFER-SIZE:    NIL or a FIXNUM; If NIL then an unbound queue is used,
                therefore the writer thread will never block.  If a
                fixnum, then it's the size of the allocated circular
                buffer for the pipe, and when this buffer is full, the
                writer thread will block until the reader thread
                empties some.

NAME:           A string, the name of the pipe.  It's used to build
                the name of the various internal structures such as
                lock and condition-variables, which is useful for
                debugging.

RETURN:         The new PIPE.

"
  (let ((pipe (if buffer-size
                  (make-instance 'buffered-pipe :name name
                                                :element-type element-type
                                                :buffer (make-array (1+ buffer-size) :element-type element-type)
                                                :head 0 :tail 0)
                  (make-instance 'queued-pipe   :name name
                                                :element-type element-type
                                                :head nil :tail nil))))
    (multiple-value-bind (input-class output-class)
        (if (subtypep element-type 'character)
            (values 'pipe-character-input-stream 'pipe-character-output-stream)
            (values 'pipe-binary-input-stream    'pipe-binary-output-stream))
      (setf (slot-value pipe 'input-stream)  (make-instance input-class  :pipe pipe :element-type element-type)
            (slot-value pipe 'output-stream) (make-instance output-class :pipe pipe :element-type element-type)))
    pipe))



(defgeneric %pipe-emptyp (pipe)
  (:documentation "Whether the PIPE is empty."))

(defgeneric %pipe-fullp  (pipe)
  (:documentation "Whether the PIPE is full."))


(defgeneric %pipe-peek-or-dequeue (pipe peek)
  (:documentation "
PRE:    (not (%pipe-emptyp pipe))
PEEK:   When true, then peek, else dequeue.
RETURN: RETURNP: whether we have a result;
        RESULT:  the result of peeking or dequeing;
        SIGNAL:  whether not-full condition should be signaled.
"))


(defgeneric pipe-enqueue-element (pipe element)
  (:documentation "
Enqueues the ELEMENT into the PIPE.
Blocks if the pipe is full.
"))

(defgeneric pipe-dequeue-element (pipe)
  (:documentation "
Dequeues and returns the ELEMENT from the PIPE.
If the pipe is empty,
then returns :EOF if the output stream is closed
or else blocks.
"))

(defgeneric pipe-peek-element (pipe)
  (:documentation "
Returns the first ELEMENT from the PIPE,
or NIL if the queue is empty,
or :EOF if the output-stream is closed.
"))

(defgeneric pipe-enqueue-sequence (pipe sequence start end)
  (:documentation "
If (- end start) is more than the buffer-size then signal an error
else if there's enough space in the buffer, for (- end start) elements,
then enqueues the subseq of SEQUENCE between START and END,
else blocks until there's enough space.
"))

(defgeneric pipe-dequeue-sequence (pipe sequence start end)
  (:documentation "
If (- end start) is more than the buffer-size then signal an error
else if there's more than (- end start) elements  in the buffer,
then dequeues them into the subseq of SEQUENCE between START and END,
else blocks until there's enough data available.

RETURN: The index of the first slot in SEQUENCE not modified.
        May be less than END when EOF is reached.
"))

(defgeneric pipe-dequeue-until-element (pipe element)
  (:documentation "
Dequeues elements from the buffer until an element equal to element is found.
Returns a sequence containing all the dequeued elements.
"))


(defgeneric sunk-pipe-p (pipe)
  (:documentation "Whether the reading stream has been closed;
when it's the case, no more data is written to the pipe.")
  (:method ((pipe pipe))
    (not (open-stream-p (pipe-input-stream pipe)))))

(defgeneric closed-pipe-p (pipe)
  (:documentation "Whether the writing stream has been closed;
when it's the case, end-of-file is detected upon reading on an empty pipe.")
  (:method ((pipe pipe))
    (not (open-stream-p (pipe-output-stream pipe)))))


;;; for circular buffers:

(defun mod-plus (value modulo &rest arguments)
  (mod (apply (function +) value arguments) modulo))

(defun mod-minus (value modulo &rest arguments)
  (mod (apply (function -) value arguments) modulo))

(define-modify-macro mod-incf (modulo &optional (increment 1)) mod-plus)
(define-modify-macro mod-decf (modulo &optional (decrement 1)) mod-minus)




(defmethod %pipe-emptyp ((pipe buffered-pipe))
  (= (head pipe) (tail pipe)))

(defmethod %pipe-fullp  ((pipe buffered-pipe))
  (= (head pipe) (mod-plus (tail pipe) (length (buffer pipe)) 1)))


;;; for queue lists, we enqueue blocks made of a start index and a sequence.

(declaim (inline make-block block-start block-sequence block-empty-p block-full-p
                 block-pop-char block-peek-char block-push-char))
(defun make-block         (buffer)        (cons 0 (copy-seq buffer)))
(defun block-start        (blk)           (car blk))
(defun (setf block-start) (new-value blk) (setf (car blk) new-value))
(defun block-sequence       (blk)           (cdr blk))
(defun block-empty-p      (blk)           (>= (block-start blk) (length (block-sequence blk))))
(defun block-full-p       (blk)           (zerop (block-start blk)))
(defun block-pop-char (blk)
  (assert (not (block-empty-p blk)))
  (prog1 (aref (block-sequence blk) (block-start blk))
    (incf (block-start blk))))
(defun block-peek-char (blk)
  (assert (not (block-empty-p blk)))
  (aref (block-sequence blk) (block-start blk)))
(defun block-push-char (blk ch)
  (assert (not (block-full-p blk)))
  (decf (block-start blk))
  (setf (aref (block-sequence blk) (block-start blk)) ch))


(defmethod %pipe-emptyp ((pipe queued-pipe))
  (null (head pipe)))

(defmethod %pipe-fullp  ((pipe queued-pipe))
  nil)

(defun %wait-not-empty-or-closed (pipe)
  (loop :while (%pipe-emptyp pipe)
        :do (if (closed-pipe-p pipe)
                (return :eof)
                (gate-wait (not-empty pipe) (lock pipe)))
        :finally (return nil)))


(defmethod pipe-enqueue-element ((pipe buffered-pipe) element)
  (with-lock-held ((lock pipe))
    (loop :while (%pipe-fullp pipe)
          :do (gate-wait (not-full pipe) (lock pipe)))
    (setf (aref (buffer pipe) (tail pipe)) element)
    (mod-incf (tail pipe) (length (buffer pipe))))
  (gate-signal (not-empty pipe)))

(defmethod pipe-enqueue-sequence ((pipe buffered-pipe) sequence start end)
  (assert (<= 0 start end (length sequence)))
  (let ((buflen (length (buffer pipe))))
    (loop
      :while (< start end)
      :do (with-lock-held ((lock pipe))
            (loop :while (%pipe-fullp pipe)
                  :do (gate-wait (not-full pipe) (lock pipe)))
            (when (%pipe-emptyp pipe)
              (setf (head pipe) 0
                    (tail pipe) 0))
            ;; [_____head-------tail__________]
            ;; [-------tail_______head--------]
            ;; write what we can:
            (let* ((seqlen (- end start))
                   (dsts  (tail pipe))
                   (dste  (min (cond ((zerop (head pipe))            (1- buflen))
                                     ((<= (head pipe) (tail pipe))   buflen)
                                     (t                              (1- (head pipe))))
                               (+ dsts seqlen)))
                   (len   (- dste dsts)))
              (replace (buffer pipe) sequence :start1 dsts :end1 dste :start2 start)
              (incf start len)
              (mod-incf (tail pipe) buflen len)))
          (gate-signal (not-empty pipe))))
  sequence)

(defmethod pipe-dequeue-sequence ((pipe buffered-pipe) sequence start end)
  (assert (<= 0 start end (length sequence)))
  (let ((buflen (length (buffer pipe))))
    (loop
      :with current := start
      :while (< current end)
      :do (with-lock-held ((lock pipe))
            (when (%wait-not-empty-or-closed pipe)
              (return current))
            ;; [_____head-------tail__________]
            ;; [-------tail_______head--------]
            ;; read what we can:
            (let* ((seqlen (- end current))
                   (srcs   (head pipe))
                   (srce   (min (if (<= (head pipe) (tail pipe))
                                    (tail pipe)
                                    buflen)
                                (+ srcs seqlen)))
                   (len    (- srce srcs)))
              (replace sequence (buffer pipe) :start2 srcs :end2 srce :start1 current)
              (incf current len)
              (mod-incf (head pipe) buflen len)))
          (gate-signal (not-full pipe))
      :finally (return current))))

(defmethod pipe-dequeue-until-element ((pipe buffered-pipe) element)
  (let ((buflen (length (buffer pipe)))
        (chunks '()))
    (flet ((concatenate-chunks ()
             (let ((result
                     (with-output-to-string (out)
                       (dolist (chunk (nreverse chunks))
                         (write-string chunk out)))))
               result)))
      (loop :named collect
            :do (with-lock-held ((lock pipe))
                  (when (%wait-not-empty-or-closed pipe)
                    (return-from collect (values (concatenate-chunks)
                                                 t #|=eof|#)))
                  ;; [_____head-------tail__________]
                  ;; [-------tail_______head--------]
                  ;; read what we can:
                  (let* ((srcs   (head pipe))
                         (srce   (if (<= (head pipe) (tail pipe))
                                     (tail pipe)
                                     buflen))
                         (pos    (position element (buffer pipe) :start srcs :end srce)))
                    (if pos
                        (progn
                          (push (subseq (buffer pipe) srcs pos) chunks)
                          (mod-incf (head pipe) buflen (- (1+ pos) srcs))
                          (return-from collect (values (concatenate-chunks)
                                                       nil #|=newline|#)))
                        (progn
                          (push (subseq (buffer pipe) srcs srce) chunks)
                          (mod-incf (head pipe) buflen (- srce srcs))))))
                (gate-signal (not-full pipe))
            :finally (gate-signal (not-full pipe))))))



(defmethod pipe-enqueue-element ((pipe queued-pipe) element)
  (pipe-enqueue-sequence pipe (vector element) 0 1))

(declaim (inline subsequence))
(defun subsequence (element-type sequence start end)
  (let ((result (make-array (- end start) :element-type element-type)))
    (replace result sequence :start2 start :end2 end)))

(defmethod pipe-enqueue-sequence ((pipe queued-pipe) sequence start end)
  (let ((blkl (list (make-block (subsequence (pipe-element-type pipe) sequence start end)))))
    (with-lock-held ((lock pipe))
      (if (tail pipe)
          (setf (cdr (tail pipe)) blkl
                (tail pipe) (cdr (tail pipe)))
          (setf (head pipe)
                (setf (tail pipe) blkl))))
    (gate-signal (not-empty pipe))))

(defmethod pipe-dequeue-sequence ((pipe queued-pipe) sequence start end)
  (assert (<= 0 start end (length sequence)))
  (with-lock-held ((lock pipe))
    (loop
      :with current := start
      :while (< current end)
      :do (when (%wait-not-empty-or-closed pipe)
            (return current))
          ;; read what we can:
          (let ((blk (car (head pipe))))
            (if (block-empty-p blk)
                (if (eql (head pipe) (tail pipe))
                    (setf (head pipe) nil
                          (tail pipe) nil)
                    (pop (head pipe)))
                (let* ((seqlen (- end current))
                       (srcs   (block-start blk))
                       (srce   (min (+ seqlen srcs)
                                    (length (block-sequence blk))))
                       (len    (- srce srcs)))
                  (replace sequence (block-sequence blk)
                           :start1 current :start2 (block-start blk) :end2 srce)
                  (incf current len)
                  (incf (block-start blk) len))))
      :finally (return current))))

(defmethod pipe-dequeue-until-element ((pipe queued-pipe) element)
  (let ((chunks '()))
    (flet ((concatenate-chunks ()
             (let ((result
                     (with-output-to-string (out)
                       (dolist (chunk (nreverse chunks))
                         (write-string chunk out)))))
               result)))
      (with-lock-held ((lock pipe))
        (loop
          (when (%wait-not-empty-or-closed pipe)
            (return (values (concatenate-chunks) t #|=eof|#)))
          ;; read what we can:
          (let ((blk (car (head pipe))))
            (if (block-empty-p blk)
                (if (eql (head pipe) (tail pipe))
                    (setf (head pipe) nil
                          (tail pipe) nil)
                    (pop (head pipe)))
                (let* ((srcs   (block-start blk))
                       (srce   (length (block-sequence blk)))
                       (pos    (position element (block-sequence blk) :start srcs :end srce)))
                  (if pos
                      (progn (push (subseq (block-sequence blk) (block-start blk) pos) chunks)
                             (setf (block-start blk) (1+ pos))
                             (return (values (concatenate-chunks) nil #|=newline|#)))
                      (progn (push (subseq (block-sequence blk) (block-start blk)) chunks)
                             (setf (block-start blk) srce)))))))))))


;;; Peek or dequeue element.

(defmethod %pipe-peek-or-dequeue ((pipe buffered-pipe) peek)
  (values t (prog1 (aref (buffer pipe) (head pipe))
              (unless peek
                (mod-incf (head pipe) (length (buffer pipe)))))
          (not peek)))

(defmethod %pipe-peek-or-dequeue ((pipe queued-pipe) peek)
  (let ((blk (car (head pipe))))
    (if (block-empty-p blk)
        (progn
          (if (eql (head pipe) (tail pipe))
              (setf (head pipe) nil
                    (tail pipe) nil)
              (pop (head pipe)))
          (values nil))
        (values t (if peek
                      (block-peek-char blk)
                      (block-pop-char  blk))))))

(defun %peek-or-dequeue (pipe peek no-hang)
  (let ((signal-not-full nil))
    (prog1 (loop
             (with-lock-held ((lock pipe))
               (if (%pipe-emptyp pipe)
                   (if (closed-pipe-p pipe)
                       (return :eof)
                       (if no-hang
                           (return nil)
                           (gate-wait (not-empty pipe) (lock pipe))))
                   (multiple-value-bind (returnp result signal) (%pipe-peek-or-dequeue pipe peek)
                     (setf signal-not-full signal)
                     (when returnp
                       (return result))))))
      (when signal-not-full (gate-signal (not-full pipe))))))


(defmethod pipe-dequeue-element ((pipe generic-pipe))
  (%peek-or-dequeue pipe nil nil))

(defmethod pipe-peek-element ((pipe generic-pipe))
  (%peek-or-dequeue pipe :peek nil))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;
;;; stream methods
;;;
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(defun check-stream-open (stream where)
  (unless (open-stream-p stream)
    (error "~S cannot deal with closed stream ~S"
           where stream)))

;;; character input

(declaim (inline update-column))
(defun update-column (stream ch)
  (when (characterp ch)
    (if (char= ch #\newline)
        (setf (column stream) 0)
        (incf (column stream))))
  ch)

(defmethod stream-read-char ((stream pipe-character-input-stream))
  (check-stream-open stream 'stream-read-char)
  (update-column stream (pipe-dequeue-element (pipe-stream-pipe stream))))

(defmethod stream-read-char-no-hang ((stream pipe-character-input-stream))
  (check-stream-open stream 'stream-read-char-no-hang)
  (update-column stream (%peek-or-dequeue (pipe-stream-pipe stream) nil :no-hang)))

 (defmethod stream-peek-char ((stream pipe-character-input-stream))
  (check-stream-open stream 'stream-peek-char)
  (pipe-peek-element (pipe-stream-pipe stream)))

(defmethod stream-read-line ((stream pipe-character-input-stream))
  (check-stream-open stream 'stream-read-line)
  (multiple-value-bind (line eof) (pipe-dequeue-until-element (pipe-stream-pipe stream) #\newline)
    (setf (column stream) 0)
    (values line eof)))

(defmethod stream-listen ((stream pipe-character-input-stream))
  (check-stream-open stream 'stream-listen)
  (%peek-or-dequeue (pipe-stream-pipe stream) :peek :no-hang))

(defmethod stream-unread-char ((stream pipe-character-input-stream) ch)
  (check-stream-open stream 'stream-unread-char)
  (with-lock-held ((lock stream))
    (let ((head (head stream)))
      (if head
          (let ((blk (car head)))
            (if (block-full-p blk)
                (push (make-block (string ch)) (head stream))
                (block-push-char blk ch)))
          (setf (tail stream)
                (setf (head stream) (list (make-block (string ch))))))))
  (gate-signal (not-empty stream))
  (setf (column stream) (max 0 (1- (column stream))))
  ch)



;;; character output

(defmethod stream-write-char ((stream pipe-character-output-stream) ch)
  (check-stream-open stream 'stream-write-char)
  (let ((pipe (pipe-stream-pipe stream)))
    (unless (sunk-pipe-p pipe)
      (pipe-enqueue-element pipe ch)))
  (if (char= #\newline ch)
      (setf (column stream) 0)
      (incf (column stream)))
  ch)

(defmethod stream-terpri ((stream pipe-character-output-stream))
  (check-stream-open stream 'stream-terpri)
  (stream-write-char stream #\newline)
  nil)

(defmethod stream-write-string ((stream pipe-character-output-stream) string &optional (start 0) end)
  (check-stream-open stream 'stream-write-string)
  (let* ((pipe (pipe-stream-pipe stream))
         (end  (or end (length string)))
         (nlp  (position #\newline string :start start :end end :from-end t)))
    (unless (sunk-pipe-p pipe)
      (pipe-enqueue-sequence pipe string start end))
    (if nlp
        (setf (column stream) (- end nlp))
        (incf (column stream) (- end start))))
  string)

(defmethod stream-line-column ((stream pipe-character-output-stream))
  (column stream))

(defmethod stream-start-line-p ((stream pipe-character-output-stream))
  (zerop (column stream)))

(defmethod stream-advance-to-column ((stream pipe-character-output-stream) column)
  (check-stream-open stream 'stream-advance-to-column)
  (let ((delta (- column (column stream))))
    (when (plusp delta)
      (stream-write-string stream (make-string delta :initial-element #\space))
      delta)))

(defgeneric reopen-pipe (pipe)
  (:documentation "Reopens the streams of the PIPE."))
(defmethod reopen-pipe ((pipe generic-pipe))
  (with-lock-held ((lock pipe))
    (setf (slot-value (pipe-input-stream pipe)  'open) t
          (slot-value (pipe-output-stream pipe) 'open) t))
  (gate-signal (not-empty pipe)))

(defmethod close-pipe ((pipe generic-pipe))
  (with-lock-held ((lock pipe))
    (setf (slot-value (pipe-output-stream pipe) 'open) nil))
  (gate-signal (not-empty pipe)))

(defun sink-pipe (pipe)
  (with-lock-held ((lock pipe))
    (setf (slot-value (pipe-input-stream pipe) 'open) nil)))

(defmethod close ((stream pipe-character-output-stream) &key abort)
  (declare (ignore abort))
  (close-pipe (pipe-stream-pipe stream)))

(defmethod close ((stream pipe-binary-output-stream) &key abort)
  (declare (ignore abort))
  (close-pipe (pipe-stream-pipe stream)))

(defmethod close ((stream pipe-character-input-stream) &key abort)
  (declare (ignore abort))
  (sink-pipe (pipe-stream-pipe stream)))

(defmethod close ((stream pipe-binary-input-stream) &key abort)
  (declare (ignore abort))
  (sink-pipe (pipe-stream-pipe stream)))



;; binary input

(defmethod stream-read-byte ((stream pipe-binary-input-stream))
  (check-stream-open stream 'stream-read-byte)
  (%peek-or-dequeue (pipe-stream-pipe stream) nil nil))

;; binary output

(defmethod stream-write-byte ((stream pipe-binary-output-stream) byte)
  (check-stream-open stream 'stream-write-byte)
  (let ((pipe (pipe-stream-pipe stream)))
    (unless (sunk-pipe-p pipe)
      (pipe-enqueue-element pipe byte)))
  byte)



;;; sequence I/O

(defun check-sequence-arguments (direction pipe sequence start end)
  (assert (<= 0 start end (length sequence))
          (sequence start end)
          "START = ~D or END = ~D are not sequence bounding indexes for a ~S of length ~D"
          start end (type-of sequence) (length sequence))
  (ecase direction
    (:read
     (assert (or (listp sequence)
                 (and (vectorp sequence)
                      (subtypep (pipe-element-type pipe) (array-element-type sequence))))
             (sequence)
             "For reading, the sequence element type ~S should be a supertype of the pipe element type ~S"
             (array-element-type sequence) (pipe-element-type pipe)))
    (:write
     (assert (or (listp sequence)
                 (and (vectorp sequence)
                      (subtypep (array-element-type sequence) (pipe-element-type pipe))))
             (sequence)
             "For writing, the sequence element type ~S should be a subtype of the pipe element type ~S"
             (array-element-type sequence) (pipe-element-type pipe)))))

(defun %stream-read-sequence (stream sequence start end)
  (let ((pipe (pipe-stream-pipe stream)))
    (check-sequence-arguments :read pipe sequence start end)
    (pipe-dequeue-sequence pipe sequence start end)))

(defun %stream-write-sequence (stream sequence start end)
  (let ((pipe (pipe-stream-pipe stream)))
    (check-sequence-arguments :write pipe sequence start end)
    (unless (sunk-pipe-p pipe)
      (pipe-enqueue-sequence pipe sequence start end))
    sequence))

(defmethod stream-read-sequence ((stream pipe-character-input-stream) sequence start end &key &allow-other-keys)
  (check-stream-open stream 'stream-read-sequence)
  (%stream-read-sequence stream sequence start end))

(defmethod stream-read-sequence ((stream pipe-binary-input-stream) sequence start end &key &allow-other-keys)
  (check-stream-open stream 'stream-read-sequence)
  (%stream-read-sequence stream sequence start end))

(defmethod stream-write-sequence ((stream pipe-character-output-stream) sequence start end &key &allow-other-keys)
  (check-stream-open stream 'stream-write-sequence)
  (%stream-write-sequence stream sequence start end))

(defmethod stream-write-sequence ((stream pipe-binary-output-stream) sequence start end &key &allow-other-keys)
  (check-stream-open stream 'stream-write-sequence)
  (%stream-write-sequence stream sequence start end))



;; For:
;; (defmethod stream-finish-output ((stream pipe-character-output-stream)))
;; (defmethod stream-finish-output ((stream pipe-binary-output-stream)))
;; Attempts to ensure that all output sent to the stream has reached its
;; destination, and only then returns false. Implements
;; cl:finish-output. The default method does nothing.
;;
;; We assign the semantics of waiting for the reader process to
;; have read all the data written so far.
;;
;; NO: This could be done with a empty condition, and the writer waiting
;;     on the empty condition.  The reader would notify it when reaching
;;     an empty pipe.
;; This assumes a single writer stream.  While waiting for the
;; condition another writer stream could further write data.
;; Therefore we need instead to be able to enqueue tokens into the pipe,
;; that could be used to message back to the exact writing thread.
;;
;; (defmethod pipe-finish-output ((pipe generic-pipe))
;;   (with-lock-held ((lock pipe))
;;     (loop :until (%pipe-emptyp pipe)
;;           :do (condition-wait (empty pipe) (lock pipe)))))
;;
;; (defmethod stream-finish-output ((stream pipe-character-output-stream))
;;   (pipe-finish-output (pipe-stream-pipe stream)))
;; (defmethod stream-finish-output ((stream pipe-binary-output-stream))
;;   (pipe-finish-output (pipe-stream-pipe stream)))

#-(and) (progn
          (eval-when (:compile-toplevel :load-toplevel :execute)
            (pushnew :debug *features*)
            (setf *features*  (delete :debug *features*)))
          #+debug (:shadowing-import-from  "COM.INFORMATIMAGO.CLEXT.DEBUG" "WITH-LOCK-HELD")
          #+debug (:use "COM.INFORMATIMAGO.CLEXT.DEBUG")
          #-debug (declaim (inline tr))
          #-debug (defun tr (&rest args) (declare (ignore args)))

          (defun trace-pipe (where pipe)
            (flet ((sub (s e)
                     (substitute #\␤ #\newline (subseq (buffer pipe) s e))))
              (let ((head (head pipe))
                    (tail (tail pipe))
                    (e    (if (%pipe-emptyp pipe) "empty " ""))
                    (f    (if (%pipe-fullp  pipe) "full  " "")))
                (tr "~A: h:~S t:~S ~A~A" where head tail e f)
                (if (<= head tail)
                    (tr "~V,,,'_<~>~A~V,,,'_<~>"
                      head
                      (sub head tail)
                      (- (length (buffer pipe)) tail))
                    (tr "~A~V,,,'_<~>~A"
                      (sub 0 tail)
                      (- head tail)
                      (sub head (length (buffer pipe))))))))

          )

;;;; THE END ;;;;
ViewGit