Use a sqlite DB for metadata

This commit is contained in:
carbotaniuman 2020-06-15 22:25:31 +00:00
parent afe83a240b
commit 7eec78e2d2
8 changed files with 160 additions and 82 deletions

View File

@ -17,6 +17,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- [2020-06-15] Fixed tokenized data-saver parser not working by [@lflare].
- [2020-06-14] Switched cache metadata over to a MySql instance [@carbotaniuman].
### Changed
- [2020-06-14] Migrated cache metadata over to a sqlite3 handler [@carbotaniuman].
### Deprecated
### Removed
### Fixed
### Security
## [1.0.0-RC16] - 2020-06-14
### Added

View File

@ -4,7 +4,7 @@ plugins {
id "application"
id "com.github.johnrengelman.shadow" version "5.2.0"
id "com.diffplug.gradle.spotless" version "3.18.0"
id 'dev.afanasev.sekret' version '0.0.3'
id "dev.afanasev.sekret" version "0.0.3"
}
group = "com.mangadex"
@ -17,20 +17,25 @@ repositories {
}
dependencies {
compileOnly group:"dev.afanasev", name: "sekret-annotation", version: "0.0.3"
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
implementation group: "commons-io", name: "commons-io", version: "2.7"
implementation group: "com.konghq", name: "unirest-java", version: "3.7.02"
implementation group: "org.http4k", name: "http4k-core", version: "$http_4k_version"
implementation group: "org.http4k", name: "http4k-server-netty", version: "$http_4k_version"
implementation group: "org.http4k", name: "http4k-client-apache", version: "$http_4k_version"
implementation group: "org.http4k", name: "http4k-format-gson", version: "$http_4k_version"
implementation group: "commons-io", name: "commons-io", version: "2.7"
implementation group:"ch.qos.logback", name: "logback-classic", version: "$logback_version"
implementation group: "org.http4k", name: "http4k-client-apache", version: "$http_4k_version"
implementation group: "org.http4k", name: "http4k-server-netty", version: "$http_4k_version"
runtimeOnly group:"io.netty", name: "netty-tcnative-boringssl-static", version: "2.0.30.Final"
compileOnly group:"dev.afanasev", name: "sekret-annotation", version: "0.0.3"
implementation group:"ch.qos.logback", name: "logback-classic", version: "1.2.1"
implementation group: "org.jetbrains.exposed", name: "exposed-core", version: "$exposed_version"
implementation group: "org.jetbrains.exposed", name: "exposed-dao", version: "$exposed_version"
implementation group: "org.jetbrains.exposed", name: "exposed-jdbc", version: "$exposed_version"
implementation group: "org.xerial", name: "sqlite-jdbc", version: "3.30.1"
}
java {

View File

@ -1,2 +1,2 @@
http_4k_version=3.249.0
logback_version=1.2.1
http_4k_version=3.250.0
exposed_version=0.24.1

View File

@ -55,13 +55,12 @@ public class MangaDexClient {
this.statistics = new AtomicReference<>();
try {
cache = DiskLruCache.open(new File("cache"), 3, 3,
cache = DiskLruCache.open(new File("cache"), 1, 1,
clientSettings.getMaxCacheSizeMib() * 1024 * 1024 /* MiB to bytes */);
DiskLruCache.Snapshot snapshot = cache.get("statistics");
if (snapshot != null) {
String json = snapshot.getString(0);
snapshot.close();
statistics.set(GSON.fromJson(json, Statistics.class));
} else {
statistics.set(new Statistics());
@ -103,8 +102,6 @@ public class MangaDexClient {
if (editor != null) {
String json = GSON.toJson(statistics.get(), Statistics.class);
editor.setString(0, json);
editor.setString(1, "");
editor.setString(2, "");
editor.commit();
}
} catch (IOException ignored) {

View File

@ -34,11 +34,6 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@ -110,6 +105,7 @@ public final class DiskLruCache implements Closeable {
private static final long ANY_SEQUENCE_NUMBER = -1;
public static final Pattern LEGAL_KEY_PATTERN = Pattern.compile("[a-z0-9_-]{1,120}");
public static final Pattern UNSAFE_LEGAL_KEY_PATTERN = Pattern.compile("[a-z0-9_-][\\\\/a-z0-9_-]{0,119}");
private static final String CLEAN = "CLEAN";
private static final String DIRTY = "DIRTY";
@ -411,6 +407,16 @@ public final class DiskLruCache implements Closeable {
return getImpl(key);
}
/**
* Returns a snapshot of the entry named {@code key}, or null if it doesn't
* exist is not currently readable. If a value is returned, it is moved to the
* head of the LRU queue. Unsafe as it allows arbitrary directories to be accessed!
*/
public Snapshot getUnsafe(String key) throws IOException {
validateUnsafeKey(key);
return getImpl(key);
}
private synchronized Snapshot getImpl(String key) throws IOException {
checkNotClosed();
Entry entry = lruEntries.get(key);
@ -463,6 +469,15 @@ public final class DiskLruCache implements Closeable {
return editImpl(key, ANY_SEQUENCE_NUMBER);
}
/**
* Returns an editor for the entry named {@code key}, or null if another edit is
* in progress. Unsafe as it allows arbitrary directories to be accessed!
*/
public Editor editUnsafe(String key) throws IOException {
validateUnsafeKey(key);
return editImpl(key, ANY_SEQUENCE_NUMBER);
}
private synchronized Editor editImpl(String key, long expectedSequenceNumber) throws IOException {
checkNotClosed();
Entry entry = lruEntries.get(key);
@ -594,6 +609,17 @@ public final class DiskLruCache implements Closeable {
return removeImpl(key);
}
/**
* Drops the entry for {@code key} if it exists and can be removed. Entries
* actively being edited cannot be removed. Unsafe as it allows arbitrary directories to be accessed!
*
* @return true if an entry was removed.
*/
public boolean removeUnsafe(String key) throws IOException {
validateUnsafeKey(key);
return removeImpl(key);
}
private synchronized boolean removeImpl(String key) throws IOException {
checkNotClosed();
Entry entry = lruEntries.get(key);
@ -657,7 +683,7 @@ public final class DiskLruCache implements Closeable {
private void trimToSize() throws IOException {
while (size > maxSize) {
Map.Entry<String, Entry> toEvict = lruEntries.entrySet().iterator().next();
remove(toEvict.getKey());
removeImpl(toEvict.getKey());
}
}
@ -674,7 +700,14 @@ public final class DiskLruCache implements Closeable {
private void validateKey(String key) {
Matcher matcher = LEGAL_KEY_PATTERN.matcher(key);
if (!matcher.matches()) {
throw new IllegalArgumentException("keys must match regex " + LEGAL_KEY_PATTERN + ": \"" + key + "\"");
throw new IllegalArgumentException("Keys must match regex " + LEGAL_KEY_PATTERN + ": \"" + key + "\"");
}
}
private void validateUnsafeKey(String key) {
Matcher matcher = UNSAFE_LEGAL_KEY_PATTERN.matcher(key);
if (!matcher.matches()) {
throw new IllegalArgumentException("Unsafe keys must match regex " + UNSAFE_LEGAL_KEY_PATTERN + ": \"" + key + "\"");
}
}
@ -831,7 +864,7 @@ public final class DiskLruCache implements Closeable {
public void commit() throws IOException {
if (hasErrors) {
completeEdit(this, false);
remove(entry.key); // The previous entry is stale.
removeImpl(entry.key); // The previous entry is stale.
} else {
completeEdit(this, true);
}
@ -912,9 +945,6 @@ public final class DiskLruCache implements Closeable {
/** Lengths of this entry's files. */
private final long[] lengths;
/** Subkey pathing for cache files. */
private final String subKeyPath;
/** True if this entry has ever been published. */
private boolean readable;
@ -927,11 +957,6 @@ public final class DiskLruCache implements Closeable {
private Entry(String key) {
this.key = key;
this.lengths = new long[valueCount];
// Splits the keys into a list of two characters, and join it together to use it
// for sub-directorying
this.subKeyPath = File.separator
+ String.join(File.separator, key.substring(0, 8).replaceAll("..(?!$)", "$0 ").split(" "));
}
public String getLengths() {
@ -962,40 +987,11 @@ public final class DiskLruCache implements Closeable {
}
public File getCleanFile(int i) {
// Move files to new caching tree if exists
Path oldCache = Paths.get(directory + File.separator + key + "." + i);
Path newCache = Paths.get(directory + subKeyPath + File.separator + key + "." + i);
migrateCacheFile(i, oldCache, newCache);
return new File(directory + subKeyPath, key + "." + i);
return new File(directory, key + "." + i);
}
public File getDirtyFile(int i) {
// Move files to new caching tree if exists
Path oldCache = Paths.get(directory + File.separator + key + "." + i + ".tmp");
Path newCache = Paths.get(directory + subKeyPath + File.separator + key + "." + i + ".tmp");
migrateCacheFile(i, oldCache, newCache);
return new File(directory + subKeyPath, key + "." + i + ".tmp");
}
private void migrateCacheFile(int i, Path oldCache, Path newCache) {
File newCacheDirectory = new File(directory + subKeyPath, key + "." + i + ".tmp");
newCacheDirectory.getParentFile().mkdirs();
if (Files.exists(oldCache)) {
try {
Files.move(oldCache, newCache, StandardCopyOption.ATOMIC_MOVE);
} catch (FileAlreadyExistsException faee) {
try {
Files.delete(oldCache);
} catch (IOException ignored) {
}
} catch (IOException ignored) {
}
}
return new File(directory, key + "." + i + ".tmp");
}
}
}

View File

@ -0,0 +1,20 @@
package mdnet.base.dao
import org.jetbrains.exposed.dao.Entity
import org.jetbrains.exposed.dao.EntityClass
import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.dao.id.IdTable
object ImageData : IdTable<String>() {
override val id = varchar("id", 32).entityId()
override val primaryKey = PrimaryKey(id)
val contentType = varchar("contentType", 20)
val lastModified = varchar("lastModified", 29)
}
class ImageDatum(id: EntityID<String>) : Entity<String>(id) {
companion object : EntityClass<String, ImageDatum>(ImageData)
var contentType by ImageData.contentType
var lastModified by ImageData.lastModified
}

View File

@ -13,10 +13,12 @@ import org.http4k.routing.bind
import org.http4k.routing.routes
import org.http4k.server.Http4kServer
import org.http4k.server.asServer
import org.jetbrains.exposed.sql.Database
import java.util.concurrent.atomic.AtomicReference
fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSettings: ClientSettings, statistics: AtomicReference<Statistics>): Http4kServer {
val imageServer = ImageServer(cache, statistics, serverSettings.imageServer)
val database = Database.connect("jdbc:sqlite:cache/data.db", "org.sqlite.JDBC")
val imageServer = ImageServer(cache, statistics, serverSettings.imageServer, database)
return Timer
.then(catchAllHideDetails())

View File

@ -3,6 +3,8 @@ package mdnet.base.server
import mdnet.base.Constants
import mdnet.base.Statistics
import mdnet.base.dao.ImageData
import mdnet.base.dao.ImageDatum
import mdnet.cache.CachingInputStream
import mdnet.cache.DiskLruCache
import org.apache.http.client.config.CookieSpecs
@ -12,9 +14,13 @@ import org.http4k.client.ApacheClient
import org.http4k.core.*
import org.http4k.filter.MaxAgeTtl
import org.http4k.lens.Path
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.transaction
import org.slf4j.LoggerFactory
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.File
import java.io.InputStream
import java.security.MessageDigest
import java.util.concurrent.Executors
@ -27,7 +33,12 @@ import javax.crypto.spec.SecretKeySpec
private const val THREADS_TO_ALLOCATE = 262144 // 2**18 // Honestly, no reason to not just let 'er rip. Inactive connections will expire on their own :D
private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java)
class ImageServer(private val cache: DiskLruCache, private val statistics: AtomicReference<Statistics>, private val upstreamUrl: String) {
class ImageServer(private val cache: DiskLruCache, private val statistics: AtomicReference<Statistics>, private val upstreamUrl: String, private val database: Database) {
init {
transaction(database) {
SchemaUtils.create(ImageData)
}
}
private val executor = Executors.newCachedThreadPool()
private val client = ApacheClient(responseBodyMode = BodyMode.Stream, client = HttpClients.custom()
.setDefaultRequestConfig(
@ -63,19 +74,39 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
} else {
md5Bytes("$chapterHash.$fileName")
}
val cacheId = printHexString(rc4Bytes)
val imageId = printHexString(rc4Bytes)
val snapshot = cache.get(cacheId)
if (snapshot != null) {
request.handleCacheHit(sanitizedUri, getRc4(rc4Bytes), snapshot)
val snapshot = cache.getUnsafe(imageId.toCacheId())
val imageDatum = transaction(database) {
ImageDatum.findById(imageId)
}
if (snapshot != null && imageDatum != null) {
request.handleCacheHit(sanitizedUri, getRc4(rc4Bytes), snapshot, imageDatum)
.header("X-Uri", sanitizedUri)
} else {
request.handleCacheMiss(sanitizedUri, getRc4(rc4Bytes), cacheId)
if (snapshot != null) {
snapshot.close()
if (LOGGER.isWarnEnabled) {
LOGGER.warn("Removing cache file for $sanitizedUri without corresponding DB entry")
}
cache.removeUnsafe(imageId.toCacheId())
}
if (imageDatum != null) {
if (LOGGER.isWarnEnabled) {
LOGGER.warn("Deleting DB entry for $sanitizedUri without corresponding file")
}
transaction(database) {
imageDatum.delete()
}
}
request.handleCacheMiss(sanitizedUri, getRc4(rc4Bytes), imageId)
.header("X-Uri", sanitizedUri)
}
}
private fun Request.handleCacheHit(sanitizedUri: String, cipher: Cipher, snapshot: DiskLruCache.Snapshot): Response {
private fun Request.handleCacheHit(sanitizedUri: String, cipher: Cipher, snapshot: DiskLruCache.Snapshot, imageDatum: ImageDatum): Response {
// our files never change, so it's safe to use the browser cache
return if (this.header("If-Modified-Since") != null) {
statistics.getAndUpdate {
@ -102,13 +133,13 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
respondWithImage(
CipherInputStream(BufferedInputStream(snapshot.getInputStream(0)), cipher),
snapshot.getLength(0).toString(), snapshot.getString(1), snapshot.getString(2),
snapshot.getLength(0).toString(), imageDatum.contentType, imageDatum.lastModified,
true
)
}
}
private fun Request.handleCacheMiss(sanitizedUri: String, cipher: Cipher, cacheId: String): Response {
private fun Request.handleCacheMiss(sanitizedUri: String, cipher: Cipher, imageId: String): Response {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Request for $sanitizedUri missed cache")
}
@ -134,7 +165,7 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
val contentLength = mdResponse.header("Content-Length")
val lastModified = mdResponse.header("Last-Modified")
val editor = cache.edit(cacheId)
val editor = cache.editUnsafe(imageId.toCacheId())
// A null editor means that this file is being written to
// concurrently so we skip the cache process
@ -142,23 +173,34 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $sanitizedUri is being cached and served")
}
editor.setString(1, contentType)
editor.setString(2, lastModified)
transaction(database) {
ImageDatum.new(imageId) {
this.contentType = contentType
this.lastModified = lastModified
}
}
val tee = CachingInputStream(
mdResponse.body.stream,
executor, CipherOutputStream(BufferedOutputStream(editor.newOutputStream(0)), cipher)
) {
if (editor.getLength(0) == contentLength.toLong()) {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Cache download for $sanitizedUri committed")
try {
if (editor.getLength(0) == contentLength.toLong()) {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Cache download for $sanitizedUri committed")
}
editor.commit()
} else {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Cache download for $sanitizedUri aborted")
}
editor.abort()
}
editor.commit()
} else {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Cache download for $sanitizedUri aborted")
} catch (e: Exception) {
if (LOGGER.isWarnEnabled) {
LOGGER.warn("Cache go/no go for $sanitizedUri failed", e)
}
editor.abort()
}
}
respondWithImage(tee, contentLength, contentType, lastModified, false)
@ -172,6 +214,10 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
}
}
private fun String.toCacheId() =
this.substring(0, 8).replace("..(?!$)".toRegex(), "$0 ").split(" ".toRegex())
.plus(this).joinToString(File.separator)
private fun respondWithImage(input: InputStream, length: String?, type: String, lastModified: String?, cached: Boolean): Response =
Response(Status.OK)
.header("Content-Type", type)