2
0
mirror of https://github.com/VinylDNS/vinyldns synced 2025-08-22 02:02:14 +00:00

Add trait and make changes

This commit is contained in:
Aravindh-Raju 2022-01-19 14:08:55 +05:30
parent 5a96580168
commit 74788a9a29
10 changed files with 267 additions and 543 deletions

View File

@ -31,7 +31,7 @@ import vinyldns.api.config.VinylDNSConfig
import vinyldns.api.domain.access.AccessValidations
import vinyldns.api.domain.zone._
import vinyldns.api.engine.TestMessageQueue
import vinyldns.api.engine.ZoneSyncHandler.executeWithinTransaction
import vinyldns.mysql.TransactionProvider
import vinyldns.core.TestMembershipData._
import vinyldns.core.TestZoneData.testConnection
import vinyldns.core.domain.{Fqdn, HighValueDomainError}
@ -50,7 +50,8 @@ class RecordSetServiceIntegrationSpec
with Matchers
with MySqlApiIntegrationSpec
with BeforeAndAfterEach
with BeforeAndAfterAll {
with BeforeAndAfterAll
with TransactionProvider {
private val vinyldnsConfig = VinylDNSConfig.load().unsafeRunSync()
@ -274,17 +275,8 @@ class RecordSetServiceIntegrationSpec
zoneRecords.map(makeAddChange(_, zone))
)
executeWithinTransaction { db: DB =>
recordSetRepo.apply(db, changes).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
recordSetRepo.apply(db, changes)
}.unsafeRunSync()
testRecordSetService = new RecordSetService(
zoneRepo,

View File

@ -30,7 +30,7 @@ import scalikejdbc.DB
import vinyldns.api.domain.access.AccessValidations
import vinyldns.api.domain.record.RecordSetChangeGenerator
import vinyldns.api.engine.TestMessageQueue
import vinyldns.api.engine.ZoneSyncHandler.executeWithinTransaction
import vinyldns.mysql.TransactionProvider
import vinyldns.api.{MySqlApiIntegrationSpec, ResultHelpers}
import vinyldns.core.TestMembershipData.{okAuth, okUser}
import vinyldns.core.TestZoneData.okZone
@ -54,7 +54,8 @@ class ZoneServiceIntegrationSpec
with ResultHelpers
with MySqlApiIntegrationSpec
with BeforeAndAfterAll
with BeforeAndAfterEach {
with BeforeAndAfterEach
with TransactionProvider {
private val timeout = PatienceConfiguration.Timeout(Span(10, Seconds))
@ -107,30 +108,11 @@ class ZoneServiceIntegrationSpec
waitForSuccess(zoneRepo.save(okZone))
// Seeding records in DB
executeWithinTransaction { db: DB =>
waitForSuccess(recordSetRepo.apply(db, changeSetSOA).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) => ok
})
waitForSuccess(recordSetRepo.apply(db, changeSetNS).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) => ok
})
waitForSuccess(recordSetRepo.apply(db, changeSetA).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
})
IO {
waitForSuccess(recordSetRepo.apply(db, changeSetSOA))
waitForSuccess(recordSetRepo.apply(db, changeSetNS))
waitForSuccess(recordSetRepo.apply(db, changeSetA))
}
}
doReturn(NonEmptyList.one("func-test-backend")).when(mockBackendResolver).ids
@ -167,16 +149,9 @@ class ZoneServiceIntegrationSpec
"accept a DeleteZone" in {
val removeARecord = ChangeSet(RecordSetChangeGenerator.forDelete(testRecordA, okZone))
executeWithinTransaction { db: DB =>
waitForSuccess(recordSetRepo.apply(db, removeARecord).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
})
IO {
waitForSuccess(recordSetRepo.apply(db, removeARecord))
}
}
val result =
testZoneService

View File

@ -27,9 +27,9 @@ import vinyldns.core.domain.backend.{Backend, BackendResponse}
import vinyldns.core.domain.batch.{BatchChangeRepository, SingleChange}
import vinyldns.core.domain.record._
import vinyldns.core.domain.zone.Zone
import vinyldns.api.engine.ZoneSyncHandler.executeWithinTransaction
import vinyldns.mysql.TransactionProvider
object RecordSetChangeHandler {
object RecordSetChangeHandler extends TransactionProvider {
private val logger = LoggerFactory.getLogger("vinyldns.api.engine.RecordSetChangeHandler")
private implicit val cs: ContextShift[IO] =
@ -52,44 +52,39 @@ object RecordSetChangeHandler {
)
}
def process(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
batchChangeRepository: BatchChangeRepository,
conn: Backend,
recordSetChange: RecordSetChange
)(implicit timer: Timer[IO]): IO[RecordSetChange] =
executeWithinTransaction { db: DB => {
for {
wildCardExists <- wildCardExistsForRecord(recordSetChange.recordSet, recordSetRepository)
)(implicit timer: Timer[IO]): IO[RecordSetChange] =
for {
wildCardExists <- wildCardExistsForRecord(recordSetChange.recordSet, recordSetRepository)
completedState <- fsm(
Pending(recordSetChange),
conn,
wildCardExists,
recordSetRepository,
recordChangeRepository
)
changeSet = ChangeSet(completedState.change).complete(completedState.change)
_ <- saveChangeSet(recordSetRepository, recordChangeRepository, changeSet)
singleBatchChanges <- batchChangeRepository.getSingleChanges(
recordSetChange.singleBatchChangeIds
)
singleChangeStatusUpdates = updateBatchStatuses(singleBatchChanges, completedState.change)
_ <- batchChangeRepository.updateSingleChanges(singleChangeStatusUpdates)
} yield completedState.change
completedState <- fsm(
Pending(recordSetChange),
conn,
wildCardExists,
recordSetRepository,
recordChangeRepository
)
changeSet = ChangeSet(completedState.change).complete(completedState.change)
_ <- recordSetRepository.apply(db, changeSet)
_ <- recordChangeRepository.save(db, changeSet)
singleBatchChanges <- batchChangeRepository.getSingleChanges(
recordSetChange.singleBatchChangeIds
)
singleChangeStatusUpdates = updateBatchStatuses(singleBatchChanges, completedState.change)
_ <- batchChangeRepository.updateSingleChanges(singleChangeStatusUpdates)
} yield completedState.change
}.attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
def saveChangeSet(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
changeSet: ChangeSet
) =
executeWithinTransaction { db: DB =>
recordSetRepository.apply(db, changeSet)
recordChangeRepository.save(db, changeSet)
}
def updateBatchStatuses(
@ -302,22 +297,9 @@ object RecordSetChangeHandler {
recordSetToSync
.map { rsc =>
val changeSet = ChangeSet(rsc)
executeWithinTransaction { db: DB => {
for {
_ <- recordChangeRepository.save(db, changeSet)
_ <- recordSetRepository.apply(db, changeSet)
} yield ()
}.attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
}
for {
_ <- saveChangeSet(recordSetRepository, recordChangeRepository, changeSet)
} yield ()
}
.getOrElse(IO.unit)
}
@ -419,4 +401,4 @@ object RecordSetChangeHandler {
recordSetRepository.getRecordSets(recordSet.zoneId, "*", RecordType.CNAME)
).parMapN(_ ++ _)
.map(_.nonEmpty)
}
}

View File

@ -20,16 +20,16 @@ import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import org.joda.time.DateTime
import org.slf4j.{Logger, LoggerFactory}
import scalikejdbc._
import scalikejdbc.DB
import vinyldns.api.backend.dns.DnsConversions
import vinyldns.api.domain.zone.{DnsZoneViewLoader, VinylDNSZoneViewLoader}
import vinyldns.core.domain.backend.BackendResolver
import vinyldns.core.domain.record._
import vinyldns.core.domain.zone.{Zone, ZoneStatus}
import vinyldns.core.domain.zone._
import vinyldns.core.route.Monitored
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeRepository, ZoneChangeStatus, ZoneRepository}
import vinyldns.mysql.TransactionProvider
object ZoneSyncHandler extends DnsConversions with Monitored {
object ZoneSyncHandler extends DnsConversions with Monitored with TransactionProvider {
private implicit val logger: Logger = LoggerFactory.getLogger("vinyldns.engine.ZoneSyncHandler")
private implicit val cs: ContextShift[IO] =
@ -78,18 +78,6 @@ object ZoneSyncHandler extends DnsConversions with Monitored {
zoneChangeRepository.save(zoneChange)
}
def executeWithinTransaction[A](execution: DB => A): A = {
val db=DB(ConnectionPool.borrow())
db.beginIfNotYet() //Begin the transaction
db.autoClose(false) //Keep the connection open
try {
execution(db)
} catch {
case error: Throwable =>
throw error
}
}
def runSync(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
@ -150,41 +138,29 @@ object ZoneSyncHandler extends DnsConversions with Monitored {
s"changeCount=${changesWithUserIds.size}; zoneChange='${zoneChange.id}'"
)
val changeSet = ChangeSet(changesWithUserIds).copy(status = ChangeSetStatus.Applied)
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
executeWithinTransaction { db: DB =>
{
val saveRecordChanges = time(s"zone.sync.saveChanges; zoneName='${zone.name}'")(
recordChangeRepository.save(db, changeSet)
)
val saveRecordSets = time(s"zone.sync.saveRecordSets; zoneName='${zone.name}'")(
recordSetRepository.apply(db,changeSet)
)
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield {
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)),
status = ZoneChangeStatus.Synced
)
}
}.attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
val saveRecordChanges = time(s"zone.sync.saveChanges; zoneName='${zone.name}'")(
recordChangeRepository.save(db, changeSet)
)
val saveRecordSets = time(s"zone.sync.saveRecordSets; zoneName='${zone.name}'")(
recordSetRepository.apply(db, changeSet)
)
// join together the results of saving both the record changes as well as the record sets
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)),
status = ZoneChangeStatus.Synced
)
}
}
}
}
}.attempt
.map {
}.attempt.map {
case Left(e: Throwable) =>
logger.error(
s"Encountered error syncing ; zoneName='${zoneChange.zone.name}'; zoneChange='${zoneChange.id}'",
@ -197,4 +173,5 @@ object ZoneSyncHandler extends DnsConversions with Monitored {
)
case Right(ok) => ok
}
}
}

View File

@ -38,6 +38,7 @@ import scala.concurrent.ExecutionContext
import cats.effect.ContextShift
import scalikejdbc.{ConnectionPool, DB}
import vinyldns.core.domain.backend.{Backend, BackendResponse}
import vinyldns.mysql.TransactionProvider
class RecordSetChangeHandlerSpec
extends AnyWordSpec
@ -45,7 +46,8 @@ class RecordSetChangeHandlerSpec
with MockitoSugar
with BeforeAndAfterEach
with CatsHelpers
with EitherValues {
with EitherValues
with TransactionProvider {
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private val mockBackend = mock[Backend]
@ -125,8 +127,8 @@ class RecordSetChangeHandlerSpec
doReturn(IO.pure(List(rs)))
.when(mockBackend)
.resolve(rs.name, rsChange.zone.name, rs.typ)
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[DB], any[ChangeSet])
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[DB], any[ChangeSet])
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[DB],any[ChangeSet])
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[DB],any[ChangeSet])
doReturn(IO.pure(List(rs))).when(mockRsRepo).getRecordSetsByName(cs.zoneId, rs.name)
val test = underTest.apply(mockBackend, rsChange)
@ -256,7 +258,7 @@ class RecordSetChangeHandlerSpec
doReturn(IO.pure(BackendResponse.NoError("test"))).when(mockBackend).applyChange(rsChange)
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[DB], any[ChangeSet])
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[DB],any[ChangeSet])
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[DB], any[ChangeSet])
doReturn(IO.pure(List.empty)).when(mockRsRepo).getRecordSetsByName(cs.zoneId, rs.name)
val test = underTest.apply(mockBackend, rsChange)
@ -857,4 +859,4 @@ class RecordSetChangeHandlerSpec
processorStatus shouldBe a[ReadyToApply]
}
}
}
}

View File

@ -38,14 +38,16 @@ import vinyldns.core.domain.zone.ZoneRepository.DuplicateZoneError
import vinyldns.core.domain.zone._
import cats.syntax.all._
import org.slf4j.{Logger, LoggerFactory}
import vinyldns.api.engine.ZoneSyncHandler.{executeWithinTransaction, monitor, time}
import vinyldns.api.engine.ZoneSyncHandler.{monitor, time}
import vinyldns.mysql.TransactionProvider
class ZoneSyncHandlerSpec
extends AnyWordSpec
with Matchers
with MockitoSugar
with BeforeAndAfterEach
with VinylDNSTestHelpers {
with VinylDNSTestHelpers
with TransactionProvider {
private implicit val logger: Logger = LoggerFactory.getLogger("vinyldns.engine.ZoneSyncHandler")
private implicit val cs: ContextShift[IO] =
@ -53,14 +55,14 @@ class ZoneSyncHandlerSpec
// Copy of runSync to verify the working of transaction and rollback while exception occurs
def testRunSyncFunc(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
zoneChange: ZoneChange,
backendResolver: BackendResolver,
maxZoneSize: Int,
vinyldnsLoader: (Zone, RecordSetRepository) => VinylDNSZoneViewLoader =
VinylDNSZoneViewLoader.apply
): IO[ZoneChange] =
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
zoneChange: ZoneChange,
backendResolver: BackendResolver,
maxZoneSize: Int,
vinyldnsLoader: (Zone, RecordSetRepository) => VinylDNSZoneViewLoader =
VinylDNSZoneViewLoader.apply
): IO[ZoneChange] =
monitor("zone.sync") {
time(s"zone.sync; zoneName='${zoneChange.zone.name}'") {
val zone = zoneChange.zone
@ -112,41 +114,29 @@ class ZoneSyncHandlerSpec
s"changeCount=${changesWithUserIds.size}; zoneChange='${zoneChange.id}'"
)
val changeSet = ChangeSet(changesWithUserIds).copy(status = ChangeSetStatus.Applied)
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
executeWithinTransaction { db: DB =>
{
val saveRecordChanges = time(s"zone.sync.saveChanges; zoneName='${zone.name}'")(
recordChangeRepository.save(db, changeSet)
)
val saveRecordSets = time(s"zone.sync.saveRecordSets; zoneName='${zone.name}'")(
recordSetRepository.apply(db,changeSet)
)
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield {
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)),
status = ZoneChangeStatus.Synced
)
}
}.attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw new Exception("Changes Rolled back. " + e.getMessage)
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
val saveRecordChanges = time(s"zone.sync.saveChanges; zoneName='${zone.name}'")(
recordChangeRepository.save(db, changeSet)
)
val saveRecordSets = time(s"zone.sync.saveRecordSets; zoneName='${zone.name}'")(
recordSetRepository.apply(db, changeSet)
)
// join together the results of saving both the record changes as well as the record sets
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)),
status = ZoneChangeStatus.Synced
)
}
}
}
}
}.attempt
.map {
}.attempt.map {
case Left(e: Throwable) =>
logger.error(
s"Encountered error syncing ; zoneName='${zoneChange.zone.name}'; zoneChange='${zoneChange.id}'",
@ -156,10 +146,11 @@ class ZoneSyncHandlerSpec
zoneChange.copy(
zone = zoneChange.zone.copy(status = ZoneStatus.Active),
status = ZoneChangeStatus.Failed,
systemMessage = Some(e.getMessage)
systemMessage = Some("Changes Rolled back. " + e.getMessage)
)
case Right(ok) => ok
}
}
private val mockBackend = mock[Backend]

View File

@ -26,13 +26,14 @@ import scalikejdbc._
import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetChange, RecordSetChangeType}
import vinyldns.core.domain.zone.Zone
import vinyldns.mysql.TestMySqlInstance
import vinyldns.mysql.TransactionProvider
class MySqlRecordChangeRepositoryIntegrationSpec
extends AnyWordSpec
with Matchers
with BeforeAndAfterAll
with BeforeAndAfterEach
with EitherMatchers {
with EitherMatchers
with TransactionProvider {
import vinyldns.core.TestRecordSetData._
import vinyldns.core.TestZoneData._
@ -60,85 +61,39 @@ class MySqlRecordChangeRepositoryIntegrationSpec
newRecordSets.map(makeTestAddChange(_, zone)).toList
}
def executeWithinTransaction[A](execution: DB => A): A = {
val db=DB(ConnectionPool.borrow())
db.beginIfNotYet() //Begin the transaction
db.autoClose(false) //Keep the connection open
try {
execution(db)
} catch {
case error: Throwable =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw error
}
}
"saving record changes" should {
"save a batch of inserts" in {
val inserts = generateInserts(okZone, 1000)
executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(inserts)).attempt.map{
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecChange = executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(inserts))
}
saveRecChange.attempt.unsafeRunSync() shouldBe right
repo.getRecordSetChange(okZone.id, inserts(0).id).unsafeRunSync() shouldBe inserts.headOption
}
"saves record updates" in {
val updates = generateInserts(okZone, 1).map(_.copy(changeType = RecordSetChangeType.Update))
executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(updates)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecChange = executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(updates))
}
saveRecChange.attempt.unsafeRunSync() shouldBe right
repo.getRecordSetChange(okZone.id, updates(0).id).unsafeRunSync() shouldBe updates.headOption
}
"saves record deletes" in {
val deletes = generateInserts(okZone, 1).map(_.copy(changeType = RecordSetChangeType.Delete))
executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(deletes)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecChange = executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(deletes))
}
saveRecChange.attempt.unsafeRunSync() shouldBe right
repo.getRecordSetChange(okZone.id, deletes(0).id).unsafeRunSync() shouldBe deletes.headOption
}
}
"list record changes" should {
"return successfully without start from" in {
val inserts = generateInserts(okZone, 10)
executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(inserts)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecChange = executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(inserts))
}
saveRecChange.attempt.unsafeRunSync() shouldBe right
val result = repo.listRecordSetChanges(okZone.id, None, 5).unsafeRunSync()
result.nextId shouldBe defined
result.maxItems shouldBe 5
@ -154,18 +109,10 @@ class MySqlRecordChangeRepositoryIntegrationSpec
// expect to be sorted by created descending so reverse that
val expectedOrder = timeSpaced.sortBy(_.created.getMillis).reverse
executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(timeSpaced)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecChange = executeWithinTransaction { db: DB =>
repo.save(db, ChangeSet(timeSpaced))
}
saveRecChange.attempt.unsafeRunSync() shouldBe right
val page1 = repo.listRecordSetChanges(okZone.id, None, 2).unsafeRunSync()
page1.nextId shouldBe Some(expectedOrder(1).created.getMillis.toString)
page1.maxItems shouldBe 2

View File

@ -21,19 +21,21 @@ import org.joda.time.DateTime
import org.scalatest._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scalikejdbc.{ConnectionPool, DB}
import scalikejdbc.DB
import vinyldns.core.domain.record._
import vinyldns.core.domain.record.RecordType._
import vinyldns.core.domain.zone.Zone
import vinyldns.mysql.TestMySqlInstance
import vinyldns.mysql.repository.MySqlRecordSetRepository.PagingKey
import vinyldns.mysql.TransactionProvider
class MySqlRecordSetRepositoryIntegrationSpec
extends AnyWordSpec
with BeforeAndAfterEach
with BeforeAndAfterAll
with Matchers
with EitherMatchers {
with EitherMatchers
with TransactionProvider {
import vinyldns.core.TestRecordSetData._
import vinyldns.core.TestZoneData._
@ -61,50 +63,20 @@ class MySqlRecordSetRepositoryIntegrationSpec
newRecordSets.map(makeTestAddChange(_, zone)).toList
}
def executeWithinTransaction[A](execution: DB => A): A = {
val db=DB(ConnectionPool.borrow())
db.beginIfNotYet() //Begin the transaction
db.autoClose(false) //Keep the connection open
try {
execution(db)
} catch {
case error: Throwable =>
throw error
}
}
def insert(zone: Zone, count: Int, word: String = "insert"): List[RecordSetChange] = {
val pendingChanges = generateInserts(zone, count, word)
val bigPendingChangeSet = ChangeSet(pendingChanges)
executeWithinTransaction { db: DB =>
repo.apply(db, bigPendingChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, bigPendingChangeSet)
}.unsafeRunSync()
pendingChanges
}
def insert(changes: List[RecordSetChange]): Unit = {
val bigPendingChangeSet = ChangeSet(changes)
executeWithinTransaction { db: DB =>
repo.apply(db, bigPendingChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, bigPendingChangeSet)
}.unsafeRunSync()
()
}
@ -123,16 +95,7 @@ class MySqlRecordSetRepositoryIntegrationSpec
val deleteChange = makePendingTestDeleteChange(existing(1))
.copy(status = RecordSetChangeStatus.Failed)
executeWithinTransaction { db: DB =>
repo.apply(db, ChangeSet(Seq(addChange, updateChange, deleteChange))).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
repo.apply(db, ChangeSet(Seq(addChange, updateChange, deleteChange)))
}
repo.getRecordSet(rsOk.id).unsafeRunSync() shouldBe None
repo.getRecordSet(existing.head.id).unsafeRunSync() shouldBe Some(
@ -190,34 +153,16 @@ class MySqlRecordSetRepositoryIntegrationSpec
status = RecordSetChangeStatus.Pending
)
executeWithinTransaction { db: DB =>
repo.apply(db, ChangeSet(existingPending)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
repo.apply(db, ChangeSet(existingPending))
}.attempt.unsafeRunSync()
repo.getRecordSet(failedChange.recordSet.id).unsafeRunSync() shouldBe
Some(
existingPending.recordSet
.copy(fqdn = Some(s"""${failedChange.recordSet.name}.${okZone.name}"""))
)
}
executeWithinTransaction { db: DB =>
repo.apply(db, ChangeSet(Seq(successfulChange, pendingChange, failedChange))).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, ChangeSet(Seq(successfulChange, pendingChange, failedChange)))
}.attempt.unsafeRunSync()
// success and pending changes have records saved
repo
@ -264,27 +209,15 @@ class MySqlRecordSetRepositoryIntegrationSpec
_.copy(changeType = RecordSetChangeType.Create, status = RecordSetChangeStatus.Complete)
)
val oldChangeSet = ChangeSet(oldAddChanges)
executeWithinTransaction { db: DB =>
repo.apply(db, oldChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) => ok
}.unsafeRunSync() shouldBe oldChangeSet
// apply updates
repo.apply(db, updateChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync() shouldBe updateChangeSet
val saveRecSets = executeWithinTransaction { db: DB =>
repo.apply(db, oldChangeSet)
}
saveRecSets.unsafeRunSync() shouldBe oldChangeSet
val applyRecSets = executeWithinTransaction { db: DB =>
// apply updates
repo.apply(db, updateChangeSet)
}
applyRecSets.unsafeRunSync() shouldBe updateChangeSet
// ensure that success and pending updates store the new recordsets
repo
@ -334,27 +267,15 @@ class MySqlRecordSetRepositoryIntegrationSpec
_.copy(changeType = RecordSetChangeType.Create, status = RecordSetChangeStatus.Complete)
)
val oldChangeSet = ChangeSet(oldAddChanges)
executeWithinTransaction { db: DB =>
repo.apply(db, oldChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) => ok
}.unsafeRunSync() shouldBe oldChangeSet
// apply deletes
repo.apply(db, deleteChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync() shouldBe deleteChangeSet
val saveRecSets = executeWithinTransaction { db: DB =>
repo.apply(db, oldChangeSet)
}
saveRecSets.unsafeRunSync() shouldBe oldChangeSet
val applyRecSets = executeWithinTransaction { db: DB =>
// apply deletes
repo.apply(db, deleteChangeSet)
}
applyRecSets.unsafeRunSync() shouldBe deleteChangeSet
// ensure that successful change deletes the recordset
repo
.getRecordSet(successfulDelete.recordSet.id)
@ -381,79 +302,32 @@ class MySqlRecordSetRepositoryIntegrationSpec
status = RecordSetChangeStatus.Complete
)
executeWithinTransaction { db: DB =>
val dbCalls = for {
_ <- repo.apply(db, ChangeSet(addChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
get <- repo.getRecordSet(testRecord.id)
} yield get
val get = dbCalls.unsafeRunSync()
get shouldBe Some(recordSetWithFQDN(testRecord, okZone))
}
repo.apply(db, ChangeSet(addChange))
}.attempt.unsafeRunSync()
val getRecSets = repo.getRecordSet(testRecord.id).unsafeRunSync()
getRecSets shouldBe Some(recordSetWithFQDN(testRecord, okZone))
executeWithinTransaction { db: DB =>
val anotherDbCall = for{
_ <- repo.apply(db, ChangeSet(deleteChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
finalGet <- repo.getRecordSet(testRecord.id)
} yield finalGet
val finalGet = anotherDbCall.unsafeRunSync()
finalGet shouldBe None
}
repo.apply(db, ChangeSet(deleteChange))
}.attempt.unsafeRunSync()
val applyRecSets = repo.getRecordSet(testRecord.id).unsafeRunSync()
applyRecSets shouldBe None
}
"be idempotent for inserts" in {
val pendingChanges = generateInserts(okZone, 1000)
val bigPendingChangeSet = ChangeSet(pendingChanges)
executeWithinTransaction { db: DB =>
repo.apply(db, bigPendingChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) => ok
}.unsafeRunSync()
repo.apply(db, bigPendingChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
Right(ok)
}.unsafeRunSync() shouldBe right
val saveRecSets = executeWithinTransaction { db: DB =>
repo.apply(db, bigPendingChangeSet)
repo.apply(db, bigPendingChangeSet)
}
saveRecSets.attempt.unsafeRunSync() shouldBe right
}
"work for multiple inserts" in {
val pendingChanges = generateInserts(okZone, 20)
val bigPendingChangeSet = ChangeSet(pendingChanges)
executeWithinTransaction { db: DB =>
repo.apply(db, bigPendingChangeSet).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, bigPendingChangeSet)
}.attempt.unsafeRunSync()
// let's make sure we have all 1000 records
val recordCount = repo.getRecordSetCount(okZone.id).unsafeRunSync()
recordCount shouldBe 20
@ -479,17 +353,8 @@ class MySqlRecordSetRepositoryIntegrationSpec
// exercise the entire change set
val cs = ChangeSet(deletes ++ updates ++ inserts)
executeWithinTransaction { db: DB =>
repo.apply(db, cs).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, cs)
}.attempt.unsafeRunSync()
// make sure the deletes are gone
repo.getRecordSet(deletes(0).recordSet.id).unsafeRunSync() shouldBe None
repo.getRecordSet(deletes(1).recordSet.id).unsafeRunSync() shouldBe None
@ -511,23 +376,10 @@ class MySqlRecordSetRepositoryIntegrationSpec
val addChange = makeTestAddChange(ds.copy(ownerGroupId = Some("someOwner")), okZone)
val testRecord = addChange.recordSet
executeWithinTransaction { db: DB =>
val dbCalls = for {
_ <- repo.apply(db, ChangeSet(addChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
get <- repo.getRecordSet(testRecord.id)
} yield get
val get = dbCalls.unsafeRunSync()
get shouldBe Some(recordSetWithFQDN(testRecord, okZone))
}
repo.apply(db, ChangeSet(addChange))
}.attempt.unsafeRunSync()
val saveRecSets = repo.getRecordSet(testRecord.id).unsafeRunSync()
saveRecSets shouldBe Some(recordSetWithFQDN(testRecord, okZone))
}
"works when updating ownerGroupId" in {
@ -539,39 +391,15 @@ class MySqlRecordSetRepositoryIntegrationSpec
testRecord.copy(name = "updated-name", ownerGroupId = Some("someOwner"))
val updateChange = makeCompleteTestUpdateChange(testRecord, updatedRecordSet, okZone)
executeWithinTransaction { db: DB =>
val dbCalls = for {
_ <- repo.apply(db, ChangeSet(addChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
get <- repo.getRecordSet(testRecord.id)
} yield get
val get = dbCalls.unsafeRunSync()
get shouldBe Some(recordSetWithFQDN(testRecord, okZone))
}
repo.apply(db, ChangeSet(addChange))
}.attempt.unsafeRunSync()
val dbCall = repo.getRecordSet(testRecord.id).unsafeRunSync()
dbCall shouldBe Some(recordSetWithFQDN(testRecord, okZone))
executeWithinTransaction { db: DB =>
val anotherDbCall = for {
_ <- repo.apply(db, ChangeSet(updateChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
finalGet <- repo.getRecordSet(testRecord.id)
} yield finalGet
val finalGet = anotherDbCall.unsafeRunSync()
finalGet.flatMap(_.ownerGroupId) shouldBe Some("someOwner")
}
repo.apply(db, ChangeSet(updateChange))
}.attempt.unsafeRunSync()
val anotherDbCall = repo.getRecordSet(testRecord.id).unsafeRunSync()
anotherDbCall.map(_.ownerGroupId.get) shouldBe Some("someOwner")
//Update the owner-group-id to None to check if its null in the db
val updateChangeNone = makeCompleteTestUpdateChange(
updatedRecordSet,
@ -580,23 +408,10 @@ class MySqlRecordSetRepositoryIntegrationSpec
)
executeWithinTransaction { db: DB =>
val updateToNone = for {
_ <- repo.apply(db, ChangeSet(updateChangeNone)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
finalGet <- repo.getRecordSet(updateChangeNone.id)
} yield finalGet
val finalUpdated = updateToNone.unsafeRunSync()
finalUpdated.flatMap(_.ownerGroupId) shouldBe None
}
repo.apply(db, ChangeSet(updateChangeNone))
}.attempt.unsafeRunSync()
val saveRecSets = repo.getRecordSet(updateChangeNone.id).unsafeRunSync()
saveRecSets.map(_.ownerGroupId) shouldBe None
}
}
"list record sets" should {
@ -867,17 +682,8 @@ class MySqlRecordSetRepositoryIntegrationSpec
val changes = newRecordSets.map(makeTestAddChange(_, okZone))
val expected = changes.map(r => recordSetWithFQDN(r.recordSet, okZone))
executeWithinTransaction { db: DB =>
repo.apply(db, ChangeSet(changes)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}.unsafeRunSync()
}
repo.apply(db, ChangeSet(changes))
}.attempt.unsafeRunSync()
val results = repo.getRecordSetsByName(okZone.id, "foo").unsafeRunSync()
results should contain theSameElementsAs expected
}
@ -955,22 +761,11 @@ class MySqlRecordSetRepositoryIntegrationSpec
val addChange = makeTestAddChange(ds.copy(ownerGroupId = Some("someOwner")), okZone)
val testRecord = addChange.recordSet
executeWithinTransaction { db: DB =>
val dbCalls = for {
_ <- repo.apply(db, ChangeSet(addChange)).attempt.map {
case Left(e: Throwable) =>
db.rollbackIfActive() //Roll back the changes if error occurs
db.close() //Close DB Connection
throw e
case Right(ok) =>
db.commit() //Commit the changes
db.close() //Close DB Connection
ok
}
for {
_ <- repo.apply(db, ChangeSet(addChange))
get <- repo.getRecordSet(testRecord.id)
} yield get
dbCalls.unsafeRunSync()
}
}.attempt.unsafeRunSync()
val result = repo.getFirstOwnedRecordByGroup("someOwner").unsafeRunSync()
result shouldBe Some(testRecord.id)

View File

@ -0,0 +1,60 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.mysql
import cats.effect.IO
import org.slf4j.{Logger, LoggerFactory}
import scalikejdbc.{ConnectionPool, DB}
import java.util.UUID
/**
* Provides access to database transaction helper methods.
*/
trait TransactionProvider {
private val logger: Logger = LoggerFactory.getLogger("vinyldns.mysql.TransactionProvider")
/**
* Synchronously executes the given `execution` function within a database transaction. Handles commit and rollback.
*
* @param execution The function to execute that takes a DB instance as a parameter
* @return The result of the execution
*/
def executeWithinTransaction[A](execution: DB => IO[A]): IO[A] = {
IO {
// Create a correlation ID for the database transaction
val txId = UUID.randomUUID()
val db = DB(ConnectionPool.borrow())
try {
db.autoClose(false)
logger.debug(s"Beginning a database transaction: $txId")
db.beginIfNotYet()
val result = execution(db).unsafeRunSync()
logger.debug(s"Committing database transaction: $txId")
db.commit()
result
} catch {
case e: Throwable =>
logger.error(s"Encountered error executing function within a database transaction ($txId). Rolling back transaction.", e)
db.rollbackIfActive()
throw e
} finally {
db.close()
}
}
}
}

View File

@ -16,7 +16,7 @@ object Dependencies {
lazy val awsV = "1.11.423"
lazy val jaxbV = "2.3.0"
lazy val ip4sV = "1.1.1"
lazy val fs2V = "2.4.4"
lazy val fs2V = "2.4.5"
lazy val ficusV = "1.4.3"
lazy val apiDependencies = Seq(
@ -30,10 +30,10 @@ object Dependencies {
"com.github.ben-manes.caffeine" % "caffeine" % "2.2.7",
"com.github.cb372" %% "scalacache-caffeine" % "0.9.4",
"com.google.protobuf" % "protobuf-java" % "2.6.1",
"dnsjava" % "dnsjava" % "2.1.8",
"dnsjava" % "dnsjava" % "3.4.2",
"org.apache.commons" % "commons-lang3" % "3.4",
"org.apache.commons" % "commons-text" % "1.4",
"org.flywaydb" % "flyway-core" % "5.1.4",
"org.flywaydb" % "flyway-core" % "8.0.0",
"org.json4s" %% "json4s-ext" % "3.5.3",
"org.json4s" %% "json4s-jackson" % "3.5.3",
"org.scalikejdbc" %% "scalikejdbc" % scalikejdbcV,
@ -71,17 +71,20 @@ object Dependencies {
"com.sun.xml.bind" % "jaxb-impl" % jaxbV,
"ch.qos.logback" % "logback-classic" % "1.0.7",
"io.dropwizard.metrics" % "metrics-jvm" % "3.2.2",
"co.fs2" %% "fs2-core" % "2.3.0",
"co.fs2" %% "fs2-core" % fs2V,
"javax.xml.bind" % "jaxb-api" % "2.3.0",
"javax.activation" % "activation" % "1.1.1"
"javax.activation" % "activation" % "1.1.1",
"org.scalikejdbc" %% "scalikejdbc" % scalikejdbcV,
"org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV
)
lazy val mysqlDependencies = Seq(
"org.flywaydb" % "flyway-core" % "5.1.4",
"org.flywaydb" % "flyway-core" % "8.0.0",
"org.mariadb.jdbc" % "mariadb-java-client" % "2.3.0",
"org.scalikejdbc" %% "scalikejdbc" % scalikejdbcV,
"org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV,
"com.zaxxer" % "HikariCP" % "3.2.0"
"com.zaxxer" % "HikariCP" % "3.2.0",
"com.h2database" % "h2" % "1.4.200",
)
lazy val sqsDependencies = Seq(
@ -122,7 +125,7 @@ object Dependencies {
"com.nimbusds" % "oauth2-oidc-sdk" % "6.5",
"com.nimbusds" % "nimbus-jose-jwt" % "7.0",
"co.fs2" %% "fs2-core" % fs2V,
"de.leanovate.play-mockws" %% "play-mockws" % "2.7.1" % "test",
"de.leanovate.play-mockws" %% "play-mockws" % "2.7.1" % "test",
"com.iheart" %% "ficus" % ficusV
)
}