mangadex_at_home/src/main/kotlin/mdnet/cache/ImageStorage.kt

365 lines
12 KiB
Kotlin

/*
Mangadex@Home
Copyright (c) 2020, MangaDex Network
This file is part of MangaDex@Home.
MangaDex@Home is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MangaDex@Home is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
*/
package mdnet.cache
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.PropertyNamingStrategies
import com.fasterxml.jackson.databind.annotation.JsonNaming
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import mdnet.logging.info
import mdnet.logging.trace
import org.apache.commons.io.file.PathUtils
import org.ktorm.database.Database
import org.ktorm.dsl.*
import org.slf4j.LoggerFactory
import java.io.*
import java.nio.file.*
import java.sql.SQLIntegrityConstraintViolationException
import java.time.Instant
import java.util.UUID
import java.util.concurrent.*
/**
 * Metadata kept alongside a cached image. [ImageStorage] serializes it to JSON
 * (snake_case property names) and writes it as a header in front of the image bytes.
 *
 * @property contentType NOTE(review): presumably the HTTP content type of the image - confirm against callers
 * @property lastModified NOTE(review): presumably the upstream last-modified value, kept as text - confirm
 * @property size NOTE(review): presumably the image length in bytes - confirm against callers
 */
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy::class)
data class ImageMetadata(
val contentType: String,
val lastModified: String,
val size: Int,
)
/**
 * An image loaded from the cache: its [data] (metadata) and the open [stream].
 * When produced by `ImageStorage.loadImage` the metadata header has already been
 * read from [stream], so the remaining bytes are the image itself.
 * Nothing in `ImageStorage` closes [stream] after handing it out, so the consumer
 * presumably has to close it - confirm against callers.
 */
data class Image(val data: ImageMetadata, val stream: InputStream)
/**
* A storage for images that handles LRU removal as well as database metadata storage. This cache
* will ensure that the storage size (excluding the database) will be below [maxSize] over time,
* but there may be temporary peaks or overages. It will cache the files in [cacheDirectory], and
* store associated metadata in the [database].
*
* @constructor Creates an `ImageStorage`, creating necessary tables in the database.
*/
class ImageStorage(
var maxSize: Long,
val cacheDirectory: Path,
private val database: Database,
autoPrune: Boolean = true
) {
// Scratch directory below [cacheDirectory]; it is emptied in `init` and is where
// files from getTempPath() live (in-progress writes and files about to be deleted).
private val tempCacheDirectory = cacheDirectory.resolve("tmp")
// Scheduler (core pool size 2) that runs the periodic tasks registered in `init`.
private val evictor: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
// Ids of loaded images whose `accessed` time still has to be written to the database.
// Capacity is 1000 and loadImage adds with offer(), so ids are dropped rather than
// blocking the caller when the queue is full.
private val queue = LinkedBlockingQueue<String>(1000)
/**
 * Returns the size in bytes of the images stored in this cache, not including metadata.
 * This is cached for performance on a call to [calculateSize].
 *
 * The value is whatever the `SIZE_TAKEN_SQL` query reported the last time
 * [calculateSize] ran; it is not updated by individual stores or deletes.
 */
@Volatile
var size: Long = 0
private set
init {
    // make sure the scratch directory exists and starts out empty
    Files.createDirectories(tempCacheDirectory)
    PathUtils.cleanDirectory(tempCacheDirectory)

    // create tables in database
    LOGGER.info { "Creating tables if not already present" }
    database.useConnection { conn ->
        conn.prepareStatement(INIT_TABLE).use {
            it.execute()
        }
    }

    calculateSize()
    LOGGER.info { "Cache at $size out of $maxSize bytes" }

    // every minute: write the access time of recently loaded images to the
    // database and refresh the cached size
    evictor.scheduleWithFixedDelay(
        {
            // A periodic task that throws is never scheduled again, so a
            // transient failure (e.g. a database error) must not escape.
            try {
                val toUpdate = HashSet<String>()
                queue.drainTo(toUpdate)
                val now = Instant.now()
                LOGGER.info { "Updating LRU times for ${toUpdate.size} entries" }
                // nothing to write when no image was loaded since the last run
                if (toUpdate.isNotEmpty()) {
                    database.batchUpdate(DbImage) {
                        for (id in toUpdate) {
                            item {
                                set(DbImage.accessed, now)
                                where {
                                    DbImage.id eq id
                                }
                            }
                        }
                    }
                }
                calculateSize()
            } catch (e: Exception) {
                LOGGER.warn("Updating LRU times failed, will try again on the next run", e)
            }
        },
        1, 1, TimeUnit.MINUTES
    )

    // evict LRU cache every 3 minutes
    if (autoPrune) {
        evictor.scheduleWithFixedDelay(
            {
                // same reasoning as above: keep the task alive after a failure
                try {
                    calculateSize()
                    pruneImages()
                } catch (e: Exception) {
                    LOGGER.warn("Pruning the cache failed, will try again on the next run", e)
                }
            },
            0, 3, TimeUnit.MINUTES
        )
    }
}
/**
 * Evicts images when the cache is more than 95% full, freeing at least enough
 * bytes to bring [size] back to 90% of [maxSize]. The decision is based on the
 * cached [size], so call [calculateSize] first if it may be stale.
 */
fun pruneImages() {
    LOGGER.info { "Cache at $size out of $maxSize bytes" }

    // below the high-water mark there is nothing to do
    if (size <= maxSize * 0.95) {
        return
    }

    val toClear = size - (maxSize * 0.9).toLong()
    LOGGER.info { "Evicting at least $toClear bytes from cache" }

    // ask the database which image ids to evict to free `toClear` bytes
    val victims = database.useConnection { conn ->
        val statement = conn.prepareStatement(IMAGES_TO_PRUNE)
        statement.setLong(1, toClear)
        statement.use { prepared ->
            val rows = prepared.executeQuery()
            val ids = ArrayList<String>()
            while (rows.next()) {
                ids.add(rows.getString(1))
            }
            ids
        }
    }

    victims.forEach { id ->
        LOGGER.info { "Evicting images $id from cache" }
        deleteImage(id)
    }
}
/**
 * Loads the image with the specified [id]. This method will return null
 * if the image is not committed, the id does not exist, or an [IOException]
 * occurs when loading the image.
 *
 * On success the returned [Image] carries the open stream, positioned just
 * after the metadata header. On failure any stream opened here is closed
 * before returning.
 *
 * @param id the id of the image to load
 * @return the [Image] associated with the id or null.
 */
fun loadImage(id: String): Image? {
    // We assume total control over the directory, so failing to open the
    // file means the image does not exist or has been deleted.
    val stream = try {
        Files.newInputStream(getPath(id))
    } catch (e: IOException) {
        return null
    }

    // remember the access so the periodic task can refresh the LRU time;
    // offer() never blocks, the id is simply dropped if the queue is full
    queue.offer(id)

    return try {
        val data = JACKSON.readValue<ImageMetadata>(
            DataInputStream(stream).readUTF()
        )
        Image(data, stream)
    } catch (e: IOException) {
        // The header could not be read or parsed. The stream is not handed
        // to the caller in this case, so it has to be closed here.
        LOGGER.trace(e) { "Reading metadata of image $id failed, ignoring" }
        try {
            stream.close()
        } catch (closeError: IOException) {
            // nothing useful left to do with a stream that fails to close
        }
        null
    }
}
/**
 * Starts storing an image under [id] and hands back a [Writer] through which
 * the image bytes can be streamed in. The [metadata] is passed to the writer,
 * which stores it together with the image.
 *
 * @param id the id of the image to store, at least 3 characters long
 * @param metadata the metadata associated with the image
 * @return the [Writer] for this id, or null when the cache is at or above 95% of [maxSize]
 * @throws IllegalArgumentException if [id] is shorter than 3 characters
 */
fun storeImage(id: String, metadata: ImageMetadata): Writer? {
    require(id.length >= 3) { "id length needs to be at least 3" }

    // don't make high cache utilization worse
    val nearlyFull = size >= maxSize * 0.95
    return if (nearlyFull) null else WriterImpl(id, metadata)
}
/**
 * Removes the image [id] from disk and from the database, inside a database
 * transaction. The file is first moved atomically into the temp directory and
 * deleted from there. The database row is deleted whether or not the file
 * operations succeeded.
 */
private fun deleteImage(id: String) {
    database.useTransaction {
        val trash = getTempPath()
        try {
            Files.move(getPath(id), trash, StandardCopyOption.ATOMIC_MOVE)
            Files.deleteIfExists(trash)
        } catch (e: IOException) {
            // a failure means the image did not exist
            LOGGER.trace(e) { "Deleting image failed, ignoring" }
        } finally {
            database.delete(DbImage) { DbImage.id eq id }
        }
    }
}
/**
 * Refreshes the cached [size] by running the `SIZE_TAKEN_SQL` query against
 * the database and storing the first column of its first row.
 */
fun calculateSize() {
    val total = database.useConnection { conn ->
        conn.prepareStatement(SIZE_TAKEN_SQL).use { stmt ->
            val rows = stmt.executeQuery()
            rows.next()
            rows.getLong(1)
        }
    }
    size = total
}
/**
 * Shuts down the background scheduler and waits up to 10 seconds for tasks
 * that are already running to finish.
 */
fun close() {
    with(evictor) {
        shutdown()
        awaitTermination(10, TimeUnit.SECONDS)
    }
}
/**
 * A writer for storing a single image that allows the data to be streamed in
 * incrementally. A writer is finished by calling either [commit] or [abort].
 */
interface Writer {
/**
 * The output stream associated with this writer; image bytes are written here.
 */
val stream: OutputStream
/**
 * Commit the bytes written to the output stream if the number of bytes
 * written to the output stream, excluding the metadata, is exactly [bytes];
 * otherwise abort.
 *
 * @param bytes the number of image bytes the caller expects to have written
 * @return true if the data was committed, false otherwise
 */
fun commit(bytes: Int): Boolean
/**
 * Revert the bytes written to the output stream and undo changes,
 * allowing another writer to try again.
 */
fun abort()
}
/**
 * [Writer] that streams into a uniquely named file in the temp directory.
 * The file starts with the JSON-serialized metadata, followed by the image
 * bytes written by the caller. On [commit] the image is recorded in the
 * database and the file is moved atomically to its final cache path.
 */
private inner class WriterImpl(private val id: String, metadata: ImageMetadata) : Writer {
    val tempPath = getTempPath()
    override val stream: OutputStream

    // number of bytes occupied by the metadata header at the start of the file
    val metadataSize: Int

    init {
        stream = Files.newOutputStream(tempPath, StandardOpenOption.CREATE_NEW)
        metadataSize = try {
            // Don't close the `dataOutputStream` because
            // we need to write to the underlying stream
            val dataOutputStream = DataOutputStream(stream)
            dataOutputStream.writeUTF(
                JACKSON.writeValueAsString(metadata)
            )
            dataOutputStream.size()
        } catch (e: Exception) {
            // The writer never reaches the caller when construction fails,
            // so the open stream and the temp file must be cleaned up here.
            try {
                abort()
            } catch (cleanupError: IOException) {
                e.addSuppressed(cleanupError)
            }
            throw e
        }
    }

    /**
     * Closes the stream and publishes the image if exactly [bytes] image
     * bytes (not counting the metadata header) were written.
     *
     * @return true if the file was moved into the cache, false if the size
     * did not match or the final path was already taken
     */
    override fun commit(bytes: Int): Boolean {
        stream.flush()
        stream.close()

        val expectedSize = metadataSize + bytes
        // compare as Long so that a very large file cannot wrap around to a matching Int
        if (Files.size(tempPath) != expectedSize.toLong()) {
            abort()
            return false
        }

        Files.createDirectories(getPath(id).parent)

        try {
            database.insert(DbImage) {
                set(DbImage.id, id)
                set(DbImage.accessed, Instant.now())
                set(DbImage.size, expectedSize)
            }
        } catch (e: SQLIntegrityConstraintViolationException) {
            // someone got to us before this (TOCTOU)
            // there are 2 situations here
            // one is that the
            // other write died in between writing the DB and
            // moving the file
            // the other is that we have raced and the other
            // is about to write the file
            // we handle this below
        }

        try {
            Files.move(
                tempPath,
                getPath(id),
                StandardCopyOption.ATOMIC_MOVE
            )
        } catch (e: FileAlreadyExistsException) {
            // the file already exists
            // so we must have lost the race
            // delete our local copy
            abort()
            return false
        }

        return true
    }

    /**
     * Closes the stream and deletes the temp file. The file is deleted even
     * if closing the stream fails. Safe to call after [commit] has already
     * closed the stream.
     */
    override fun abort() {
        try {
            stream.flush()
            stream.close()
        } finally {
            Files.deleteIfExists(tempPath)
        }
    }
}
/** Resolves the location of the image [id] inside [cacheDirectory]. */
private fun getPath(id: String): Path = cacheDirectory.resolve(id.toCachePath())
/** Returns a fresh, randomly named (UUID) path inside the temp directory. */
private fun getTempPath(): Path {
    val uniqueName = UUID.randomUUID().toString()
    return tempCacheDirectory.resolve(uniqueName)
}
companion object {
    private val LOGGER = LoggerFactory.getLogger(ImageStorage::class.java)

    // Compiled once instead of on every call: toCachePath runs for every
    // image load, store and delete.
    // Matches every character that is not the last one of the input.
    private val NOT_LAST_CHARACTER = ".(?!$)".toRegex()
    private val SINGLE_SPACE = " ".toRegex()

    /**
     * Maps an image id to its relative path in the cache directory: the first
     * three characters of the id, in reverse order, each form one directory
     * level, followed by the full id as the file name. With `/` as the file
     * separator, `abcdef` becomes `c/b/a/abcdef`.
     *
     * The id must be at least 3 characters long.
     */
    private fun String.toCachePath() =
        this.substring(0, 3).replace(NOT_LAST_CHARACTER, "$0 ").split(SINGLE_SPACE).reversed()
            .plus(this).joinToString(File.separator)

    private val JACKSON: ObjectMapper = jacksonObjectMapper()
}
}