This patch implements "via" aliases, which let us explicitly select a server to use for delivery. This feature is useful in different scenarios, such as a secondary MX server that forwards all incoming email to a primary. For now, it is experimental and the syntax and semantics are subject to change.
122 lines
3.4 KiB
Python
Executable File
122 lines
3.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import difflib
|
|
import email.parser
|
|
import email.policy
|
|
import itertools
|
|
import mailbox
|
|
import sys
|
|
|
|
|
|
def flexible_eq(expected, got):
|
|
"""Compare two strings, supporting wildcards.
|
|
|
|
This functions compares two strings, but supports wildcards on the
|
|
expected string. The following characters have special meaning:
|
|
|
|
- ? matches any character.
|
|
- * matches anything until the end of the line.
|
|
|
|
Returns True if equal (considering wildcards), False otherwise.
|
|
"""
|
|
posG = 0
|
|
for c in expected:
|
|
if posG >= len(got):
|
|
return False
|
|
|
|
if c == "?":
|
|
posG += 1
|
|
continue
|
|
if c == "*":
|
|
while posG < len(got) and got[posG] != "\t":
|
|
posG += 1
|
|
continue
|
|
continue
|
|
|
|
if c != got[posG]:
|
|
return False
|
|
|
|
posG += 1
|
|
|
|
if posG != len(got):
|
|
# We got more than we expected.
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def msg_equals(expected, msg):
|
|
"""Compare two messages recursively, using flexible_eq()."""
|
|
diff = False
|
|
for h, val in expected.items():
|
|
if h not in msg:
|
|
print("Header missing: %r" % h)
|
|
diff = True
|
|
continue
|
|
|
|
if expected[h] == "*":
|
|
continue
|
|
|
|
msg_hdr_vals = msg.get_all(h, [])
|
|
if len(msg_hdr_vals) == 1:
|
|
if not flexible_eq(val, msg[h]):
|
|
print("Header %r differs:" % h)
|
|
print("Exp: %r" % val)
|
|
print("Got: %r" % msg[h])
|
|
diff = True
|
|
else:
|
|
# We have multiple values for this header, so we need to check each
|
|
# one, and only return a diff if none of them match.
|
|
# Note this will result in a false positive if two headers match
|
|
# the same expected one, but this is good enough for now.
|
|
for msg_hdr_val in msg_hdr_vals:
|
|
if flexible_eq(val, msg_hdr_val):
|
|
break
|
|
else:
|
|
print("Header %r differs, no matching header found" % h)
|
|
print("Exp: %r" % val)
|
|
for i, msg_hdr_val in enumerate(msg_hdr_vals):
|
|
print("Got %d: %r" % (i, msg_hdr_val))
|
|
diff = True
|
|
|
|
if diff:
|
|
return False
|
|
|
|
if expected.is_multipart() != msg.is_multipart():
|
|
print(
|
|
"Multipart differs, expected %s, got %s"
|
|
% (expected.is_multipart(), msg.is_multipart())
|
|
)
|
|
return False
|
|
|
|
if expected.is_multipart():
|
|
for exp, got in itertools.zip_longest(
|
|
expected.get_payload(), msg.get_payload()
|
|
):
|
|
if not msg_equals(exp, got):
|
|
return False
|
|
else:
|
|
if not flexible_eq(expected.get_payload(), msg.get_payload()):
|
|
exp = expected.get_payload().splitlines()
|
|
got = msg.get_payload().splitlines()
|
|
print("Payload differs:")
|
|
for l in difflib.ndiff(exp, got):
|
|
print(l)
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
f1, f2 = sys.argv[1:3]
|
|
|
|
# We use a custom strict policy to do more strict content validation.
|
|
policy = email.policy.EmailPolicy(
|
|
utf8=True, linesep="\r\n", refold_source="none", raise_on_defect=True
|
|
)
|
|
|
|
expected = email.parser.Parser(policy=policy).parse(open(f1))
|
|
msg = email.parser.Parser(policy=policy).parse(open(f2))
|
|
|
|
sys.exit(0 if msg_equals(expected, msg) else 1)
|