2
0
mirror of https://github.com/VinylDNS/vinyldns synced 2025-08-22 02:02:14 +00:00

Add SNS Notifier (#760)

Changes in this pull request:
- Notify SNS topic on batch completion
This commit is contained in:
Dave Cleaver 2019-07-26 11:42:12 -04:00 committed by Paul Cleary
parent 07dfbe091d
commit e51261ba2c
9 changed files with 424 additions and 2 deletions

View File

@ -15,7 +15,7 @@
# Note: this will not remove the actual images from your
# machine, just the running containers
IDS=$(docker ps -a | grep -e 'mysql:5.7' -e 'cnadiminti/dynamodb-local:2017-02-16' -e 's12v/elasticmq:0.13.8' -e 'vinyldns' -e 'flaviovs/mock-smtp' | awk '{print $1}')
IDS=$(docker ps -a | grep -e 'mysql:5.7' -e 'cnadiminti/dynamodb-local:2017-02-16' -e 's12v/elasticmq:0.13.8' -e 'vinyldns' -e 'flaviovs/mock-smtp' -e 'localstack/localstack' | awk '{print $1}')
echo "killing..."
echo $(echo "$IDS" | xargs -I {} docker kill {})

View File

@ -33,6 +33,18 @@ services:
volumes:
- ./elasticmq/custom.conf:/etc/elasticmq/elasticmq.conf
localstack:
image: localstack/localstack
ports:
- "19006:19006"
- "19007:19007"
environment:
- SERVICES=sns:19006,sqs:19007
- START_WEB=0
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${TMPDIR:-/tmp/localstack}:/tmp/localstack"
mail:
image: flaviovs/mock-smtp
ports:

View File

@ -0,0 +1,121 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.api.notifier.sns
import com.typesafe.config.{Config, ConfigFactory}
import vinyldns.core.notifier._
import vinyldns.api.MySqlApiIntegrationSpec
import vinyldns.mysql.MySqlIntegrationSpec
import org.scalatest.{Matchers, WordSpecLike}
import vinyldns.core.domain.batch._
import vinyldns.core.domain.record.RecordType
import vinyldns.core.domain.record.AData
import org.joda.time.DateTime
import vinyldns.core.TestMembershipData._
import cats.effect.IO
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import org.json4s.jackson.JsonMethods._
import org.json4s.DefaultFormats
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.auth.AWSStaticCredentialsProvider
class SnsNotifierIntegrationSpec
extends MySqlApiIntegrationSpec
with MySqlIntegrationSpec
with Matchers
with WordSpecLike {
import vinyldns.api.domain.DomainValidations._
implicit val formats = DefaultFormats
val snsConfig: Config = ConfigFactory.load().getConfig("vinyldns.sns.settings")
"Sns Notifier" should {
"send a notification" in {
val batchChange = BatchChange(
okUser.id,
okUser.userName,
None,
DateTime.parse("2019-07-22T19:38:23Z"),
List(
SingleAddChange(
Some("some-zone-id"),
Some("zone-name"),
Some("record-name"),
"a" * HOST_MAX_LENGTH,
RecordType.A,
300,
AData("1.1.1.1"),
SingleChangeStatus.Complete,
None,
None,
None
)),
approvalStatus = BatchChangeApprovalStatus.AutoApproved,
id = "a615e2bb-8b35-4a39-8947-1edd0e653afa"
)
val credentialsProvider = new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
snsConfig.getString("access-key"),
snsConfig.getString("secret-key")))
val sns = AmazonSNSClientBuilder.standard
.withEndpointConfiguration(
new EndpointConfiguration(
snsConfig.getString("service-endpoint"),
snsConfig.getString("signing-region")))
.withCredentials(credentialsProvider)
.build()
val sqs = AmazonSQSClientBuilder
.standard()
.withEndpointConfiguration(
new EndpointConfiguration("http://127.0.0.1:19007", "us-east-1")
)
.withCredentials(credentialsProvider)
.build()
val program = for {
queueUrl <- IO { sqs.createQueue("batchChanges").getQueueUrl }
topic <- IO { sns.createTopic("batchChanges").getTopicArn }
_ <- IO { sns.subscribe(topic, "sqs", queueUrl) }
notifier <- new SnsNotifierProvider()
.load(NotifierConfig("", snsConfig), userRepository)
_ <- notifier.notify(Notification(batchChange))
messages <- IO { sqs.receiveMessage(queueUrl).getMessages }
_ <- IO {
sns.deleteTopic(topic)
sqs.deleteQueue(queueUrl)
}
} yield messages
val messages = program.unsafeRunSync()
messages.size should be(1)
val notification = parse(messages.get(0).getBody)
(notification \ "Message").extract[String] should be(
"""{"userId":"ok","userName":"ok","createdTimestamp":"2019-07-22T19:38:23Z",""" +
""""status":"Complete","id":"a615e2bb-8b35-4a39-8947-1edd0e653afa"}""")
}
}
}

View File

@ -98,6 +98,17 @@ vinyldns {
}
}
sns {
class-name = "vinyldns.api.notifier.sns.SnsNotifierProvider"
settings {
topic-arn = "arn:aws:sns:us-east-1:000000000000:batchChanges"
access-key = "vinyldnsTest"
secret-key = "notNeededForSnsLocal"
service-endpoint = "http://127.0.0.1:19006"
signing-region = "us-east-1"
}
}
defaultZoneConnection {
name = "vinyldns."
keyName = "vinyldns."

View File

@ -0,0 +1,51 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.api.notifier.sns
import vinyldns.core.notifier.{Notification, Notifier}
import cats.effect.IO
import cats.syntax.functor._
import vinyldns.core.domain.batch.{BatchChange, BatchChangeInfo}
import vinyldns.api.route.VinylDNSJsonProtocol
import org.json4s.jackson.JsonMethods._
import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model.PublishRequest
import com.amazonaws.services.sns.model.MessageAttributeValue
import org.json4s.JsonAST.JNull
class SnsNotifier(config: SnsNotifierConfig, sns: AmazonSNS)
extends Notifier
with VinylDNSJsonProtocol {
def notify(notification: Notification[_]): IO[Unit] =
notification.change match {
case bc: BatchChange => sendBatchChangeNotification(BatchChangeInfo(bc))
case _ => IO.unit
}
def sendBatchChangeNotification(bc: BatchChangeInfo): IO[Unit] =
IO {
val message =
compact(
render(BatchChangeInfoSerializer.toJson(bc).replace(List("changes"), JNull)).noNulls)
val request = new PublishRequest(config.topicArn, message)
request.addMessageAttributesEntry(
"userName",
new MessageAttributeValue().withDataType("String").withStringValue(bc.userName))
sns.publish(request)
}.void
}

View File

@ -0,0 +1,25 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.api.notifier.sns
case class SnsNotifierConfig(
topicArn: String,
serviceEndpoint: String,
signingRegion: String,
accessKey: String,
secretKey: String
)

View File

@ -0,0 +1,54 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.api.notifier.sns
import vinyldns.core.notifier.{Notifier, NotifierConfig, NotifierProvider}
import vinyldns.core.domain.membership.UserRepository
import pureconfig.module.catseffect.loadConfigF
import cats.effect.IO
import com.amazonaws.services.sns.AmazonSNS
import org.slf4j.LoggerFactory
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
class SnsNotifierProvider extends NotifierProvider {
private val logger = LoggerFactory.getLogger(classOf[SnsNotifierProvider])
def load(config: NotifierConfig, userRepository: UserRepository): IO[Notifier] =
for {
snsConfig <- loadConfigF[IO, SnsNotifierConfig](config.settings)
client <- createClient(snsConfig)
} yield new SnsNotifier(snsConfig, client)
def createClient(config: SnsNotifierConfig): IO[AmazonSNS] = IO {
logger.error(
"Setting up sns notifier client with settings: " +
s"service endpoint: ${config.serviceEndpoint}; " +
s"signing region: ${config.signingRegion}; " +
s"topic name: ${config.topicArn}")
AmazonSNSClientBuilder.standard
.withEndpointConfiguration(
new EndpointConfiguration(config.serviceEndpoint, config.signingRegion))
.withCredentials(new AWSStaticCredentialsProvider(
new BasicAWSCredentials(config.accessKey, config.secretKey)))
.build()
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2018 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package vinyldns.api.notifier.sns
import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
import org.scalatest.mockito.MockitoSugar
import vinyldns.api.CatsHelpers
import vinyldns.core.domain.membership.UserRepository
import vinyldns.core.notifier.Notification
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.ArgumentCaptor
import cats.effect.IO
import vinyldns.core.domain.batch.BatchChange
import org.joda.time.DateTime
import vinyldns.core.domain.batch.BatchChangeApprovalStatus
import vinyldns.core.domain.batch.SingleChange
import vinyldns.core.domain.batch.SingleAddChange
import vinyldns.core.domain.batch.SingleDeleteChange
import vinyldns.core.domain.record.RecordType
import vinyldns.core.domain.record.AData
import _root_.vinyldns.core.domain.batch.SingleChangeStatus
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._
import vinyldns.core.notifier.NotifierConfig
import com.amazonaws.services.sns.AmazonSNS
import com.amazonaws.services.sns.model.PublishRequest
import com.amazonaws.services.sns.model.PublishResult
class SnsNotifierSpec
extends WordSpec
with Matchers
with MockitoSugar
with BeforeAndAfterEach
with CatsHelpers {
val mockUserRepository = mock[UserRepository]
val mockSns = mock[AmazonSNS]
override protected def beforeEach(): Unit =
reset(mockUserRepository, mockSns)
def batchChange(
description: Option[String] = None,
changes: List[SingleChange] = List.empty): BatchChange =
BatchChange(
"test",
"testUser",
description,
DateTime.parse("2019-07-22T17:01:19Z"),
changes,
None,
BatchChangeApprovalStatus.AutoApproved,
None,
None,
None,
"testBatch")
"Sns Notifier" should {
"do nothing for unsupported Notifications" in {
val snsConfig: Config = ConfigFactory.parseMap(
Map[String, Any](
"topic-arn" -> "batches",
"service-endpoint" -> "someValue",
"signing-region" -> "us-east-1",
"access-key" -> "access",
"secret-key" -> "secret"
).asJava)
val notifier = new SnsNotifierProvider()
.load(NotifierConfig("", snsConfig), mockUserRepository)
.unsafeRunSync()
notifier.notify(Notification("this won't be supported ever")) should be(IO.unit)
}
"send a notification" in {
val notifier = new SnsNotifier(
SnsNotifierConfig("batches", "someValue", "us-east-1", "access", "secret"),
mockSns
)
val requestArgument = ArgumentCaptor.forClass(classOf[PublishRequest])
doReturn(new PublishResult()).when(mockSns).publish(requestArgument.capture())
val description = "notes"
val singleChanges: List[SingleChange] = List(
SingleAddChange(
Some(""),
Some(""),
Some(""),
"www.test.com",
RecordType.A,
200,
AData("1.2.3.4"),
SingleChangeStatus.Complete,
None,
None,
None,
List.empty),
SingleDeleteChange(
Some(""),
Some(""),
Some(""),
"deleteme.test.com",
RecordType.A,
SingleChangeStatus.Failed,
Some("message for you"),
None,
None,
List.empty)
)
val change = batchChange(Some(description), singleChanges)
notifier.notify(Notification(change)).unsafeRunSync()
val request = requestArgument.getValue()
request.getTopicArn() should be("batches")
val userNameAttribute = request.getMessageAttributes().get("userName")
userNameAttribute.getDataType() should be("String")
userNameAttribute.getStringValue() should be("testUser")
request
.getMessage() should be("""{"userId":"test","userName":"testUser","comments":"notes",""" +
""""createdTimestamp":"2019-07-22T17:01:19Z","status":"PartialFailure","id":"testBatch"}""")
verify(mockSns).publish(any[PublishRequest])
}
}
}

View File

@ -52,7 +52,8 @@ object Dependencies {
"com.comcast" %% "ip4s-cats" % ip4sV,
"com.iheart" %% "ficus" % "1.4.3",
"com.sun.mail" % "javax.mail" % "1.6.2",
"javax.mail" % "javax.mail-api" % "1.6.2"
"javax.mail" % "javax.mail-api" % "1.6.2",
"com.amazonaws" % "aws-java-sdk-sns" % awsV withSources()
)
lazy val coreDependencies = Seq(