1
0
Fork 1
mirror of https://gitlab.com/mangadex-pub/mangadex_at_home.git synced 2024-01-19 02:48:37 +00:00
mangadex_at_home/src/main/kotlin/mdnet/ServerManager.kt
2021-01-28 07:50:13 -06:00

304 lines
10 KiB
Kotlin

/*
Mangadex@Home
Copyright (c) 2020, MangaDex Network
This file is part of MangaDex@Home.
MangaDex@Home is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MangaDex@Home is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
*/
package mdnet
import io.micrometer.prometheus.PrometheusConfig
import io.micrometer.prometheus.PrometheusMeterRegistry
import mdnet.cache.ImageStorage
import mdnet.data.Statistics
import mdnet.logging.error
import mdnet.logging.info
import mdnet.logging.warn
import mdnet.metrics.DefaultMicrometerMetrics
import mdnet.server.getServer
import mdnet.settings.*
import org.http4k.server.Http4kServer
import org.slf4j.LoggerFactory
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/** Lifecycle state of the image server, as tracked by [ServerManager]. */
sealed class State

/** Server is not running. */
object Uninitialized : State()

/** Server has shut down. */
object Shutdown : State()

/**
 * Server is currently running.
 *
 * @property server the started HTTP server instance
 * @property settings the remote settings the server was started with
 */
data class Running(
    val server: Http4kServer,
    val settings: RemoteSettings
) : State()

/**
 * Server is in the process of stopping.
 *
 * @property lastRunning the [Running] state that is being stopped
 * @property counts how many graceful-stop polls have already been performed
 * @property nextState the state to enter once the server has been stopped
 * @property action callback invoked after the server has been stopped
 */
data class GracefulStop(
    val lastRunning: Running,
    val counts: Int = 0,
    val nextState: State = Uninitialized,
    val action: () -> Unit = {}
) : State()
/**
 * Drives the lifecycle of the internal HTTP image server.
 *
 * All state transitions ([Uninitialized], [Running], [GracefulStop], [Shutdown]) are
 * performed by tasks running on a single-threaded scheduled executor:
 *  - an hourly task that resets the bandwidth accounting window,
 *  - a 5-second task that advances an in-progress graceful stop,
 *  - a 45-second task that enforces the hourly bandwidth limit and pings the backend.
 *
 * @param settings client-side settings (bandwidth limit, graceful shutdown wait, metrics, ...)
 * @param storage image cache handed to the HTTP server and to the metrics binder
 */
class ServerManager(
    private val settings: ClientSettings,
    private val storage: ImageStorage
) {
    // this must remain single-threaded because of how the state mechanism works
    private val executor = Executors.newSingleThreadScheduledExecutor()
    private val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
    private val statistics = Statistics()

    // state that must only be accessed from the thread on the executor
    private var state: State = Uninitialized
    private val backendApi = BackendApi(settings)
    // end protected state

    /**
     * Logs in to the backend, starts the HTTP server and schedules the periodic
     * maintenance tasks.
     *
     * The initial login/start runs on the calling thread, before any task is scheduled.
     *
     * @throws RuntimeException if the backend login does not return remote settings
     */
    fun start() {
        LOGGER.info { "Image server starting" }

        DefaultMicrometerMetrics(registry, storage.cacheDirectory)
        loginAndStartServer()

        // Value of the bytes-sent counter at the start of the current hourly window.
        var lastBytesSent = statistics.bytesSent.get()
        executor.scheduleAtFixedRate(
            {
                try {
                    // A new hour begins: reset the bandwidth accounting window.
                    lastBytesSent = statistics.bytesSent.get()

                    val state = this.state
                    if (state is GracefulStop && state.nextState != Shutdown) {
                        // NOTE(review): this branch also matches a graceful stop started by
                        // pingControl() for a settings refresh, and a stop that has already
                        // called logout() (counts > 0) - confirm resuming is intended there.
                        LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" }
                        this.state = state.lastRunning
                    }
                    if (state is Uninitialized) {
                        LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" }
                        loginAndStartServer()
                    }
                } catch (e: Exception) {
                    LOGGER.warn(e) { "Hourly bandwidth check failed" }
                }
            },
            1, 1, TimeUnit.HOURS
        )

        // Total request count observed on the previous graceful-stop poll.
        var lastRequests = 0L
        executor.scheduleAtFixedRate(
            {
                try {
                    val state = this.state
                    if (state is GracefulStop) {
                        val timesToWait =
                            settings.serverSettings.gracefulShutdownWaitSeconds / GRACEFUL_STOP_POLL_SECONDS
                        val curRequests = registry.find("http.server.request.latency").timers()
                            .map { it.count() }
                            .sum()
                        // No new request was recorded since the previous poll.
                        val noRequests = lastRequests >= curRequests

                        when {
                            state.counts == 0 -> {
                                LOGGER.info { "Starting graceful stop" }
                                logout()

                                lastRequests = curRequests
                                this.state = state.copy(counts = state.counts + 1)
                            }
                            // ">=" rather than "==": with a configured wait shorter than one
                            // poll interval timesToWait is 0 while counts is already 1 here,
                            // so an equality check would never trigger the cap.
                            state.counts >= timesToWait || noRequests -> {
                                if (noRequests) {
                                    LOGGER.info { "No requests received, stopping" }
                                } else {
                                    LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" }
                                }

                                stopServer(state.nextState)
                                state.action()
                            }
                            else -> {
                                LOGGER.info {
                                    "Waiting another $GRACEFUL_STOP_POLL_SECONDS seconds for graceful stop (${state.counts} out of $timesToWait)"
                                }

                                lastRequests = curRequests
                                this.state = state.copy(counts = state.counts + 1)
                            }
                        }
                    }
                } catch (e: Exception) {
                    LOGGER.error(e) { "Main loop failed" }
                }
            },
            GRACEFUL_STOP_POLL_SECONDS.toLong(), GRACEFUL_STOP_POLL_SECONDS.toLong(), TimeUnit.SECONDS
        )

        executor.scheduleWithFixedDelay(
            {
                try {
                    val state = this.state
                    if (state is Running) {
                        val currentBytesSent = statistics.bytesSent.get() - lastBytesSent
                        val maxMebibytesPerHour = settings.serverSettings.maxMebibytesPerHour

                        // A limit of 0 disables the bandwidth check.
                        val limitReached = maxMebibytesPerHour != 0L &&
                            maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent
                        if (limitReached) {
                            LOGGER.info { "Stopping image server as hourly bandwidth limit reached" }
                            this.state = GracefulStop(lastRunning = state)
                        } else {
                            pingControl()
                        }
                    }
                } catch (e: Exception) {
                    LOGGER.warn(e) { "Bandwidth shutdown checker/ping failed" }
                }
            },
            45, 45, TimeUnit.SECONDS
        )

        LOGGER.info { "Image server has started" }
    }

    /**
     * Pings the backend with the current settings and, when the returned settings
     * differ, begins a graceful stop that logs in and starts the server again afterwards.
     *
     * Must only be called while the state is [Running].
     */
    private fun pingControl() {
        // this is currentSettings, other is newSettings
        // if tls is null that means same as previous ping
        fun RemoteSettings.logicalEqual(other: RemoteSettings): Boolean {
            val comparable = if (other.tls != null) other else other.copy(tls = this.tls)
            return this == comparable
        }

        val state = this.state as Running

        val newSettings = backendApi.pingControl(state.settings)
        if (newSettings is RemoteSettings) {
            LOGGER.info { "Server settings received: $newSettings" }
            warnBasedOnSettings(newSettings)

            if (!state.settings.logicalEqual(newSettings)) {
                LOGGER.info { "Doing internal restart of HTTP server to refresh settings" }

                this.state = GracefulStop(lastRunning = state) {
                    loginAndStartServer()
                }
            }
        } else {
            LOGGER.info { "Ignoring failed server ping - $newSettings" }
        }
    }

    /**
     * Logs in to the backend, starts the HTTP server with the received settings and
     * moves to [Running].
     *
     * @throws IllegalStateException if the current state is not [Uninitialized]
     * @throws RuntimeException if the backend login does not return remote settings
     */
    private fun loginAndStartServer() {
        check(this.state is Uninitialized) { "Cannot start the server from state ${this.state}" }

        val remoteSettings = backendApi.loginToControl()
        if (remoteSettings !is RemoteSettings) {
            throw RuntimeException(remoteSettings.toString())
        }

        LOGGER.info { "Server settings received: $remoteSettings" }
        warnBasedOnSettings(remoteSettings)

        val server = getServer(
            storage,
            remoteSettings,
            settings.serverSettings,
            settings.metricsSettings,
            statistics,
            registry
        ).start()

        this.state = Running(server, remoteSettings)
        LOGGER.info { "Internal HTTP server was successfully started" }
    }

    /** Logs this client out from the backend. */
    private fun logout() {
        backendApi.logoutFromControl()
    }

    /**
     * Stops the HTTP server of the current (or last) [Running] state and then enters
     * [nextState].
     *
     * @param nextState the state to assign once the server has stopped
     * @throws AssertionError if no server is associated with the current state
     */
    private fun stopServer(nextState: State) {
        val running = when (val current = this.state) {
            is Running -> current
            is GracefulStop -> current.lastRunning
            is Uninitialized, is Shutdown ->
                throw AssertionError("stopServer called while no server is running: $current")
        }

        LOGGER.info { "Image server stopping" }
        running.server.stop()
        LOGGER.info { "Image server has stopped" }

        this.state = nextState
    }

    /**
     * Requests a shutdown and blocks until it has completed, then shuts the executor down.
     *
     * If a server is running (or already stopping) the call returns only after the
     * graceful-stop task has stopped it; otherwise it returns as soon as the request
     * has been processed on the executor thread.
     */
    fun shutdown() {
        LOGGER.info { "Image server shutting down" }

        val latch = CountDownLatch(1)
        executor.schedule(
            { requestShutdown(latch) },
            0, TimeUnit.SECONDS
        )
        latch.await()

        executor.shutdown()
        LOGGER.info { "Image server has shut down" }
    }

    /**
     * Moves the state machine towards [Shutdown]; runs on the executor thread.
     *
     * @param latch released once the [Shutdown] state has been reached
     */
    private fun requestShutdown(latch: CountDownLatch) {
        when (val current = this.state) {
            is Running -> {
                this.state = GracefulStop(current, nextState = Shutdown) {
                    latch.countDown()
                }
            }
            is GracefulStop -> {
                // Retarget the stop already in progress; this replaces its previous action.
                this.state = current.copy(nextState = Shutdown) {
                    latch.countDown()
                }
            }
            is Uninitialized, is Shutdown -> {
                this.state = Shutdown
                latch.countDown()
            }
        }
    }

    /** Logs a warning for each noteworthy flag in the settings received from the backend. */
    private fun warnBasedOnSettings(settings: RemoteSettings) {
        if (settings.latestBuild > Constants.CLIENT_BUILD) {
            LOGGER.warn {
                "Outdated build detected! Latest: ${settings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
            }
        }
        if (settings.paused) {
            LOGGER.warn {
                "Your client is paused by the backend and will not serve any images!"
            }
        }
        if (settings.compromised) {
            LOGGER.warn {
                "Your client secret is compromised and it will not serve any images!"
            }
        }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(ServerManager::class.java)

        // Interval between graceful-stop polls; also the divisor that turns the
        // configured graceful shutdown wait into a number of polls.
        private const val GRACEFUL_STOP_POLL_SECONDS = 5
    }
}