Optimizations

Re-arrange some channels
set all channels to a size of 1
Reset the image download buffer
Allocate the correct slice size for the CV API
Add a flag to only download the original image
This commit is contained in:
Timmy Welch 2025-01-10 22:28:51 -08:00
parent 8a9aec4884
commit 75d60339ee
2 changed files with 38 additions and 44 deletions

View File

@ -162,6 +162,7 @@ type Opts struct {
APIKey string
path string
thumbOnly bool
originalOnly bool
hashDownloaded bool
keepDownloaded bool
}
@ -196,6 +197,7 @@ func main() {
flag.StringVar(&opts.cv.APIKey, "cv-api-key", "", "API Key to use to access the ComicVine API")
flag.StringVar(&opts.cv.path, "cv-path", "", fmt.Sprintf("Path to store ComicVine data in (default %v)", filepath.Join(wd, "comicvine")))
flag.BoolVar(&opts.cv.thumbOnly, "cv-thumb-only", true, "Only downloads the thumbnail image from comicvine, when false sets -only-hash-new-ids=false")
flag.BoolVar(&opts.cv.originalOnly, "cv-original-only", true, "Only downloads the original image from comicvine, when false sets -only-hash-new-ids=false")
flag.BoolVar(&opts.cv.hashDownloaded, "cv-hash-downloaded", true, "Hash already downloaded images")
flag.BoolVar(&opts.cv.keepDownloaded, "cv-keep-downloaded", true, "Keep downloaded images. When set to false does not ever write to the filesystem, a crash or exiting can mean some images need to be re-downloaded")
flag.Parse()
@ -639,20 +641,10 @@ func (s *Server) DecodeHashes(format Format, hashes []byte) error {
}
func (s *Server) HashLocalImages(opts Opts) {
if opts.coverPath == "" {
return
}
go func() {
alreadyQuit := false
if opts.coverPath == "" {
select {
case sig := <-s.signalQueue:
log.Printf("Signal: %v\n", sig)
s.cancel()
case <-s.Context.Done():
log.Println("Recieved quit")
}
err := s.httpServer.Shutdown(context.TODO())
log.Println("Err:", err)
return
}
log.Println("Hashing covers at ", opts.coverPath)
start := time.Now()
err := filepath.WalkDir(opts.coverPath, func(path string, d fs.DirEntry, err error) error {
@ -660,11 +652,6 @@ func (s *Server) HashLocalImages(opts Opts) {
return err
}
select {
case signal := <-s.signalQueue:
err = s.httpServer.Shutdown(context.TODO())
alreadyQuit = true
s.cancel()
return fmt.Errorf("signal: %v, %w", signal, err)
case <-s.Context.Done():
log.Println("Recieved quit")
err = s.httpServer.Shutdown(context.TODO())
@ -680,16 +667,21 @@ func (s *Server) HashLocalImages(opts Opts) {
})
elapsed := time.Since(start)
log.Println("Err:", err, "local hashing took", elapsed)
sig := <-s.signalQueue
if !alreadyQuit {
s.cancel()
}
err = s.httpServer.Shutdown(context.TODO())
log.Printf("Signal: %v, error: %v", sig, err)
}()
}
func signalHandler(s *Server) {
select {
case sig := <-s.signalQueue:
log.Printf("Signal: %v\n", sig)
s.cancel()
case <-s.Context.Done():
log.Println("Recieved quit: Attempting to shutdown gracefully")
}
err := s.httpServer.Shutdown(context.TODO())
log.Println("Err:", err)
}
func initializeStorage(opts Opts) (ch.HashStorage, error) {
switch opts.storageType {
case Map:
@ -787,6 +779,7 @@ func saveHashes(opts Opts, encodeHashes func(format Format) ([]byte, error)) {
func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, server Server) {
defer func() {
log.Println("Download Processor completed")
close(server.hashingQueue)
}()
for path := range imagePaths {
id := ch.ID{Domain: ch.ComicVine, ID: path.IssueID}
@ -831,12 +824,7 @@ func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, ser
ID: id,
NewOnly: opts.onlyHashNewIDs,
}
select {
case <-server.Context.Done():
log.Println("Recieved quit")
return
case server.hashingQueue <- im:
}
server.hashingQueue <- im
}
}
@ -857,7 +845,7 @@ func startServer(opts Opts) {
Context: ctx,
cancel: cancel,
signalQueue: make(chan os.Signal, 1),
readerQueue: make(chan string, 100),
readerQueue: make(chan string, 1),
hashingQueue: make(chan ch.Im, 1),
mappingQueue: make(chan ch.ImageHash, 1),
mux: mux,
@ -912,7 +900,7 @@ func startServer(opts Opts) {
log.Println("Init downloaders")
dwg := sync.WaitGroup{}
finishedDownloadQueue := make(chan cv.Download, 10)
finishedDownloadQueue := make(chan cv.Download, 1)
go downloadProcessor(chdb, opts, finishedDownloadQueue, server)
if opts.cv.downloadCovers {
@ -921,6 +909,9 @@ func startServer(opts Opts) {
if opts.cv.thumbOnly {
imageTypes = append(imageTypes, "thumb_url")
}
if opts.cv.originalOnly {
imageTypes = append(imageTypes, "original_url")
}
cvdownloader := cv.NewCVDownloader(server.Context, bufPool, chdb, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.keepDownloaded, opts.cv.hashDownloaded, finishedDownloadQueue)
go func() {
defer dwg.Done()
@ -937,6 +928,7 @@ func startServer(opts Opts) {
}()
}
go signalHandler(&server)
log.Println("Listening on ", server.httpServer.Addr)
err = server.httpServer.ListenAndServe()
if err != nil {
@ -950,9 +942,14 @@ func startServer(opts Opts) {
}
log.Println("waiting on downloaders")
dwg.Wait() // Downloaders send to server.hashingQueue
dwg.Wait() // Downloaders send to finishedDownloadQueue which sends to server.hashingQueue
close(server.hashingQueue)
log.Println("waiting on downloader")
close(finishedDownloadQueue)
for range finishedDownloadQueue {
}
// close(server.hashingQueue) // Closed by downloadProcessor
log.Println("waiting on hashers")
hwg.Wait()
for range server.hashingQueue {
@ -968,10 +965,6 @@ func startServer(opts Opts) {
for range server.signalQueue {
}
log.Println("waiting on downloader")
close(finishedDownloadQueue)
for range finishedDownloadQueue {
}
_ = chdb.Close()
// server.EncodeHashes would normally need a read lock

View File

@ -108,7 +108,7 @@ func (c *CVDownloader) readJson() ([]*CVResult, error) {
return issues, nil
}
func (c *CVDownloader) loadIssues(file_entry fs.DirEntry) (*CVResult, error) {
tmp := &CVResult{}
tmp := &CVResult{Results: make([]Issue, 0, 100)}
file, err := os.Open(filepath.Join(c.JSONPath, file_entry.Name()))
if err != nil {
return nil, err
@ -377,6 +377,7 @@ func (c *CVDownloader) start_downloader() {
} else {
image := c.bufPool.Get().(*bytes.Buffer)
image.Reset()
log.Println("downloading", dl.dest)
_, err = io.Copy(image, resp.Body)
if err != nil {
@ -574,10 +575,10 @@ func NewCVDownloader(ctx context.Context, bufPool *sync.Pool, chdb ch.CHDB, work
JSONPath: filepath.Join(workPath, "_json"),
ImagePath: filepath.Join(workPath, "_image"),
APIKey: APIKey,
downloadQueue: make(chan *CVResult, 1000), // This is just json it shouldn't take up much more than 122 MB
imageDownloads: make(chan download, 250), // These are just URLs should only take a few MB
notFound: make(chan download, 100), // Same here
bufPool: bufPool, // Only used if keepDownloadedImages is false to save space on byte buffers. The buffers get sent back via finishedDownloadQueue
downloadQueue: make(chan *CVResult, 1), // This is just json it shouldn't take up much more than 122 MB
imageDownloads: make(chan download, 1), // These are just URLs should only take a few MB
notFound: make(chan download, 1), // Same here
bufPool: bufPool, // Only used if keepDownloadedImages is false to save space on byte buffers. The buffers get sent back via finishedDownloadQueue
FinishedDownloadQueue: finishedDownloadQueue,
SendExistingImages: sendExistingImages,
KeepDownloadedImages: keepDownloadedImages,