Merge pull request #12 from Langenfeld/vl/newbase

Vl/newbase
pull/15/head
Langenfeld 2021-12-13 10:47:11 +01:00 zatwierdzone przez GitHub
commit 400155583b
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
11 zmienionych plików z 992 dodań i 1138 usunięć

Wyświetl plik

@ -1,5 +1,9 @@
from .gitea import (
Gitea,
NotFoundException,
AlreadyExistsException,
)
from .apiobject import (
User,
Organization,
Team,
@ -9,6 +13,9 @@ from .gitea import (
AlreadyExistsException,
Issue,
Milestone,
Commit,
Comment,
Content
)
__all__ = [
@ -21,5 +28,8 @@ __all__ = [
'NotFoundException',
'AlreadyExistsException',
'Issue',
'Milestone'
'Milestone',
'Commit',
'Comment',
'Content'
]

729
gitea/apiobject.py 100644
Wyświetl plik

@ -0,0 +1,729 @@
import logging
from datetime import datetime
from typing import List, Tuple, Dict, Sequence, Optional, Union, Set
from .baseapiobject import ReadonlyApiObject, ApiObject
from .exceptions import *
class Organization(ApiObject):
"""see https://try.gitea.io/api/swagger#/organization/orgGetAll"""
API_OBJECT = """/orgs/{name}""" # <org>
ORG_REPOS_REQUEST = """/orgs/%s/repos""" # <org>
ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # <org>
ORG_TEAMS_CREATE = """/orgs/%s/teams""" # <org>
ORG_GET_MEMBERS = """/orgs/%s/members""" # <org>
ORG_IS_MEMBER = """/orgs/%s/members/%s""" # <org>, <username>
ORG_HEATMAP = """/users/%s/heatmap""" # <username>
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Organization): return False
return self.gitea == other.gitea and self.name == other.name
def __hash__(self):
return hash(self.gitea) ^ hash(self.name)
@classmethod
def request(cls, gitea: 'Gitea', name: str) -> 'Organization':
return cls._request(gitea, {"name": name})
@classmethod
def parse_response(cls, gitea, result) -> 'Organization':
api_object = super().parse_response(gitea, result)
# add "name" field to make this behave similar to users
Organization._add_read_property("name", result["username"], api_object)
return api_object
_patchable_fields = {"description", "full_name", "location", "visibility", "website"}
def commit(self):
values = self.get_dirty_fields()
args = {"name": self.name}
self.gitea.requests_patch(
Organization.API_OBJECT.format(**args), data=values
)
self.dirty_fields = {}
def get_repositories(self) -> List["Repository"]:
results = self.gitea.requests_get_paginated(
Organization.ORG_REPOS_REQUEST % self.username
)
return [Repository.parse_response(self.gitea, result) for result in results]
def get_repository(self, name) -> "Repository":
repos = self.get_repositories()
for repo in repos:
if repo.name == name:
return repo
raise NotFoundException("Repository %s not existent in organization." % name)
def get_teams(self) -> List["Team"]:
results = self.gitea.requests_get(
Organization.ORG_TEAMS_REQUEST % self.username
)
teams = [Team.parse_response(self.gitea, result) for result in results]
# organisation seems to be missing using this request, so we add org manually
for t in teams: setattr(t, "_organization", self)
return teams
def get_team(self, name) -> "Team":
teams = self.get_teams()
for team in teams:
if team.name == name:
return team
raise NotFoundException("Team not existent in organization.")
def get_members(self) -> List["User"]:
results = self.gitea.requests_get(Organization.ORG_GET_MEMBERS % self.username)
return [User.parse_response(self.gitea, result) for result in results]
def is_member(self, username) -> bool:
if isinstance(username, User):
username = username.username
try:
# returns 204 if its ok, 404 if its not
self.gitea.requests_get(
Organization.ORG_IS_MEMBER % (self.username, username)
)
return True
except:
return False
def remove_member(self, user: "User"):
path = f"/orgs/{self.username}/members/{user.username}"
self.gitea.requests_delete(path)
def delete(self):
""" Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User"""
for repo in self.get_repositories():
repo.delete()
self.gitea.requests_delete(Organization.API_OBJECT.format(name=self.username))
self.deleted = True
def get_heatmap(self) -> List[Tuple[datetime, int]]:
results = self.gitea.requests_get(User.USER_HEATMAP % self.username)
results = [
(datetime.fromtimestamp(result["timestamp"]), result["contributions"])
for result in results
]
return results
class User(ApiObject):
API_OBJECT = """/users/{name}""" # <org>
USER_MAIL = """/user/emails?sudo=%s""" # <name>
USER_PATCH = """/admin/users/%s""" # <username>
ADMIN_DELETE_USER = """/admin/users/%s""" # <username>
ADMIN_EDIT_USER = """/admin/users/{username}""" # <username>
USER_HEATMAP = """/users/%s/heatmap""" # <username>
def __init__(self, gitea):
super().__init__(gitea)
self._emails = []
def __eq__(self, other):
if not isinstance(other, User): return False
return self.gitea == other.gitea and self.id == other.id
def __hash__(self):
return hash(self.gitea) ^ hash(self.id)
@property
def emails(self):
self.__request_emails()
return self._emails
@classmethod
def request(cls, gitea: 'Gitea', name: str) -> "User":
api_object = cls._request(gitea, {"name": name})
return api_object
_patchable_fields = {
"active",
"admin",
"allow_create_organization",
"allow_git_hook",
"allow_import_local",
"email",
"full_name",
"location",
"login_name",
"max_repo_creation",
"must_change_password",
"password",
"prohibit_login",
"website",
}
def commit(self, login_name: str, source_id: int = 0):
"""
Unfortunately it is necessary to require the login name
as well as the login source (that is not supplied when getting a user) for
changing a user.
Usually source_id is 0 and the login_name is equal to the username.
"""
values = self.get_dirty_fields()
values.update(
# api-doc says that the "source_id" is necessary; works without though
{"login_name": login_name, "source_id": source_id}
)
args = {"username": self.username}
self.gitea.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values)
self.dirty_fields = {}
def get_repositories(self) -> List["Repository"]:
""" Get all Repositories owned by this User."""
url = f"/users/{self.username}/repos"
results = self.gitea.requests_get(url)
return [Repository.parse_response(self.gitea, result) for result in results]
def get_orgs(self) -> List[Organization]:
""" Get all Organizations this user is a member of."""
url = f"/users/{self.username}/orgs"
results = self.gitea.requests_get(url)
return [Organization.parse_response(self.gitea, result) for result in results]
def get_teams(self) -> List['Team']:
url = f"/user/teams"
results = self.gitea.requests_get(url, sudo=self)
return [Team.parse_response(self.gitea, result) for result in results]
def get_accessible_repos(self) -> List['Repository']:
""" Get all Repositories accessible by the logged in User."""
results = self.gitea.requests_get("/user/repos", sudo=self)
return [Repository.parse_response(self, result) for result in results]
def __request_emails(self):
result = self.gitea.requests_get(User.USER_MAIL % self.login)
# report if the adress changed by this
for mail in result:
self._emails.append(mail["email"])
if mail["primary"]:
self._email = mail["email"]
def delete(self):
""" Deletes this User. Also deletes all Repositories he owns."""
self.gitea.requests_delete(User.ADMIN_DELETE_USER % self.username)
self.deleted = True
def get_heatmap(self) -> List[Tuple[datetime, int]]:
results = self.gitea.requests_get(User.USER_HEATMAP % self.username)
results = [
(datetime.fromtimestamp(result["timestamp"]), result["contributions"])
for result in results
]
return results
class Branch(ReadonlyApiObject):
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Branch):
return False
return self.commit == other.commit and self.name == other.name
def __hash__(self):
return hash(self.commit["id"]) ^ hash(self.name)
_fields_to_parsers = {
# This is not a commit object
# "commit": lambda gitea, c: Commit.parse_response(gitea, c)
}
@classmethod
def request(cls, gitea: 'Gitea', owner: str, repo: str, ref: str):
return cls._request(gitea, {"owner": owner, "repo": repo, "ref": ref})
class Repository(ApiObject):
API_OBJECT = """/repos/{owner}/{name}""" # <owner>, <reponame>
REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # <owner>, <reponame>, <username>
REPO_SEARCH = """/repos/search/%s""" # <reponame>
REPO_BRANCHES = """/repos/%s/%s/branches""" # <owner>, <reponame>
REPO_ISSUES = """/repos/{owner}/{repo}/issues""" # <owner, reponame>
REPO_DELETE = """/repos/%s/%s""" # <owner>, <reponame>
REPO_TIMES = """/repos/%s/%s/times""" # <owner>, <reponame>
REPO_USER_TIME = """/repos/%s/%s/times/%s""" # <owner>, <reponame>, <username>
REPO_COMMITS = "/repos/%s/%s/commits" # <owner>, <reponame>
REPO_TRANSFER = "/repos/{owner}/{repo}/transfer"
REPO_CONTENTS = "/repos/{owner}/{repo}/contents"
REPO_CONTENT = """/repos/{owner}/{repo}/contents/{filepath}"""
REPO_MILESTONES = """/repos/{owner}/{repo}/milestones"""
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Repository): return False
return self.owner == other.owner and self.name == other.name
def __hash__(self):
return hash(self.owner) ^ hash(self.name)
_fields_to_parsers = {
# dont know how to tell apart user and org as owner except form email being empty.
"owner": lambda gitea, r: Organization.parse_response(gitea, r)
if r["email"] == "" else User.parse_response(gitea, r),
"updated_at": lambda gitea, t: Util.convert_time(t),
}
@classmethod
def request(cls, gitea: 'Gitea', owner: str, name: str):
return cls._request(gitea, {"owner": owner, "name": name})
_patchable_fields = {
"allow_merge_commits",
"allow_rebase",
"allow_rebase_explicit",
"allow_squash_merge",
"archived",
"default_branch",
"description",
"has_issues",
"has_pull_requests",
"has_wiki",
"ignore_whitespace_conflicts",
"name",
"private",
"website",
}
def get_branches(self) -> List['Branch']:
"""Get all the Branches of this Repository."""
results = self.gitea.requests_get(
Repository.REPO_BRANCHES % (self.owner.username, self.name)
)
return [Branch.parse_response(self.gitea, result) for result in results]
def add_branch(self, create_from: Branch, newname: str) -> "Branch":
"""Add a branch to the repository"""
# Note: will only work with gitea 1.13 or higher!
data = {"new_branch_name": newname, "old_branch_name": create_from.name}
result = self.gitea.requests_post(
Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data
)
return Branch.parse_response(self.gitea, result)
def get_issues(self) -> List["Issue"]:
"""Get all Issues of this Repository (open and closed)"""
return self.get_issues_state(Issue.OPENED) + self.get_issues_state(Issue.CLOSED)
def get_commits(self) -> List["Commit"]:
"""Get all the Commits of this Repository."""
try:
results = self.gitea.requests_get_paginated(
Repository.REPO_COMMITS % (self.owner.username, self.name)
)
except ConflictException as err:
logging.warning(err)
logging.warning(
"Repository %s/%s is Empty" % (self.owner.username, self.name)
)
results = []
return [Commit.parse_response(self.gitea, result) for result in results]
def get_issues_state(self, state) -> List["Issue"]:
"""Get issues of state Issue.open or Issue.closed of a repository."""
assert state in [Issue.OPENED, Issue.CLOSED]
issues = []
data = {"state": state}
results = self.gitea.requests_get_paginated(
Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), params=data
)
for result in results:
issue = Issue.parse_response(self.gitea, result)
# adding data not contained in the issue answer
Issue._add_read_property("repo", self, issue)
Issue._add_read_property("owner", self.owner, issue)
issues.append(issue)
return issues
def get_times(self):
results = self.gitea.requests_get(
Repository.REPO_TIMES % (self.owner.username, self.name)
)
return results
def get_user_time(self, username) -> float:
if isinstance(username, User):
username = username.username
results = self.gitea.requests_get(
Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
)
time = sum(r["time"] for r in results)
return time
def get_full_name(self) -> str:
return self.owner.username + "/" + self.name
def create_issue(self, title, assignees=frozenset(), description="") -> ApiObject:
data = {
"assignees": assignees,
"body": description,
"closed": False,
"title": title,
}
result = self.gitea.requests_post(
Repository.REPO_ISSUES.format(owner=self.owner.username, repo=self.name), data=data
)
return Issue.parse_response(self.gitea, result)
def create_milestone(self, title: str, description: str, due_date: str = None, state: str = "open") -> "Milestone":
url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name)
data = {"title": title, "description": description, "state": state}
if due_date: data["due_date"] = due_date
result = self.gitea.requests_post(url, data=data)
return Milestone.parse_response(self.gitea, result)
def create_gitea_hook(self, hook_url: str, events: List[str]):
url = f"/repos/{self.owner.username}/{self.name}/hooks"
data = {
"type": "gitea",
"config": {"content_type": "json", "url": hook_url},
"events": events,
"active": True,
}
return self.gitea.requests_post(url, data=data)
def list_hooks(self):
url = f"/repos/{self.owner.username}/{self.name}/hooks"
return self.gitea.requests_get(url)
def delete_hook(self, id: str):
url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}"
self.gitea.requests_delete(url)
def is_collaborator(self, username) -> bool:
if isinstance(username, User):
username = username.username
try:
# returns 204 if its ok, 404 if its not
self.gitea.requests_get(
Repository.REPO_IS_COLLABORATOR
% (self.owner.username, self.name, username)
)
return True
except:
return False
def get_users_with_access(self) -> Sequence[User]:
url = f"/repos/{self.owner.username}/{self.name}/collaborators"
response = self.gitea.requests_get(url)
collabs = [User.parse_response(self.gitea, user) for user in response]
if isinstance(self.owner, User):
return collabs + [self.owner]
else:
# owner must be org
teams = self.owner.get_teams()
for team in teams:
team_repos = team.get_repos()
if self.name in [n.name for n in team_repos]:
collabs += team.get_members()
return collabs
def remove_collaborator(self, user_name: str):
url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}"
self.gitea.requests_delete(url)
def transfer_ownership(self, new_owner: Union["User", "Organization"], new_teams: Set["Team"] = frozenset()):
url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name)
data = {"new_owner": new_owner.username}
if isinstance(new_owner, Organization):
new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()]
data["team_ids"] = new_team_ids
self.gitea.requests_post(url, data=data)
# TODO: make sure this instance is either updated or discarded
def get_git_content(self: str = None, commit: "Commit" = None) -> List["Content"]:
"""https://git.sopranium.de/api/swagger#/repository/repoGetContentsList"""
url = Repository.REPO_CONTENTS.format(owner=self.owner.username, repo=self.name)
data = {"ref": "HEAD" if commit is None else commit.sha}
result = [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)]
return result
def get_file_content(self, content: "Content", commit: "Commit" = None) -> Union[str, List["Content"]]:
"""https://git.sopranium.de/api/swagger#/repository/repoGetContents"""
url = Repository.REPO_CONTENT.format(owner=self.owner.username,
repo=self.name, filepath=content.path)
data = {"ref": "HEAD" if commit is None else commit.sha}
if content.type == Content.FILE:
return self.gitea.requests_get(url, data)["content"]
else:
return [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)]
def delete(self):
self.gitea.requests_delete(
Repository.REPO_DELETE % (self.owner.username, self.name)
)
self.deleted = True
class Milestone(ApiObject):
API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # <owner, repo>
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Milestone): return False
return self.gitea == other.gitea and self.id == other.id
def __hash__(self):
return hash(self.gitea) ^ hash(self.id)
_fields_to_parsers = {
"closed_at": lambda gitea, t: Util.convert_time(t),
"due_on": lambda gitea, t: Util.convert_time(t),
}
_patchable_fields = {
"allow_merge_commits",
"allow_rebase",
"allow_rebase_explicit",
"allow_squash_merge",
"archived",
"default_branch",
"description",
"has_issues",
"has_pull_requests",
"has_wiki",
"ignore_whitespace_conflicts",
"name",
"private",
"website",
}
@classmethod
def request(cls, gitea: 'Gitea', owner: str, repo: str, number: str):
return cls._request(gitea, {"owner": owner, "repo": repo, "number": number})
class Comment(ApiObject):
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Comment): return False
return self.repo == other.repo and self.id == other.id
def __hash__(self):
return hash(self.repo) ^ hash(self.id)
_fields_to_parsers = {
"user": lambda gitea, r: User.parse_response(gitea, r),
"created_at": lambda gitea, t: Util.convert_time(t),
"updated_at": lambda gitea, t: Util.convert_time(t),
}
class Commit(ReadonlyApiObject):
def __init__(self, gitea):
super().__init__(gitea)
_fields_to_parsers = {
# NOTE: api may return None for commiters that are no gitea users
"author": lambda gitea, u: User.parse_response(gitea, u) if u else None
}
def __eq__(self, other):
if not isinstance(other, Commit): return False
return self.sha == other.sha
def __hash__(self):
return hash(self.sha)
@classmethod
def parse_response(cls, gitea, result) -> 'Commit':
commit_cache = result["commit"]
api_object = cls(gitea)
cls._initialize(gitea, api_object, result)
# inner_commit for legacy reasons
Commit._add_read_property("inner_commit", commit_cache, api_object)
return api_object
class Issue(ApiObject):
API_OBJECT = """/repos/{owner}/{repo}/issues/{index}""" # <owner, repo, index>
GET_TIME = """/repos/%s/%s/issues/%s/times""" # <owner, repo, index>
GET_COMMENTS = """/repos/%s/%s/issues/comments"""
CREATE_ISSUE = """/repos/{owner}/{repo}/issues"""
OPENED = "open"
CLOSED = "closed"
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Issue): return False
return self.repo == other.repo and self.id == other.id
def __hash__(self):
return hash(self.repo) ^ hash(self.id)
_fields_to_parsers = {
"milestone": lambda gitea, m: Milestone.parse_response(gitea, m),
"user": lambda gitea, u: User.parse_response(gitea, u),
"assignee": lambda gitea, u: User.parse_response(gitea, u),
"assignees": lambda gitea, us: [User.parse_response(gitea, u) for u in us],
"state": lambda gitea, s: Issue.CLOSED if s == "closed" else Issue.OPENED,
# Repository in this request is just a "RepositoryMeta" record, thus request whole object
"repository": lambda gitea, r: Repository.request(gitea, r["owner"], r["name"])
}
_parsers_to_fields = {
"milestone": lambda m: m.id,
}
_patchable_fields = {
"assignee",
"assignees",
"body",
"due_date",
"milestone",
"state",
"title",
}
def commit(self):
values = self.get_dirty_fields()
args = {"owner": self.repository.owner.username, "repo": self.repository.name, "index": self.number}
self.gitea.requests_patch(Issue.API_OBJECT.format(**args), data=values)
self.dirty_fields = {}
@classmethod
def request(cls, gitea: 'Gitea', owner: str, repo: str, number: str):
api_object = cls._request(gitea, {"owner": owner, "repo": repo, "index": number})
return api_object
@classmethod
def create_issue(cls, gitea, repo: Repository, title: str, body: str = ""):
args = {"owner": repo.owner.username, "repo": repo.name}
data = {"title": title, "body": body}
result = gitea.requests_post(Issue.CREATE_ISSUE.format(**args), data=data)
return Issue.parse_response(gitea, result)
def get_time_sum(self, user: User) -> int:
results = self.gitea.requests_get(
Issue.GET_TIME % (self.owner.username, self.repo.name, self.number)
)
return sum(
result["time"]
for result in results
if result and result["user_id"] == user.id
)
def get_times(self) -> Optional[Dict]:
return self.gitea.requests_get(
Issue.GET_TIME % (self.owner.username, self.repository.name, self.number)
)
def delete_time(self, time_id: str):
path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times/{time_id}"
self.gitea.requests_delete(path)
def add_time(self, time: int, created: str = None, user_name: User = None):
path = f"/repos/{self.owner.username}/{self.repository.name}/issues/{self.number}/times"
self.gitea.requests_post(
path, data={"created": created, "time": int(time), "user_name": user_name}
)
def get_comments(self) -> List[ApiObject]:
results = self.gitea.requests_get(
Issue.GET_COMMENTS % (self.owner.username, self.repo.name)
)
allProjectComments = [
Comment.parse_response(self.gitea, result) for result in results
]
# Comparing the issue id with the URL seems to be the only (!) way to get to the comments of one issue
return [
comment
for comment in allProjectComments
if comment.issue_url.endswith("/" + str(self.number))
]
class Team(ApiObject):
API_OBJECT = """/teams/{id}""" # <id>
ADD_USER = """/teams/%s/members/%s""" # <id, username to add>
ADD_REPO = """/teams/%s/repos/%s/%s""" # <id, org, repo>
TEAM_DELETE = """/teams/%s""" # <id>
GET_MEMBERS = """/teams/%s/members""" # <id>
GET_REPOS = """/teams/%s/repos""" # <id>
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Team): return False
return self.organization == other.organization and self.id == other.id
def __hash__(self):
return hash(self.organization) ^ hash(self.id)
_fields_to_parsers = {
"organization": lambda gitea, o: Organization.parse_response(gitea, o)
}
@classmethod
def request(cls, gitea: 'Gitea', organization: str, team: str):
return cls._request(gitea, {"id": id})
_patchable_fields = {"description", "name", "permission", "units"}
def add_user(self, user: User):
self.gitea.requests_put(Team.ADD_USER % (self.id, user.login))
def add_repo(self, org: Organization, repo: Repository):
self.gitea.requests_put(Team.ADD_REPO % (self.id, org, repo.name))
def get_members(self):
""" Get all users assigned to the team. """
results = self.gitea.requests_get(Team.GET_MEMBERS % self.id)
return [User.parse_response(self.gitea, result) for result in results]
def get_repos(self):
""" Get all repos of this Team."""
results = self.gitea.requests_get(Team.GET_REPOS % self.id)
return [Repository.parse_response(self.gitea, result) for result in results]
def delete(self):
self.gitea.requests_delete(Team.TEAM_DELETE % self.id)
self.deleted = True
def remove_team_member(self, user_name: str):
url = f"/teams/{self.id}/members/{user_name}"
self.gitea.requests_delete(url)
class Content(ReadonlyApiObject):
FILE = "file"
def __init__(self, gitea):
super().__init__(gitea)
def __eq__(self, other):
if not isinstance(other, Team): return False
return self.repo == self.repo and self.sha == other.sha and self.name == other.name
def __hash__(self):
return hash(self.repo) ^ hash(self.sha) ^ hash(self.name)
class Util:
@staticmethod
def convert_time(time: str) -> datetime:
""" Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)"""
try:
return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z")
except ValueError:
return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S")

Wyświetl plik

@ -0,0 +1,117 @@
from .exceptions import ObjectIsInvalid, MissiongEqualyImplementation, RawRequestEndpointMissing
class ReadonlyApiObject:
def __init__(self, gitea):
self.gitea = gitea
self.deleted = False # set if .delete was called, so that an exception is risen
def __str__(self):
return "GiteaAPIObject (%s):" % (type(self))
def __eq__(self, other):
"""Compare only fields that are part of the gitea-data identity"""
raise MissiongEqualyImplementation()
def __hash__(self):
"""Hash only fields that are part of the gitea-data identity"""
raise MissiongEqualyImplementation()
_fields_to_parsers = {}
@classmethod
def request(cls, gitea):
if hasattr("API_OBJECT", cls):
return cls._request(gitea)
else:
raise RawRequestEndpointMissing()
@classmethod
def _request(cls, gitea, args):
result = cls._get_gitea_api_object(gitea, args)
api_object = cls.parse_response(gitea, result)
return api_object
@classmethod
def _get_gitea_api_object(cls, gitea, args):
"""Retrieving an object always as GET_API_OBJECT """
return gitea.requests_get(cls.API_OBJECT.format(**args))
@classmethod
def parse_response(cls, gitea, result) -> "ReadonlyApiObject":
# gitea.logger.debug("Found api object of type %s (id: %s)" % (type(cls), id))
api_object = cls(gitea)
cls._initialize(gitea, api_object, result)
return api_object
@classmethod
def _initialize(cls, gitea, api_object, result):
for name, value in result.items():
if name in cls._fields_to_parsers and value is not None:
parse_func = cls._fields_to_parsers[name]
value = parse_func(gitea, value)
cls._add_read_property(name, value, api_object)
# add all patchable fields missing in the request to be writable
for name in cls._fields_to_parsers.keys():
if not hasattr(api_object, name):
cls._add_read_property(name, None, api_object)
@classmethod
def _add_read_property(cls, name, value, api_object):
if not hasattr(api_object, name):
setattr(api_object, "_" + name, value)
prop = property(
(lambda n: lambda self: self._get_var(n))(name))
setattr(cls, name, prop)
else:
raise AttributeError(f"Attribute {name} already exists on api object.")
def _get_var(self, name):
if self.deleted:
raise ObjectIsInvalid()
return getattr(self, "_" + name)
class ApiObject(ReadonlyApiObject):
_patchable_fields = set()
def __init__(self, gitea):
super().__init__(gitea)
self._dirty_fields = set()
def commit(self):
raise NotImplemented()
_parsers_to_fields = {}
def get_dirty_fields(self):
dirty_fields_values = {}
for field in self._dirty_fields:
value = getattr(self, field)
if field in self._parsers_to_fields:
dirty_fields_values[field] = self._parsers_to_fields[field](value)
else:
dirty_fields_values[field] = value
return dirty_fields_values
@classmethod
def _initialize(cls, gitea, api_object, result):
super()._initialize(gitea, api_object, result)
for name in cls._patchable_fields:
cls._add_write_property(name, None, api_object)
@classmethod
def _add_write_property(cls, name, value, api_object):
if not hasattr(api_object, "_" + name):
setattr(api_object, "_" + name, value)
prop = property(
(lambda n: lambda self: self._get_var(n))(name),
(lambda n: lambda self, v: self.__set_var(n, v))(name))
setattr(cls, name, prop)
def __set_var(self, name, i):
if self.deleted:
raise ObjectIsInvalid()
self._dirty_fields.add(name)
setattr(self, "_" + name, i)

Wyświetl plik

@ -1,91 +0,0 @@
from .exceptions import ObjectIsInvalid, MissiongEqualyImplementation
class BasicGiteaApiObject:
GET_API_OBJECT = "FORMAT/STINING/{argument}"
PATCH_API_OBJECT = "FORMAT/STINING/{argument}"
def __init__(self, gitea):
self.gitea = gitea
self.deleted = False # set if .delete was called, so that an exception is risen
self.dirty_fields = set()
def __str__(self):
return "GiteaAPIObject (%s):" % (type(self))
def __eq__(self, other):
"""Compare only fields that are part of the gitea-data identity"""
raise MissiongEqualyImplementation()
def __hash__(self):
"""Hash only fields that are part of the gitea-data identity"""
raise MissiongEqualyImplementation()
fields_to_parsers = {}
def commit(self):
raise NotImplemented()
def get_dirty_fields(self):
return {name: getattr(self, name) for name in self.dirty_fields}
@classmethod
def parse_response(cls, gitea, result) -> "BasicGiteaApiObject":
# gitea.logger.debug("Found api object of type %s (id: %s)" % (type(cls), id))
api_object = cls(gitea)
cls._initialize(gitea, api_object, result)
return api_object
@classmethod
def _get_gitea_api_object(cls, gitea, args):
"""Retrieving an object always as GET_API_OBJECT """
return gitea.requests_get(cls.GET_API_OBJECT.format(**args))
patchable_fields = set()
@classmethod
def _initialize(cls, gitea, api_object, result):
for name, value in result.items():
if name in cls.fields_to_parsers and value is not None:
parse_func = cls.fields_to_parsers[name]
value = parse_func(gitea, value)
if name in cls.patchable_fields:
cls._add_property(name, value, api_object)
else:
cls._add_readonly_property(name,value,api_object)
# add all patchable fields missing in the request to be writable
for name in cls.patchable_fields:
if not hasattr(api_object,name):
cls._add_property(name, None, api_object)
for name in cls.fields_to_parsers.keys():
if not hasattr(api_object,name):
cls._add_readonly_property(name, None, api_object)
@classmethod
def _add_property(cls, name, value, api_object):
if not hasattr(api_object, name):
prop = property(
(lambda name: lambda self: self.__get_var(name))(name),
(lambda name: lambda self, v: self.__set_var(name, v))(name))
setattr(cls, name, prop)
setattr(api_object, "_" + name, value)
@classmethod
def _add_readonly_property(cls, name, value, api_object):
if not hasattr(api_object, name):
prop = property(
(lambda name: lambda self: self.__get_var(name))(name))
setattr(cls, name, prop)
setattr(api_object, "_" + name, value)
def __set_var(self, name, i):
if self.deleted:
raise ObjectIsInvalid()
self.dirty_fields.add(name)
setattr(self, "_" + name, i)
def __get_var(self, name):
if self.deleted:
raise ObjectIsInvalid()
return getattr(self, "_" + name)

Wyświetl plik

@ -13,6 +13,13 @@ class ObjectIsInvalid(Exception):
class ConflictException(Exception):
pass
class RawRequestEndpointMissing(Exception):
"""This ApiObject can only be obtained through other api objects and does not have
diret .request method."""
pass
class MissiongEqualyImplementation(Exception):
"""
Each Object obtained from the gitea api must be able to check itself for equality in relation to its

Plik diff jest za duży Load Diff

Wyświetl plik

@ -1,27 +0,0 @@
from .basicGiteaApiObject import BasicGiteaApiObject
class GiteaApiObject(BasicGiteaApiObject):
GET_API_OBJECT = "FORMAT/STRING/{argument}"
PATCH_API_OBJECT = "FORMAT/STRING/{argument}"
def __init__(self, gitea):
super(GiteaApiObject, self).__init__(gitea)
@classmethod
def request(cls, gitea, id):
"""Use for giving a nice e.g. 'request(gita, orgname, repo, ticket)'.
All args are put into an args tuple for passing around"""
return cls._request(gitea)
@classmethod
def _request(cls, gitea, args):
result = cls._get_gitea_api_object(gitea, args)
api_object = cls.parse_response(gitea, result)
# hack: not all necessary request args in api result (e.g. repo name in issue)
for key, value in args.items():
if not hasattr(api_object, key):
setattr(api_object, key, value)
return api_object

Wyświetl plik

@ -1,2 +1,3 @@
requests
pytest
frozendict

Wyświetl plik

@ -5,7 +5,7 @@ with open('README.md') as readme_file:
setup_args = dict(
name='py-gitea',
version='0.1.8',
version='0.2.0',
description='A python wrapper for the Gitea API',
long_description_content_type="text/markdown",
long_description=README,

Wyświetl plik

@ -1,7 +1,7 @@
import pytest
import uuid
from gitea import Gitea, User, Organization, Team, Repository, Issue
from gitea import Gitea, User, Organization, Team, Repository, Issue, Milestone
from gitea import NotFoundException, AlreadyExistsException
# put a ".token" file into your directory containg only the token for gitea
@ -172,6 +172,13 @@ def test_create_team(instance):
assert team.description == "descr"
assert team.organization == org
def test_create_milestone(instance):
org = Organization.request(instance, test_org)
repo = org.get_repository(test_repo)
ms = repo.create_milestone("I love this Milestone", "Find an otter to adopt this milestone")
assert isinstance(ms, Milestone)
assert ms.title == "I love this Milestone"
def test_user_teams(instance):
org = Organization.request(instance, test_org)
team = org.get_team(test_team)
@ -206,6 +213,30 @@ def test_hashing(instance):
commit = repo.get_commits()[0]
assert len(set([org, team, user, repo, issue, branch, commit, milestone]))
def test_change_issue(instance):
org = Organization.request(instance, test_org)
repo = org.get_repositories()[0]
ms_title = "othermilestone"
issue = Issue.create_issue(instance, repo, "IssueTestissue with Testinput", "asdf2332")
new_body = "some new description with some more of that char stuff :)"
issue.body = new_body
issue.commit()
number = issue.number
del issue
issue2 = Issue.request(instance, org.username, repo.name, number)
assert issue2.body == new_body
milestone = repo.create_milestone(ms_title, "this is only a teststone2")
issue2.milestone = milestone
issue2.commit()
del issue2
issue3 = Issue.request(instance, org.username, repo.name, number)
assert issue3.milestone is not None
assert issue3.milestone.description == "this is only a teststone2"
issues = repo.get_issues()
assert len([issue for issue in issues
if issue.milestone is not None and issue.milestone.title == ms_title]) > 0
def test_team_get_org(instance):
org = Organization.request(instance, test_org)
user = instance.get_user_by_name(test_user)
@ -282,7 +313,10 @@ def test_delete_org(instance):
def test_delete_user(instance):
user = User.request(instance, test_user)
user_name = test_user + "delte_test"
email = user_name + "@example.org"
user = instance.create_user(user_name, email, "abcdefg1.23AB", send_notify=False)
assert user.username == user_name
user.delete()
with pytest.raises(NotFoundException) as e:
User.request(instance, test_user)
User.request(instance, user_name)

Wyświetl plik

@ -36,4 +36,13 @@ def test_list_repos(instance):
for i in range(1, 34):
instance.create_repo(org, test_repo + "_" + str(i), str(i))
repos = org.get_repositories()
assert len(repos) >= 33
assert len(repos) >= 33
def test_list_issue(instance):
org = Organization.request(instance, test_org)
repo = instance.create_repo(org, test_repo, "Testing a huge number of Issues and how they are listed")
for x in range(0, 100):
Issue.create_issue(instance, repo, "TestIssue" + str(x), "We will be to many to be listed on one page")
issues = repo.get_issues()
assert len(issues) > 98