forked from andyburke/autonomous.contact
refactor: events to a pure stream instead of being part of topics
NOTE: tests are passing, but the client is broken
This commit is contained in:
parent
c34069066d
commit
a5707e2f81
31 changed files with 934 additions and 686 deletions
252
public/api/events/index.ts
Normal file
252
public/api/events/index.ts
Normal file
|
|
@ -0,0 +1,252 @@
|
|||
import lurid from '@andyburke/lurid';
|
||||
import { get_session, get_user, PRECHECK_TABLE, require_user, user_has_write_permission_for_event } from '../../../utils/prechecks.ts';
|
||||
import * as CANNED_RESPONSES from '../../../utils/canned_responses.ts';
|
||||
import { EVENT, EVENTS, VALIDATE_EVENT } from '../../../models/event.ts';
|
||||
import parse_body from '../../../utils/bodyparser.ts';
|
||||
import { FSDB_SEARCH_OPTIONS, WALK_ENTRY } from '@andyburke/fsdb';
|
||||
import { WATCH, WATCHES } from '../../../models/watch.ts';
|
||||
import { flatten } from '../../../utils/object_helpers.ts';
|
||||
import { CHANNEL, CHANNELS } from '../../../models/channel.ts';
|
||||
|
||||
export const PRECHECKS: PRECHECK_TABLE = {};
|
||||
|
||||
// GET /api/events - get events
|
||||
// query parameters:
|
||||
// partial_id: the partial id subset you would like to match (remember, lurids are lexigraphically sorted)
|
||||
PRECHECKS.GET = [get_session, get_user, require_user];
|
||||
export async function GET(request: Request, meta: Record<string, any>): Promise<Response> {
|
||||
const sorts = EVENTS.sorts;
|
||||
const sort_name: string = meta.query.sort ?? 'newest';
|
||||
const key = sort_name as keyof typeof sorts;
|
||||
const sort: any = sorts[key];
|
||||
if (!sort) {
|
||||
return Response.json({
|
||||
error: {
|
||||
message: 'You must specify a sort: newest, oldest, latest, stalest',
|
||||
cause: 'invalid_sort'
|
||||
}
|
||||
}, {
|
||||
status: 400
|
||||
});
|
||||
}
|
||||
|
||||
const options: FSDB_SEARCH_OPTIONS<EVENT> = {
|
||||
...(meta.query ?? {}),
|
||||
limit: Math.min(parseInt(meta.query?.limit ?? '100', 10), 1_000),
|
||||
offset: Math.max(parseInt(meta.query?.offset ?? '0', 10), 0),
|
||||
sort,
|
||||
filter: (entry: WALK_ENTRY<EVENT>) => {
|
||||
const {
|
||||
event_type,
|
||||
event_id
|
||||
} = /^.*\/events\/(?<event_type>.*?)\/.*\/(?<event_id>[A-Za-z-]+)\.json$/.exec(entry.path)?.groups ?? {};
|
||||
|
||||
if (meta.query.after_id && event_id <= meta.query.after_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (meta.query.before_id && event_id >= meta.query.before_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (meta.query.type && !meta.query.type.split(',').includes(event_type)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
const headers = {
|
||||
'Cache-Control': 'no-cache, must-revalidate'
|
||||
};
|
||||
|
||||
const results = (await EVENTS.all(options))
|
||||
.map((entry: WALK_ENTRY<EVENT>) => entry.load())
|
||||
.filter((event) => typeof event.channel === 'undefined') // channel events must be queried via the channel's api
|
||||
.sort((lhs_item: EVENT, rhs_item: EVENT) => rhs_item.timestamps.created.localeCompare(lhs_item.timestamps.created));
|
||||
|
||||
// long-polling support
|
||||
if (results.length === 0 && meta.query.wait) {
|
||||
return new Promise((resolve, reject) => {
|
||||
function on_create(create_event: any) {
|
||||
if (meta.query.type && !meta.query.type.split(',').includes(create_event.item.type)) {
|
||||
return;
|
||||
}
|
||||
|
||||
results.push(create_event.item);
|
||||
clearTimeout(timeout);
|
||||
EVENTS.off('create', on_create);
|
||||
|
||||
return resolve(Response.json(results, {
|
||||
status: 200,
|
||||
headers
|
||||
}));
|
||||
}
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
EVENTS.off('create', on_create);
|
||||
return resolve(Response.json(results, {
|
||||
status: 200,
|
||||
headers
|
||||
}));
|
||||
}, 60_000); // 60 seconds
|
||||
EVENTS.on('create', on_create);
|
||||
request.signal.addEventListener('abort', () => {
|
||||
EVENTS.off('create', on_create);
|
||||
clearTimeout(timeout);
|
||||
reject(new Error('request aborted'));
|
||||
});
|
||||
Deno.addSignalListener('SIGINT', () => {
|
||||
EVENTS.off('create', on_create);
|
||||
clearTimeout(timeout);
|
||||
return resolve(Response.json(results, {
|
||||
status: 200,
|
||||
headers
|
||||
}));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return Response.json(results, {
|
||||
status: 200,
|
||||
headers
|
||||
});
|
||||
}
|
||||
|
||||
async function update_watches(event: EVENT) {
|
||||
const limit = 100;
|
||||
|
||||
let more_to_process;
|
||||
let offset = 0;
|
||||
do {
|
||||
const watches: WATCH[] = (await WATCHES.all({
|
||||
limit,
|
||||
offset
|
||||
})).map((entry) => entry.load());
|
||||
|
||||
for (const watch of watches) {
|
||||
if (typeof watch.type === 'string' && event.type !== watch.type) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof watch.parent_id === 'string' && event.parent_id !== watch.parent_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof watch.channel === 'string' && event.channel !== watch.channel) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof watch.topic === 'string' && event.topic !== watch.topic) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof watch.tags !== 'undefined' && !watch.tags.every((tag) => event.tags?.includes(tag))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (typeof watch.data !== 'undefined') {
|
||||
const event_data = flatten(event.data ?? {});
|
||||
for (const [key, value] of Object.entries(flatten(watch.data))) {
|
||||
const matcher = new RegExp(value);
|
||||
if (!matcher.test(event_data[key])) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (event.id < watch.last_id_seen) {
|
||||
continue;
|
||||
}
|
||||
|
||||
watch.last_id_seen = event.id;
|
||||
|
||||
// TODO: send a notification
|
||||
console.dir({
|
||||
notification: {
|
||||
watch,
|
||||
event
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
offset += watches.length;
|
||||
more_to_process = watches.length === limit;
|
||||
} while (more_to_process);
|
||||
}
|
||||
|
||||
// POST /api/events - Create an event
|
||||
PRECHECKS.POST = [get_session, get_user, require_user, (_req: Request, meta: Record<string, any>): Response | undefined => {
|
||||
const user_can_create_events = meta.user.permissions.some((permission: string) => permission.indexOf('events.create') === 0);
|
||||
|
||||
if (!user_can_create_events) {
|
||||
return CANNED_RESPONSES.permission_denied();
|
||||
}
|
||||
}];
|
||||
export async function POST(req: Request, meta: Record<string, any>): Promise<Response> {
|
||||
try {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
const body = await parse_body(req);
|
||||
const event: EVENT = {
|
||||
type: 'unknown',
|
||||
...body,
|
||||
creator_id: meta.user.id,
|
||||
timestamps: {
|
||||
created: now,
|
||||
updated: now
|
||||
}
|
||||
};
|
||||
|
||||
event.id = `${event.type}:${lurid()}`;
|
||||
|
||||
const errors = VALIDATE_EVENT(event);
|
||||
if (errors) {
|
||||
return Response.json({
|
||||
errors
|
||||
}, {
|
||||
status: 400
|
||||
});
|
||||
}
|
||||
|
||||
if (!user_has_write_permission_for_event(meta.user, event)) {
|
||||
return CANNED_RESPONSES.permission_denied();
|
||||
}
|
||||
|
||||
if (event.channel) {
|
||||
const channel: CHANNEL | null = await CHANNELS.get(event.channel);
|
||||
if (!channel) {
|
||||
return Response.json({
|
||||
errors: [{
|
||||
cause: 'missing_channel',
|
||||
message: 'No such channel exists.'
|
||||
}]
|
||||
}, {
|
||||
status: 400
|
||||
});
|
||||
}
|
||||
|
||||
const user_can_write_events_to_channel = channel.permissions.events.write.length === 0 ? true : channel.permissions.events.write.includes(meta.user.id);
|
||||
|
||||
if (!user_can_write_events_to_channel) {
|
||||
return CANNED_RESPONSES.permission_denied();
|
||||
}
|
||||
}
|
||||
|
||||
await EVENTS.create(event);
|
||||
|
||||
update_watches(event);
|
||||
|
||||
return Response.json(event, {
|
||||
status: 201
|
||||
});
|
||||
} catch (error) {
|
||||
return Response.json({
|
||||
error: {
|
||||
message: (error as Error).message ?? 'Unknown Error!',
|
||||
cause: (error as Error).cause ?? 'unknown'
|
||||
}
|
||||
}, { status: 500 });
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue