/* Mangadex@Home Copyright (c) 2020, MangaDex Network This file is part of MangaDex@Home. MangaDex@Home is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. MangaDex@Home is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this MangaDex@Home. If not, see . */ package mdnet import io.micrometer.core.instrument.Gauge import io.micrometer.core.instrument.binder.BaseUnits import io.micrometer.prometheus.PrometheusConfig import io.micrometer.prometheus.PrometheusMeterRegistry import mdnet.cache.ImageStorage import mdnet.data.Statistics import mdnet.logging.error import mdnet.logging.info import mdnet.logging.warn import mdnet.metrics.DefaultMicrometerMetrics import mdnet.server.getServer import mdnet.settings.ClientSettings import mdnet.settings.RemoteSettings import okhttp3.ConnectionPool import okhttp3.OkHttpClient import okhttp3.Protocol import org.http4k.client.OkHttp import org.http4k.core.BodyMode import org.http4k.core.then import org.http4k.filter.ClientFilters import org.http4k.filter.MicrometerMetrics import org.http4k.server.Http4kServer import org.slf4j.LoggerFactory import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import java.util.concurrent.TimeUnit sealed class State // server is not running object Uninitialized : State() // server has shut down object Shutdown : State() // server is in the process of stopping data class GracefulStop( val lastRunning: Running, val counts: Int = 0, val nextState: State, val action: () -> Unit = {} ) : State() // server is currently running data class Running(val server: Http4kServer, val settings: RemoteSettings) : State() class ServerManager( private val settings: ClientSettings, private val storage: ImageStorage ) { // this must remain single-threaded because of how the state mechanism works private val executor = Executors.newSingleThreadScheduledExecutor() private val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) private val statistics = Statistics() private val okhttp = ClientFilters.MicrometerMetrics.RequestCounter(registry) .then(ClientFilters.MicrometerMetrics.RequestTimer(registry)) .then( OkHttp( bodyMode = BodyMode.Stream, client = OkHttpClient.Builder() .connectTimeout(Duration.ofSeconds(2)) .connectionPool( ConnectionPool( maxIdleConnections = 100, keepAliveDuration = 1, timeUnit = TimeUnit.MINUTES ) ) .writeTimeout(Duration.ofSeconds(10)) .readTimeout(Duration.ofSeconds(10)) .protocols(listOf(Protocol.HTTP_1_1)) .build() ) ) // state that must only be accessed from the thread on the executor private var state: State private var backendApi: BackendApi // end protected state init { state = Uninitialized backendApi = BackendApi(settings) } fun start() { LOGGER.info { "Image server starting" } DefaultMicrometerMetrics(registry) Gauge.builder( "cache.used", storage, { it.size.toDouble() } ).baseUnit(BaseUnits.BYTES).register(registry) Gauge.builder( "cache.max", storage, { it.maxSize.toDouble() } ).baseUnit(BaseUnits.BYTES).register(registry) loginAndStartServer() var lastBytesSent = statistics.bytesSent.get() executor.scheduleAtFixedRate( { try { lastBytesSent = statistics.bytesSent.get() val state = this.state if (state is GracefulStop && state.nextState != Shutdown) { LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" } this.state = state.lastRunning } if (state is Uninitialized) { LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" } loginAndStartServer() } } catch (e: Exception) { LOGGER.warn(e) { "Hourly bandwidth check failed" } } }, 1, 1, TimeUnit.HOURS ) var lastRequests = 0L executor.scheduleAtFixedRate( { try { val state = this.state if (state is GracefulStop) { val timesToWait = settings.serverSettings.gracefulShutdownWaitSeconds / 5 val requestCounters = registry.find("http.server.request.latency").timers() val curRequests = requestCounters.map { it.count() }.sum() val noRequests = lastRequests >= curRequests when { state.counts == 0 -> { LOGGER.info { "Starting graceful stop" } logout() lastRequests = curRequests this.state = state.copy(counts = state.counts + 1) } state.counts == timesToWait || noRequests -> { if (noRequests) { LOGGER.info { "No requests received, stopping" } } else { LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" } } stopServer(state.nextState) state.action() } else -> { LOGGER.info { "Waiting another 5 seconds for graceful stop (${state.counts} out of $timesToWait)" } lastRequests = curRequests this.state = state.copy(counts = state.counts + 1) } } } } catch (e: Exception) { LOGGER.error(e) { "Main loop failed" } } }, 5, 5, TimeUnit.SECONDS ) executor.scheduleWithFixedDelay( { try { val state = this.state if (state is Running) { val currentBytesSent = statistics.bytesSent.get() - lastBytesSent if (settings.serverSettings.maxMebibytesPerHour != 0L && settings.serverSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) { LOGGER.info { "Stopping image server as hourly bandwidth limit reached" } this.state = GracefulStop(lastRunning = state, nextState = Uninitialized) } else { pingControl() } } } catch (e: Exception) { LOGGER.warn(e) { "Bandwidth shutdown checker/ping failed" } } }, 45, 45, TimeUnit.SECONDS ) LOGGER.info { "Image server has started" } } private fun pingControl() { // this is currentSettings, other is newSettings // if tls is null that means same as previous ping fun RemoteSettings.logicalEqual(other: RemoteSettings): Boolean { val test = if (other.tls != null) { other } else { other.copy(tls = this.tls) } return this == test } val state = this.state as Running val newSettings = backendApi.pingControl(state.settings) if (newSettings is RemoteSettings) { LOGGER.info { "Server settings received: $newSettings" } warnBasedOnSettings(newSettings) if (!state.settings.logicalEqual(newSettings)) { LOGGER.info { "Doing internal restart of HTTP server to refresh settings" } this.state = GracefulStop(lastRunning = state, nextState = Uninitialized) { loginAndStartServer() } } } else { LOGGER.info { "Ignoring failed server ping - $newSettings" } } } private fun loginAndStartServer() { this.state as Uninitialized val remoteSettings = backendApi.loginToControl() if (remoteSettings !is RemoteSettings) { throw RuntimeException(remoteSettings.toString()) } LOGGER.info { "Server settings received: $remoteSettings" } warnBasedOnSettings(remoteSettings) val server = getServer( storage, remoteSettings, settings.serverSettings, settings.metricsSettings, statistics, registry, okhttp, ).start() this.state = Running(server, remoteSettings) LOGGER.info { "Internal HTTP server was successfully started" } } private fun logout() { backendApi.logoutFromControl() } private fun stopServer(nextState: State) { val state = this.state.let { when (it) { is Running -> it is GracefulStop -> it.lastRunning else -> throw AssertionError() } } LOGGER.info { "Image server stopping" } state.server.stop() LOGGER.info { "Image server has stopped" } this.state = nextState } fun shutdown() { LOGGER.info { "Image server shutting down" } val latch = CountDownLatch(1) executor.schedule( { val state = this.state if (state is Running) { this.state = GracefulStop(state, nextState = Shutdown) { latch.countDown() } } else if (state is GracefulStop) { this.state = state.copy(nextState = Shutdown) { latch.countDown() } } else if (state is Uninitialized || state is Shutdown) { this.state = Shutdown latch.countDown() } }, 0, TimeUnit.SECONDS ) latch.await() executor.shutdown() LOGGER.info { "Image server has shut down" } } private fun warnBasedOnSettings(settings: RemoteSettings) { if (settings.latestBuild > Constants.CLIENT_BUILD) { LOGGER.warn { "Outdated build detected! Latest: ${settings.latestBuild}, Current: ${Constants.CLIENT_BUILD}" } } if (settings.paused) { LOGGER.warn { "Your client is paused by the backend and will not serve any images!" } } if (settings.compromised) { LOGGER.warn { "Your client secret is compromised and it will not serve any images!" } } } companion object { private val LOGGER = LoggerFactory.getLogger(ServerManager::class.java) } }