2
0
mirror of https://github.com/VinylDNS/vinyldns synced 2025-08-22 02:02:14 +00:00

Update to latest cats and fs2 (#301)

Needed to add implicit `ContextShift` whenever we use `par` features in the codebase.

Needed to add implicit `Timer` whenever we need to use scheduling or races.
This commit is contained in:
Paul Cleary 2018-10-25 10:15:55 -04:00 committed by GitHub
parent acfb880450
commit 5a02347cfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 187 additions and 127 deletions

View File

@ -16,37 +16,35 @@
package vinyldns.api.engine
import java.util.concurrent.Executors
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import cats.implicits._
import fs2.{Scheduler, Stream}
import fs2.Stream
import org.joda.time.DateTime
import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar
import org.scalatest.time.{Millis, Seconds, Span}
import vinyldns.api.{DynamoDBApiIntegrationSpec, MySqlApiIntegrationSpec, VinylDNSTestData}
import vinyldns.api.domain.record.RecordSetChangeGenerator
import vinyldns.core.domain.record._
import vinyldns.api.domain.zone._
import vinyldns.api.engine.sqs.SqsConnection
import vinyldns.api.repository.ApiDataAccessor
import vinyldns.dynamodb.repository.{
DynamoDBRecordChangeRepository,
DynamoDBRecordSetRepository,
DynamoDBRepositorySettings,
DynamoDBZoneChangeRepository
}
import vinyldns.api.{DynamoDBApiIntegrationSpec, MySqlApiIntegrationSpec, VinylDNSTestData}
import vinyldns.core.domain.membership.{
GroupChangeRepository,
GroupRepository,
MembershipRepository,
UserRepository
}
import vinyldns.core.domain.record._
import vinyldns.core.domain.zone._
import vinyldns.dynamodb.repository.{
DynamoDBRecordChangeRepository,
DynamoDBRecordSetRepository,
DynamoDBRepositorySettings,
DynamoDBZoneChangeRepository
}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
class ZoneCommandHandlerIntegrationSpec
extends DynamoDBApiIntegrationSpec
@ -57,8 +55,9 @@ class ZoneCommandHandlerIntegrationSpec
import vinyldns.api.engine.sqs.SqsConverters._
private implicit val sched: Scheduler =
Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2))
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val zoneName = "vinyldns."
private val zoneChangeTable = "zoneChangesTest"
@ -71,12 +70,11 @@ class ZoneCommandHandlerIntegrationSpec
private implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))
private implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global
private var repositories: ApiDataAccessor = _
private var sqsConn: SqsConnection = _
private var str: Stream[IO, Unit] = _
private val stopSignal = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
private val stopSignal = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
// Items to seed in DB
private val testZone = Zone(

View File

@ -19,7 +19,8 @@ package vinyldns.api
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, Materializer}
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import fs2.concurrent.SignallingRef
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.dropwizard.DropwizardExports
import io.prometheus.client.hotspot.DefaultExports
@ -46,6 +47,7 @@ object Boot extends App {
private implicit val system: ActorSystem = VinylDNSConfig.system
private implicit val materializer: Materializer = ActorMaterializer()
private implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global
private implicit val cs: ContextShift[IO] = IO.contextShift(ec)
def vinyldnsBanner(): IO[String] = IO {
val stream = getClass.getResourceAsStream("/vinyldns-ascii.txt")
@ -69,13 +71,14 @@ object Boot extends App {
sqsConfig <- IO(VinylDNSConfig.sqsConfig)
sqsConnection <- IO(SqsConnection(sqsConfig))
processingDisabled <- IO(VinylDNSConfig.vinyldnsConfig.getBoolean("processing-disabled"))
processingSignal <- fs2.async.signalOf[IO, Boolean](processingDisabled)
processingSignal <- SignallingRef[IO, Boolean](processingDisabled)
restHost <- IO(VinylDNSConfig.restConfig.getString("host"))
restPort <- IO(VinylDNSConfig.restConfig.getInt("port"))
batchChangeLimit <- IO(VinylDNSConfig.vinyldnsConfig.getInt("batch-change-limit"))
syncDelay <- IO(VinylDNSConfig.vinyldnsConfig.getInt("sync-delay"))
_ <- fs2.async.start(
ProductionZoneCommandHandler.run(sqsConnection, processingSignal, repositories, sqsConfig))
_ <- ProductionZoneCommandHandler
.run(sqsConnection, processingSignal, repositories, sqsConfig)
.start
} yield {
val zoneValidations = new ZoneValidations(syncDelay)
val batchChangeValidations = new BatchChangeValidations(batchChangeLimit, AccessValidations)

View File

@ -20,11 +20,15 @@ import cats.data._
import cats.effect._
import cats.implicits._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global
object Interfaces {
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
/* Our standard business error type */
type Result[A] = EitherT[IO, Throwable, A]
@ -104,7 +108,6 @@ object Interfaces {
case None => result[A](ifNone)
}
}
}
object Result {

View File

@ -17,7 +17,7 @@
package vinyldns.api
import akka.actor.ActorSystem
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.typesafe.config.{Config, ConfigFactory}
import pureconfig.module.catseffect.loadConfigF
@ -30,6 +30,9 @@ import vinyldns.core.repository.DataStoreConfig
object VinylDNSConfig {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
lazy val config: Config = ConfigFactory.load()
lazy val vinyldnsConfig: Config = config.getConfig("vinyldns")

View File

@ -16,9 +16,9 @@
package vinyldns.api.backend
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import fs2._
import fs2.async.mutable.Signal
import fs2.concurrent.SignallingRef
import org.slf4j.LoggerFactory
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.api.engine.{RecordSetChangeHandler, ZoneChangeHandler, ZoneSyncHandler}
@ -27,12 +27,13 @@ import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetChange, Rec
import vinyldns.core.domain.zone._
import vinyldns.core.queue.{CommandMessage, MessageCount, MessageQueue}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object CommandHandler {
private val logger = LoggerFactory.getLogger("vinyldns.api.backend.CommandHandler")
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
/* The outcome of handling the message */
sealed trait MessageOutcome {
@ -48,8 +49,8 @@ object CommandHandler {
mq: MessageQueue,
count: MessageCount,
pollingInterval: FiniteDuration,
pauseSignal: Signal[IO, Boolean],
defaultConn: ZoneConnection)(implicit scheduler: Scheduler): Stream[IO, Unit] = {
pauseSignal: SignallingRef[IO, Boolean],
defaultConn: ZoneConnection)(implicit timer: Timer[IO]): Stream[IO, Unit] = {
// Polls queue for message batches, connected to the signal which is toggled in the status endpoint
val messageSource = startPolling(mq, count, pollingInterval).pauseWhen(pauseSignal)
@ -66,7 +67,7 @@ object CommandHandler {
// concurrently run 4 message batches, so we can have 40 messages max running concurrently
def flow(): Stream[IO, Unit] =
messageSource
.join(4)
.parJoin(4)
.observe(increaseTimeoutWhenSyncing)
.through(changeRequestProcessor)
.to(updateQueue)
@ -82,13 +83,13 @@ object CommandHandler {
/* Polls Message Queue for messages */
def startPolling(mq: MessageQueue, count: MessageCount, pollingInterval: FiniteDuration)(
implicit scheduler: Scheduler): Stream[IO, Stream[IO, CommandMessage]] = {
implicit timer: Timer[IO]): Stream[IO, Stream[IO, CommandMessage]] = {
def pollingStream(): Stream[IO, Stream[IO, CommandMessage]] =
// every delay duration, we poll
scheduler
Stream
.fixedDelay[IO](pollingInterval)
.evalMap[Chunk[CommandMessage]] { _ =>
.evalMap[IO, Chunk[CommandMessage]] { _ =>
// get the messages from the queue, transform them to a Chunk of messages
mq.receive(count).map(msgs => Chunk(msgs: _*))
}
@ -110,7 +111,7 @@ object CommandHandler {
/* We should only change visibility timeout for zone syncs and creates, which could take minutes */
def changeVisibilityTimeoutWhenSyncing(mq: MessageQueue): Sink[IO, CommandMessage] =
_.evalMap { message =>
_.evalMap[IO, Any] { message =>
message.command match {
case sync: ZoneChange
if sync.changeType == ZoneChangeType.Sync || sync.changeType == ZoneChangeType.Create =>
@ -129,7 +130,7 @@ object CommandHandler {
recordChangeProcessor: (DnsConnection, RecordSetChange) => IO[RecordSetChange],
zoneSyncProcessor: ZoneChange => IO[ZoneChange],
defaultConn: ZoneConnection): Pipe[IO, CommandMessage, MessageOutcome] =
_.evalMap[MessageOutcome] { message =>
_.evalMap[IO, MessageOutcome] { message =>
message.command match {
case sync: ZoneChange
if sync.changeType == ZoneChangeType.Sync || sync.changeType == ZoneChangeType.Create =>
@ -169,7 +170,7 @@ object CommandHandler {
/* On success, delete the message; on failure retry */
def messageSink(mq: MessageQueue): Sink[IO, MessageOutcome] =
_.evalMap[Any] {
_.evalMap[IO, Any] {
case DeleteMessage(msg) =>
mq.remove(msg)
case RetryMessage(msg) =>
@ -181,14 +182,14 @@ object CommandHandler {
def run(
mq: MessageQueue,
msgsPerPoll: MessageCount,
processingSignal: Signal[IO, Boolean],
processingSignal: SignallingRef[IO, Boolean],
pollingInterval: FiniteDuration,
zoneRepo: ZoneRepository,
zoneChangeRepo: ZoneChangeRepository,
recordSetRepo: RecordSetRepository,
recordChangeRepo: RecordChangeRepository,
batchChangeRepo: BatchChangeRepository,
defaultConn: ZoneConnection)(implicit scheduler: Scheduler): IO[Unit] = {
defaultConn: ZoneConnection)(implicit timer: Timer[IO]): IO[Unit] = {
// Handlers for each type of change request
val zoneChangeHandler =
ZoneChangeHandler(zoneRepo, zoneChangeRepo)

View File

@ -16,28 +16,30 @@
package vinyldns.api.engine
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import cats.implicits._
import fs2._
import org.slf4j.LoggerFactory
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.api.domain.dns.DnsProtocol.NoError
import vinyldns.core.domain.record._
import vinyldns.api.domain.record.RecordSetHelpers._
import vinyldns.core.domain.batch.{BatchChangeRepository, SingleChange}
import vinyldns.core.domain.record._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
object RecordSetChangeHandler {
private val logger = LoggerFactory.getLogger("vinyldns.api.engine.RecordSetChangeHandler")
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def apply(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
batchChangeRepository: BatchChangeRepository)(
implicit scheduler: Scheduler): (DnsConnection, RecordSetChange) => IO[RecordSetChange] =
implicit timer: Timer[IO]): (DnsConnection, RecordSetChange) => IO[RecordSetChange] =
(conn, recordSetChange) => {
process(
recordSetRepository,
@ -52,7 +54,7 @@ object RecordSetChangeHandler {
recordChangeRepository: RecordChangeRepository,
batchChangeRepository: BatchChangeRepository,
conn: DnsConnection,
recordSetChange: RecordSetChange)(implicit scheduler: Scheduler): IO[RecordSetChange] =
recordSetChange: RecordSetChange)(implicit timer: Timer[IO]): IO[RecordSetChange] =
for {
wildCardExists <- wildCardExistsForRecord(recordSetChange.recordSet, recordSetRepository)
completedState <- fsm(Pending(recordSetChange), conn, wildCardExists)
@ -144,8 +146,7 @@ object RecordSetChangeHandler {
}
private def fsm(state: ProcessorState, conn: DnsConnection, wildcardExists: Boolean)(
implicit
scheduler: Scheduler): IO[ProcessorState] = {
implicit timer: Timer[IO]): IO[ProcessorState] = {
/**
* If there is a wildcard record with the same type, then we skip validation and verification steps.

View File

@ -16,23 +16,23 @@
package vinyldns.api.engine
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.TimeUnit
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import com.amazonaws.services.sqs.model._
import com.typesafe.config.Config
import fs2._
import fs2.async.mutable.Signal
import fs2.concurrent.SignallingRef
import org.slf4j.LoggerFactory
import vinyldns.api.VinylDNSConfig
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.core.domain.record.RecordSetChange
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType}
import vinyldns.api.engine.sqs.SqsConnection
import vinyldns.api.repository.ApiDataAccessor
import vinyldns.core.domain.record.RecordSetChange
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType}
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object ZoneCommandHandler {
@ -40,6 +40,8 @@ object ZoneCommandHandler {
import vinyldns.api.engine.sqs.SqsConverters._
private val logger = LoggerFactory.getLogger("vinyldns.api.sqs.ZoneCommandHandler")
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
/* The outcome of handling the message */
sealed trait MessageOutcome {
@ -66,7 +68,7 @@ object ZoneCommandHandler {
dataAccessor: ApiDataAccessor,
sqsConnection: SqsConnection,
pollingInterval: FiniteDuration,
pauseSignal: Signal[IO, Boolean])(implicit scheduler: Scheduler): Stream[IO, Unit] = {
pauseSignal: SignallingRef[IO, Boolean])(implicit timer: Timer[IO]): Stream[IO, Unit] = {
// Polls SQS for message batches, connected to the signal which is toggled in the status endpoint
val sqsMessageSource = startPolling(sqsConnection, pollingInterval).pauseWhen(pauseSignal)
@ -95,7 +97,7 @@ object ZoneCommandHandler {
def flow(): Stream[IO, Unit] =
sqsMessageSource
.through(genMessageStreams)
.join(4)
.parJoin(4)
.through(genChangeRequests)
.observe(increaseTimeoutForZoneSyncs)
.through(changeRequestProcessor)
@ -112,12 +114,12 @@ object ZoneCommandHandler {
/* Polls SQS for messages */
def startPolling(sqsConnection: SqsConnection, pollingInterval: FiniteDuration)(
implicit scheduler: Scheduler): Stream[IO, ReceiveMessageResult] = {
implicit timer: Timer[IO]): Stream[IO, ReceiveMessageResult] = {
def pollingStream(): Stream[IO, ReceiveMessageResult] =
scheduler
Stream
.fixedDelay[IO](pollingInterval)
.evalMap[ReceiveMessageResult] { _ =>
.evalMap[IO, ReceiveMessageResult] { _ =>
sqsConnection.receiveMessageBatch(
new ReceiveMessageRequest()
.withMaxNumberOfMessages(10)
@ -135,7 +137,7 @@ object ZoneCommandHandler {
/* We should only change visibility timeout for zone syncs, which could take minutes */
def changeVisibilityTimeoutForZoneSyncs(sqsConnection: SqsConnection): Sink[IO, ChangeRequest] =
_.evalMap[Any] {
_.evalMap[IO, Any] {
case ZoneSyncRequest(sync, msg) =>
logger.info(
s"Updating visibility timeout for zone sync; changeId=${sync.id} messageId=${msg.getMessageId}")
@ -178,7 +180,7 @@ object ZoneCommandHandler {
zoneChangeProcessor: ZoneChange => IO[ZoneChange],
recordChangeProcessor: (DnsConnection, RecordSetChange) => IO[RecordSetChange],
zoneSyncProcessor: ZoneChange => IO[ZoneChange]): Pipe[IO, ChangeRequest, MessageOutcome] =
_.evalMap[MessageOutcome] {
_.evalMap[IO, MessageOutcome] {
case zsr @ ZoneSyncRequest(_, _) =>
val doSync =
for {
@ -219,7 +221,7 @@ object ZoneCommandHandler {
/* On success, delete the message; on failure, do nothing and allow it to retry */
def messageSink(sqsConnection: SqsConnection): Sink[IO, MessageOutcome] =
_.evalMap[Any] {
_.evalMap[IO, Any] {
case DeleteMessage(msg) =>
sqsConnection.deleteMessage(
new DeleteMessageRequest().withReceiptHandle(msg.getReceiptHandle))
@ -237,11 +239,10 @@ object ProductionZoneCommandHandler {
def run(
sqsConnection: SqsConnection,
processingSignal: Signal[IO, Boolean],
processingSignal: SignallingRef[IO, Boolean],
dataAccessor: ApiDataAccessor,
config: Config): IO[Unit] = {
implicit val scheduler: Scheduler =
Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2))
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
for {
pollingInterval <- IO.pure(

View File

@ -16,7 +16,7 @@
package vinyldns.api.engine
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.syntax.all._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
@ -29,6 +29,8 @@ import vinyldns.core.route.Monitored
object ZoneSyncHandler extends DnsConversions with Monitored {
private implicit val logger = LoggerFactory.getLogger("vinyldns.engine.ZoneSyncHandler")
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def apply(
recordSetRepository: RecordSetRepository,

View File

@ -16,7 +16,7 @@
package vinyldns.api.repository
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import org.joda.time.DateTime
import vinyldns.core.domain.membership._
@ -24,6 +24,9 @@ import vinyldns.core.domain.membership._
// $COVERAGE-OFF$
object TestDataLoader {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
final val testUser = User(
userName = "testuser",
id = "testuser",

View File

@ -20,7 +20,7 @@ import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives
import akka.util.Timeout
import cats.effect.IO
import fs2.async.mutable.Signal
import fs2.concurrent.SignallingRef
import vinyldns.api.VinylDNSConfig
import scala.concurrent.duration._
@ -43,7 +43,7 @@ trait StatusRoute extends Directives {
implicit val timeout = Timeout(10.seconds)
def processingDisabled: Signal[IO, Boolean]
def processingDisabled: SignallingRef[IO, Boolean]
val statusRoute =
(get & path("status")) {

View File

@ -19,7 +19,6 @@ package vinyldns.api.route
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.RequestContext
import cats.effect._
import cats.syntax.all._
import vinyldns.api.crypto.Crypto
import vinyldns.api.domain.auth.AuthPrincipalProvider
import vinyldns.core.crypto.CryptoAlgebra

View File

@ -23,7 +23,7 @@ import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.RouteResult.{Complete, Rejected}
import akka.http.scaladsl.server.directives.LogEntry
import cats.effect.IO
import fs2.async.mutable.Signal
import fs2.concurrent.SignallingRef
import io.prometheus.client.CollectorRegistry
import vinyldns.api.domain.auth.MembershipAuthPrincipalProvider
import vinyldns.api.domain.batch.BatchChangeServiceAlgebra
@ -101,7 +101,7 @@ object VinylDNSService {
// $COVERAGE-OFF$
class VinylDNSService(
val membershipService: MembershipServiceAlgebra,
val processingDisabled: Signal[IO, Boolean],
val processingDisabled: SignallingRef[IO, Boolean],
val zoneService: ZoneServiceAlgebra,
val healthService: HealthService,
val recordSetService: RecordSetServiceAlgebra,

View File

@ -25,9 +25,12 @@ import scala.concurrent.duration._
import org.scalatest.Assertions._
import org.scalatest.matchers.{MatchResult, Matcher}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
trait CatsHelpers {
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def await[E, T](f: => IO[T], duration: FiniteDuration = 1.second): T = {
val i: IO[Either[E, T]] = f.attempt.map {

View File

@ -23,13 +23,16 @@ import cats.implicits._
import cats.scalatest.ValidatedMatchers
import org.scalatest.{Matchers, PropSpec}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
final case class TimeoutException(message: String) extends Throwable(message)
trait ResultHelpers {
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def await[T](f: => IO[_], duration: FiniteDuration = 1.second): T =
awaitResultOf[T](f.map(_.asInstanceOf[T]).attempt, duration).toOption.get

View File

@ -15,28 +15,25 @@
*/
package vinyldns.api.backend
import java.util.concurrent.Executors
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import cats.scalatest.EitherMatchers
import fs2._
import org.mockito
import org.mockito.Matchers._
import org.mockito.{ArgumentCaptor, Mockito}
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, Mockito}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, EitherValues, Matchers, WordSpec}
import vinyldns.api.VinylDNSTestData
import vinyldns.api.backend.CommandHandler.{DeleteMessage, RetryMessage}
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType, ZoneCommand}
import vinyldns.core.domain.batch.BatchChangeRepository
import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetChange, RecordSetRepository}
import vinyldns.core.domain.zone._
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType, ZoneCommand, _}
import vinyldns.core.queue.{CommandMessage, MessageCount, MessageId, MessageQueue}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
class CommandHandlerSpec
extends WordSpec
@ -53,8 +50,9 @@ class CommandHandlerSpec
}
private val mq = mock[MessageQueue]
implicit val sched: Scheduler =
Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2))
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val messages = for { i <- 0 to 10 } yield
TestCommandMessage(pendingCreateAAAA, i.toString)
private val count = MessageCount(10).right.value
@ -250,7 +248,7 @@ class CommandHandlerSpec
"main flow" should {
"process successfully" in {
val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
val cmd = TestCommandMessage(pendingCreateAAAA, "foo")
// stage pulling from the message queue
@ -288,18 +286,17 @@ class CommandHandlerSpec
verify(mq).remove(cmd)
}
"continue processing on unexpected failure" in {
val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
val cmd = TestCommandMessage(pendingCreateAAAA, "foo")
// stage pulling from the message queue, make sure we always return our command
doReturn(IO.pure(List(cmd)))
// stage pulling from the message queue, return an error then our command
doReturn(IO.raiseError(new RuntimeException("fail")))
.doReturn(IO.pure(List(cmd)))
.when(mq)
.receive(count)
// stage our record change processing failure, and then a success
doReturn(IO.raiseError(new RuntimeException("fail")))
.doReturn(IO.pure(cmd.command))
// stage our record change processing our command
doReturn(IO.pure(cmd.command))
.when(mockRecordChangeProcessor)
.apply(any[DnsConnection], any[RecordSetChange])
@ -326,9 +323,9 @@ class CommandHandlerSpec
// verify our interactions
verify(mq, atLeastOnce()).receive(count)
// verify that our record was attempted two times
verify(mockRecordChangeProcessor, times(2))
.apply(any[DnsConnection], mockito.Matchers.eq(pendingCreateAAAA))
// verify that our message queue was polled twice
verify(mq, times(2))
.receive(count)
verify(mq).remove(cmd)
}
}
@ -336,7 +333,7 @@ class CommandHandlerSpec
"run" should {
"process a zone update change through the flow" in {
// testing the run method, which does nothing more than simplify construction of the main flow
val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
val cmd = TestCommandMessage(zoneUpdate, "foo")
val zoneRepo = mock[ZoneRepository]

View File

@ -16,10 +16,7 @@
package vinyldns.api.engine
import java.util.concurrent.Executors
import cats.effect.IO
import fs2.Scheduler
import cats.effect.{IO, Timer}
import org.joda.time.DateTime
import org.mockito.ArgumentCaptor
import org.mockito.Matchers._
@ -29,12 +26,12 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
import org.xbill.DNS
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused}
import vinyldns.core.domain.record.RecordType.RecordType
import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetRepository, _}
import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply}
import vinyldns.api.repository.InMemoryBatchChangeRepository
import vinyldns.api.{CatsHelpers, Interfaces, VinylDNSTestData}
import vinyldns.core.domain.batch.{BatchChange, SingleAddChange, SingleChangeStatus}
import vinyldns.core.domain.record.RecordType.RecordType
import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetRepository, _}
import scala.concurrent.ExecutionContext
@ -46,6 +43,7 @@ class RecordSetChangeHandlerSpec
with BeforeAndAfterEach
with CatsHelpers {
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private val mockConn = mock[DnsConnection]
private val mockRsRepo = mock[RecordSetRepository]
private val mockChangeRepo = mock[RecordChangeRepository]
@ -90,8 +88,6 @@ class RecordSetChangeHandlerSpec
completeCreateAAAA.copy(singleBatchChangeIds = completeCreateAAAASingleChanges.map(_.id))
private val cs = ChangeSet(rsChange)
implicit val sched: Scheduler =
Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2))
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global
private val underTest = RecordSetChangeHandler(mockRsRepo, mockChangeRepo, batchRepo)

View File

@ -17,11 +17,9 @@
package vinyldns.api.engine
import java.util.Base64
import java.util.concurrent.Executors
import cats.effect.IO
import cats.effect.{ContextShift, IO, Timer}
import com.amazonaws.services.sqs.model._
import fs2.Scheduler
import org.mockito.Matchers.{eq => sameas, _}
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, Mockito}
@ -29,10 +27,10 @@ import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterEach, Matchers, OneInstancePerTest, WordSpec}
import vinyldns.api.VinylDNSTestData
import vinyldns.api.domain.dns.DnsConnection
import vinyldns.core.domain.record.RecordSetChange
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType}
import vinyldns.api.engine.sqs.SqsConnection
import vinyldns.api.engine.sqs.SqsConverters.{SqsRecordSetChangeMessage, SqsZoneChangeMessage}
import vinyldns.core.domain.record.RecordSetChange
import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType}
import vinyldns.core.protobuf.ProtobufConversions
import scala.collection.JavaConverters._
@ -59,9 +57,9 @@ class ZoneCommandHandlerSpec
private def rmrIO = IO.pure(mockRmr)
private implicit val ec: ExecutionContext = ExecutionContext.global
private implicit val sched: Scheduler =
Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2))
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
override def beforeEach(): Unit = {
reset(mockRmr, mockSqs, mockMsg, zcp, rcp, zsp)
@ -79,7 +77,7 @@ class ZoneCommandHandlerSpec
"pollingStream" should {
"poll for ReceiveMessageResults" in {
val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
val test = startPolling(mockSqs, 80.millis).interruptWhen(stop)
test.compile.drain.unsafeToFuture()
@ -97,7 +95,7 @@ class ZoneCommandHandlerSpec
.when(fetchFail)
.receiveMessageBatch(any[ReceiveMessageRequest])
val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
val test = startPolling(fetchFail, 80.millis).interruptWhen(stop)
test.compile.drain.unsafeToFuture()
@ -210,7 +208,7 @@ class ZoneCommandHandlerSpec
doReturn(List(msg1, msg2).asJava).when(mockRmr).getMessages
val test = fs2.Stream.eval(IO.pure(mockRmr)).through(genMessageStreams).join(1)
val test = fs2.Stream.eval(IO.pure(mockRmr)).through(genMessageStreams).parJoin(1)
val result = test.compile.toVector.unsafeRunSync()
result should contain theSameElementsAs List(msg1, msg2)

View File

@ -20,8 +20,8 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.testkit.ScalatestRouteTest
import cats.effect.IO
import fs2.async.mutable.Signal
import cats.effect.{ContextShift, IO}
import fs2.concurrent.SignallingRef
import org.scalatest._
import org.scalatest.mockito.MockitoSugar
@ -38,8 +38,11 @@ class StatusRoutingSpec
def actorRefFactory: ActorSystem = system
val processingDisabled: Signal[IO, Boolean] =
fs2.async.signalOf[IO, Boolean](false).unsafeRunSync()
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
val processingDisabled: SignallingRef[IO, Boolean] =
fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync()
"GET /status" should {
"return the current status of true" in {

View File

@ -17,7 +17,7 @@
package vinyldns.core.repository
import cats.data._
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import vinyldns.core.crypto.CryptoAlgebra
import org.slf4j.LoggerFactory
@ -39,6 +39,7 @@ object DataStoreLoader {
}
private val logger = LoggerFactory.getLogger("DataStoreLoader")
implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global)
def loadAll[A <: DataAccessor](
configs: List[DataStoreConfig],

View File

@ -66,7 +66,7 @@ trait Monitored {
*/
object Monitor {
lazy val monitors: mutable.Map[String, Monitor] = concurrent.TrieMap.empty
lazy val monitors: mutable.Map[String, Monitor] = scala.collection.concurrent.TrieMap.empty
def apply(name: String): Monitor = monitors.getOrElseUpdate(name, new Monitor(name))

View File

@ -17,7 +17,7 @@
package vinyldns.core.repository
import cats.data._
import cats.implicits._
import cats.syntax.validated._
import cats.scalatest.{EitherMatchers, EitherValues, ValidatedMatchers}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.mockito.MockitoSugar

View File

@ -16,7 +16,7 @@
package vinyldns.dynamodb.repository
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest
import com.typesafe.config.{Config, ConfigFactory}
@ -31,6 +31,8 @@ import vinyldns.core.repository.RepositoryName._
class DynamoDBDataStoreProviderIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
val config: Config = ConfigFactory.load()
val dynamoDBConfig: DataStoreConfig =
pureconfig.loadConfigOrThrow[DataStoreConfig](config, "dynamodb")

View File

@ -16,6 +16,7 @@
package vinyldns.dynamodb.repository
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model._
import org.joda.time.DateTime
@ -26,6 +27,9 @@ import scala.concurrent.duration._
import scala.util.Random
class DynamoDBGroupChangeRepositoryIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private implicit def dateTimeOrdering: Ordering[DateTime] = Ordering.fromLessThan(_.isAfter(_))
private val GROUP_CHANGES_TABLE = "group-changes-live"

View File

@ -16,6 +16,7 @@
package vinyldns.dynamodb.repository
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model._
import vinyldns.core.domain.membership.{Group, GroupStatus}
@ -24,6 +25,10 @@ import vinyldns.core.TestMembershipData._
import scala.concurrent.duration._
class DynamoDBGroupRepositoryIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val GROUP_TABLE = "groups-live"
private val tableConfig = DynamoDBRepositorySettings(s"$GROUP_TABLE", 30, 30)

View File

@ -16,11 +16,15 @@
package vinyldns.dynamodb.repository
import cats.effect.{ContextShift, IO}
import cats.implicits._
import scala.concurrent.duration._
class DynamoDBMembershipRepositoryIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val membershipTable = "membership-live"
private val tableConfig = DynamoDBRepositorySettings(s"$membershipTable", 30, 30)

View File

@ -18,6 +18,7 @@ package vinyldns.dynamodb.repository
import java.util.UUID
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model._
import org.joda.time.DateTime
@ -33,6 +34,8 @@ class DynamoDBRecordSetRepositoryIntegrationSpec
extends DynamoDBIntegrationSpec
with DynamoDBRecordSetConversions {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val recordSetTable = "record-sets-live"
private[repository] val recordSetTableName: String = recordSetTable

View File

@ -16,15 +16,19 @@
package vinyldns.dynamodb.repository
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest
import com.typesafe.config.ConfigFactory
import vinyldns.core.crypto.NoOpCrypto
import vinyldns.core.domain.membership.{User, LockStatus}
import vinyldns.core.domain.membership.{LockStatus, User}
import scala.concurrent.duration._
class DynamoDBUserRepositoryIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val userTable = "users-live"
private val tableConfig = DynamoDBRepositorySettings(s"$userTable", 30, 30)

View File

@ -16,6 +16,7 @@
package vinyldns.dynamodb.repository
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.services.dynamodbv2.model._
import org.joda.time.DateTime
@ -29,6 +30,9 @@ import scala.util.Random
class DynamoDBZoneChangeRepositoryIntegrationSpec extends DynamoDBIntegrationSpec {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val zoneChangeTable = "zone-changes-live"
private val tableConfig = DynamoDBRepositorySettings(s"$zoneChangeTable", 30, 30)

View File

@ -17,7 +17,7 @@
package vinyldns.dynamodb.repository
import cats.implicits._
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import org.slf4j.LoggerFactory
import vinyldns.core.repository._
import pureconfig.module.catseffect.loadConfigF
@ -33,6 +33,8 @@ class DynamoDBDataStoreProvider extends DataStoreProvider {
private val logger = LoggerFactory.getLogger("DynamoDBDataStoreProvider")
private val implementedRepositories =
Set(user, group, membership, groupChange, recordSet, recordChange, zoneChange, userChange)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[DataStore] =
for {

View File

@ -28,7 +28,7 @@ import org.slf4j.Logger
import vinyldns.core.VinylDNSMetrics
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis)
@ -48,6 +48,7 @@ class DynamoDBHelper(dynamoDB: AmazonDynamoDBClient, log: Logger) {
private[repository] val retryCount: Int = 10
private val retryBackoff: FiniteDuration = 1.millis
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private[repository] val provisionedThroughputMeter =
VinylDNSMetrics.metricsRegistry.meter("dynamo.provisionedThroughput")

View File

@ -48,6 +48,8 @@ object DynamoDBUserRepository {
private[repository] val USER_NAME_INDEX_NAME = "username_index"
private[repository] val ACCESS_KEY_INDEX_NAME = "access_key_index"
private val log: Logger = LoggerFactory.getLogger(classOf[DynamoDBUserRepository])
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
def apply(
config: DynamoDBRepositorySettings,

View File

@ -51,6 +51,9 @@ class MySqlMessageQueueIntegrationSpec extends WordSpec with Matchers
import vinyldns.core.TestRecordSetData._
import vinyldns.core.TestZoneData._
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private val underTest = new MySqlMessageQueue()
private val rsChange: RecordSetChange = pendingCreateAAAA

View File

@ -18,6 +18,7 @@ package vinyldns.mysql.repository
import java.util.UUID
import cats.effect.{ContextShift, IO}
import cats.implicits._
import org.joda.time.DateTime
import org.scalatest._
@ -40,6 +41,8 @@ class MySqlZoneChangeRepositoryIntegrationSpec
with Inspectors
with OptionValues {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
private var repo: ZoneChangeRepository = _
object TestData {

View File

@ -37,6 +37,8 @@ class MySqlZoneRepository extends ZoneRepository with ProtobufConversions with M
private final val INITIAL_RETRY_DELAY = 1.millis
final val MAX_RETRIES = 10
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
/**
* use INSERT INTO ON DUPLICATE KEY UPDATE for the zone, which will update the values if the zone already exists

View File

@ -18,7 +18,7 @@ package controllers
import java.net.URI
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.typesafe.config.{Config, ConfigFactory}
import play.api.{ConfigLoader, Configuration}
@ -27,8 +27,12 @@ import vinyldns.core.repository.DataStoreConfig
import scala.collection.JavaConverters._
// $COVERAGE-OFF$
class Settings(private val config: Configuration) {
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
val ldapUser: String = config.get[String]("LDAP.user")
val ldapPwd: String = config.get[String]("LDAP.password")
val ldapDomain: String = config.get[String]("LDAP.domain")
@ -59,5 +63,5 @@ class Settings(private val config: Configuration) {
}
}
}
// $COVERAGE-ON$
object Settings extends Settings(Configuration(ConfigFactory.load()))

View File

@ -20,7 +20,7 @@ import java.util.Base64
import java.util.concurrent.TimeUnit.SECONDS
import cats.data._
import cats.effect.IO
import cats.effect.{ContextShift, IO}
import cats.implicits._
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
@ -47,6 +47,8 @@ class SqsMessageQueue(val queueUrl: String, val client: AmazonSQSAsync)
with Monitored {
import SqsMessageQueue._
private implicit val cs: ContextShift[IO] =
IO.contextShift(scala.concurrent.ExecutionContext.global)
// Helper for handling SQS requests and responses
def sqsAsync[A <: AmazonWebServiceRequest, B <: AmazonWebServiceResult[_]](

View File

@ -7,7 +7,7 @@ object Dependencies {
lazy val pureConfigV = "0.9.2"
lazy val metricsScalaV = "3.5.9"
lazy val prometheusV = "0.4.0"
lazy val catsEffectV = "0.10.1"
lazy val catsEffectV = "1.0.0"
lazy val configV = "1.3.2"
lazy val scalikejdbcV = "3.3.1"
lazy val scalaTestV = "3.0.4"
@ -37,7 +37,7 @@ object Dependencies {
"org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV,
"org.scodec" %% "scodec-bits" % scodecV,
"org.slf4j" % "slf4j-api" % "1.7.7",
"co.fs2" %% "fs2-core" % "0.10.5",
"co.fs2" %% "fs2-core" % "1.0.0",
"com.github.pureconfig" %% "pureconfig" % pureConfigV,
"com.github.pureconfig" %% "pureconfig-cats-effect" % pureConfigV,
"io.prometheus" % "simpleclient_hotspot" % prometheusV,