package mdnet.base

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.lang.RuntimeException
import java.time.Instant
import java.util.Collections
import java.util.LinkedHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import mdnet.base.data.Statistics
import mdnet.base.server.getServer
import mdnet.base.settings.DevSettings
import mdnet.base.settings.RemoteSettings
import mdnet.base.settings.ServerSettings
import mdnet.cache.DiskLruCache
import org.http4k.server.Http4kServer
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory

/** Lifecycle state of the image server as tracked by [ServerManager]. */
sealed class State

/** Server is not running. */
data class Uninitialized(val serverSettings: ServerSettings, val devSettings: DevSettings) : State()

/** Server has shut down. */
object Shutdown : State()

/**
 * Server is in the process of stopping.
 *
 * @property lastRunning the running state that is being stopped
 * @property counts number of 15-second ticks already spent in this graceful stop
 * @property nextState state to enter once the HTTP server has been stopped
 * @property action callback invoked right after the HTTP server has been stopped
 */
data class GracefulStop(
    val lastRunning: Running,
    val counts: Int = 0,
    val nextState: State = Uninitialized(lastRunning.serverSettings, lastRunning.devSettings),
    val action: () -> Unit = {}
) : State()

/** Server is currently running. */
data class Running(
    val server: Http4kServer,
    val settings: RemoteSettings,
    val serverSettings: ServerSettings,
    val devSettings: DevSettings
) : State()

/**
 * Drives the image server through its [State] machine: logs in to the control server, starts
 * and stops the HTTP server, persists [statistics] to the cache and enforces the hourly
 * bandwidth limit.
 *
 * All periodic work is scheduled on a single-threaded executor; [state] is only meant to be
 * touched from that thread.
 */
class ServerManager(
    serverSettings: ServerSettings,
    devSettings: DevSettings,
    maxCacheSizeInMebibytes: Long,
    private val cache: DiskLruCache,
    private val database: Database
) {
    // this must remain single-threaded because of how the state mechanism works
    private val executor = Executors.newSingleThreadScheduledExecutor()

    // state that must only be accessed from the thread on the executor
    private var state: State = Uninitialized(serverSettings, devSettings)
    private val serverHandler: ServerHandler = ServerHandler(serverSettings, devSettings, maxCacheSizeInMebibytes)
    // end protected state

    /**
     * Rolling history of statistics snapshots keyed by capture time. Bounded to
     * [STATS_HISTORY_SIZE] entries; the eldest entry is evicted when the bound is exceeded.
     */
    val statsMap: MutableMap<Instant, Statistics> = Collections
        .synchronizedMap(object : LinkedHashMap<Instant, Statistics>(STATS_HISTORY_SIZE) {
            override fun removeEldestEntry(eldest: Map.Entry<Instant, Statistics>): Boolean {
                return size > STATS_HISTORY_SIZE
            }
        })

    /** Current statistics; restored from the cache entry "statistics" on construction if present. */
    val statistics: AtomicReference<Statistics> = AtomicReference(
        Statistics()
    )

    // Cleared at every graceful-stop tick and passed to getServer(); a value of false at the
    // next tick is treated as "no requests received" during that tick.
    private val isHandled: AtomicBoolean = AtomicBoolean(false)

    // bytesSent value at the start of the current hourly bandwidth window
    private var lastBytesSent = statistics.get().bytesSent

    init {
        cache.get("statistics")?.use {
            try {
                statistics.set(JACKSON.readValue<Statistics>(it.getInputStream(0)))
            } catch (_: JsonProcessingException) {
                // Stored statistics are unreadable: drop them and keep the fresh defaults.
                cache.remove("statistics")
            }
        }
    }

    /**
     * Logs in, starts the HTTP server and schedules the four periodic maintenance tasks.
     *
     * @throws RuntimeException if the control server gives no login response
     */
    fun start() {
        LOGGER.info { "Image server starting" }
        loginAndStartServer()
        statsMap[Instant.now()] = statistics.get()

        executor.scheduleAtFixedRate({
            try {
                persistStatistics()
            } catch (e: Exception) {
                LOGGER.warn(e) { "Statistics update failed" }
            }
        }, 15, 15, TimeUnit.SECONDS)

        lastBytesSent = statistics.get().bytesSent

        executor.scheduleAtFixedRate({
            try {
                resetHourlyBandwidthWindow()
            } catch (e: Exception) {
                LOGGER.warn(e) { "Hourly bandwidth check failed" }
            }
        }, 1, 1, TimeUnit.HOURS)

        executor.scheduleAtFixedRate({
            try {
                advanceGracefulStop()
            } catch (e: Exception) {
                LOGGER.error(e) { "Main loop failed" }
            }
        }, 15, 15, TimeUnit.SECONDS)

        executor.scheduleWithFixedDelay({
            try {
                checkBandwidthOrPing()
            } catch (e: Exception) {
                LOGGER.warn(e) { "Graceful shutdown checker failed" }
            }
        }, 45, 45, TimeUnit.SECONDS)

        LOGGER.info { "Image server has started" }
    }

    /** Refreshes the on-disk size in [statistics], records a snapshot and writes it to the cache. */
    private fun persistStatistics() {
        if (state is Running || state is GracefulStop || state is Uninitialized) {
            statistics.updateAndGet {
                it.copy(bytesOnDisk = cache.size())
            }
            statsMap[Instant.now()] = statistics.get()

            // NOTE(review): if writeValue throws, the editor is neither committed nor aborted
            // here — confirm whether the cache's editor needs an explicit abort in that case.
            val editor = cache.edit("statistics")
            if (editor != null) {
                JACKSON.writeValue(editor.newOutputStream(0), statistics.get())
                editor.commit()
            }
        }
    }

    /**
     * Starts a new hourly bandwidth window and undoes a stop that was caused by the previous
     * window's limit: a pending graceful stop is cancelled, an already stopped server is
     * restarted.
     */
    private fun resetHourlyBandwidthWindow() {
        lastBytesSent = statistics.get().bytesSent

        val state = this.state
        // A graceful stop heading for Shutdown was requested by shutdown(); cancelling it would
        // drop its latch callback and leave shutdown() blocked forever, so it is left alone.
        if (state is GracefulStop && state.nextState !is Shutdown) {
            LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" }
            this.state = state.lastRunning
        }
        if (state is Uninitialized) {
            LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" }
            loginAndStartServer()
        }
    }

    /**
     * Advances a pending [GracefulStop] by one 15-second tick: logs out on the first tick, then
     * stops the server once a tick passes without requests or the wait budget is used up.
     */
    private fun advanceGracefulStop() {
        val state = this.state
        if (state is GracefulStop) {
            val timesToWait = state.lastRunning.serverSettings.gracefulShutdownWaitSeconds / 15
            when {
                state.counts == 0 -> {
                    LOGGER.info { "Starting graceful stop" }
                    logout()
                    isHandled.set(false)
                    this.state = state.copy(counts = state.counts + 1)
                }
                // ">=" rather than "==": when the configured wait is shorter than one tick,
                // timesToWait is 0 and counts (already >= 1 here) would never equal it.
                state.counts >= timesToWait || !isHandled.get() -> {
                    if (!isHandled.get()) {
                        LOGGER.info { "No requests received, stopping" }
                    } else {
                        LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" }
                    }
                    stopServer(state.nextState)
                    state.action()
                }
                else -> {
                    LOGGER.info { "Waiting another 15 seconds for graceful stop (${state.counts} out of $timesToWait)" }
                    isHandled.set(false)
                    this.state = state.copy(counts = state.counts + 1)
                }
            }
        }
    }

    /**
     * While running, begins a graceful stop if the hourly bandwidth limit has been exceeded
     * (a limit of 0 disables the check); otherwise pings the control server.
     */
    private fun checkBandwidthOrPing() {
        val state = this.state
        if (state is Running) {
            val currentBytesSent = statistics.get().bytesSent - lastBytesSent
            if (state.serverSettings.maxMebibytesPerHour != 0L &&
                state.serverSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent
            ) {
                LOGGER.info { "Stopping image server as hourly bandwidth limit reached" }
                this.state = GracefulStop(lastRunning = state)
            } else {
                pingControl()
            }
        }
    }

    /**
     * Pings the control server with the current remote settings. If the response carries TLS
     * data or a different image server URL, schedules a graceful restart of the HTTP server.
     * Must only be called while [state] is [Running].
     */
    private fun pingControl() {
        val state = this.state as Running

        val newSettings = serverHandler.pingControl(state.settings)
        if (newSettings != null) {
            LOGGER.info { "Server settings received: $newSettings" }

            if (newSettings.latestBuild > Constants.CLIENT_BUILD) {
                LOGGER.warn { "Outdated build detected! Latest: ${newSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}" }
            }
            if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) {
                // certificates or upstream url must have changed, restart webserver
                LOGGER.info { "Doing internal restart of HTTP server to refresh certs/upstream URL" }
                this.state = GracefulStop(lastRunning = state, action = { loginAndStartServer() })
            }
        } else {
            LOGGER.info { "Server ping failed - ignoring" }
        }
    }

    /**
     * Logs in to the control server, starts the HTTP server with the returned settings and moves
     * to [Running]. Must only be called while [state] is [Uninitialized].
     *
     * @throws RuntimeException if the control server gives no login response
     */
    private fun loginAndStartServer() {
        val state = this.state as Uninitialized

        val remoteSettings = serverHandler.loginToControl()
            ?: throw RuntimeException("Failed to get a login response from server")
        val server = getServer(cache, database, remoteSettings, state.serverSettings, statistics, isHandled).start()

        if (remoteSettings.latestBuild > Constants.CLIENT_BUILD) {
            LOGGER.warn { "Outdated build detected! Latest: ${remoteSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}" }
        }

        this.state = Running(server, remoteSettings, state.serverSettings, state.devSettings)
        LOGGER.info { "Internal HTTP server was successfully started" }
    }

    /** Logs out from the control server. */
    private fun logout() {
        serverHandler.logoutFromControl()
    }

    /**
     * Stops the HTTP server of the current (or last) running state and enters [nextState].
     *
     * @throws AssertionError if [state] is neither [Running] nor [GracefulStop]
     */
    private fun stopServer(nextState: State) {
        val current = this.state
        val running = when (current) {
            is Running -> current
            is GracefulStop -> current.lastRunning
            else -> throw AssertionError()
        }

        LOGGER.info { "Image server stopping" }
        running.server.stop()
        LOGGER.info { "Image server has stopped" }

        this.state = nextState
    }

    /**
     * Requests a shutdown and blocks until the state machine has reached [Shutdown], then shuts
     * the executor down. A running server is stopped through the graceful-stop ticks.
     */
    fun shutdown() {
        LOGGER.info { "Image server shutting down" }

        val latch = CountDownLatch(1)
        executor.schedule({
            requestShutdown(latch)
        }, 0, TimeUnit.SECONDS)
        latch.await()

        executor.shutdown()
        LOGGER.info { "Image server has shut down" }
    }

    /** Routes the current state towards [Shutdown]; [latch] is released once it is reached. */
    private fun requestShutdown(latch: CountDownLatch) {
        val state = this.state
        when (state) {
            is Running -> {
                this.state = GracefulStop(state, nextState = Shutdown, action = { latch.countDown() })
            }
            is GracefulStop -> {
                this.state = state.copy(nextState = Shutdown, action = { latch.countDown() })
            }
            is Uninitialized, is Shutdown -> {
                this.state = Shutdown
                latch.countDown()
            }
        }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(ServerManager::class.java)
        private val JACKSON: ObjectMapper = jacksonObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)

        // 240 snapshots taken every 15 seconds cover one hour of history
        private const val STATS_HISTORY_SIZE = 240
    }
}