����JFIF��H�H����Exif��MM�*���� ��3����V�����3������3�(��������������������3�����403WebShell
403Webshell
Server IP : 74.208.127.88  /  Your IP : 3.134.110.4
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ubuntu 5.4.0-163-generic #180-Ubuntu SMP Tue Sep 5 13:21:23 UTC 2023 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /proc/self/root/lib/python3/dist-packages/twisted/names/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/self/root/lib/python3/dist-packages/twisted/names/test/test_util.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Utilities for Twisted.names tests.
"""

from __future__ import division, absolute_import

from random import randrange

from zope.interface import implementer
from zope.interface.verify import verifyClass

from twisted.internet.address import IPv4Address
from twisted.internet.defer import succeed
from twisted.internet.task import Clock
from twisted.internet.interfaces import IReactorUDP, IUDPTransport



@implementer(IUDPTransport)
class MemoryDatagramTransport(object):
    """
    This L{IUDPTransport} implementation enforces the usual connection rules
    and captures sent traffic in a list for later inspection.

    @ivar _host: The host address to which this transport is bound.
    @ivar _protocol: The protocol connected to this transport.
    @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to
        C{write} and the addresses to which they are destined.

    @ivar _connectedTo: L{None} if this transport is unconnected, otherwise an
        address to which all traffic is supposedly sent.

    @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram
        which will be successfully handled by C{write}.
    """
    def __init__(self, host, protocol, maxPacketSize):
        self._host = host
        self._protocol = protocol
        self._sentPackets = []
        self._connectedTo = None
        self._maxPacketSize = maxPacketSize


    def getHost(self):
        """
        Return the address which this transport is pretending to be bound
        to.
        """
        return IPv4Address('UDP', *self._host)


    def connect(self, host, port):
        """
        Connect this transport to the given address.
        """
        if self._connectedTo is not None:
            raise ValueError("Already connected")
        self._connectedTo = (host, port)


    def write(self, datagram, addr=None):
        """
        Send the given datagram.
        """
        if addr is None:
            addr = self._connectedTo
        if addr is None:
            raise ValueError("Need an address")
        if len(datagram) > self._maxPacketSize:
            raise ValueError("Packet too big")
        self._sentPackets.append((datagram, addr))


    def stopListening(self):
        """
        Shut down this transport.
        """
        self._protocol.stopProtocol()
        return succeed(None)


    def setBroadcastAllowed(self, enabled):
        """
        Dummy implementation to satisfy L{IUDPTransport}.
        """
        pass


    def getBroadcastAllowed(self):
        """
        Dummy implementation to satisfy L{IUDPTransport}.
        """
        pass


verifyClass(IUDPTransport, MemoryDatagramTransport)



@implementer(IReactorUDP)
class MemoryReactor(Clock):
    """
    An L{IReactorTime} and L{IReactorUDP} provider.

    Time is controlled deterministically via the base class, L{Clock}.  UDP is
    handled in-memory by connecting protocols to instances of
    L{MemoryDatagramTransport}.

    @ivar udpPorts: A C{dict} mapping port numbers to instances of
        L{MemoryDatagramTransport}.
    """
    def __init__(self):
        Clock.__init__(self)
        self.udpPorts = {}


    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """
        Pretend to bind a UDP port and connect the given protocol to it.
        """
        if port == 0:
            while True:
                port = randrange(1, 2 ** 16)
                if port not in self.udpPorts:
                    break
        if port in self.udpPorts:
            raise ValueError("Address in use")
        transport = MemoryDatagramTransport(
            (interface, port), protocol, maxPacketSize)
        self.udpPorts[port] = transport
        protocol.makeConnection(transport)
        return transport

verifyClass(IReactorUDP, MemoryReactor)

Youez - 2016 - github.com/yon3zu
LinuXploit