Make deleting files efficient

This commit is contained in:
Timmy Welch 2024-12-15 14:15:33 -08:00
parent 7ede0dee72
commit cc4e973bf9
3 changed files with 178 additions and 107 deletions

100
CHDB.go Normal file
View File

@ -0,0 +1,100 @@
package ch
import (
"database/sql"
"fmt"
"log"
"os"
"path/filepath"
_ "modernc.org/sqlite"
)
// CHDB records which cover images have been hashed and which ComicVine URLs
// are known to be bad, backed by a sqlite database. It lets the downloader
// skip work (and optionally delete files) across runs.
type CHDB struct {
// comicvinePath is the root directory containing downloaded ComicVine images;
// paths stored in the database are relative to it.
comicvinePath string
// sql is the open sqlite handle (schema: paths(path), bad_urls(url)).
sql *sql.DB
// deleteExisting, when true, causes image files to be removed from disk
// once they are recorded as hashed.
deleteExisting bool
}
// OpenCHDB opens (creating directories and tables as needed) the comic-hasher
// sqlite database at path. comicvinePath is the root of the downloaded
// ComicVine images; deleteExisting enables deleting files once recorded.
// It returns a non-nil error if the database cannot be opened or initialized.
func OpenCHDB(path string, comicvinePath string, deleteExisting bool) (CHDB, error) {
path, _ = filepath.Abs(path)
// Return an error instead of panicking so callers decide how to fail.
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return CHDB{}, fmt.Errorf("unable to create directory %v: %w", filepath.Dir(path), err)
}
// busy_timeout keeps concurrent writers from failing immediately; WAL
// allows readers during writes. Note: `db` avoids shadowing package sql.
db, err := sql.Open("sqlite", fmt.Sprintf("file://%s?_pragma=busy_timeout(500)&_pragma=journal_mode(wal)", path))
if err != nil {
return CHDB{comicvinePath, db, deleteExisting}, fmt.Errorf("failed to open database: %w", err)
}
// sql.Open is lazy; Ping forces the connection so errors surface here.
if err = db.Ping(); err != nil {
return CHDB{comicvinePath, db, deleteExisting}, fmt.Errorf("failed to open database: %w", err)
}
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS paths(
path STRING PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS bad_urls(
url STRING PRIMARY KEY
);
`)
if err != nil {
err = fmt.Errorf("failed to create table: %w", err)
}
return CHDB{comicvinePath, db, deleteExisting}, err
}
// PathHashed reports whether path (relative to the comicvine root) has
// already been recorded as hashed. If deleteExisting is set, a known path
// is also removed from disk since its contents are no longer needed.
func (s CHDB) PathHashed(path string) bool {
relPath, _ := filepath.Rel(s.comicvinePath, path)
var stored string
_ = s.sql.QueryRow("SELECT path FROM paths where path=?", relPath).Scan(&stored)
hashed := stored == relPath
if hashed && s.deleteExisting {
os.Remove(filepath.Join(s.comicvinePath, relPath))
}
return hashed
}
// PathDownloaded reports whether path is either recorded in the database or
// present on disk under the comicvine root. A database hit short-circuits
// the filesystem check.
func (s CHDB) PathDownloaded(path string) bool {
relPath, _ := filepath.Rel(s.comicvinePath, path)
dbPath := ""
_ = s.sql.QueryRow("SELECT path FROM paths where path=?", relPath).Scan(&dbPath)
if dbPath != relPath {
// Not recorded; check the filesystem. Stat avoids opening (and having
// to close) a file descriptor just to test for existence, with the
// same result: true unless the error is "does not exist".
_, err := os.Stat(filepath.Join(s.comicvinePath, relPath))
return !os.IsNotExist(err)
}
return true
}
// AddPath records path (stored relative to the comicvine root) as hashed,
// and removes the file from disk when deleteExisting is set. Insert errors
// are logged, not returned, to keep the hashing pipeline moving.
func (s CHDB) AddPath(path string) {
relPath, _ := filepath.Rel(s.comicvinePath, path)
_, err := s.sql.Exec("INSERT INTO paths VALUES(?) ON CONFLICT DO NOTHING", relPath)
if err != nil {
log.Println(fmt.Errorf("Failed to insert %v into paths: %w", relPath, err))
}
if s.deleteExisting {
// Fix: relPath is relative to comicvinePath, so it must be joined
// back to an absolute path before removal (matches PathHashed);
// previously the bare relative path was removed, which only worked
// if the process CWD happened to be the comicvine root.
os.Remove(filepath.Join(s.comicvinePath, relPath))
}
}
// CheckURL reports whether url has been recorded as a known-bad URL.
func (s CHDB) CheckURL(url string) bool {
var stored string
_ = s.sql.QueryRow("SELECT url FROM bad_urls where url=?", url).Scan(&stored)
return stored == url
}
// AddURL records url as a known-bad URL; duplicates are silently ignored.
// Insert failures are logged rather than returned.
func (s CHDB) AddURL(url string) {
if _, err := s.sql.Exec("INSERT INTO bad_urls VALUES(?) ON CONFLICT DO NOTHING", url); err != nil {
log.Println(fmt.Errorf("Failed to insert %v into bad_urls: %w", url, err))
}
}
// Close closes the underlying sqlite database handle.
func (s CHDB) Close() error {
return s.sql.Close()
}

View File

@ -136,16 +136,17 @@ type Encoder func(any) ([]byte, error)
type Decoder func([]byte, interface{}) error
type Opts struct {
cpuprofile string
coverPath string
sqlitePath string
loadEmbeddedHashes bool
saveEmbeddedHashes bool
format Format
hashesPath string
storageType Storage
onlyHashNewIDs bool
truncateHashedImages bool
cpuprofile string
coverPath string
sqlitePath string
loadEmbeddedHashes bool
saveEmbeddedHashes bool
format Format
hashesPath string
storageType Storage
onlyHashNewIDs bool
deleteHashedImages bool
path string
cv struct {
downloadCovers bool
@ -158,24 +159,32 @@ type Opts struct {
func main() {
opts := Opts{format: Msgpack, storageType: BasicMap} // flag is weird
wd, err := os.Getwd()
fmt.Println(err)
if err != nil {
wd = "comic-hasher"
} else {
wd = filepath.Join(wd, "comic-hasher")
}
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
flag.StringVar(&opts.cpuprofile, "cpuprofile", "", "Write cpu profile to file")
flag.StringVar(&opts.path, "path", wd, "Path for comic-hasher to store files")
flag.StringVar(&opts.coverPath, "cover-path", "", "Path to local covers to add to hash database. Must be in the form '{cover-path}/{domain}/{id}/*' eg for --cover-path /covers it should look like /covers/comicvine.gamespot.com/10000/image.gif")
flag.StringVar(&opts.sqlitePath, "sqlite-path", "tmp.sqlite", "Path to sqlite database to use for matching hashes, substantialy reduces memory usage")
flag.StringVar(&opts.sqlitePath, "sqlite-path", "", fmt.Sprintf("Path to sqlite database to use for matching hashes, substantialy reduces memory usage (default %v)", filepath.Join(wd, "tmp.sqlite")))
flag.BoolVar(&opts.loadEmbeddedHashes, "use-embedded-hashes", true, "Use hashes embedded in the application as a starting point")
flag.BoolVar(&opts.saveEmbeddedHashes, "save-embedded-hashes", false, "Save hashes even if we loaded the embedded hashes")
flag.StringVar(&opts.hashesPath, "hashes", "hashes.gz", "Path to optionally gziped hashes in msgpack or json format. You must disable embedded hashes to use this option")
flag.StringVar(&opts.hashesPath, "hashes", "", fmt.Sprintf("Path to optionally gziped hashes in msgpack or json format. You must disable embedded hashes to use this option (default %v)", filepath.Join(wd, "hashes.gz")))
flag.Var(&opts.format, "save-format", "Specify the format to export hashes to (json, msgpack)")
flag.Var(&opts.storageType, "storage-type", "Specify the storage type used internally to search hashes (sqlite,sqlite3,map,basicmap,vptree)")
flag.BoolVar(&opts.onlyHashNewIDs, "only-hash-new-ids", true, "Only hashes new covers from CV/local path (Note: If there are multiple covers for the same ID they may get queued at the same time and hashed on the first run)")
flag.BoolVar(&opts.truncateHashedImages, "trucate-hashed-images", true, "Truncates downloaded images after hashing them, useful to save space, implies -only-hash-new-ids")
flag.BoolVar(&opts.onlyHashNewIDs, "only-hash-new-ids", true, "Only hashes new covers from CV/local path (Note: If there are multiple covers for the same ID they may get queued at the same time and hashed on the first run, implies -cv-thumb-only if -delete-hashed-images is set)")
flag.BoolVar(&opts.deleteHashedImages, "delete-hashed-images", false, "Deletes downloaded images after hashing them, useful to save space, paths are recorded in ch.sqlite")
flag.BoolVar(&opts.cv.downloadCovers, "cv-dl-covers", false, "Downloads all covers from ComicVine and adds them to the server")
flag.StringVar(&opts.cv.APIKey, "cv-api-key", "", "API Key to use to access the ComicVine API")
flag.StringVar(&opts.cv.path, "cv-path", "", "Path to store ComicVine data in")
flag.StringVar(&opts.cv.path, "cv-path", "", fmt.Sprintf("Path to store ComicVine data in (default %v)", filepath.Join(wd, "comicvine")))
flag.BoolVar(&opts.cv.thumbOnly, "cv-thumb-only", true, "Only downloads the thumbnail image from comicvine")
flag.BoolVar(&opts.cv.hashDownloaded, "cv-hash-downloaded", true, "Hash already downloaded images")
flag.Parse()
@ -186,17 +195,28 @@ func main() {
panic(err)
}
}
opts.onlyHashNewIDs = opts.onlyHashNewIDs || opts.truncateHashedImages
// opts.onlyHashNewIDs = opts.onlyHashNewIDs || opts.deleteHashedImages
if opts.cv.downloadCovers {
if opts.cv.APIKey == "" {
log.Fatal("No ComicVine API Key provided")
}
if opts.cv.path == "" {
log.Fatal("No path provided for ComicVine data")
}
}
opts.cv.thumbOnly = opts.cv.thumbOnly || (opts.onlyHashNewIDs && opts.deleteHashedImages)
opts.path, _ = filepath.Abs(opts.path)
if opts.hashesPath == "" {
opts.hashesPath = filepath.Join(opts.path, "hashes.gz")
}
opts.hashesPath, _ = filepath.Abs(opts.hashesPath)
if opts.sqlitePath == "" {
opts.sqlitePath = filepath.Join(opts.path, "tmp.sqlite")
}
opts.sqlitePath, _ = filepath.Abs(opts.sqlitePath)
log.Println(pretty.Formatter(opts))
if opts.cv.path == "" {
opts.cv.path = filepath.Join(opts.path, "comicvine")
}
opts.cv.path, _ = filepath.Abs(opts.cv.path)
pretty.Log(opts)
startServer(opts)
}
@ -505,7 +525,7 @@ func (s *Server) hasher(workerID int, done func(int)) {
for image := range s.hashingQueue {
start := time.Now()
if image.NewOnly && len(s.hashes.GetIDs(image.ID)) > 0 {
fmt.Println("skipping", image)
log.Printf("Skipping existing hash with ID: %s found", image.ID)
continue
}
hash := ch.HashImage(image)
@ -749,7 +769,7 @@ func saveHashes(opts Opts, encodeHashes func(format Format) ([]byte, error)) {
}
}
func downloadProcessor(opts Opts, imagePaths chan cv.Download, server Server) {
func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, server Server) {
defer func() {
log.Println("Download Processor completed")
}()
@ -759,23 +779,22 @@ func downloadProcessor(opts Opts, imagePaths chan cv.Download, server Server) {
continue
}
file, err := os.OpenFile(path.Dest, os.O_RDWR|os.O_CREATE, 0666)
if chdb.PathHashed(path.Dest) {
// log.Println(path.Dest, "File has already been hashed, it may not be saved in the hashes file because we currently don't save any hashes if we've crashed")
continue
}
file, err := os.OpenFile(path.Dest, os.O_RDWR, 0666)
if err != nil {
panic(err)
}
i, format, err := image.Decode(bufio.NewReader(file))
if err != nil {
file.Close()
log.Println("Reading image failed", path.Dest)
continue // skip this image
}
if opts.truncateHashedImages {
file.Seek(0, io.SeekStart)
err = file.Truncate(0)
if err != nil {
log.Printf("Failed to truncate %#v: %v", path.Dest, err)
}
}
file.Close()
chdb.AddPath(path.Dest) // Add to sqlite db and remove file if opts.deleteHashedImages is true
im := ch.Im{
Im: i,
@ -788,7 +807,7 @@ func downloadProcessor(opts Opts, imagePaths chan cv.Download, server Server) {
log.Println("Recieved quit")
return
case server.hashingQueue <- im:
log.Println("Sending:", im)
// log.Println("Sending:", im)
}
}
}
@ -858,11 +877,15 @@ func startServer(opts Opts) {
loadHashes(opts, server.DecodeHashes)
server.HashLocalImages(opts)
chdb, err := ch.OpenCHDB(filepath.Join(opts.path, "ch.sqlite"), opts.cv.path, opts.deleteHashedImages)
if err != nil {
panic(err)
}
log.Println("Init downloaders")
dwg := sync.WaitGroup{}
finishedDownloadQueue := make(chan cv.Download)
go downloadProcessor(opts, finishedDownloadQueue, server)
go downloadProcessor(chdb, opts, finishedDownloadQueue, server)
if opts.cv.downloadCovers {
dwg.Add(1)
@ -870,7 +893,7 @@ func startServer(opts Opts) {
if opts.cv.thumbOnly {
imageTypes = append(imageTypes, "thumb_url")
}
cvdownloader := cv.NewCVDownloader(server.Context, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.hashDownloaded, finishedDownloadQueue)
cvdownloader := cv.NewCVDownloader(server.Context, chdb, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.hashDownloaded, finishedDownloadQueue)
go func() {
defer dwg.Done()
cv.DownloadCovers(cvdownloader)
@ -921,6 +944,7 @@ func startServer(opts Opts) {
close(finishedDownloadQueue)
for range finishedDownloadQueue {
}
_ = chdb.Close()
// server.EncodeHashes would normally need a read lock
// the server has been stopped so it's not needed here

View File

@ -21,6 +21,8 @@ import (
"time"
"slices"
ch "gitea.narnian.us/lordwelch/comic-hasher"
)
type Download struct {
@ -70,12 +72,11 @@ type CVDownloader struct {
fileList []fs.DirEntry
totalResults int
badURLs []string
bMut sync.Mutex
imageWG sync.WaitGroup
downloadQueue chan *CVResult
imageDownloads chan download
notFound chan download
chdb ch.CHDB
}
var (
@ -83,28 +84,6 @@ var (
ErrInvalidPage = errors.New("Invalid ComicVine Page")
)
func (c *CVDownloader) InsertBadURL(url string) {
c.bMut.Lock()
defer c.bMut.Unlock()
index, itemFound := slices.BinarySearch(c.badURLs, url)
if itemFound {
return
}
c.badURLs = slices.Insert(c.badURLs, index, url)
}
func (c *CVDownloader) InsertBadURLs(url ...string) {
c.bMut.Lock()
defer c.bMut.Unlock()
c.badURLs = append(c.badURLs, url...)
slices.Sort(c.badURLs)
}
func (c *CVDownloader) IsBadURL(url string) bool {
c.bMut.Lock()
defer c.bMut.Unlock()
_, itemFound := slices.BinarySearch(c.badURLs, url)
return itemFound
}
func (c *CVDownloader) readJson() ([]*CVResult, error) {
var issues []*CVResult
for _, file_entry := range c.fileList {
@ -272,7 +251,6 @@ func (c *CVDownloader) updateIssues() {
}
resp, err, cancelDownloadCTX := Get(c.Context, URI.String())
if err != nil {
_ = resp.Body.Close()
cancelDownloadCTX()
if retry(URI.String(), err) {
continue
@ -338,6 +316,7 @@ func (c *CVDownloader) start_downloader() {
for i := range 5 {
go func() {
log.Println("starting downloader", i)
dir_created := make(map[string]bool)
for dl := range c.imageDownloads {
if c.hasQuit() {
c.imageWG.Done()
@ -358,10 +337,11 @@ func (c *CVDownloader) start_downloader() {
}
continue
}
dir := filepath.Dir(dl.dest)
resp, err, cancelDownload := Get(c.Context, dl.url)
if err != nil {
cancelDownload()
log.Println("Failed to download", dl.url, err)
log.Println("Failed to download", dl.volumeID, "/", dl.issueID, dl.url, err)
c.imageWG.Done()
continue
}
@ -381,6 +361,10 @@ func (c *CVDownloader) start_downloader() {
cleanup()
continue
}
if !dir_created[dir] {
_ = os.MkdirAll(dir, 0o755)
dir_created[dir] = true
}
image, err := os.Create(dl.dest)
if err != nil {
log.Println("Unable to create image file", dl.dest, err)
@ -408,42 +392,10 @@ func (c *CVDownloader) start_downloader() {
}
}
func (c *CVDownloader) loadBadURLs(path string) error {
bad_urls_file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return fmt.Errorf("Unable to read bad_urls: %w", err)
}
bad_urls_bytes, err := io.ReadAll(bad_urls_file)
bad_urls_file.Close()
if err != nil {
return fmt.Errorf("Unable to read bad_urls: %w", err)
}
c.bMut.Lock()
c.badURLs = strings.Split(string(bad_urls_bytes), "\n")
c.bMut.Unlock()
return nil
}
func (c *CVDownloader) handleNotFound() {
err := c.loadBadURLs("bad_urls")
if err != nil {
panic(err)
}
file, err := os.OpenFile("bad_urls", os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
panic(err)
}
defer file.Close()
_, err = file.Seek(0, io.SeekEnd)
if err != nil {
panic(err)
}
for failedDownload := range c.notFound {
c.InsertBadURL(failedDownload.url)
c.chdb.AddURL(failedDownload.url)
log.Printf("Not found: volumeID: %d issueID: %d Offset: %d URL: %s\n", failedDownload.volumeID, failedDownload.issueID, failedDownload.offset, failedDownload.url)
file.Write([]byte(failedDownload.url))
file.Write([]byte("\n"))
file.Sync()
}
}
@ -456,7 +408,6 @@ func (c *CVDownloader) downloadImages() {
go c.handleNotFound()
added := 0
dir_created := make(map[string]bool)
for list := range c.downloadQueue {
log.Printf("Checking downloads at offset %v\r", list.Offset)
for _, issue := range list.Results {
@ -472,7 +423,7 @@ func (c *CVDownloader) downloadImages() {
if len(c.ImageTypes) > 0 && !slices.Contains(c.ImageTypes, image.name) {
continue
}
if c.IsBadURL(image.url) {
if c.chdb.CheckURL(image.url) {
log.Printf("Skipping known bad url %s", image.url)
continue
}
@ -491,13 +442,13 @@ func (c *CVDownloader) downloadImages() {
if ext == "" || (len(ext) > 4 && !slices.Contains([]string{".avif", ".webp", ".tiff", ".heif"}, ext)) {
ext = ".jpg"
}
path := filepath.Join(c.ImagePath, strconv.Itoa(issue.Volume.ID), strconv.Itoa(issue.ID), image.name+ext)
dir := filepath.Join(c.ImagePath, strconv.Itoa(issue.Volume.ID), strconv.Itoa(issue.ID))
path := filepath.Join(dir, image.name+ext)
image_file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
if errors.Is(err, os.ErrExist) {
if c.SendExistingImages {
if c.chdb.PathDownloaded(path) {
if _, err = os.Stat(path); c.SendExistingImages && err == nil {
// We don't add to the count of added as these should be processed immediately
log.Printf("Sending Existing image %v/%v %v", issue.Volume.ID, issue.ID, path)
c.imageWG.Add(1)
c.imageDownloads <- download{
url: image.url,
@ -510,13 +461,8 @@ func (c *CVDownloader) downloadImages() {
}
continue // If it exists assume it is fine, adding some basic verification might be a good idea later
}
dir := filepath.Join(c.ImagePath, strconv.Itoa(issue.Volume.ID), strconv.Itoa(issue.ID))
if !dir_created[dir] {
os.MkdirAll(dir, 0o777)
dir_created[dir] = true
}
added++
image_file.Close()
c.imageWG.Add(1)
c.imageDownloads <- download{
url: image.url,
@ -564,7 +510,7 @@ list:
if c.hasQuit() {
return ErrQuit
}
if c.IsBadURL(url) {
if c.chdb.CheckURL(url) {
indexesToRemove = append(indexesToRemove, i)
if err := os.Remove(filepath.Join(c.JSONPath, jsonFile.Name())); err != nil {
return err
@ -591,7 +537,7 @@ func (c *CVDownloader) hasQuit() bool {
}
}
func NewCVDownloader(ctx context.Context, workPath, APIKey string, imageTypes []string, sendExistingImages bool, finishedDownloadQueue chan Download) *CVDownloader {
func NewCVDownloader(ctx context.Context, chdb ch.CHDB, workPath, APIKey string, imageTypes []string, sendExistingImages bool, finishedDownloadQueue chan Download) *CVDownloader {
return &CVDownloader{
Context: ctx,
JSONPath: filepath.Join(workPath, "_json"),
@ -603,6 +549,7 @@ func NewCVDownloader(ctx context.Context, workPath, APIKey string, imageTypes []
FinishedDownloadQueue: finishedDownloadQueue,
SendExistingImages: sendExistingImages,
ImageTypes: imageTypes,
chdb: chdb,
}
}