Merge pull request #1180 from jimmyjames/ecobee-async-http-polling

[DVCSMP-1979] Use async http for polling and refresh tokens.
This commit is contained in:
Jim Anderson
2016-09-08 12:13:53 -05:00
committed by GitHub
3 changed files with 227 additions and 110 deletions

View File

@@ -67,6 +67,6 @@ def refresh() {
void poll() { void poll() {
log.debug "Executing 'poll' using parent SmartApp" log.debug "Executing 'poll' using parent SmartApp"
parent.pollChild() parent.poll()
} }

View File

@@ -133,7 +133,7 @@ def refresh() {
void poll() { void poll() {
log.debug "Executing 'poll' using parent SmartApp" log.debug "Executing 'poll' using parent SmartApp"
parent.pollChild() parent.poll()
} }
def generateEvent(Map results) { def generateEvent(Map results) {

View File

@@ -20,6 +20,8 @@
* JLH - 02-15-2014 - Fuller use of ecobee API * JLH - 02-15-2014 - Fuller use of ecobee API
* 10-28-2015 DVCSMP-604 - accessory sensor, DVCSMP-1174, DVCSMP-1111 - not respond to routines * 10-28-2015 DVCSMP-604 - accessory sensor, DVCSMP-1174, DVCSMP-1111 - not respond to routines
*/ */
include 'asynchttp_v1'
definition( definition(
name: "Ecobee (Connect)", name: "Ecobee (Connect)",
namespace: "smartthings", namespace: "smartthings",
@@ -244,9 +246,7 @@ def getEcobeeThermostats() {
uri: apiEndpoint, uri: apiEndpoint,
path: "/1/thermostat", path: "/1/thermostat",
headers: ["Content-Type": "text/json", "Authorization": "Bearer ${atomicState.authToken}"], headers: ["Content-Type": "text/json", "Authorization": "Bearer ${atomicState.authToken}"],
// TODO - the query string below is not consistent with the Ecobee docs: query: [json: toJson(bodyParams)]
// https://www.ecobee.com/home/developer/api/documentation/v1/operations/get-thermostats.shtml
query: [format: 'json', body: toJson(bodyParams)]
] ]
def stats = [:] def stats = [:]
@@ -265,9 +265,8 @@ def getEcobeeThermostats() {
} catch (groovyx.net.http.HttpResponseException e) { } catch (groovyx.net.http.HttpResponseException e) {
log.trace "Exception polling children: " + e.response.data.status log.trace "Exception polling children: " + e.response.data.status
if (e.response.data.status.code == 14) { if (e.response.data.status.code == 14) {
atomicState.action = "getEcobeeThermostats"
log.debug "Refreshing your auth_token!" log.debug "Refreshing your auth_token!"
refreshAuthToken() refreshAuthToken([async: false, nextAction: "getEcobeeThermostats"])
} }
} }
atomicState.thermostats = stats atomicState.thermostats = stats
@@ -358,16 +357,22 @@ def initialize() {
atomicState.timeSendPush = null atomicState.timeSendPush = null
atomicState.reAttempt = 0 atomicState.reAttempt = 0
pollHandler() //first time polling data data from thermostat initialPoll() //first time polling data data from thermostat
//automatically update devices status every 5 mins //automatically update devices status every 5 mins
runEvery5Minutes("poll") runEvery5Minutes("poll")
} }
def pollHandler() { /**
log.debug "pollHandler()" * Polls the child devices (synchronously).
pollChildren(null) // Hit the ecobee API for update on all thermostats * This is used during app install/update, and is synchronous
* to maintain current behavior that will cause install/update to fail
* if polling fails.
*/
def initialPoll() {
log.debug "initialPoll()"
pollChildrenSync() // Hit the ecobee API for update on all thermostats
atomicState.thermostats.each {stat -> atomicState.thermostats.each {stat ->
def dni = stat.key def dni = stat.key
@@ -380,36 +385,38 @@ def pollHandler() {
} }
} }
def pollChildren(child = null) { /**
def thermostatIdsString = getChildDeviceIdsString() * Polls Ecobee (asynchronously) for updated device state data.
* Called from within this Connect SmartApp as well as the child
* devices.
*/
def poll() {
log.debug "polling asynchronously"
asynchttp_v1.get('asyncPollResponseHandler', getPollParams())
}
/**
* Makes a (synchronous) request to the Ecobee API to get the data for the thermostats.
* This request is made synchronously here because it is called as part of the
* install/updated lifecycle, and changing it to asynchronous during the install/update
* lifecycle may change the behavior if there is an error in polling.
*
* If further analysis shows that polling can be done asynchronously during
* install/update without any adverse consequences, this should then be made
* asynchronous just as the scheduled polling is.
*/
def pollChildrenSync() {
log.debug "polling children: $thermostatIdsString" log.debug "polling children: $thermostatIdsString"
def requestBody = [ def params = getPollParams()
selection: [ params.query << ["Content-Type": "application/json"]
selectionType: "thermostats",
selectionMatch: thermostatIdsString,
includeExtendedRuntime: true,
includeSettings: true,
includeRuntime: true,
includeSensors: true
]
]
def result = false def result = false
log.debug "making synchronous poll request"
def pollParams = [
uri: apiEndpoint,
path: "/1/thermostat",
headers: ["Content-Type": "text/json", "Authorization": "Bearer ${atomicState.authToken}"],
// TODO - the query string below is not consistent with the Ecobee docs:
// https://www.ecobee.com/home/developer/api/documentation/v1/operations/get-thermostats.shtml
query: [format: 'json', body: toJson(requestBody)]
]
try{ try{
httpGet(pollParams) { resp -> httpGet(params) { resp ->
if(resp.status == 200) { if(resp.status == 200) {
log.debug "poll results returned resp.data ${resp.data}"
atomicState.remoteSensors = resp.data.thermostatList.remoteSensors atomicState.remoteSensors = resp.data.thermostatList.remoteSensors
updateSensorData() updateSensorData()
storeThermostatData(resp.data.thermostatList) storeThermostatData(resp.data.thermostatList)
@@ -420,40 +427,95 @@ def pollChildren(child = null) {
} catch (groovyx.net.http.HttpResponseException e) { } catch (groovyx.net.http.HttpResponseException e) {
log.trace "Exception polling children: " + e.response.data.status log.trace "Exception polling children: " + e.response.data.status
if (e.response.data.status.code == 14) { if (e.response.data.status.code == 14) {
atomicState.action = "pollChildren"
log.debug "Refreshing your auth_token!" log.debug "Refreshing your auth_token!"
refreshAuthToken() refreshAuthToken([async: false, nextAction: "pollChildrenSync"])
} }
} }
return result return result
} }
// Poll Child is invoked from the Child Device itself as part of the Poll Capability /**
def pollChild() { * Response handler for asynchronous request to get thermostat data.
def devices = getChildDevices() * Given a successful response, updates the sensor data, stores the thermostat
* data, and generates child device events.
if (pollChildren()) { *
devices.each { child -> * If the access token has expired, will issue a request to refresh the token
if (!child.device.deviceNetworkId.startsWith("ecobee_sensor")) { * (and pending successful token refresh, the poll request will be made again).
if(atomicState.thermostats[child.device.deviceNetworkId] != null) { */
def tData = atomicState.thermostats[child.device.deviceNetworkId] def asyncPollResponseHandler(response, data) {
log.info "pollChild(child)>> data for ${child.device.deviceNetworkId} : ${tData.data}" log.trace "async poll response handler"
child.generateEvent(tData.data) //parse received message from parent if (!response.hasError()) {
} else if(atomicState.thermostats[child.device.deviceNetworkId] == null) { if (response.status == 200) {
log.error "ERROR: Device connection removed? no data for ${child.device.deviceNetworkId}" def json
return null try {
} json = response.getJson()
} } catch (e) {
} log.error ("error parsing JSON", e)
} else { }
log.info "ERROR: pollChildren()" if (json) {
return null atomicState.remoteSensors = json.thermostatList.remoteSensors
} updateSensorData()
storeThermostatData(json.thermostatList)
generateChildThermostatEvent()
}
} else {
log.warn "Response returned non-200 response. Status: ${response.status}, data: ${response.getData()}"
}
} else {
log.trace "Exception polling children: ${response.getErrorMessage()}"
def errorJson
try {
errorJson = response.getErrorJson()
} catch (e) {
log.error("Unable to parse error json response", e)
}
if (errorJson?.status?.code == 14) {
log.debug "Refreshing your auth_token!"
refreshAuthToken([async: true, nextAction: "poll"])
} else {
log.warn "Error polling children that is not due to an expired token. Response: ${response.getErrorData()}"
}
}
} }
void poll() { private getPollParams() {
pollChild() def thermostatIdsString = getChildDeviceIdsString()
def requestBody = [
selection: [
selectionType: "thermostats",
selectionMatch: thermostatIdsString,
includeExtendedRuntime: true,
includeSettings: true,
includeRuntime: true,
includeSensors: true
]
]
return [
uri: apiEndpoint,
path: "/1/thermostat",
headers: ["Authorization": "Bearer ${atomicState.authToken}"],
query: [json: toJson(requestBody)]
]
}
/**
* Calls each child thermostat device to generate an event with the thermostat
* data.
*/
def generateChildThermostatEvent() {
log.trace("generateChildThermostatEvent")
getChildDevices().each { child ->
if (!child.device.deviceNetworkId.startsWith("ecobee_sensor")){
if(atomicState.thermostats[child.device.deviceNetworkId] != null) {
def tData = atomicState.thermostats[child.device.deviceNetworkId]
log.debug "calling child.generateEvent($tData.data)"
child.generateEvent(tData.data) //parse received message from parent
} else if(atomicState.thermostats[child.device.deviceNetworkId] == null) {
log.error "ERROR: Device connection removed? no data for ${child.device.deviceNetworkId}"
return null
}
}
}
} }
def availableModes(child) { def availableModes(child) {
@@ -553,47 +615,104 @@ def toQueryString(Map m) {
return m.collect { k, v -> "${k}=${URLEncoder.encode(v.toString())}" }.sort().join("&") return m.collect { k, v -> "${k}=${URLEncoder.encode(v.toString())}" }.sort().join("&")
} }
private refreshAuthToken() { /**
log.debug "refreshing auth token" * Uses the refresh token to get a new access token, then executes the nextAction.
* @param options - a map of options. valid options are async: true/false, which
* specifies if the refresh token request will be done asynchronously or not (default is false)
* nextAction: "nameOfMethod" specifies what method to execute after
* the token is refreshed (not required).
* (note: using a map as the parameter because we need to call it from a schedueled
* execution and we can only pass a data map to scheduled executions)
*/
private void refreshAuthToken(options) {
if(!atomicState.refreshToken) {
log.warn "Cannot not refresh OAuth token since there is no refreshToken stored"
} else {
def refreshParams = [
uri : apiEndpoint,
path : "/token",
query : [grant_type: 'refresh_token', code: "${atomicState.refreshToken}", client_id: smartThingsClientId],
]
if (options.async) {
refreshAuthTokenAsync(refreshParams, options.nextAction)
} else {
refreshAuthTokenSync(refreshParams, options.nextAction)
}
}
}
if(!atomicState.refreshToken) { private void refreshAuthTokenSync(params, nextAction = null) {
log.warn "Can not refresh OAuth token since there is no refreshToken stored" try {
} else { httpPost(refreshParams) { resp ->
def refreshParams = [ if(resp.status == 200) {
method: 'POST', log.debug "Token refreshed...calling saved RestAction now!"
uri : apiEndpoint, debugEvent("Token refreshed ... calling saved RestAction now!")
path : "/token", saveTokenAndResumeAction(resp.data, nextAction)
query : [grant_type: 'refresh_token', code: "${atomicState.refreshToken}", client_id: smartThingsClientId],
]
def notificationMessage = "is disconnected from SmartThings, because the access credential changed or was lost. Please go to the Ecobee (Connect) SmartApp and re-enter your account login credentials."
//changed to httpPost
try {
def jsonMap
httpPost(refreshParams) { resp ->
if(resp.status == 200) {
log.debug "Token refreshed...calling saved RestAction now!"
debugEvent("Token refreshed ... calling saved RestAction now!")
saveTokenAndResumeAction(resp.data)
}
} }
} catch (groovyx.net.http.HttpResponseException e) { }
log.error "refreshAuthToken() >> Error: e.statusCode ${e.statusCode}" } catch (groovyx.net.http.HttpResponseException e) {
def reAttemptPeriod = 300 // in sec log.error "refreshAuthToken() >> Error: e.statusCode ${e.statusCode}"
if (e.statusCode != 401) { // this issue might comes from exceed 20sec app execution, connectivity issue etc. reauthTokenErrorHandler(e.statusCode)
runIn(reAttemptPeriod, "refreshAuthToken") }
} else if (e.statusCode == 401) { // unauthorized }
atomicState.reAttempt = atomicState.reAttempt + 1
log.warn "reAttempt refreshAuthToken to try = ${atomicState.reAttempt}" private void refreshAuthTokenAsync(refreshParams, nextAction = null) {
if (atomicState.reAttempt <= 3) { log.debug "making asynchronous refresh request"
runIn(reAttemptPeriod, "refreshAuthToken") asynchttp_v1.post('refreshTokenResponseHandler', refreshParams, [nextAction: nextAction])
} else { }
sendPushAndFeeds(notificationMessage)
atomicState.reAttempt = 0 /**
} * The response handler for the request to refresh the authorization handler.
} * Stores the new authorization token and refresh token, and executes any action
} * (method) that failed due to the authorization token expiring.
} */
private void refreshTokenResponseHandler(response, data) {
if (!response.hasError()) {
if (response.status == 200) {
def json
try {
json = response.getJson()
} catch (e) {
log.error "error parsing json from response data: $response.data"
}
if (json) {
log.debug "asnyc refreshTokenHandler: Token refreshed...calling saved RestAction now!"
debugEvent("async Token refreshed ... calling saved RestAction now!")
saveTokenAndResumeAction(json, data.nextAction)
} else {
log.warn "successfully parsed json but result is empty or null"
}
} else {
log.debug "Non 200 response returned. Response code: ${response.code}, data: ${response.getData()}"
}
} else {
log.debug "async refreshTokenHandler: RESPONSE ERROR: ${response.getErrorJson()}"
reauthTokenErrorHandler(response.getErrorJson().code)
}
}
/**
* Retries refreshing the authorization token. Will attempt to get the refresh
* token later, in case there were errors retrieving it.
* Will retry a fixed number of times before sending a push notification to the
* user instructing them to reauthenticate
*/
private void reauthTokenErrorHandler(responseCode) {
def retryInterval = 300 // in seconds
def notificationMessage = "is disconnected from SmartThings, because the access credential changed or was lost. Please go to the Ecobee (Connect) SmartApp and re-enter your account login credentials."
// might get non-401 error from exceeding 20 second app limit, connectivity issues, etc.
if (responseCode != 401) {
runIn(retryInterval, "refreshAuthToken", [async: true])
} else if (responseCode == 401) { // unauthorized
atomicState.reAttempt = atomicState.reAttempt + 1
log.warn "reAttempt refreshAuthToken to try = ${atomicState.reAttempt}"
if (atomicState.reAttempt <= 3) {
runIn(retryInterval, "refreshAuthToken", [async: true])
} else {
sendPushAndFeeds(notificationMessage)
atomicState.reAttempt = 0
}
}
} }
/** /**
@@ -603,20 +722,20 @@ private refreshAuthToken() {
* *
* @param json - an object representing the parsed JSON response from Ecobee * @param json - an object representing the parsed JSON response from Ecobee
*/ */
private void saveTokenAndResumeAction(json) { private void saveTokenAndResumeAction(json, String nextAction) {
log.debug "token response json: $json" def debugMessage = "token response, scope: ${json?.scope}, expires_in: ${json?.expires_in}, token_type: ${json?.token_type}"
log.debug "debugMessage"
if (json) { if (json) {
debugEvent("Response = $json") debugEvent(debugMessage)
atomicState.refreshToken = json?.refresh_token atomicState.refreshToken = json?.refresh_token
atomicState.authToken = json?.access_token atomicState.authToken = json?.access_token
if (atomicState.action) { if (nextAction) {
log.debug "got refresh token, executing next action: ${atomicState.action}" log.debug "got refresh token, will execute next action (passed in!): $nextAction"
"${atomicState.action}"() "$nextAction"()
} }
} else { } else {
log.warn "did not get response body from refresh token response" log.warn "did not get response body from refresh token response"
} }
atomicState.action = ""
} }
/** /**
@@ -756,7 +875,6 @@ private boolean sendCommandToEcobee(Map bodyParams) {
try{ try{
httpPost(cmdParams) { resp -> httpPost(cmdParams) { resp ->
if(resp.status == 200) { if(resp.status == 200) {
log.debug "updated ${resp.data}"
def returnStatus = resp.data.status.code def returnStatus = resp.data.status.code
if (returnStatus == 0) { if (returnStatus == 0) {
log.debug "Successful call to ecobee API." log.debug "Successful call to ecobee API."
@@ -771,11 +889,10 @@ private boolean sendCommandToEcobee(Map bodyParams) {
log.trace "Exception Sending Json: " + e.response.data.status log.trace "Exception Sending Json: " + e.response.data.status
debugEvent ("sent Json & got http status ${e.statusCode} - ${e.response.data.status.code}") debugEvent ("sent Json & got http status ${e.statusCode} - ${e.response.data.status.code}")
if (e.response.data.status.code == 14) { if (e.response.data.status.code == 14) {
// TODO - figure out why we're setting the next action to be pollChildren // TODO - figure out why we're setting the next action to be poll
// after refreshing auth token. Is it to keep UI in sync, or just copy/paste error? // after refreshing auth token. Is it to keep UI in sync, or just copy/paste error?
atomicState.action = "pollChildren"
log.debug "Refreshing your auth_token!" log.debug "Refreshing your auth_token!"
refreshAuthToken() refreshAuthToken([async: true, nextAction: "poll"])
} else { } else {
debugEvent("Authentication error, invalid authentication method, lack of credentials, etc.") debugEvent("Authentication error, invalid authentication method, lack of credentials, etc.")
log.error "Authentication error, invalid authentication method, lack of credentials, etc." log.error "Authentication error, invalid authentication method, lack of credentials, etc."