mirror of
https://github.com/mtan93/SmartThingsPublic.git
synced 2026-03-08 05:31:56 +00:00
New Initial State Event Streamer: removed buffering and scheduled tasks for flushing buffer
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
/**
|
/**
|
||||||
* Initial State Event Streamer
|
* Initial State Event Streamer
|
||||||
*
|
*
|
||||||
* Copyright 2015 David Sulpy
|
* Copyright 2016 David Sulpy
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||||
* in compliance with the License. You may obtain a copy of the License at:
|
* in compliance with the License. You may obtain a copy of the License at:
|
||||||
@@ -77,6 +77,62 @@ mappings {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def getAccessKey() {
|
||||||
|
log.trace "get access key"
|
||||||
|
if (atomicState.accessKey == null) {
|
||||||
|
httpError(404, "Access Key Not Found")
|
||||||
|
} else {
|
||||||
|
[
|
||||||
|
accessKey: atomicState.accessKey
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def getBucketKey() {
|
||||||
|
log.trace "get bucket key"
|
||||||
|
if (atomicState.bucketKey == null) {
|
||||||
|
httpError(404, "Bucket key Not Found")
|
||||||
|
} else {
|
||||||
|
[
|
||||||
|
bucketKey: atomicState.bucketKey,
|
||||||
|
bucketName: atomicState.bucketName
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def setBucketKey() {
|
||||||
|
log.trace "set bucket key"
|
||||||
|
def newBucketKey = request.JSON?.bucketKey
|
||||||
|
def newBucketName = request.JSON?.bucketName
|
||||||
|
|
||||||
|
log.debug "bucket name: $newBucketName"
|
||||||
|
log.debug "bucket key: $newBucketKey"
|
||||||
|
|
||||||
|
if (newBucketKey && (newBucketKey != atomicState.bucketKey || newBucketName != atomicState.bucketName)) {
|
||||||
|
atomicState.bucketKey = "$newBucketKey"
|
||||||
|
atomicState.bucketName = "$newBucketName"
|
||||||
|
atomicState.isBucketCreated = false
|
||||||
|
}
|
||||||
|
|
||||||
|
tryCreateBucket()
|
||||||
|
}
|
||||||
|
|
||||||
|
def setAccessKey() {
|
||||||
|
log.trace "set access key"
|
||||||
|
def newAccessKey = request.JSON?.accessKey
|
||||||
|
def newGrokerSubdomain = request.JSON?.grokerSubdomain
|
||||||
|
|
||||||
|
if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) {
|
||||||
|
atomicState.grokerSubdomain = "$newGrokerSubdomain"
|
||||||
|
atomicState.isBucketCreated = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newAccessKey && newAccessKey != atomicState.accessKey) {
|
||||||
|
atomicState.accessKey = "$newAccessKey"
|
||||||
|
atomicState.isBucketCreated = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
def subscribeToEvents() {
|
def subscribeToEvents() {
|
||||||
if (accelerometers != null) {
|
if (accelerometers != null) {
|
||||||
subscribe(accelerometers, "acceleration", genericHandler)
|
subscribe(accelerometers, "acceleration", genericHandler)
|
||||||
@@ -169,85 +225,27 @@ def subscribeToEvents() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def getAccessKey() {
|
|
||||||
log.trace "get access key"
|
|
||||||
if (atomicState.accessKey == null) {
|
|
||||||
httpError(404, "Access Key Not Found")
|
|
||||||
} else {
|
|
||||||
[
|
|
||||||
accessKey: atomicState.accessKey
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def getBucketKey() {
|
|
||||||
log.trace "get bucket key"
|
|
||||||
if (atomicState.bucketKey == null) {
|
|
||||||
httpError(404, "Bucket key Not Found")
|
|
||||||
} else {
|
|
||||||
[
|
|
||||||
bucketKey: atomicState.bucketKey,
|
|
||||||
bucketName: atomicState.bucketName
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def setBucketKey() {
|
|
||||||
log.trace "set bucket key"
|
|
||||||
def newBucketKey = request.JSON?.bucketKey
|
|
||||||
def newBucketName = request.JSON?.bucketName
|
|
||||||
|
|
||||||
log.debug "bucket name: $newBucketName"
|
|
||||||
log.debug "bucket key: $newBucketKey"
|
|
||||||
|
|
||||||
if (newBucketKey && (newBucketKey != atomicState.bucketKey || newBucketName != atomicState.bucketName)) {
|
|
||||||
atomicState.bucketKey = "$newBucketKey"
|
|
||||||
atomicState.bucketName = "$newBucketName"
|
|
||||||
atomicState.isBucketCreated = false
|
|
||||||
}
|
|
||||||
|
|
||||||
tryCreateBucket()
|
|
||||||
}
|
|
||||||
|
|
||||||
def setAccessKey() {
|
|
||||||
log.trace "set access key"
|
|
||||||
def newAccessKey = request.JSON?.accessKey
|
|
||||||
def newGrokerSubdomain = request.JSON?.grokerSubdomain
|
|
||||||
|
|
||||||
if (newGrokerSubdomain && newGrokerSubdomain != "" && newGrokerSubdomain != atomicState.grokerSubdomain) {
|
|
||||||
atomicState.grokerSubdomain = "$newGrokerSubdomain"
|
|
||||||
atomicState.isBucketCreated = false
|
|
||||||
}
|
|
||||||
|
|
||||||
if (newAccessKey && newAccessKey != atomicState.accessKey) {
|
|
||||||
atomicState.accessKey = "$newAccessKey"
|
|
||||||
atomicState.isBucketCreated = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def installed() {
|
def installed() {
|
||||||
atomicState.version = "1.0.18"
|
atomicState.version = "1.1.0"
|
||||||
|
|
||||||
|
atomicState.isBucketCreated = false
|
||||||
|
atomicState.grokerSubdomain = "groker"
|
||||||
|
|
||||||
subscribeToEvents()
|
subscribeToEvents()
|
||||||
|
|
||||||
atomicState.isBucketCreated = false
|
atomicState.isBucketCreated = false
|
||||||
atomicState.grokerSubdomain = "groker"
|
atomicState.grokerSubdomain = "groker"
|
||||||
atomicState.eventBuffer = []
|
|
||||||
|
|
||||||
runEvery15Minutes(flushBuffer)
|
|
||||||
|
|
||||||
log.debug "installed (version $atomicState.version)"
|
log.debug "installed (version $atomicState.version)"
|
||||||
}
|
}
|
||||||
|
|
||||||
def updated() {
|
def updated() {
|
||||||
atomicState.version = "1.0.18"
|
atomicState.version = "1.1.0"
|
||||||
unsubscribe()
|
unsubscribe()
|
||||||
|
|
||||||
if (atomicState.bucketKey != null && atomicState.accessKey != null) {
|
if (atomicState.bucketKey != null && atomicState.accessKey != null) {
|
||||||
atomicState.isBucketCreated = false
|
atomicState.isBucketCreated = false
|
||||||
}
|
}
|
||||||
if (atomicState.eventBuffer == null) {
|
|
||||||
atomicState.eventBuffer = []
|
|
||||||
}
|
|
||||||
if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") {
|
if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") {
|
||||||
atomicState.grokerSubdomain = "groker"
|
atomicState.grokerSubdomain = "groker"
|
||||||
}
|
}
|
||||||
@@ -327,37 +325,17 @@ def genericHandler(evt) {
|
|||||||
eventHandler(key, value)
|
eventHandler(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is a handler function for flushing the event buffer
|
|
||||||
// after a specified amount of time to reduce the load on ST servers
|
|
||||||
def flushBuffer() {
|
|
||||||
def eventBuffer = atomicState.eventBuffer
|
|
||||||
log.trace "About to flush the buffer on schedule"
|
|
||||||
if (eventBuffer != null && eventBuffer.size() > 0) {
|
|
||||||
atomicState.eventBuffer = []
|
|
||||||
tryShipEvents(eventBuffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def eventHandler(name, value) {
|
def eventHandler(name, value) {
|
||||||
def epoch = now() / 1000
|
def epoch = now() / 1000
|
||||||
def eventBuffer = atomicState.eventBuffer ?: []
|
|
||||||
eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"]
|
|
||||||
|
|
||||||
if (eventBuffer.size() >= 10) {
|
def event = new JsonSlurper().parseText("{\"key\": \"$name\", \"value\": \"$value\", \"epoch\": \"$epoch\"}")
|
||||||
// Clear eventBuffer right away since we've already pulled it off of atomicState to reduce the risk of missing
|
|
||||||
// events. This assumes the grokerSubdomain, accessKey, and bucketKey are set correctly to avoid the eventBuffer
|
tryShipEvents(event)
|
||||||
// from growing unbounded.
|
|
||||||
atomicState.eventBuffer = []
|
log.debug "Shipped Event: " + event
|
||||||
tryShipEvents(eventBuffer)
|
|
||||||
} else {
|
|
||||||
// Make sure we persist the updated eventBuffer with the new event added back to atomicState
|
|
||||||
atomicState.eventBuffer = eventBuffer
|
|
||||||
}
|
|
||||||
log.debug "Event added to buffer: " + eventBuffer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// a helper function for shipping the atomicState.eventBuffer to Initial State
|
def tryShipEvents(event) {
|
||||||
def tryShipEvents(eventBuffer) {
|
|
||||||
|
|
||||||
def grokerSubdomain = atomicState.grokerSubdomain
|
def grokerSubdomain = atomicState.grokerSubdomain
|
||||||
// can't ship events if there is no grokerSubdomain
|
// can't ship events if there is no grokerSubdomain
|
||||||
@@ -380,7 +358,7 @@ def tryShipEvents(eventBuffer) {
|
|||||||
"X-IS-AccessKey": "${accessKey}",
|
"X-IS-AccessKey": "${accessKey}",
|
||||||
"Accept-Version": "0.0.2"
|
"Accept-Version": "0.0.2"
|
||||||
],
|
],
|
||||||
body: eventBuffer
|
body: event
|
||||||
]
|
]
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
Reference in New Issue
Block a user