gloader/downloader.go
lordwelch bd0c3d2cc6 Load and save queue and history files
Switch to using an additional filename and sub-directory field
Allow status in json decode/encode
Switch to using string for url instead of url.URL
use log instead of fmt for logging
Add basic status handlers for the queue and history
Add HTTP timeouts
Implement cookie handling
Ignore TempPath and FilePath when adding URLs, they are absolute paths
Ignore Status when adding URLs and status is not Paused
When determining the filename use the path from the final redirect
Use the correct TempPath when downloading
Actually add requests to the queue before starting them
2020-12-13 01:05:17 -08:00

package main
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"mime"
"net"
"net/http"
"net/http/cookiejar"
"net/url"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"github.com/cavaliercoder/grab"
"github.com/lordwelch/pathvalidate"
"golang.org/x/net/publicsuffix"
)
var (
DefaultCookieJar = newCookieJar()
DefaultGrabClient = grab.NewClient()
DefaultMaxActiveDownloads = 4
ErrUnsupportedScheme = errors.New("unsupported scheme")
)
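// Priority orders queued requests; lower values sort first, so Highest
// downloads before Low.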
type Priority uint8
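// Status tracks the lifecycle of a request, from Queued through Downloading
// to Complete, Error or Canceled.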
type Status uint8
const (
Highest Priority = iota
High
Medium
Low
)
const (
Queued Status = iota
Complete
Stopped
Paused
Downloading
Error
Canceled
)
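// Downloader manages a queue of download requests, the directories they are
// written to, and an HTTP server exposing a small REST API for adding and
// inspecting downloads.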
type Downloader struct {
DataDir string
DownloadDir string
CompleteDir string
InfoDir string
Grab *grab.Client
Jar http.CookieJar
MaxActiveDownloads int
Server *http.Server
Downloads RequestQueue
History RequestQueue
NewRequest chan Request
requestDone chan *Request
OnComplete func(r Request)
OnAdd func(r Request)
}
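// Request describes a single download: the source URL, the cookies to send
// with it, where the temporary and final files live, and its current status.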
type Request struct {
URL string `json:"url"`
Cookies []http.Cookie `json:"cookies"`
ForceDownload bool `json:"forceDownload"`
Status Status `json:"Status"`
Priority Priority `json:"priority"`
FilePath string `json:"filepath"`
Filename string `json:"filename"`
Subdir string `json:"subdir"`
TempPath string `json:"tempPath"`
Response *grab.Response `json:"-"`
Error error `json:"-"`
CompletedDate time.Time `json:"completedDate"`
}
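// RequestQueue is a sortable list of requests. URLSort and DateSort enable
// the optional orderings used by Less.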
type RequestQueue struct {
Queue []*Request
URLSort bool
DateSort bool
}
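// Less reports whether the request at index i sorts before the request at
// index j, comparing ForceDownload, then Priority, then (when enabled)
// CompletedDate and URL.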
func (rq RequestQueue) Less(i, j int) bool {
ii := 0
jj := 0
if rq.Queue[i].ForceDownload {
ii = 1
}
if rq.Queue[j].ForceDownload {
jj = 1
}
if ii < jj {
return true
}
if rq.Queue[i].Priority < rq.Queue[j].Priority {
return true
}
if rq.DateSort && rq.Queue[i].CompletedDate.Before(rq.Queue[j].CompletedDate) {
return true
}
if rq.URLSort && rq.Queue[i].URL < rq.Queue[j].URL {
return true
}
return false
}
func (rq RequestQueue) Len() int {
return len(rq.Queue)
}
func (rq RequestQueue) Swap(i, j int) {
rq.Queue[i], rq.Queue[j] = rq.Queue[j], rq.Queue[i]
}
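// Pop removes the request at index i from the queue and returns it.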
func (rq *RequestQueue) Pop(i int) *Request {
r := rq.Queue[i]
copy(rq.Queue[i:], rq.Queue[i+1:])
rq.Queue[len(rq.Queue)-1] = nil
rq.Queue = rq.Queue[:len(rq.Queue)-1]
return r
}
func (rq *RequestQueue) remove(r *Request) {
for i, req := range rq.Queue {
if req == r {
copy(rq.Queue[i:], rq.Queue[i+1:])
rq.Queue[len(rq.Queue)-1] = nil
rq.Queue = rq.Queue[:len(rq.Queue)-1]
break
}
}
}
func newCookieJar() http.CookieJar {
c, _ := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
return c
}
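// newDownloader returns a Downloader wired to the package-level cookie jar
// and grab client. A minimal caller (sketch only, not part of this file)
// might look like:
//
//	d := newDownloader()
//	d.Start("tcp", "localhost:8080")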
func newDownloader() *Downloader {
return &Downloader{
Jar: DefaultCookieJar,
Grab: DefaultGrabClient,
}
}
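// Start fills in default channels, limits and directories, creates the data
// directories, registers the REST handlers, starts the download loop and then
// serves HTTP on the given network address. It blocks until the server stops.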
func (d *Downloader) Start(network, address string) {
var (
listener net.Listener
mux = http.NewServeMux()
err error
)
if d.NewRequest == nil {
d.NewRequest = make(chan Request, 64)
}
if d.requestDone == nil {
d.requestDone = make(chan *Request, 64)
}
if d.MaxActiveDownloads < 1 {
d.MaxActiveDownloads = DefaultMaxActiveDownloads
}
if d.Server == nil {
d.Server = &http.Server{
Addr: address,
Handler: mux,
ReadTimeout: 2 * time.Minute,
WriteTimeout: 2 * time.Minute,
}
}
if d.DataDir == "" {
d.DataDir = "/perm/gloader"
}
if d.DownloadDir == "" {
d.DownloadDir = path.Join(d.DataDir, "Download")
}
if d.CompleteDir == "" {
d.CompleteDir = path.Join(d.DataDir, "Complete")
}
log.Println(d.DataDir)
log.Println(d.DownloadDir)
log.Println(d.CompleteDir)
_ = os.MkdirAll(d.DataDir, 0777)
_ = os.MkdirAll(d.DownloadDir, 0777)
_ = os.MkdirAll(d.CompleteDir, 0777)
listener, err = net.Listen(network, address)
if err != nil {
panic(err)
}
log.Println("adding /add handler")
// mux.HandleFunc("/", d.UI)
mux.HandleFunc("/add", d.restAddDownload)
mux.HandleFunc("/queue", d.restQueueStatus)
mux.HandleFunc("/history", d.restHistoryStatus)
mux.HandleFunc("/start", d.restStartDownload)
log.Println("starting main go routine")
d.Grab.HTTPClient = &http.Client{
Jar: d.Jar,
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
go d.download()
log.Println("serving http server")
_ = d.Server.Serve(listener)
}
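// restStartDownload handles POST /start. It expects a JSON object selecting a
// queue entry by position, for example (illustrative value only):
//
//	{"index": 2}
//
// and starts the download at that index.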
func (d *Downloader) restStartDownload(w http.ResponseWriter, r *http.Request) {
var (
err error
index struct {
Index int `json:"index"`
}
)
if r.Method != http.MethodPost {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Add("Allow", http.MethodPost)
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintln(w, "HTTP Error 405 Method Not Allowed\nOnly POST method is allowed")
log.Println("HTTP Error 405 Method Not Allowed\nOnly POST method is allowed")
return
}
err = json.NewDecoder(r.Body).Decode(&index)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if index.Index >= d.Downloads.Len() || index.Index < 0 {
http.Error(w, fmt.Sprintf("slice index out of bounds. index: %d length of slice: %d", index.Index, d.Downloads.Len()), http.StatusBadRequest)
return
}
d.startDownload(index.Index)
}
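// restHistoryStatus handles GET /history and writes the history queue as JSON.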
func (d *Downloader) restHistoryStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Add("Allow", http.MethodGet)
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintln(w, "HTTP Error 405 Method Not Allowed\nOnly GET method is allowed")
log.Println("HTTP Error 405 Method Not Allowed\nOnly GET method is allowed")
return
}
j := json.NewEncoder(w)
w.Header().Add("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
j.Encode(d.History.Queue)
}
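// restQueueStatus handles GET /queue and writes the download queue as JSON.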
func (d *Downloader) restQueueStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Add("Allow", http.MethodGet)
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintln(w, "HTTP Error 405 Method Not Allowed\nOnly GET method is allowed")
log.Println("HTTP Error 405 Method Not Allowed\nOnly GET method is allowed")
return
}
j := json.NewEncoder(w)
w.Header().Add("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
j.Encode(d.Downloads.Queue)
}
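// restAddDownload handles POST /add. The body is a JSON array of Request
// objects; TempPath and FilePath are cleared because they are server-side
// absolute paths, and Status is reset to Queued unless the request is Paused.
// An example body (illustrative values only):
//
//	[{"url": "https://example.com/file.iso", "subdir": "isos", "priority": 1}]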
func (d *Downloader) restAddDownload(w http.ResponseWriter, r *http.Request) {
var (
requests []Request
err error
)
if r.Method != http.MethodPost {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Add("Allow", http.MethodPost)
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintln(w, "HTTP Error 405 Method Not Allowed\nOnly POST method is allowed")
log.Println("HTTP Error 405 Method Not Allowed\nOnly POST method is allowed")
return
}
// TODO fail only on individual requests
err = json.NewDecoder(r.Body).Decode(&requests)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
for _, req := range requests {
req.TempPath = "" // not allowed via REST API
req.FilePath = "" // not allowed via REST API
if req.Status != Paused {
req.Status = Queued
}
log.Println("adding request", req.URL)
d.NewRequest <- req
}
w.WriteHeader(http.StatusOK)
}
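// getNameFromHEAD issues a HEAD request for r.URL and returns the filename
// from the Content-Disposition header, falling back to the base name of the
// final (post-redirect) URL. It returns "" on any error or non-2xx status.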
func (d Downloader) getNameFromHEAD(r Request) string {
var (
err error
re *http.Response
p map[string]string
)
ht := &http.Client{
Jar: d.Jar,
Timeout: 30 * time.Second,
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 5 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}
re, err = ht.Head(r.URL)
if err != nil {
return ""
}
defer re.Body.Close()
if re.StatusCode < 200 || re.StatusCode > 299 {
return ""
}
_, p, err = mime.ParseMediaType(re.Header.Get("Content-Disposition"))
if err == nil {
if f, ok := p["filename"]; ok {
return f
}
}
return path.Base(re.Request.URL.Path)
}
// getFilename determines the filename for a request: it keeps the provided
// filename if set, otherwise uses the Content-Disposition from a HEAD request,
// otherwise falls back to the base name of the URL, and finally sanitizes the
// result using github.com/lordwelch/pathvalidate.
func (d *Downloader) getFilename(r *Request) {
log.Println("Determining filename")
r.Filename = filepath.Clean(r.Filename)
if r.Filename == "." {
log.Println("filename is empty, testing head request")
r.Filename = d.getNameFromHEAD(*r)
log.Println("path from head request:", r.Filename)
if r.Filename == "" {
u, _ := url.Parse(r.URL)
r.Filename, _ = url.PathUnescape(filepath.Base(u.Path))
}
}
r.Filename, _ = pathvalidate.SanitizeFilename(r.Filename, '_')
// r.Filename = filepath.Join(d.CompleteDir, r.Filename)
// if filepath.IsAbs(r.Filename) { // should already exist
// dir, file := filepath.Split(r.Filename)
// // someone is trying to be sneaky (or someone changed the CompleteDir), change path to the correct dir
// if dir != filepath.Clean(d.CompleteDir) {
// r.Filename = filepath.Join(d.CompleteDir, file)
// }
// return
// }
log.Println("result path:", r.Filename)
}
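// getNewFilename joins dir and name, inserting an incrementing numeric suffix
// before the extension until the resulting path does not already exist.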
func getNewFilename(dir, name string) string {
var (
err error
index = 1
)
log.Println("getfilename", dir, name)
ext := filepath.Ext(name)
base := strings.TrimSuffix(name, ext)
log.Println("stat", filepath.Join(dir, name))
_, err = os.Stat(filepath.Join(dir, name))
for err == nil {
name = strings.TrimRight(base+"."+strconv.Itoa(index)+ext, ".")
log.Println("stat", filepath.Join(dir, name))
_, err = os.Stat(filepath.Join(dir, name))
index++
}
if os.IsNotExist(err) {
return filepath.Join(dir, name)
}
panic(err) // other path error
}
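// getTempFilename creates a temporary file for r in the download directory if
// one has not been assigned yet, and reserves the final FilePath by creating
// it exclusively.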
func (d Downloader) getTempFilename(r *Request) {
if r.TempPath == "" {
f, err := ioutil.TempFile(d.DownloadDir, filepath.Base(r.Filename))
if err != nil {
log.Printf("request for %v failed: %v", r.URL, err)
}
r.TempPath = f.Name()
f.Close()
}
// Reserve the final path so another download cannot claim the same name.
_ = os.MkdirAll(filepath.Dir(r.FilePath), 0o777)
f, err := os.OpenFile(r.FilePath, os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return
}
f.Close()
}
func (d Downloader) SearchDownloads(u string) int {
for i, req := range d.Downloads.Queue {
if req.URL == u {
return i
}
}
return -1
}
func (d Downloader) SearchHistory(u string) int {
for i, req := range d.History.Queue {
if req.URL == u {
return i
}
}
return -1
}
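// FindRequest returns the request with the given URL from the download queue
// or the history, or nil if the URL is unknown.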
func (d Downloader) FindRequest(u string) *Request {
if i := d.SearchDownloads(u); i >= 0 {
return d.Downloads.Queue[i]
}
if i := d.SearchHistory(u); i >= 0 {
return d.History.Queue[i]
}
return nil
}
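// addRequest registers cookies for the request, determines its filename and
// final path, appends it to the download queue and, if there is a free slot,
// starts it immediately. Duplicate URLs are ignored.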
func (d *Downloader) addRequest(r *Request) {
log.Println("adding download for", r.URL)
req := d.FindRequest(r.URL)
u, _ := url.Parse(r.URL)
for i, v := range r.Cookies {
d.Jar.SetCookies(&url.URL{
Scheme: u.Scheme,
Path: v.Path,
Host: v.Domain,
}, []*http.Cookie{&r.Cookies[i]})
}
d.getFilename(r)
if req != nil { // url already added
log.Println("URL is already added", r.URL)
return
// if fi, err := os.Stat(r.Filepath); filepath.Base(req.Filepath) == filepath.Base(r.Filepath) || (err == nil && fi.Name() == filepath.Base(r.Filepath) && fi.Size() != 0) { // filepath has been found, should this check for multiple downloads of the same url or let the download name increment automatically
// log.Println("file already exists", r.Filepath)
// d.validate(*r) // TODO, should also check to see if it seems like it is similar, (check first k to see if it is the same file?? leave option to user)
// return
// }
}
r.FilePath = getNewFilename(d.CompleteDir, filepath.Join(r.Subdir, r.Filename))
d.Downloads.Queue = append(d.Downloads.Queue, r)
if len(d.getRunningDownloads()) < d.MaxActiveDownloads {
d.startDownload(d.Downloads.Len() - 1)
}
}
// func (d *Downloader) validate(r Request) {
// //TODO
// }
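// startDownload begins the download at index i of the queue using the grab
// client and spawns a goroutine that reports completion on requestDone.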
func (d *Downloader) startDownload(i int) {
var (
r *Request
req *grab.Request
err error
)
r = d.Downloads.Queue[i]
d.getTempFilename(r)
log.Println("starting download for", r.URL, "to", r.TempPath)
// d.Downloads.Queue = append(d.Downloads.Queue, r)
if r.Response != nil && r.Response.Err() == nil {
// a previous attempt is still in progress or already succeeded; do not restart it
return
}
req, err = grab.NewRequest(r.TempPath, r.URL)
if err != nil {
r.Status = Error
r.Error = err
return
}
r.Status = Downloading
r.Response = d.Grab.Do(req)
go func(r *Request) {
log.Println("wait for download")
log.Println(r.Response.IsComplete())
r.Response.Wait()
log.Println("download completed for", r.URL)
d.requestDone <- r
}(r)
}
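// getRunningDownloads returns the requests that are currently downloading.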
func (d Downloader) getRunningDownloads() []*Request {
var (
running = make([]*Request, 0, d.MaxActiveDownloads)
)
for _, req := range d.Downloads.Queue {
if req.Status == Downloading && req.Response != nil {
running = append(running, req)
}
}
return running
}
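// syncDownloads re-sorts the queue, starts queued requests while there are
// free download slots, and moves completed or canceled requests to the
// history.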
func (d *Downloader) syncDownloads() {
if len(d.getRunningDownloads()) >= d.MaxActiveDownloads {
return
}
sort.Stable(d.Downloads)
// Start new downloads
for i, req := range d.Downloads.Queue {
if len(d.getRunningDownloads()) < d.MaxActiveDownloads {
if req.Status == Queued {
d.startDownload(i)
}
}
}
// Clean completed/canceled downloads
for i := 0; i < d.Downloads.Len(); i++ {
if d.Downloads.Queue[i].Status == Complete || d.Downloads.Queue[i].Status == Canceled {
d.History.Queue = append(d.History.Queue, d.Downloads.Pop(i))
i--
}
}
}
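// requestCompleted moves a finished request to the history, renaming its
// temporary file to the final path, or records the error on failure.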
func (d *Downloader) requestCompleted(r *Request) {
if r.Response.Err() == nil {
log.Println("removing from downloads")
d.Downloads.remove(r)
r.Status = Complete
if r.TempPath != r.FilePath {
log.Println("moving", r.TempPath, "to", r.FilePath)
if err := os.Rename(r.TempPath, r.FilePath); err != nil {
log.Println("failed to move completed download:", err)
}
}
d.History.Queue = append(d.History.Queue, r)
} else {
r.Status = Error
r.Error = r.Response.Err()
log.Println("fucking error:", r.Error)
}
}
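// download is the main event loop: it periodically synchronizes the queue,
// accepts new requests from NewRequest and finalizes requests reported on
// requestDone.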
func (d *Downloader) download() {
for {
select {
case <-time.After(10 * time.Second):
d.syncDownloads()
case r := <-d.NewRequest:
d.addRequest(&r)
if d.OnAdd != nil {
d.OnAdd(r)
}
case r := <-d.requestDone:
log.Println("finishing request for", r.URL)
d.requestCompleted(r)
if d.OnComplete != nil {
d.OnComplete(*r)
}
}
}
}