Scale test changes

main
Simon Aubury 2023-02-02 18:15:56 +11:00
parent 91f54e564f
commit 7b6254dc0b
7 changed files with 45 additions and 22 deletions

.gitignore (vendored, 2 changes)
View file

@@ -2,4 +2,4 @@ env/*
.DS_Store
__pycache__
BAK
data/*

View file

@@ -9,7 +9,9 @@ Before you can start installing or using packages in your virtual environment yo
```console
source env/bin/activate
```
pip install --upgrade pip
pip install -r requirements.txt
```
# Federated timeline
@@ -21,6 +23,8 @@ python mastodonlisten.py --baseURL https://data-folks.masto.host/ --enableKafka
# Kafka Connect
confluent-hub install confluentinc/kafka-connect-s3:10.3.0
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/mastodon-sink-s3/config -d '@./config/mastodon-sink-s3.json'

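The Kafka Connect step above installs the Confluent S3 sink and registers a connector by PUT-ing `./config/mastodon-sink-s3.json` to the Connect REST API. That JSON file is not part of this diff, so the following is only a sketch of what registering an equivalent configuration from Python might look like; the topic name, bucket name, format class and flush size are placeholders, not the project's actual values (the MinIO endpoint and the Connect port come from the compose file and the curl command above).

```python
# Sketch: registering the S3 sink via the Kafka Connect REST API from Python.
# The real settings live in ./config/mastodon-sink-s3.json (not shown in this
# commit); topic, bucket, format and flush.size below are placeholders.
import requests

connector_config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.storage.s3.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",  # assumed format
    "topics": "mastodon-topic",            # assumed topic name
    "s3.bucket.name": "mastodon",          # assumed bucket name
    "store.url": "http://localhost:9000",  # MinIO endpoint from docker-compose
    "s3.region": "us-east-1",
    "flush.size": "1000",
}

resp = requests.put(
    "http://localhost:8083/connectors/mastodon-sink-s3/config",
    json=connector_config,
    timeout=10,
)
resp.raise_for_status()
print(resp.json())
```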
View file

@@ -8,6 +8,12 @@
"type": "long",
"default": 0
},
{
"name": "created_at",
"type": ["null","int"],
"logicalType": "date",
"default" : "null"
},
{
"name": "app",
"type": "string",

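A note on the new `created_at` field above: it is declared as a `["null","int"]` union with `logicalType` as a sibling key and a default of the string `"null"`. In Avro the logical type normally annotates the branch it applies to, and a union whose first branch is `null` takes a JSON `null` default; the listener below also fills this field with epoch seconds, which does not match the `date` logical type (days since epoch). Below is a minimal sketch of the more conventional declaration, checked with the `avro` package from requirements.txt; the record name and the second field are placeholders, not the project's full schema.

```python
# Sketch: how a nullable created_at field is conventionally declared in Avro.
# Record name "mastodon_toot" and the trailing field are placeholders.
import json
import avro.schema

schema = {
    "type": "record",
    "name": "mastodon_toot",  # assumed record name
    "fields": [
        {
            "name": "created_at",
            # logicalType annotates the non-null branch, not the field itself
            "type": ["null", {"type": "int", "logicalType": "date"}],
            # a union with null first takes a JSON null default, not the string "null"
            "default": None,
        },
        {"name": "app", "type": "string"},
    ],
}

parsed = avro.schema.parse(json.dumps(schema))
print(parsed)
```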
View file

@@ -9,7 +9,7 @@ services:
- "9000:9000"
command: [ "server", "/data", "--console-address", ":9001" ]
volumes:
- minio:/data
- ./data:/data
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
@@ -29,5 +29,5 @@ services:
exit 0;
"
volumes:
minio:
# volumes:
# minio:

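The compose change above swaps the named `minio` volume for a `./data` bind mount (matching the `data/*` entry added to .gitignore), so objects written by the S3 sink end up under the repository's `data/` directory. A quick way to confirm the sink is writing is to list the bucket over MinIO's S3 API; the sketch below uses boto3 (an extra dependency, not in requirements.txt) and assumes a bucket named `mastodon`, while the endpoint and credentials come from the compose file.

```python
# Sketch: listing objects the S3 sink has written to MinIO.
# boto3 is an extra dependency and the bucket name "mastodon" is an
# assumption; endpoint and credentials match docker-compose.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
    region_name="us-east-1",
)

resp = s3.list_objects_v2(Bucket="mastodon")  # assumed bucket name
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```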
View file

@@ -2,12 +2,14 @@ import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
import argparse
import datetime
from kafkaproducer import kafka_producer
# globals
base_url = ''
enable_kafka = False
quiet = False
# if enable_kafka:
topic_name, producer = kafka_producer()
@@ -24,26 +26,22 @@ class Listener(mastodon.StreamListener):
m_lang = status.language
if m_lang is None:
m_lang = 'unknown'
m_user = status.account.username
app=''
# attribute only available on local
if hasattr(status, 'application'):
app = status.application.get('name')
# print(f'APP {app}')
# print(status.url)
# print(m_text)
# print('')
value_dict = {
'm_id': status.id,
'created_at': int(datetime.datetime.now().strftime('%s')),
'app': app,
'url': status.url,
'base_url': base_url,
'language': m_lang,
'favourites': status.favourites_count,
'username': status.account.username,
'username': m_user,
'bot': status.account.bot,
'tags': num_tags,
'characters': num_chars,
@@ -51,19 +49,17 @@ class Listener(mastodon.StreamListener):
'mastodon_text': m_text
}
if enable_kafka:
try:
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
except Exception as exp:
print('******* ERROR')
print(value_dict)
# raise(exp)
if not quiet:
print(f'{m_user} {m_lang}', m_text[:30])
if enable_kafka:
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
def main():
global base_url
global enable_kafka
global quiet
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -82,6 +78,13 @@ def main():
required=False,
default=False)
parser.add_argument(
'--quiet',
help='Do not echo a summary of the toot',
action='store_true',
required=False,
default=False)
parser.add_argument(
'--baseURL',
help='Server URL',

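Two small observations on the listener changes above. The new `created_at` value uses `strftime('%s')`, which is a platform-specific format code (it works on Linux and macOS but is not guaranteed elsewhere); `datetime.timestamp()` yields the same epoch seconds portably. Also, with the try/except around `producer.produce` removed, serialization errors now propagate instead of being printed. A minimal sketch of the portable timestamp:

```python
# Sketch: a portable way to get epoch seconds for the created_at field.
# strftime('%s') relies on a platform-specific format code; timestamp() does not.
import datetime

created_at = int(datetime.datetime.now().timestamp())
print(created_at)
```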
requirements.txt (new file, mode 100644, 4 additions)
View file

@@ -0,0 +1,4 @@
Mastodon.py
BeautifulSoup4
confluent_kafka
avro

View file

@@ -1,9 +1,15 @@
#!/bin/bash
BASE=/Users/simonaubury/git/saubury/mastodon-stream/
BASE=${HOME}/git/saubury/mastodon-stream/
PY=./env/bin/python
cd ${BASE}
while true; do echo Start; ${PY} mastodonlisten.py --enableKafka --public; sleep 30; done &
# while true; do echo Start; ${PY} mastodonlisten.py --enableKafka --public; sleep 30; done &
while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.social --enableKafka --public; sleep 30; done
while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://hachyderm.io --enableKafka ; sleep 30; done
while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.au/ --enableKafka ; sleep 30; done
while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://data-folks.masto.host --enableKafka ; sleep 30; done
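One caution about the script as committed: the first `while true` loop runs in the foreground and never exits, so the hachyderm.io, mastodon.au and data-folks.masto.host listeners below it are never started (the earlier single-loop version was backgrounded with `&`). Appending `&` to each loop and finishing with `wait` would fix that in bash; the sketch below is an alternative Python supervisor with the same restart-after-30-seconds behaviour, using the interpreter path and servers from the script.

```python
# Sketch: supervising the four listeners from Python instead of four bash loops.
# Restarts each listener 30 seconds after it exits, mirroring the shell script.
# Interpreter path, servers and flags are taken from the script above.
import subprocess
import threading
import time

PY = "./env/bin/python"
SERVERS = [
    ("https://mastodon.social", ["--public"]),
    ("https://hachyderm.io", []),
    ("https://mastodon.au/", []),
    ("https://data-folks.masto.host", []),
]

def run_listener(base_url, extra_args):
    while True:
        print("Start", base_url)
        cmd = [PY, "mastodonlisten.py", "--baseURL", base_url, "--enableKafka", *extra_args]
        subprocess.run(cmd)
        time.sleep(30)

threads = [
    threading.Thread(target=run_listener, args=(url, extra), daemon=True)
    for url, extra in SERVERS
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```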