Improve performance when streaming to Inital State and prevent buffer growing unbounded

This commit is contained in:
rappleg
2015-12-14 18:40:55 -06:00
parent e529624d36
commit 18bfa87948

View File

@@ -11,9 +11,9 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License. * for the specific language governing permissions and limitations under the License.
* *
* SmartThings data is sent from this SmartApp to Initial State. This is event data only for * SmartThings data is sent from this SmartApp to Initial State. This is event data only for
* devices for which the user has authorized. Likewise, Initial State's services call this * devices for which the user has authorized. Likewise, Initial State's services call this
* SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and * SmartApp on the user's behalf to configure Initial State specific parameters. The ToS and
* Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms * Privacy Policy for Initial State can be found here: https://www.initialstate.com/terms
*/ */
@@ -90,7 +90,7 @@ def subscribeToEvents() {
if (beacons != null) { if (beacons != null) {
subscribe(beacons, "presence", genericHandler) subscribe(beacons, "presence", genericHandler)
} }
if (cos != null) { if (cos != null) {
subscribe(cos, "carbonMonoxide", genericHandler) subscribe(cos, "carbonMonoxide", genericHandler)
} }
@@ -218,7 +218,7 @@ def setAccessKey() {
atomicState.grokerSubdomain = "$newGrokerSubdomain" atomicState.grokerSubdomain = "$newGrokerSubdomain"
atomicState.isBucketCreated = false atomicState.isBucketCreated = false
} }
if (newAccessKey && newAccessKey != atomicState.accessKey) { if (newAccessKey && newAccessKey != atomicState.accessKey) {
atomicState.accessKey = "$newAccessKey" atomicState.accessKey = "$newAccessKey"
atomicState.isBucketCreated = false atomicState.isBucketCreated = false
@@ -262,7 +262,7 @@ def uninstalled() {
} }
def tryCreateBucket() { def tryCreateBucket() {
// can't ship events if there is no grokerSubdomain // can't ship events if there is no grokerSubdomain
if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") { if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") {
log.error "streaming url is currently null" log.error "streaming url is currently null"
@@ -330,57 +330,54 @@ def genericHandler(evt) {
// This is a handler function for flushing the event buffer // This is a handler function for flushing the event buffer
// after a specified amount of time to reduce the load on ST servers // after a specified amount of time to reduce the load on ST servers
def flushBuffer() { def flushBuffer() {
def eventBuffer = atomicState.eventBuffer
log.trace "About to flush the buffer on schedule" log.trace "About to flush the buffer on schedule"
if (atomicState.eventBuffer != null && atomicState.eventBuffer.size() > 0) { if (eventBuffer != null && eventBuffer.size() > 0) {
tryShipEvents() atomicState.eventBuffer = []
tryShipEvents(eventBuffer)
} }
} }
def eventHandler(name, value) { def eventHandler(name, value) {
log.debug atomicState.eventBuffer
def eventBuffer = atomicState.eventBuffer
def epoch = now() / 1000 def epoch = now() / 1000
def eventBuffer = atomicState.eventBuffer ?: []
// if for some reason this code block is being run
// but the SmartApp wasn't propery setup during install
// we need to set initialize the eventBuffer.
if (!atomicState.eventBuffer) {
atomicState.eventBuffer = []
}
eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"] eventBuffer << [key: "$name", value: "$value", epoch: "$epoch"]
log.debug eventBuffer
atomicState.eventBuffer = eventBuffer
if (eventBuffer.size() >= 10) { if (eventBuffer.size() >= 10) {
tryShipEvents() // Clear eventBuffer right away since we've already pulled it off of atomicState to reduce the risk of missing
// events. This assumes the grokerSubdomain, accessKey, and bucketKey are set correctly to avoid the eventBuffer
// from growing unbounded.
atomicState.eventBuffer = []
tryShipEvents(eventBuffer)
} }
log.debug "Event added to buffer: " + eventBuffer
} }
// a helper function for shipping the atomicState.eventBuffer to Initial State // a helper function for shipping the atomicState.eventBuffer to Initial State
def tryShipEvents() { def tryShipEvents(eventBuffer) {
def grokerSubdomain = atomicState.grokerSubdomain
// can't ship events if there is no grokerSubdomain // can't ship events if there is no grokerSubdomain
if (atomicState.grokerSubdomain == null || atomicState.grokerSubdomain == "") { if (grokerSubdomain == null || grokerSubdomain == "") {
log.error "streaming url is currently null" log.error "streaming url is currently null"
return return
} }
def accessKey = atomicState.accessKey
def bucketKey = atomicState.bucketKey
// can't ship if access key and bucket key are null, so finish trying // can't ship if access key and bucket key are null, so finish trying
if (atomicState.accessKey == null || atomicState.bucketKey == null) { if (accessKey == null || bucketKey == null) {
return return
} }
def eventPost = [ def eventPost = [
uri: "https://${atomicState.grokerSubdomain}.initialstate.com/api/events", uri: "https://${grokerSubdomain}.initialstate.com/api/events",
headers: [ headers: [
"Content-Type": "application/json", "Content-Type": "application/json",
"X-IS-BucketKey": "${atomicState.bucketKey}", "X-IS-BucketKey": "${bucketKey}",
"X-IS-AccessKey": "${atomicState.accessKey}", "X-IS-AccessKey": "${accessKey}",
"Accept-Version": "0.0.2" "Accept-Version": "0.0.2"
], ],
body: atomicState.eventBuffer body: eventBuffer
] ]
try { try {
@@ -389,13 +386,10 @@ def tryShipEvents() {
log.debug "shipped events and got ${resp.status}" log.debug "shipped events and got ${resp.status}"
if (resp.status >= 400) { if (resp.status >= 400) {
log.error "shipping failed... ${resp.data}" log.error "shipping failed... ${resp.data}"
} else {
// clear the buffer
atomicState.eventBuffer = []
} }
} }
} catch (e) { } catch (e) {
log.error "shipping events failed: $e" log.error "shipping events failed: $e"
} }
} }