diff --git a/modules/api/src/it/scala/vinyldns/api/engine/ZoneCommandHandlerIntegrationSpec.scala b/modules/api/src/it/scala/vinyldns/api/engine/ZoneCommandHandlerIntegrationSpec.scala index de539d21d..37e53ccaf 100644 --- a/modules/api/src/it/scala/vinyldns/api/engine/ZoneCommandHandlerIntegrationSpec.scala +++ b/modules/api/src/it/scala/vinyldns/api/engine/ZoneCommandHandlerIntegrationSpec.scala @@ -16,37 +16,35 @@ package vinyldns.api.engine -import java.util.concurrent.Executors - -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import cats.implicits._ -import fs2.{Scheduler, Stream} +import fs2.Stream import org.joda.time.DateTime import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar import org.scalatest.time.{Millis, Seconds, Span} -import vinyldns.api.{DynamoDBApiIntegrationSpec, MySqlApiIntegrationSpec, VinylDNSTestData} import vinyldns.api.domain.record.RecordSetChangeGenerator -import vinyldns.core.domain.record._ import vinyldns.api.domain.zone._ import vinyldns.api.engine.sqs.SqsConnection import vinyldns.api.repository.ApiDataAccessor -import vinyldns.dynamodb.repository.{ - DynamoDBRecordChangeRepository, - DynamoDBRecordSetRepository, - DynamoDBRepositorySettings, - DynamoDBZoneChangeRepository -} +import vinyldns.api.{DynamoDBApiIntegrationSpec, MySqlApiIntegrationSpec, VinylDNSTestData} import vinyldns.core.domain.membership.{ GroupChangeRepository, GroupRepository, MembershipRepository, UserRepository } +import vinyldns.core.domain.record._ import vinyldns.core.domain.zone._ +import vinyldns.dynamodb.repository.{ + DynamoDBRecordChangeRepository, + DynamoDBRecordSetRepository, + DynamoDBRepositorySettings, + DynamoDBZoneChangeRepository +} -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ class ZoneCommandHandlerIntegrationSpec extends DynamoDBApiIntegrationSpec @@ -57,8 +55,9 @@ class ZoneCommandHandlerIntegrationSpec import vinyldns.api.engine.sqs.SqsConverters._ - private implicit val sched: Scheduler = - Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2)) + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private val zoneName = "vinyldns." private val zoneChangeTable = "zoneChangesTest" @@ -71,12 +70,11 @@ class ZoneCommandHandlerIntegrationSpec private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis)) - private implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global private var repositories: ApiDataAccessor = _ private var sqsConn: SqsConnection = _ private var str: Stream[IO, Unit] = _ - private val stopSignal = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + private val stopSignal = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() // Items to seed in DB private val testZone = Zone( diff --git a/modules/api/src/main/scala/vinyldns/api/Boot.scala b/modules/api/src/main/scala/vinyldns/api/Boot.scala index 1d6e1dc23..bf7b05966 100644 --- a/modules/api/src/main/scala/vinyldns/api/Boot.scala +++ b/modules/api/src/main/scala/vinyldns/api/Boot.scala @@ -19,7 +19,8 @@ package vinyldns.api import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.stream.{ActorMaterializer, Materializer} -import cats.effect.IO +import cats.effect.{ContextShift, IO} +import fs2.concurrent.SignallingRef import io.prometheus.client.CollectorRegistry import io.prometheus.client.dropwizard.DropwizardExports import io.prometheus.client.hotspot.DefaultExports @@ -46,6 +47,7 @@ object Boot extends App { private implicit val system: ActorSystem = VinylDNSConfig.system private implicit val materializer: Materializer = ActorMaterializer() private implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global + private implicit val cs: ContextShift[IO] = IO.contextShift(ec) def vinyldnsBanner(): IO[String] = IO { val stream = getClass.getResourceAsStream("/vinyldns-ascii.txt") @@ -69,13 +71,14 @@ object Boot extends App { sqsConfig <- IO(VinylDNSConfig.sqsConfig) sqsConnection <- IO(SqsConnection(sqsConfig)) processingDisabled <- IO(VinylDNSConfig.vinyldnsConfig.getBoolean("processing-disabled")) - processingSignal <- fs2.async.signalOf[IO, Boolean](processingDisabled) + processingSignal <- SignallingRef[IO, Boolean](processingDisabled) restHost <- IO(VinylDNSConfig.restConfig.getString("host")) restPort <- IO(VinylDNSConfig.restConfig.getInt("port")) batchChangeLimit <- IO(VinylDNSConfig.vinyldnsConfig.getInt("batch-change-limit")) syncDelay <- IO(VinylDNSConfig.vinyldnsConfig.getInt("sync-delay")) - _ <- fs2.async.start( - ProductionZoneCommandHandler.run(sqsConnection, processingSignal, repositories, sqsConfig)) + _ <- ProductionZoneCommandHandler + .run(sqsConnection, processingSignal, repositories, sqsConfig) + .start } yield { val zoneValidations = new ZoneValidations(syncDelay) val batchChangeValidations = new BatchChangeValidations(batchChangeLimit, AccessValidations) diff --git a/modules/api/src/main/scala/vinyldns/api/Interfaces.scala b/modules/api/src/main/scala/vinyldns/api/Interfaces.scala index 3b9b59382..3c225129d 100644 --- a/modules/api/src/main/scala/vinyldns/api/Interfaces.scala +++ b/modules/api/src/main/scala/vinyldns/api/Interfaces.scala @@ -20,11 +20,15 @@ import cats.data._ import cats.effect._ import cats.implicits._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -import scala.concurrent.ExecutionContext.Implicits.global object Interfaces { + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + /* Our standard business error type */ type Result[A] = EitherT[IO, Throwable, A] @@ -104,7 +108,6 @@ object Interfaces { case None => result[A](ifNone) } } - } object Result { diff --git a/modules/api/src/main/scala/vinyldns/api/VinylDNSConfig.scala b/modules/api/src/main/scala/vinyldns/api/VinylDNSConfig.scala index bf0600a78..f945a3b57 100644 --- a/modules/api/src/main/scala/vinyldns/api/VinylDNSConfig.scala +++ b/modules/api/src/main/scala/vinyldns/api/VinylDNSConfig.scala @@ -17,7 +17,7 @@ package vinyldns.api import akka.actor.ActorSystem -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.typesafe.config.{Config, ConfigFactory} import pureconfig.module.catseffect.loadConfigF @@ -30,6 +30,9 @@ import vinyldns.core.repository.DataStoreConfig object VinylDNSConfig { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + lazy val config: Config = ConfigFactory.load() lazy val vinyldnsConfig: Config = config.getConfig("vinyldns") diff --git a/modules/api/src/main/scala/vinyldns/api/backend/CommandHandler.scala b/modules/api/src/main/scala/vinyldns/api/backend/CommandHandler.scala index 05c9e908c..79a978e29 100644 --- a/modules/api/src/main/scala/vinyldns/api/backend/CommandHandler.scala +++ b/modules/api/src/main/scala/vinyldns/api/backend/CommandHandler.scala @@ -16,9 +16,9 @@ package vinyldns.api.backend -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import fs2._ -import fs2.async.mutable.Signal +import fs2.concurrent.SignallingRef import org.slf4j.LoggerFactory import vinyldns.api.domain.dns.DnsConnection import vinyldns.api.engine.{RecordSetChangeHandler, ZoneChangeHandler, ZoneSyncHandler} @@ -27,12 +27,13 @@ import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetChange, Rec import vinyldns.core.domain.zone._ import vinyldns.core.queue.{CommandMessage, MessageCount, MessageQueue} -import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ object CommandHandler { private val logger = LoggerFactory.getLogger("vinyldns.api.backend.CommandHandler") + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) /* The outcome of handling the message */ sealed trait MessageOutcome { @@ -48,8 +49,8 @@ object CommandHandler { mq: MessageQueue, count: MessageCount, pollingInterval: FiniteDuration, - pauseSignal: Signal[IO, Boolean], - defaultConn: ZoneConnection)(implicit scheduler: Scheduler): Stream[IO, Unit] = { + pauseSignal: SignallingRef[IO, Boolean], + defaultConn: ZoneConnection)(implicit timer: Timer[IO]): Stream[IO, Unit] = { // Polls queue for message batches, connected to the signal which is toggled in the status endpoint val messageSource = startPolling(mq, count, pollingInterval).pauseWhen(pauseSignal) @@ -66,7 +67,7 @@ object CommandHandler { // concurrently run 4 message batches, so we can have 40 messages max running concurrently def flow(): Stream[IO, Unit] = messageSource - .join(4) + .parJoin(4) .observe(increaseTimeoutWhenSyncing) .through(changeRequestProcessor) .to(updateQueue) @@ -82,13 +83,13 @@ object CommandHandler { /* Polls Message Queue for messages */ def startPolling(mq: MessageQueue, count: MessageCount, pollingInterval: FiniteDuration)( - implicit scheduler: Scheduler): Stream[IO, Stream[IO, CommandMessage]] = { + implicit timer: Timer[IO]): Stream[IO, Stream[IO, CommandMessage]] = { def pollingStream(): Stream[IO, Stream[IO, CommandMessage]] = // every delay duration, we poll - scheduler + Stream .fixedDelay[IO](pollingInterval) - .evalMap[Chunk[CommandMessage]] { _ => + .evalMap[IO, Chunk[CommandMessage]] { _ => // get the messages from the queue, transform them to a Chunk of messages mq.receive(count).map(msgs => Chunk(msgs: _*)) } @@ -110,7 +111,7 @@ object CommandHandler { /* We should only change visibility timeout for zone syncs and creates, which could take minutes */ def changeVisibilityTimeoutWhenSyncing(mq: MessageQueue): Sink[IO, CommandMessage] = - _.evalMap { message => + _.evalMap[IO, Any] { message => message.command match { case sync: ZoneChange if sync.changeType == ZoneChangeType.Sync || sync.changeType == ZoneChangeType.Create => @@ -129,7 +130,7 @@ object CommandHandler { recordChangeProcessor: (DnsConnection, RecordSetChange) => IO[RecordSetChange], zoneSyncProcessor: ZoneChange => IO[ZoneChange], defaultConn: ZoneConnection): Pipe[IO, CommandMessage, MessageOutcome] = - _.evalMap[MessageOutcome] { message => + _.evalMap[IO, MessageOutcome] { message => message.command match { case sync: ZoneChange if sync.changeType == ZoneChangeType.Sync || sync.changeType == ZoneChangeType.Create => @@ -169,7 +170,7 @@ object CommandHandler { /* On success, delete the message; on failure retry */ def messageSink(mq: MessageQueue): Sink[IO, MessageOutcome] = - _.evalMap[Any] { + _.evalMap[IO, Any] { case DeleteMessage(msg) => mq.remove(msg) case RetryMessage(msg) => @@ -181,14 +182,14 @@ object CommandHandler { def run( mq: MessageQueue, msgsPerPoll: MessageCount, - processingSignal: Signal[IO, Boolean], + processingSignal: SignallingRef[IO, Boolean], pollingInterval: FiniteDuration, zoneRepo: ZoneRepository, zoneChangeRepo: ZoneChangeRepository, recordSetRepo: RecordSetRepository, recordChangeRepo: RecordChangeRepository, batchChangeRepo: BatchChangeRepository, - defaultConn: ZoneConnection)(implicit scheduler: Scheduler): IO[Unit] = { + defaultConn: ZoneConnection)(implicit timer: Timer[IO]): IO[Unit] = { // Handlers for each type of change request val zoneChangeHandler = ZoneChangeHandler(zoneRepo, zoneChangeRepo) diff --git a/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala b/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala index dea28031a..f8f7b9520 100644 --- a/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala +++ b/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala @@ -16,28 +16,30 @@ package vinyldns.api.engine -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import cats.implicits._ -import fs2._ import org.slf4j.LoggerFactory import vinyldns.api.domain.dns.DnsConnection import vinyldns.api.domain.dns.DnsProtocol.NoError -import vinyldns.core.domain.record._ import vinyldns.api.domain.record.RecordSetHelpers._ import vinyldns.core.domain.batch.{BatchChangeRepository, SingleChange} +import vinyldns.core.domain.record._ +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global object RecordSetChangeHandler { private val logger = LoggerFactory.getLogger("vinyldns.api.engine.RecordSetChangeHandler") + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def apply( recordSetRepository: RecordSetRepository, recordChangeRepository: RecordChangeRepository, batchChangeRepository: BatchChangeRepository)( - implicit scheduler: Scheduler): (DnsConnection, RecordSetChange) => IO[RecordSetChange] = + implicit timer: Timer[IO]): (DnsConnection, RecordSetChange) => IO[RecordSetChange] = (conn, recordSetChange) => { process( recordSetRepository, @@ -52,7 +54,7 @@ object RecordSetChangeHandler { recordChangeRepository: RecordChangeRepository, batchChangeRepository: BatchChangeRepository, conn: DnsConnection, - recordSetChange: RecordSetChange)(implicit scheduler: Scheduler): IO[RecordSetChange] = + recordSetChange: RecordSetChange)(implicit timer: Timer[IO]): IO[RecordSetChange] = for { wildCardExists <- wildCardExistsForRecord(recordSetChange.recordSet, recordSetRepository) completedState <- fsm(Pending(recordSetChange), conn, wildCardExists) @@ -144,8 +146,7 @@ object RecordSetChangeHandler { } private def fsm(state: ProcessorState, conn: DnsConnection, wildcardExists: Boolean)( - implicit - scheduler: Scheduler): IO[ProcessorState] = { + implicit timer: Timer[IO]): IO[ProcessorState] = { /** * If there is a wildcard record with the same type, then we skip validation and verification steps. diff --git a/modules/api/src/main/scala/vinyldns/api/engine/ZoneCommandHandler.scala b/modules/api/src/main/scala/vinyldns/api/engine/ZoneCommandHandler.scala index 8ed0e2abd..26102ed4c 100644 --- a/modules/api/src/main/scala/vinyldns/api/engine/ZoneCommandHandler.scala +++ b/modules/api/src/main/scala/vinyldns/api/engine/ZoneCommandHandler.scala @@ -16,23 +16,23 @@ package vinyldns.api.engine -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.TimeUnit -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import com.amazonaws.services.sqs.model._ import com.typesafe.config.Config import fs2._ -import fs2.async.mutable.Signal +import fs2.concurrent.SignallingRef import org.slf4j.LoggerFactory import vinyldns.api.VinylDNSConfig import vinyldns.api.domain.dns.DnsConnection -import vinyldns.core.domain.record.RecordSetChange -import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType} import vinyldns.api.engine.sqs.SqsConnection import vinyldns.api.repository.ApiDataAccessor +import vinyldns.core.domain.record.RecordSetChange +import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType} import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object ZoneCommandHandler { @@ -40,6 +40,8 @@ object ZoneCommandHandler { import vinyldns.api.engine.sqs.SqsConverters._ private val logger = LoggerFactory.getLogger("vinyldns.api.sqs.ZoneCommandHandler") + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) /* The outcome of handling the message */ sealed trait MessageOutcome { @@ -66,7 +68,7 @@ object ZoneCommandHandler { dataAccessor: ApiDataAccessor, sqsConnection: SqsConnection, pollingInterval: FiniteDuration, - pauseSignal: Signal[IO, Boolean])(implicit scheduler: Scheduler): Stream[IO, Unit] = { + pauseSignal: SignallingRef[IO, Boolean])(implicit timer: Timer[IO]): Stream[IO, Unit] = { // Polls SQS for message batches, connected to the signal which is toggled in the status endpoint val sqsMessageSource = startPolling(sqsConnection, pollingInterval).pauseWhen(pauseSignal) @@ -95,7 +97,7 @@ object ZoneCommandHandler { def flow(): Stream[IO, Unit] = sqsMessageSource .through(genMessageStreams) - .join(4) + .parJoin(4) .through(genChangeRequests) .observe(increaseTimeoutForZoneSyncs) .through(changeRequestProcessor) @@ -112,12 +114,12 @@ object ZoneCommandHandler { /* Polls SQS for messages */ def startPolling(sqsConnection: SqsConnection, pollingInterval: FiniteDuration)( - implicit scheduler: Scheduler): Stream[IO, ReceiveMessageResult] = { + implicit timer: Timer[IO]): Stream[IO, ReceiveMessageResult] = { def pollingStream(): Stream[IO, ReceiveMessageResult] = - scheduler + Stream .fixedDelay[IO](pollingInterval) - .evalMap[ReceiveMessageResult] { _ => + .evalMap[IO, ReceiveMessageResult] { _ => sqsConnection.receiveMessageBatch( new ReceiveMessageRequest() .withMaxNumberOfMessages(10) @@ -135,7 +137,7 @@ object ZoneCommandHandler { /* We should only change visibility timeout for zone syncs, which could take minutes */ def changeVisibilityTimeoutForZoneSyncs(sqsConnection: SqsConnection): Sink[IO, ChangeRequest] = - _.evalMap[Any] { + _.evalMap[IO, Any] { case ZoneSyncRequest(sync, msg) => logger.info( s"Updating visibility timeout for zone sync; changeId=${sync.id} messageId=${msg.getMessageId}") @@ -178,7 +180,7 @@ object ZoneCommandHandler { zoneChangeProcessor: ZoneChange => IO[ZoneChange], recordChangeProcessor: (DnsConnection, RecordSetChange) => IO[RecordSetChange], zoneSyncProcessor: ZoneChange => IO[ZoneChange]): Pipe[IO, ChangeRequest, MessageOutcome] = - _.evalMap[MessageOutcome] { + _.evalMap[IO, MessageOutcome] { case zsr @ ZoneSyncRequest(_, _) => val doSync = for { @@ -219,7 +221,7 @@ object ZoneCommandHandler { /* On success, delete the message; on failure, do nothing and allow it to retry */ def messageSink(sqsConnection: SqsConnection): Sink[IO, MessageOutcome] = - _.evalMap[Any] { + _.evalMap[IO, Any] { case DeleteMessage(msg) => sqsConnection.deleteMessage( new DeleteMessageRequest().withReceiptHandle(msg.getReceiptHandle)) @@ -237,11 +239,10 @@ object ProductionZoneCommandHandler { def run( sqsConnection: SqsConnection, - processingSignal: Signal[IO, Boolean], + processingSignal: SignallingRef[IO, Boolean], dataAccessor: ApiDataAccessor, config: Config): IO[Unit] = { - implicit val scheduler: Scheduler = - Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2)) + implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) for { pollingInterval <- IO.pure( diff --git a/modules/api/src/main/scala/vinyldns/api/engine/ZoneSyncHandler.scala b/modules/api/src/main/scala/vinyldns/api/engine/ZoneSyncHandler.scala index 64e2a9bad..956c70f0d 100644 --- a/modules/api/src/main/scala/vinyldns/api/engine/ZoneSyncHandler.scala +++ b/modules/api/src/main/scala/vinyldns/api/engine/ZoneSyncHandler.scala @@ -16,7 +16,7 @@ package vinyldns.api.engine -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.syntax.all._ import org.joda.time.DateTime import org.slf4j.LoggerFactory @@ -29,6 +29,8 @@ import vinyldns.core.route.Monitored object ZoneSyncHandler extends DnsConversions with Monitored { private implicit val logger = LoggerFactory.getLogger("vinyldns.engine.ZoneSyncHandler") + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def apply( recordSetRepository: RecordSetRepository, diff --git a/modules/api/src/main/scala/vinyldns/api/repository/TestDataLoader.scala b/modules/api/src/main/scala/vinyldns/api/repository/TestDataLoader.scala index bf2d5d2f2..4e9c732e4 100644 --- a/modules/api/src/main/scala/vinyldns/api/repository/TestDataLoader.scala +++ b/modules/api/src/main/scala/vinyldns/api/repository/TestDataLoader.scala @@ -16,7 +16,7 @@ package vinyldns.api.repository -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import org.joda.time.DateTime import vinyldns.core.domain.membership._ @@ -24,6 +24,9 @@ import vinyldns.core.domain.membership._ // $COVERAGE-OFF$ object TestDataLoader { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + final val testUser = User( userName = "testuser", id = "testuser", diff --git a/modules/api/src/main/scala/vinyldns/api/route/StatusRouting.scala b/modules/api/src/main/scala/vinyldns/api/route/StatusRouting.scala index 66d1262d4..8302dca65 100644 --- a/modules/api/src/main/scala/vinyldns/api/route/StatusRouting.scala +++ b/modules/api/src/main/scala/vinyldns/api/route/StatusRouting.scala @@ -20,7 +20,7 @@ import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.Directives import akka.util.Timeout import cats.effect.IO -import fs2.async.mutable.Signal +import fs2.concurrent.SignallingRef import vinyldns.api.VinylDNSConfig import scala.concurrent.duration._ @@ -43,7 +43,7 @@ trait StatusRoute extends Directives { implicit val timeout = Timeout(10.seconds) - def processingDisabled: Signal[IO, Boolean] + def processingDisabled: SignallingRef[IO, Boolean] val statusRoute = (get & path("status")) { diff --git a/modules/api/src/main/scala/vinyldns/api/route/VinylDNSAuthentication.scala b/modules/api/src/main/scala/vinyldns/api/route/VinylDNSAuthentication.scala index dd39293ae..d6409bd06 100644 --- a/modules/api/src/main/scala/vinyldns/api/route/VinylDNSAuthentication.scala +++ b/modules/api/src/main/scala/vinyldns/api/route/VinylDNSAuthentication.scala @@ -19,7 +19,6 @@ package vinyldns.api.route import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.server.RequestContext import cats.effect._ -import cats.syntax.all._ import vinyldns.api.crypto.Crypto import vinyldns.api.domain.auth.AuthPrincipalProvider import vinyldns.core.crypto.CryptoAlgebra diff --git a/modules/api/src/main/scala/vinyldns/api/route/VinylDNSService.scala b/modules/api/src/main/scala/vinyldns/api/route/VinylDNSService.scala index 1a1b974bb..81cc83b12 100644 --- a/modules/api/src/main/scala/vinyldns/api/route/VinylDNSService.scala +++ b/modules/api/src/main/scala/vinyldns/api/route/VinylDNSService.scala @@ -23,7 +23,7 @@ import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.RouteResult.{Complete, Rejected} import akka.http.scaladsl.server.directives.LogEntry import cats.effect.IO -import fs2.async.mutable.Signal +import fs2.concurrent.SignallingRef import io.prometheus.client.CollectorRegistry import vinyldns.api.domain.auth.MembershipAuthPrincipalProvider import vinyldns.api.domain.batch.BatchChangeServiceAlgebra @@ -101,7 +101,7 @@ object VinylDNSService { // $COVERAGE-OFF$ class VinylDNSService( val membershipService: MembershipServiceAlgebra, - val processingDisabled: Signal[IO, Boolean], + val processingDisabled: SignallingRef[IO, Boolean], val zoneService: ZoneServiceAlgebra, val healthService: HealthService, val recordSetService: RecordSetServiceAlgebra, diff --git a/modules/api/src/test/scala/vinyldns/api/CatsHelpers.scala b/modules/api/src/test/scala/vinyldns/api/CatsHelpers.scala index cc74cf641..8f307f19a 100644 --- a/modules/api/src/test/scala/vinyldns/api/CatsHelpers.scala +++ b/modules/api/src/test/scala/vinyldns/api/CatsHelpers.scala @@ -25,9 +25,12 @@ import scala.concurrent.duration._ import org.scalatest.Assertions._ import org.scalatest.matchers.{MatchResult, Matcher} -import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.ExecutionContext trait CatsHelpers { + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def await[E, T](f: => IO[T], duration: FiniteDuration = 1.second): T = { val i: IO[Either[E, T]] = f.attempt.map { diff --git a/modules/api/src/test/scala/vinyldns/api/ResultHelpers.scala b/modules/api/src/test/scala/vinyldns/api/ResultHelpers.scala index 2d9a2ff0c..69bd1a179 100644 --- a/modules/api/src/test/scala/vinyldns/api/ResultHelpers.scala +++ b/modules/api/src/test/scala/vinyldns/api/ResultHelpers.scala @@ -23,13 +23,16 @@ import cats.implicits._ import cats.scalatest.ValidatedMatchers import org.scalatest.{Matchers, PropSpec} +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global import scala.reflect.ClassTag final case class TimeoutException(message: String) extends Throwable(message) trait ResultHelpers { + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def await[T](f: => IO[_], duration: FiniteDuration = 1.second): T = awaitResultOf[T](f.map(_.asInstanceOf[T]).attempt, duration).toOption.get diff --git a/modules/api/src/test/scala/vinyldns/api/backend/CommandHandlerSpec.scala b/modules/api/src/test/scala/vinyldns/api/backend/CommandHandlerSpec.scala index f03f736a0..84d1adca5 100644 --- a/modules/api/src/test/scala/vinyldns/api/backend/CommandHandlerSpec.scala +++ b/modules/api/src/test/scala/vinyldns/api/backend/CommandHandlerSpec.scala @@ -15,28 +15,25 @@ */ package vinyldns.api.backend -import java.util.concurrent.Executors - -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import cats.scalatest.EitherMatchers import fs2._ import org.mockito import org.mockito.Matchers._ -import org.mockito.{ArgumentCaptor, Mockito} import org.mockito.Mockito._ +import org.mockito.{ArgumentCaptor, Mockito} import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterEach, EitherValues, Matchers, WordSpec} import vinyldns.api.VinylDNSTestData import vinyldns.api.backend.CommandHandler.{DeleteMessage, RetryMessage} import vinyldns.api.domain.dns.DnsConnection -import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType, ZoneCommand} import vinyldns.core.domain.batch.BatchChangeRepository import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetChange, RecordSetRepository} -import vinyldns.core.domain.zone._ +import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType, ZoneCommand, _} import vinyldns.core.queue.{CommandMessage, MessageCount, MessageId, MessageQueue} +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global class CommandHandlerSpec extends WordSpec @@ -53,8 +50,9 @@ class CommandHandlerSpec } private val mq = mock[MessageQueue] - implicit val sched: Scheduler = - Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2)) + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private val messages = for { i <- 0 to 10 } yield TestCommandMessage(pendingCreateAAAA, i.toString) private val count = MessageCount(10).right.value @@ -250,7 +248,7 @@ class CommandHandlerSpec "main flow" should { "process successfully" in { - val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() val cmd = TestCommandMessage(pendingCreateAAAA, "foo") // stage pulling from the message queue @@ -288,18 +286,17 @@ class CommandHandlerSpec verify(mq).remove(cmd) } "continue processing on unexpected failure" in { - val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() val cmd = TestCommandMessage(pendingCreateAAAA, "foo") - // stage pulling from the message queue, make sure we always return our command - doReturn(IO.pure(List(cmd))) + // stage pulling from the message queue, return an error then our command + doReturn(IO.raiseError(new RuntimeException("fail"))) .doReturn(IO.pure(List(cmd))) .when(mq) .receive(count) - // stage our record change processing failure, and then a success - doReturn(IO.raiseError(new RuntimeException("fail"))) - .doReturn(IO.pure(cmd.command)) + // stage our record change processing our command + doReturn(IO.pure(cmd.command)) .when(mockRecordChangeProcessor) .apply(any[DnsConnection], any[RecordSetChange]) @@ -326,9 +323,9 @@ class CommandHandlerSpec // verify our interactions verify(mq, atLeastOnce()).receive(count) - // verify that our record was attempted two times - verify(mockRecordChangeProcessor, times(2)) - .apply(any[DnsConnection], mockito.Matchers.eq(pendingCreateAAAA)) + // verify that our message queue was polled twice + verify(mq, times(2)) + .receive(count) verify(mq).remove(cmd) } } @@ -336,7 +333,7 @@ class CommandHandlerSpec "run" should { "process a zone update change through the flow" in { // testing the run method, which does nothing more than simplify construction of the main flow - val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() val cmd = TestCommandMessage(zoneUpdate, "foo") val zoneRepo = mock[ZoneRepository] diff --git a/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala b/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala index 318ee69d6..99784c341 100644 --- a/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala +++ b/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala @@ -16,10 +16,7 @@ package vinyldns.api.engine -import java.util.concurrent.Executors - -import cats.effect.IO -import fs2.Scheduler +import cats.effect.{IO, Timer} import org.joda.time.DateTime import org.mockito.ArgumentCaptor import org.mockito.Matchers._ @@ -29,12 +26,12 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} import org.xbill.DNS import vinyldns.api.domain.dns.DnsConnection import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused} -import vinyldns.core.domain.record.RecordType.RecordType -import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetRepository, _} import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply} import vinyldns.api.repository.InMemoryBatchChangeRepository import vinyldns.api.{CatsHelpers, Interfaces, VinylDNSTestData} import vinyldns.core.domain.batch.{BatchChange, SingleAddChange, SingleChangeStatus} +import vinyldns.core.domain.record.RecordType.RecordType +import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetRepository, _} import scala.concurrent.ExecutionContext @@ -46,6 +43,7 @@ class RecordSetChangeHandlerSpec with BeforeAndAfterEach with CatsHelpers { + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) private val mockConn = mock[DnsConnection] private val mockRsRepo = mock[RecordSetRepository] private val mockChangeRepo = mock[RecordChangeRepository] @@ -90,8 +88,6 @@ class RecordSetChangeHandlerSpec completeCreateAAAA.copy(singleBatchChangeIds = completeCreateAAAASingleChanges.map(_.id)) private val cs = ChangeSet(rsChange) - implicit val sched: Scheduler = - Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2)) implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.global private val underTest = RecordSetChangeHandler(mockRsRepo, mockChangeRepo, batchRepo) diff --git a/modules/api/src/test/scala/vinyldns/api/engine/ZoneCommandHandlerSpec.scala b/modules/api/src/test/scala/vinyldns/api/engine/ZoneCommandHandlerSpec.scala index a6bd97e11..39a0728ff 100644 --- a/modules/api/src/test/scala/vinyldns/api/engine/ZoneCommandHandlerSpec.scala +++ b/modules/api/src/test/scala/vinyldns/api/engine/ZoneCommandHandlerSpec.scala @@ -17,11 +17,9 @@ package vinyldns.api.engine import java.util.Base64 -import java.util.concurrent.Executors -import cats.effect.IO +import cats.effect.{ContextShift, IO, Timer} import com.amazonaws.services.sqs.model._ -import fs2.Scheduler import org.mockito.Matchers.{eq => sameas, _} import org.mockito.Mockito._ import org.mockito.{ArgumentCaptor, Mockito} @@ -29,10 +27,10 @@ import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterEach, Matchers, OneInstancePerTest, WordSpec} import vinyldns.api.VinylDNSTestData import vinyldns.api.domain.dns.DnsConnection -import vinyldns.core.domain.record.RecordSetChange -import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType} import vinyldns.api.engine.sqs.SqsConnection import vinyldns.api.engine.sqs.SqsConverters.{SqsRecordSetChangeMessage, SqsZoneChangeMessage} +import vinyldns.core.domain.record.RecordSetChange +import vinyldns.core.domain.zone.{ZoneChange, ZoneChangeType} import vinyldns.core.protobuf.ProtobufConversions import scala.collection.JavaConverters._ @@ -59,9 +57,9 @@ class ZoneCommandHandlerSpec private def rmrIO = IO.pure(mockRmr) - private implicit val ec: ExecutionContext = ExecutionContext.global - private implicit val sched: Scheduler = - Scheduler.fromScheduledExecutorService(Executors.newScheduledThreadPool(2)) + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) override def beforeEach(): Unit = { reset(mockRmr, mockSqs, mockMsg, zcp, rcp, zsp) @@ -79,7 +77,7 @@ class ZoneCommandHandlerSpec "pollingStream" should { "poll for ReceiveMessageResults" in { - val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() val test = startPolling(mockSqs, 80.millis).interruptWhen(stop) test.compile.drain.unsafeToFuture() @@ -97,7 +95,7 @@ class ZoneCommandHandlerSpec .when(fetchFail) .receiveMessageBatch(any[ReceiveMessageRequest]) - val stop = fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + val stop = fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() val test = startPolling(fetchFail, 80.millis).interruptWhen(stop) test.compile.drain.unsafeToFuture() @@ -210,7 +208,7 @@ class ZoneCommandHandlerSpec doReturn(List(msg1, msg2).asJava).when(mockRmr).getMessages - val test = fs2.Stream.eval(IO.pure(mockRmr)).through(genMessageStreams).join(1) + val test = fs2.Stream.eval(IO.pure(mockRmr)).through(genMessageStreams).parJoin(1) val result = test.compile.toVector.unsafeRunSync() result should contain theSameElementsAs List(msg1, msg2) diff --git a/modules/api/src/test/scala/vinyldns/api/route/StatusRoutingSpec.scala b/modules/api/src/test/scala/vinyldns/api/route/StatusRoutingSpec.scala index b2e18a6b2..dd7136376 100644 --- a/modules/api/src/test/scala/vinyldns/api/route/StatusRoutingSpec.scala +++ b/modules/api/src/test/scala/vinyldns/api/route/StatusRoutingSpec.scala @@ -20,8 +20,8 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.Directives import akka.http.scaladsl.testkit.ScalatestRouteTest -import cats.effect.IO -import fs2.async.mutable.Signal +import cats.effect.{ContextShift, IO} +import fs2.concurrent.SignallingRef import org.scalatest._ import org.scalatest.mockito.MockitoSugar @@ -38,8 +38,11 @@ class StatusRoutingSpec def actorRefFactory: ActorSystem = system - val processingDisabled: Signal[IO, Boolean] = - fs2.async.signalOf[IO, Boolean](false).unsafeRunSync() + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + + val processingDisabled: SignallingRef[IO, Boolean] = + fs2.concurrent.SignallingRef[IO, Boolean](false).unsafeRunSync() "GET /status" should { "return the current status of true" in { diff --git a/modules/core/src/main/scala/vinyldns/core/repository/DataStoreLoader.scala b/modules/core/src/main/scala/vinyldns/core/repository/DataStoreLoader.scala index 35aff6ae0..06923d2b2 100644 --- a/modules/core/src/main/scala/vinyldns/core/repository/DataStoreLoader.scala +++ b/modules/core/src/main/scala/vinyldns/core/repository/DataStoreLoader.scala @@ -17,7 +17,7 @@ package vinyldns.core.repository import cats.data._ -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import vinyldns.core.crypto.CryptoAlgebra import org.slf4j.LoggerFactory @@ -39,6 +39,7 @@ object DataStoreLoader { } private val logger = LoggerFactory.getLogger("DataStoreLoader") + implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global) def loadAll[A <: DataAccessor]( configs: List[DataStoreConfig], diff --git a/modules/core/src/main/scala/vinyldns/core/route/Monitor.scala b/modules/core/src/main/scala/vinyldns/core/route/Monitor.scala index 42b8c84c4..23e830368 100644 --- a/modules/core/src/main/scala/vinyldns/core/route/Monitor.scala +++ b/modules/core/src/main/scala/vinyldns/core/route/Monitor.scala @@ -66,7 +66,7 @@ trait Monitored { */ object Monitor { - lazy val monitors: mutable.Map[String, Monitor] = concurrent.TrieMap.empty + lazy val monitors: mutable.Map[String, Monitor] = scala.collection.concurrent.TrieMap.empty def apply(name: String): Monitor = monitors.getOrElseUpdate(name, new Monitor(name)) diff --git a/modules/core/src/test/scala/vinyldns/core/repository/DataStoreLoaderSpec.scala b/modules/core/src/test/scala/vinyldns/core/repository/DataStoreLoaderSpec.scala index e16794cce..489a55a36 100644 --- a/modules/core/src/test/scala/vinyldns/core/repository/DataStoreLoaderSpec.scala +++ b/modules/core/src/test/scala/vinyldns/core/repository/DataStoreLoaderSpec.scala @@ -17,7 +17,7 @@ package vinyldns.core.repository import cats.data._ -import cats.implicits._ +import cats.syntax.validated._ import cats.scalatest.{EitherMatchers, EitherValues, ValidatedMatchers} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.mockito.MockitoSugar diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProviderIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProviderIntegrationSpec.scala index 3bc8e806b..1f79983e5 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProviderIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProviderIntegrationSpec.scala @@ -16,7 +16,7 @@ package vinyldns.dynamodb.repository -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest import com.typesafe.config.{Config, ConfigFactory} @@ -31,6 +31,8 @@ import vinyldns.core.repository.RepositoryName._ class DynamoDBDataStoreProviderIntegrationSpec extends DynamoDBIntegrationSpec { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) val config: Config = ConfigFactory.load() val dynamoDBConfig: DataStoreConfig = pureconfig.loadConfigOrThrow[DataStoreConfig](config, "dynamodb") diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupChangeRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupChangeRepositoryIntegrationSpec.scala index 4271222c1..8d04fae1e 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupChangeRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupChangeRepositoryIntegrationSpec.scala @@ -16,6 +16,7 @@ package vinyldns.dynamodb.repository +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model._ import org.joda.time.DateTime @@ -26,6 +27,9 @@ import scala.concurrent.duration._ import scala.util.Random class DynamoDBGroupChangeRepositoryIntegrationSpec extends DynamoDBIntegrationSpec { + + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private implicit def dateTimeOrdering: Ordering[DateTime] = Ordering.fromLessThan(_.isAfter(_)) private val GROUP_CHANGES_TABLE = "group-changes-live" diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupRepositoryIntegrationSpec.scala index a472a21d9..51c9c335e 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBGroupRepositoryIntegrationSpec.scala @@ -16,6 +16,7 @@ package vinyldns.dynamodb.repository +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model._ import vinyldns.core.domain.membership.{Group, GroupStatus} @@ -24,6 +25,10 @@ import vinyldns.core.TestMembershipData._ import scala.concurrent.duration._ class DynamoDBGroupRepositoryIntegrationSpec extends DynamoDBIntegrationSpec { + + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + private val GROUP_TABLE = "groups-live" private val tableConfig = DynamoDBRepositorySettings(s"$GROUP_TABLE", 30, 30) diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBMembershipRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBMembershipRepositoryIntegrationSpec.scala index 90f647069..f4c45f5ac 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBMembershipRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBMembershipRepositoryIntegrationSpec.scala @@ -16,11 +16,15 @@ package vinyldns.dynamodb.repository +import cats.effect.{ContextShift, IO} import cats.implicits._ import scala.concurrent.duration._ class DynamoDBMembershipRepositoryIntegrationSpec extends DynamoDBIntegrationSpec { + + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private val membershipTable = "membership-live" private val tableConfig = DynamoDBRepositorySettings(s"$membershipTable", 30, 30) diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBRecordSetRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBRecordSetRepositoryIntegrationSpec.scala index 67e5ab3b2..ed892afa1 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBRecordSetRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBRecordSetRepositoryIntegrationSpec.scala @@ -18,6 +18,7 @@ package vinyldns.dynamodb.repository import java.util.UUID +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model._ import org.joda.time.DateTime @@ -33,6 +34,8 @@ class DynamoDBRecordSetRepositoryIntegrationSpec extends DynamoDBIntegrationSpec with DynamoDBRecordSetConversions { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private val recordSetTable = "record-sets-live" private[repository] val recordSetTableName: String = recordSetTable diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBUserRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBUserRepositoryIntegrationSpec.scala index d974b21c3..973e3207d 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBUserRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBUserRepositoryIntegrationSpec.scala @@ -16,15 +16,19 @@ package vinyldns.dynamodb.repository +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest import com.typesafe.config.ConfigFactory import vinyldns.core.crypto.NoOpCrypto -import vinyldns.core.domain.membership.{User, LockStatus} +import vinyldns.core.domain.membership.{LockStatus, User} + import scala.concurrent.duration._ class DynamoDBUserRepositoryIntegrationSpec extends DynamoDBIntegrationSpec { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private val userTable = "users-live" private val tableConfig = DynamoDBRepositorySettings(s"$userTable", 30, 30) diff --git a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBZoneChangeRepositoryIntegrationSpec.scala b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBZoneChangeRepositoryIntegrationSpec.scala index 069c1fb1c..da2099066 100644 --- a/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBZoneChangeRepositoryIntegrationSpec.scala +++ b/modules/dynamodb/src/it/scala/vinyldns/dynamodb/repository/DynamoDBZoneChangeRepositoryIntegrationSpec.scala @@ -16,6 +16,7 @@ package vinyldns.dynamodb.repository +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.services.dynamodbv2.model._ import org.joda.time.DateTime @@ -29,6 +30,9 @@ import scala.util.Random class DynamoDBZoneChangeRepositoryIntegrationSpec extends DynamoDBIntegrationSpec { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + private val zoneChangeTable = "zone-changes-live" private val tableConfig = DynamoDBRepositorySettings(s"$zoneChangeTable", 30, 30) diff --git a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProvider.scala b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProvider.scala index 7fca344f6..302a35fc3 100644 --- a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProvider.scala +++ b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBDataStoreProvider.scala @@ -17,7 +17,7 @@ package vinyldns.dynamodb.repository import cats.implicits._ -import cats.effect.IO +import cats.effect.{ContextShift, IO} import org.slf4j.LoggerFactory import vinyldns.core.repository._ import pureconfig.module.catseffect.loadConfigF @@ -33,6 +33,8 @@ class DynamoDBDataStoreProvider extends DataStoreProvider { private val logger = LoggerFactory.getLogger("DynamoDBDataStoreProvider") private val implementedRepositories = Set(user, group, membership, groupChange, recordSet, recordChange, zoneChange, userChange) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def load(config: DataStoreConfig, crypto: CryptoAlgebra): IO[DataStore] = for { diff --git a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBHelper.scala b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBHelper.scala index db4455f34..653d57e40 100644 --- a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBHelper.scala +++ b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBHelper.scala @@ -28,7 +28,7 @@ import org.slf4j.Logger import vinyldns.core.VinylDNSMetrics import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis) @@ -48,6 +48,7 @@ class DynamoDBHelper(dynamoDB: AmazonDynamoDBClient, log: Logger) { private[repository] val retryCount: Int = 10 private val retryBackoff: FiniteDuration = 1.millis + private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) private[repository] val provisionedThroughputMeter = VinylDNSMetrics.metricsRegistry.meter("dynamo.provisionedThroughput") diff --git a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBUserRepository.scala b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBUserRepository.scala index 807bbe833..57d1122a7 100644 --- a/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBUserRepository.scala +++ b/modules/dynamodb/src/main/scala/vinyldns/dynamodb/repository/DynamoDBUserRepository.scala @@ -48,6 +48,8 @@ object DynamoDBUserRepository { private[repository] val USER_NAME_INDEX_NAME = "username_index" private[repository] val ACCESS_KEY_INDEX_NAME = "access_key_index" private val log: Logger = LoggerFactory.getLogger(classOf[DynamoDBUserRepository]) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) def apply( config: DynamoDBRepositorySettings, diff --git a/modules/mysql/src/it/scala/vinyldns/mysql/queue/MySqlMessageQueueIntegrationSpec.scala b/modules/mysql/src/it/scala/vinyldns/mysql/queue/MySqlMessageQueueIntegrationSpec.scala index 2432563b6..503710819 100644 --- a/modules/mysql/src/it/scala/vinyldns/mysql/queue/MySqlMessageQueueIntegrationSpec.scala +++ b/modules/mysql/src/it/scala/vinyldns/mysql/queue/MySqlMessageQueueIntegrationSpec.scala @@ -51,6 +51,9 @@ class MySqlMessageQueueIntegrationSpec extends WordSpec with Matchers import vinyldns.core.TestRecordSetData._ import vinyldns.core.TestZoneData._ + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + private val underTest = new MySqlMessageQueue() private val rsChange: RecordSetChange = pendingCreateAAAA diff --git a/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlZoneChangeRepositoryIntegrationSpec.scala b/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlZoneChangeRepositoryIntegrationSpec.scala index cb6be6100..9c8daa706 100644 --- a/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlZoneChangeRepositoryIntegrationSpec.scala +++ b/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlZoneChangeRepositoryIntegrationSpec.scala @@ -18,6 +18,7 @@ package vinyldns.mysql.repository import java.util.UUID +import cats.effect.{ContextShift, IO} import cats.implicits._ import org.joda.time.DateTime import org.scalatest._ @@ -40,6 +41,8 @@ class MySqlZoneChangeRepositoryIntegrationSpec with Inspectors with OptionValues { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) private var repo: ZoneChangeRepository = _ object TestData { diff --git a/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlZoneRepository.scala b/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlZoneRepository.scala index 5cd1c49bc..b01fbbf3f 100644 --- a/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlZoneRepository.scala +++ b/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlZoneRepository.scala @@ -37,6 +37,8 @@ class MySqlZoneRepository extends ZoneRepository with ProtobufConversions with M private final val INITIAL_RETRY_DELAY = 1.millis final val MAX_RETRIES = 10 private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) /** * use INSERT INTO ON DUPLICATE KEY UPDATE for the zone, which will update the values if the zone already exists diff --git a/modules/portal/app/controllers/Settings.scala b/modules/portal/app/controllers/Settings.scala index 092e47238..96b6c2386 100644 --- a/modules/portal/app/controllers/Settings.scala +++ b/modules/portal/app/controllers/Settings.scala @@ -18,7 +18,7 @@ package controllers import java.net.URI -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.typesafe.config.{Config, ConfigFactory} import play.api.{ConfigLoader, Configuration} @@ -27,8 +27,12 @@ import vinyldns.core.repository.DataStoreConfig import scala.collection.JavaConverters._ +// $COVERAGE-OFF$ class Settings(private val config: Configuration) { + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) + val ldapUser: String = config.get[String]("LDAP.user") val ldapPwd: String = config.get[String]("LDAP.password") val ldapDomain: String = config.get[String]("LDAP.domain") @@ -59,5 +63,5 @@ class Settings(private val config: Configuration) { } } } - +// $COVERAGE-ON$ object Settings extends Settings(Configuration(ConfigFactory.load())) diff --git a/modules/sqs/src/main/scala/vinyldns/sqs/queue/SqsMessageQueue.scala b/modules/sqs/src/main/scala/vinyldns/sqs/queue/SqsMessageQueue.scala index f6f507adc..3a19b69a8 100644 --- a/modules/sqs/src/main/scala/vinyldns/sqs/queue/SqsMessageQueue.scala +++ b/modules/sqs/src/main/scala/vinyldns/sqs/queue/SqsMessageQueue.scala @@ -20,7 +20,7 @@ import java.util.Base64 import java.util.concurrent.TimeUnit.SECONDS import cats.data._ -import cats.effect.IO +import cats.effect.{ContextShift, IO} import cats.implicits._ import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration @@ -47,6 +47,8 @@ class SqsMessageQueue(val queueUrl: String, val client: AmazonSQSAsync) with Monitored { import SqsMessageQueue._ + private implicit val cs: ContextShift[IO] = + IO.contextShift(scala.concurrent.ExecutionContext.global) // Helper for handling SQS requests and responses def sqsAsync[A <: AmazonWebServiceRequest, B <: AmazonWebServiceResult[_]]( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7a4ff32f6..48da82f33 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,7 +7,7 @@ object Dependencies { lazy val pureConfigV = "0.9.2" lazy val metricsScalaV = "3.5.9" lazy val prometheusV = "0.4.0" - lazy val catsEffectV = "0.10.1" + lazy val catsEffectV = "1.0.0" lazy val configV = "1.3.2" lazy val scalikejdbcV = "3.3.1" lazy val scalaTestV = "3.0.4" @@ -37,7 +37,7 @@ object Dependencies { "org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV, "org.scodec" %% "scodec-bits" % scodecV, "org.slf4j" % "slf4j-api" % "1.7.7", - "co.fs2" %% "fs2-core" % "0.10.5", + "co.fs2" %% "fs2-core" % "1.0.0", "com.github.pureconfig" %% "pureconfig" % pureConfigV, "com.github.pureconfig" %% "pureconfig-cats-effect" % pureConfigV, "io.prometheus" % "simpleclient_hotspot" % prometheusV,