import json
import urllib
import pandas as pd
import copy
from typing import Union, List, Dict, Literal
import logging
from pathlib import Path
import numpy as np
from ..errors import DataNotFound
from . import Registry
log = logging.getLogger("argopy.utils.monitors")
[docs]
class GreenCoding:
"""GreenCoding API helper class for argopy carbon footprint
This class uses the `Green-Coding Solutions API <https://api.green-coding.io>`_ to retrieve CI tests energy consumption data.
This class also uses :class:`Github` to get PRs and release information for argopy.
Examples
--------
.. code-block:: python
:caption: GreenCoding API
from argopy.utils import GreenCoding
GreenCoding().workflows
GreenCoding().measurements(branch='master', start_date='2025-01-01')
GreenCoding().measurements(branch='385/merge', start_date='2025-01-01')
GreenCoding().total_measurements(branches=['master', '385/merge'])
GreenCoding().footprint_for_release('v1.0.0')
GreenCoding().footprint_since_last_release()
.. code-block:: python
:caption: Get metrics for another repo
ac = GreenCoding()
ac.repo = 'argopy-status'
ac.workflows = [{'ID': '2724029', 'Name': 'API status'}]
ac.total_measurements()
"""
owner = "euroargodev"
"""Github owner parameter, default to 'euroargodev'"""
repo = "argopy"
"""Github repo parameter, default to 'argopy'"""
workflows = [
{"ID": "22344160", "Name": "CI tests"},
{"ID": "25052179", "Name": "CI tests upstream"},
]
"""List of github actions workflow parameters"""
[docs]
def __init__(self):
from ..stores import httpstore
self.fs = httpstore(cache=True)
self.gh = Github()
self.URI = Registry(name="GreenCoding API calls")
[docs]
def measurements(
self,
branch: str = "master",
start_date: Union[str, pd.Timestamp] = "2024-06-01",
end_date: Union[str, pd.Timestamp] = None,
errors: Literal["raise", "ignore", "silent"] = "ignore",
) -> List[Dict]:
"""Return measurements for a given branch and date period
Parameters
----------
branch : str, default='master'
Name of the branch to retrieve measurements for
start_date : str, :class:`pandas.Timestamp`, default="2024-06-01"
Measurements starting date, default to the beginning of measurements.
end_date : str, :class:`pandas.Timestamp`, default=None
Measurements ending date, default to today
errors: Literal, default: ``ignore``
Define how to handle errors raised during data fetching:
- ``ignore`` (default): Do not stop processing, simply issue a debug message in logging console
- ``raise``: Raise any error encountered
- ``silent``: Do not stop processing and do not issue log message
Returns
-------
List[Dict] :
List of workflows name, ID and measurements as :class:`pandas.DataFrame`
See Also
--------
:class:`GreenCoding.workflows`
"""
if end_date is None:
end_date = pd.to_datetime("now", utc=True)
else:
end_date = pd.to_datetime(end_date)
if start_date is None:
start_date = pd.to_datetime("2020-03-17", utc=True) # 1st argopy release
else:
start_date = pd.to_datetime(start_date)
long_names = [ # noqa: F841
"Energy Value [J]",
"Run ID",
"Ran At",
"Label",
"CPU",
"Commit Hash",
"Duration [ms]",
"Platform",
"Avg. CPU Utilization",
"Workflow",
"Latitude",
"Longitude",
"City",
"Grid Intensity [gCOE2/kWh]",
"gCO2eq of run []",
]
names = [
"Energy",
"runID",
"Date",
"Workflow_Step",
"CPU_type",
"CommitHash",
"Duration",
"platform",
"Avg. CPU_utilization",
"Workflow",
"Latitude",
"Longitude",
"City",
"GridIntensity",
"gCO2eq",
]
results = copy.deepcopy(self.workflows)
for irow, workflow in enumerate(self.workflows):
payload = {
"repo": f"{self.owner}/{self.repo}",
"branch": branch,
"workflow": workflow["ID"],
"start_date": start_date.strftime("%Y-%m-%d"),
"end_date": end_date.strftime("%Y-%m-%d"),
}
uri = (
"https://api.green-coding.io/v1/ci/measurements?"
+ urllib.parse.urlencode(payload)
)
self.URI.commit(uri)
try:
data = self.fs.open_json(uri)
# Remove out of range values:
for ii, item in enumerate(data["data"]):
if item[names.index("Energy")] / 1e6 > 20000:
data["data"].pop(ii)
elif item[names.index("gCO2eq")] / 1e6 > 1000:
data["data"].pop(ii)
df = pd.DataFrame(data["data"], columns=names)
df["Duration"] = df["Duration"] / 1e6 # seconds
df["gCO2eq"] = df["gCO2eq"] / 1e6 # gCO2eq
df["Energy"] = df["Energy"] / 1e6 # Joules
df2 = df[["Workflow_Step", "Energy", "Duration", "gCO2eq"]]
df2 = df2.groupby("Workflow_Step").sum()
results[irow]["measurements"] = df2
except json.JSONDecodeError:
msg = (
"No data returned, probably because the branch '%s' was not found at %s"
% (branch, uri)
)
if errors == "raise":
from ..errors import DataNotFound
raise DataNotFound(msg)
elif errors == "ignore":
log.debug(msg)
except Exception:
if errors == "raise":
raise
elif errors == "ignore":
log.error("Error: {e}")
return results
[docs]
def total_measurements(self, branches: List[str] = ["master"], **kwargs) -> float:
"""Compute the cumulated measurements of gCO2eq for a list of branches and workflows
Parameters
----------
branches : List[str], default = ['master']
List of branches to retrieve measurements for.
Note that for a given merged PR number, the branch name is ``<PR>/merged``.
**kwargs:
Other Parameters are passed to :class:`GreenCoding.measurements`
Returns
-------
float
"""
gCO2eq = 0
for branch in branches:
for wkf in self.measurements(branch=branch, **kwargs):
if "measurements" in wkf:
gCO2eq += wkf["measurements"]["gCO2eq"].sum()
return gCO2eq
def __repr__(self):
summary = [f"<GreenCoding.{self.owner}.{self.repo}>"]
summary.append("Last release date: %s" % self.gh.lastrelease_date)
summary.append(
"%i PRs merged since the last release"
% len(self.gh.ls_PRmerged_since_last_release)
)
summary.append(
"Workflows analysed: %s"
% ", ".join(
["%s [%s]" % (wkf["Name"], wkf["ID"]) for wkf in self.workflows]
)
)
summary.append(
"gCO2eq since last release (including 'master' branch'): %0.2f"
% self.footprint_since_last_release(errors="ignore")
)
return "\n- ".join(summary)
[docs]
@staticmethod
def shieldsio_badge(
value: float, label: str = "Total carbon emitted [gCO2eq]"
) -> str:
"""Insert value in a Shields.io badge and return url
Insert total measurement value into a shields.io badge url
Parameters
----------
value : float
The carbon footprint value
label: str, default: 'Total carbon emitted [gCO2eq]'
The badge label to use
Returns
-------
str
"""
payload = {
"style": "plastic",
"labelColor": "grey",
}
t = lambda t: urllib.parse.quote(t) # noqa: E731
uri = "https://img.shields.io/badge/%s-%s-%s?" % (
t(label),
t("%0.2f" % value),
"black",
) + urllib.parse.urlencode(payload)
return uri
[docs]
@staticmethod
def shieldsio_endpoint(
value: float, outfile: Path = None, label: str = "Total carbon emitted [gCO2eq]"
) -> Path:
"""Insert value in a Shields.io json endpoint file
This is used by the argopy monitoring repository.
See live results at:
https://github.com/euroargodev/argopy-status?tab=readme-ov-file#energy-impact
Parameters
----------
value : float
The carbon footprint value
outfile: :class:`pathlib.Path`, default: None
Path toward json file
label: str, default: 'Total carbon emitted [gCO2eq]'
The badge label to use
Returns
-------
:class:`pathlib.Path`
"""
def save_to_json(label, message, outfile):
"""Save a shields.io badge endpoint to a json file"""
data = {}
data["schemaVersion"] = 1
data["label"] = label
data["labelColor"] = "grey"
data["message"] = message
data["color"] = "black"
with open(outfile, "w") as f:
json.dump(data, f)
return outfile
return save_to_json(
label=label,
message="%0.2f" % value,
outfile=Path(outfile),
)
[docs]
class Github:
"""Github repository helper class
This class uses the `Github API <https://docs.github.com/en/rest>`_, to get PRs and release information.
It is primarily meant to be used by :class:`GreenCoding`.
Examples
--------
.. code-block:: python
:caption: Github API
from argopy.utils import Github
Github().releases
Github().lastrelease_date
Github().lastrelease_tag
Github().get_PRtitle(385)
Github().ls_PRs()
Github().ls_PRs('2025-01-01')
Github().ls_PRmerged('2025-01-01')
Github().ls_PRmerged('2024-01-01', '2025-01-01')
Github().ls_PRmerged_since_last_release
Github().ls_PRmerged_in_release('v1.0.0')
"""
owner = "euroargodev"
"""Github owner parameter, default to 'euroargodev'"""
repo = "argopy"
"""Github repo parameter, default to 'argopy'"""
_js_data_pulls = None
"""Place holder for json data returned from the Github API pulls endpoint"""
[docs]
def __init__(self, repo: str = "argopy", owner: str = "euroargodev"):
from ..stores import httpstore
self.fs = httpstore(cache=True)
self.owner = owner
self.repo = repo
@property
def releases(self) -> pd.DataFrame:
"""List of published releases
Returns
-------
:class:`pandas.DataFrame`
"""
js = self.fs.open_json(
f"https://api.github.com/repos/{self.owner}/{self.repo}/releases"
)
results = []
for rel in js:
results.append(
{
"tag": rel["tag_name"].strip(),
"published": not bool(rel["draft"]),
"date": rel["published_at"],
}
)
df = pd.DataFrame(results)
df = df.sort_values("date").reset_index(drop=1)
return df
@property
def lastrelease_date(self) -> pd.Timestamp:
"""Last release publication date
Returns
-------
:class:`pandas.Timestamp`
"""
df = self.releases
return pd.to_datetime(df["date"].max())
@property
def lastrelease_tag(self) -> str:
"""Last release tag name
Returns
-------
str
"""
df = self.releases
return df["tag"].iloc[-1]
[docs]
def get_PRtitle(self, pr_num: int) -> str:
"""Get the title of a given PR number
Parameters
----------
pr_num : int
Returns
-------
str
"""
js = self.fs.open_json(
f"https://api.github.com/repos/{self.owner}/{self.repo}/pulls/{pr_num}"
)
return js["title"]
@property
def _json_pulls(self):
"""Return json data from Github API pulls endpoint
All results are returned, i.e. from all pages
https://docs.github.com/en/rest/pulls/pulls?apiVersion=2022-11-28#list-pull-requests
"""
def list_uri(max_page=5):
uri = []
for page in range(1, max_page):
payload = {
"state": "all", # open/closed
"per_page": 100,
"page": page,
"sort": "update",
"direction": "asc",
}
url = (
f"https://api.github.com/repos/{self.owner}/{self.repo}/pulls?"
+ urllib.parse.urlencode(payload)
)
uri.append(url)
return uri
# [print(u) for u in list_uri()]
if self._js_data_pulls is None:
pages = self.fs.open_mfjson(list_uri())
self._js_data_pulls = []
for page in pages:
for rec in page:
self._js_data_pulls.append(rec)
return self._js_data_pulls
[docs]
def ls_PRs(
self,
start_date: Union[str, pd.Timestamp] = None,
end_date: Union[str, pd.Timestamp] = None,
date: Literal["created", "updated", "merged"] = "created",
) -> pd.DataFrame:
"""List all PRs, possibly between 2 dates
Parameters
----------
start_date : :class:`pandas.Timestamp`, default = None
end_date : :class:`pandas.Timestamp`, default = None
date: Literal['created', 'updated', 'merged'], default = 'created'
Date filter key, use to apply bounds set by start_date and/or end_date
Returns
-------
:class:`pandas.DataFrame`
"""
start_date = (
pd.to_datetime(start_date, utc=True)
if start_date is not None
else pd.to_datetime(start_date, utc=True)
)
end_date = (
pd.to_datetime(end_date, utc=True)
if end_date is not None
else pd.to_datetime(end_date, utc=True)
)
results = []
for j in self._json_pulls:
merged = pd.to_datetime(j["merged_at"])
ismerged = merged is not None
created = pd.to_datetime(j["created_at"])
updated = pd.to_datetime(j["updated_at"])
sorting_key = "ID"
date_filter = False
if start_date is not None and end_date is not None:
if date == "created":
sorting_key = "created"
if created >= start_date and created <= end_date:
date_filter = True
elif date == "updated":
sorting_key = "updated"
if updated >= start_date and updated <= end_date:
date_filter = True
elif date == "merged" and ismerged:
sorting_key = "merged"
if merged >= start_date and merged <= end_date:
date_filter = True
elif start_date is not None:
if date == "created":
sorting_key = "created"
if created >= start_date:
date_filter = True
elif date == "updated":
sorting_key = "updated"
if updated >= start_date:
date_filter = True
elif date == "merged" and ismerged:
sorting_key = "merged"
if merged >= start_date:
date_filter = True
elif end_date is not None:
if date == "created":
sorting_key = "created"
if created >= end_date:
date_filter = True
elif date == "updated":
sorting_key = "updated"
if updated >= end_date:
date_filter = True
elif date == "merged" and ismerged:
sorting_key = "merged"
if merged >= end_date:
date_filter = True
else:
date_filter = True
if date_filter:
results.append(
{
"ID": j["number"],
"title": j["title"],
"created": created,
"updated": updated,
"merged": merged,
"state": j["state"],
}
)
if len(results) == 0:
raise DataNotFound()
else:
df = pd.DataFrame(results).sort_values(sorting_key).reset_index(drop=1)
return df
[docs]
def ls_PRmerged(
self,
start_date: Union[str, pd.Timestamp] = None,
end_date: Union[str, pd.Timestamp] = None,
) -> pd.DataFrame:
"""List merged PRs
Parameters
----------
start_date : :class:`pandas.Timestamp`
end_date : :class:`pandas.Timestamp`
Returns
-------
:class:`pandas.DataFrame`
See Also
--------
:class:`Github.ls_PRs`
"""
df = self.ls_PRs(start_date=start_date, end_date=end_date, date="merged")
df = df[np.logical_and(df["merged"].notnull(), df["state"] == "closed")]
df = df.sort_values("merged").reset_index(drop=1)
return df
@property
def ls_PRmerged_since_last_release(self) -> pd.DataFrame:
"""List PRs merged since the last release
Returns
-------
:class:`pandas.DataFrame`
See Also
--------
:class:`Github.ls_PRmerged`, :class:`Github.ls_PRmerged_in_release`, :class:`GreenCoding.lastrelease_date`
"""
return self.ls_PRmerged(start_date=self.lastrelease_date)
[docs]
def ls_PRmerged_in_release(self, release: str) -> pd.DataFrame:
"""List PRs included in a given release
Parameters
----------
release : str
The release tag name to use
Returns
-------
:class:`pandas.DataFrame`
See Also
--------
:class:`Github.ls_PRmerged`, :class:`Github.ls_PRmerged_since_last_release`
"""
df_rel = self.releases
if release not in df_rel["tag"].values:
raise ValueError("Release '%s' does not exist !" % release)
# Get this release publication date:
end_date = df_rel[df_rel["tag"] == release]["date"].values[0]
# Get the previous release publication date:
start_date = df_rel.iloc[df_rel[df_rel["tag"] == release].index[0] - 1]["date"]
if release == df_rel["tag"].iloc[0]:
raise ValueError("No PRs for the initial release !")
return self.ls_PRmerged(start_date=start_date, end_date=end_date)
[docs]
def ls_PRbaseline(
self,
start_date: Union[str, pd.Timestamp] = None,
end_date: Union[str, pd.Timestamp] = None,
) -> pd.DataFrame:
"""List all PRs in the baseline
A *baseline* PR is a PR is not merged back to the master branch, hence such a PR is not included in any release.
Parameters
----------
start_date : :class:`pandas.Timestamp`, default = None
end_date : :class:`pandas.Timestamp`, default = None
Returns
-------
:class:`pandas.DataFrame`
See Also
--------
:class:`Github.ls_PRs`
"""
df = self.ls_PRs(start_date=start_date, end_date=end_date, date="updated")
df = df[~df["merged"].notnull()]
return df