Implement a buffer pool

Downloads in quick succession could cause an OOM when the GC couldn't
keep up.
This commit is contained in:
Timmy Welch 2025-01-09 02:07:36 -08:00
parent 260a13688a
commit 033c68593b
2 changed files with 30 additions and 10 deletions

View File

@ -74,6 +74,15 @@ var formatValues = map[string]Format{
"msgpack": Msgpack, "msgpack": Msgpack,
} }
var bufPool = &sync.Pool{
New: func() any {
// The Pool's New function should generally only return pointer
// types, since a pointer can be put into the return interface
// value without an allocation:
return new(bytes.Buffer)
},
}
func (f Format) String() string { func (f Format) String() string {
if name, known := formatNames[f]; known { if name, known := formatNames[f]; known {
return name return name
@ -799,15 +808,21 @@ func downloadProcessor(chdb ch.CHDB, opts Opts, imagePaths chan cv.Download, ser
panic(err) panic(err)
} }
} else { } else {
file = io.NopCloser(bytes.NewBuffer(path.Image)) file = io.NopCloser(path.Image)
} }
i, format, err := image.Decode(bufio.NewReader(file)) i, format, err := image.Decode(bufio.NewReader(file))
if err != nil { if err != nil {
file.Close() file.Close()
log.Println("Reading image failed", path.Dest) log.Println("Reading image failed", path.Dest, err)
if path.Image != nil {
bufPool.Put(path.Image)
}
continue // skip this image continue // skip this image
} }
file.Close() file.Close()
if path.Image != nil {
bufPool.Put(path.Image)
}
chdb.AddPath(path.Dest) // Add to sqlite db and remove file if opts.deleteHashedImages is true chdb.AddPath(path.Dest) // Add to sqlite db and remove file if opts.deleteHashedImages is true
im := ch.Im{ im := ch.Im{
@ -906,7 +921,7 @@ func startServer(opts Opts) {
if opts.cv.thumbOnly { if opts.cv.thumbOnly {
imageTypes = append(imageTypes, "thumb_url") imageTypes = append(imageTypes, "thumb_url")
} }
cvdownloader := cv.NewCVDownloader(server.Context, chdb, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.keepDownloaded, opts.cv.hashDownloaded, finishedDownloadQueue) cvdownloader := cv.NewCVDownloader(server.Context, bufPool, chdb, opts.cv.path, opts.cv.APIKey, imageTypes, opts.cv.keepDownloaded, opts.cv.hashDownloaded, finishedDownloadQueue)
go func() { go func() {
defer dwg.Done() defer dwg.Done()
cv.DownloadCovers(cvdownloader) cv.DownloadCovers(cvdownloader)

View File

@ -30,7 +30,7 @@ type Download struct {
URL string URL string
Dest string Dest string
IssueID string IssueID string
Image []byte Image *bytes.Buffer
} }
type Issue struct { type Issue struct {
@ -80,6 +80,7 @@ type CVDownloader struct {
imageDownloads chan download imageDownloads chan download
notFound chan download notFound chan download
chdb ch.CHDB chdb ch.CHDB
bufPool *sync.Pool
} }
var ( var (
@ -375,13 +376,16 @@ func (c *CVDownloader) start_downloader() {
} }
} else { } else {
image := &bytes.Buffer{} image := c.bufPool.Get().(*bytes.Buffer)
log.Println("downloading", dl.dest) log.Println("downloading", dl.dest)
_, err = io.Copy(image, resp.Body) _, err = io.Copy(image, resp.Body)
if err != nil { if err != nil {
log.Println("Failed when downloading image", err) log.Println("Failed when downloading image", err)
cleanup() cleanup()
os.Remove(dl.dest) os.Remove(dl.dest)
if image != nil {
c.bufPool.Put(image)
}
continue continue
} }
@ -389,7 +393,7 @@ func (c *CVDownloader) start_downloader() {
URL: dl.url, URL: dl.url,
Dest: dl.dest, Dest: dl.dest,
IssueID: strconv.Itoa(dl.issueID), IssueID: strconv.Itoa(dl.issueID),
Image: image.Bytes(), Image: image,
} }
} }
cleanup() cleanup()
@ -564,15 +568,16 @@ func (c *CVDownloader) cleanDirs() {
}) })
} }
func NewCVDownloader(ctx context.Context, chdb ch.CHDB, workPath, APIKey string, imageTypes []string, keepDownloadedImages, sendExistingImages bool, finishedDownloadQueue chan Download) *CVDownloader { func NewCVDownloader(ctx context.Context, bufPool *sync.Pool, chdb ch.CHDB, workPath, APIKey string, imageTypes []string, keepDownloadedImages, sendExistingImages bool, finishedDownloadQueue chan Download) *CVDownloader {
return &CVDownloader{ return &CVDownloader{
Context: ctx, Context: ctx,
JSONPath: filepath.Join(workPath, "_json"), JSONPath: filepath.Join(workPath, "_json"),
ImagePath: filepath.Join(workPath, "_image"), ImagePath: filepath.Join(workPath, "_image"),
APIKey: APIKey, APIKey: APIKey,
downloadQueue: make(chan *CVResult, 1000), downloadQueue: make(chan *CVResult, 1000), // This is just json it shouldn't take up much more than 122 MB
imageDownloads: make(chan download, 250), imageDownloads: make(chan download, 250), // These are just URLs should only take a few MB
notFound: make(chan download, 100), notFound: make(chan download, 100), // Same here
bufPool: bufPool, // Only used if keepDownloadedImages is false to save space on byte buffers. The buffers get sent back via finishedDownloadQueue
FinishedDownloadQueue: finishedDownloadQueue, FinishedDownloadQueue: finishedDownloadQueue,
SendExistingImages: sendExistingImages, SendExistingImages: sendExistingImages,
KeepDownloadedImages: keepDownloadedImages, KeepDownloadedImages: keepDownloadedImages,