# Source code for pyowm.weatherapi25.parsers.stationhistoryparser
"""
Module containing a concrete implementation for JSONParser abstract class,
returning a StationHistory instance
"""
import json
import time
from pyowm.weatherapi25 import stationhistory
from pyowm.abstractions import jsonparser
from pyowm.exceptions import parse_response_error, api_response_error
class StationHistoryParser(jsonparser.JSONParser):
    """
    Concrete *JSONParser* implementation building a *StationHistory* instance
    out of raw JSON data coming from OWM Weather API responses.

    """

    def __init__(self):
        pass

    @staticmethod
    def _plain_or_wrapped(item, key):
        """
        Reads a measurement that may be given either as a plain value or as
        a dict carrying the value under the ``'v'`` key.

        :param item: one element of the ``list`` array of the payload
        :type item: dict
        :param key: name of the measurement to be read
        :type key: str
        :returns: the measurement value or ``None`` if *key* is not in *item*
        :raises: *KeyError* if the measurement is a dict lacking ``'v'``

        """
        if key not in item:
            return None
        value = item[key]
        if isinstance(value, dict):
            return value['v']
        return value

    @staticmethod
    def _wrapped_only(item, outer, inner):
        """
        Reads a nested measurement (``item[outer][inner]``) which is only
        taken into account when it is a dict carrying the value under the
        ``'v'`` key.

        :param item: one element of the ``list`` array of the payload
        :type item: dict
        :param outer: name of the first-level key (eg: ``'rain'``)
        :type outer: str
        :param inner: name of the second-level key (eg: ``'today'``)
        :type inner: str
        :returns: the measurement value or ``None`` if *outer* is not in
            *item* or if ``item[outer][inner]`` is not a dict
        :raises: *KeyError* if *inner* or ``'v'`` are missing, *TypeError* if
            ``item[outer]`` cannot be subscripted

        """
        if outer in item and isinstance(item[outer][inner], dict):
            return item[outer][inner]['v']
        return None

    def parse_JSON(self, JSON_string):
        """
        Parses a *StationHistory* instance out of raw JSON data. Only certain
        properties of the data are used: if these properties are not found or
        cannot be parsed, an error is issued.

        :param JSON_string: a raw JSON string
        :type JSON_string: str
        :returns: a *StationHistory* instance or ``None`` if no data is
            available
        :raises: *ParseResponseError* if it is impossible to find or parse the
            data needed to build the result (missing keys or values of an
            unexpected type), *APIResponseError* if the JSON string embeds an
            HTTP status error

        """
        if JSON_string is None:
            raise parse_response_error.ParseResponseError('JSON data is None')
        d = json.loads(JSON_string)
        # Check if server returned errors: this check overcomes the lack of use
        # of HTTP error status codes by the OWM API but it's supposed to be
        # deprecated as soon as the API implements a correct HTTP mechanism for
        # communicating errors to the clients. In addition, in this specific
        # case the OWM API responses are the very same either when no results
        # are found for a station and when the station does not exist!
        measurements = {}
        try:
            # The status code is compared as a string (as done for 'cnt'
            # below) so that a numeric 200 is not mistaken for an error
            if 'cod' in d and str(d['cod']) != "200":
                raise api_response_error.APIResponseError(
                    "OWM API: error - response payload: " + str(d), d['cod'])
            if str(d['cnt']) == "0":
                return None
            for item in d['list']:
                # Measurements are indexed by the 'dt' value of each item
                measurements[item['dt']] = {
                    "temperature": self._plain_or_wrapped(item, 'temp'),
                    "humidity": self._plain_or_wrapped(item, 'humidity'),
                    "pressure": self._plain_or_wrapped(item, 'pressure'),
                    "rain": self._wrapped_only(item, 'rain', 'today'),
                    "wind": self._wrapped_only(item, 'wind', 'speed')
                }
        except (KeyError, TypeError):
            # TypeError shows up when the payload has an unexpected shape
            # (eg: a value that should be a dict is a number): this is a
            # parsing failure just like a missing key
            raise parse_response_error.ParseResponseError(
                __name__ + ': impossible to read JSON data')
        current_time = round(time.time())
        return stationhistory.StationHistory(None, None, current_time,
                                             measurements)

    def __repr__(self):
        return "<%s.%s>" % (__name__, self.__class__.__name__)