diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 95c38de..2231575 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -11,9 +11,9 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License * for the specific language governing permissions and limitations under the License. - * + * * SmartThings data is sent from this SmartApp to Initial State. This is event data only for - * devices for which the user has authorized. Likewise, Initial State's services call this + * devices for which the user has authorized. Likewise, Initial State's services call this * SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and * Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms */ @@ -90,7 +90,7 @@ def subscribeToEvents() { if (beacons != null) { subscribe(beacons, "presence", genericHandler) } - + if (cos != null) { subscribe(cos, "carbonMonoxide", genericHandler) } @@ -218,7 +218,7 @@ def setAccessKey() { atomicState.grokerSubdomain = "$newGrokerSubdomain" atomicState.isBucketCreated = false } - + if (newAccessKey && newAccessKey != atomicState.accessKey) { atomicState.accessKey = "$newAccessKey" atomicState.isBucketCreated = false @@ -262,7 +262,7 @@ def uninstalled() { } def tryCreateBucket() { - + // can't ship events if there is no grokerSubdomain if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") { log.error "streaming url is currently null" @@ -330,57 +330,54 @@ def genericHandler(evt) { // This is a handler function for flushing the event buffer // after a specified amount of time to reduce the load on ST servers def flushBuffer() { + def eventBuffer = atomicState.eventBuffer log.trace "About to flush the buffer on schedule" - if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) { - tryShipEvents() + if (eventBuffer != null && eventBuffer.size() > 0) { + atomicState.eventBuffer = [] + tryShipEvents(eventBuffer) } } def eventHandler(name, value) { - log.debug atomicState.eventBuffer - - def eventBuffer = atomicState.eventBuffer def epoch = now() / 1000 - - // if for some reason this code block is being run - // but the SmartApp wasn't propery setup during install - // we need to set initialize the eventBuffer. - if (!atomicState.eventBuffer) { - atomicState.eventBuffer = [] - } + def eventBuffer = atomicState.eventBuffer ?: [] eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] - - log.debug eventBuffer - - atomicState.eventBuffer = eventBuffer if (eventBuffer.size() >= 10) { - tryShipEvents() + // Clear eventBuffer right away since we've already pulled it off of atomicState to reduce the risk of missing + // events. This assumes the grokerSubdomain, accessKey, and bucketKey are set correctly to avoid the eventBuffer + // from growing unbounded. + atomicState.eventBuffer = [] + tryShipEvents(eventBuffer) } + log.debug "Event added to buffer: " + eventBuffer } // a helper function for shipping the atomicState.eventBuffer to Initial State -def tryShipEvents() { +def tryShipEvents(eventBuffer) { + def grokerSubdomain = atomicState.grokerSubdomain // can't ship events if there is no grokerSubdomain - if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") { + if (grokerSubdomain == null || grokerSubdomain == "") { log.error "streaming url is currently null" return } + def accessKey = atomicState.accessKey + def bucketKey = atomicState.bucketKey // can't ship if access key and bucket key are null, so finish trying - if (atomicState.accessKey == null || atomicState.bucketKey == null) { + if (accessKey == null || bucketKey == null) { return } def eventPost = [ - uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", + uri: "https://${grokerSubdomain}.initialstate.com/api/events", headers: [ "Content-Type": "application/json", - "X-IS-BucketKey": "${atomicState.bucketKey}", - "X-IS-AccessKey": "${atomicState.accessKey}", + "X-IS-BucketKey": "${bucketKey}", + "X-IS-AccessKey": "${accessKey}", "Accept-Version": "0.0.2" ], - body: atomicState.eventBuffer + body: eventBuffer ] try { @@ -389,13 +386,10 @@ def tryShipEvents() { log.debug "shipped events and got ${resp.status}" if (resp.status >= 400) { log.error "shipping failed... ${resp.data}" - } else { - // clear the buffer - atomicState.eventBuffer = [] } } } catch (e) { log.error "shipping events failed: $e" } - + } \ No newline at end of file