We have a few Python scripts which over the years ended up with a variety of formatting. This patch auto-formats them using `black -l 79` to make them more uniform, and easier to read and write.
281 lines
8.4 KiB
Python
Executable File
281 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
chamuyero is a tool to test and validate line-oriented commands and servers.
|
|
|
|
It can launch and communicate with other processes, and follow a script of
|
|
line-oriented request-response, validating the dialog as it goes along.
|
|
|
|
This can be used to test line-oriented network protocols (such as SMTP) or
|
|
interactive command-line tools.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import ssl
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
# Command-line flags.
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("script", type=argparse.FileType("r", encoding="utf8"))
|
|
args = ap.parse_args()
|
|
|
|
# Make sure stdout is open in utf8 mode, as we will print our input, which is
|
|
# utf8, and want it to work regardless of the environment.
|
|
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf8", buffering=1)
|
|
|
|
|
|
class Process(object):
|
|
def __init__(self, cmd, **kwargs):
|
|
self.cmd = subprocess.Popen(cmd, **kwargs)
|
|
|
|
def write(self, s):
|
|
self.cmd.stdin.write(s)
|
|
|
|
def readline(self):
|
|
return self.cmd.stdout.readline()
|
|
|
|
def wait(self):
|
|
return self.cmd.wait()
|
|
|
|
def close(self):
|
|
return self.cmd.stdin.close()
|
|
|
|
|
|
class Sock(object):
|
|
"""A (generic) socket.
|
|
|
|
This class implements the common code for socket support.
|
|
Subclasses will implement the behaviour specific to different socket
|
|
types.
|
|
"""
|
|
|
|
def __init__(self, addr):
|
|
self.addr = addr
|
|
self.sock = NotImplemented
|
|
self.connr = None
|
|
self.connw = None
|
|
self.has_conn = threading.Event()
|
|
|
|
def listen(self):
|
|
self.sock.bind(self.addr)
|
|
self.sock.listen(1)
|
|
threading.Thread(target=self._accept).start()
|
|
|
|
def _accept(self):
|
|
conn, _ = self.sock.accept()
|
|
self.connr = conn.makefile(mode="r", encoding="utf8")
|
|
self.connw = conn.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
def write(self, s):
|
|
self.has_conn.wait()
|
|
self.connw.write(s)
|
|
self.connw.flush()
|
|
|
|
def readline(self):
|
|
self.has_conn.wait()
|
|
return self.connr.readline()
|
|
|
|
def close(self):
|
|
self.connr.close()
|
|
self.connw.close()
|
|
self.sock.close()
|
|
|
|
|
|
class UnixSock(Sock):
|
|
def __init__(self, addr):
|
|
Sock.__init__(self, addr)
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
def listen(self):
|
|
if os.path.exists(self.addr):
|
|
os.remove(self.addr)
|
|
Sock.listen(self)
|
|
|
|
|
|
class TCPSock(Sock):
|
|
def __init__(self, addr):
|
|
host, port = addr.rsplit(":", 1)
|
|
Sock.__init__(self, (host, int(port)))
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def connect(self):
|
|
self.sock = socket.create_connection(self.addr)
|
|
self.connr = self.sock.makefile(mode="r", encoding="utf8")
|
|
self.connw = self.sock.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
|
|
class TLSSock(Sock):
|
|
def __init__(self, addr):
|
|
host, port = addr.rsplit(":", 1)
|
|
Sock.__init__(self, (host, int(port)))
|
|
plain_sock = socket.create_connection(self.addr)
|
|
|
|
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
context.check_hostname = False
|
|
context.verify_mode = ssl.CERT_NONE
|
|
self.sock = context.wrap_socket(plain_sock)
|
|
|
|
def connect(self):
|
|
self.connr = self.sock.makefile(mode="r", encoding="utf8")
|
|
self.connw = self.sock.makefile(mode="w", encoding="utf8")
|
|
self.has_conn.set()
|
|
|
|
|
|
class Interpreter(object):
|
|
"""Interpreter for chamuyero scripts."""
|
|
|
|
def __init__(self):
|
|
# Processes and sockets we have spawn. Indexed by the id provided by
|
|
# the user.
|
|
self.procs = {}
|
|
|
|
# Line number we are processing.
|
|
self.nline = 0
|
|
|
|
def syntax_error(self, msg):
|
|
raise SyntaxError("Error in line %d: %s" % (self.nline, msg))
|
|
|
|
def runtime_error(self, msg):
|
|
raise RuntimeError("Error in line %d: %s" % (self.nline, msg))
|
|
|
|
def run(self, fd):
|
|
"""Main processing loop."""
|
|
cont_l = ""
|
|
for l in fd:
|
|
self.nline += 1
|
|
|
|
# Remove rightmost \n.
|
|
l = l[:-1]
|
|
|
|
# Continuations with \.
|
|
if cont_l:
|
|
l = cont_l + " " + l.lstrip()
|
|
if l.endswith("\\"):
|
|
cont_l = l[:-1]
|
|
continue
|
|
else:
|
|
cont_l = ""
|
|
|
|
# Comments start with a "#".
|
|
if l.strip().startswith("#") or l.strip() == "":
|
|
continue
|
|
|
|
print(l)
|
|
|
|
# Everything else is of the form:
|
|
# <proc> <op> [params]
|
|
sp = l.split(None, 2)
|
|
if len(sp) == 3:
|
|
proc, op, params = sp
|
|
else:
|
|
proc, op = sp
|
|
params = ""
|
|
|
|
# = Launch a process.
|
|
if op == "=":
|
|
cmd = Process(
|
|
params,
|
|
shell=True,
|
|
universal_newlines=True,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
self.procs[proc] = cmd
|
|
|
|
# |= Launch a process, do not capture stdout.
|
|
elif op == "|=":
|
|
cmd = Process(params, shell=True, stdin=subprocess.PIPE)
|
|
self.procs[proc] = cmd
|
|
|
|
# unix_listen Listen on an UNIX socket.
|
|
elif op == "unix_listen":
|
|
sock = UnixSock(params)
|
|
sock.listen()
|
|
self.procs[proc] = sock
|
|
|
|
# tcp_listen Listen on a TCP socket.
|
|
elif op == "tcp_listen":
|
|
sock = TCPSock(params)
|
|
sock.listen()
|
|
self.procs[proc] = sock
|
|
|
|
elif op == "tcp_connect":
|
|
sock = TCPSock(params)
|
|
sock.connect()
|
|
self.procs[proc] = sock
|
|
|
|
elif op == "tls_connect":
|
|
sock = TLSSock(params)
|
|
sock.connect()
|
|
self.procs[proc] = sock
|
|
|
|
# -> Send to a process stdin, with a \r\n at the end.
|
|
# .> Send to a process stdin, no \r\n at the end.
|
|
# ~> Send to a process stdin, string is python-evaluated.
|
|
elif op == "->":
|
|
self.procs[proc].write(params + "\r\n")
|
|
elif op == ".>":
|
|
self.procs[proc].write(params)
|
|
elif op == "~>":
|
|
self.procs[proc].write(eval(params))
|
|
|
|
# <- Read from the process, expect matching input.
|
|
# <~ Read from the process, match input using regexp.
|
|
# <... Read many lines until one matches.
|
|
elif op == "<-":
|
|
read = self.procs[proc].readline()
|
|
if read != params + "\n":
|
|
self.runtime_error(
|
|
"data different that expected:\n"
|
|
+ " expected: %s\n" % repr(params)
|
|
+ " got: %s" % repr(read)
|
|
)
|
|
elif op == "<~":
|
|
read = self.procs[proc].readline()
|
|
m = re.match(params, read)
|
|
if m is None:
|
|
self.runtime_error(
|
|
"data did not match regexp:\n"
|
|
+ " regexp: %s\n" % repr(params)
|
|
+ " got: %s" % repr(read)
|
|
)
|
|
elif op == "<...":
|
|
while True:
|
|
read = self.procs[proc].readline()
|
|
m = re.match(params, read)
|
|
if m:
|
|
break
|
|
|
|
# sleep Sleep this number of seconds (process-independent).
|
|
elif op == "sleep":
|
|
time.sleep(float(params))
|
|
|
|
# wait Wait for the process to exit (with the given code).
|
|
elif op == "wait":
|
|
retcode = self.procs[proc].wait()
|
|
if params and retcode != int(params):
|
|
self.runtime_error(
|
|
"return code did not match:\n"
|
|
+ " expected %s, got %d" % (params, retcode)
|
|
)
|
|
|
|
# close Close the process.
|
|
elif op == "close":
|
|
self.procs[proc].close()
|
|
|
|
else:
|
|
self.syntax_error("unknown syntax")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
i = Interpreter()
|
|
i.run(args.script)
|