From 5719bbcaac024dc03226549e05527b48c707db44 Mon Sep 17 00:00:00 2001 From: David Sulpy Date: Thu, 10 Sep 2015 11:14:27 -0500 Subject: [PATCH 1/4] switched from using state to using atomicState in preparation of event buffering; using atomicState exclusively per important tip found here: http://docs.smartthings.com/en/latest/smartapp-developers-guide/state.html#atomic-state --- .../initial-state-event-streamer.groovy | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 4f2b8c7..7c6ac38 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -13,6 +13,7 @@ * for the specific language governing permissions and limitations under the License. * */ + definition( name: "Initial State Event Streamer", namespace: "initialstate.events", @@ -173,24 +174,24 @@ def subscribeToEvents() { def getAccessKey() { log.trace "get access key" - if (state.accessKey == null) { + if (atomicState.accessKey == null) { httpError(404, "Access Key Not Found") } else { [ - grokerRootUrl: state.grokerRootUrl, - accessKey: state.accessKey + grokerRootUrl: atomicState.grokerRootUrl, + accessKey: atomicState.accessKey ] } } def getBucketKey() { log.trace "get bucket key" - if (state.bucketKey == null) { + if (atomicState.bucketKey == null) { httpError(404, "Bucket key Not Found") } else { [ - bucketKey: state.bucketKey, - bucketName: state.bucketName + bucketKey: atomicState.bucketKey, + bucketName: atomicState.bucketName ] } } @@ -203,10 +204,10 @@ def setBucketKey() { log.debug "bucket name: $newBucketName" log.debug "bucket key: $newBucketKey" - if (newBucketKey && (newBucketKey != state.bucketKey || newBucketName != state.bucketName)) { - state.bucketKey = "$newBucketKey" - state.bucketName = "$newBucketName" - state.isBucketCreated = false + if (newBucketKey && (newBucketKey != atomicState.bucketKey || newBucketName != atomicState.bucketName)) { + atomicState.bucketKey = "$newBucketKey" + atomicState.bucketName = "$newBucketName" + atomicState.isBucketCreated = false } } @@ -215,14 +216,14 @@ def setAccessKey() { def newAccessKey = request.JSON?.accessKey def newGrokerRootUrl = request.JSON?.grokerRootUrl - if (newGrokerRootUrl && newGrokerRootUrl != "" && newGrokerRootUrl != state.grokerRootUrl) { - state.grokerRootUrl = "$newGrokerRootUrl" - state.isBucketCreated = false + if (newGrokerRootUrl && newGrokerRootUrl != "" && newGrokerRootUrl != atomicState.grokerRootUrl) { + atomicState.grokerRootUrl = "$newGrokerRootUrl" + atomicState.isBucketCreated = false } - if (newAccessKey && newAccessKey != state.accessKey) { - state.accessKey = "$newAccessKey" - state.isBucketCreated = false + if (newAccessKey && newAccessKey != atomicState.accessKey) { + atomicState.accessKey = "$newAccessKey" + atomicState.isBucketCreated = false } } @@ -230,15 +231,15 @@ def installed() { subscribeToEvents() - state.isBucketCreated = false - state.grookerRootUrl = "https://groker.initialstate.com" + atomicState.isBucketCreated = false + atomicState.grookerRootUrl = "https://groker.initialstate.com" } def updated() { unsubscribe() - if (state.bucketKey != null && state.accessKey != null) { - state.isBucketCreated = false + if (atomicState.bucketKey != null && atomicState.accessKey != null) { + atomicState.isBucketCreated = false } subscribeToEvents() @@ -246,17 +247,17 @@ def updated() { def createBucket() { - if (!state.bucketName) { - state.bucketName = state.bucketKey + if (!atomicState.bucketName) { + atomicState.bucketName = atomicState.bucketKey } - def bucketName = "${state.bucketName}" - def bucketKey = "${state.bucketKey}" - def accessKey = "${state.accessKey}" + def bucketName = "${atomicState.bucketName}" + def bucketKey = "${atomicState.bucketKey}" + def accessKey = "${atomicState.accessKey}" def bucketCreateBody = new JsonSlurper().parseText("{\"bucketKey\": \"$bucketKey\", \"bucketName\": \"$bucketName\"}") def bucketCreatePost = [ - uri: '${state.grokerRootUrl}/api/buckets', + uri: '${atomicState.grokerRootUrl}/api/buckets', headers: [ "Content-Type": "application/json", "X-IS-AccessKey": accessKey, @@ -269,7 +270,7 @@ def createBucket() { httpPostJson(bucketCreatePost) { log.debug "bucket posted" - state.isBucketCreated = true + atomicState.isBucketCreated = true } } @@ -287,21 +288,21 @@ def genericHandler(evt) { def eventHandler(name, value) { - if (state.accessKey == null || state.bucketKey == null) { + if (atomicState.accessKey == null || atomicState.bucketKey == null) { return } - if (!state.isBucketCreated) { + if (!atomicState.isBucketCreated) { createBucket() } def eventBody = new JsonSlurper().parseText("[{\"key\": \"$name\", \"value\": \"$value\"}]") def eventPost = [ - uri: '${state.grokerRootUrl}/api/events', + uri: '${atomicState.grokerRootUrl}/api/events', headers: [ "Content-Type": "application/json", - "X-IS-BucketKey": "${state.bucketKey}", - "X-IS-AccessKey": "${state.accessKey}", + "X-IS-BucketKey": "${atomicState.bucketKey}", + "X-IS-AccessKey": "${atomicState.accessKey}", "Accept-Version": "0.0.2" ], body: eventBody From bde5abcdb51ad5a5d829ac0b1e3c054c2a66a4b2 Mon Sep 17 00:00:00 2001 From: David Sulpy Date: Fri, 11 Sep 2015 10:31:30 -0500 Subject: [PATCH 2/4] added buffering of events to reduce the calls from ST to IS; added buffer flushing after 10 events or 15 min with a scheduler; added a version to show in logs and help troubleshoot --- .../initial-state-event-streamer.groovy | 114 +++++++++++++----- 1 file changed, 86 insertions(+), 28 deletions(-) diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 7c6ac38..245c06d 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -11,7 +11,11 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License * for the specific language governing permissions and limitations under the License. - * + * + * SmartThings data is sent from this SmartApp to Initial State. This is event data only for + * devices for which the user has authorized. Likewise, Initial State's services call this + * SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and + * Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms */ definition( @@ -33,7 +37,6 @@ preferences { input "alarms", "capability.alarm", title: "Alarms", multiple: true, required: false input "batteries", "capability.battery", title: "Batteries", multiple: true, required: false input "beacons", "capability.beacon", title: "Beacons", multiple: true, required: false - input "buttons", "capability.button", title: "Buttons", multiple: true, required: false input "cos", "capability.carbonMonoxideDetector", title: "Carbon Monoxide Detectors", multiple: true, required: false input "colors", "capability.colorControl", title: "Color Controllers", multiple: true, required: false input "contacts", "capability.contactSensor", title: "Contact Sensors", multiple: true, required: false @@ -88,9 +91,6 @@ def subscribeToEvents() { subscribe(beacons, "presence", genericHandler) } - if (buttons != null) { - subscribe(buttons, "button", genericHandler) - } if (cos != null) { subscribe(cos, "carbonMonoxide", genericHandler) } @@ -170,6 +170,10 @@ def subscribeToEvents() { if (waterSensors != null) { subscribe(waterSensors, "water", genericHandler) } + + if (canSchedule()) { + runEvery15Minutes(flushBuffer()) + } } def getAccessKey() { @@ -214,13 +218,13 @@ def setBucketKey() { def setAccessKey() { log.trace "set access key" def newAccessKey = request.JSON?.accessKey - def newGrokerRootUrl = request.JSON?.grokerRootUrl + def newGrokerSubdomain = request.JSON?.grokerSubdomain - if (newGrokerRootUrl && newGrokerRootUrl != "" && newGrokerRootUrl != atomicState.grokerRootUrl) { - atomicState.grokerRootUrl = "$newGrokerRootUrl" + if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) { + atomicState.grokerSubdomain = "$newGrokerSubdomain" atomicState.isBucketCreated = false } - + if (newAccessKey && newAccessKey != atomicState.accessKey) { atomicState.accessKey = "$newAccessKey" atomicState.isBucketCreated = false @@ -228,11 +232,14 @@ def setAccessKey() { } def installed() { - + atomicState.version = "1.0.17" subscribeToEvents() atomicState.isBucketCreated = false - atomicState.grookerRootUrl = "https://groker.initialstate.com" + atomicState.grokerSubdomain = "groker" + atomicState.eventBuffer = []; + + log.debug "installed (version $atomicState.version)" } def updated() { @@ -243,6 +250,14 @@ def updated() { } subscribeToEvents() + + log.debug "updated (version $atomicState.version)" +} + +def uninstalled() { + unsubscribe() + unschedule() + log.debug "uninstalled (version $atomicState.version)" } def createBucket() { @@ -257,7 +272,7 @@ def createBucket() { def bucketCreateBody = new JsonSlurper().parseText("{\"bucketKey\": \"$bucketKey\", \"bucketName\": \"$bucketName\"}") def bucketCreatePost = [ - uri: '${atomicState.grokerRootUrl}/api/buckets', + uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/buckets", headers: [ "Content-Type": "application/json", "X-IS-AccessKey": accessKey, @@ -268,10 +283,20 @@ def createBucket() { log.debug bucketCreatePost - httpPostJson(bucketCreatePost) { - log.debug "bucket posted" - atomicState.isBucketCreated = true + try { + // Create a bucket on Initial State so the data has a logical grouping + httpPostJson(bucketCreatePost) { resp => + log.debug "bucket posted" + if (resp.status >= 400) { + log.error "bucket not created successfully" + } else { + atomicState.isBucketCreated = true + } + } + } catch (e) { + log.error "bucket creation error: $e" } + } def genericHandler(evt) { @@ -283,11 +308,6 @@ def genericHandler(evt) { } def value = "$evt.value" - eventHandler(key, value) -} - -def eventHandler(name, value) { - if (atomicState.accessKey == null || atomicState.bucketKey == null) { return } @@ -296,21 +316,59 @@ def eventHandler(name, value) { createBucket() } - def eventBody = new JsonSlurper().parseText("[{\"key\": \"$name\", \"value\": \"$value\"}]") + eventHandler(key, value) +} + +// This is a handler function for flushing the event buffer +// after a specified amount of time to reduce the load on ST servers +def flushBuffer() { + if (atomicState.eventBuffer.size() > 0) { + shipEvents(); + } +} + +def eventHandler(name, value) { + log.debug atomicState.eventBuffer; + + def eventBuffer = atomicState.eventBuffer; + def epoch = now() / 1000; + eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] + + log.debug eventBuffer; + + atomicState.eventBuffer = eventBuffer; + + if (eventBuffer.size() >= 10) { + shipEvents(); + } +} + +// a helper function for shipping the atomicState.eventBuffer to Initial State +def shipEvents() { def eventPost = [ - uri: '${atomicState.grokerRootUrl}/api/events', + uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", headers: [ "Content-Type": "application/json", "X-IS-BucketKey": "${atomicState.bucketKey}", "X-IS-AccessKey": "${atomicState.accessKey}", "Accept-Version": "0.0.2" ], - body: eventBody - ] + body: atomicState.eventBuffer + ]; - log.debug eventPost - - httpPostJson(eventPost) { - log.debug "event data posted" + try { + // post the events to initial state + httpPostJson(eventPost) { resp => + log.debug "shipped events and got ${resp.status}" + if (resp.status >= 400) { + log.error "shipping failed... ${resp.data}" + } else { + // clear the buffer + atomicState.eventBuffer = []; + } + } + } catch (e) { + log.error "shipping events failed: $e" } + } \ No newline at end of file From 621bcfadd29fd76c0ba3e981eebe91eb549f1229 Mon Sep 17 00:00:00 2001 From: David Sulpy Date: Fri, 11 Sep 2015 10:39:58 -0500 Subject: [PATCH 3/4] fixed a goovy syntax issue with http posts callback closures --- .../initial-state-event-streamer.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index 16a9390..d24e271 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -280,7 +280,7 @@ def createBucket() { try { // Create a bucket on Initial State so the data has a logical grouping - httpPostJson(bucketCreatePost) { resp => + httpPostJson(bucketCreatePost) { resp -> log.debug "bucket posted" if (resp.status >= 400) { log.error "bucket not created successfully" @@ -353,7 +353,7 @@ def shipEvents() { try { // post the events to initial state - httpPostJson(eventPost) { resp => + httpPostJson(eventPost) { resp -> log.debug "shipped events and got ${resp.status}" if (resp.status >= 400) { log.error "shipping failed... ${resp.data}" From 7baad1c35e1eec142e9d9d7a7287c2b834e55733 Mon Sep 17 00:00:00 2001 From: David Sulpy Date: Fri, 11 Sep 2015 11:09:03 -0500 Subject: [PATCH 4/4] fixed the scheduling from throwing exceptions because it was using parenthesis when passing the handler method in to the scheduling method; ensured that the scheduler doesn't send empty events --- .../initial-state-event-streamer.groovy | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy index d24e271..d82ccb3 100644 --- a/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy +++ b/smartapps/initialstate-events/initial-state-event-streamer.src/initial-state-event-streamer.groovy @@ -167,10 +167,6 @@ def subscribeToEvents() { if (waterSensors != null) { subscribe(waterSensors, "water", genericHandler) } - - if (canSchedule()) { - runEvery15Minutes(flushBuffer()) - } } def getAccessKey() { @@ -235,6 +231,8 @@ def installed() { atomicState.grokerSubdomain = "groker" atomicState.eventBuffer = []; + runEvery15Minutes(flushBuffer) + log.debug "installed (version $atomicState.version)" } @@ -317,7 +315,8 @@ def genericHandler(evt) { // This is a handler function for flushing the event buffer // after a specified amount of time to reduce the load on ST servers def flushBuffer() { - if (atomicState.eventBuffer.size() > 0) { + log.trace "About to flush the buffer on schedule" + if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) { shipEvents(); } }