diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index d82ccb3..b4cf20b 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -205,6 +205,8 @@ def setBucketKey() { atomicState.bucketName = "$newBucketName" atomicState.isBucketCreated = false } + + tryCreateBucket() } def setAccessKey() { @@ -224,12 +226,12 @@ def setAccessKey() { } def installed() { - atomicState.version = "1.0.17" + atomicState.version = "1.0.18" subscribeToEvents() atomicState.isBucketCreated = false atomicState.grokerSubdomain = "groker" - atomicState.eventBuffer = []; + atomicState.eventBuffer = [] runEvery15Minutes(flushBuffer) @@ -242,22 +244,31 @@ def updated() { if (atomicState.bucketKey != null && atomicState.accessKey != null) { atomicState.isBucketCreated = false } - + if (atomicState.eventBuffer == null) { + atomicState.eventBuffer = [] + } + subscribeToEvents() log.debug "updated (version $atomicState.version)" } def uninstalled() { - unsubscribe() - unschedule() log.debug "uninstalled (version $atomicState.version)" } -def createBucket() { +def tryCreateBucket() { + + // if the bucket has already been created, no need to continue + if (atomicState.isBucketCreated) { + return + } if (!atomicState.bucketName) { atomicState.bucketName = atomicState.bucketKey + } + if (!atomicState.accessKey) { + return } def bucketName = "${atomicState.bucketName}" def bucketKey = "${atomicState.bucketKey}" @@ -301,13 +312,7 @@ def genericHandler(evt) { } def value = "$evt.value" - if (atomicState.accessKey == null || atomicState.bucketKey == null) { - return - } - - if (!atomicState.isBucketCreated) { - createBucket() - } + tryCreateBucket() eventHandler(key, value) } @@ -317,28 +322,34 @@ def genericHandler(evt) { def flushBuffer() { log.trace "About to flush the buffer on schedule" if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) { - shipEvents(); + tryShipEvents() } } def eventHandler(name, value) { - log.debug atomicState.eventBuffer; + log.debug atomicState.eventBuffer - def eventBuffer = atomicState.eventBuffer; - def epoch = now() / 1000; + def eventBuffer = atomicState.eventBuffer + def epoch = now() / 1000 eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] - log.debug eventBuffer; + log.debug eventBuffer - atomicState.eventBuffer = eventBuffer; + atomicState.eventBuffer = eventBuffer if (eventBuffer.size() >= 10) { - shipEvents(); + tryShipEvents() } } // a helper function for shipping the atomicState.eventBuffer to Initial State -def shipEvents() { +def tryShipEvents() { + + // can't ship if access key and bucket key are null, so finish trying + if (atomicState.accessKey == null || atomicState.bucketKey == null) { + return + } + def eventPost = [ uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", headers: [ @@ -348,7 +359,7 @@ def shipEvents() { "Accept-Version": "0.0.2" ], body: atomicState.eventBuffer - ]; + ] try { // post the events to initial state @@ -358,7 +369,7 @@ def shipEvents() { log.error "shipping failed... ${resp.data}" } else { // clear the buffer - atomicState.eventBuffer = []; + atomicState.eventBuffer = [] } } } catch (e) {