148 lines
5.7 KiB
Python
148 lines
5.7 KiB
Python
'''This is like pexpect, but it will work with any file descriptor that you
|
|
pass it. You are responsible for opening and close the file descriptor.
|
|
This allows you to use Pexpect with sockets and named pipes (FIFOs).
|
|
|
|
PEXPECT LICENSE
|
|
|
|
This license is approved by the OSI and FSF as GPL-compatible.
|
|
http://opensource.org/licenses/isc-license.txt
|
|
|
|
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
|
|
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
|
|
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
|
|
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
|
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
|
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
'''
|
|
|
|
from .spawnbase import SpawnBase
|
|
from .exceptions import ExceptionPexpect, TIMEOUT
|
|
from .utils import select_ignore_interrupts, poll_ignore_interrupts
|
|
import os
|
|
|
|
__all__ = ['fdspawn']
|
|
|
|
class fdspawn(SpawnBase):
|
|
'''This is like pexpect.spawn but allows you to supply your own open file
|
|
descriptor. For example, you could use it to read through a file looking
|
|
for patterns, or to control a modem or serial device. '''
|
|
|
|
def __init__ (self, fd, args=None, timeout=30, maxread=2000, searchwindowsize=None,
|
|
logfile=None, encoding=None, codec_errors='strict', use_poll=False):
|
|
'''This takes a file descriptor (an int) or an object that support the
|
|
fileno() method (returning an int). All Python file-like objects
|
|
support fileno(). '''
|
|
|
|
if type(fd) != type(0) and hasattr(fd, 'fileno'):
|
|
fd = fd.fileno()
|
|
|
|
if type(fd) != type(0):
|
|
raise ExceptionPexpect('The fd argument is not an int. If this is a command string then maybe you want to use pexpect.spawn.')
|
|
|
|
try: # make sure fd is a valid file descriptor
|
|
os.fstat(fd)
|
|
except OSError:
|
|
raise ExceptionPexpect('The fd argument is not a valid file descriptor.')
|
|
|
|
self.args = None
|
|
self.command = None
|
|
SpawnBase.__init__(self, timeout, maxread, searchwindowsize, logfile,
|
|
encoding=encoding, codec_errors=codec_errors)
|
|
self.child_fd = fd
|
|
self.own_fd = False
|
|
self.closed = False
|
|
self.name = '<file descriptor %d>' % fd
|
|
self.use_poll = use_poll
|
|
|
|
def close (self):
|
|
"""Close the file descriptor.
|
|
|
|
Calling this method a second time does nothing, but if the file
|
|
descriptor was closed elsewhere, :class:`OSError` will be raised.
|
|
"""
|
|
if self.child_fd == -1:
|
|
return
|
|
|
|
self.flush()
|
|
os.close(self.child_fd)
|
|
self.child_fd = -1
|
|
self.closed = True
|
|
|
|
def isalive (self):
|
|
'''This checks if the file descriptor is still valid. If :func:`os.fstat`
|
|
does not raise an exception then we assume it is alive. '''
|
|
|
|
if self.child_fd == -1:
|
|
return False
|
|
try:
|
|
os.fstat(self.child_fd)
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def terminate (self, force=False): # pragma: no cover
|
|
'''Deprecated and invalid. Just raises an exception.'''
|
|
raise ExceptionPexpect('This method is not valid for file descriptors.')
|
|
|
|
# These four methods are left around for backwards compatibility, but not
|
|
# documented as part of fdpexpect. You're encouraged to use os.write
|
|
# directly.
|
|
def send(self, s):
|
|
"Write to fd, return number of bytes written"
|
|
s = self._coerce_send_string(s)
|
|
self._log(s, 'send')
|
|
|
|
b = self._encoder.encode(s, final=False)
|
|
return os.write(self.child_fd, b)
|
|
|
|
def sendline(self, s):
|
|
"Write to fd with trailing newline, return number of bytes written"
|
|
s = self._coerce_send_string(s)
|
|
return self.send(s + self.linesep)
|
|
|
|
def write(self, s):
|
|
"Write to fd, return None"
|
|
self.send(s)
|
|
|
|
def writelines(self, sequence):
|
|
"Call self.write() for each item in sequence"
|
|
for s in sequence:
|
|
self.write(s)
|
|
|
|
def read_nonblocking(self, size=1, timeout=-1):
|
|
"""
|
|
Read from the file descriptor and return the result as a string.
|
|
|
|
The read_nonblocking method of :class:`SpawnBase` assumes that a call
|
|
to os.read will not block (timeout parameter is ignored). This is not
|
|
the case for POSIX file-like objects such as sockets and serial ports.
|
|
|
|
Use :func:`select.select`, timeout is implemented conditionally for
|
|
POSIX systems.
|
|
|
|
:param int size: Read at most *size* bytes.
|
|
:param int timeout: Wait timeout seconds for file descriptor to be
|
|
ready to read. When -1 (default), use self.timeout. When 0, poll.
|
|
:return: String containing the bytes read
|
|
"""
|
|
if os.name == 'posix':
|
|
if timeout == -1:
|
|
timeout = self.timeout
|
|
rlist = [self.child_fd]
|
|
wlist = []
|
|
xlist = []
|
|
if self.use_poll:
|
|
rlist = poll_ignore_interrupts(rlist, timeout)
|
|
else:
|
|
rlist, wlist, xlist = select_ignore_interrupts(
|
|
rlist, wlist, xlist, timeout
|
|
)
|
|
if self.child_fd not in rlist:
|
|
raise TIMEOUT('Timeout exceeded.')
|
|
return super(fdspawn, self).read_nonblocking(size)
|