# Source code for olx.offer

#!/usr/bin/python
# -*- coding: utf-8 -*-
import datetime as dt
import json
import logging
import re

from bs4 import BeautifulSoup

from olx.utils import get_content_for_url

# Python 2 compatibility shim: ``unicode`` exists only in Python 2's
# __builtin__; on Python 3 fall back to a no-op that returns its input.
try:
    from __builtin__ import unicode
except ImportError:
    unicode = lambda x, *args: x

# Module-level logger (keyed on the file path rather than __name__)
log = logging.getLogger(__file__)


def get_title(offer_markup):
    """ Searches for offer title on offer page

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Title of offer
    :rtype: str, None
    """
    markup = BeautifulSoup(offer_markup, "html.parser")
    heading = markup.h1
    return heading.text.strip()
def parse_tracking_data(offer_markup):
    """ Parses price and add_id from OLX tracking data script

    :param offer_markup: Head from offer page
    :type offer_markup: str
    :return: Tuple of int price and it's currency or None if this offer page got deleted
    :rtype: tuple, None

    :except: This offer page got deleted and has no tracking script.
    """
    html_parser = BeautifulSoup(offer_markup, "html.parser")
    scripts = html_parser.head.find_all("script")
    # Locate the inline tracking script carrying the ad metadata
    metadata_script = None
    for script in scripts:
        if "ad_id" in script.text:
            metadata_script = script.text
            break
    # No tracking script -> the offer page has been deleted
    if not metadata_script:
        return None, None, None
    # The fourth chunk of the pageView split holds the payload; the replace
    # calls massage it into valid JSON before parsing
    data_dict = json.loads(re.split("pageView|;", metadata_script)[3].replace('":{', "{").replace("}}'", "}"))
    # A zero ad_price is normalized to None; ad_id is assumed present
    # (a missing key raises KeyError here)
    return int(data_dict.get("ad_price", 0)) or None, data_dict.get("price_currency"), data_dict["ad_id"]
def get_additional_rent(offer_markup):
    """ Searches for additional rental costs

    :param offer_markup:
    :type offer_markup: str
    :return: Additional rent
    :rtype: int
    """
    parsed = BeautifulSoup(offer_markup, "html.parser")
    for entry in parsed.find_all(class_="item"):
        # "Czynsz" is the Polish label for the extra monthly rent row
        if "Czynsz" not in entry.text:
            continue
        digits = re.findall(r'\d+', entry.text)
        return int("".join(digits))
    return None
def get_gps(offer_markup):
    """ Searches for gps coordinates (latitude and longitude)

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Tuple of gps coordinates
    :rtype: tuple
    """
    html_parser = BeautifulSoup(offer_markup, "html.parser")
    # Look the map container up once -- the original performed the same
    # find() twice, one lookup per coordinate
    map_container = html_parser.find(class_="mapcontainer")
    gps_lat = map_container.attrs['data-lat']
    gps_lon = map_container.attrs['data-lon']
    return gps_lat, gps_lon
def get_poster_name(offer_markup):
    """ Searches for poster name

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Poster name or None if poster name was not found (offer is outdated)
    :rtype: str, None

    :except: Poster name not found
    """
    poster_name_parser = BeautifulSoup(offer_markup, "html.parser").find(class_="offer-user__actions")
    try:
        if poster_name_parser.a is not None:
            found_name = poster_name_parser.a.text.strip()
            # Some layouts keep the name two siblings past the link
            # rather than in the link text itself
            if not found_name:
                found_name = poster_name_parser.a.next_sibling.next_sibling.text.strip()
        else:
            # Fallback layout: the name sits in an <h4> heading
            found_name = poster_name_parser.h4.text.strip()
    except AttributeError:
        # Section missing entirely -- offer is outdated/removed
        return
    return found_name
def get_surface(offer_markup):
    """ Searches for surface in offer markup

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Surface or None if there is no surface
    :rtype: float, None

    :except: When there is no offer surface it will return None
    """
    parsed = BeautifulSoup(offer_markup, "html.parser")
    try:
        surface = parsed.sup.parent.text
    except AttributeError:
        # No <sup> element -> offer has no surface attribute
        return None
    if "m2" not in surface:
        return None
    cleaned = surface.replace("m2", "").strip().replace(",", ".").replace(" ", "")
    return float(cleaned)
def parse_description(offer_markup):
    """ Searches for description if offer markup

    :param offer_markup: Body from offer page markup
    :type offer_markup: str
    :return: Description of offer
    :rtype: str
    """
    content = BeautifulSoup(offer_markup, "html.parser").find(id="textContent")
    # Drop non-breaking spaces and line breaks, then trim the result
    text = content.text.replace(" ", "").replace("\n", " ").replace("\r", "")
    return text.strip()
def get_img_url(offer_markup):
    """ Searches for images in offer markup

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Images of offer in list
    :rtype: list
    """
    html_parser = BeautifulSoup(offer_markup, "html.parser")
    images = html_parser.find_all(class_="bigImage")
    # List comprehension replaces the original manual append loop
    return [img.attrs["src"] for img in images]
def get_month_num_for_string(value):
    """ Map for polish month names

    :param value: Month value
    :type value: str
    :return: Month number
    :rtype: int
    """
    # Polish month names are distinguishable by their first three letters
    month_by_prefix = {
        'sty': 1, 'lut': 2, 'mar': 3, 'kwi': 4,
        'maj': 5, 'cze': 6, 'lip': 7, 'sie': 8,
        'wrz': 9, 'paź': 10, 'lis': 11, 'gru': 12,
    }
    prefix = value.lower()[:3]
    return month_by_prefix.get(prefix)
def get_date_added(offer_markup):
    """ Searches of date of adding offer

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Unix timestamp
    :rtype: int
    """
    html_parser = BeautifulSoup(offer_markup, "html.parser")
    date = html_parser.find(class_="offer-bottombar__item").em.strong.text
    # Normalize the Polish date string: strip the "o " prefix, remove
    # non-breaking spaces, and turn ", " into a plain space so the fields
    # can be split on single spaces below
    date = date.replace("o ", "").replace(" ", "").replace(", ", " ").strip()
    date_parts = date.split(' ')
    # Expected layout after normalization: "HH:MM DD <month-name> YYYY"
    hour, minute = map(int, date_parts[0].split(':'))
    month = get_month_num_for_string(date_parts[2])
    year = int(date_parts[3])
    day = int(date_parts[1])
    date_added = dt.datetime(year=year, hour=hour, minute=minute, day=day, month=month)
    # NOTE(review): naive datetime diffed against a naive epoch -- this is a
    # true Unix timestamp only if page times are UTC; confirm against OLX.
    return int((date_added - dt.datetime(1970, 1, 1)).total_seconds())
def parse_region(offer_markup):
    """ Parses region information

    :param offer_markup: Class "offerbody" from offer page markup
    :type offer_markup: str
    :return: Region of offer
    :rtype: list
    """
    link_text = BeautifulSoup(offer_markup, "html.parser").find(class_="show-map-link").text
    # Collapse ", " separators so a plain comma split yields clean parts
    normalized = link_text.replace(", ", ",")
    return normalized.split(",")
def get_gpt_script(offer_markup):
    """ Parses data from script of Google Tag Manager

    :param offer_markup: Body from offer page markup
    :type offer_markup: str
    :return: GPT dict data
    :rtype: dict
    """
    html_parser = BeautifulSoup(offer_markup, "html.parser")
    scripts = html_parser.find_all('script')
    data = ''
    for script in scripts:
        if script.string and "GPT.targeting" in script.string:
            data = script.string
            break
    try:
        # The third split chunk holds the JSON assigned to GPT.targeting
        data_dict = json.loads((re.split('GPT.targeting = |;', data))[2].replace(";", ""))
    except (json.JSONDecodeError, IndexError) as e:
        # IndexError covers pages with no GPT script at all (data == ''),
        # which previously escaped the handler and crashed the caller.
        # Use the module logger (was: root logging) with lazy %-args.
        log.info("JSON failed to parse GPT offer attributes. Error: %s", e)
        data_dict = {}
    return data_dict
def parse_flat_data(offer_markup, data_dict):
    """ Parses flat data

    Data includes if offer private or business, number of floor, number of rooms,
    built type and furniture.

    :param offer_markup: Body from offer page markup
    :param data_dict: Dict with GPT script data
    :type offer_markup: str
    :type data_dict: dict
    :return: Dictionary of flat data
    :rtype: dict
    """
    # GPT encodes the room count as an English word in a one-element list
    word_to_num = {"one": 1, "two": 2, "three": 3, "four": 4}
    raw_rooms = data_dict.get("rooms")
    rooms = word_to_num[raw_rooms[0]] if raw_rooms is not None else None
    # Floor arrives as e.g. ["floor_3"]; strip the prefix to get the number
    raw_floor = data_dict.get("floor_select", [None])[0]
    floor = int(raw_floor.replace("floor_", "")) if raw_floor is not None else None
    return {
        "floor": floor,
        "rooms": rooms,
        "built_type": data_dict.get("builttype", [None])[0],
        "furniture": data_dict.get("furniture", [None])[0] == 'yes',
        "surface": get_surface(offer_markup),
        "additional_rent": get_additional_rent(offer_markup),
    }
def parse_offer(url):
    """ Parses data from offer page url

    :param url: Offer page markup
    :param url: Url of current offer page
    :type url: str
    :return: Dictionary with all offer details or None if offer is not available anymore
    :rtype: dict, None
    """
    log.info(url)
    html_parser = BeautifulSoup(get_content_for_url(url).content, "html.parser")
    offer_content = str(html_parser.body)
    poster_name = get_poster_name(offer_content)
    price, currency, add_id = parse_tracking_data(str(html_parser.head))
    # Missing tracking id or poster name means the offer was taken down
    if not all([add_id, poster_name]):
        log.info("Offer {0} is not available anymore.".format(url))
        return
    region = parse_region(offer_content)
    if len(region) == 4:
        city, powiat, voivodeship, district = region
    elif len(region) == 3:
        city, voivodeship, district = region
    elif len(region) == 2:
        city, voivodeship = region
        district = None
    else:
        city, voivodeship, district = None, None, None
    data_dict = get_gpt_script(offer_content)
    # Parse the date once -- the original re-ran the full markup parse for
    # both the raw timestamp and its readable form
    date_added = get_date_added(offer_content)
    result = {
        "title": get_title(offer_content),
        "add_id": add_id,
        "price": price,
        "currency": currency,
        "city": city,
        "district": district,
        "voivodeship": voivodeship,
        "gps": get_gps(offer_content),
        "description": parse_description(offer_content),
        "poster_name": poster_name,
        "url": url,
        "date_added": date_added,
        "date_added_readable": dt.datetime.fromtimestamp(date_added).isoformat(),
        "images": get_img_url(offer_content),
        "private_business": data_dict.get("private_business"),
    }
    # Merge flat-specific attributes only when at least one is present
    flat_data = parse_flat_data(offer_content, data_dict)
    if flat_data and any(flat_data.values()):
        result.update(flat_data)
    return result