# lin_ / functi.py
# Zelyanoth's picture
# Upload 11 files
# c9329ee verified
# raw
# history blame
# 15.2 kB
import hashlib
import secrets
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
import os
import time
import datetime
from line_db import DatabaseManager
from urllib.parse import urlencode
from taipy.gui import navigate # type: ignore
from gradio_client import Client
import pandas as pd
from requests_oauthlib import OAuth2Session
import requests
from timing_lin import *
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Create the scheduler globally (start it once).
# NOTE: this runs at import time, so importing this module starts the
# APScheduler background scheduler.
apsched = BackgroundScheduler()
apsched.start()
# Module-level defaults for the names the callbacks below access as
# `state.<name>` (presumably the initial Taipy GUI state — TODO confirm).
Linked_account_name = " "
# Overwritten with "Linkedin" a few lines further down.
Linked_social_network = " "
# Assigned a second time (same empty dict) further down.
data_schedule ={}
# OAuth scopes passed to the OAuth2Session created at the end of this section.
scope = ['openid', 'profile', 'email', 'w_member_social']
# Default schedule slot; add_scheduling formats "<day> <hour>:<minute>" from
# the matching state values.
time_value_hour = 18
time_value_minute = 00
day_value = "Monday"
Linked_social_network = "Linkedin"
# Hugging Face token read from the "hugging_key" environment variable
# (None when the variable is not set); passed to the gradio Client below.
api_key_hugging = os.environ.get("hugging_key")
# Tables shown to the user; on_login replaces them with data fetched
# through db_manager.
Source_table = {}
data_account = {}
data_schedule = {}
# Snapshots written by the delete_* callbacks before the GUI deletion handler
# runs, so the removed rows can be found by comparison.
data_schedule_before = {}
Source_table_before = {}
data_account_before = {}
# Supabase connection settings, read from the environment and handed to
# DatabaseManager below.
url: str = os.environ.get("SUPABASE_URL") # type: ignore
key: str = os.environ.get("SUPABASE_KEY") # type: ignore
# Session and form fields read and written by on_login / on_register /
# on_logout / toggle_register.
is_logged_in = False
current_user = None
message= ''
show_register= False
login_email= ''
login_password= ''
register_email= ''
register_password= ''
confirm_password= ''
# Input and feedback fields used by add_source.
source_ = " "
source_add_message = " "
# Set by on_login from db_manager.authenticate_user; the callbacks read
# `user_inf.user.id` from it.
user_inf = " "
# Text written by post_generation and sent by post_publishing.
generated_post = "test"
token = " "
authorization_url = " "
# Filled by authen with the values returned by linkedin.authorization_url.
urlss = ""
states = ""
social_network = "Linkedin"
# Shared service clients, created once at import time.
db_manager = DatabaseManager(url,key)
client = Client("Zelyanoth/Linkedin_poster_dev",hf_token = api_key_hugging)
# LinkedIn OAuth2 application settings, read from the environment.
client_id = os.environ.get("CLIENT_ID")
redirect_url = os.environ.get("RED_URL")
client_secret = os.environ.get("CLIENT_SECRET")
linkedin = OAuth2Session(client_id, redirect_uri=redirect_url, scope=scope)
def replanifier_toutes_les_tâches(df):
    """Hand every row of the schedule table to ``planifier_ligne``.

    ``df`` must expose the ``id``, ``id_social``, ``user_id``,
    ``schedule_time``, ``social_network`` and ``adjusted_time`` columns.
    This function does not itself remove any job from the scheduler.
    """

    def _schedule_row(row):
        # One schedule row -> one call, arguments in planifier_ligne's order.
        return planifier_ligne(
            row["id"],
            row["id_social"],
            row["user_id"],
            row["schedule_time"],
            row["social_network"],
            row["adjusted_time"],
        )

    df.apply(_schedule_row, axis=1)
def post_generation_for_robot(id, social, idd):
    """Scheduled job: generate a post remotely and store it.

    ``id`` is sent as the ``code`` argument of the ``/poster_linkedin``
    endpoint; the value it returns is saved with
    ``db_manager.add_post(social, <post>, idd)``. Any exception is printed
    rather than raised.
    """
    try:
        print("⏳ Tâche planifizzzzzzzzzzzzzzzée pour", flush=True)
        new_post = client.predict(code=id, api_name="/poster_linkedin")
        db_manager.add_post(social, new_post, idd)
    except Exception as exc:
        print("Erreur dans gen():", exc, flush=True)
def post_publishing_for_robot(id_social, id_user, idd, ss):
    """Scheduled job: publish the stored post of a schedule on LinkedIn.

    Looks up the account rows returned by
    ``db_manager.fetching_user_identif(id_user, ss)``, picks the row whose
    ``id`` equals ``id_social`` and uses its ``token`` / ``sub`` values to
    post the first ``Text_content`` returned by
    ``db_manager.fetching_post(id_social, idd)`` to the ugcPosts endpoint.
    Any exception (including an HTTP timeout) is printed rather than raised.

    Args:
        id_social: Id of the linked account row to publish with.
        id_user: User id used for the account lookup.
        idd: Passed to ``fetching_post`` and ``update_post``.
        ss: Social network name used for the account lookup.
    """
    try:
        print("⏳ Tâche planifiée pour post_pubsih", flush=True)
        resp = db_manager.fetching_user_identif(id_user, ss)
        dd = db_manager.fetching_post(id_social, idd)
        data = pd.DataFrame(resp.data)
        # Log the account rows without the OAuth access token: the token is
        # a credential and must not end up in the logs.
        print(data.drop(columns=["token"], errors="ignore"))
        first = data[data['id'] == id_social].iloc[0]
        token_value = first["token"]
        sub_value = first["sub"]
        post = dd["Text_content"].iloc[0]
        print("⏳ Tâche planifiée pour gfjfxd", flush=True)
        url = "https://api.linkedin.com/v2/ugcPosts"
        headers = {
            "Authorization": f"Bearer {token_value}",
            "X-Restli-Protocol-Version": "2.0.0",
            "Content-Type": "application/json",
        }
        body = {
            "author": f"urn:li:person:{sub_value}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {
                "com.linkedin.ugc.ShareContent": {
                    "shareCommentary": {
                        "text": post
                    },
                    "shareMediaCategory": "NONE",
                }
            },
            "visibility": {
                "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
            },
        }
        # Without a timeout a stalled connection would block this scheduler
        # worker forever; a timeout raises and is reported below.
        resp = requests.post(url, headers=headers, json=body, timeout=30)
        # NOTE(review): update_post runs even when LinkedIn answered with an
        # error status — confirm that this is intended.
        db_manager.update_post(id_social, idd)
        print([resp.status_code, resp.text], flush=True)
    except Exception as e:
        print("Erreur dans post():", e, flush=True)
# Lower-cased English day names mapped to APScheduler ``day_of_week`` values.
_DAY_ABBREVIATIONS = {
    "monday": "mon",
    "tuesday": "tue",
    "wednesday": "wed",
    "thursday": "thu",
    "friday": "fri",
    "saturday": "sat",
    "sunday": "sun",
}


def planifier_ligne(id_schedule, id_social, user_id, schedule_time_str, ss, adjusted_time):
    """Register the weekly "publish" and "generate" jobs of one schedule row.

    Both time arguments must look like ``"<Day> <hour>:<minute>"`` with an
    English day name (any case), an hour in 0-23 and a minute in 0-59. An
    invalid value is reported on stdout and the row is skipped before the
    scheduler is touched, instead of raising.

    Jobs are added with ``replace_existing=True``, so calling this again for
    the same row and time replaces the previous jobs rather than failing on a
    duplicate id.

    Args:
        id_schedule: Schedule row id; part of the job ids and passed to the jobs.
        id_social: Passed through to both jobs.
        user_id: Passed through to both jobs.
        schedule_time_str: Weekly slot at which ``post_publishing_for_robot`` runs.
        ss: Social network name, passed to the publishing job.
        adjusted_time: Weekly slot at which ``post_generation_for_robot`` runs.

    Returns:
        None.
    """
    # Rows coming from the database may hold None/NaN instead of text.
    for raw in (schedule_time_str, adjusted_time):
        if not isinstance(raw, str):
            print(f"❌ Format invalide : {raw}", flush=True)
            return

    parts = schedule_time_str.strip().split()
    part_adj = adjusted_time.strip().split()
    if len(parts) != 2 or ':' not in parts[1]:
        print(f"❌ Format invalide : {schedule_time_str}", flush=True)
        return
    if len(part_adj) != 2 or ':' not in part_adj[1]:
        print(f"❌ Format invalide : {adjusted_time}", flush=True)
        return
    jour, hm = parts
    jour_adj, hm_adj = part_adj

    # Parse and range-check each time separately so the message names the
    # value that is actually wrong.
    parsed = []
    for hm_text in (hm, hm_adj):
        try:
            h, m = map(int, hm_text.split(':'))
        except ValueError:
            print(f"❌ Heure invalide : {hm_text}", flush=True)
            return
        if not (0 <= h <= 23 and 0 <= m <= 59):
            print(f"❌ Heure invalide : {hm_text}", flush=True)
            return
        parsed.append((h, m))
    (hour, minute), (hour_adj, minute_adj) = parsed

    jour_key = jour.lower()
    jour_key_adj = jour_adj.lower()
    if jour_key not in _DAY_ABBREVIATIONS or jour_key_adj not in _DAY_ABBREVIATIONS:
        print(f"❌ Jour non reconnu : {jour}/{jour_adj}", flush=True)
        return

    # Schedule publishing
    apsched.add_job(
        post_publishing_for_robot,
        CronTrigger(day_of_week=_DAY_ABBREVIATIONS[jour_key], hour=hour, minute=minute),
        args=(id_social, user_id, id_schedule, ss),
        id=f"pub-{id_schedule}-{schedule_time_str}",
        replace_existing=True,
    )
    # Schedule generation
    apsched.add_job(
        post_generation_for_robot,
        CronTrigger(day_of_week=_DAY_ABBREVIATIONS[jour_key_adj], hour=hour_adj, minute=minute_adj),
        args=(user_id, id_social, id_schedule),
        id=f"gen-{id_schedule}-{schedule_time_str}",
        replace_existing=True,
    )
    print(f"⏳ APScheduler: Tâche planifiée pour {id_social} ({user_id}) le {jour} à {hour:02d}:{minute:02d} et {jour_adj} à {hour_adj:02d}:{minute_adj:02d}", flush=True)
# Day names in calendar order, used to step back one day across midnight.
_WEEKDAYS = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def _generation_time(timesche, lead_minutes=7):
    """Return the ``"<day> <h>:<mm>"`` slot ``lead_minutes`` before ``timesche``.

    The result rolls back over the hour and, before 00:00, over the day
    (Monday -> Sunday), so it never contains a negative minute.
    """
    jour, horaire = timesche.split()
    horaire = horaire.replace(';', ':')
    h, m = map(int, horaire.split(':'))
    total = h * 60 + m - lead_minutes
    if total < 0:
        total += 24 * 60
        lowered = [name.lower() for name in _WEEKDAYS]
        if jour.lower() in lowered:
            # Index -1 wraps Monday back to Sunday.
            jour = _WEEKDAYS[lowered.index(jour.lower()) - 1]
        # NOTE(review): an unrecognised day name is kept as-is here.
    h, m = divmod(total, 60)
    return f"{jour} {h}:{m:02d}"


def add_scheduling(state):
    """Store the schedule slot(s) selected in the GUI and re-register the jobs.

    ``state.day_value`` may be one day name or a list of them. For each day
    a ``"<day> <hour>:<minute>"`` publish slot is built from
    ``state.time_value_hour`` / ``state.time_value_minute``. The generation
    slot comes from ``add_request`` when the schedule table already has
    rows, otherwise it is 7 minutes before the publish slot. Both are saved
    with ``db_manager.create_scheduling_for_user``. Afterwards the user's
    schedule table is refreshed and all jobs are re-registered.

    Any exception is printed rather than raised. No locking is done here.
    """
    try:
        selected = state.day_value
        days = selected if isinstance(selected, list) else [selected]
        for day in days:
            timesche = f"{day} {int(state.time_value_hour)}:{int(state.time_value_minute)}"
            # Get current schedule
            df = db_manager.fetch_schedule_table()
            if not df.empty:
                df, final_time = add_request(df, timesche)
            else:
                # 7 minutes before for generation
                final_time = _generation_time(timesche)
            # Add to database
            db_manager.create_scheduling_for_user(
                state.user_inf.user.id,
                state.Linked_social_network,
                timesche,
                final_time
            )
        # Refresh the schedule after adding
        df = db_manager.fetch_schedule_table()
        state.data_schedule = db_manager.fetch_schedule_table_acc(state.user_inf.user.id)
        # Reschedule all tasks
        replanifier_toutes_les_tâches(df)
        print("✅ Scheduling added successfully", flush=True)
    except Exception as e:
        print(f"❌ Error in add_scheduling: {e}", flush=True)
def planning():
    """Fetch the schedule table and register its jobs when it has rows."""
    schedule = db_manager.fetch_schedule_table()
    if schedule.empty:
        return
    replanifier_toutes_les_tâches(schedule)
def post_publishing(state):
    """Publish ``state.generated_post`` on LinkedIn for the logged-in user.

    The account is the first row returned by
    ``db_manager.fetching_user_identif`` whose ``social_network`` equals
    ``state.social_network``; its ``token`` and ``sub`` values authenticate
    and identify the author. When no such row exists a message is printed
    and nothing is sent. The HTTP status and body of the response are
    printed.
    """
    resp = db_manager.fetching_user_identif(state.user_inf.user.id, state.social_network)
    data = pd.DataFrame(resp.data)
    # An empty result has no "social_network" column at all, so check for it
    # before filtering.
    if "social_network" in data.columns:
        matching = data[data['social_network'] == state.social_network]
    else:
        matching = data.iloc[0:0]
    if matching.empty:
        print(f"❌ No {state.social_network} account linked for this user", flush=True)
        return
    first = matching.iloc[0]
    token_value = first["token"]
    sub_value = first["sub"]
    url = "https://api.linkedin.com/v2/ugcPosts"
    headers = {
        "Authorization": f"Bearer {token_value}",
        "X-Restli-Protocol-Version": "2.0.0",
        "Content-Type": "application/json",
    }
    body = {
        "author": f"urn:li:person:{sub_value}",
        "lifecycleState": "PUBLISHED",
        "specificContent": {
            "com.linkedin.ugc.ShareContent": {
                "shareCommentary": {
                    "text": state.generated_post
                },
                "shareMediaCategory": "NONE",
            }
        },
        "visibility": {
            "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
        },
    }
    # A timeout keeps the GUI callback from hanging on a stalled connection.
    resp = requests.post(url, headers=headers, json=body, timeout=30)
    print([resp.status_code, resp.text], flush=True)
def post_generation(state):
    """Ask the ``/poster_linkedin`` endpoint for a post and keep it in the state.

    The logged-in user's id is sent as ``code``; the result is stored in
    ``state.generated_post``.
    """
    user_id = state.user_inf.user.id
    state.generated_post = client.predict(code=user_id, api_name="/poster_linkedin")
def authen(state):
    """Start the LinkedIn OAuth flow and send the browser to the consent page.

    Does nothing unless ``state.Linked_social_network`` is ``"Linkedin"``.
    Stores the authorization URL and the OAuth state value in
    ``state.urlss`` / ``state.states`` before navigating.
    """
    # NOTE(review): navigate() is assumed to belong to the Linkedin branch —
    # confirm against the original layout.
    if state.Linked_social_network != "Linkedin":
        return
    print("jhdijb", flush=True)
    state.urlss, state.states = linkedin.authorization_url(
        'https://www.linkedin.com/oauth/v2/authorization'
    )
    navigate(state, state.urlss)
def on_my_clicking(state, action, payload):
    """Navigate to the page named by the first payload argument.

    Selecting ``"Accueil"`` logs the user out first. Both the action and the
    target are printed. Always returns a single-space string.
    """
    print(action, flush=True)
    target = payload["args"][0]
    print(target, flush=True)
    if target == "Accueil":
        on_logout(state)
    navigate(state, target)
    return " "
def add_source(state):
    """Submit the RSS link typed by the user and refresh the source table.

    The link and the user id are joined with a fixed separator string and
    sent to the ``/ajouter_rss`` endpoint; the answer is stored in
    ``state.source_add_message``. The user's sources are then reloaded
    into ``state.Source_table``.
    """
    tagged_link = state.source_ + "__thi_irrh'èçs_my_id__! " + state.user_inf.user.id
    state.source_add_message = client.predict(
        rss_link=tagged_link,
        api_name="/ajouter_rss",
    )
    rows = db_manager.fetch_source_table(state.user_inf.user.id)
    state.Source_table = pd.DataFrame(rows)
def delete_source(state, var_name: str, payload: dict):
    """Delete the row removed in the GUI from the ``Source`` table.

    The table is snapshotted, the GUI's own delete handler is run, and the
    rows that are not present in both versions are looked up; their ``id``
    values are passed to ``db_manager.delete_from_table``.
    """
    state.Source_table_before = state.Source_table
    # presumably replaces state.Source_table with the reduced table — confirm
    state.get_gui().table_on_delete(state, var_name, payload)
    merged = state.Source_table_before.merge(
        state.Source_table, how="outer", indicator=True
    )
    changed_rows = merged.query('_merge != "both"').drop(columns='_merge')
    removed_ids = changed_rows['id'].tolist()
    db_manager.delete_from_table("Source", removed_ids)
def delete_account(state, var_name: str, payload: dict):
    """Delete the row removed in the GUI from the ``Social_network`` table.

    The account table is snapshotted, the GUI's own delete handler is run,
    and the rows that are not present in both versions are looked up; their
    ``id`` values are passed to ``db_manager.delete_from_table``.
    """
    state.data_account_before = state.data_account
    # presumably replaces state.data_account with the reduced table — confirm
    state.get_gui().table_on_delete(state, var_name, payload)
    merged = state.data_account_before.merge(
        state.data_account, how="outer", indicator=True
    )
    changed_rows = merged.query('_merge != "both"').drop(columns='_merge')
    removed_ids = changed_rows['id'].tolist()
    db_manager.delete_from_table("Social_network", removed_ids)
def delete_schedule(state, var_name: str, payload: dict):
    """Delete the row removed in the GUI from the ``Scheduling`` table.

    The schedule table is snapshotted, the GUI's own delete handler is run,
    and the rows that are not present in both versions are looked up; their
    ``id`` values are passed to ``db_manager.delete_from_table``.
    """
    state.data_schedule_before = state.data_schedule
    # presumably replaces state.data_schedule with the reduced table — confirm
    state.get_gui().table_on_delete(state, var_name, payload)
    merged = state.data_schedule_before.merge(
        state.data_schedule, how="outer", indicator=True
    )
    changed_rows = merged.query('_merge != "both"').drop(columns='_merge')
    removed_ids = changed_rows['id'].tolist()
    db_manager.delete_from_table("Scheduling", removed_ids)
def on_login(state, payload):
    """Authenticate the login form and open the source-management page.

    After a 0.7 s pause, rejects an empty email or password with a message.
    Otherwise calls ``db_manager.authenticate_user``; its third result is
    stored in ``state.user_inf`` whatever the outcome. On success the
    user's source, account and schedule tables are loaded into the state,
    the page changes to ``Source_Management`` and the form is cleared. On
    failure the returned message is shown.
    """
    time.sleep(0.7)
    email, password = state.login_email, state.login_password
    if not (email and password):
        state.message = "Please enter both email and password"
        return

    success, message, state.user_inf = db_manager.authenticate_user(email, password)
    if not success:
        state.message = message
        return

    state.current_user = email
    user_id = state.user_inf.user.id
    source_rows = db_manager.fetch_source_table(user_id)
    account_rows = db_manager.fetch_account_table(user_id)
    state.data_schedule = db_manager.fetch_schedule_table_acc(user_id)
    state.data_account = pd.DataFrame(account_rows)
    state.Source_table = pd.DataFrame(source_rows)
    navigate(state, "Source_Management")
    state.is_logged_in = True
    state.message = f"Welcome back, {email}!"
    # Clear form
    state.login_email = ""
    state.login_password = ""
def on_register(state):
    """Validate the registration form and create the account.

    After a 0.7 s pause the form is checked in this order: all three fields
    filled, both passwords equal, password at least 8 characters long. The
    first failed check sets ``state.message`` and stops. Otherwise
    ``db_manager.create_user`` is called. On success the form is cleared
    and the login form is shown again; on failure the returned message is
    displayed.
    """
    time.sleep(0.7)
    email = state.register_email
    password = state.register_password
    confirmation = state.confirm_password

    if not (email and password and confirmation):
        problem = "Please fill in all fields"
    elif password != confirmation:
        problem = "Passwords do not match"
    elif len(password) < 8:
        problem = "Password must be at least 8 characters long"
    else:
        problem = None
    if problem is not None:
        state.message = problem
        return

    success, message, _ = db_manager.create_user(email, password)  # type: ignore
    if not success:
        state.message = message
        return
    state.message = "Registration successful! Please log in."
    state.show_register = False
    # Clear form
    state.register_email = ""
    state.register_password = ""
    state.confirm_password = ""
def on_logout(state):
    """Reset the session fields of ``state`` and show a logout message."""
    logged_out_values = (
        ("current_user", None),
        ("is_logged_in", False),
        ("message", "Logged out successfully"),
        ("login_email", ""),
        ("login_password", ""),
    )
    # Assigned one by one, in this order.
    for field, value in logged_out_values:
        setattr(state, field, value)
def toggle_register(state):
    """Flip between the login and registration forms and blank every field."""
    state.show_register = not state.show_register
    cleared_fields = (
        "message",
        "login_email",
        "login_password",
        "register_email",
        "register_password",
        "confirm_password",
    )
    # Blanked one by one, in this order.
    for field in cleared_fields:
        setattr(state, field, "")