diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 1265df6..5fa0ae9 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -1,7 +1,7 @@ /** * Initial State Event Streamer * - * Copyright 2015 David Sulpy + * Copyright 2016 David Sulpy * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at: @@ -77,6 +77,62 @@ mappings { } } +def getAccessKey() { + log.trace "get access key" + if (atomicState.accessKey == null) { + httpError(404, "Access Key Not Found") + } else { + [ + accessKey: atomicState.accessKey + ] + } +} + +def getBucketKey() { + log.trace "get bucket key" + if (atomicState.bucketKey == null) { + httpError(404, "Bucket key Not Found") + } else { + [ + bucketKey: atomicState.bucketKey, + bucketName: atomicState.bucketName + ] + } +} + +def setBucketKey() { + log.trace "set bucket key" + def newBucketKey = request.JSON?.bucketKey + def newBucketName = request.JSON?.bucketName + + log.debug "bucket name: $newBucketName" + log.debug "bucket key: $newBucketKey" + + if (newBucketKey && (newBucketKey != atomicState.bucketKey || newBucketName != atomicState.bucketName)) { + atomicState.bucketKey = "$newBucketKey" + atomicState.bucketName = "$newBucketName" + atomicState.isBucketCreated = false + } + + tryCreateBucket() +} + +def setAccessKey() { + log.trace "set access key" + def newAccessKey = request.JSON?.accessKey + def newGrokerSubdomain = request.JSON?.grokerSubdomain + + if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) { + atomicState.grokerSubdomain = "$newGrokerSubdomain" + atomicState.isBucketCreated = false + } + + if (newAccessKey && newAccessKey != atomicState.accessKey) { + atomicState.accessKey = "$newAccessKey" + atomicState.isBucketCreated = false + } +} + def subscribeToEvents() { if (accelerometers != null) { subscribe(accelerometers, "acceleration", genericHandler) @@ -169,85 +225,27 @@ def subscribeToEvents() { } } -def getAccessKey() { - log.trace "get access key" - if (atomicState.accessKey == null) { - httpError(404, "Access Key Not Found") - } else { - [ - accessKey: atomicState.accessKey - ] - } -} - -def getBucketKey() { - log.trace "get bucket key" - if (atomicState.bucketKey == null) { - httpError(404, "Bucket key Not Found") - } else { - [ - bucketKey: atomicState.bucketKey, - bucketName: atomicState.bucketName - ] - } -} - -def setBucketKey() { - log.trace "set bucket key" - def newBucketKey = request.JSON?.bucketKey - def newBucketName = request.JSON?.bucketName - - log.debug "bucket name: $newBucketName" - log.debug "bucket key: $newBucketKey" - - if (newBucketKey && (newBucketKey != atomicState.bucketKey || newBucketName != atomicState.bucketName)) { - atomicState.bucketKey = "$newBucketKey" - atomicState.bucketName = "$newBucketName" - atomicState.isBucketCreated = false - } - - tryCreateBucket() -} - -def setAccessKey() { - log.trace "set access key" - def newAccessKey = request.JSON?.accessKey - def newGrokerSubdomain = request.JSON?.grokerSubdomain - - if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) { - atomicState.grokerSubdomain = "$newGrokerSubdomain" - atomicState.isBucketCreated = false - } - - if (newAccessKey && newAccessKey != atomicState.accessKey) { - atomicState.accessKey = "$newAccessKey" - atomicState.isBucketCreated = false - } -} - def installed() { - atomicState.version = "1.0.18" + atomicState.version = "1.1.0" + + atomicState.isBucketCreated = false + atomicState.grokerSubdomain = "groker" + subscribeToEvents() atomicState.isBucketCreated = false atomicState.grokerSubdomain = "groker" - atomicState.eventBuffer = [] - - runEvery15Minutes(flushBuffer) log.debug "installed (version $atomicState.version)" } def updated() { - atomicState.version = "1.0.18" + atomicState.version = "1.1.0" unsubscribe() if (atomicState.bucketKey != null && atomicState.accessKey != null) { atomicState.isBucketCreated = false } - if (atomicState.eventBuffer == null) { - atomicState.eventBuffer = [] - } if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") { atomicState.grokerSubdomain = "groker" } @@ -327,37 +325,17 @@ def genericHandler(evt) { eventHandler(key, value) } -// This is a handler function for flushing the event buffer -// after a specified amount of time to reduce the load on ST servers -def flushBuffer() { - def eventBuffer = atomicState.eventBuffer - log.trace "About to flush the buffer on schedule" - if (eventBuffer != null && eventBuffer.size() > 0) { - atomicState.eventBuffer = [] - tryShipEvents(eventBuffer) - } -} - def eventHandler(name, value) { def epoch = now() / 1000 - def eventBuffer = atomicState.eventBuffer ?: [] - eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] - if (eventBuffer.size() >= 10) { - // Clear eventBuffer right away since we've already pulled it off of atomicState to reduce the risk of missing - // events. This assumes the grokerSubdomain, accessKey, and bucketKey are set correctly to avoid the eventBuffer - // from growing unbounded. - atomicState.eventBuffer = [] - tryShipEvents(eventBuffer) - } else { - // Make sure we persist the updated eventBuffer with the new event added back to atomicState - atomicState.eventBuffer = eventBuffer - } - log.debug "Event added to buffer: " + eventBuffer + def event = new JsonSlurper().parseText("{\"key\": \"$name\", \"value\": \"$value\", \"epoch\": \"$epoch\"}") + + tryShipEvents(event) + + log.debug "Shipped Event: " + event } -// a helper function for shipping the atomicState.eventBuffer to Initial State -def tryShipEvents(eventBuffer) { +def tryShipEvents(event) { def grokerSubdomain = atomicState.grokerSubdomain // can't ship events if there is no grokerSubdomain @@ -380,7 +358,7 @@ def tryShipEvents(eventBuffer) { "X-IS-AccessKey": "${accessKey}", "Accept-Version": "0.0.2" ], - body: eventBuffer + body: event ] try {