Today, the maximum number of items in the queue, as well as how long we keep attempting to send each item, are hard-coded and cannot be changed by end users. While these defaults are totally adequate for chasquid's main use cases, it can still be useful for some users to change them. So this patch adds two new configuration options for those settings. They're marked experimental for now, so we can adjust them if needed after they get more exposure. Thanks to Lewis Ross-Jones <lewis_r_j@hotmail.com> for suggesting this improvement, and for helping to test it.
342 lines
8.6 KiB
Go
342 lines
8.6 KiB
Go
package queue
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"blitiri.com.ar/go/chasquid/internal/aliases"
|
|
"blitiri.com.ar/go/chasquid/internal/set"
|
|
"blitiri.com.ar/go/chasquid/internal/testlib"
|
|
"blitiri.com.ar/go/chasquid/internal/trace"
|
|
)
|
|
|
|
func allUsersExist(tr *trace.Trace, user, domain string) (bool, error) {
|
|
return true, nil
|
|
}
|
|
|
|
func TestBasic(t *testing.T) {
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
localC := testlib.NewTestCourier()
|
|
remoteC := testlib.NewTestCourier()
|
|
q, _ := New(dir, set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
localC, remoteC)
|
|
tr := trace.New("test", "TestBasic")
|
|
defer tr.Finish()
|
|
|
|
localC.Expect(2)
|
|
remoteC.Expect(1)
|
|
id, err := q.Put(tr, "from", []string{"am@loco", "x@remote", "nodomain"}, []byte("data"))
|
|
if err != nil {
|
|
t.Fatalf("Put: %v", err)
|
|
}
|
|
|
|
if len(id) < 6 {
|
|
t.Errorf("short ID: %v", id)
|
|
}
|
|
|
|
localC.Wait()
|
|
remoteC.Wait()
|
|
|
|
// Make sure the delivered items leave the queue.
|
|
testlib.WaitFor(func() bool { return q.Len() == 0 }, 2*time.Second)
|
|
if q.Len() != 0 {
|
|
t.Fatalf("%d items not removed from the queue after delivery", q.Len())
|
|
}
|
|
|
|
cases := []struct {
|
|
courier *testlib.TestCourier
|
|
expectedTo string
|
|
}{
|
|
{localC, "nodomain"},
|
|
{localC, "am@loco"},
|
|
{remoteC, "x@remote"},
|
|
}
|
|
for _, c := range cases {
|
|
req := c.courier.ReqFor[c.expectedTo]
|
|
if req == nil {
|
|
t.Errorf("missing request for %q", c.expectedTo)
|
|
continue
|
|
}
|
|
|
|
if req.From != "from" || req.To != c.expectedTo ||
|
|
!bytes.Equal(req.Data, []byte("data")) {
|
|
t.Errorf("wrong request for %q: %v", c.expectedTo, req)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDSNOnTimeout(t *testing.T) {
|
|
localC := testlib.NewTestCourier()
|
|
remoteC := testlib.NewTestCourier()
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
q, _ := New(dir, set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
localC, remoteC)
|
|
|
|
// Insert an expired item in the queue.
|
|
item := &Item{
|
|
Message: Message{
|
|
ID: <-newID,
|
|
From: "from@loco",
|
|
Rcpt: []*Recipient{
|
|
mkR("to@to", Recipient_EMAIL, Recipient_PENDING, "err", "to@to")},
|
|
Data: []byte("data"),
|
|
},
|
|
CreatedAt: time.Now().Add(-24 * time.Hour),
|
|
}
|
|
q.q[item.ID] = item
|
|
err := item.WriteTo(q.path)
|
|
if err != nil {
|
|
t.Errorf("failed to write item: %v", err)
|
|
}
|
|
|
|
// Exercise DumpString while at it.
|
|
q.DumpString()
|
|
|
|
// Launch the sending loop, expect 1 local delivery (the DSN).
|
|
localC.Expect(1)
|
|
go item.SendLoop(q)
|
|
localC.Wait()
|
|
|
|
req := localC.ReqFor["from@loco"]
|
|
if req == nil {
|
|
t.Fatal("missing DSN")
|
|
}
|
|
|
|
if req.From != "<>" || req.To != "from@loco" ||
|
|
!strings.Contains(string(req.Data), "X-Failed-Recipients: to@to,") {
|
|
t.Errorf("wrong DSN: %q", string(req.Data))
|
|
}
|
|
}
|
|
|
|
func TestAliases(t *testing.T) {
|
|
localC := testlib.NewTestCourier()
|
|
remoteC := testlib.NewTestCourier()
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
q, _ := New(dir, set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
localC, remoteC)
|
|
tr := trace.New("test", "TestAliases")
|
|
defer tr.Finish()
|
|
|
|
q.aliases.AddDomain("loco")
|
|
q.aliases.AddAliasForTesting("ab@loco", "pq@loco", nil, aliases.EMAIL)
|
|
q.aliases.AddAliasForTesting("ab@loco", "rs@loco", nil, aliases.EMAIL)
|
|
q.aliases.AddAliasForTesting("cd@loco", "ata@hualpa", nil, aliases.EMAIL)
|
|
q.aliases.AddAliasForTesting(
|
|
"fwd@loco", "fwd@loco", []string{"server"}, aliases.FORWARD)
|
|
q.aliases.AddAliasForTesting(
|
|
"remote@loco", "remote@rana", []string{"server"}, aliases.FORWARD)
|
|
// Note the pipe aliases are tested below, as they don't use the couriers
|
|
// and it can be quite inconvenient to test them in this way.
|
|
|
|
localC.Expect(2)
|
|
remoteC.Expect(3)
|
|
|
|
// One email from a local domain: from@loco -> ab@loco, cd@loco, fwd@loco.
|
|
_, err := q.Put(tr, "from@loco",
|
|
[]string{"ab@loco", "cd@loco", "fwd@loco"},
|
|
[]byte("data"))
|
|
if err != nil {
|
|
t.Fatalf("Put: %v", err)
|
|
}
|
|
|
|
// And another from a remote domain: from@rana -> remote@loco
|
|
_, err = q.Put(tr, "from@rana",
|
|
[]string{"remote@loco"},
|
|
[]byte("data"))
|
|
if err != nil {
|
|
t.Fatalf("Put: %v", err)
|
|
}
|
|
|
|
localC.Wait()
|
|
remoteC.Wait()
|
|
|
|
cases := []struct {
|
|
courier *testlib.TestCourier
|
|
expectedFrom string
|
|
expectedTo string
|
|
}{
|
|
// From the local domain: from@loco
|
|
{localC, "from@loco", "pq@loco"},
|
|
{localC, "from@loco", "rs@loco"},
|
|
{remoteC, "from@loco", "ata@hualpa"},
|
|
{remoteC, "from@loco", "fwd@loco"},
|
|
|
|
// From the remote domain: from@rana.
|
|
// Note the SRS in the remoteC.
|
|
{remoteC, "remote+fwd_from=from=rana@loco", "remote@rana"},
|
|
}
|
|
for _, c := range cases {
|
|
req := c.courier.ReqFor[c.expectedTo]
|
|
if req == nil {
|
|
t.Errorf("missing request for %q", c.expectedTo)
|
|
continue
|
|
}
|
|
|
|
if req.From != c.expectedFrom || req.To != c.expectedTo ||
|
|
!bytes.Equal(req.Data, []byte("data")) {
|
|
t.Errorf("wrong request for %q: %v", c.expectedTo, *req)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFullQueue(t *testing.T) {
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
q, _ := New(dir, set.NewString(),
|
|
aliases.NewResolver(allUsersExist),
|
|
testlib.DumbCourier, testlib.DumbCourier)
|
|
tr := trace.New("test", "TestFullQueue")
|
|
defer tr.Finish()
|
|
|
|
// Force-insert as many items in the queue as it supports.
|
|
oneID := ""
|
|
for i := 0; i < q.MaxItems; i++ {
|
|
item := &Item{
|
|
Message: Message{
|
|
ID: <-newID,
|
|
From: fmt.Sprintf("from-%d", i),
|
|
Rcpt: []*Recipient{
|
|
mkR("to", Recipient_EMAIL, Recipient_PENDING, "", "")},
|
|
Data: []byte("data"),
|
|
},
|
|
CreatedAt: time.Now(),
|
|
}
|
|
q.q[item.ID] = item
|
|
oneID = item.ID
|
|
}
|
|
|
|
// This one should fail due to the queue being too big.
|
|
id, err := q.Put(tr, "from", []string{"to"}, []byte("data-qf"))
|
|
if err != errQueueFull {
|
|
t.Errorf("Not failed as expected: %v - %v", id, err)
|
|
}
|
|
|
|
// Remove one, and try again: it should succeed.
|
|
// Write it first so we don't get complaints about the file not existing
|
|
// (as we did not all the items properly).
|
|
q.q[oneID].WriteTo(q.path)
|
|
q.Remove(oneID)
|
|
|
|
id, err = q.Put(tr, "from", []string{"to"}, []byte("data"))
|
|
if err != nil {
|
|
t.Errorf("Put: %v", err)
|
|
}
|
|
q.Remove(id)
|
|
}
|
|
|
|
func TestPipes(t *testing.T) {
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
q, _ := New(dir, set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
testlib.DumbCourier, testlib.DumbCourier)
|
|
|
|
item := &Item{
|
|
Message: Message{
|
|
ID: <-newID,
|
|
From: "from",
|
|
Rcpt: []*Recipient{
|
|
mkR("true", Recipient_PIPE, Recipient_PENDING, "", "")},
|
|
Data: []byte("data"),
|
|
},
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
if err, _ := item.deliver(q, item.Rcpt[0]); err != nil {
|
|
t.Errorf("pipe delivery failed: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestBadPath(t *testing.T) {
|
|
// A new queue will attempt to os.MkdirAll the path.
|
|
// We expect this path to fail.
|
|
_, err := New("/proc/doesnotexist", set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
testlib.DumbCourier, testlib.DumbCourier)
|
|
if err == nil {
|
|
t.Errorf("could create queue, expected permission denied")
|
|
}
|
|
}
|
|
|
|
func TestNextDelay(t *testing.T) {
|
|
cases := []struct{ since, min time.Duration }{
|
|
{10 * time.Second, 1 * time.Minute},
|
|
{3 * time.Minute, 5 * time.Minute},
|
|
{7 * time.Minute, 10 * time.Minute},
|
|
{15 * time.Minute, 20 * time.Minute},
|
|
{30 * time.Minute, 20 * time.Minute},
|
|
}
|
|
for _, c := range cases {
|
|
// Repeat each case a few times to exercise the perturbation a bit.
|
|
for i := 0; i < 10; i++ {
|
|
delay := nextDelay(time.Now().Add(-c.since))
|
|
|
|
max := c.min + 1*time.Minute
|
|
if delay < c.min || delay > max {
|
|
t.Errorf("since:%v expected [%v, %v], got %v",
|
|
c.since, c.min, max, delay)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestSerialization(t *testing.T) {
|
|
dir := testlib.MustTempDir(t)
|
|
defer testlib.RemoveIfOk(t, dir)
|
|
|
|
// Save an item in the queue directory.
|
|
item := &Item{
|
|
Message: Message{
|
|
ID: <-newID,
|
|
From: "from@loco",
|
|
Rcpt: []*Recipient{
|
|
mkR("to@to", Recipient_EMAIL, Recipient_PENDING, "err", "to@to")},
|
|
Data: []byte("data"),
|
|
},
|
|
CreatedAt: time.Now().Add(-1 * time.Hour),
|
|
}
|
|
err := item.WriteTo(dir)
|
|
if err != nil {
|
|
t.Errorf("failed to write item: %v", err)
|
|
}
|
|
|
|
// Create the queue; should load the
|
|
remoteC := testlib.NewTestCourier()
|
|
remoteC.Expect(1)
|
|
q, _ := New(dir, set.NewString("loco"),
|
|
aliases.NewResolver(allUsersExist),
|
|
testlib.DumbCourier, remoteC)
|
|
q.Load()
|
|
|
|
// Launch the sending loop, expect 1 remote delivery for the item we saved.
|
|
remoteC.Wait()
|
|
|
|
req := remoteC.ReqFor["to@to"]
|
|
if req == nil {
|
|
t.Fatal("email not delivered")
|
|
}
|
|
|
|
if req.From != "from@loco" || req.To != "to@to" {
|
|
t.Errorf("wrong email: %v", req)
|
|
}
|
|
}
|
|
|
|
func mkR(a string, t Recipient_Type, s Recipient_Status, m, o string) *Recipient {
|
|
return &Recipient{
|
|
Address: a,
|
|
Type: t,
|
|
Status: s,
|
|
LastFailureMessage: m,
|
|
OriginalAddress: o,
|
|
}
|
|
}
|