mirror of
https://github.com/VinylDNS/vinyldns
synced 2025-08-22 02:02:14 +00:00
Adding shutdown to DataStoreProvider (#293)
* Adding shutdown to DataStore * Calling shutdown from Boot
This commit is contained in:
parent
685ca7e75c
commit
b5f059a6fd
@ -77,7 +77,7 @@ lazy val sharedSettings = Seq(
|
||||
lazy val testSettings = Seq(
|
||||
parallelExecution in Test := true,
|
||||
parallelExecution in IntegrationTest := false,
|
||||
fork in IntegrationTest := false,
|
||||
fork in IntegrationTest := true,
|
||||
testOptions in Test += Tests.Argument("-oDNCXEHPQRMIK"),
|
||||
logBuffered in Test := false,
|
||||
// Hide stack traces in tests
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
package vinyldns.api
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import vinyldns.mysql.repository.MySqlIntegrationSpec
|
||||
import vinyldns.mysql.MySqlIntegrationSpec
|
||||
|
||||
trait MySqlApiIntegrationSpec extends MySqlIntegrationSpec {
|
||||
val mysqlConfig: Config = ConfigFactory.load().getConfig("vinyldns.mysql")
|
||||
|
@ -62,8 +62,9 @@ object Boot extends App {
|
||||
banner <- vinyldnsBanner()
|
||||
crypto <- IO(Crypto.instance) // load crypto
|
||||
repoConfigs <- VinylDNSConfig.dataStoreConfigs
|
||||
repositories <- DataStoreLoader
|
||||
loaderResponse <- DataStoreLoader
|
||||
.loadAll[ApiDataAccessor](repoConfigs, crypto, ApiDataAccessorProvider)
|
||||
repositories = loaderResponse.accessor
|
||||
_ <- TestDataLoader.loadTestData(repositories.userRepository)
|
||||
sqsConfig <- IO(VinylDNSConfig.sqsConfig)
|
||||
sqsConnection <- IO(SqsConnection(sqsConfig))
|
||||
@ -118,6 +119,9 @@ object Boot extends App {
|
||||
// shutdown sqs gracefully
|
||||
sqsConnection.shutdown()
|
||||
|
||||
//shutdown data store provider
|
||||
loaderResponse.shutdown()
|
||||
|
||||
// exit JVM when ActorSystem has been terminated
|
||||
system.registerOnTermination(System.exit(0))
|
||||
|
||||
|
@ -27,19 +27,30 @@ import scala.reflect.ClassTag
|
||||
|
||||
object DataStoreLoader {
|
||||
|
||||
class DataStoreInfo(
|
||||
val dataStoreConfig: DataStoreConfig,
|
||||
val dataStore: DataStore,
|
||||
val dataStoreProvider: DataStoreProvider) {
|
||||
val accessorTuple: (DataStoreConfig, DataStore) = (dataStoreConfig, dataStore)
|
||||
}
|
||||
|
||||
class DataLoaderResponse[A](val accessor: A, shutdownHook: => List[IO[Unit]]) {
|
||||
def shutdown(): Unit = shutdownHook.parSequence.unsafeRunSync()
|
||||
}
|
||||
|
||||
private val logger = LoggerFactory.getLogger("DataStoreLoader")
|
||||
|
||||
def loadAll[A <: DataAccessor](
|
||||
configs: List[DataStoreConfig],
|
||||
crypto: CryptoAlgebra,
|
||||
dataAccessorProvider: DataAccessorProvider[A]): IO[A] =
|
||||
dataAccessorProvider: DataAccessorProvider[A]): IO[DataLoaderResponse[A]] =
|
||||
for {
|
||||
activeConfigs <- IO.fromEither(getValidatedConfigs(configs, dataAccessorProvider.repoNames))
|
||||
dataStores <- activeConfigs.map(load(_, crypto)).parSequence
|
||||
accessor <- IO.fromEither(generateAccessor(dataStores, dataAccessorProvider))
|
||||
} yield accessor
|
||||
} yield new DataLoaderResponse[A](accessor, dataStores.map(_.dataStoreProvider.shutdown()))
|
||||
|
||||
def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[(DataStoreConfig, DataStore)] =
|
||||
def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[DataStoreInfo] =
|
||||
for {
|
||||
_ <- IO(
|
||||
logger.error(
|
||||
@ -51,7 +62,7 @@ object DataStoreLoader {
|
||||
.newInstance()
|
||||
.asInstanceOf[DataStoreProvider])
|
||||
dataStore <- provider.load(config, crypto)
|
||||
} yield (config, dataStore)
|
||||
} yield new DataStoreInfo(config, dataStore, provider)
|
||||
|
||||
/*
|
||||
* Validates that there's exactly one repo defined across all datastore configs. Returns only
|
||||
@ -110,10 +121,10 @@ object DataStoreLoader {
|
||||
}
|
||||
|
||||
def generateAccessor[A <: DataAccessor](
|
||||
responses: List[(DataStoreConfig, DataStore)],
|
||||
responses: List[DataStoreInfo],
|
||||
dataAccessorProvider: DataAccessorProvider[A]): Either[DataStoreStartupError, A] = {
|
||||
val accessor = dataAccessorProvider.create(responses)
|
||||
val accessor = dataAccessorProvider
|
||||
.create(responses.map(_.accessorTuple))
|
||||
accessor.toEither.leftMap(errors => DataStoreStartupError(errors.toList.mkString(", ")))
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -21,4 +21,5 @@ import vinyldns.core.crypto.CryptoAlgebra
|
||||
|
||||
trait DataStoreProvider {
|
||||
def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[DataStore]
|
||||
def shutdown(): IO[Unit]
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{Matchers, WordSpec}
|
||||
import vinyldns.core.crypto.{CryptoAlgebra, NoOpCrypto}
|
||||
import vinyldns.core.domain.membership.UserRepository
|
||||
import vinyldns.core.repository.DataStoreLoader.DataLoaderResponse
|
||||
import vinyldns.core.repository.RepositoryName._
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
@ -90,7 +91,14 @@ class DataStoreLoaderSpec
|
||||
"loadAll" should {
|
||||
"return a data accessor for valid config for one datastore" in {
|
||||
val loadCall = DataStoreLoader.loadAll(List(goodConfig), crypto, TestAccessorProvider)
|
||||
loadCall.unsafeRunSync() shouldBe a[TestDataAccessor]
|
||||
val loaderResponse = loadCall.unsafeRunSync()
|
||||
loaderResponse shouldBe a[DataLoaderResponse[_]]
|
||||
loaderResponse.accessor shouldBe a[TestDataAccessor]
|
||||
}
|
||||
|
||||
"return a unit when datastore is shutdown" in {
|
||||
val loadCall = DataStoreLoader.loadAll(List(goodConfig), crypto, TestAccessorProvider)
|
||||
noException should be thrownBy loadCall.unsafeRunSync().shutdown()
|
||||
}
|
||||
|
||||
"return a data accessor for valid config for multiple datastores" in {
|
||||
@ -105,7 +113,9 @@ class DataStoreLoaderSpec
|
||||
allDisabledReposConfig.copy(user = enabled))
|
||||
|
||||
val loadCall = DataStoreLoader.loadAll(List(config1, config2), crypto, TestAccessorProvider)
|
||||
loadCall.unsafeRunSync() shouldBe a[TestDataAccessor]
|
||||
val loaderResponse = loadCall.unsafeRunSync()
|
||||
loaderResponse shouldBe a[DataLoaderResponse[_]]
|
||||
loaderResponse.accessor shouldBe a[TestDataAccessor]
|
||||
}
|
||||
|
||||
"throw an exception if getValidatedConfigs fails" in {
|
||||
@ -132,6 +142,17 @@ class DataStoreLoaderSpec
|
||||
val thrown = the[DataStoreStartupError] thrownBy loadCall.unsafeRunSync()
|
||||
thrown.getMessage shouldBe "create failure"
|
||||
}
|
||||
|
||||
"throw an exception when shutdown is called" in {
|
||||
val config = DataStoreConfig(
|
||||
"vinyldns.core.repository.AlternateMockDataStoreProvider",
|
||||
placeholderConfig,
|
||||
allEnabledReposConfig)
|
||||
|
||||
val loadCall = DataStoreLoader.loadAll(List(config), crypto, TestAccessorProvider)
|
||||
val thrown = the[RuntimeException] thrownBy loadCall.unsafeRunSync().shutdown()
|
||||
thrown.getMessage should include("oh no")
|
||||
}
|
||||
}
|
||||
|
||||
"getValidatedConfigs" should {
|
||||
|
@ -58,11 +58,19 @@ class MockDataStoreProvider extends DataStoreProvider with MockitoSugar {
|
||||
)
|
||||
}
|
||||
|
||||
def shutdown(): IO[Unit] = IO.unit
|
||||
}
|
||||
|
||||
class AlternateMockDataStoreProvider extends MockDataStoreProvider
|
||||
class AlternateMockDataStoreProvider extends MockDataStoreProvider {
|
||||
|
||||
override def shutdown(): IO[Unit] =
|
||||
IO.raiseError(new RuntimeException("oh no"))
|
||||
}
|
||||
|
||||
class FailDataStoreProvider extends DataStoreProvider {
|
||||
def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[DataStore] =
|
||||
IO.raiseError(new RuntimeException("ruh roh"))
|
||||
|
||||
def shutdown(): IO[Unit] = IO.unit
|
||||
|
||||
}
|
||||
|
@ -114,4 +114,5 @@ class DynamoDBDataStoreProvider extends DataStoreProvider {
|
||||
).parMapN { DataStore.apply }
|
||||
}
|
||||
|
||||
def shutdown(): IO[Unit] = IO.unit
|
||||
}
|
||||
|
@ -130,5 +130,13 @@ class DynamoDBDataStoreProviderSpec extends WordSpec with Matchers {
|
||||
.loadRepoConfigs(repoSettings)
|
||||
.unsafeRunSync()
|
||||
}
|
||||
|
||||
"Return unit upon Shutdown" in {
|
||||
val response: Unit = underTest
|
||||
.shutdown()
|
||||
.unsafeRunSync()
|
||||
|
||||
response shouldBe (())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,21 +14,23 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package vinyldns.mysql.repository
|
||||
package vinyldns.mysql
|
||||
|
||||
import com.typesafe.config.{Config, ConfigFactory}
|
||||
import vinyldns.core.crypto.NoOpCrypto
|
||||
import vinyldns.core.domain.batch.BatchChangeRepository
|
||||
import vinyldns.core.domain.zone.{ZoneChangeRepository, ZoneRepository}
|
||||
import vinyldns.core.crypto.NoOpCrypto
|
||||
import vinyldns.core.repository.{DataStore, DataStoreConfig, RepositoryName}
|
||||
import vinyldns.mysql.repository.MySqlDataStoreProvider
|
||||
|
||||
trait MySqlIntegrationSpec {
|
||||
def mysqlConfig: Config
|
||||
|
||||
lazy val dataStoreConfig: DataStoreConfig = pureconfig.loadConfigOrThrow[DataStoreConfig](mysqlConfig)
|
||||
|
||||
lazy val instance: DataStore =
|
||||
new MySqlDataStoreProvider().load(dataStoreConfig, new NoOpCrypto()).unsafeRunSync()
|
||||
lazy val provider = new MySqlDataStoreProvider()
|
||||
|
||||
lazy val instance: DataStore = provider.load(dataStoreConfig, new NoOpCrypto()).unsafeRunSync()
|
||||
|
||||
lazy val batchChangeRepository: BatchChangeRepository =
|
||||
instance.get[BatchChangeRepository](RepositoryName.batchChange).get
|
||||
@ -36,6 +38,8 @@ trait MySqlIntegrationSpec {
|
||||
instance.get[ZoneRepository](RepositoryName.zone).get
|
||||
lazy val zoneChangeRepository: ZoneChangeRepository =
|
||||
instance.get[ZoneChangeRepository](RepositoryName.zoneChange).get
|
||||
|
||||
def shutdown(): Unit = provider.shutdown().unsafeRunSync()
|
||||
}
|
||||
|
||||
object TestMySqlInstance extends MySqlIntegrationSpec {
|
@ -27,9 +27,9 @@ import vinyldns.core.domain.record.RecordSetChange
|
||||
import vinyldns.core.domain.zone.{ZoneChange, ZoneCommand}
|
||||
import vinyldns.core.protobuf.ProtobufConversions
|
||||
import vinyldns.core.queue.{CommandMessage, MessageCount, MessageId}
|
||||
import vinyldns.mysql.TestMySqlInstance
|
||||
import vinyldns.mysql.queue.MessageType.{InvalidMessageType, RecordChangeMessageType, ZoneChangeMessageType}
|
||||
import vinyldns.mysql.queue.MySqlMessageQueue.{InvalidMessageTimeout, MessageAttemptsExceeded}
|
||||
import vinyldns.mysql.repository.TestMySqlInstance
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
@ -47,7 +47,7 @@ final case class InvalidMessage(command: ZoneCommand) extends CommandMessage {
|
||||
}
|
||||
|
||||
class MySqlMessageQueueIntegrationSpec extends WordSpec with Matchers
|
||||
with BeforeAndAfterEach with EitherMatchers with EitherValues with BeforeAndAfterAll with ProtobufConversions {
|
||||
with BeforeAndAfterEach with EitherMatchers with BeforeAndAfterAll with EitherValues with ProtobufConversions {
|
||||
import vinyldns.core.TestRecordSetData._
|
||||
import vinyldns.core.TestZoneData._
|
||||
|
||||
|
@ -26,6 +26,7 @@ import vinyldns.core.domain.record.{AAAAData, AData, RecordData, RecordType}
|
||||
import vinyldns.core.domain.batch._
|
||||
import vinyldns.core.TestZoneData.okZone
|
||||
import vinyldns.core.TestMembershipData.okAuth
|
||||
import vinyldns.mysql.TestMySqlInstance
|
||||
|
||||
class MySqlBatchChangeRepositoryIntegrationSpec
|
||||
extends WordSpec
|
||||
|
@ -27,6 +27,7 @@ import vinyldns.core.domain.zone.ZoneChangeStatus.ZoneChangeStatus
|
||||
import vinyldns.core.domain.zone._
|
||||
import vinyldns.core.TestZoneData.okZone
|
||||
import vinyldns.core.TestZoneData.testConnection
|
||||
import vinyldns.mysql.TestMySqlInstance
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
|
@ -24,9 +24,9 @@ import scalikejdbc.DB
|
||||
import vinyldns.core.domain.auth.AuthPrincipal
|
||||
import vinyldns.core.domain.membership.User
|
||||
import vinyldns.core.domain.zone._
|
||||
|
||||
import vinyldns.core.TestZoneData.okZone
|
||||
import vinyldns.core.TestMembershipData.{dummyAuth, dummyUser, oneUserDummyGroup, okUser, okGroup}
|
||||
import vinyldns.core.TestMembershipData.{dummyAuth, dummyUser, okGroup, okUser, oneUserDummyGroup}
|
||||
import vinyldns.mysql.TestMySqlInstance
|
||||
|
||||
class MySqlZoneRepositoryIntegrationSpec
|
||||
extends WordSpec
|
||||
|
@ -16,7 +16,8 @@
|
||||
|
||||
package vinyldns.mysql.repository
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.effect._
|
||||
import cats.syntax.all._
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import javax.sql.DataSource
|
||||
import org.flywaydb.core.Flyway
|
||||
@ -25,7 +26,7 @@ import pureconfig.module.catseffect.loadConfigF
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scalikejdbc.config.DBs
|
||||
import scalikejdbc.{ConnectionPool, DataSourceConnectionPool}
|
||||
import scalikejdbc.{ConnectionPool, DataSourceCloser, DataSourceConnectionPool}
|
||||
import vinyldns.core.crypto.CryptoAlgebra
|
||||
import vinyldns.core.repository._
|
||||
|
||||
@ -77,6 +78,7 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
||||
// will be created and maintained even though this datasource is no longer needed post-migration
|
||||
ds.setMaximumPoolSize(3)
|
||||
ds.setMinimumIdle(0)
|
||||
ds.setPoolName("flywayPool")
|
||||
ds
|
||||
}
|
||||
|
||||
@ -100,7 +102,7 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
||||
}
|
||||
|
||||
def setupDBConnection(settings: MySqlDataStoreSettings): IO[Unit] = IO {
|
||||
val dataSource: DataSource = {
|
||||
val dataSource: HikariDataSource = {
|
||||
val ds = new HikariDataSource()
|
||||
ds.setDriverClassName(settings.driver)
|
||||
ds.setJdbcUrl(settings.url)
|
||||
@ -110,13 +112,15 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
||||
ds.setMaximumPoolSize(settings.poolMaxSize)
|
||||
ds.setMaxLifetime(settings.maxLifeTime)
|
||||
ds.setRegisterMbeans(true)
|
||||
ds.setPoolName("mysqldbPool")
|
||||
ds
|
||||
}
|
||||
|
||||
logger.info("configuring connection pool")
|
||||
|
||||
// Configure the connection pool
|
||||
ConnectionPool.singleton(new DataSourceConnectionPool(dataSource))
|
||||
ConnectionPool.singleton(
|
||||
new DataSourceConnectionPool(dataSource, closer = new HikariCloser(dataSource)))
|
||||
|
||||
logger.info("setting up databases")
|
||||
|
||||
@ -126,4 +130,11 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
||||
logger.info("database init complete")
|
||||
}
|
||||
|
||||
def shutdown(): IO[Unit] =
|
||||
IO(DBs.closeAll())
|
||||
.handleError(e => logger.error(s"exception occurred while shutting down", e))
|
||||
|
||||
class HikariCloser(dataSource: HikariDataSource) extends DataSourceCloser {
|
||||
override def close(): Unit = dataSource.close()
|
||||
}
|
||||
}
|
||||
|
@ -86,5 +86,13 @@ class MySqlDataStoreProviderSpec extends WordSpec with Matchers {
|
||||
.load(badSettings, crypto)
|
||||
.unsafeRunSync()
|
||||
}
|
||||
|
||||
"Return unit upon Shutdown" in {
|
||||
val response: Unit = underTest
|
||||
.shutdown()
|
||||
.unsafeRunSync()
|
||||
|
||||
response shouldBe (())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,9 +49,10 @@ class VinylDNSModule(environment: Environment, configuration: Configuration)
|
||||
cryptoConf <- settings.cryptoConfig
|
||||
crypto <- CryptoAlgebra.load(cryptoConf)
|
||||
repoConfigs <- settings.dataStoreConfigs
|
||||
repositories <- DataStoreLoader
|
||||
loaderResponse <- DataStoreLoader
|
||||
.loadAll[PortalDataAccessor](repoConfigs, crypto, PortalDataAccessorProvider)
|
||||
} yield {
|
||||
val repositories = loaderResponse.accessor
|
||||
bind(classOf[CryptoAlgebra]).toInstance(crypto)
|
||||
bind(classOf[Authenticator]).toInstance(authenticator())
|
||||
bind(classOf[UserRepository]).toInstance(repositories.userRepository)
|
||||
|
Loading…
x
Reference in New Issue
Block a user