Compare commits

..

No commits in common. "8ce1ca3354fb90ad136de4e3bdc55405673eee2a" and "8a9aec4884d32ae5b527b7cacc6e8bcc2ec53986" have entirely different histories.

2 changed files with 47 additions and 41 deletions
cmd/comic-hasher
cv

@ -162,7 +162,6 @@ type Opts struct {
APIKey string
path string
thumbOnly bool
originalOnly bool
hashDownloaded bool
keepDownloaded bool
}
@ -197,7 +196,6 @@ func main() {
flag.StringVar(&opts.cv.APIKey, "cv-api-key", "", "API Key to use to access the ComicVine API")
flag.StringVar(&opts.cv.path, "cv-path", "", fmt.Sprintf("Path to store ComicVine data in (default %v)", filepath.Join(wd, "comicvine")))
flag.BoolVar(&opts.cv.thumbOnly, "cv-thumb-only", true, "Only downloads the thumbnail image from comicvine, when false sets -only-hash-new-ids=false")
flag.BoolVar(&opts.cv.originalOnly, "cv-original-only", true, "Only downloads the original image from comicvine, when false sets -only-hash-new-ids=false")
flag.BoolVar(&opts.cv.hashDownloaded, "cv-hash-downloaded", true, "Hash already downloaded images")
flag.BoolVar(&opts.cv.keepDownloaded, "cv-keep-downloaded", true, "Keep downloaded images. When set to false does not ever write to the filesystem, a crash or exiting can mean some images need to be re-downloaded")
flag.Parse()
@ -641,10 +639,20 @@ func (s *Server) DecodeHashes(format Format, hashes []byte) error {
}
func (s *Server) HashLocalImages(opts Opts) {
if opts.coverPath == "" {
return
}
go func() {
alreadyQuit := false
if opts.coverPath == "" {
select {
case sig := <-s.signalQueue:
log.Printf("Signal: %v\n", sig)
s.cancel()
case <-s.Context.Done():
log.Println("Recieved quit")
}
err := s.httpServer.Shutdown(context.TODO())
log.Println("Err:", err)
return
}
log.Println("Hashing covers at ", opts.coverPath)
start := time.Now()
err := filepath.WalkDir(opts.coverPath, func(path string, d fs.DirEntry, err error) error {
@ -652,6 +660,11 @@ func (s *Server) HashLocalImages(opts Opts) {
return err
}
select {
case signal := <-s.signalQueue:
err = s.httpServer.Shutdown(context.TODO())
alreadyQuit = true
s.cancel()
return fmt.Errorf("signal: %v, %w", signal, err)
case <-s.Context.Done():
log.Println("Recieved quit")
err = s.httpServer.Shutdown(context.TODO())
@ -667,19 +680,14 @@ func (s *Server) HashLocalImages(opts Opts) {
})
elapsed := time.Since(start)
log.Println("Err:", err, "local hashing took", elapsed)
}()
}
func signalHandler(s *Server) {
select {
case sig := <-s.signalQueue:
log.Printf("Signal: %v\n", sig)
s.cancel()
case <-s.Context.Done():
log.Println("Recieved quit: Attempting to shutdown gracefully")
}
err := s.httpServer.Shutdown(context.TODO())
log.Println("Err:", err)
sig := <-s.signalQueue
if !alreadyQuit {
s.cancel()
}
err = s.httpServer.Shutdown(context.TODO())
log.Printf("Signal: %v, error: %v", sig, err)
}()
}
func initializeStorage(opts Opts) (ch.HashStorage, error) {
@ -779,7 +787,6 @@ func saveHashes(opts Opts, encodeHashes func(format Format) ([]byte, error)) {
func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, server Server) {
defer func() {
log.Println("Download Processor completed")
close(server.hashingQueue)
}()
for path := range imagePaths {
id := ch.ID{Domain: ch.ComicVine, ID: path.IssueID}
@ -824,7 +831,12 @@ func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, ser
ID: id,
NewOnly: opts.onlyHashNewIDs,
}
server.hashingQueue <- im
select {
case <-server.Context.Done():
log.Println("Recieved quit")
return
case server.hashingQueue <- im:
}
}
}
@ -845,7 +857,7 @@ func startServer(opts Opts) {
Context: ctx,
cancel: cancel,
signalQueue: make(chan os.Signal, 1),
readerQueue: make(chan string, 1),
readerQueue: make(chan string, 100),
hashingQueue: make(chan ch.Im, 1),
mappingQueue: make(chan ch.ImageHash, 1),
mux: mux,
@ -900,7 +912,7 @@ func startServer(opts Opts) {
log.Println("Init downloaders")
dwg := sync.WaitGroup{}
finishedDownloadQueue := make(chan cv.Download, 1)
finishedDownloadQueue := make(chan cv.Download, 10)
go downloadProcessor(chdb, opts, finishedDownloadQueue, server)
if opts.cv.downloadCovers {
@ -909,9 +921,6 @@ func startServer(opts Opts) {
if opts.cv.thumbOnly {
imageTypes = append(imageTypes, "thumb_url")
}
if opts.cv.originalOnly {
imageTypes = append(imageTypes, "original_url")
}
cvdownloader := cv.NewCVDownloader(server.Context, bufPool, chdb, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.keepDownloaded, opts.cv.hashDownloaded, finishedDownloadQueue)
go func() {
defer dwg.Done()
@ -928,7 +937,6 @@ func startServer(opts Opts) {
}()
}
go signalHandler(&server)
log.Println("Listening on ", server.httpServer.Addr)
err = server.httpServer.ListenAndServe()
if err != nil {
@ -942,14 +950,9 @@ func startServer(opts Opts) {
}
log.Println("waiting on downloaders")
dwg.Wait() // Downloaders send to finishedDownloadQueue which sends to server.hashingQueue
dwg.Wait() // Downloaders send to server.hashingQueue
log.Println("waiting on downloader")
close(finishedDownloadQueue)
for range finishedDownloadQueue {
}
// close(server.hashingQueue) // Closed by downloadProcessor
close(server.hashingQueue)
log.Println("waiting on hashers")
hwg.Wait()
for range server.hashingQueue {
@ -965,6 +968,10 @@ func startServer(opts Opts) {
for range server.signalQueue {
}
log.Println("waiting on downloader")
close(finishedDownloadQueue)
for range finishedDownloadQueue {
}
_ = chdb.Close()
// server.EncodeHashes would normally need a read lock

@ -108,7 +108,7 @@ func (c *CVDownloader) readJson() ([]*CVResult, error) {
return issues, nil
}
func (c *CVDownloader) loadIssues(file_entry fs.DirEntry) (*CVResult, error) {
tmp := &CVResult{Results: make([]Issue, 0, 100)}
tmp := &CVResult{}
file, err := os.Open(filepath.Join(c.JSONPath, file_entry.Name()))
if err != nil {
return nil, err
@ -220,8 +220,8 @@ func (c *CVDownloader) updateIssues() {
continue
} else {
log.Println("Failed to read page at offset ", offset, err)
os.Remove(filepath.Join(c.JSONPath, c.fileList[index].Name()))
c.fileList = slices.Delete(c.fileList, index, (index)+1)
os.Remove(filepath.Join(c.JSONPath, c.fileList[offset/100].Name()))
c.fileList = slices.Delete(c.fileList, offset/100, (offset/100)+1)
}
}
@ -377,7 +377,6 @@ func (c *CVDownloader) start_downloader() {
} else {
image := c.bufPool.Get().(*bytes.Buffer)
image.Reset()
log.Println("downloading", dl.dest)
_, err = io.Copy(image, resp.Body)
if err != nil {
@ -575,10 +574,10 @@ func NewCVDownloader(ctx context.Context, bufPool *sync.Pool, chdb ch.CHDB, work
JSONPath: filepath.Join(workPath, "_json"),
ImagePath: filepath.Join(workPath, "_image"),
APIKey: APIKey,
downloadQueue: make(chan *CVResult, 1), // This is just json it shouldn't take up much more than 122 MB
imageDownloads: make(chan download, 1), // These are just URLs should only take a few MB
notFound: make(chan download, 1), // Same here
bufPool: bufPool, // Only used if keepDownloadedImages is false to save space on byte buffers. The buffers get sent back via finishedDownloadQueue
downloadQueue: make(chan *CVResult, 1000), // This is just json it shouldn't take up much more than 122 MB
imageDownloads: make(chan download, 250), // These are just URLs should only take a few MB
notFound: make(chan download, 100), // Same here
bufPool: bufPool, // Only used if keepDownloadedImages is false to save space on byte buffers. The buffers get sent back via finishedDownloadQueue
FinishedDownloadQueue: finishedDownloadQueue,
SendExistingImages: sendExistingImages,
KeepDownloadedImages: keepDownloadedImages,
@ -591,7 +590,6 @@ func DownloadCovers(c *CVDownloader) {
var (
err error
)
log.Println("Reading json")
os.MkdirAll(c.JSONPath, 0o777)
f, _ := os.Create(filepath.Join(c.ImagePath, ".keep"))
f.Close()
@ -611,6 +609,7 @@ func DownloadCovers(c *CVDownloader) {
c.totalResults, _ = strconv.Atoi(last_file[3 : len(last_file)-1-4])
}
c.totalResults += 100
log.Println("Reading json")
log.Println("Number of pages", len(c.fileList), "Expected Pages:", c.totalResults/100)
log.Println("Updating issues now")