refactored the methods for creating buckets and sending events to be more idempotent friendly; called tryCreateBucket on bucketKey setting to create the bucket sooner in the workflow than an events shipment

This commit is contained in:
David Sulpy
2015-09-14 16:30:40 -05:00
parent bad978afbd
commit 1adb4000a6

View File

@@ -205,6 +205,8 @@ def setBucketKey() {
atomicState.bucketName = "$newBucketName" atomicState.bucketName = "$newBucketName"
atomicState.isBucketCreated = false atomicState.isBucketCreated = false
} }
tryCreateBucket()
} }
def setAccessKey() { def setAccessKey() {
@@ -255,7 +257,12 @@ def uninstalled() {
log.debug "uninstalled (version $atomicState.version)" log.debug "uninstalled (version $atomicState.version)"
} }
def createBucket() { def tryCreateBucket() {
// if the bucket has already been created, no need to continue
if (atomicState.isBucketCreated) {
return
}
if (!atomicState.bucketName) { if (!atomicState.bucketName) {
atomicState.bucketName = atomicState.bucketKey atomicState.bucketName = atomicState.bucketKey
@@ -305,13 +312,7 @@ def genericHandler(evt) {
} }
def value = "$evt.value" def value = "$evt.value"
if (atomicState.accessKey == null || atomicState.bucketKey == null) { tryCreateBucket()
return
}
if (!atomicState.isBucketCreated) {
createBucket()
}
eventHandler(key, value) eventHandler(key, value)
} }
@@ -321,7 +322,7 @@ def genericHandler(evt) {
def flushBuffer() { def flushBuffer() {
log.trace "About to flush the buffer on schedule" log.trace "About to flush the buffer on schedule"
if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) { if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) {
shipEvents() tryShipEvents()
} }
} }
@@ -337,12 +338,18 @@ def eventHandler(name, value) {
atomicState.eventBuffer = eventBuffer atomicState.eventBuffer = eventBuffer
if (eventBuffer.size() >= 10) { if (eventBuffer.size() >= 10) {
shipEvents() tryShipEvents()
} }
} }
// a helper function for shipping the atomicState.eventBuffer to Initial State // a helper function for shipping the atomicState.eventBuffer to Initial State
def shipEvents() { def tryShipEvents() {
// can't ship if access key and bucket key are null, so finish trying
if (atomicState.accessKey == null || atomicState.bucketKey == null) {
return
}
def eventPost = [ def eventPost = [
uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events",
headers: [ headers: [