enhance(backend): improve chart engine

This commit is contained in:
syuilo
2021-12-14 18:12:37 +09:00
parent d95fafb5b3
commit 0be4e10462
36 changed files with 604 additions and 248 deletions

View File

@@ -30,7 +30,7 @@ type Log = {
/**
* 集計のグループ
*/
group: string | null;
group?: string | null;
/**
* 集計日時のUnixタイムスタンプ(秒)
@@ -38,7 +38,7 @@ type Log = {
date: number;
};
const camelToSnake = (str: string) => {
const camelToSnake = (str: string): string => {
return str.replace(/([A-Z])/g, s => '_' + s.charAt(0).toLowerCase());
};
@@ -47,6 +47,7 @@ const removeDuplicates = (array: any[]) => Array.from(new Set(array));
/**
* 様々なチャートの管理を司るクラス
*/
// eslint-disable-next-line import/no-default-export
export default abstract class Chart<T extends Record<string, any>> {
private static readonly columnPrefix = '___';
private static readonly columnDot = '_';
@@ -57,7 +58,8 @@ export default abstract class Chart<T extends Record<string, any>> {
group: string | null;
}[] = [];
public schema: SimpleSchema;
protected repository: Repository<Log>;
protected repositoryForHour: Repository<Log>;
protected repositoryForDay: Repository<Log>;
protected abstract genNewLog(latest: T): DeepPartial<T>;
@@ -181,9 +183,15 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
public static schemaToEntity(name: string, schema: SimpleSchema): EntitySchema {
return new EntitySchema({
name: `__chart__${camelToSnake(name)}`,
public static schemaToEntity(name: string, schema: SimpleSchema, grouped = false): {
hour: EntitySchema,
day: EntitySchema,
} {
const createEntity = (span: 'hour' | 'day'): EntitySchema => new EntitySchema({
name:
span === 'hour' ? `__chart__${camelToSnake(name)}` :
span === 'day' ? `__chart_day__${camelToSnake(name)}` :
new Error('not happen') as never,
columns: {
id: {
type: 'integer',
@@ -193,37 +201,45 @@ export default abstract class Chart<T extends Record<string, any>> {
date: {
type: 'integer',
},
group: {
type: 'varchar',
length: 128,
nullable: true,
},
...(grouped ? {
group: {
type: 'varchar',
length: 128,
},
} : {}),
...Chart.convertSchemaToFlatColumnDefinitions(schema),
},
indices: [{
columns: ['date', 'group'],
columns: grouped ? ['date', 'group'] : ['date'],
unique: true,
}, { // groupにnullが含まれると↑のuniqueは機能しないので↓の部分インデックスでカバー
columns: ['date'],
unique: true,
where: '"group" IS NULL',
}],
uniques: [{
columns: grouped ? ['date', 'group'] : ['date'],
}],
relations: {
/* TODO
group: {
target: () => Foo,
type: 'many-to-one',
onDelete: 'CASCADE',
},
*/
},
});
return {
hour: createEntity('hour'),
day: createEntity('day'),
};
}
constructor(name: string, schema: SimpleSchema, grouped = false) {
this.name = name;
this.schema = schema;
const entity = Chart.schemaToEntity(name, schema);
const keys = ['date'];
if (grouped) keys.push('group');
entity.options.uniques = [{
columns: keys,
}];
this.repository = getRepository<Log>(entity);
const { hour, day } = Chart.schemaToEntity(name, schema, grouped);
this.repositoryForHour = getRepository<Log>(hour);
this.repositoryForDay = getRepository<Log>(day);
}
@autobind
@@ -247,24 +263,40 @@ export default abstract class Chart<T extends Record<string, any>> {
}
@autobind
private getLatestLog(group: string | null = null): Promise<Log | null> {
return this.repository.findOne({
private getLatestLog(group: string | null, span: 'hour' | 'day'): Promise<Log | null> {
const repository =
span === 'hour' ? this.repositoryForHour :
span === 'day' ? this.repositoryForDay :
new Error('not happen') as never;
return repository.findOne(group ? {
group: group,
}, {
} : {}, {
order: {
date: -1,
},
}).then(x => x || null);
}
/**
* 現在(=今のHour or Day)のログをデータベースから探して、あればそれを返し、なければ作成して返します。
*/
@autobind
private async getCurrentLog(group: string | null = null): Promise<Log> {
private async claimCurrentLog(group: string | null, span: 'hour' | 'day'): Promise<Log> {
const [y, m, d, h] = Chart.getCurrentDate();
const current = dateUTC([y, m, d, h]);
const current = dateUTC(
span === 'hour' ? [y, m, d, h] :
span === 'day' ? [y, m, d] :
new Error('not happen') as never);
// 現在(=今のHour)のログ
const currentLog = await this.repository.findOne({
const repository =
span === 'hour' ? this.repositoryForHour :
span === 'day' ? this.repositoryForDay :
new Error('not happen') as never;
// 現在(=今のHour or Day)のログ
const currentLog = await repository.findOne({
date: Chart.dateToTimestamp(current),
...(group ? { group: group } : {}),
});
@@ -283,7 +315,7 @@ export default abstract class Chart<T extends Record<string, any>> {
// * 昨日何もチャートを更新するような出来事がなかった場合は、
// * ログがそもそも作られずドキュメントが存在しないということがあり得るため、
// * 「昨日の」と決め打ちせずに「もっとも最近の」とします
const latest = await this.getLatestLog(group);
const latest = await this.getLatestLog(group, span);
if (latest != null) {
const obj = Chart.convertFlattenColumnsToObject(latest) as T;
@@ -297,16 +329,16 @@ export default abstract class Chart<T extends Record<string, any>> {
// 初期ログデータを作成
data = this.getNewLog(null);
logger.info(`${this.name + (group ? `:${group}` : '')}: Initial commit created`);
logger.info(`${this.name + (group ? `:${group}` : '')}(${span}): Initial commit created`);
}
const date = Chart.dateToTimestamp(current);
const lockKey = `${this.name}:${date}:${group}`;
const lockKey = group ? `${this.name}:${date}:${span}:${group}` : `${this.name}:${date}:${span}`;
const unlock = await getChartInsertLock(lockKey);
try {
// ロック内でもう1回チェックする
const currentLog = await this.repository.findOne({
const currentLog = await repository.findOne({
date: date,
...(group ? { group: group } : {}),
});
@@ -315,13 +347,13 @@ export default abstract class Chart<T extends Record<string, any>> {
if (currentLog != null) return currentLog;
// 新規ログ挿入
log = await this.repository.insert({
group: group,
log = await repository.insert({
date: date,
...(group ? { group: group } : {}),
...Chart.convertObjectToFlattenColumns(data),
}).then(x => this.repository.findOneOrFail(x.identifiers[0]));
}).then(x => repository.findOneOrFail(x.identifiers[0]));
logger.info(`${this.name + (group ? `:${group}` : '')}: New commit created`);
logger.info(`${this.name + (group ? `:${group}` : '')}(${span}): New commit created`);
return log;
} finally {
@@ -349,10 +381,10 @@ export default abstract class Chart<T extends Record<string, any>> {
// そのログは本来は 01:00~ のログとしてDBに保存されて欲しいのに、02:00~ のログ扱いになってしまう。
// これを回避するための実装は複雑になりそうなため、一旦保留。
const update = async (log: Log) => {
const update = async (logHour: Log, logDay: Log): Promise<void> => {
const finalDiffs = {} as Record<string, number | unknown[]>;
for (const diff of this.buffer.filter(q => q.group === log.group).map(q => q.diff)) {
for (const diff of this.buffer.filter(q => q.group == null || (q.group === logHour.group)).map(q => q.diff)) {
const columns = Chart.convertObjectToFlattenColumns(diff);
for (const [k, v] of Object.entries(columns)) {
@@ -371,36 +403,60 @@ export default abstract class Chart<T extends Record<string, any>> {
const query = Chart.convertQuery(finalDiffs);
// ログ更新
await this.repository.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: log.id })
.execute();
await Promise.all([
this.repositoryForHour.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: logHour.id })
.execute(),
this.repositoryForDay.createQueryBuilder()
.update()
.set(query)
.where('id = :id', { id: logDay.id })
.execute(),
]);
logger.info(`${this.name + (log.group ? `:${log.group}` : '')}: Updated`);
logger.info(`${this.name + (logHour.group ? `:${logHour.group}` : '')}: Updated`);
// TODO: この一連の処理が始まった後に新たにbufferに入ったものは消さないようにする
this.buffer = this.buffer.filter(q => q.group !== log.group);
this.buffer = this.buffer.filter(q => q.group != null && (q.group !== logHour.group));
};
const groups = removeDuplicates(this.buffer.map(log => log.group));
await Promise.all(groups.map(group => this.getCurrentLog(group).then(log => update(log))));
await Promise.all(
groups.map(group =>
Promise.all([
this.claimCurrentLog(group, 'hour'),
this.claimCurrentLog(group, 'day'),
]).then(([logHour, logDay]) =>
update(logHour, logDay))));
}
@autobind
public async resync(group: string | null = null): Promise<any> {
public async resync(group: string | null = null): Promise<void> {
const data = await this.fetchActual(group);
const update = async (log: Log) => {
await this.repository.createQueryBuilder()
.update()
.set(Chart.convertObjectToFlattenColumns(data))
.where('id = :id', { id: log.id })
.execute();
const update = async (logHour: Log, logDay: Log): Promise<void> => {
await Promise.all([
this.repositoryForHour.createQueryBuilder()
.update()
.set(Chart.convertObjectToFlattenColumns(data))
.where('id = :id', { id: logHour.id })
.execute(),
this.repositoryForDay.createQueryBuilder()
.update()
.set(Chart.convertObjectToFlattenColumns(data))
.where('id = :id', { id: logDay.id })
.execute(),
]);
};
return this.getCurrentLog(group).then(log => update(log));
return Promise.all([
this.claimCurrentLog(group, 'hour'),
this.claimCurrentLog(group, 'day'),
]).then(([logHour, logDay]) =>
update(logHour, logDay));
}
@autobind
@@ -418,13 +474,18 @@ export default abstract class Chart<T extends Record<string, any>> {
const gt =
span === 'day' ? subtractTime(cursor ? dateUTC([y2, m2, d2, 0]) : dateUTC([y, m, d, 0]), amount - 1, 'day') :
span === 'hour' ? subtractTime(cursor ? dateUTC([y2, m2, d2, h2]) : dateUTC([y, m, d, h]), amount - 1, 'hour') :
null as never;
new Error('not happen') as never;
const repository =
span === 'hour' ? this.repositoryForHour :
span === 'day' ? this.repositoryForDay :
new Error('not happen') as never;
// ログ取得
let logs = await this.repository.find({
let logs = await repository.find({
where: {
group: group,
date: Between(Chart.dateToTimestamp(gt), Chart.dateToTimestamp(lt)),
...(group ? { group: group } : {}),
},
order: {
date: -1,
@@ -435,9 +496,9 @@ export default abstract class Chart<T extends Record<string, any>> {
if (logs.length === 0) {
// もっとも新しいログを持ってくる
// (すくなくともひとつログが無いと隙間埋めできないため)
const recentLog = await this.repository.findOne({
const recentLog = await repository.findOne(group ? {
group: group,
}, {
} : {}, {
order: {
date: -1,
},
@@ -451,9 +512,9 @@ export default abstract class Chart<T extends Record<string, any>> {
} else if (!isTimeSame(new Date(logs[logs.length - 1].date * 1000), gt)) {
// 要求された範囲の最も古い箇所時点での最も新しいログを持ってきて末尾に追加する
// (隙間埋めできないため)
const outdatedLog = await this.repository.findOne({
group: group,
const outdatedLog = await repository.findOne({
date: LessThan(Chart.dateToTimestamp(gt)),
...(group ? { group: group } : {}),
}, {
order: {
date: -1,
@@ -467,60 +528,26 @@ export default abstract class Chart<T extends Record<string, any>> {
const chart: T[] = [];
if (span === 'hour') {
for (let i = (amount - 1); i >= 0; i--) {
const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour');
for (let i = (amount - 1); i >= 0; i--) {
const current =
span === 'hour' ? subtractTime(dateUTC([y, m, d, h]), i, 'hour') :
span === 'day' ? subtractTime(dateUTC([y, m, d]), i, 'day') :
new Error('not happen') as never;
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
if (log) {
const data = Chart.convertFlattenColumnsToObject(log);
chart.unshift(Chart.countUniqueFields(data) as T);
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest) as T : null;
chart.unshift(Chart.countUniqueFields(this.getNewLog(data)) as T);
}
}
} else if (span === 'day') {
const logsForEachDays: T[][] = [];
let currentDay = -1;
let currentDayIndex = -1;
for (let i = ((amount - 1) * 24) + h; i >= 0; i--) {
const current = subtractTime(dateUTC([y, m, d, h]), i, 'hour');
const _currentDay = Chart.parseDate(current)[2];
if (currentDay != _currentDay) currentDayIndex++;
currentDay = _currentDay;
const log = logs.find(l => isTimeSame(new Date(l.date * 1000), current));
if (log) {
if (logsForEachDays[currentDayIndex]) {
logsForEachDays[currentDayIndex].unshift(Chart.convertFlattenColumnsToObject(log) as T);
} else {
logsForEachDays[currentDayIndex] = [Chart.convertFlattenColumnsToObject(log) as T];
}
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest) as T : null;
const newLog = this.getNewLog(data);
if (logsForEachDays[currentDayIndex]) {
logsForEachDays[currentDayIndex].unshift(newLog);
} else {
logsForEachDays[currentDayIndex] = [newLog];
}
}
}
for (const logs of logsForEachDays) {
const log = this.aggregate(logs);
chart.unshift(Chart.countUniqueFields(log) as T);
if (log) {
const data = Chart.convertFlattenColumnsToObject(log);
chart.unshift(Chart.countUniqueFields(data) as T);
} else {
// 隙間埋め
const latest = logs.find(l => isTimeBefore(new Date(l.date * 1000), current));
const data = latest ? Chart.convertFlattenColumnsToObject(latest) as T : null;
chart.unshift(Chart.countUniqueFields(this.getNewLog(data)) as T);
}
}
const res: ArrayValue<T> = {} as any;
const res = {} as Record<string, unknown>;
/**
* [{ foo: 1, bar: 5 }, { foo: 2, bar: 6 }, { foo: 3, bar: 7 }]
@@ -528,7 +555,7 @@ export default abstract class Chart<T extends Record<string, any>> {
* { foo: [1, 2, 3], bar: [5, 6, 7] }
* にする
*/
const compact = (x: Obj, path?: string) => {
const compact = (x: Obj, path?: string): void => {
for (const [k, v] of Object.entries(x)) {
const p = path ? `${path}.${k}` : k;
if (typeof v === 'object' && !Array.isArray(v)) {
@@ -542,7 +569,7 @@ export default abstract class Chart<T extends Record<string, any>> {
compact(chart[0]);
return res;
return res as ArrayValue<T>;
}
}