From bde5abcdb51ad5a5d829ac0b1e3c054c2a66a4b2 Mon Sep 17 00:00:00 2001 From: David Sulpy Date: Fri, 11 Sep 2015 10:31:30 -0500 Subject: [PATCH] added buffering of events to reduce the calls from ST to IS; added buffer flushing after 10 events or 15 min with a scheduler; added a version to show in logs and help troubleshoot --- .../initial-state-event-streamer.groovy | 114 +++++++++++++----- 1 file changed, 86 insertions(+), 28 deletions(-) diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 7c6ac38..245c06d 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -11,7 +11,11 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License * for the specific language governing permissions and limitations under the License. - * + * + * SmartThings data is sent from this SmartApp to Initial State. This is event data only for + * devices for which the user has authorized. Likewise, Initial State's services call this + * SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and + * Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms */ definition( @@ -33,7 +37,6 @@ preferences { input "alarms", "capability.alarm", title: "Alarms", multiple: true, required: false input "batteries", "capability.battery", title: "Batteries", multiple: true, required: false input "beacons", "capability.beacon", title: "Beacons", multiple: true, required: false - input "buttons", "capability.button", title: "Buttons", multiple: true, required: false input "cos", "capability.carbonMonoxideDetector", title: "Carbon Monoxide Detectors", multiple: true, required: false input "colors", "capability.colorControl", title: "Color Controllers", multiple: true, required: false input "contacts", "capability.contactSensor", title: "Contact Sensors", multiple: true, required: false @@ -88,9 +91,6 @@ def subscribeToEvents() { subscribe(beacons, "presence", genericHandler) } - if (buttons != null) { - subscribe(buttons, "button", genericHandler) - } if (cos != null) { subscribe(cos, "carbonMonoxide", genericHandler) } @@ -170,6 +170,10 @@ def subscribeToEvents() { if (waterSensors != null) { subscribe(waterSensors, "water", genericHandler) } + + if (canSchedule()) { + runEvery15Minutes(flushBuffer()) + } } def getAccessKey() { @@ -214,13 +218,13 @@ def setBucketKey() { def setAccessKey() { log.trace "set access key" def newAccessKey = request.JSON?.accessKey - def newGrokerRootUrl = request.JSON?.grokerRootUrl + def newGrokerSubdomain = request.JSON?.grokerSubdomain - if (newGrokerRootUrl && newGrokerRootUrl != "" && newGrokerRootUrl != atomicState.grokerRootUrl) { - atomicState.grokerRootUrl = "$newGrokerRootUrl" + if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) { + atomicState.grokerSubdomain = "$newGrokerSubdomain" atomicState.isBucketCreated = false } - + if (newAccessKey && newAccessKey != atomicState.accessKey) { atomicState.accessKey = "$newAccessKey" atomicState.isBucketCreated = false @@ -228,11 +232,14 @@ def setAccessKey() { } def installed() { - + atomicState.version = "1.0.17" subscribeToEvents() atomicState.isBucketCreated = false - atomicState.grookerRootUrl = "https://groker.initialstate.com" + atomicState.grokerSubdomain = "groker" + atomicState.eventBuffer = []; + + log.debug "installed (version $atomicState.version)" } def updated() { @@ -243,6 +250,14 @@ def updated() { } subscribeToEvents() + + log.debug "updated (version $atomicState.version)" +} + +def uninstalled() { + unsubscribe() + unschedule() + log.debug "uninstalled (version $atomicState.version)" } def createBucket() { @@ -257,7 +272,7 @@ def createBucket() { def bucketCreateBody = new JsonSlurper().parseText("{\"bucketKey\": \"$bucketKey\", \"bucketName\": \"$bucketName\"}") def bucketCreatePost = [ - uri: '${atomicState.grokerRootUrl}/api/buckets', + uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/buckets", headers: [ "Content-Type": "application/json", "X-IS-AccessKey": accessKey, @@ -268,10 +283,20 @@ def createBucket() { log.debug bucketCreatePost - httpPostJson(bucketCreatePost) { - log.debug "bucket posted" - atomicState.isBucketCreated = true + try { + // Create a bucket on Initial State so the data has a logical grouping + httpPostJson(bucketCreatePost) { resp => + log.debug "bucket posted" + if (resp.status >= 400) { + log.error "bucket not created successfully" + } else { + atomicState.isBucketCreated = true + } + } + } catch (e) { + log.error "bucket creation error: $e" } + } def genericHandler(evt) { @@ -283,11 +308,6 @@ def genericHandler(evt) { } def value = "$evt.value" - eventHandler(key, value) -} - -def eventHandler(name, value) { - if (atomicState.accessKey == null || atomicState.bucketKey == null) { return } @@ -296,21 +316,59 @@ def eventHandler(name, value) { createBucket() } - def eventBody = new JsonSlurper().parseText("[{\"key\": \"$name\", \"value\": \"$value\"}]") + eventHandler(key, value) +} + +// This is a handler function for flushing the event buffer +// after a specified amount of time to reduce the load on ST servers +def flushBuffer() { + if (atomicState.eventBuffer.size() > 0) { + shipEvents(); + } +} + +def eventHandler(name, value) { + log.debug atomicState.eventBuffer; + + def eventBuffer = atomicState.eventBuffer; + def epoch = now() / 1000; + eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] + + log.debug eventBuffer; + + atomicState.eventBuffer = eventBuffer; + + if (eventBuffer.size() >= 10) { + shipEvents(); + } +} + +// a helper function for shipping the atomicState.eventBuffer to Initial State +def shipEvents() { def eventPost = [ - uri: '${atomicState.grokerRootUrl}/api/events', + uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", headers: [ "Content-Type": "application/json", "X-IS-BucketKey": "${atomicState.bucketKey}", "X-IS-AccessKey": "${atomicState.accessKey}", "Accept-Version": "0.0.2" ], - body: eventBody - ] + body: atomicState.eventBuffer + ]; - log.debug eventPost - - httpPostJson(eventPost) { - log.debug "event data posted" + try { + // post the events to initial state + httpPostJson(eventPost) { resp => + log.debug "shipped events and got ${resp.status}" + if (resp.status >= 400) { + log.error "shipping failed... ${resp.data}" + } else { + // clear the buffer + atomicState.eventBuffer = []; + } + } + } catch (e) { + log.error "shipping events failed: $e" } + } \ No newline at end of file