Browse Source

initial

master
alistair 1 year ago
commit
72c17d2ef6
  1. 1
      c-colours
  2. 0
      example.py
  3. 84
      justsetuptest.py
  4. 231
      read_db.py
  5. 11
      requirements.txt
  6. BIN
      test.py
  7. 645
      treeplot.py
  8. 498
      window.py

1
c-colours

@ -0,0 +1 @@ @@ -0,0 +1 @@
Subproject commit 2974d506e909aab34b030a9d38069c220642fc8e

0
example.py

84
justsetuptest.py

@ -0,0 +1,84 @@ @@ -0,0 +1,84 @@
from read_db import get_from_db
import pickle
from treeplot import *
print("Loading")
DATA_CACHE = "treedata.b"
from pathlib import Path
import os
# data = {}
# path = "/home/alistair/Documents/UQ/"
# path = "/home/alistair/Documents/UQ/2023/COSC3000"
# path = "/run/media/alistair/storj/linux"
#
# for p in Path(path).glob("**"):
# size = os.path.getsize(p)
# p = tuple(x for x in str(p).split("/") if len(x) > 0)
# if len(p) == 0:
# continue
# data[p] = size
#
#
# for i in range(10000):
# if len({p[i+1:]:None}) == len(data):
# end = i
# break
# i = 0
# while len({k[i + 2 :]: None for k in data}) == len(data):
# i += 1
#
# data = {k[i:]: v for k, v in data.items()}
# clean = {}
# for p, size in data.items():
# clean[p] = DataFrame(p, size, size)
#
padding = 3
tm = None
if not os.path.exists(DATA_CACHE):
clean = get_from_db()
print("Building Tree")
map = TreeMap(clean)
print("MAP:", list(map.levels.values())[0:10])
tm = map.to_tree()
for x in list(tm.items())[0:8]:
print(x.weight)
tm = tm.pack(Rect(0, 0, WIDTH, HEIGHT), padding=padding)
for x in list(tm.items())[0:8]:
print(x.weight)
exit(1)
print("Setting labels")
for t in tm.items():
t.attribs["label"] = "/".join(t.path)
tm.attribs["label"] = "root"
print("Done.")
print("Saving to file", DATA_CACHE)
with open(DATA_CACHE, "wb") as f:
pickle.dump(tm, f)
else:
print("Loading from file", DATA_CACHE)
with open(DATA_CACHE, "rb") as f:
tm = pickle.load(f)
print("Packing tree")
tm: TreeMap = tm.pack(Rect(0, 0, WIDTH, HEIGHT), padding=padding)
print("Shrinking tree. Size:", tm.count_children())
tm = tm.trim(threshold=0.1)
print("New size:", tm.count_children())
tm: TreeMap = tm.pack(Rect(0, 0, WIDTH, HEIGHT), padding=padding)
rerooted = tm
parent = [rerooted]
child_frame = rerooted
levels = 3

231
read_db.py

@ -0,0 +1,231 @@ @@ -0,0 +1,231 @@
import sqlite3
from treeplot import DataFrame
from treeplot import TreeFrame
from collections import OrderedDict
from collections import defaultdict
# read from shared memory db created in R
DB_FILE = "../proj/shared-processed-database.db"
DB_FILE_ORIG = "../proj/database.db"
def get_from_db(query="SELECT * FROM stats_by_file", db_file=DB_FILE):
print("Reading from db")
with sqlite3.connect(db_file) as con:
cur = con.cursor()
data = {}
for path, additions, deletions in cur.execute(query):
size = additions + deletions
if size > 0:
weight = additions / size
p = tuple(path.split("/"))
data[p] = DataFrame(
p, size, weight
)
print(list(data.items())[0:5])
print("Loaded data", len(data))
return data
def transfer_weight_from_db(tm: TreeFrame, query="SELECT * FROM stats_by_file", db_file=DB_FILE):
with sqlite3.connect(db_file) as con:
cur = con.cursor()
data = {}
for path, additions, deletions in cur.execute(query):
size = additions + deletions
weight = 0
if size > 0:
weight = additions / size
key = tuple(path.split("/"))
item: TreeFrame = tm.locate_at_key(key)
if item is None:
print("key not found", key)
continue
item.attribs["adds"] = additions
item.attribs["dels"] = deletions
print(list(data.items())[0:5])
print("Loaded data", len(data))
tm.cumsum_attribute("adds", into_attribute="additions")
tm.cumsum_attribute("dels", into_attribute="deletions")
max_size = 0
max_delta = 0
for node in tm.items():
node.attribs["total"] = node.attribs["additions"] + node.attribs["deletions"]
if node.attribs["total"] > max_delta:
max_delta = node.attribs["total"]
if node.size > max_size:
max_size = node.size
for node in tm.items():
node.weight = node.attribs["total"] * (max_size / max_delta) / node.size
print(node.weight)
def union(*items):
x = set()
for y in items:
if not isinstance(y, set):
for k in y:
x.update(k)
else:
x.update(y)
return x
def transfer_author_weight_from_db(tm: TreeFrame):
db_file = DB_FILE_ORIG
query = "select author_name, author_email, commit_time, new_file_name, addition_lines, deletion_lines from " \
"full_deltas"
for node in tm.items():
node.attribs["this_authors"] = set()
node.attribs["this_commits"] = 0
for node in tm.items():
node.attribs["this_authors"] = set()
with sqlite3.connect(db_file) as con:
cur = con.cursor()
data = {}
count = 0
for author_name, author_email, commit_time, path, additions, deletions in cur.execute(query):
key = tuple(path.split("/"))
item: TreeFrame = tm.locate_at_key(key)
while len(key) > 1 and item is None:
key = key[0:-1]
item: TreeFrame = tm.locate_at_key(key)
if item is None:
print("key not found", key)
continue
if item is None:
print("key not found", key)
continue
item.attribs["adds"] = additions
item.attribs["dels"] = deletions
item.attribs["this_authors"].add(author_email)
item.attribs["this_commits"] += 1
count += 1
tm.cumsum_attribute("this_commits", into_attribute="commits")
tm.cumsum_attribute("this_authors", into_attribute="authors", op=union)
tm.cumsum_attribute("adds", into_attribute="additions")
tm.cumsum_attribute("dels", into_attribute="deletions")
max_size = 0
max_delta = 0
for node in tm.items():
if "authors" not in node.attribs:
node.attribs["num_authors"] = 0
else:
node.attribs["num_authors"] = len(node.attribs["authors"])
max_authors = max(len(x.attribs["authors"]) for x in tm.items())
max_this_authors = max(len(x.attribs["this_authors"]) for x in tm.items())
for node in tm.items():
node.weight = len(node.attribs["this_authors"])
print("done")
def transfer_author_weight_from_db_days(tm: TreeFrame):
db_file = DB_FILE
query = "select * from delta_stats_files_time order by `floor_date(as.Date(date), unit = \"week\")`"
all_dates = OrderedDict()
for node in tm.items():
node.attribs["additions_date"] = defaultdict(int)
node.attribs["deletions_date"] = defaultdict(int)
with sqlite3.connect(db_file) as con:
cur = con.cursor()
data = {}
count = 0
for path, date, additions, deletions in cur.execute(query):
key = tuple(path.split("/"))
item: TreeFrame = tm.locate_at_key(key)
while len(key) > 1 and item is None:
key = key[0:-1]
item: TreeFrame = tm.locate_at_key(key)
if item is None:
print("key not found", key)
continue
if item is None:
print("key not found", key)
continue
all_dates[date] = None
item.attribs["additions_date"][date] += additions
item.attribs["deletions_date"][date] += deletions
count += 1
for k in tm.items():
k.weight = 0
print("done")
return list(all_dates.keys())
def transfer_source_analysis_from_db(tm: TreeFrame):
db_file = "../ts/build/database.db"
query = "select * from source_file;"
for node in tm.items():
for k in ["is_source", "is_header", "lines", "bytes", "comments", "comment_lines", "functions", "function_lines"]:
node.attribs[k] = 0
with sqlite3.connect(db_file) as con:
cur = con.cursor()
for x in cur.execute(query):
print(x)
kid, path, extension, is_source, is_header, lines, bytes, comments, comment_lines, functions, function_lines = x
key = tuple(path.replace("/run/media/alistair/storj/linux/", "").split("/"))
item: TreeFrame = tm.locate_at_key(key)
while len(key) > 1 and item is None:
print("Actual key not found", path)
key = key[0:-1]
item: TreeFrame = tm.locate_at_key(key)
if item is None:
print("key not found", key, path)
continue
if item is None:
print("key not found", key, path)
continue
print("Key found: ", key)
item.attribs["is_source"] += is_source
item.attribs["is_header"] += is_header
item.attribs["lines"] += lines
item.attribs["function_lines"] += function_lines
item.attribs["comment_lines"] += comment_lines
item.attribs["comments"] += comments
item.attribs["functions"] += functions
tm.cumsum_attribute("lines")
tm.cumsum_attribute("comments")
tm.cumsum_attribute("comment_lines")
tm.cumsum_attribute("functions")
tm.cumsum_attribute("function_lines")
for k in tm.items():
k.weight = 0
print("done")

11
requirements.txt

@ -0,0 +1,11 @@ @@ -0,0 +1,11 @@
click==7.1.2
colorama==0.4.6
glfw==2.5.9
magic-python==0.4.0
numpy==1.24.3
pybind11==2.10.4
PyOpenGL==3.1.6
python-magic==0.4.27
shellingham==1.5.0.post1
skia-python==87.5
typer==0.2.1

BIN
test.py

Binary file not shown.

645
treeplot.py

@ -0,0 +1,645 @@ @@ -0,0 +1,645 @@
import numpy as np
import skia
from typing import NamedTuple, TypedDict
from functools import cache
from colourpal import *
from collections import OrderedDict
class Rect(NamedTuple):
x: int
y: int
w: int
h: int
def area(self):
return self.w * self.h
def center(self):
return self.x + self.w / 2, self.y + self.h / 2
@cache
def skia(self):
return skia.Rect(self.x, self.y, self.x + self.w, self.y + self.h)
def contains_point(self, x: int, y: int):
right = self.x + self.w
bottom = self.y + self.h
left = self.x
top = self.y
return (x > left and x < right) and (y > top and y < bottom)
def rect_step_towards(self: Rect, other: Rect, amount=0.2):
if amount >= 1:
return other, True
right = self.x + self.w
bottom = self.y + self.h
left = self.x
top = self.y
other_right = other.x + other.w
other_bottom = other.y + other.h
other_left = other.x
other_top = other.y
add_right = other_right - right
add_left = other_left - left
add_top = other_top - top
add_bottom = other_bottom - bottom
diff = abs(add_left) + abs(add_right) + abs(add_top) + abs(add_bottom)
if diff < 5 * amount:
return other, True
left += amount * add_left
right += amount * add_right
top += amount * add_top
bottom += amount * add_bottom
return Rect(left, top, right - left, bottom - top), False
def rect_subdivide_with(self: Rect, props: list["TreeFrame"], padding=0):
# assert isinstance (props[0] , TreeFrame)
if self.h > self.w:
subs = Rect(self.y, self.x, self.h, self.w).subdivide_with(
props, padding=padding
)
return [(df, Rect(s.y, s.x, s.h, s.w)) for df, s in subs]
rect = Rect(self.x, self.y, self.w, self.h)
total = sum(x.size for x in props)
width_budget = self.w - padding * (len(props) + 1)
rects = []
x = rect.x + padding
y = rect.y + padding
h = self.h - 2 * padding
for df in props:
normalised = float(df.size) / float(total)
df.attribs["normalised"] = normalised
w = normalised * width_budget
rects.append((df, Rect(x, y, w, h)))
x += w + padding
return rects
def rect_subdivide(self: Rect, proportions: list[int], padding=0):
if self.h > self.w:
subs = Rect(self.y, self.x, self.h, self.w).subdivide(
proportions, padding=padding
)
return [Rect(s.y, s.x, s.h, s.w) for s in subs]
rect = Rect(self.x, self.y, self.w, self.h)
total = sum(proportions)
width_budget = self.w - padding * (len(proportions) + 1)
rects = []
x = rect.x + padding
y = rect.y + padding
h = self.h - 2 * padding
for proportion in proportions:
w = math.floor((float(proportion) / float(total)) * width_budget)
rects.append(Rect(x, y, w, h))
x += w + padding
return rects
Rect.subdivide = rect_subdivide
Rect.subdivide_with = rect_subdivide_with
Rect.step_towards = rect_step_towards
from collections import defaultdict
from functools import lru_cache
class DataFrame(NamedTuple):
path: tuple[str]
size: int
weight: float
class FrameAttribs(TypedDict):
color: str
label: str
rect: Rect
class TreeFrame:
path: tuple[str]
size: int
weight: float
children: OrderedDict
attribs: dict
def __init__(self, path, size, weight, children, attribs):
self.path = path
self.size = size
self.weight = weight
self.children = children
self.attribs = attribs
def is_leaf(self):
return len(self.children) == 0
def trim(self, threshold=0.01):
children = OrderedDict(
dict(
(k, v.trim())
for k, v in filter(
lambda x: x[1].attribs["normalised"] > threshold,
self.children.items(),
)
)
)
return TreeFrame(self.path, self.size, self.weight, children, self.attribs)
def count_children(self):
return 1 + sum(v.count_children() for v in self.children.values())
def cumsum_attribute(self, attribute, into_attribute=None, op=sum):
for child in self.children.values():
child.cumsum_attribute(attribute, into_attribute=into_attribute, op=op)
if into_attribute is None:
into_attribute = attribute
amount = op(child.attribs[into_attribute] for child in self.children.values()
if into_attribute in child.attribs)
if attribute in self.attribs:
amount = op((amount, self.attribs[attribute]))
self.attribs[into_attribute] = amount
def locate_at_key(self, key: tuple, rem=tuple()):
if len(key) == 0 and len(rem) == 0:
return self
rem = rem + (key[len(rem)],)
if key == rem:
if rem in self.children:
return self.children[rem]
if len(rem) < len(key):
if rem in self.children:
return self.children[rem].locate_at_key(key, rem)
return None
def pack(
self, rect: Rect, level=0, max_level=-1, at=tuple(), padding=0, rattrib="rect"
):
child = self.locate_at_key(at)
if len(at) > 0:
if child is not None:
return chlid.pack(
rect,
level=0,
max_level=max_level,
at=at[1:],
padding=padding,
rattrib=rattrib,
)
return None
# Locate subtree
# if len(at) > 0:
# p = tuple(list(self.path) + [at[0]])
# for k, i in self.children.items():
# if i.path == p:
# return i.pack(
# rect,
# level=0,
# max_level=max_level,
# at=at[1:],
# padding=padding,
# rattrib=rattrib,
# )
# return None
self.attribs["level"] = level
self.attribs[rattrib] = rect
if max_level != -1 and level > max_level:
self.attribs.pop(rattrib)
return self
if len(self.children) < 1:
return self
for df, crect in rect.subdivide_with(self.children.values(), padding=padding):
df.attribs[rattrib] = crect
df.pack(
crect,
level=level + 1,
max_level=max_level,
padding=padding,
rattrib=rattrib,
)
return self
def leaves(self, max_level=-1, level=0):
if self.is_leaf():
yield self
if max_level == -1 or level < max_level:
for k, i in self.children.items():
yield from i.leaves(max_level=max_level, level=level + 1)
def items(self, max_level=-1, level=0):
yield self
if max_level == -1 or level < max_level:
for k, i in self.children.items():
yield from i.items(max_level=max_level, level=level + 1)
def level(self) -> int:
if "level" in self.attribs:
return self.attribs["level"]
return len(self.path) - 1
def locate_at_position(self, x: int, y: int, levels=-1):
containing = None
for k, child in self.children.items():
if child.attribs["rect"].contains_point(x, y):
containing = child
break
if containing is None:
return self
cr = containing.attribs["rect"]
if levels < 0:
next = containing.locate_at_position(x, y, levels)
if next is not None:
return next
else:
return self
if levels > 0:
return containing.locate_at_position(x, y, levels - 1)
return containing
class hashable_defaultdict(defaultdict):
def __hash__(self):
return id(self)
class hashable_immutable_dict(dict):
def __hash__(self):
return id(self)
def _immutable(self, *args, **kws):
raise TypeError("object is immutable")
__setitem__ = _immutable
__delitem__ = _immutable
clear = _immutable
update = _immutable
setdefault = _immutable
pop = _immutable
popitem = _immutable
class TreeMap:
levels: hashable_immutable_dict[tuple, DataFrame]
def __init__(self, levels):
self.levels = hashable_immutable_dict(levels)
self.levels_cache = hashable_defaultdict(list)
for k, v in self.levels.items():
for l in range(len(k) + 1):
self.levels_cache[k[0:l]].append((k, v))
def locate_n(self, path: tuple):
yield from self.levels_cache[path]
# for k, v in self.levels.items():
# if k[0 : len(path)] == path:
# yield (k, v)
def to_groups(self):
groups = defaultdict(dict)
for k, v in self.levels.items():
for i in range(len(k)):
groups[i][k[0:i]] = self.locate_n(k[0:i])
return groups
def to_tree_r(self, path: tuple):
groups = self.groups_under(path)
total = sum(x.size for x in groups)
if len(groups) == 0:
return TreeFrame(path, 0, 0, {}, {})
if len(groups) == 1:
g = groups[0]
return TreeFrame(path, g.size, g.weight, {}, {})
return TreeFrame(
path,
sum(x.size for x in groups),
sum(x.weight for x in groups) / len(groups),
{g.path: self.to_tree_r(g.path) for g in groups if g.path != path},
{},
)
def sort(self):
return TreeMap(
{
k: v
for k, v in sorted(self.levels.items(), key=lambda item: item[1].size)
}
)
def to_tree(self, base_name="base"):
unique = {}
for k in self.levels.keys():
unique[(k[0],)] = None
res = {}
for k in unique.keys():
res[k] = self.to_tree_r(k)
if len(res) == 1:
return list(res.values())[0]
if len(res) != 1:
return TreeFrame(("base",), 1, 1, res, {})
@cache
def groups_under(self, key: tuple):
total_size = defaultdict(int)
avg_weight = defaultdict(float)
num_items = defaultdict(int)
for k, v in self.locate_n(key):
total_size[k[0 : len(key) + 1]] += v.size
avg_weight[k[0 : len(key) + 1]] += v.weight
num_items[k[0 : len(key) + 1]] += 1
for k in avg_weight:
avg_weight[k] /= num_items[k]
return [DataFrame(k, total_size[k], avg_weight[k]) for k in total_size]
import random
import math
def to_rgb(colour: CColour):
return colour.get_r(), colour.get_g(), colour.get_b()
#return int(255 * colour.get_red()), int(255 * colour.get_green()), int(255 * colour.get_blue()),
def to_sk(colour: CColour):
return skia.Color(
colour.get_r(),
colour.get_g(),
colour.get_b()
)
#def to_sk(colour: Color):
# return to_sk_ccol(colour)
# return skia.Color(
# int(255 * colour.get_red()),
# int(255 * colour.get_green()),
# int(255 * colour.get_blue()),
# )
#
def set_colours_gradient(y: TreeFrame, gradient: list[CColour]):
print("Set colours")
for tf in y.items():
index = int((len(gradient) - 1) * tf.attribs["gradient"])
tf.attribs["colour"] = gradient[index].hsv()
def set_colours_bw_new(y: TreeFrame):
print("Set colours")
for tf in y.items():
tf.attribs["colour"] = hsv(0, 0, 1)
#def set_colours_bw(y: TreeFrame):
# print("Set colours")
# for tf in y.items():
# tf.attribs["colour"] = Color("white")
# tf.attribs["colour"].set_luminance(min(tf.weight, 1))
#
def set_colours_new(y: TreeFrame, start: CColour, end: CColour):
y.attribs["colour"] = start
#y.attribs["colour"].set_v(1)
colours = list(make_gradient(start, end, 101))
"""
levels = [0] + list(
np.cumsum(
[
math.floor(100 * child.attribs["normalised"])
for child in y.children.values()
]
)
)
"""
children = list(y.children.values())
cum = 0
next_cum = 0
for i in range(len(y.children)):
child = children[i]
next_cum += math.floor(100 * child.attribs["normalised"])
#col_index = math.floor(100 * child.attribs["normalised"])
set_colours_new(child, colours[cum], colours[next_cum])
cum = next_cum
#def set_colours(y: TreeFrame, start: Color, end: Color):
# y.attribs["colour"] = Color(start)
# y.attribs["colour"].set_luminance(0.7)
#
# colours = list(start.range_to(end, 101))
# """
# levels = [0] + list(
# np.cumsum(
# [
# math.floor(100 * child.attribs["normalised"])
# for child in y.children.values()
# ]
# )
# )
# """
# children = list(y.children.values())
# cum = 0
# next_cum = 0
# for i in range(len(y.children)):
# child = children[i]
# next_cum += math.floor(100 * child.attribs["normalised"])
# #col_index = math.floor(100 * child.attribs["normalised"])
# set_colours(child, colours[cum], colours[next_cum])
# cum = next_cum
#
def draw_labels(
canvas, y: TreeFrame, label_level=1, max_level=4, border_width=1, text_scale=1,
autoscale=False
):
text_paint = skia.Paint(AntiAlias=True, Color=skia.ColorBLACK)
text_stroke_paint = skia.Paint(AntiAlias=True,
Color=skia.ColorWHITE,
Style=skia.Paint.kStroke_Style,
StrokeWidth=text_scale * 2)
for tm in y.items(max_level=max_level):
rect = tm.attribs["rect"]
if tm.level() == label_level:
font = skia.Font(None, 12)
font.setSize(12 * text_scale)
text = skia.TextBlob(tm.attribs["label"], font)
textb = text.bounds()
cx, cy = rect.center()
tw, th = textb.width(), textb.height()
if "sk_colour" not in tm.attribs:
colour = to_sk(tm.attribs["colour"])
stroke_colour = tm.attribs["colour"].hsv()
stroke_colour.set_v(stroke_colour.get_v() * 0.8)
stroke_colour = to_sk(stroke_colour)
tm.attribs["rgbcol"] = to_rgb(tm.attribs["colour"])
tm.attribs["sk_colour"] = colour
tm.attribs["sk_stroke_colour"] = stroke_colour
r, g, b = tm.attribs["rgbcol"]
colour = skia.Color(int(min(tm.weight, 1) * r),
int(min(tm.weight, 1) * g),
int(min(tm.weight, 1) * b))
text_stroke_paint = skia.Paint(AntiAlias=True,
Color=skia.ColorWHITE,
Style=skia.Paint.kStroke_Style,
StrokeWidth=text_scale * 2)
if tw < rect.w and th < rect.h:
canvas.drawTextBlob(text, cx - tw / 2, cy + th / 2, text_stroke_paint)
canvas.drawTextBlob(text, cx - tw / 2, cy + th / 2, text_paint)
elif autoscale:
new_scale = min(rect.w / tw, rect.h / th)
if new_scale > 0.1:
sz = font.getSize()
font.setSize(sz * new_scale)
text = skia.TextBlob(tm.attribs["label"], font)
textb = text.bounds()
tw, th = textb.width(), textb.height()
text_stroke_paint = skia.Paint(AntiAlias=True,
Color=skia.ColorWHITE,
Style=skia.Paint.kStroke_Style,
StrokeWidth=text_scale * 2 * new_scale)
canvas.drawTextBlob(text, cx - tw / 2, cy + th / 2, text_stroke_paint)
canvas.drawTextBlob(text, cx - tw / 2, cy + th / 2, text_paint)
def draw(
canvas, y: TreeFrame, label_level=1, max_level=4, border_width=1, text_scale=1, leaves_only=False
):
# canvas.clear(skia.ColorWHITE)
iter = y.leaves if leaves_only else y.items
for tm in iter(max_level=max_level):
if "rect" not in tm.attribs:
max_level = tm.level()
continue
rect = tm.attribs["rect"]
if rect.w < 0 or rect.h < 0:
continue
colour = None
stroke_colour = None
if "sk_colour" in tm.attribs:
colour = tm.attribs["sk_colour"]
stroke_colour = tm.attribs["sk_stroke_colour"]
r,g,b = tm.attribs["rgbcol"]
colour = skia.Color(int(min(tm.weight, 1) * r),
int(min(tm.weight, 1) * g),
int(min(tm.weight, 1) * b))
elif "colour" in tm.attribs:
colour = to_sk(tm.attribs["colour"])
stroke_colour = tm.attribs["colour"].hsv()
stroke_colour.set_v(stroke_colour.get_v() * 0.8)
stroke_colour = to_sk(stroke_colour)
tm.attribs["rgbcol"] = to_rgb(tm.attribs["colour"])
tm.attribs["sk_colour"] = colour
tm.attribs["sk_stroke_colour"] = stroke_colour
r,g,b = tm.attribs["rgbcol"]
colour = skia.Color(int(min(tm.weight, 1) * r),
int(min(tm.weight, 1) * g),
int(min(tm.weight, 1) * b))
else:
colour = skia.Color(
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255),
)
stroke_colour = colour
t_border_width = border_width
if "highlight" in tm.attribs:
colour = to_sk(tm.attribs["colour"])
stroke_colour = tm.attribs["colour"]
stroke_colour.set_v(0.1)
stroke_colour = to_sk(stroke_colour)
t_border_width = border_width * 2
paint = skia.Paint(AntiAlias=False, Color=colour)
# if tm.level() < 1:
# paint = skia.Paint(AntiAlias=False, Color=skia.ColorWHITE)
stroke_style = skia.Paint(
AntiAlias=True,
Color=colour,
Style=skia.Paint.kStroke_Style,
StrokeWidth=t_border_width,
)
canvas.drawRect(rect.skia(), paint)
canvas.drawRect(rect.skia(), stroke_style)
# Text labels
def draw_text(
canvas, text: str, cx: float, cy: float, size=12
):
font = skia.Font(None, size)
text = skia.TextBlob(text, font)
textb = text.bounds()
text_paint = skia.Paint(AntiAlias=True, Color=skia.ColorWHITE, style=skia.Paint.kFill_Style)
text_stroke_paint = skia.Paint(AntiAlias=True,
Color=skia.ColorWHITE,
Style=skia.Paint.kStroke_Style,
StrokeWidth=2)
#tw, th = textb.width(), textb.height()
#canvas.drawTextBlob(text, cx, cy, text_stroke_paint)
canvas.drawTextBlob(text, cx, cy, text_paint)
# canvas.clear(skia.ColorWHITE)

498
window.py

@ -0,0 +1,498 @@ @@ -0,0 +1,498 @@
import contextlib, glfw, skia
import sys
import time
from datetime import datetime, timedelta, timezone
from pprint import pprint
from OpenGL import GL
from read_db import get_from_db, transfer_weight_from_db, transfer_author_weight_from_db, \
transfer_author_weight_from_db_days, transfer_source_analysis_from_db
import pickle
from treeplot import *
import magic
from colourpal import *
import math
import os
DATE_SPACE = 0
WIDTH, HEIGHT = 1500, 1000
RECT_HEIGHT = HEIGHT - DATE_SPACE
# https://gist.github.com/archeoid/6f4df73886730d09faabfe6d02e79fb2
DATA_CACHE = "treedata.b"
FILES_LIST_CACHE = "filesystemdata.b"
@contextlib.contextmanager
def glfw_window():
if not glfw.init():
raise RuntimeError("glfw.init() failed")
glfw.window_hint(glfw.STENCIL_BITS, 8)
window = glfw.create_window(WIDTH, HEIGHT, "", None, None) # glfw.get_primary_monitor()
glfw.make_context_current(window)
yield window
glfw.terminate()
@contextlib.contextmanager
def skia_surface(window):
context = skia.GrDirectContext.MakeGL()
(fb_width, fb_height) = glfw.get_framebuffer_size(window)
backend_render_target = skia.GrBackendRenderTarget(
fb_width,
fb_height,
0, # sampleCnt
0, # stencilBits
skia.GrGLFramebufferInfo(0, GL.GL_RGBA8),
)
surface = skia.Surface.MakeFromBackendRenderTarget(
context,
backend_render_target,
skia.kBottomLeft_GrSurfaceOrigin,
skia.kRGBA_8888_ColorType,
skia.ColorSpace.MakeSRGB(),
)
assert surface is not None
yield surface
context.abandonContext()
print("Loading")
from pathlib import Path
import os
if not os.path.exists(FILES_LIST_CACHE):
data = {}
# path = "/home/alistair/Documents/UQ/"
# path = "/home/alistair/Documents/UQ/2023/COSC3000"
path = "/run/media/alistair/storj/linux/"
for p in Path(path).glob("**/*"):
size = 1 # os.path.getsize(p)
if os.path.isfile(p) and "text" in magic.from_file(p):
try:
with open(p, "r") as f:
size = len(f.readlines())
except:
print("LDSALDjLKADJLAJDA")
elif os.path.isdir(p):
continue
pp = tuple(x for x in str(p).split("/") if len(x) > 0)
if len(pp) == 0:
continue
data[pp] = size
# for i in range(10000):
# if len({p[i+1:]:None}) == len(data):
# end = i
# break
i = 0
while len({k[i + 2:]: None for k in data}) == len(data):
i += 1
data = {k[i:]: v for k, v in data.items()}
clean = {}
for p, size in data.items():
p = p[1:]
if len(p) > 0:
clean[p] = DataFrame(p, size, size)
with open(FILES_LIST_CACHE, "wb") as f:
pickle.dump(clean, f)
else:
with open(FILES_LIST_CACHE, "rb") as f:
clean = pickle.load(f)
clearcolour = skia.ColorBLACK
padding = 0
leaves_only=True
tm = None
if not os.path.exists(DATA_CACHE):
# clean = get_from_db()
print("Building Tree")
map = TreeMap(clean)
print("MAP:", list(map.levels.values())[0:10])
tm = map.to_tree()
tm = tm.pack(Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding)
print("Setting labels")
for t in tm.items():
t.attribs["label"] = "/".join(t.path)
tm.attribs["label"] = "root"
print("Done.")
# transfer_weight_from_db(tm)
#dates = transfer_author_weight_from_db_days(tm)
transfer_author_weight_from_db(tm)
#tm.attribs["all_dates"] = dates
print("Saving to file", DATA_CACHE)
with open(DATA_CACHE, "wb") as f:
pickle.dump(tm, f)
else:
print("Loading from file", DATA_CACHE)
with open(DATA_CACHE, "rb") as f:
tm = pickle.load(f)
print("Packing tree")
tm: TreeMap = tm.pack(Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding)
print("Shrinking tree. Size:", tm.count_children())
# tm = tm.trim(threshold=0.05)
print("New size:", tm.count_children())
tm: TreeMap = tm.pack(Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding)
#transfer_source_analysis_from_db(tm)
rerooted = tm
parent = [rerooted]
child_frame = rerooted
anim_speed = 1.1
levels = 1000
#
# for node in tm.items():
# # With transfer_author_weight_from_db(tm)
# # Num deltas
# node.weight = 1.1 * math.log(1 + node.attribs["additions"] + node.attribs["deletions"])
#
def authors_heatmap():
global tm
max_authors = max(len(n.attribs["this_authors"]) for n in tm.items())
for node in tm.items():
node.weight = 5 * len(node.attribs["this_authors"]) / max_authors
# With transfer_author_weight_from_db(tm)
# num authors
set_colours_new(tm, hsv(0, 1, 1), hsv(364, 1, 1))
def additions_to_deletions_heatmap():
global tm
max_d = math.log(max(t.attribs["additions"] + t.attribs["deletions"] for t in tm.leaves()))
max_d = max(t.attribs["additions"] + t.attribs["deletions"] for t in tm.leaves())
print(max_d)
for node in tm.items():
# With transfer_author_weight_from_db(tm)
# Amount added
if (node.attribs["additions"] + node.attribs["deletions"]) > 0:
node.weight = math.log(node.attribs["additions"] + node.attribs["deletions"]) / math.log(max_d)
node.attribs["gradient"] = node.attribs["additions"] / (node.attribs["additions"] + node.attribs["deletions"])
else:
node.attribs["gradient"] = 0.5
node.weight = 0
grad = lerp_gradient(rgb(255, 90, 0 ), rgb(0, 90, 255), 100)
set_colours_gradient(tm, grad)
#
additions_to_deletions_heatmap()
#authors_heatmap()
#for node in tm.items():
# node.weight = 1.0
# Scaling for weight
# max_weight = max(node.weight for node in tm.items())
# scaling = 1/(max_weight)
# for node in tm.items():
# node.weight *= scaling
# print(max_weight)
print("Setting Colours")
#set_colours(tm, Color("red"), Color("blue"))
# set_colours_bw(tm)
#set_colours_new(tm, hsv(0, 1, 1.0), hsv(364, 1, 1.0))
tm.attribs["colour"] = rgb(255,255,255)
#for node in tm.items():
# # # With
# # # Amount added
# # # transfer_source_analysis_from_db(tm)
# node.weight = node.attribs["comment_lines"] / node.attribs["lines"] if node.attribs["lines"] > 0 else 0
# if not (node.attribs["is_source"] or node.attribs["is_header"]) and node.is_leaf():
# node.attribs["colour"] = hsv(0, 0, 0.2)
# node.weight = 1
## print(node.weight)
#
resize = False
finished_animation = True
start_animation = True
animation_is_going_up = False
dragging = False
translation = (0, 0)
zoom = 1
def handle_resize(window, width, height):
global resize
global rerooted
global WIDTH
global HEIGHT
WIDTH = width
HEIGHT = height
RECT_HEIGHT = height - DATE_SPACE
resize = True
rerooted = child_frame
rerooted = rerooted.pack(
Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding, rattrib="rect_target"
)
def handle_mouse_click(window, button, action, mods):
x, y = glfw.get_cursor_pos(window)
global rerooted
global update
global parent
global child_frame
global start_animation
global animating_up
global dragging
global translation
if action == glfw.PRESS and button == glfw.MOUSE_BUTTON_1:
if glfw.get_key(window, glfw.KEY_LEFT_SHIFT) == glfw.PRESS:
dragging = glfw.get_cursor_pos(window)
print("drag")
return
if action == glfw.RELEASE:
if dragging:
x, y = glfw.get_cursor_pos(window)
translation = (
translation[0] + x - dragging[0],
translation[1] + y - dragging[1],
)
dragging = None
return
if action == glfw.RELEASE and button == glfw.MOUSE_BUTTON_2:
rerooted = child_frame # if cancelling animation
new = parent.pop(-1)
new.pack(Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding, rattrib="rect_target")
child_frame = new
if (len(parent)) == 0:
parent = [new]
start_animation = True
animation_is_going_up = True
return
if not finished_animation:
return
if action == glfw.RELEASE and button == glfw.MOUSE_BUTTON_1:
new = rerooted.locate_at_position(x, y, levels=0)
if new is None:
return
parent.append(rerooted)
new.pack(Rect(0, 0, WIDTH, RECT_HEIGHT), padding=padding, rattrib="rect_target")
# rerooted = new
child_frame = new
start_animation = True
animation_is_going_up = False
def handle_scroll(window, x, y):
global zoom
zoom += 0.1 * y
zoom = max(min(zoom, 5), 1)
need_redraw = True
def main():
frame = 0
global resize
global levels
global rerooted
global finished_animation
global start_animation
animation_background = None
current_date = 0
with glfw_window() as window:
GL.glClear(GL.GL_COLOR_BUFFER_BIT)
glfw.set_window_size_callback(window, handle_resize)
glfw.set_mouse_button_callback(window, handle_mouse_click)
glfw.set_scroll_callback(window, handle_scroll)
while True:
with skia_surface(window) as surface:
def redraw(canvas, force=False):
global need_redraw
if not (force or need_redraw or resize or not finished_animation):
return
draw(
canvas,
rerooted,
max_level=levels,
label_level=2,
text_scale=2 / zoom,
leaves_only=leaves_only
)
canvas.scale(1 / zoom, 1 / zoom)
canvas.translate(-tx, -ty)
draw_labels(canvas, rerooted, max_level=levels, label_level=4, text_scale=1 / zoom)
draw_labels(canvas, rerooted, max_level=levels, label_level=3, text_scale=1 / zoom)
draw_labels(canvas, rerooted, max_level=levels, label_level=2, text_scale=2 / zoom)
draw_labels(canvas, rerooted, max_level=levels, label_level=1, text_scale=3 / zoom, autoscale=True)
need_redraw = False
while not resize:
if (
glfw.get_key(window, glfw.KEY_ESCAPE) == glfw.PRESS
) or glfw.window_should_close(window):
print("done")
return
if glfw.get_key(window, glfw.KEY_EQUAL) == glfw.PRESS:
levels += 1
if glfw.get_key(window, glfw.KEY_MINUS) == glfw.PRESS:
levels -= 1
if glfw.get_key(window, glfw.KEY_S) == glfw.PRESS:
outf = "out/" + str(time.time()) + "-" "_".join(rerooted.path)
print("Saving")
image = surface.makeImageSnapshot()
if image is not None:
image.save(outf + ".png", skia.kPNG)
else:
print("Error saving")
# https://kyamagu.github.io/skia-python/tutorial/canvas.html#pdf
stream = skia.FILEWStream(outf + ".pdf")
with skia.PDF.MakeDocument(stream) as document:
with document.page(WIDTH, HEIGHT) as canvas:
redraw(canvas, force=True)
stream.flush()
print("Saved PDF")
# Next date
# if "all_dates" in tm.attribs:
# this_date = tm.attribs["all_dates"][current_date]
# print("date: ", this_date)
# for k in tm.items():
# if this_date in k.attribs["additions_date"]:
# k.weight = math.log(1 + k.attribs["additions_date"][this_date] + k.attribs["deletions_date"][this_date])
# k.weight *= 0.1
# else:
# k.weight = 0
# current_date = (current_date + 1) % len(tm.attribs["all_dates"])
# #set_colours_bw(tm)
# #set_colours(tm, Color("red"), Color("blue"))
# tm.attribs["colour"] = Color("white")
tx, ty = translation
if dragging:
tx, ty = (
translation[0] + x - dragging[0],
translation[1] + y - dragging[1],
)
# Mouse transformed to canvas
x, y = glfw.get_cursor_pos(window)
adj_x, adj_y = (
(x + translation[0]) / zoom,
(y + translation[1]) / zoom,
)
# highlight
#highlighted = rerooted.locate_at_position(adj_x, adj_y, levels=0)
#highlighted.attribs["highlight"] = False
# Interpolate animation
if start_animation:
start_animation = False
if not animation_is_going_up:
animation_background = surface.makeImageSnapshot()
finished_animation = True
for t in child_frame.items():
if "rect_target" in t.attribs:
t.attribs["rect"], finished = t.attribs[
"rect"
].step_towards(t.attribs["rect_target"], amount=anim_speed)
if finished:
t.attribs.pop("rect_target")
else:
finished_animation = False
if not finished_animation:
with surface as canvas:
canvas.translate(tx, ty)
canvas.scale(zoom, zoom)
canvas.clear(clearcolour)
canvas.drawImage(animation_background, 0, 0)
draw(
canvas,
child_frame,
max_level=levels,
label_level=-1,
leaves_only=leaves_only
)
canvas.scale(1 / zoom, 1 / zoom)
canvas.translate(-tx, -ty)
else:
rerooted = child_frame
with surface as canvas:
redraw(canvas, force=True)
# ddd = datetime.strptime(time.ctime(this_date * 60 * 60 * 24), "%a %b %d %H:%M:%S %Y")
# draw_text(canvas, ddd.strftime("Week from %d %b %Y"), 10, 1080 - 10, size=30)
# Draw label tooltip
treeframe = rerooted.locate_at_position(
adj_x, adj_y, levels=levels - 1
)
if glfw.get_key(window, glfw.KEY_SPACE) == glfw.PRESS:
pprint(treeframe)
pprint(treeframe.attribs)
treeframe = None
if treeframe is not None:
text = skia.TextBlob(
treeframe.attribs["label"],
# + " value: "
# + str(treeframe.size),
skia.Font(None, 12),
)
textb = text.bounds()
tw, th = textb.width(), textb.height()
paint1 = skia.Paint(AntiAlias=True, Color=skia.ColorWHITE)
paint2 = skia.Paint(AntiAlias=True, Color=skia.ColorBLACK)
rx = min(x, WIDTH - tw - 10)
ry = y + th + 15
bg = skia.Rect(rx, ry, rx + tw, ry + th)
with surface as canvas:
# canvas.drawRect(bg, paint1)
# canvas.scale(zoom, zoom)
# canvas.translate(tx, ty)
canvas.drawTextBlob(text, rx, ry, paint2)
canvas.drawTextBlob(text, rx, ry, paint2)
# canvas.translate(-tx, -ty)
# canvas.scale(1 / zoom, 1 / zoom)
surface.flushAndSubmit()
glfw.swap_buffers(window)
#glfw.poll_events()
glfw.wait_events()
#highlighted.attribs.pop("highlight")
# image = surface.makeImageSnapshot()
# image.save(f"anim/{current_date:04}-out.png", skia.kPNG)
resize = False
# import cProfile
# cProfile.run('main()')
main()
sys.exit(0)
Loading…
Cancel
Save