mirror of
https://github.com/mtan93/SmartThingsPublic.git
synced 2026-03-08 05:31:56 +00:00
added buffering of events to reduce the calls from ST to IS; added buffer flushing after 10 events or 15 min with a scheduler; added a version to show in logs and help troubleshoot
This commit is contained in:
@@ -11,7 +11,11 @@
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||
* for the specific language governing permissions and limitations under the License.
|
||||
*
|
||||
*
|
||||
* SmartThings data is sent from this SmartApp to Initial State. This is event data only for
|
||||
* devices for which the user has authorized. Likewise, Initial State's services call this
|
||||
* SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and
|
||||
* Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms
|
||||
*/
|
||||
|
||||
definition(
|
||||
@@ -33,7 +37,6 @@ preferences {
|
||||
input "alarms", "capability.alarm", title: "Alarms", multiple: true, required: false
|
||||
input "batteries", "capability.battery", title: "Batteries", multiple: true, required: false
|
||||
input "beacons", "capability.beacon", title: "Beacons", multiple: true, required: false
|
||||
input "buttons", "capability.button", title: "Buttons", multiple: true, required: false
|
||||
input "cos", "capability.carbonMonoxideDetector", title: "Carbon Monoxide Detectors", multiple: true, required: false
|
||||
input "colors", "capability.colorControl", title: "Color Controllers", multiple: true, required: false
|
||||
input "contacts", "capability.contactSensor", title: "Contact Sensors", multiple: true, required: false
|
||||
@@ -88,9 +91,6 @@ def subscribeToEvents() {
|
||||
subscribe(beacons, "presence", genericHandler)
|
||||
}
|
||||
|
||||
if (buttons != null) {
|
||||
subscribe(buttons, "button", genericHandler)
|
||||
}
|
||||
if (cos != null) {
|
||||
subscribe(cos, "carbonMonoxide", genericHandler)
|
||||
}
|
||||
@@ -170,6 +170,10 @@ def subscribeToEvents() {
|
||||
if (waterSensors != null) {
|
||||
subscribe(waterSensors, "water", genericHandler)
|
||||
}
|
||||
|
||||
if (canSchedule()) {
|
||||
runEvery15Minutes(flushBuffer())
|
||||
}
|
||||
}
|
||||
|
||||
def getAccessKey() {
|
||||
@@ -214,13 +218,13 @@ def setBucketKey() {
|
||||
def setAccessKey() {
|
||||
log.trace "set access key"
|
||||
def newAccessKey = request.JSON?.accessKey
|
||||
def newGrokerRootUrl = request.JSON?.grokerRootUrl
|
||||
def newGrokerSubdomain = request.JSON?.grokerSubdomain
|
||||
|
||||
if (newGrokerRootUrl && newGrokerRootUrl != "" && newGrokerRootUrl != atomicState.grokerRootUrl) {
|
||||
atomicState.grokerRootUrl = "$newGrokerRootUrl"
|
||||
if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) {
|
||||
atomicState.grokerSubdomain = "$newGrokerSubdomain"
|
||||
atomicState.isBucketCreated = false
|
||||
}
|
||||
|
||||
|
||||
if (newAccessKey && newAccessKey != atomicState.accessKey) {
|
||||
atomicState.accessKey = "$newAccessKey"
|
||||
atomicState.isBucketCreated = false
|
||||
@@ -228,11 +232,14 @@ def setAccessKey() {
|
||||
}
|
||||
|
||||
def installed() {
|
||||
|
||||
atomicState.version = "1.0.17"
|
||||
subscribeToEvents()
|
||||
|
||||
atomicState.isBucketCreated = false
|
||||
atomicState.grookerRootUrl = "https://groker.initialstate.com"
|
||||
atomicState.grokerSubdomain = "groker"
|
||||
atomicState.eventBuffer = [];
|
||||
|
||||
log.debug "installed (version $atomicState.version)"
|
||||
}
|
||||
|
||||
def updated() {
|
||||
@@ -243,6 +250,14 @@ def updated() {
|
||||
}
|
||||
|
||||
subscribeToEvents()
|
||||
|
||||
log.debug "updated (version $atomicState.version)"
|
||||
}
|
||||
|
||||
def uninstalled() {
|
||||
unsubscribe()
|
||||
unschedule()
|
||||
log.debug "uninstalled (version $atomicState.version)"
|
||||
}
|
||||
|
||||
def createBucket() {
|
||||
@@ -257,7 +272,7 @@ def createBucket() {
|
||||
def bucketCreateBody = new JsonSlurper().parseText("{\"bucketKey\": \"$bucketKey\", \"bucketName\": \"$bucketName\"}")
|
||||
|
||||
def bucketCreatePost = [
|
||||
uri: '${atomicState.grokerRootUrl}/api/buckets',
|
||||
uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/buckets",
|
||||
headers: [
|
||||
"Content-Type": "application/json",
|
||||
"X-IS-AccessKey": accessKey,
|
||||
@@ -268,10 +283,20 @@ def createBucket() {
|
||||
|
||||
log.debug bucketCreatePost
|
||||
|
||||
httpPostJson(bucketCreatePost) {
|
||||
log.debug "bucket posted"
|
||||
atomicState.isBucketCreated = true
|
||||
try {
|
||||
// Create a bucket on Initial State so the data has a logical grouping
|
||||
httpPostJson(bucketCreatePost) { resp =>
|
||||
log.debug "bucket posted"
|
||||
if (resp.status >= 400) {
|
||||
log.error "bucket not created successfully"
|
||||
} else {
|
||||
atomicState.isBucketCreated = true
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
log.error "bucket creation error: $e"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def genericHandler(evt) {
|
||||
@@ -283,11 +308,6 @@ def genericHandler(evt) {
|
||||
}
|
||||
def value = "$evt.value"
|
||||
|
||||
eventHandler(key, value)
|
||||
}
|
||||
|
||||
def eventHandler(name, value) {
|
||||
|
||||
if (atomicState.accessKey == null || atomicState.bucketKey == null) {
|
||||
return
|
||||
}
|
||||
@@ -296,21 +316,59 @@ def eventHandler(name, value) {
|
||||
createBucket()
|
||||
}
|
||||
|
||||
def eventBody = new JsonSlurper().parseText("[{\"key\": \"$name\", \"value\": \"$value\"}]")
|
||||
eventHandler(key, value)
|
||||
}
|
||||
|
||||
// This is a handler function for flushing the event buffer
|
||||
// after a specified amount of time to reduce the load on ST servers
|
||||
def flushBuffer() {
|
||||
if (atomicState.eventBuffer.size() > 0) {
|
||||
shipEvents();
|
||||
}
|
||||
}
|
||||
|
||||
def eventHandler(name, value) {
|
||||
log.debug atomicState.eventBuffer;
|
||||
|
||||
def eventBuffer = atomicState.eventBuffer;
|
||||
def epoch = now() / 1000;
|
||||
eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"]
|
||||
|
||||
log.debug eventBuffer;
|
||||
|
||||
atomicState.eventBuffer = eventBuffer;
|
||||
|
||||
if (eventBuffer.size() >= 10) {
|
||||
shipEvents();
|
||||
}
|
||||
}
|
||||
|
||||
// a helper function for shipping the atomicState.eventBuffer to Initial State
|
||||
def shipEvents() {
|
||||
def eventPost = [
|
||||
uri: '${atomicState.grokerRootUrl}/api/events',
|
||||
uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events",
|
||||
headers: [
|
||||
"Content-Type": "application/json",
|
||||
"X-IS-BucketKey": "${atomicState.bucketKey}",
|
||||
"X-IS-AccessKey": "${atomicState.accessKey}",
|
||||
"Accept-Version": "0.0.2"
|
||||
],
|
||||
body: eventBody
|
||||
]
|
||||
body: atomicState.eventBuffer
|
||||
];
|
||||
|
||||
log.debug eventPost
|
||||
|
||||
httpPostJson(eventPost) {
|
||||
log.debug "event data posted"
|
||||
try {
|
||||
// post the events to initial state
|
||||
httpPostJson(eventPost) { resp =>
|
||||
log.debug "shipped events and got ${resp.status}"
|
||||
if (resp.status >= 400) {
|
||||
log.error "shipping failed... ${resp.data}"
|
||||
} else {
|
||||
// clear the buffer
|
||||
atomicState.eventBuffer = [];
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
log.error "shipping events failed: $e"
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user